[jboss-cvs] JBoss Messaging SVN: r5751 - in trunk: src/main/org/jboss/messaging/core/config and 16 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Jan 28 09:18:20 EST 2009
Author: timfox
Date: 2009-01-28 09:18:19 -0500 (Wed, 28 Jan 2009)
New Revision: 5751
Added:
trunk/src/main/org/jboss/messaging/core/config/cluster/ClusterConnectionConfiguration.java
trunk/src/main/org/jboss/messaging/core/postoffice/impl/DivertBinding.java
trunk/src/main/org/jboss/messaging/core/postoffice/impl/LocalQueueBinding.java
trunk/src/main/org/jboss/messaging/core/server/cluster/ClusterConnection.java
trunk/src/main/org/jboss/messaging/core/server/cluster/RemoteQueueBinding.java
trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterConnectionImpl.java
trunk/src/main/org/jboss/messaging/core/server/cluster/impl/RemoteQueueBindingImpl.java
Removed:
trunk/src/main/org/jboss/messaging/core/config/cluster/ClusterConfiguration.java
trunk/src/main/org/jboss/messaging/core/postoffice/impl/BindingImpl.java
trunk/src/main/org/jboss/messaging/core/server/cluster/Cluster.java
trunk/src/main/org/jboss/messaging/core/server/cluster/FlowBinding.java
trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterImpl.java
trunk/src/main/org/jboss/messaging/core/server/cluster/impl/FlowBindingImpl.java
Modified:
trunk/src/main/org/jboss/messaging/core/client/management/impl/ManagementHelper.java
trunk/src/main/org/jboss/messaging/core/config/Configuration.java
trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java
trunk/src/main/org/jboss/messaging/core/filter/impl/FilterImpl.java
trunk/src/main/org/jboss/messaging/core/management/ManagementService.java
trunk/src/main/org/jboss/messaging/core/management/MessagingServerControlMBean.java
trunk/src/main/org/jboss/messaging/core/management/impl/ManagementServiceImpl.java
trunk/src/main/org/jboss/messaging/core/management/impl/MessagingServerControl.java
trunk/src/main/org/jboss/messaging/core/management/jmx/impl/ReplicationAwareMessagingServerControlWrapper.java
trunk/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java
trunk/src/main/org/jboss/messaging/core/postoffice/AddressManager.java
trunk/src/main/org/jboss/messaging/core/postoffice/Binding.java
trunk/src/main/org/jboss/messaging/core/postoffice/impl/BindingsImpl.java
trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
trunk/src/main/org/jboss/messaging/core/postoffice/impl/SimpleAddressManager.java
trunk/src/main/org/jboss/messaging/core/postoffice/impl/WildcardAddressManager.java
trunk/src/main/org/jboss/messaging/core/server/Bindable.java
trunk/src/main/org/jboss/messaging/core/server/Consumer.java
trunk/src/main/org/jboss/messaging/core/server/Divert.java
trunk/src/main/org/jboss/messaging/core/server/cluster/Bridge.java
trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BridgeImpl.java
trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterManagerImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/DivertImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
trunk/tests/src/org/jboss/messaging/tests/performance/persistence/FakeBinding.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/fakes/FakeConsumer.java
Log:
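More clustering: rename ClusterConfiguration/Cluster to ClusterConnectionConfiguration/ClusterConnection (adding a forwardWhenNoConsumers flag), replace BindingImpl with LocalQueueBinding and DivertBinding, replace FlowBinding with RemoteQueueBinding, prefix the JBM header names with an underscore ("_JBM_..."), split Binding.accept() into filterMatches() and isHighAcceptPriority(), and change the sendQueueInfoToQueue() management operation to take a String.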
Modified: trunk/src/main/org/jboss/messaging/core/client/management/impl/ManagementHelper.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/management/impl/ManagementHelper.java 2009-01-28 10:53:22 UTC (rev 5750)
+++ trunk/src/main/org/jboss/messaging/core/client/management/impl/ManagementHelper.java 2009-01-28 14:18:19 UTC (rev 5751)
@@ -48,27 +48,27 @@
// Constants -----------------------------------------------------
- public static final SimpleString HDR_JMX_OBJECTNAME = new SimpleString("JBM_JMX_ObjectName");
+ public static final SimpleString HDR_JMX_OBJECTNAME = new SimpleString("_JBM_JMX_ObjectName");
- public static final SimpleString HDR_JMX_ATTRIBUTE_PREFIX = new SimpleString("JBM_JMXAttribute.");
+ public static final SimpleString HDR_JMX_ATTRIBUTE_PREFIX = new SimpleString("_JBM_JMXAttribute.");
- public static final SimpleString HDR_JMX_OPERATION_PREFIX = new SimpleString("JBM_JMXOperation.");
+ public static final SimpleString HDR_JMX_OPERATION_PREFIX = new SimpleString("_JBM_JMXOperation.");
public static final SimpleString HDR_JMX_OPERATION_NAME = new SimpleString(HDR_JMX_OPERATION_PREFIX + "name");
- public static final SimpleString HDR_JMX_OPERATION_SUCCEEDED = new SimpleString("JBM_JMXOperationSucceeded");
+ public static final SimpleString HDR_JMX_OPERATION_SUCCEEDED = new SimpleString("_JBM_JMXOperationSucceeded");
- public static final SimpleString HDR_JMX_OPERATION_EXCEPTION = new SimpleString("JBM_JMXOperationException");
+ public static final SimpleString HDR_JMX_OPERATION_EXCEPTION = new SimpleString("_JBM_JMXOperationException");
- public static final SimpleString HDR_NOTIFICATION_TYPE = new SimpleString("JBM_NotifType");
+ public static final SimpleString HDR_NOTIFICATION_TYPE = new SimpleString("_JBM_NotifType");
- public static final SimpleString HDR_NOTIFICATION_TIMESTAMP = new SimpleString("JBM_NotifTimestamp");
+ public static final SimpleString HDR_NOTIFICATION_TIMESTAMP = new SimpleString("_JBM_NotifTimestamp");
- public static final SimpleString HDR_QUEUE_NAME = new SimpleString("JBM_QueueName");
+ public static final SimpleString HDR_QUEUE_NAME = new SimpleString("_JBM_QueueName");
- public static final SimpleString HDR_ADDRESS = new SimpleString("JBM_Address");
+ public static final SimpleString HDR_ADDRESS = new SimpleString("_JBM_Address");
- public static final SimpleString HDR_FILTERSTRING = new SimpleString("JBM_FilterString");
+ public static final SimpleString HDR_FILTERSTRING = new SimpleString("_JBM_FilterString");
// Attributes ----------------------------------------------------
Modified: trunk/src/main/org/jboss/messaging/core/config/Configuration.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/Configuration.java 2009-01-28 10:53:22 UTC (rev 5750)
+++ trunk/src/main/org/jboss/messaging/core/config/Configuration.java 2009-01-28 14:18:19 UTC (rev 5751)
@@ -29,7 +29,7 @@
import org.jboss.messaging.core.config.cluster.BridgeConfiguration;
import org.jboss.messaging.core.config.cluster.BroadcastGroupConfiguration;
-import org.jboss.messaging.core.config.cluster.ClusterConfiguration;
+import org.jboss.messaging.core.config.cluster.ClusterConnectionConfiguration;
import org.jboss.messaging.core.config.cluster.DiscoveryGroupConfiguration;
import org.jboss.messaging.core.config.cluster.DivertConfiguration;
import org.jboss.messaging.core.config.cluster.QueueConfiguration;
@@ -119,9 +119,9 @@
void setDivertConfigurations(final List<DivertConfiguration> configs);
- List<ClusterConfiguration> getClusterConfigurations();
+ List<ClusterConnectionConfiguration> getClusterConfigurations();
- void setClusterConfigurations(final List<ClusterConfiguration> configs);
+ void setClusterConfigurations(final List<ClusterConnectionConfiguration> configs);
List<QueueConfiguration> getQueueConfigurations();
Deleted: trunk/src/main/org/jboss/messaging/core/config/cluster/ClusterConfiguration.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/cluster/ClusterConfiguration.java 2009-01-28 10:53:22 UTC (rev 5750)
+++ trunk/src/main/org/jboss/messaging/core/config/cluster/ClusterConfiguration.java 2009-01-28 14:18:19 UTC (rev 5751)
@@ -1,113 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.messaging.core.config.cluster;
-
-import java.io.Serializable;
-import java.util.List;
-
-import org.jboss.messaging.util.Pair;
-
-/**
- * A ClusterConfiguration
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
- * Created 13 Jan 2009 09:42:17
- *
- *
- */
-public class ClusterConfiguration implements Serializable
-{
- private static final long serialVersionUID = 8948303813427795935L;
-
- private final String name;
-
- private final String address;
-
- private final BridgeConfiguration bridgeConfig;
-
- private final boolean duplicateDetection;
-
- private final List<Pair<String, String>> staticConnectorNamePairs;
-
- private final String discoveryGroupName;
-
- public ClusterConfiguration(final String name,
- final String address,
- final BridgeConfiguration bridgeConfig,
- final boolean duplicateDetection,
- final List<Pair<String, String>> staticConnectorNamePairs)
- {
- this.name = name;
- this.address = address;
- this.bridgeConfig = bridgeConfig;
- this.staticConnectorNamePairs = staticConnectorNamePairs;
- this.duplicateDetection = duplicateDetection;
- this.discoveryGroupName = null;
- }
-
- public ClusterConfiguration(final String name,
- final String address,
- final BridgeConfiguration bridgeConfig,
- final boolean duplicateDetection,
- final String discoveryGroupName)
- {
- this.name = name;
- this.address = address;
- this.bridgeConfig = bridgeConfig;
- this.duplicateDetection = duplicateDetection;
- this.discoveryGroupName = discoveryGroupName;
- this.staticConnectorNamePairs = null;
- }
-
- public String getName()
- {
- return name;
- }
-
- public String getAddress()
- {
- return address;
- }
-
- public BridgeConfiguration getBridgeConfig()
- {
- return bridgeConfig;
- }
-
- public boolean isDuplicateDetection()
- {
- return duplicateDetection;
- }
-
- public List<Pair<String, String>> getStaticConnectorNamePairs()
- {
- return staticConnectorNamePairs;
- }
-
- public String getDiscoveryGroupName()
- {
- return discoveryGroupName;
- }
-
-}
Copied: trunk/src/main/org/jboss/messaging/core/config/cluster/ClusterConnectionConfiguration.java (from rev 5744, trunk/src/main/org/jboss/messaging/core/config/cluster/ClusterConfiguration.java)
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/cluster/ClusterConnectionConfiguration.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/config/cluster/ClusterConnectionConfiguration.java 2009-01-28 14:18:19 UTC (rev 5751)
@@ -0,0 +1,124 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.config.cluster;
+
+import java.io.Serializable;
+import java.util.List;
+
+import org.jboss.messaging.util.Pair;
+
+/**
+ * A ClusterConnectionConfiguration
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ * Created 13 Jan 2009 09:42:17
+ *
+ *
+ */
+public class ClusterConnectionConfiguration implements Serializable
+{
+ private static final long serialVersionUID = 8948303813427795935L;
+
+ private final String name;
+
+ private final String address;
+
+ private final BridgeConfiguration bridgeConfig;
+
+ private final boolean duplicateDetection;
+
+ private final boolean forwardWhenNoConsumers;
+
+ private final List<Pair<String, String>> staticConnectorNamePairs;
+
+ private final String discoveryGroupName;
+
+ public ClusterConnectionConfiguration(final String name,
+ final String address,
+ final BridgeConfiguration bridgeConfig,
+ final boolean duplicateDetection,
+ final boolean forwardWhenNoConsumers,
+ final List<Pair<String, String>> staticConnectorNamePairs)
+ {
+ this.name = name;
+ this.address = address;
+ this.bridgeConfig = bridgeConfig;
+ this.staticConnectorNamePairs = staticConnectorNamePairs;
+ this.duplicateDetection = duplicateDetection;
+ this.forwardWhenNoConsumers = forwardWhenNoConsumers;
+ this.discoveryGroupName = null;
+ }
+
+ public ClusterConnectionConfiguration(final String name,
+ final String address,
+ final BridgeConfiguration bridgeConfig,
+ final boolean duplicateDetection,
+ final boolean forwardWhenNoConsumers,
+ final String discoveryGroupName)
+ {
+ this.name = name;
+ this.address = address;
+ this.bridgeConfig = bridgeConfig;
+ this.duplicateDetection = duplicateDetection;
+ this.forwardWhenNoConsumers = forwardWhenNoConsumers;
+ this.discoveryGroupName = discoveryGroupName;
+ this.staticConnectorNamePairs = null;
+ }
+
+ public String getName()
+ {
+ return name;
+ }
+
+ public String getAddress()
+ {
+ return address;
+ }
+
+ public BridgeConfiguration getBridgeConfig()
+ {
+ return bridgeConfig;
+ }
+
+ public boolean isDuplicateDetection()
+ {
+ return duplicateDetection;
+ }
+
+ public boolean isForwardWhenNoConsumers()
+ {
+ return forwardWhenNoConsumers;
+ }
+
+ public List<Pair<String, String>> getStaticConnectorNamePairs()
+ {
+ return staticConnectorNamePairs;
+ }
+
+ public String getDiscoveryGroupName()
+ {
+ return discoveryGroupName;
+ }
+
+}
Modified: trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java 2009-01-28 10:53:22 UTC (rev 5750)
+++ trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java 2009-01-28 14:18:19 UTC (rev 5751)
@@ -24,7 +24,7 @@
import org.jboss.messaging.core.config.TransportConfiguration;
import org.jboss.messaging.core.config.cluster.BridgeConfiguration;
import org.jboss.messaging.core.config.cluster.BroadcastGroupConfiguration;
-import org.jboss.messaging.core.config.cluster.ClusterConfiguration;
+import org.jboss.messaging.core.config.cluster.ClusterConnectionConfiguration;
import org.jboss.messaging.core.config.cluster.DiscoveryGroupConfiguration;
import org.jboss.messaging.core.config.cluster.DivertConfiguration;
import org.jboss.messaging.core.config.cluster.QueueConfiguration;
@@ -165,7 +165,7 @@
protected List<DivertConfiguration> divertConfigurations = new ArrayList<DivertConfiguration>();
- protected List<ClusterConfiguration> clusterConfigurations = new ArrayList<ClusterConfiguration>();
+ protected List<ClusterConnectionConfiguration> clusterConfigurations = new ArrayList<ClusterConnectionConfiguration>();
protected List<QueueConfiguration> queueConfigurations = new ArrayList<QueueConfiguration>();
@@ -363,12 +363,12 @@
this.broadcastGroupConfigurations = configs;
}
- public List<ClusterConfiguration> getClusterConfigurations()
+ public List<ClusterConnectionConfiguration> getClusterConfigurations()
{
return this.clusterConfigurations;
}
- public void setClusterConfigurations(final List<ClusterConfiguration> configs)
+ public void setClusterConfigurations(final List<ClusterConnectionConfiguration> configs)
{
this.clusterConfigurations = configs;
}
Modified: trunk/src/main/org/jboss/messaging/core/filter/impl/FilterImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/filter/impl/FilterImpl.java 2009-01-28 10:53:22 UTC (rev 5750)
+++ trunk/src/main/org/jboss/messaging/core/filter/impl/FilterImpl.java 2009-01-28 14:18:19 UTC (rev 5751)
@@ -143,14 +143,14 @@
Object val = null;
if (id.getName().startsWith(JBM_PREFIX))
- {
+ {
// Look it up as header fields
val = getHeaderFieldValue(message, id.getName());
}
if (val == null)
- {
- val = message.getProperty(id.getName());
+ {
+ val = message.getProperty(id.getName());
}
id.setValue(val);
@@ -158,7 +158,7 @@
}
// Compute the result of this operator
-
+
boolean res = (Boolean)operator.apply();
return res;
Modified: trunk/src/main/org/jboss/messaging/core/management/ManagementService.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/ManagementService.java 2009-01-28 10:53:22 UTC (rev 5750)
+++ trunk/src/main/org/jboss/messaging/core/management/ManagementService.java 2009-01-28 14:18:19 UTC (rev 5751)
@@ -32,7 +32,7 @@
import org.jboss.messaging.core.config.TransportConfiguration;
import org.jboss.messaging.core.config.cluster.BridgeConfiguration;
import org.jboss.messaging.core.config.cluster.BroadcastGroupConfiguration;
-import org.jboss.messaging.core.config.cluster.ClusterConfiguration;
+import org.jboss.messaging.core.config.cluster.ClusterConnectionConfiguration;
import org.jboss.messaging.core.config.cluster.DiscoveryGroupConfiguration;
import org.jboss.messaging.core.messagecounter.MessageCounterManager;
import org.jboss.messaging.core.persistence.StorageManager;
@@ -47,7 +47,7 @@
import org.jboss.messaging.core.server.ServerMessage;
import org.jboss.messaging.core.server.cluster.Bridge;
import org.jboss.messaging.core.server.cluster.BroadcastGroup;
-import org.jboss.messaging.core.server.cluster.Cluster;
+import org.jboss.messaging.core.server.cluster.ClusterConnection;
import org.jboss.messaging.core.settings.HierarchicalRepository;
import org.jboss.messaging.core.settings.impl.QueueSettings;
import org.jboss.messaging.core.transaction.ResourceManager;
@@ -103,7 +103,7 @@
void unregisterBridge(String name) throws Exception;
- void registerCluster(Cluster cluster, ClusterConfiguration configuration) throws Exception;
+ void registerCluster(ClusterConnection cluster, ClusterConnectionConfiguration configuration) throws Exception;
void unregisterCluster(String name) throws Exception;
Modified: trunk/src/main/org/jboss/messaging/core/management/MessagingServerControlMBean.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/MessagingServerControlMBean.java 2009-01-28 10:53:22 UTC (rev 5750)
+++ trunk/src/main/org/jboss/messaging/core/management/MessagingServerControlMBean.java 2009-01-28 14:18:19 UTC (rev 5751)
@@ -166,6 +166,6 @@
TabularData getConnectors() throws Exception;
- void sendQueueInfoToQueue(SimpleString queueName) throws Exception;
+ void sendQueueInfoToQueue(String queueName) throws Exception;
}
Modified: trunk/src/main/org/jboss/messaging/core/management/impl/ManagementServiceImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/impl/ManagementServiceImpl.java 2009-01-28 10:53:22 UTC (rev 5750)
+++ trunk/src/main/org/jboss/messaging/core/management/impl/ManagementServiceImpl.java 2009-01-28 14:18:19 UTC (rev 5751)
@@ -47,7 +47,7 @@
import org.jboss.messaging.core.config.TransportConfiguration;
import org.jboss.messaging.core.config.cluster.BridgeConfiguration;
import org.jboss.messaging.core.config.cluster.BroadcastGroupConfiguration;
-import org.jboss.messaging.core.config.cluster.ClusterConfiguration;
+import org.jboss.messaging.core.config.cluster.ClusterConnectionConfiguration;
import org.jboss.messaging.core.config.cluster.DiscoveryGroupConfiguration;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.management.AcceptorControlMBean;
@@ -77,7 +77,7 @@
import org.jboss.messaging.core.server.ServerMessage;
import org.jboss.messaging.core.server.cluster.Bridge;
import org.jboss.messaging.core.server.cluster.BroadcastGroup;
-import org.jboss.messaging.core.server.cluster.Cluster;
+import org.jboss.messaging.core.server.cluster.ClusterConnection;
import org.jboss.messaging.core.server.impl.ServerMessageImpl;
import org.jboss.messaging.core.settings.HierarchicalRepository;
import org.jboss.messaging.core.settings.impl.QueueSettings;
@@ -288,6 +288,7 @@
TypedProperties props = new TypedProperties();
+ log.info("registering queue with address "+ address);
props.putStringProperty(ManagementHelper.HDR_ADDRESS, address);
props.putStringProperty(ManagementHelper.HDR_QUEUE_NAME, queue.getName());
@@ -431,7 +432,7 @@
unregisterFromJMX(objectName);
}
- public void registerCluster(final Cluster cluster, final ClusterConfiguration configuration) throws Exception
+ public void registerCluster(final ClusterConnection cluster, final ClusterConnectionConfiguration configuration) throws Exception
{
//TODO
}
Modified: trunk/src/main/org/jboss/messaging/core/management/impl/MessagingServerControl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/impl/MessagingServerControl.java 2009-01-28 10:53:22 UTC (rev 5750)
+++ trunk/src/main/org/jboss/messaging/core/management/impl/MessagingServerControl.java 2009-01-28 14:18:19 UTC (rev 5751)
@@ -54,7 +54,7 @@
import org.jboss.messaging.core.persistence.StorageManager;
import org.jboss.messaging.core.postoffice.Binding;
import org.jboss.messaging.core.postoffice.PostOffice;
-import org.jboss.messaging.core.postoffice.impl.BindingImpl;
+import org.jboss.messaging.core.postoffice.impl.LocalQueueBinding;
import org.jboss.messaging.core.remoting.RemotingConnection;
import org.jboss.messaging.core.remoting.RemotingService;
import org.jboss.messaging.core.server.MessagingServer;
@@ -296,7 +296,7 @@
if (postOffice.getBinding(sName) == null)
{
Queue queue = queueFactory.createQueue(-1, sName, null, true, false);
- Binding binding = new BindingImpl(sAddress, sName, sName, queue, false, true);
+ Binding binding = new LocalQueueBinding(sAddress, queue);
storageManager.addQueueBinding(binding);
postOffice.addBinding(binding);
}
@@ -315,7 +315,7 @@
if (postOffice.getBinding(sName) == null)
{
Queue queue = queueFactory.createQueue(-1, sName, filter, durable, false);
- Binding binding = new BindingImpl(sAddress, sName, sName, queue, false, true);
+ Binding binding = new LocalQueueBinding(sAddress, queue);
if (durable)
{
storageManager.addQueueBinding(binding);
@@ -544,9 +544,9 @@
return TransportConfigurationInfo.toTabularData(connectorConfigurations);
}
- public void sendQueueInfoToQueue(final SimpleString queueName) throws Exception
+ public void sendQueueInfoToQueue(final String queueName) throws Exception
{
- postOffice.sendQueueInfoToQueue(queueName);
+ postOffice.sendQueueInfoToQueue(new SimpleString(queueName));
}
// NotificationEmitter implementation ----------------------------
Modified: trunk/src/main/org/jboss/messaging/core/management/jmx/impl/ReplicationAwareMessagingServerControlWrapper.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/jmx/impl/ReplicationAwareMessagingServerControlWrapper.java 2009-01-28 10:53:22 UTC (rev 5750)
+++ trunk/src/main/org/jboss/messaging/core/management/jmx/impl/ReplicationAwareMessagingServerControlWrapper.java 2009-01-28 14:18:19 UTC (rev 5751)
@@ -239,7 +239,7 @@
return localControl.getConnectors();
}
- public void sendQueueInfoToQueue(final SimpleString queueName) throws Exception
+ public void sendQueueInfoToQueue(final String queueName) throws Exception
{
localControl.sendQueueInfoToQueue(queueName);
}
Modified: trunk/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java 2009-01-28 10:53:22 UTC (rev 5750)
+++ trunk/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java 2009-01-28 14:18:19 UTC (rev 5751)
@@ -55,24 +55,20 @@
private static final Logger log = Logger.getLogger(MessageImpl.class);
- public static final SimpleString HDR_ACTUAL_EXPIRY_TIME = new SimpleString("JBM_ACTUAL_EXPIRY");
+ public static final SimpleString HDR_ACTUAL_EXPIRY_TIME = new SimpleString("_JBM_ACTUAL_EXPIRY");
- public static final SimpleString HDR_ORIGINAL_DESTINATION = new SimpleString("JBM_ORIG_DESTINATION");
+ public static final SimpleString HDR_ORIGINAL_DESTINATION = new SimpleString("_JBM_ORIG_DESTINATION");
- public static final SimpleString HDR_ORIG_MESSAGE_ID = new SimpleString("JBM_ORIG_MESSAGE_ID");
+ public static final SimpleString HDR_ORIG_MESSAGE_ID = new SimpleString("_JBM_ORIG_MESSAGE_ID");
- public static final SimpleString HDR_GROUP_ID = new SimpleString("JBM_GROUP_ID");
+ public static final SimpleString HDR_GROUP_ID = new SimpleString("_JBM_GROUP_ID");
- public static final SimpleString HDR_SCHEDULED_DELIVERY_TIME = new SimpleString("JBM_SCHED_DELIVERY");
+ public static final SimpleString HDR_SCHEDULED_DELIVERY_TIME = new SimpleString("_JBM_SCHED_DELIVERY");
- public static final SimpleString HDR_DUPLICATE_DETECTION_ID = new SimpleString("JBM_DUPL_ID");
-
- public static final SimpleString HDR_ROUTE_TO_PREFIX = new SimpleString("JBM_ROUTE_TO:");
+ public static final SimpleString HDR_DUPLICATE_DETECTION_ID = new SimpleString("_JBM_DUPL_ID");
+
+ public static final SimpleString HDR_EXCLUDED_QUEUES = new SimpleString("_JBM_EXCLUDED_QUEUES");
- public static final SimpleString HDR_RESET_QUEUE_DATA = new SimpleString("JBM_RESET_QUEUE_DATA");
-
- public static final SimpleString HDR_FROM_CLUSTER = new SimpleString("JBM_FROM_CLUSTER");
-
// Attributes ----------------------------------------------------
protected long messageID;
Modified: trunk/src/main/org/jboss/messaging/core/postoffice/AddressManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/AddressManager.java 2009-01-28 10:53:22 UTC (rev 5750)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/AddressManager.java 2009-01-28 14:18:19 UTC (rev 5751)
@@ -37,7 +37,7 @@
{
boolean addBinding(Binding binding);
- Binding removeBinding(SimpleString queueName);
+ Binding removeBinding(SimpleString uniqueName);
Bindings getBindings(SimpleString address);
Modified: trunk/src/main/org/jboss/messaging/core/postoffice/Binding.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/Binding.java 2009-01-28 10:53:22 UTC (rev 5750)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/Binding.java 2009-01-28 14:18:19 UTC (rev 5751)
@@ -45,7 +45,9 @@
SimpleString getRoutingName();
- boolean accept(ServerMessage message) throws Exception;
+ boolean filterMatches(ServerMessage message) throws Exception;
+
+ boolean isHighAcceptPriority(ServerMessage message);
boolean isExclusive();
}
Deleted: trunk/src/main/org/jboss/messaging/core/postoffice/impl/BindingImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/BindingImpl.java 2009-01-28 10:53:22 UTC (rev 5750)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/BindingImpl.java 2009-01-28 14:18:19 UTC (rev 5751)
@@ -1,109 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.messaging.core.postoffice.impl;
-
-import org.jboss.messaging.core.filter.Filter;
-import org.jboss.messaging.core.postoffice.Binding;
-import org.jboss.messaging.core.server.Bindable;
-import org.jboss.messaging.core.server.ServerMessage;
-import org.jboss.messaging.util.SimpleString;
-
-/**
- * A BindingImpl
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
- * Created 21 Jan 2009 18:52:15
- *
- *
- */
-public class BindingImpl implements Binding
-{
- private final SimpleString address;
-
- private final SimpleString uniqueName;
-
- private final SimpleString routingName;
-
- protected final Bindable bindable;
-
- private final boolean exclusive;
-
- private final boolean isQueue;
-
- public BindingImpl(final SimpleString address,
- final SimpleString uniqueName,
- final SimpleString routingName,
- final Bindable bindable,
- final boolean exclusive,
- final boolean isQueue)
- {
- this.address = address;
-
- this.uniqueName = uniqueName;
-
- this.routingName = routingName;
-
- this.bindable = bindable;
-
- this.exclusive = exclusive;
-
- this.isQueue = isQueue;
- }
-
- public SimpleString getAddress()
- {
- return address;
- }
-
- public Bindable getBindable()
- {
- return bindable;
- }
-
- public boolean isQueueBinding()
- {
- return isQueue;
- }
-
- public boolean accept(final ServerMessage message) throws Exception
- {
- return bindable.accept(message);
- }
-
- public SimpleString getRoutingName()
- {
- return routingName;
- }
-
- public SimpleString getUniqueName()
- {
- return uniqueName;
- }
-
- public boolean isExclusive()
- {
- return exclusive;
- }
-
-}
Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/BindingsImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/BindingsImpl.java 2009-01-28 10:53:22 UTC (rev 5750)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/BindingsImpl.java 2009-01-28 14:18:19 UTC (rev 5751)
@@ -48,17 +48,17 @@
*
*/
public class BindingsImpl implements Bindings
-{
+{
private static final Logger log = Logger.getLogger(BindingsImpl.class);
private final ConcurrentMap<SimpleString, List<Binding>> routingNameBindingMap = new ConcurrentHashMap<SimpleString, List<Binding>>();
-
+
private final Map<SimpleString, Integer> routingNamePositions = new ConcurrentHashMap<SimpleString, Integer>();
-
+
private final List<Binding> bindingsList = new CopyOnWriteArrayList<Binding>();
-
+
private final List<Binding> exclusiveBindings = new CopyOnWriteArrayList<Binding>();
-
+
public List<Binding> getBindings()
{
return bindingsList;
@@ -73,27 +73,27 @@
else
{
SimpleString routingName = binding.getRoutingName();
-
+
List<Binding> bindings = routingNameBindingMap.get(routingName);
-
+
if (bindings == null)
{
bindings = new CopyOnWriteArrayList<Binding>();
-
+
List<Binding> oldBindings = routingNameBindingMap.putIfAbsent(routingName, bindings);
-
+
if (oldBindings != null)
{
bindings = oldBindings;
}
}
-
+
bindings.add(binding);
}
-
+
bindingsList.add(binding);
}
-
+
public void removeBinding(final Binding binding)
{
if (binding.isExclusive())
@@ -103,33 +103,33 @@
else
{
SimpleString routingName = binding.getRoutingName();
-
+
List<Binding> bindings = routingNameBindingMap.get(routingName);
-
+
if (bindings != null)
{
bindings.remove(binding);
-
+
if (bindings.isEmpty())
{
routingNameBindingMap.remove(routingName);
}
}
}
-
+
bindingsList.remove(binding);
}
-
+
public void route(ServerMessage message) throws Exception
{
route(message, null);
}
-
+
public void route(ServerMessage message, Transaction tx) throws Exception
{
if (!exclusiveBindings.isEmpty())
{
- for (Binding binding: exclusiveBindings)
+ for (Binding binding : exclusiveBindings)
{
binding.getBindable().route(message, tx);
}
@@ -137,42 +137,46 @@
else
{
Set<Bindable> chosen = new HashSet<Bindable>();
-
- for (Map.Entry<SimpleString, List<Binding>> entry: routingNameBindingMap.entrySet())
+
+ for (Map.Entry<SimpleString, List<Binding>> entry : routingNameBindingMap.entrySet())
{
SimpleString routingName = entry.getKey();
-
+
List<Binding> bindings = entry.getValue();
-
+
if (bindings == null)
{
- //The value can become null if it's concurrently removed while we're iterating - this is expected ConcurrentHashMap behaviour!
+ // The value can become null if it's concurrently removed while we're iterating - this is expected
+ // ConcurrentHashMap behaviour!
continue;
}
-
+
Integer ipos = routingNamePositions.get(routingName);
-
+
int pos = ipos != null ? ipos.intValue() : 0;
-
+
+ int length = bindings.size();
+
int startPos = pos;
-
- int length = bindings.size();
-
- do
- {
+
+ Binding theBinding = null;
+
+ int lastNoMatchingConsumerPos = -1;
+
+ while (true)
+ {
Binding binding;
-
try
{
binding = bindings.get(pos);
}
catch (IndexOutOfBoundsException e)
{
- //This can occur if binding is removed while in route
+ // This can occur if binding is removed while in route
if (!bindings.isEmpty())
{
pos = 0;
-
+
continue;
}
else
@@ -180,34 +184,94 @@
break;
}
}
-
- pos++;
-
- if (pos == length)
+
+ if (binding.filterMatches(message))
{
- pos = 0;
+ // bindings.length == 1 ==> only a local queue so we don't check for matching consumers (it's an
+ // unnecessary overhead)
+ if (length == 1 || binding.isHighAcceptPriority(message))
+ {
+ theBinding = binding;
+
+ pos = incrementPos(pos, length);
+
+ break;
+ }
+ else
+ {
+ lastNoMatchingConsumerPos = pos;
+ }
}
-
- if (binding.accept(message))
+
+ pos = incrementPos(pos, length);
+
+ if (pos == startPos)
{
- chosen.add(binding.getBindable());
-
+ if (lastNoMatchingConsumerPos != -1)
+ {
+ try
+ {
+ theBinding = bindings.get(pos);
+ }
+ catch (IndexOutOfBoundsException e)
+ {
+ // This can occur if binding is removed while in route
+ if (!bindings.isEmpty())
+ {
+ pos = 0;
+
+ lastNoMatchingConsumerPos = -1;
+
+ continue;
+ }
+ else
+ {
+ break;
+ }
+ }
+
+ pos = lastNoMatchingConsumerPos;
+
+ pos = incrementPos(pos, length);
+ }
break;
}
}
- while (startPos != pos);
-
- if (pos != startPos)
+
+ if (theBinding != null)
{
- routingNamePositions.put(routingName, pos);
+ chosen.add(theBinding.getBindable());
}
+
+ routingNamePositions.put(routingName, pos);
+
}
-
- for (Bindable bindable: chosen)
+
+ //TODO refactor to do this in one iteration
+
+ for (Bindable bindable : chosen)
{
+ bindable.preroute(message, tx);
+ }
+
+ for (Bindable bindable : chosen)
+ {
bindable.route(message, tx);
}
- }
+ }
+
}
-
+
+ private final int incrementPos(int pos, int length)
+ {
+ pos++;
+ // Wrap to 0 only when pos lands exactly on length; a pos already beyond length is returned unwrapped.
+ if (pos == length)
+ {
+ pos = 0;
+ }
+
+ return pos;
+ }
+
}
Added: trunk/src/main/org/jboss/messaging/core/postoffice/impl/DivertBinding.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/DivertBinding.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/DivertBinding.java 2009-01-28 14:18:19 UTC (rev 5751)
@@ -0,0 +1,119 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.core.postoffice.impl;
+
+import org.jboss.messaging.core.filter.Filter;
+import org.jboss.messaging.core.postoffice.Binding;
+import org.jboss.messaging.core.server.Bindable;
+import org.jboss.messaging.core.server.Divert;
+import org.jboss.messaging.core.server.ServerMessage;
+import org.jboss.messaging.util.SimpleString;
+
+/**
+ * A DivertBinding
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ * Created 28 Jan 2009 12:42:23
+ *
+ *
+ */
+public class DivertBinding implements Binding
+{
+ private final SimpleString address; // supplied at construction; returned by getAddress()
+
+ private final Divert divert;
+ // Cached from divert.getFilter() at construction; null is treated as "no filter" by filterMatches.
+ private final Filter filter;
+
+ private final SimpleString uniqueName;
+
+ private final SimpleString routingName;
+
+ private final boolean exclusive;
+ // Reads filter, unique name, routing name and exclusivity from the divert once; the fields are final and never re-read.
+ public DivertBinding(final SimpleString address, final Divert divert)
+ {
+ this.address = address;
+
+ this.divert = divert;
+
+ this.filter = divert.getFilter();
+
+ this.uniqueName = divert.getUniqueName();
+
+ this.routingName = divert.getRoutingName();
+
+ this.exclusive = divert.isExclusive();
+ }
+ // Returns true when there is no filter or the filter matches the message; false otherwise.
+ public boolean filterMatches(final ServerMessage message) throws Exception
+ {
+ if (filter != null && !filter.match(message))
+ {
+ return false;
+ }
+ else
+ {
+ return true;
+ }
+ }
+
+ public SimpleString getAddress()
+ {
+ return address;
+ }
+ // The bindable exposed by this binding is the divert itself.
+ public Bindable getBindable()
+ {
+ return divert;
+ }
+
+ public SimpleString getRoutingName()
+ {
+ return routingName;
+ }
+
+ public SimpleString getUniqueName()
+ {
+ return uniqueName;
+ }
+
+ public boolean isExclusive()
+ {
+ return exclusive;
+ }
+ // Always true for a divert binding; the message argument is not inspected.
+ public boolean isHighAcceptPriority(final ServerMessage message)
+ {
+ return true;
+ }
+ // A divert binding never reports itself as a queue binding.
+ public boolean isQueueBinding()
+ {
+ return false;
+ }
+
+}
+
Added: trunk/src/main/org/jboss/messaging/core/postoffice/impl/LocalQueueBinding.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/LocalQueueBinding.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/LocalQueueBinding.java 2009-01-28 14:18:19 UTC (rev 5751)
@@ -0,0 +1,134 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.core.postoffice.impl;
+
+import java.util.List;
+
+import org.jboss.messaging.core.filter.Filter;
+import org.jboss.messaging.core.postoffice.Binding;
+import org.jboss.messaging.core.server.Bindable;
+import org.jboss.messaging.core.server.Consumer;
+import org.jboss.messaging.core.server.Queue;
+import org.jboss.messaging.core.server.ServerMessage;
+import org.jboss.messaging.util.SimpleString;
+
+/**
+ * A LocalQueueBinding
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ * Created 28 Jan 2009 12:42:23
+ *
+ *
+ */
+public class LocalQueueBinding implements Binding
+{
+ private final SimpleString address; // supplied at construction; returned by getAddress()
+
+ private final Queue queue;
+ // Cached from queue.getFilter() at construction; null is treated as "no filter" by filterMatches.
+ private final Filter filter;
+ // Cached from queue.getName(); returned as both the routing name and the unique name.
+ private final SimpleString name;
+
+ public LocalQueueBinding(final SimpleString address, final Queue queue)
+ {
+ this.address = address;
+
+ this.queue = queue;
+
+ this.filter = queue.getFilter();
+
+ this.name = queue.getName();
+ }
+ // Returns true when the queue has no filter or its filter matches the message; false otherwise.
+ public boolean filterMatches(final ServerMessage message) throws Exception
+ {
+ if (filter != null && !filter.match(message))
+ {
+ return false;
+ }
+ else
+ {
+ return true;
+ }
+ }
+
+ public SimpleString getAddress()
+ {
+ return address;
+ }
+ // The bindable exposed by this binding is the queue itself.
+ public Bindable getBindable()
+ {
+ return queue;
+ }
+
+ public SimpleString getRoutingName()
+ {
+ return name;
+ }
+
+ public SimpleString getUniqueName()
+ {
+ return name;
+ }
+ // Always false: this binding type never reports itself as exclusive.
+ public boolean isExclusive()
+ {
+ return false;
+ }
+
+ public boolean isHighAcceptPriority(final ServerMessage message)
+ {
+ //It's a high accept priority if the queue has at least one matching consumer
+
+ List<Consumer> consumers = queue.getConsumers();
+
+ for (Consumer consumer: consumers)
+ {
+ Filter filter = consumer.getFilter();
+ // NOTE: this local shadows the queue-level 'filter' field; a consumer without a filter counts as matching.
+ if (filter == null)
+ {
+ return true;
+ }
+ else
+ {
+ if (filter.match(message))
+ {
+ return true;
+ }
+ }
+ }
+ // Reached when the queue has no consumers, or none whose filter matches the message.
+ return false;
+ }
+
+ public boolean isQueueBinding()
+ {
+ return true;
+ }
+
+}
Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java 2009-01-28 10:53:22 UTC (rev 5750)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java 2009-01-28 14:18:19 UTC (rev 5751)
@@ -80,7 +80,9 @@
public class PostOfficeImpl implements PostOffice, NotificationListener
{
private static final Logger log = Logger.getLogger(PostOfficeImpl.class);
-
+
+ public static final SimpleString HDR_RESET_QUEUE_DATA = new SimpleString("_JBM_RESET_QUEUE_DATA");
+
private static final List<MessageReference> emptyList = Collections.<MessageReference> emptyList();
private final AddressManager addressManager;
@@ -300,7 +302,7 @@
message.setDestination(queueName);
- message.putStringProperty(ManagementHelper.HDR_NOTIFICATION_TYPE, new SimpleString(NotificationType.QUEUE_CREATED.toString()));
+ message.putStringProperty(ManagementHelper.HDR_NOTIFICATION_TYPE, new SimpleString(type.toString()));
message.putLongProperty(ManagementHelper.HDR_NOTIFICATION_TIMESTAMP, System.currentTimeMillis());
return message;
@@ -327,19 +329,20 @@
ServerMessage message = new ServerMessageImpl(storageManager.generateUniqueID());
message.setBody(new ByteBufferWrapper(ByteBuffer.allocate(0)));
message.setDestination(queueName);
- message.putBooleanProperty(MessageImpl.HDR_RESET_QUEUE_DATA, true);
+ message.putBooleanProperty(HDR_RESET_QUEUE_DATA, true);
- queue.accept(message);
+ queue.preroute(message, null);
queue.route(message, null);
for (QueueInfo info: queueInfos.values())
{
+ log.info("creatign queue created message");
message = createQueueInfoMessage(NotificationType.QUEUE_CREATED, queueName);
message.putStringProperty(ManagementHelper.HDR_ADDRESS, info.getAddress());
message.putStringProperty(ManagementHelper.HDR_QUEUE_NAME, info.getQueueName());
- queue.accept(message);
+ queue.preroute(message, null);
queue.route(message, null);
int consumersWithFilters = info.getFilterStrings() != null ? info.getFilterStrings().size() : 0;
@@ -350,7 +353,7 @@
message.putStringProperty(ManagementHelper.HDR_QUEUE_NAME, info.getQueueName());
- queue.accept(message);
+ queue.preroute(message, null);
queue.route(message, null);
}
@@ -363,7 +366,7 @@
message.putStringProperty(ManagementHelper.HDR_QUEUE_NAME, info.getQueueName());
message.putStringProperty(ManagementHelper.HDR_FILTERSTRING, filterString);
- queue.accept(message);
+ queue.preroute(message, null);
queue.route(message, null);
}
}
Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/SimpleAddressManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/SimpleAddressManager.java 2009-01-28 10:53:22 UTC (rev 5750)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/SimpleAddressManager.java 2009-01-28 14:18:19 UTC (rev 5751)
@@ -57,16 +57,17 @@
return addMappingInternal(binding.getAddress(), binding);
}
- public Binding removeBinding(final SimpleString bindableName)
+ public Binding removeBinding(final SimpleString uniqueName)
{
- Binding binding = nameMap.remove(bindableName);
+ Binding binding = nameMap.remove(uniqueName);
if (binding == null)
{
- throw new IllegalStateException("Queue is not bound " + bindableName);
+ throw new IllegalStateException("Queue is not bound " + uniqueName);
}
- removeBindingInternal(binding.getAddress(), bindableName);
+ removeBindingInternal(binding.getAddress(), uniqueName);
+
return binding;
}
Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/WildcardAddressManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/WildcardAddressManager.java 2009-01-28 10:53:22 UTC (rev 5750)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/WildcardAddressManager.java 2009-01-28 14:18:19 UTC (rev 5751)
@@ -125,12 +125,12 @@
* If the address is a wild card then the binding will be removed from the actual mappings for any linked address.
* otherwise it will be removed as normal.
*
- * @param bindableName the name of the queue for the binding to remove
+ * @param uniqueName the name of the binding to remove
* @return true if this was the last mapping for a specific address
*/
- public Binding removeBinding(final SimpleString bindableName)
+ public Binding removeBinding(final SimpleString uniqueName)
{
- Binding binding = super.removeBinding(bindableName);
+ Binding binding = super.removeBinding(uniqueName);
if (binding != null)
{
Address add = getAddress(binding.getAddress());
@@ -152,7 +152,7 @@
{
for (Address destination : add.getLinkedAddresses())
{
- super.removeBindingInternal(destination.getAddress(), bindableName);
+ super.removeBindingInternal(destination.getAddress(), uniqueName);
}
}
removeAndUpdateAddressMap(add);
Modified: trunk/src/main/org/jboss/messaging/core/server/Bindable.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/Bindable.java 2009-01-28 10:53:22 UTC (rev 5750)
+++ trunk/src/main/org/jboss/messaging/core/server/Bindable.java 2009-01-28 14:18:19 UTC (rev 5751)
@@ -35,8 +35,8 @@
*/
public interface Bindable
{
+ void preroute(ServerMessage message, Transaction tx) throws Exception;
+
void route(ServerMessage message, Transaction tx) throws Exception;
-
- boolean accept(ServerMessage message) throws Exception;
}
Modified: trunk/src/main/org/jboss/messaging/core/server/Consumer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/Consumer.java 2009-01-28 10:53:22 UTC (rev 5750)
+++ trunk/src/main/org/jboss/messaging/core/server/Consumer.java 2009-01-28 14:18:19 UTC (rev 5751)
@@ -22,7 +22,7 @@
package org.jboss.messaging.core.server;
-import org.jboss.messaging.util.SimpleString;
+import org.jboss.messaging.core.filter.Filter;
@@ -37,5 +37,5 @@
{
HandleStatus handle(MessageReference reference) throws Exception;
- SimpleString getFilterString();
+ Filter getFilter();
}
Modified: trunk/src/main/org/jboss/messaging/core/server/Divert.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/Divert.java 2009-01-28 10:53:22 UTC (rev 5750)
+++ trunk/src/main/org/jboss/messaging/core/server/Divert.java 2009-01-28 14:18:19 UTC (rev 5751)
@@ -23,8 +23,11 @@
package org.jboss.messaging.core.server;
+import org.jboss.messaging.core.filter.Filter;
+import org.jboss.messaging.util.SimpleString;
+
/**
* A Divert
*
@@ -36,5 +39,11 @@
*/
public interface Divert extends Bindable
{
- //boolean isExclusive();
+ Filter getFilter();
+
+ boolean isExclusive();
+
+ SimpleString getUniqueName();
+
+ SimpleString getRoutingName();
}
Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/Bridge.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/Bridge.java 2009-01-28 10:53:22 UTC (rev 5750)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/Bridge.java 2009-01-28 14:18:19 UTC (rev 5751)
@@ -48,8 +48,6 @@
long getMaxBatchTime();
- SimpleString getFilterString();
-
SimpleString getForwardingAddress();
Transformer getTransformer();
Deleted: trunk/src/main/org/jboss/messaging/core/server/cluster/Cluster.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/Cluster.java 2009-01-28 10:53:22 UTC (rev 5750)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/Cluster.java 2009-01-28 14:18:19 UTC (rev 5751)
@@ -1,41 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-
-package org.jboss.messaging.core.server.cluster;
-
-import org.jboss.messaging.core.server.MessagingComponent;
-import org.jboss.messaging.util.SimpleString;
-
-/**
- * A Cluster
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
- * Created 23 Jan 2009 14:51:55
- *
- *
- */
-public interface Cluster extends MessagingComponent
-{
- SimpleString getName();
-}
Copied: trunk/src/main/org/jboss/messaging/core/server/cluster/ClusterConnection.java (from rev 5744, trunk/src/main/org/jboss/messaging/core/server/cluster/Cluster.java)
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/ClusterConnection.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/ClusterConnection.java 2009-01-28 14:18:19 UTC (rev 5751)
@@ -0,0 +1,41 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.core.server.cluster;
+
+import org.jboss.messaging.core.server.MessagingComponent;
+import org.jboss.messaging.util.SimpleString;
+
+/**
+ * A ClusterConnection
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ * Created 23 Jan 2009 14:51:55
+ *
+ *
+ */
+public interface ClusterConnection extends MessagingComponent
+{
+ SimpleString getName(); // NOTE(review): presumably the name identifying this cluster connection - confirm in ClusterConnectionImpl
+}
Deleted: trunk/src/main/org/jboss/messaging/core/server/cluster/FlowBinding.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/FlowBinding.java 2009-01-28 10:53:22 UTC (rev 5750)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/FlowBinding.java 2009-01-28 14:18:19 UTC (rev 5751)
@@ -1,44 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-
-package org.jboss.messaging.core.server.cluster;
-
-import org.jboss.messaging.core.postoffice.Binding;
-import org.jboss.messaging.util.SimpleString;
-
-/**
- * A FlowBinding
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
- * Created 23 Jan 2009 11:58:05
- *
- *
- */
-public interface FlowBinding extends Binding
-{
- void addConsumer(SimpleString filterString) throws Exception;
-
- void removeConsumer(SimpleString filterString) throws Exception;
-
-}
Copied: trunk/src/main/org/jboss/messaging/core/server/cluster/RemoteQueueBinding.java (from rev 5750, trunk/src/main/org/jboss/messaging/core/server/cluster/FlowBinding.java)
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/RemoteQueueBinding.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/RemoteQueueBinding.java 2009-01-28 14:18:19 UTC (rev 5751)
@@ -0,0 +1,44 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.core.server.cluster;
+
+import org.jboss.messaging.core.postoffice.Binding;
+import org.jboss.messaging.util.SimpleString;
+
+/**
+ * A RemoteQueueBinding
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ * Created 23 Jan 2009 11:58:05
+ *
+ *
+ */
+public interface RemoteQueueBinding extends Binding
+{
+ void addConsumer(SimpleString filterString) throws Exception; // NOTE(review): whether filterString may be null is not visible here - confirm in RemoteQueueBindingImpl
+ // NOTE(review): presumably undoes a prior addConsumer with the same filter string - confirm in RemoteQueueBindingImpl.
+ void removeConsumer(SimpleString filterString) throws Exception;
+
+}
Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BridgeImpl.java 2009-01-28 10:53:22 UTC (rev 5750)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BridgeImpl.java 2009-01-28 14:18:19 UTC (rev 5751)
@@ -24,11 +24,7 @@
import static org.jboss.messaging.core.config.impl.ConfigurationImpl.DEFAULT_MANAGEMENT_NOTIFICATION_ADDRESS;
-import java.util.ArrayList;
-import java.util.HashMap;
import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
@@ -44,6 +40,7 @@
import org.jboss.messaging.core.client.impl.ClientSessionImpl;
import org.jboss.messaging.core.client.management.impl.ManagementHelper;
import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.config.impl.ConfigurationImpl;
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.filter.Filter;
import org.jboss.messaging.core.filter.impl.FilterImpl;
@@ -51,7 +48,6 @@
import org.jboss.messaging.core.management.NotificationType;
import org.jboss.messaging.core.management.impl.ManagementServiceImpl;
import org.jboss.messaging.core.persistence.StorageManager;
-import org.jboss.messaging.core.postoffice.QueueInfo;
import org.jboss.messaging.core.remoting.FailureListener;
import org.jboss.messaging.core.remoting.RemotingConnection;
import org.jboss.messaging.core.server.HandleStatus;
@@ -65,7 +61,6 @@
import org.jboss.messaging.util.Future;
import org.jboss.messaging.util.Pair;
import org.jboss.messaging.util.SimpleString;
-import org.jboss.messaging.util.TypedProperties;
import org.jboss.messaging.util.UUIDGenerator;
/**
@@ -168,6 +163,7 @@
final MessageHandler queueInfoMessageHandler,
final String queueDataAddress) throws Exception
{
+ log.info("Creating new bridge " + name + " queue " + queue);
this.name = name;
this.queue = queue;
@@ -209,6 +205,8 @@
this.queueInfoMessageHandler = queueInfoMessageHandler;
+ log.info("queue info handler " + this.queueInfoMessageHandler);
+
this.queueDataAddress = queueDataAddress;
if (maxBatchTime != -1)
@@ -242,6 +240,7 @@
{
try
{
+ log.info("creating objects");
createTx();
queue.addConsumer(BridgeImpl.this);
@@ -263,22 +262,23 @@
{
// Get the queue data
- SimpleString notifQueueName = UUIDGenerator.getInstance().generateSimpleStringUUID();
+ SimpleString notifQueueName = new SimpleString("notif-").concat(UUIDGenerator.getInstance().generateSimpleStringUUID());
- SimpleString filter = new SimpleString(ManagementHelper.HDR_ADDRESS + " LIKE '" + queueDataAddress + "' AND " +
+ SimpleString filter = new SimpleString(
ManagementHelper.HDR_NOTIFICATION_TYPE + " IN (" +
"'" +
NotificationType.QUEUE_CREATED +
+ "'," +
"'" +
- "'" +
NotificationType.QUEUE_DESTROYED +
+ "'," +
"'" +
- "'" +
NotificationType.CONSUMER_CREATED +
+ "'," +
"'" +
- "'" +
NotificationType.CONSUMER_CLOSED +
- "'");
+ "') AND " +
+ ManagementHelper.HDR_ADDRESS + " LIKE '" + queueDataAddress + "%'");
session.createQueue(DEFAULT_MANAGEMENT_NOTIFICATION_ADDRESS, notifQueueName, filter, false, true);
@@ -293,8 +293,14 @@
ManagementHelper.putOperationInvocation(message,
ManagementServiceImpl.getMessagingServerObjectName(),
"sendQueueInfoToQueue",
- notifQueueName);
+ notifQueueName.toString());
+
+ ClientProducer prod = session.createProducer(ConfigurationImpl.DEFAULT_MANAGEMENT_ADDRESS);
+
+ prod.send(message);
}
+
+ log.info("Created objects");
active = true;
@@ -313,6 +319,11 @@
public synchronized void stop() throws Exception
{
+ if (!started)
+ {
+ return;
+ }
+
started = false;
active = false;
@@ -465,6 +476,8 @@
{
return;
}
+
+ log.info("sending batch");
// TODO - if batch size = 1 then don't need tx
@@ -564,9 +577,9 @@
return maxBatchTime;
}
- public SimpleString getFilterString()
+ public Filter getFilter()
{
- return filterString;
+ return filter;
}
public SimpleString getForwardingAddress()
Copied: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterConnectionImpl.java (from rev 5744, trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterImpl.java)
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterConnectionImpl.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterConnectionImpl.java 2009-01-28 14:18:19 UTC (rev 5751)
@@ -0,0 +1,505 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.server.cluster.impl;
+
+import static org.jboss.messaging.core.postoffice.impl.PostOfficeImpl.HDR_RESET_QUEUE_DATA;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.MessageHandler;
+import org.jboss.messaging.core.client.management.impl.ManagementHelper;
+import org.jboss.messaging.core.cluster.DiscoveryGroup;
+import org.jboss.messaging.core.cluster.DiscoveryListener;
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.config.cluster.BridgeConfiguration;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.management.NotificationType;
+import org.jboss.messaging.core.persistence.StorageManager;
+import org.jboss.messaging.core.postoffice.Binding;
+import org.jboss.messaging.core.postoffice.PostOffice;
+import org.jboss.messaging.core.postoffice.impl.LocalQueueBinding;
+import org.jboss.messaging.core.postoffice.impl.PostOfficeImpl;
+import org.jboss.messaging.core.server.Queue;
+import org.jboss.messaging.core.server.QueueFactory;
+import org.jboss.messaging.core.server.cluster.Bridge;
+import org.jboss.messaging.core.server.cluster.ClusterConnection;
+import org.jboss.messaging.core.server.cluster.RemoteQueueBinding;
+import org.jboss.messaging.util.ExecutorFactory;
+import org.jboss.messaging.util.Pair;
+import org.jboss.messaging.util.SimpleString;
+import org.jboss.messaging.util.UUIDGenerator;
+
+/**
+ *
+ * A ClusterConnectionImpl
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ * Created 21 Jan 2009 14:43:05
+ *
+ *
+ */
+public class ClusterConnectionImpl implements ClusterConnection, DiscoveryListener
+{
+ private static final Logger log = Logger.getLogger(ClusterConnectionImpl.class);
+
+ private final ExecutorFactory executorFactory;
+
+ private final StorageManager storageManager;
+
+ private final PostOffice postOffice;
+
+ private final SimpleString name;
+
+ private final SimpleString address;
+
+ private final BridgeConfiguration bridgeConfig;
+
+ private final boolean useDuplicateDetection;
+
+ private final boolean forwardWhenNoMatchingConsumers;
+
+ private Map<Pair<TransportConfiguration, TransportConfiguration>, MessageFlowRecord> records = new HashMap<Pair<TransportConfiguration, TransportConfiguration>, MessageFlowRecord>();
+
+ private final DiscoveryGroup discoveryGroup;
+
+ private final ScheduledExecutorService scheduledExecutor;
+
+ private final QueueFactory queueFactory;
+
+ private volatile boolean started;
+
+   /**
+    * Creates a cluster connection whose targets come from a fixed list of connector pairs.
+    * The list is applied as the final step of construction, via {@code updateConnectors}.
+    *
+    * @param name name of this cluster connection; returned by {@link #getName()} and embedded in generated queue names
+    * @param address address handed (as a string) to every bridge this connection creates
+    * @param bridgeConfig supplies the batch, filter and retry settings of the bridges
+    * @param useDuplicateDetection passed to every remote queue binding this connection creates
+    * @param forwardWhenNoMatchingConsumers passed to every remote queue binding this connection creates
+    * @param executorFactory source of the executor given to each bridge
+    * @param storageManager stores the binding of newly created queues and is handed to each bridge
+    * @param postOffice consulted for existing queue bindings; remote bindings are added to and removed from it
+    * @param scheduledExecutor handed to each bridge
+    * @param queueFactory creates the per-connector queue when no binding for it exists yet
+    * @param connectors the connector pairs to create message flows for
+    * @throws Exception if applying the connector list fails
+    */
+   public ClusterConnectionImpl(final SimpleString name,
+                                final SimpleString address,
+                                final BridgeConfiguration bridgeConfig,
+                                final boolean useDuplicateDetection,
+                                final boolean forwardWhenNoMatchingConsumers,
+                                final ExecutorFactory executorFactory,
+                                final StorageManager storageManager,
+                                final PostOffice postOffice,
+                                final ScheduledExecutorService scheduledExecutor,
+                                final QueueFactory queueFactory,
+                                final List<Pair<TransportConfiguration, TransportConfiguration>> connectors) throws Exception
+   {
+      // Identity and routing options
+      this.name = name;
+      this.address = address;
+      this.bridgeConfig = bridgeConfig;
+      this.useDuplicateDetection = useDuplicateDetection;
+      this.forwardWhenNoMatchingConsumers = forwardWhenNoMatchingConsumers;
+
+      // Collaborating services
+      this.executorFactory = executorFactory;
+      this.scheduledExecutor = scheduledExecutor;
+      this.storageManager = storageManager;
+      this.postOffice = postOffice;
+      this.queueFactory = queueFactory;
+
+      // Static configuration: there is no discovery group to listen to
+      this.discoveryGroup = null;
+
+      // Must stay last - it reads the fields assigned above
+      updateConnectors(connectors);
+   }
+
+   /**
+    * Creates a cluster connection whose targets are obtained from a discovery group.
+    * Nothing is connected here; the group's connectors are first read in {@link #start()}.
+    *
+    * @param name name of this cluster connection; returned by {@link #getName()} and embedded in generated queue names
+    * @param address address handed (as a string) to every bridge this connection creates
+    * @param bridgeConfig supplies the batch, filter and retry settings of the bridges
+    * @param useDuplicateDetection passed to every remote queue binding this connection creates
+    * @param forwardWhenNoMatchingConsumers passed to every remote queue binding this connection creates
+    * @param executorFactory source of the executor given to each bridge
+    * @param storageManager stores the binding of newly created queues and is handed to each bridge
+    * @param postOffice consulted for existing queue bindings; remote bindings are added to and removed from it
+    * @param scheduledExecutor handed to each bridge
+    * @param queueFactory creates the per-connector queue when no binding for it exists yet
+    * @param discoveryGroup the group that provides the connector pairs and notifies this object of changes
+    * @throws Exception declared for symmetry with the static-connector constructor; nothing here throws
+    */
+   public ClusterConnectionImpl(final SimpleString name,
+                                final SimpleString address,
+                                final BridgeConfiguration bridgeConfig,
+                                final boolean useDuplicateDetection,
+                                final boolean forwardWhenNoMatchingConsumers,
+                                final ExecutorFactory executorFactory,
+                                final StorageManager storageManager,
+                                final PostOffice postOffice,
+                                final ScheduledExecutorService scheduledExecutor,
+                                final QueueFactory queueFactory,
+                                final DiscoveryGroup discoveryGroup) throws Exception
+   {
+      // Identity and routing options
+      this.name = name;
+      this.address = address;
+      this.bridgeConfig = bridgeConfig;
+      this.useDuplicateDetection = useDuplicateDetection;
+      this.forwardWhenNoMatchingConsumers = forwardWhenNoMatchingConsumers;
+
+      // Collaborating services
+      this.executorFactory = executorFactory;
+      this.scheduledExecutor = scheduledExecutor;
+      this.storageManager = storageManager;
+      this.postOffice = postOffice;
+      this.queueFactory = queueFactory;
+
+      // Dynamic configuration: connectors are supplied by this group
+      this.discoveryGroup = discoveryGroup;
+   }
+
+   /**
+    * Starts this cluster connection; does nothing when it is already started.
+    * <p>
+    * When a discovery group is configured, its current connectors are applied first and this object is
+    * then registered as a listener on the group.
+    *
+    * @throws Exception if applying the discovered connectors fails; the connection is then left not started
+    */
+   public synchronized void start() throws Exception
+   {
+      if (!started)
+      {
+         if (discoveryGroup != null)
+         {
+            // Apply what is known right now, then subscribe to later changes
+            updateConnectors(discoveryGroup.getConnectors());
+
+            discoveryGroup.registerListener(this);
+         }
+
+         started = true;
+      }
+   }
+
+   /**
+    * Stops this cluster connection; does nothing when it is not started.
+    * <p>
+    * Unregisters from the discovery group (if any) and closes every message flow record.
+    *
+    * @throws Exception if closing a record fails; the remaining records are then not closed and the
+    *            connection stays marked as started
+    */
+   public synchronized void stop() throws Exception
+   {
+      if (!started)
+      {
+         // Nothing is running, so there is nothing to report or to stop
+         return;
+      }
+
+      log.info("Stopping cluster connection");
+
+      if (discoveryGroup != null)
+      {
+         discoveryGroup.unregisterListener(this);
+      }
+
+      log.info("There are " + records.size() + " records");
+
+      for (MessageFlowRecord record : records.values())
+      {
+         log.info("stopping record");
+         record.close();
+      }
+
+      started = false;
+   }
+
+   /**
+    * @return {@code true} between a successful {@link #start()} and the next successful {@link #stop()}
+    */
+   public boolean isStarted()
+   {
+      return this.started;
+   }
+
+   /**
+    * @return the name this cluster connection was constructed with
+    */
+   public SimpleString getName()
+   {
+      return this.name;
+   }
+
+ // DiscoveryListener implementation ------------------------------------------------------------------
+
+   /**
+    * Discovery listener callback: re-reads the connector list from the discovery group and applies it.
+    * Failures are logged rather than propagated to the caller.
+    */
+   public void connectorsChanged()
+   {
+      try
+      {
+         updateConnectors(discoveryGroup.getConnectors());
+      }
+      catch (Exception failure)
+      {
+         log.error("Failed to update connectors", failure);
+      }
+   }
+
+   /**
+    * Reconciles the message flow records with the given list of connector pairs.
+    * <p>
+    * Records whose connector pair is no longer listed are closed and dropped. For every listed pair that
+    * has no record yet, the per-connector queue is looked up in the post office (or created and its
+    * binding stored), a bridge is created on that queue, the record is registered and the bridge started.
+    *
+    * @param connectors the connector pairs that should currently have a message flow
+    * @throws Exception propagated from closing a record, from the storage manager or from starting a bridge
+    */
+   private void updateConnectors(final List<Pair<TransportConfiguration, TransportConfiguration>> connectors) throws Exception
+   {
+      Set<Pair<TransportConfiguration, TransportConfiguration>> connectorSet = new HashSet<Pair<TransportConfiguration, TransportConfiguration>>(connectors);
+
+      Iterator<Map.Entry<Pair<TransportConfiguration, TransportConfiguration>, MessageFlowRecord>> iter = records.entrySet().iterator();
+
+      while (iter.hasNext())
+      {
+         Map.Entry<Pair<TransportConfiguration, TransportConfiguration>, MessageFlowRecord> entry = iter.next();
+
+         if (!connectorSet.contains(entry.getKey()))
+         {
+            // Connector no longer there - we should remove and close it - we don't delete the queue though - it may
+            // have messages - this is up to the administrator to do this
+
+            entry.getValue().close();
+
+            iter.remove();
+         }
+      }
+
+      for (Pair<TransportConfiguration, TransportConfiguration> connectorPair : connectors)
+      {
+         if (!records.containsKey(connectorPair))
+         {
+            SimpleString queueName = generateQueueName(name, connectorPair);
+
+            Binding queueBinding = postOffice.getBinding(queueName);
+
+            Queue queue;
+
+            if (queueBinding != null)
+            {
+               queue = (Queue)queueBinding.getBindable();
+            }
+            else
+            {
+               // The queue must carry the generated per-connector name: that is the name it is looked up by
+               // above. Using the cluster connection name would give every connector pair the same queue name.
+               queue = queueFactory.createQueue(-1, queueName, null, true, false);
+
+               // Add binding in storage so the queue will get reloaded on startup and we can find it - it's never
+               // actually routed to at that address though
+
+               Binding storeBinding = new LocalQueueBinding(queue.getName(), queue);
+
+               storageManager.addQueueBinding(storeBinding);
+            }
+
+            MessageFlowRecord record = new MessageFlowRecord(queue);
+
+            Bridge bridge = new BridgeImpl(queueName,
+                                           queue,
+                                           connectorPair,
+                                           executorFactory.getExecutor(),
+                                           bridgeConfig.getMaxBatchSize(),
+                                           bridgeConfig.getMaxBatchTime(),
+                                           bridgeConfig.getFilterString() == null ? null
+                                                                                  : new SimpleString(bridgeConfig.getFilterString()),
+                                           null,
+                                           storageManager,
+                                           scheduledExecutor,
+                                           null,
+                                           bridgeConfig.getRetryInterval(),
+                                           bridgeConfig.getRetryIntervalMultiplier(),
+                                           bridgeConfig.getMaxRetriesBeforeFailover(),
+                                           bridgeConfig.getMaxRetriesAfterFailover(),
+                                           false,
+                                           record,
+                                           address.toString());
+
+            record.setBridge(bridge);
+
+            log.info("added record");
+            records.put(connectorPair, record);
+
+            bridge.start();
+         }
+      }
+   }
+
+   /**
+    * Builds the name of the queue used for one connector pair:
+    * {@code cluster.<clusterName>.<live connector string>-<backup connector string or "null">}.
+    *
+    * @param clusterName the cluster connection name to embed in the queue name
+    * @param connectorPair the connector pair; {@code a} is always used, {@code b} may be {@code null}
+    * @return the generated queue name
+    * @throws Exception propagated from building the connector strings
+    */
+   private SimpleString generateQueueName(final SimpleString clusterName,
+                                          final Pair<TransportConfiguration, TransportConfiguration> connectorPair) throws Exception
+   {
+      // Use the argument rather than the field so the method does what its signature says
+      return new SimpleString("cluster." + clusterName +
+                              "." +
+                              generateConnectorString(connectorPair.a) +
+                              "-" +
+                              (connectorPair.b == null ? "null" : generateConnectorString(connectorPair.b)));
+   }
+
+   /**
+    * Returns a copy of the given string with every {@code '.'} replaced by {@code '-'}.
+    *
+    * @param str the string to sanitise; must not be {@code null}
+    * @return the string without any dots
+    */
+   private String replaceWildcardChars(final String str)
+   {
+      final String sanitised = str.replace(".", "-");
+
+      return sanitised;
+   }
+
+   /**
+    * Renders a transport configuration as {@code factoryClassName?key=value&key=value}, with every dot in
+    * the class name, keys and values replaced by a dash.
+    *
+    * @param config the transport configuration to render
+    * @return the rendered string; just the class name when there are no parameters
+    * @throws Exception declared by the original signature; a {@code null} parameter value fails with a
+    *            {@link NullPointerException}
+    */
+   private SimpleString generateConnectorString(final TransportConfiguration config) throws Exception
+   {
+      StringBuilder buf = new StringBuilder(replaceWildcardChars(config.getFactoryClassName()));
+
+      log.info("config is " + config);
+
+      Map<String, Object> params = config.getParams();
+
+      if (params != null)
+      {
+         // '?' introduces the first parameter, '&' every following one
+         String separator = "?";
+
+         for (Map.Entry<String, Object> param : params.entrySet())
+         {
+            String key = replaceWildcardChars(param.getKey());
+
+            String value = replaceWildcardChars(param.getValue().toString());
+
+            buf.append(separator).append(key).append('=').append(value);
+
+            separator = "&";
+         }
+      }
+
+      return new SimpleString(buf.toString());
+   }
+
+ // Inner classes -----------------------------------------------------------------------------------
+
+   /**
+    * State of the message flow towards one connector pair: the local queue, the bridge created on it and
+    * the remote queue bindings registered in the post office on behalf of that flow.
+    * <p>
+    * As a {@link MessageHandler} it consumes notification messages and keeps the remote bindings in step
+    * with them. Notifications are ignored until the first reset message has been seen.
+    */
+   private class MessageFlowRecord implements MessageHandler
+   {
+      private Bridge bridge;
+
+      private final Queue queue;
+
+      // Remote bindings currently registered in the post office, keyed by remote queue name
+      private final Map<SimpleString, RemoteQueueBinding> bindings = new HashMap<SimpleString, RemoteQueueBinding>();
+
+      // Becomes true once a reset message has been handled
+      private boolean firstReset = false;
+
+      public MessageFlowRecord(final Queue queue)
+      {
+         this.queue = queue;
+      }
+
+      /**
+       * Stops the bridge and removes every remote binding of this flow from the post office.
+       * The binding map is emptied too, so closing the record a second time does not try to remove
+       * the same bindings again.
+       */
+      public void close() throws Exception
+      {
+         log.info("stopping bridge");
+         bridge.stop();
+         log.info("stopped bridge");
+
+         removeAllBindings();
+      }
+
+      public void setBridge(final Bridge bridge)
+      {
+         this.bridge = bridge;
+      }
+
+      /**
+       * Handles one notification message. A message carrying the reset header drops all bindings and
+       * enables processing; any other message is dispatched on its notification type. Failures are
+       * logged and never propagated.
+       */
+      public void onMessage(final ClientMessage message)
+      {
+         try
+         {
+            // Reset the bindings
+            if (message.getProperty(HDR_RESET_QUEUE_DATA) != null)
+            {
+               removeAllBindings();
+
+               firstReset = true;
+
+               log.info("did reset");
+
+               return;
+            }
+
+            if (!firstReset)
+            {
+               return;
+            }
+
+            NotificationType type = NotificationType.valueOf(message.getProperty(ManagementHelper.HDR_NOTIFICATION_TYPE)
+                                                                    .toString());
+
+            log.info("Got notification message " + type);
+
+            if (type == NotificationType.QUEUE_CREATED)
+            {
+               log.info("queue created");
+               SimpleString uniqueName = new SimpleString("flow-").concat(UUIDGenerator.getInstance()
+                                                                                       .generateSimpleStringUUID());
+
+               SimpleString queueAddress = (SimpleString)message.getProperty(ManagementHelper.HDR_ADDRESS);
+
+               SimpleString queueName = (SimpleString)message.getProperty(ManagementHelper.HDR_QUEUE_NAME);
+
+               SimpleString filterString = (SimpleString)message.getProperty(ManagementHelper.HDR_FILTERSTRING);
+
+               RemoteQueueBinding binding = new RemoteQueueBindingImpl(queueAddress,
+                                                                       uniqueName,
+                                                                       queueName,
+                                                                       filterString,
+                                                                       queue,
+                                                                       useDuplicateDetection,
+                                                                       forwardWhenNoMatchingConsumers);
+
+               bindings.put(queueName, binding);
+
+               postOffice.addBinding(binding);
+            }
+            else if (type == NotificationType.QUEUE_DESTROYED)
+            {
+               log.info("queue destroyed");
+               SimpleString queueName = (SimpleString)message.getProperty(ManagementHelper.HDR_QUEUE_NAME);
+
+               RemoteQueueBinding binding = bindings.remove(queueName);
+
+               if (binding == null)
+               {
+                  log.warn("Ignoring QUEUE_DESTROYED, no remote binding is known for queue " + queueName);
+
+                  return;
+               }
+
+               postOffice.removeBinding(binding.getUniqueName());
+            }
+            else if (type == NotificationType.CONSUMER_CREATED)
+            {
+               log.info("consumer created");
+               SimpleString queueName = (SimpleString)message.getProperty(ManagementHelper.HDR_QUEUE_NAME);
+
+               SimpleString filterString = (SimpleString)message.getProperty(ManagementHelper.HDR_FILTERSTRING);
+
+               RemoteQueueBinding binding = bindings.get(queueName);
+
+               if (binding == null)
+               {
+                  log.warn("Ignoring CONSUMER_CREATED, no remote binding is known for queue " + queueName);
+
+                  return;
+               }
+
+               binding.addConsumer(filterString);
+            }
+            else if (type == NotificationType.CONSUMER_CLOSED)
+            {
+               log.info("consumer closed");
+               SimpleString queueName = (SimpleString)message.getProperty(ManagementHelper.HDR_QUEUE_NAME);
+
+               SimpleString filterString = (SimpleString)message.getProperty(ManagementHelper.HDR_FILTERSTRING);
+
+               RemoteQueueBinding binding = bindings.get(queueName);
+
+               if (binding == null)
+               {
+                  log.warn("Ignoring CONSUMER_CLOSED, no remote binding is known for queue " + queueName);
+
+                  return;
+               }
+
+               binding.removeConsumer(filterString);
+            }
+         }
+         catch (Exception e)
+         {
+            log.error("Failed to handle message", e);
+         }
+      }
+
+      /** Removes every tracked remote binding from the post office, then forgets them. */
+      private void removeAllBindings() throws Exception
+      {
+         for (RemoteQueueBinding binding : bindings.values())
+         {
+            postOffice.removeBinding(binding.getUniqueName());
+         }
+
+         bindings.clear();
+      }
+
+   }
+
+}
Deleted: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterImpl.java 2009-01-28 10:53:22 UTC (rev 5750)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterImpl.java 2009-01-28 14:18:19 UTC (rev 5751)
@@ -1,470 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.messaging.core.server.cluster.impl;
-
-import static org.jboss.messaging.core.message.impl.MessageImpl.HDR_RESET_QUEUE_DATA;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ScheduledExecutorService;
-
-import org.jboss.messaging.core.client.ClientMessage;
-import org.jboss.messaging.core.client.MessageHandler;
-import org.jboss.messaging.core.client.management.impl.ManagementHelper;
-import org.jboss.messaging.core.cluster.DiscoveryGroup;
-import org.jboss.messaging.core.cluster.DiscoveryListener;
-import org.jboss.messaging.core.config.TransportConfiguration;
-import org.jboss.messaging.core.config.cluster.BridgeConfiguration;
-import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.management.NotificationType;
-import org.jboss.messaging.core.persistence.StorageManager;
-import org.jboss.messaging.core.postoffice.Binding;
-import org.jboss.messaging.core.postoffice.PostOffice;
-import org.jboss.messaging.core.postoffice.impl.BindingImpl;
-import org.jboss.messaging.core.server.Queue;
-import org.jboss.messaging.core.server.QueueFactory;
-import org.jboss.messaging.core.server.cluster.Bridge;
-import org.jboss.messaging.core.server.cluster.Cluster;
-import org.jboss.messaging.core.server.cluster.FlowBinding;
-import org.jboss.messaging.util.ExecutorFactory;
-import org.jboss.messaging.util.Pair;
-import org.jboss.messaging.util.SimpleString;
-import org.jboss.messaging.util.UUIDGenerator;
-
-/**
- *
- * A ClusterImpl
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
- * Created 21 Jan 2009 14:43:05
- *
- *
- */
-public class ClusterImpl implements Cluster, DiscoveryListener
-{
- private static final Logger log = Logger.getLogger(ClusterImpl.class);
-
- private final ExecutorFactory executorFactory;
-
- private final StorageManager storageManager;
-
- private final PostOffice postOffice;
-
- private final SimpleString name;
-
- private final SimpleString address;
-
- private final BridgeConfiguration bridgeConfig;
-
- private final boolean useDuplicateDetection;
-
- private Map<Pair<TransportConfiguration, TransportConfiguration>, MessageFlowRecord> records = new HashMap<Pair<TransportConfiguration, TransportConfiguration>, MessageFlowRecord>();
-
- private final DiscoveryGroup discoveryGroup;
-
- private final ScheduledExecutorService scheduledExecutor;
-
- private final QueueFactory queueFactory;
-
- private volatile boolean started;
-
- /*
- * Constructor using static list of connectors
- */
- public ClusterImpl(final SimpleString name,
- final SimpleString address,
- final BridgeConfiguration bridgeConfig,
- final boolean useDuplicateDetection,
- final ExecutorFactory executorFactory,
- final StorageManager storageManager,
- final PostOffice postOffice,
- final ScheduledExecutorService scheduledExecutor,
- final QueueFactory queueFactory,
- final List<Pair<TransportConfiguration, TransportConfiguration>> connectors) throws Exception
- {
- this.name = name;
-
- this.address = address;
-
- this.bridgeConfig = bridgeConfig;
-
- this.useDuplicateDetection = useDuplicateDetection;
-
- this.executorFactory = executorFactory;
-
- this.storageManager = storageManager;
-
- this.postOffice = postOffice;
-
- this.discoveryGroup = null;
-
- this.scheduledExecutor = scheduledExecutor;
-
- this.queueFactory = queueFactory;
-
- this.updateConnectors(connectors);
- }
-
- /*
- * Constructor using discovery to get connectors
- */
- public ClusterImpl(final SimpleString name,
- final SimpleString address,
- final BridgeConfiguration bridgeConfig,
- final boolean useDuplicateDetection,
- final ExecutorFactory executorFactory,
- final StorageManager storageManager,
- final PostOffice postOffice,
- final ScheduledExecutorService scheduledExecutor,
- final QueueFactory queueFactory,
- final DiscoveryGroup discoveryGroup) throws Exception
- {
- this.name = name;
-
- this.address = address;
-
- this.bridgeConfig = bridgeConfig;
-
- this.executorFactory = executorFactory;
-
- this.storageManager = storageManager;
-
- this.postOffice = postOffice;
-
- this.scheduledExecutor = scheduledExecutor;
-
- this.queueFactory = queueFactory;
-
- this.discoveryGroup = discoveryGroup;
-
- this.useDuplicateDetection = useDuplicateDetection;
- }
-
- public synchronized void start() throws Exception
- {
- if (started)
- {
- return;
- }
-
- if (discoveryGroup != null)
- {
- updateConnectors(discoveryGroup.getConnectors());
-
- discoveryGroup.registerListener(this);
- }
-
- started = true;
- }
-
- public synchronized void stop() throws Exception
- {
- if (!started)
- {
- return;
- }
-
- if (discoveryGroup != null)
- {
- discoveryGroup.unregisterListener(this);
- }
-
- for (MessageFlowRecord record : records.values())
- {
- record.close();
- }
-
- started = false;
- }
-
- public boolean isStarted()
- {
- return started;
- }
-
- public SimpleString getName()
- {
- return name;
- }
-
- // DiscoveryListener implementation ------------------------------------------------------------------
-
- public void connectorsChanged()
- {
- try
- {
- List<Pair<TransportConfiguration, TransportConfiguration>> connectors = discoveryGroup.getConnectors();
-
- updateConnectors(connectors);
- }
- catch (Exception e)
- {
- log.error("Failed to update connectors", e);
- }
- }
-
- private void updateConnectors(final List<Pair<TransportConfiguration, TransportConfiguration>> connectors) throws Exception
- {
- Set<Pair<TransportConfiguration, TransportConfiguration>> connectorSet = new HashSet<Pair<TransportConfiguration, TransportConfiguration>>();
-
- connectorSet.addAll(connectors);
-
- Iterator<Map.Entry<Pair<TransportConfiguration, TransportConfiguration>, MessageFlowRecord>> iter = records.entrySet()
- .iterator();
-
- while (iter.hasNext())
- {
- Map.Entry<Pair<TransportConfiguration, TransportConfiguration>, MessageFlowRecord> entry = iter.next();
-
- if (!connectorSet.contains(entry.getKey()))
- {
- // Connector no longer there - we should remove and close it - we don't delete the queue though - it may
- // have messages - this is up to the admininstrator to do this
-
- entry.getValue().close();
-
- iter.remove();
- }
- }
-
- for (Pair<TransportConfiguration, TransportConfiguration> connectorPair : connectors)
- {
- if (!records.containsKey(connectorPair))
- {
- SimpleString queueName = generateQueueName(name, connectorPair);
-
- Binding queueBinding = postOffice.getBinding(queueName);
-
- Queue queue;
-
- if (queueBinding != null)
- {
- queue = (Queue)queueBinding.getBindable();
- }
- else
- {
- queue = queueFactory.createQueue(-1, name, null, true, false);
-
- // Add binding in storage so the queue will get reloaded on startup and we can find it - it's never
- // actually routed to at that address though
-
- Binding storeBinding = new BindingImpl(queue.getName(),
- queue.getName(),
- queue.getName(),
- queue,
- false,
- true);
-
- storageManager.addQueueBinding(storeBinding);
- }
-
- MessageFlowRecord record = new MessageFlowRecord(queue);
-
- Bridge bridge = new BridgeImpl(queueName,
- queue,
- connectorPair,
- executorFactory.getExecutor(),
- bridgeConfig.getMaxBatchSize(),
- bridgeConfig.getMaxBatchTime(),
- new SimpleString(bridgeConfig.getFilterString()),
- null,
- storageManager,
- scheduledExecutor,
- null,
- bridgeConfig.getRetryInterval(),
- bridgeConfig.getRetryIntervalMultiplier(),
- bridgeConfig.getMaxRetriesBeforeFailover(),
- bridgeConfig.getMaxRetriesAfterFailover(),
- false,
- record,
- address.toString());
-
- record.setBridge(bridge);
-
- records.put(connectorPair, record);
-
- bridge.start();
- }
- }
- }
-
- private SimpleString generateQueueName(final SimpleString clusterName,
- final Pair<TransportConfiguration, TransportConfiguration> connectorPair) throws Exception
- {
- return new SimpleString("cluster." + name +
- "." +
- generateConnectorString(connectorPair.a) +
- "-" +
- (connectorPair.b == null ? "null" : generateConnectorString(connectorPair.b)));
- }
-
- private String replaceWildcardChars(final String str)
- {
- return str.replace('.', '-');
- }
-
- private SimpleString generateConnectorString(final TransportConfiguration config) throws Exception
- {
- StringBuilder str = new StringBuilder(replaceWildcardChars(config.getFactoryClassName()));
-
- if (!config.getParams().isEmpty())
- {
- str.append("?");
- }
-
- boolean first = true;
- for (Map.Entry<String, Object> entry : config.getParams().entrySet())
- {
- if (!first)
- {
- str.append("&");
- }
- String encodedKey = replaceWildcardChars(entry.getKey());
-
- String val = entry.getValue().toString();
- String encodedVal = replaceWildcardChars(val);
-
- str.append(encodedKey).append('=').append(encodedVal);
-
- first = false;
- }
-
- return new SimpleString(str.toString());
- }
-
- // Inner classes -----------------------------------------------------------------------------------
-
- private class MessageFlowRecord implements MessageHandler
- {
- private Bridge bridge;
-
- private final Queue queue;
-
- private final Map<SimpleString, FlowBinding> bindings = new HashMap<SimpleString, FlowBinding>();
-
- private boolean firstReset = false;
-
- public MessageFlowRecord(final Queue queue)
- {
- this.queue = queue;
- }
-
- public void close() throws Exception
- {
- bridge.stop();
-
- for (FlowBinding binding : bindings.values())
- {
- postOffice.removeBinding(binding.getUniqueName());
- }
- }
-
- public void setBridge(final Bridge bridge)
- {
- this.bridge = bridge;
- }
-
- public void onMessage(final ClientMessage message)
- {
- try
- {
- // Reset the bindings
- if (message.getProperty(HDR_RESET_QUEUE_DATA) != null)
- {
- for (FlowBinding binding : bindings.values())
- {
- postOffice.removeBinding(binding.getUniqueName());
- }
-
- bindings.clear();
-
- firstReset = true;
- }
-
- if (!firstReset)
- {
- return;
- }
-
- NotificationType type = NotificationType.valueOf(message.getProperty(ManagementHelper.HDR_NOTIFICATION_TYPE)
- .toString());
-
- if (type == NotificationType.QUEUE_CREATED)
- {
- SimpleString uniqueName = new SimpleString("flow-").concat(UUIDGenerator.getInstance()
- .generateSimpleStringUUID());
-
- SimpleString queueAddress = (SimpleString)message.getProperty(ManagementHelper.HDR_ADDRESS);
-
- SimpleString queueName = (SimpleString)message.getProperty(ManagementHelper.HDR_QUEUE_NAME);
-
- FlowBinding binding = new FlowBindingImpl(queueAddress, uniqueName, queueName, queue, useDuplicateDetection);
-
- bindings.put(queueName, binding);
-
- postOffice.addBinding(binding);
- }
- else if (type == NotificationType.QUEUE_DESTROYED)
- {
- SimpleString queueName = (SimpleString)message.getProperty(ManagementHelper.HDR_QUEUE_NAME);
-
- FlowBinding binding = bindings.remove(queueName);
-
- postOffice.removeBinding(binding.getUniqueName());
- }
- else if (type == NotificationType.CONSUMER_CREATED)
- {
- SimpleString queueName = (SimpleString)message.getProperty(ManagementHelper.HDR_QUEUE_NAME);
-
- SimpleString filterString = (SimpleString)message.getProperty(ManagementHelper.HDR_FILTERSTRING);
-
- FlowBinding binding = bindings.get(queueName);
-
- binding.addConsumer(filterString);
- }
- else if (type == NotificationType.CONSUMER_CLOSED)
- {
- SimpleString queueName = (SimpleString)message.getProperty(ManagementHelper.HDR_QUEUE_NAME);
-
- SimpleString filterString = (SimpleString)message.getProperty(ManagementHelper.HDR_FILTERSTRING);
-
- FlowBinding binding = bindings.get(queueName);
-
- binding.removeConsumer(filterString);
- }
- }
- catch (Exception e)
- {
- log.error("Failed to handle message", e);
- }
- }
-
- }
-
-
-}
Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterManagerImpl.java 2009-01-28 10:53:22 UTC (rev 5750)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterManagerImpl.java 2009-01-28 14:18:19 UTC (rev 5751)
@@ -38,7 +38,7 @@
import org.jboss.messaging.core.config.TransportConfiguration;
import org.jboss.messaging.core.config.cluster.BridgeConfiguration;
import org.jboss.messaging.core.config.cluster.BroadcastGroupConfiguration;
-import org.jboss.messaging.core.config.cluster.ClusterConfiguration;
+import org.jboss.messaging.core.config.cluster.ClusterConnectionConfiguration;
import org.jboss.messaging.core.config.cluster.DiscoveryGroupConfiguration;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.management.ManagementService;
@@ -49,7 +49,7 @@
import org.jboss.messaging.core.server.QueueFactory;
import org.jboss.messaging.core.server.cluster.Bridge;
import org.jboss.messaging.core.server.cluster.BroadcastGroup;
-import org.jboss.messaging.core.server.cluster.Cluster;
+import org.jboss.messaging.core.server.cluster.ClusterConnection;
import org.jboss.messaging.core.server.cluster.ClusterManager;
import org.jboss.messaging.core.server.cluster.Transformer;
import org.jboss.messaging.util.ExecutorFactory;
@@ -74,9 +74,9 @@
private final Map<String, DiscoveryGroup> discoveryGroups = new HashMap<String, DiscoveryGroup>();
private final Map<String, Bridge> bridges = new HashMap<String, Bridge>();
-
- private final Map<String, Cluster> clusters = new HashMap<String, Cluster>();
+ private final Map<String, ClusterConnection> clusters = new HashMap<String, ClusterConnection>();
+
private final ExecutorFactory executorFactory;
private final StorageManager storageManager;
@@ -88,7 +88,7 @@
private final ManagementService managementService;
private final Configuration configuration;
-
+
private final QueueFactory queueFactory;
private volatile boolean started;
@@ -112,7 +112,7 @@
this.managementService = managementService;
this.configuration = configuration;
-
+
this.queueFactory = queueFactory;
}
@@ -137,10 +137,10 @@
{
deployBridge(config);
}
-
- for (ClusterConfiguration config: configuration.getClusterConfigurations())
+
+ for (ClusterConnectionConfiguration config : configuration.getClusterConfigurations())
{
- deployCluster(config);
+ deployClusterConnection(config);
}
started = true;
@@ -170,11 +170,12 @@
bridge.stop();
managementService.unregisterBridge(bridge.getName().toString());
}
-
- for (Cluster cluster: clusters.values())
+
+ log.info("stopping cluster connecttions");
+ for (ClusterConnection clusterConnection : clusters.values())
{
- cluster.stop();
- managementService.unregisterCluster(cluster.getName().toString());
+ clusterConnection.stop();
+ managementService.unregisterCluster(clusterConnection.getName().toString());
}
broadcastGroups.clear();
@@ -407,8 +408,8 @@
bridge.start();
}
}
-
- private synchronized void deployCluster(final ClusterConfiguration config) throws Exception
+
+ private synchronized void deployClusterConnection(final ClusterConnectionConfiguration config) throws Exception
{
if (config.getName() == null)
{
@@ -423,80 +424,86 @@
return;
}
-
- Cluster cluster;
-
+
+ ClusterConnection clusterConnection;
+
List<Pair<TransportConfiguration, TransportConfiguration>> connectors = new ArrayList<Pair<TransportConfiguration, TransportConfiguration>>();
-
+
if (config.getStaticConnectorNamePairs() != null)
{
- for (Pair<String, String> connectorNamePair: config.getStaticConnectorNamePairs())
- {
+ for (Pair<String, String> connectorNamePair : config.getStaticConnectorNamePairs())
+ {
TransportConfiguration connector = configuration.getConnectorConfigurations().get(connectorNamePair.a);
-
+
if (connector == null)
{
- log.warn("No connector defined with name '" + connectorNamePair.a + "'. The bridge will not be deployed.");
-
+ log.warn("No connector defined with name '" + connectorNamePair.a +
+ "'. The bridge will not be deployed.");
+
return;
}
-
+
TransportConfiguration backupConnector = null;
-
+
if (connectorNamePair.b != null)
{
backupConnector = configuration.getConnectorConfigurations().get(connectorNamePair.b);
-
+
if (backupConnector == null)
{
log.warn("No connector defined with name '" + connectorNamePair.b +
"'. The bridge will not be deployed.");
-
+
return;
}
}
-
+
Pair<TransportConfiguration, TransportConfiguration> pair = new Pair<TransportConfiguration, TransportConfiguration>(connector,
backupConnector);
-
+
connectors.add(pair);
}
-
- cluster = new ClusterImpl(new SimpleString(config.getName()),
- new SimpleString(config.getAddress()),
- config.getBridgeConfig(),
- config.isDuplicateDetection(),
- executorFactory,
- storageManager,
- postOffice,
- scheduledExecutor,
- queueFactory,
- connectors);
+
+ clusterConnection = new ClusterConnectionImpl(new SimpleString(config.getName()),
+ new SimpleString(config.getAddress()),
+ config.getBridgeConfig(),
+ config.isDuplicateDetection(),
+ config.isForwardWhenNoConsumers(),
+ executorFactory,
+ storageManager,
+ postOffice,
+ scheduledExecutor,
+ queueFactory,
+ connectors);
}
else
{
DiscoveryGroup dg = discoveryGroups.get(config.getDiscoveryGroupName());
-
+
if (dg == null)
{
- log.warn("No discovery group with name '" + config.getDiscoveryGroupName() + "'. The bridge will not be deployed.");
+ log.warn("No discovery group with name '" + config.getDiscoveryGroupName() +
+ "'. The bridge will not be deployed.");
}
-
- cluster = new ClusterImpl(new SimpleString(config.getName()),
- new SimpleString(config.getAddress()),
- config.getBridgeConfig(),
- config.isDuplicateDetection(),
- executorFactory,
- storageManager,
- postOffice,
- scheduledExecutor,
- queueFactory,
- dg);
+
+ clusterConnection = new ClusterConnectionImpl(new SimpleString(config.getName()),
+ new SimpleString(config.getAddress()),
+ config.getBridgeConfig(),
+ config.isDuplicateDetection(),
+ config.isForwardWhenNoConsumers(),
+ executorFactory,
+ storageManager,
+ postOffice,
+ scheduledExecutor,
+ queueFactory,
+ dg);
}
- managementService.registerCluster(cluster, config);
+ managementService.registerCluster(clusterConnection, config);
- clusters.put(config.getName(), cluster);
+ clusters.put(config.getName(), clusterConnection);
+
+ clusterConnection.start();
}
private Transformer instantiateTransformer(final String transformerClassName)
Deleted: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/FlowBindingImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/FlowBindingImpl.java 2009-01-28 10:53:22 UTC (rev 5750)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/FlowBindingImpl.java 2009-01-28 14:18:19 UTC (rev 5751)
@@ -1,183 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.messaging.core.server.cluster.impl;
-
-import static org.jboss.messaging.core.message.impl.MessageImpl.HDR_FROM_CLUSTER;
-
-import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-import org.jboss.messaging.core.filter.Filter;
-import org.jboss.messaging.core.filter.impl.FilterImpl;
-import org.jboss.messaging.core.message.impl.MessageImpl;
-import org.jboss.messaging.core.postoffice.impl.BindingImpl;
-import org.jboss.messaging.core.server.Bindable;
-import org.jboss.messaging.core.server.ServerMessage;
-import org.jboss.messaging.core.server.cluster.FlowBinding;
-import org.jboss.messaging.util.SimpleString;
-
-/**
- * A FlowBindingImpl
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
- * Created 21 Jan 2009 18:55:22
- *
- *
- */
-public class FlowBindingImpl extends BindingImpl implements FlowBinding
-{
- private final Set<Filter> filters = new HashSet<Filter>();
-
- private final Map<SimpleString, Integer> filterCounts = new HashMap<SimpleString, Integer>();
-
- private int consumerCount;
-
- private final boolean duplicateDetection;
-
- private final SimpleString headerName;
-
- public FlowBindingImpl(final SimpleString address,
- final SimpleString uniqueName,
- final SimpleString routingName,
- final Bindable bindable,
- final boolean duplicateDetection)
- {
- super(address, uniqueName, routingName, bindable, false, false);
-
- this.duplicateDetection = duplicateDetection;
-
- headerName = MessageImpl.HDR_ROUTE_TO_PREFIX.concat(routingName);
- }
-
- public boolean accept(final ServerMessage message) throws Exception
- {
- if (message.containsProperty(MessageImpl.HDR_FROM_CLUSTER))
- {
- return false;
- }
-
- if (consumerCount == 0)
- {
- return false;
- }
-
- boolean accepted = false;
-
- if (filters.isEmpty())
- {
- accepted = true;
- }
- else
- {
- for (Filter filter : filters)
- {
- if (filter.match(message))
- {
- accepted = true;
-
- break;
- }
- }
- }
-
- if (duplicateDetection && accepted)
- {
- if (!message.containsProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID))
- {
- //Add the message id as a duplicate id header - this will be detected when routing on the remote node -
- //any duplicates will be rejected
- byte[] bytes = new byte[8];
-
- ByteBuffer buff = ByteBuffer.wrap(bytes);
-
- buff.putLong(message.getMessageID());
-
- SimpleString dupID = new SimpleString(bytes);
-
- message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID);
- }
- }
-
-
- message.putBooleanProperty(headerName, Boolean.valueOf(true));
-
- message.putBooleanProperty(HDR_FROM_CLUSTER, Boolean.valueOf(true));
-
- return accepted;
- }
-
- public synchronized void addConsumer(final SimpleString filterString) throws Exception
- {
- if (filterString != null)
- {
- // There can actually be many consumers on the same queue with the same filter, so we need to maintain a ref
- // count
-
- Integer i = filterCounts.get(filterString);
-
- if (i == null)
- {
- filterCounts.put(filterString, 0);
-
- filters.add(new FilterImpl(filterString));
- }
- else
- {
- filterCounts.put(filterString, i + 1);
- }
- }
-
- consumerCount++;
- }
-
- public synchronized void removeConsumer(final SimpleString filterString) throws Exception
- {
- if (filterString != null)
- {
- Integer i = filterCounts.get(filterString);
-
- if (i != null)
- {
- int ii = i - 1;
-
- if (ii == 0)
- {
- filterCounts.remove(filterString);
-
- filters.remove(filterString);
- }
- else
- {
- filterCounts.put(filterString, ii);
- }
- }
- }
-
- consumerCount--;
- }
-
-}
Added: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/RemoteQueueBindingImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/RemoteQueueBindingImpl.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/RemoteQueueBindingImpl.java 2009-01-28 14:18:19 UTC (rev 5751)
@@ -0,0 +1,224 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.server.cluster.impl;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.jboss.messaging.core.filter.Filter;
+import org.jboss.messaging.core.filter.impl.FilterImpl;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.server.Bindable;
+import org.jboss.messaging.core.server.Queue;
+import org.jboss.messaging.core.server.ServerMessage;
+import org.jboss.messaging.core.server.cluster.RemoteQueueBinding;
+import org.jboss.messaging.util.SimpleString;
+
+/**
+ * A RemoteQueueBindingImpl
+ *
+ * A binding whose bindable is the store-and-forward queue supplied at construction time.
+ * It keeps a count of registered consumers and a reference count per consumer filter string,
+ * which {@link #isHighAcceptPriority(ServerMessage)} uses to decide whether a message should be
+ * preferentially routed through this binding.
+ *
+ * NOTE(review): presumably this represents a queue living on another cluster node whose consumers
+ * are reported via addConsumer/removeConsumer - confirm against ClusterConnectionImpl.
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ * Created 21 Jan 2009 18:55:22
+ *
+ *
+ */
+public class RemoteQueueBindingImpl implements RemoteQueueBinding
+{
+   private static final Logger log = Logger.getLogger(RemoteQueueBindingImpl.class);
+
+   private final SimpleString address;
+
+   private final Queue storeAndForwardQueue;
+
+   private final SimpleString uniqueName;
+
+   private final SimpleString routingName;
+
+   // Filter of the queue itself; null means every message matches
+   private final Filter queueFilter;
+
+   // One Filter instance per distinct consumer filter string currently registered
+   private final Set<Filter> filters = new HashSet<Filter>();
+
+   // Number of registered consumers per filter string; an entry exists only while the count is >= 1
+   private final Map<SimpleString, Integer> filterCounts = new HashMap<SimpleString, Integer>();
+
+   // Total number of registered consumers, filtered or not
+   private int consumerCount;
+
+   // Stored at construction but not read anywhere in this class
+   private final boolean duplicateDetection;
+
+   private final boolean forwardWhenNoMatchingConsumers;
+
+   /**
+    * @param address the address this binding is bound to
+    * @param uniqueName the unique name of this binding
+    * @param routingName the routing name of this binding
+    * @param filterString the queue filter expression, or null for no queue filter
+    * @param storeAndForwardQueue the queue returned by {@link #getBindable()}
+    * @param duplicateDetection stored only; not used by this class
+    * @param forwardWhenNoMatchingConsumers if true, {@link #isHighAcceptPriority(ServerMessage)} always returns true
+    * @throws Exception if the filter cannot be created from filterString
+    */
+   public RemoteQueueBindingImpl(final SimpleString address,
+                                 final SimpleString uniqueName,
+                                 final SimpleString routingName,
+                                 final SimpleString filterString,
+                                 final Queue storeAndForwardQueue,
+                                 final boolean duplicateDetection,
+                                 final boolean forwardWhenNoMatchingConsumers) throws Exception
+   {
+      this.address = address;
+
+      this.storeAndForwardQueue = storeAndForwardQueue;
+
+      this.uniqueName = uniqueName;
+
+      this.routingName = routingName;
+
+      this.duplicateDetection = duplicateDetection;
+
+      if (filterString != null)
+      {
+         queueFilter = new FilterImpl(filterString);
+      }
+      else
+      {
+         queueFilter = null;
+      }
+
+      this.forwardWhenNoMatchingConsumers = forwardWhenNoMatchingConsumers;
+   }
+
+   public SimpleString getAddress()
+   {
+      return address;
+   }
+
+   public Bindable getBindable()
+   {
+      return storeAndForwardQueue;
+   }
+
+   public SimpleString getRoutingName()
+   {
+      return routingName;
+   }
+
+   public SimpleString getUniqueName()
+   {
+      return uniqueName;
+   }
+
+   public boolean isExclusive()
+   {
+      return false;
+   }
+
+   public boolean isQueueBinding()
+   {
+      return false;
+   }
+
+   /**
+    * @return true if there is no queue filter, or the queue filter matches the message
+    */
+   public boolean filterMatches(final ServerMessage message) throws Exception
+   {
+      return queueFilter == null || queueFilter.match(message);
+   }
+
+   /**
+    * Returns true when forwardWhenNoMatchingConsumers is set. Otherwise returns false when no
+    * consumers are registered, true when no consumer filters are registered, and otherwise true
+    * only if at least one registered consumer filter matches the message.
+    *
+    * NOTE(review): this method is not synchronized although addConsumer/removeConsumer mutate the
+    * state it reads under the instance lock - confirm that callers serialize access.
+    *
+    * NOTE(review): when unfiltered and filtered consumers coexist and no filter matches, this
+    * returns false even though an unfiltered consumer is registered - confirm this is intended.
+    */
+   public boolean isHighAcceptPriority(final ServerMessage message)
+   {
+      if (forwardWhenNoMatchingConsumers)
+      {
+         return true;
+      }
+
+      if (consumerCount == 0)
+      {
+         return false;
+      }
+
+      if (filters.isEmpty())
+      {
+         return true;
+      }
+
+      for (Filter filter : filters)
+      {
+         if (filter.match(message))
+         {
+            return true;
+         }
+      }
+
+      return false;
+   }
+
+   /**
+    * Registers a consumer, optionally with a filter.
+    *
+    * @param filterString the consumer's filter expression, or null if the consumer has no filter
+    * @throws Exception if a filter cannot be created from filterString
+    */
+   public synchronized void addConsumer(final SimpleString filterString) throws Exception
+   {
+      if (filterString != null)
+      {
+         // There can actually be many consumers on the same queue with the same filter, so we need to maintain a ref
+         // count
+
+         Integer count = filterCounts.get(filterString);
+
+         if (count == null)
+         {
+            // Create the filter first so a failure leaves no count entry behind
+            Filter filter = new FilterImpl(filterString);
+
+            filters.add(filter);
+
+            // The first consumer for this filter counts as one reference
+            filterCounts.put(filterString, 1);
+         }
+         else
+         {
+            filterCounts.put(filterString, count + 1);
+         }
+      }
+
+      consumerCount++;
+   }
+
+   /**
+    * Unregisters a consumer previously registered with {@link #addConsumer(SimpleString)}.
+    * The filter is dropped once its last consumer has been removed.
+    *
+    * @param filterString the filter expression the consumer was registered with, or null
+    */
+   public synchronized void removeConsumer(final SimpleString filterString) throws Exception
+   {
+      if (filterString != null)
+      {
+         Integer count = filterCounts.get(filterString);
+
+         if (count != null)
+         {
+            int remaining = count - 1;
+
+            if (remaining <= 0)
+            {
+               filterCounts.remove(filterString);
+
+               removeFilter(filterString);
+            }
+            else
+            {
+               filterCounts.put(filterString, remaining);
+            }
+         }
+      }
+
+      consumerCount--;
+   }
+
+   // Removes the Filter whose filter string equals the given one; the set holds Filter objects,
+   // so it cannot be removed by passing the SimpleString itself
+   private void removeFilter(final SimpleString filterString)
+   {
+      Filter match = null;
+
+      for (Filter filter : filters)
+      {
+         if (filterString.equals(filter.getFilterString()))
+         {
+            match = filter;
+
+            break;
+         }
+      }
+
+      if (match != null)
+      {
+         filters.remove(match);
+      }
+   }
+
+}
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/DivertImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/DivertImpl.java 2009-01-28 10:53:22 UTC (rev 5750)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/DivertImpl.java 2009-01-28 14:18:19 UTC (rev 5751)
@@ -96,35 +96,26 @@
this.storageManager = storageManager;
}
- public boolean accept(final ServerMessage message) throws Exception
+ public void preroute(final ServerMessage message, final Transaction tx) throws Exception
{
- if (filter != null && !filter.match(message))
+ //We need to increment ref count here to ensure that the message doesn't get stored, deleted and stored again in a single route which
+ //can occur if the message is routed to a queue, then acked before it's routed here
+
+ //TODO - combine with similar code in QueueImpl.accept()
+
+ int count = message.incrementRefCount();
+
+ if (count == 1)
{
- return false;
- }
- else
- {
- //We need to increment ref count here to ensure that the message doesn't get stored, deleted and stored again in a single route which
- //can occur if the message is routed to a queue, then acked before it's routed here
+ PagingStore store = pagingManager.getPageStore(message.getDestination());
- //TODO - combine with similar code in QueueImpl.accept()
-
- int count = message.incrementRefCount();
-
- if (count == 1)
- {
- PagingStore store = pagingManager.getPageStore(message.getDestination());
-
- store.addSize(message.getMemoryEstimate());
- }
-
- if (message.isDurable())
- {
- message.incrementDurableRefCount();
- }
-
- return true;
+ store.addSize(message.getMemoryEstimate());
}
+
+ if (message.isDurable())
+ {
+ message.incrementDurableRefCount();
+ }
}
public void route(ServerMessage message, final Transaction tx) throws Exception
@@ -180,4 +171,9 @@
{
return exclusive;
}
+
+ public Filter getFilter()
+ {
+ return filter;
+ }
}
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2009-01-28 10:53:22 UTC (rev 5750)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2009-01-28 14:18:19 UTC (rev 5751)
@@ -45,7 +45,8 @@
import org.jboss.messaging.core.postoffice.Binding;
import org.jboss.messaging.core.postoffice.DuplicateIDCache;
import org.jboss.messaging.core.postoffice.PostOffice;
-import org.jboss.messaging.core.postoffice.impl.BindingImpl;
+import org.jboss.messaging.core.postoffice.impl.DivertBinding;
+import org.jboss.messaging.core.postoffice.impl.LocalQueueBinding;
import org.jboss.messaging.core.postoffice.impl.PostOfficeImpl;
import org.jboss.messaging.core.remoting.Channel;
import org.jboss.messaging.core.remoting.ChannelHandler;
@@ -288,7 +289,7 @@
true,
false);
- Binding binding = new BindingImpl(queueBindingInfo.getAddress(), queueBindingInfo.getQueueName(), queueBindingInfo.getQueueName(), queue, false, true);
+ Binding binding = new LocalQueueBinding(queueBindingInfo.getAddress(), queue);
queues.put(queueBindingInfo.getPersistenceID(), queue);
@@ -799,7 +800,7 @@
Queue queue = queueFactory.createQueue(-1, name, filter, config.isDurable(), false);
- Binding queueBinding = new BindingImpl(new SimpleString(config.getAddress()), name, name, queue, false, true);
+ Binding queueBinding = new LocalQueueBinding(new SimpleString(config.getAddress()), queue);
binding = queueBinding;
@@ -868,7 +869,7 @@
pagingManager,
storageManager);
- Binding binding = new BindingImpl(sAddress, sName, new SimpleString(config.getRoutingName()), divert, config.isExclusive(), false);
+ Binding binding = new DivertBinding(sAddress, divert);
postOffice.addBinding(binding);
}
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2009-01-28 10:53:22 UTC (rev 5750)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2009-01-28 14:18:19 UTC (rev 5751)
@@ -116,7 +116,7 @@
private int consumersToFailover = -1;
- private final SimpleString routeToPropertyName;
+ // private final SimpleString routeToPropertyName;
public QueueImpl(final long persistenceID,
final SimpleString name,
@@ -144,8 +144,6 @@
this.queueSettingsRepository = queueSettingsRepository;
- routeToPropertyName = MessageImpl.HDR_ROUTE_TO_PREFIX.concat(name);
-
if (postOffice == null)
{
pagingManager = null;
@@ -161,43 +159,7 @@
}
// Bindable implementation -------------------------------------------------------------------------------------
-
- public boolean accept(final ServerMessage message) throws Exception
- {
- // if (message.containsProperty(MessageImpl.HDR_FROM_CLUSTER))
- // {
- // if (message.removeProperty(routeToPropertyName) == null)
- // {
- // return false;
- // }
- // }
-
- if (filter != null && !filter.match(message))
- {
- return false;
- }
- else
- {
- int count = message.incrementRefCount();
-
- if (count == 1)
- {
- PagingStore store = pagingManager.getPageStore(message.getDestination());
-
- store.addSize(message.getMemoryEstimate());
- }
-
- boolean durableRef = message.isDurable() && durable;
-
- if (durableRef)
- {
- message.incrementDurableRefCount();
- }
-
- return true;
- }
- }
-
+
public SimpleString getRoutingName()
{
return name;
@@ -213,14 +175,29 @@
return false;
}
- public void route(final ServerMessage message, final Transaction tx) throws Exception
+ public void preroute(final ServerMessage message, final Transaction tx) throws Exception
{
- // Temp
- SimpleString routeToHeader = MessageImpl.HDR_ROUTE_TO_PREFIX.concat(name);
- message.removeProperty(routeToHeader);
+ int count = message.incrementRefCount();
+ if (count == 1)
+ {
+ PagingStore store = pagingManager.getPageStore(message.getDestination());
+
+ store.addSize(message.getMemoryEstimate());
+ }
+
boolean durableRef = message.isDurable() && durable;
+ if (durableRef)
+ {
+ message.incrementDurableRefCount();
+ }
+ }
+
+ public void route(final ServerMessage message, final Transaction tx) throws Exception
+ {
+ boolean durableRef = message.isDurable() && durable;
+
// If durable, must be persisted before anything is routed
MessageReference ref = message.createReference(this);
@@ -285,10 +262,6 @@
public MessageReference reroute(final ServerMessage message, final Transaction tx) throws Exception
{
- // Temp
- SimpleString routeToHeader = MessageImpl.HDR_ROUTE_TO_PREFIX.concat(name);
- message.removeProperty(routeToHeader);
-
MessageReference ref = message.createReference(this);
int count = message.incrementRefCount();
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java 2009-01-28 10:53:22 UTC (rev 5750)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java 2009-01-28 14:18:19 UTC (rev 5751)
@@ -195,16 +195,9 @@
return doHandle(ref);
}
- public SimpleString getFilterString()
+ public Filter getFilter()
{
- if (filter != null)
- {
- return filter.getFilterString();
- }
- else
- {
- return null;
- }
+ return filter;
}
public void handleClose(final Packet packet)
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2009-01-28 10:53:22 UTC (rev 5750)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2009-01-28 14:18:19 UTC (rev 5751)
@@ -39,7 +39,7 @@
import org.jboss.messaging.core.postoffice.Binding;
import org.jboss.messaging.core.postoffice.Bindings;
import org.jboss.messaging.core.postoffice.PostOffice;
-import org.jboss.messaging.core.postoffice.impl.BindingImpl;
+import org.jboss.messaging.core.postoffice.impl.LocalQueueBinding;
import org.jboss.messaging.core.remoting.Channel;
import org.jboss.messaging.core.remoting.DelayedResult;
import org.jboss.messaging.core.remoting.FailureListener;
@@ -1492,7 +1492,7 @@
final Queue queue = queueFactory.createQueue(-1, name, filter, durable, temporary);
- binding = new BindingImpl(address, name, name, queue, false, true);
+ binding = new LocalQueueBinding(address, queue);
if (durable)
{
Modified: trunk/tests/src/org/jboss/messaging/tests/performance/persistence/FakeBinding.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/performance/persistence/FakeBinding.java 2009-01-28 10:53:22 UTC (rev 5750)
+++ trunk/tests/src/org/jboss/messaging/tests/performance/persistence/FakeBinding.java 2009-01-28 14:18:19 UTC (rev 5751)
@@ -34,12 +34,19 @@
*/
public class FakeBinding implements Binding
{
- public boolean accept(ServerMessage message) throws Exception
+
+ public boolean filterMatches(ServerMessage message) throws Exception
{
// TODO Auto-generated method stub
return false;
}
+ public boolean isHighAcceptPriority(ServerMessage message)
+ {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
public SimpleString getRoutingName()
{
// TODO Auto-generated method stub
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java 2009-01-28 10:53:22 UTC (rev 5750)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java 2009-01-28 14:18:19 UTC (rev 5751)
@@ -589,7 +589,7 @@
class NullConsumer implements Consumer
{
- public SimpleString getFilterString()
+ public Filter getFilter()
{
return null;
}
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/fakes/FakeConsumer.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/fakes/FakeConsumer.java 2009-01-28 10:53:22 UTC (rev 5750)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/fakes/FakeConsumer.java 2009-01-28 14:18:19 UTC (rev 5751)
@@ -59,7 +59,7 @@
this.filter = filter;
}
- public SimpleString getFilterString()
+ public Filter getFilter()
{
// TODO Auto-generated method stub
return null;
More information about the jboss-cvs-commits
mailing list