[jboss-cvs] JBoss Messaging SVN: r5710 - in trunk/src/main/org/jboss/messaging/core: management and 6 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri Jan 23 13:18:28 EST 2009
Author: timfox
Date: 2009-01-23 13:18:27 -0500 (Fri, 23 Jan 2009)
New Revision: 5710
Added:
trunk/src/main/org/jboss/messaging/core/server/cluster/Cluster.java
Modified:
trunk/src/main/org/jboss/messaging/core/config/cluster/ClusterConfiguration.java
trunk/src/main/org/jboss/messaging/core/management/ManagementService.java
trunk/src/main/org/jboss/messaging/core/management/impl/ManagementServiceImpl.java
trunk/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java
trunk/src/main/org/jboss/messaging/core/postoffice/impl/BindingsImpl.java
trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BridgeImpl.java
trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterImpl.java
trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterManagerImpl.java
trunk/src/main/org/jboss/messaging/core/server/cluster/impl/FlowBindingImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
Log:
more clustering jiggery-pokery
Modified: trunk/src/main/org/jboss/messaging/core/config/cluster/ClusterConfiguration.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/cluster/ClusterConfiguration.java 2009-01-23 16:43:39 UTC (rev 5709)
+++ trunk/src/main/org/jboss/messaging/core/config/cluster/ClusterConfiguration.java 2009-01-23 18:18:27 UTC (rev 5710)
@@ -40,33 +40,50 @@
{
private static final long serialVersionUID = 8948303813427795935L;
+ private final String name;
+
private final String address;
private final BridgeConfiguration bridgeConfig;
+
+ private final boolean duplicateDetection;
private final List<Pair<String, String>> staticConnectorNamePairs;
private final String discoveryGroupName;
- public ClusterConfiguration(final String address,
+ public ClusterConfiguration(final String name,
+ final String address,
final BridgeConfiguration bridgeConfig,
+ final boolean duplicateDetection,
final List<Pair<String, String>> staticConnectorNamePairs)
{
+ this.name = name;
this.address = address;
this.bridgeConfig = bridgeConfig;
this.staticConnectorNamePairs = staticConnectorNamePairs;
+ this.duplicateDetection = duplicateDetection;
this.discoveryGroupName = null;
}
- public ClusterConfiguration(final String address,
+ public ClusterConfiguration(final String name,
+ final String address,
final BridgeConfiguration bridgeConfig,
+ final boolean duplicateDetection,
final String discoveryGroupName)
{
+ this.name = name;
this.address = address;
this.bridgeConfig = bridgeConfig;
+ this.duplicateDetection = duplicateDetection;
this.discoveryGroupName = discoveryGroupName;
this.staticConnectorNamePairs = null;
}
+
+ public String getName()
+ {
+ return name;
+ }
public String getAddress()
{
@@ -77,6 +94,11 @@
{
return bridgeConfig;
}
+
+ public boolean isDuplicateDetection()
+ {
+ return duplicateDetection;
+ }
public List<Pair<String, String>> getStaticConnectorNamePairs()
{
Modified: trunk/src/main/org/jboss/messaging/core/management/ManagementService.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/ManagementService.java 2009-01-23 16:43:39 UTC (rev 5709)
+++ trunk/src/main/org/jboss/messaging/core/management/ManagementService.java 2009-01-23 18:18:27 UTC (rev 5710)
@@ -32,6 +32,7 @@
import org.jboss.messaging.core.config.TransportConfiguration;
import org.jboss.messaging.core.config.cluster.BridgeConfiguration;
import org.jboss.messaging.core.config.cluster.BroadcastGroupConfiguration;
+import org.jboss.messaging.core.config.cluster.ClusterConfiguration;
import org.jboss.messaging.core.config.cluster.DiscoveryGroupConfiguration;
import org.jboss.messaging.core.messagecounter.MessageCounterManager;
import org.jboss.messaging.core.persistence.StorageManager;
@@ -46,11 +47,11 @@
import org.jboss.messaging.core.server.ServerMessage;
import org.jboss.messaging.core.server.cluster.Bridge;
import org.jboss.messaging.core.server.cluster.BroadcastGroup;
+import org.jboss.messaging.core.server.cluster.Cluster;
import org.jboss.messaging.core.settings.HierarchicalRepository;
import org.jboss.messaging.core.settings.impl.QueueSettings;
import org.jboss.messaging.core.transaction.ResourceManager;
import org.jboss.messaging.util.SimpleString;
-import org.jboss.messaging.util.TypedProperties;
/**
* @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
@@ -101,6 +102,10 @@
void registerBridge(Bridge bridge, BridgeConfiguration configuration) throws Exception;
void unregisterBridge(String name) throws Exception;
+
+ void registerCluster(Cluster cluster, ClusterConfiguration configuration) throws Exception;
+
+ void unregisterCluster(String name) throws Exception;
void registerResource(ObjectName objectName, Object resource) throws Exception;
Modified: trunk/src/main/org/jboss/messaging/core/management/impl/ManagementServiceImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/impl/ManagementServiceImpl.java 2009-01-23 16:43:39 UTC (rev 5709)
+++ trunk/src/main/org/jboss/messaging/core/management/impl/ManagementServiceImpl.java 2009-01-23 18:18:27 UTC (rev 5710)
@@ -47,6 +47,7 @@
import org.jboss.messaging.core.config.TransportConfiguration;
import org.jboss.messaging.core.config.cluster.BridgeConfiguration;
import org.jboss.messaging.core.config.cluster.BroadcastGroupConfiguration;
+import org.jboss.messaging.core.config.cluster.ClusterConfiguration;
import org.jboss.messaging.core.config.cluster.DiscoveryGroupConfiguration;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.management.AcceptorControlMBean;
@@ -76,6 +77,7 @@
import org.jboss.messaging.core.server.ServerMessage;
import org.jboss.messaging.core.server.cluster.Bridge;
import org.jboss.messaging.core.server.cluster.BroadcastGroup;
+import org.jboss.messaging.core.server.cluster.Cluster;
import org.jboss.messaging.core.server.impl.ServerMessageImpl;
import org.jboss.messaging.core.settings.HierarchicalRepository;
import org.jboss.messaging.core.settings.impl.QueueSettings;
@@ -428,6 +430,16 @@
unregisterFromRegistry(objectName);
unregisterFromJMX(objectName);
}
+
+ public void registerCluster(final Cluster cluster, final ClusterConfiguration configuration) throws Exception
+ {
+ //TODO
+ }
+
+ public void unregisterCluster(final String name) throws Exception
+ {
+ //TODO
+ }
public Object getResource(final ObjectName objectName)
{
Modified: trunk/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java 2009-01-23 16:43:39 UTC (rev 5709)
+++ trunk/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java 2009-01-23 18:18:27 UTC (rev 5710)
@@ -67,11 +67,11 @@
public static final SimpleString HDR_DUPLICATE_DETECTION_ID = new SimpleString("JBM_DUPL_ID");
- public static final SimpleString HDR_MAX_HOPS = new SimpleString("JBM_MAX_HOPS");
-
public static final SimpleString HDR_ROUTE_TO_PREFIX = new SimpleString("JBM_ROUTE_TO:");
-
+
public static final SimpleString HDR_RESET_QUEUE_DATA = new SimpleString("JBM_RESET_QUEUE_DATA");
+
+ public static final SimpleString HDR_FROM_CLUSTER = new SimpleString("JBM_FROM_CLUSTER");
// Attributes ----------------------------------------------------
Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/BindingsImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/BindingsImpl.java 2009-01-23 16:43:39 UTC (rev 5709)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/BindingsImpl.java 2009-01-23 18:18:27 UTC (rev 5710)
@@ -192,11 +192,7 @@
if (binding.accept(message))
{
chosen.add(binding.getBindable());
-
- SimpleString headerName = MessageImpl.HDR_ROUTE_TO_PREFIX.concat(binding.getRoutingName());
-
- message.putBooleanProperty(headerName, Boolean.valueOf(true));
-
+
break;
}
}
Added: trunk/src/main/org/jboss/messaging/core/server/cluster/Cluster.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/Cluster.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/Cluster.java 2009-01-23 18:18:27 UTC (rev 5710)
@@ -0,0 +1,41 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.core.server.cluster;
+
+import org.jboss.messaging.core.server.MessagingComponent;
+import org.jboss.messaging.util.SimpleString;
+
+/**
+ * A Cluster
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ * Created 23 Jan 2009 14:51:55
+ *
+ *
+ */
+public interface Cluster extends MessagingComponent
+{
+ SimpleString getName();
+}
Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BridgeImpl.java 2009-01-23 16:43:39 UTC (rev 5709)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BridgeImpl.java 2009-01-23 18:18:27 UTC (rev 5710)
@@ -140,6 +140,8 @@
private final int maxRetriesAfterFailover;
private final MessageHandler queueInfoMessageHandler;
+
+ private final String queueDataAddress;
// Static --------------------------------------------------------
@@ -163,7 +165,8 @@
final int maxRetriesBeforeFailover,
final int maxRetriesAfterFailover,
final boolean useDuplicateDetection,
- final MessageHandler queueInfoMessageHandler) throws Exception
+ final MessageHandler queueInfoMessageHandler,
+ final String queueDataAddress) throws Exception
{
this.name = name;
@@ -205,6 +208,8 @@
this.maxRetriesAfterFailover = maxRetriesAfterFailover;
this.queueInfoMessageHandler = queueInfoMessageHandler;
+
+ this.queueDataAddress = queueDataAddress;
if (maxBatchTime != -1)
{
@@ -260,7 +265,8 @@
SimpleString notifQueueName = UUIDGenerator.getInstance().generateSimpleStringUUID();
- SimpleString filter = new SimpleString(ManagementHelper.HDR_NOTIFICATION_TYPE + " IN (" +
+ SimpleString filter = new SimpleString(ManagementHelper.HDR_ADDRESS + " LIKE '" + queueDataAddress + "' AND " +
+ ManagementHelper.HDR_NOTIFICATION_TYPE + " IN (" +
"'" +
NotificationType.QUEUE_CREATED +
"'" +
Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterImpl.java 2009-01-23 16:43:39 UTC (rev 5709)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterImpl.java 2009-01-23 18:18:27 UTC (rev 5710)
@@ -48,9 +48,8 @@
import org.jboss.messaging.core.server.Queue;
import org.jboss.messaging.core.server.QueueFactory;
import org.jboss.messaging.core.server.cluster.Bridge;
+import org.jboss.messaging.core.server.cluster.Cluster;
import org.jboss.messaging.core.server.cluster.FlowBinding;
-import org.jboss.messaging.core.settings.HierarchicalRepository;
-import org.jboss.messaging.core.settings.impl.QueueSettings;
import org.jboss.messaging.util.ExecutorFactory;
import org.jboss.messaging.util.Pair;
import org.jboss.messaging.util.SimpleString;
@@ -66,7 +65,7 @@
*
*
*/
-public class ClusterImpl implements DiscoveryListener
+public class ClusterImpl implements Cluster, DiscoveryListener
{
private static final Logger log = Logger.getLogger(ClusterImpl.class);
@@ -76,8 +75,6 @@
private final PostOffice postOffice;
- private final HierarchicalRepository<QueueSettings> queueSettingsRepository;
-
private final SimpleString name;
private final SimpleString address;
@@ -86,8 +83,6 @@
private final boolean useDuplicateDetection;
- private final int maxHops;
-
private Map<Pair<TransportConfiguration, TransportConfiguration>, MessageFlowRecord> records = new HashMap<Pair<TransportConfiguration, TransportConfiguration>, MessageFlowRecord>();
private final DiscoveryGroup discoveryGroup;
@@ -104,12 +99,10 @@
public ClusterImpl(final SimpleString name,
final SimpleString address,
final BridgeConfiguration bridgeConfig,
- final boolean useDuplicateDetection,
- final int maxHops,
+ final boolean useDuplicateDetection,
final ExecutorFactory executorFactory,
final StorageManager storageManager,
- final PostOffice postOffice,
- final HierarchicalRepository<QueueSettings> queueSettingsRepository,
+ final PostOffice postOffice,
final ScheduledExecutorService scheduledExecutor,
final QueueFactory queueFactory,
final List<Pair<TransportConfiguration, TransportConfiguration>> connectors) throws Exception
@@ -122,16 +115,12 @@
this.useDuplicateDetection = useDuplicateDetection;
- this.maxHops = maxHops;
-
this.executorFactory = executorFactory;
this.storageManager = storageManager;
this.postOffice = postOffice;
- this.queueSettingsRepository = queueSettingsRepository;
-
this.discoveryGroup = null;
this.scheduledExecutor = scheduledExecutor;
@@ -147,12 +136,10 @@
public ClusterImpl(final SimpleString name,
final SimpleString address,
final BridgeConfiguration bridgeConfig,
- final boolean useDuplicateDetection,
- final int maxHops,
+ final boolean useDuplicateDetection,
final ExecutorFactory executorFactory,
final StorageManager storageManager,
- final PostOffice postOffice,
- final HierarchicalRepository<QueueSettings> queueSettingsRepository,
+ final PostOffice postOffice,
final ScheduledExecutorService scheduledExecutor,
final QueueFactory queueFactory,
final DiscoveryGroup discoveryGroup) throws Exception
@@ -169,8 +156,6 @@
this.postOffice = postOffice;
- this.queueSettingsRepository = queueSettingsRepository;
-
this.scheduledExecutor = scheduledExecutor;
this.queueFactory = queueFactory;
@@ -178,8 +163,6 @@
this.discoveryGroup = discoveryGroup;
this.useDuplicateDetection = useDuplicateDetection;
-
- this.maxHops = maxHops;
}
public synchronized void start() throws Exception
@@ -318,7 +301,8 @@
bridgeConfig.getMaxRetriesBeforeFailover(),
bridgeConfig.getMaxRetriesAfterFailover(),
false,
- record);
+ record,
+ address.toString());
record.setBridge(bridge);
Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterManagerImpl.java 2009-01-23 16:43:39 UTC (rev 5709)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterManagerImpl.java 2009-01-23 18:18:27 UTC (rev 5710)
@@ -25,7 +25,9 @@
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import java.net.InetAddress;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
@@ -36,6 +38,7 @@
import org.jboss.messaging.core.config.TransportConfiguration;
import org.jboss.messaging.core.config.cluster.BridgeConfiguration;
import org.jboss.messaging.core.config.cluster.BroadcastGroupConfiguration;
+import org.jboss.messaging.core.config.cluster.ClusterConfiguration;
import org.jboss.messaging.core.config.cluster.DiscoveryGroupConfiguration;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.management.ManagementService;
@@ -43,8 +46,10 @@
import org.jboss.messaging.core.postoffice.Binding;
import org.jboss.messaging.core.postoffice.PostOffice;
import org.jboss.messaging.core.server.Queue;
+import org.jboss.messaging.core.server.QueueFactory;
import org.jboss.messaging.core.server.cluster.Bridge;
import org.jboss.messaging.core.server.cluster.BroadcastGroup;
+import org.jboss.messaging.core.server.cluster.Cluster;
import org.jboss.messaging.core.server.cluster.ClusterManager;
import org.jboss.messaging.core.server.cluster.Transformer;
import org.jboss.messaging.util.ExecutorFactory;
@@ -69,6 +74,8 @@
private final Map<String, DiscoveryGroup> discoveryGroups = new HashMap<String, DiscoveryGroup>();
private final Map<String, Bridge> bridges = new HashMap<String, Bridge>();
+
+ private final Map<String, Cluster> clusters = new HashMap<String, Cluster>();
private final ExecutorFactory executorFactory;
@@ -81,6 +88,8 @@
private final ManagementService managementService;
private final Configuration configuration;
+
+ private final QueueFactory queueFactory;
private volatile boolean started;
@@ -89,7 +98,8 @@
final PostOffice postOffice,
final ScheduledExecutorService scheduledExecutor,
final ManagementService managementService,
- final Configuration configuration)
+ final Configuration configuration,
+ final QueueFactory queueFactory)
{
this.executorFactory = executorFactory;
@@ -102,6 +112,8 @@
this.managementService = managementService;
this.configuration = configuration;
+
+ this.queueFactory = queueFactory;
}
public synchronized void start() throws Exception
@@ -125,6 +137,11 @@
{
deployBridge(config);
}
+
+ for (ClusterConfiguration config: configuration.getClusterConfigurations())
+ {
+ deployCluster(config);
+ }
started = true;
}
@@ -153,6 +170,12 @@
bridge.stop();
managementService.unregisterBridge(bridge.getName().toString());
}
+
+ for (Cluster cluster: clusters.values())
+ {
+ cluster.stop();
+ managementService.unregisterCluster(cluster.getName().toString());
+ }
broadcastGroups.clear();
@@ -374,9 +397,9 @@
config.getMaxRetriesBeforeFailover(),
config.getMaxRetriesAfterFailover(),
config.isUseDuplicateDetection(),
+ null,
null);
- log.info("put bridge " + this);
bridges.put(config.getName(), bridge);
managementService.registerBridge(bridge, config);
@@ -384,7 +407,98 @@
bridge.start();
}
}
+
+ private synchronized void deployCluster(final ClusterConfiguration config) throws Exception
+ {
+ if (config.getName() == null)
+ {
+ log.warn("Must specify a unique name for each cluster. This one will not be deployed.");
+ return;
+ }
+
+ if (config.getAddress() == null)
+ {
+ log.warn("Must specify an address for each cluster. This one will not be deployed.");
+
+ return;
+ }
+
+ Cluster cluster;
+
+ List<Pair<TransportConfiguration, TransportConfiguration>> connectors = new ArrayList<Pair<TransportConfiguration, TransportConfiguration>>();
+
+ if (config.getStaticConnectorNamePairs() != null)
+ {
+ for (Pair<String, String> connectorNamePair: config.getStaticConnectorNamePairs())
+ {
+ TransportConfiguration connector = configuration.getConnectorConfigurations().get(connectorNamePair.a);
+
+ if (connector == null)
+ {
+ log.warn("No connector defined with name '" + connectorNamePair.a + "'. The bridge will not be deployed.");
+
+ return;
+ }
+
+ TransportConfiguration backupConnector = null;
+
+ if (connectorNamePair.b != null)
+ {
+ backupConnector = configuration.getConnectorConfigurations().get(connectorNamePair.b);
+
+ if (backupConnector == null)
+ {
+ log.warn("No connector defined with name '" + connectorNamePair.b +
+ "'. The bridge will not be deployed.");
+
+ return;
+ }
+ }
+
+ Pair<TransportConfiguration, TransportConfiguration> pair = new Pair<TransportConfiguration, TransportConfiguration>(connector,
+ backupConnector);
+
+ connectors.add(pair);
+ }
+
+ cluster = new ClusterImpl(new SimpleString(config.getName()),
+ new SimpleString(config.getAddress()),
+ config.getBridgeConfig(),
+ config.isDuplicateDetection(),
+ executorFactory,
+ storageManager,
+ postOffice,
+ scheduledExecutor,
+ queueFactory,
+ connectors);
+ }
+ else
+ {
+ DiscoveryGroup dg = discoveryGroups.get(config.getDiscoveryGroupName());
+
+ if (dg == null)
+ {
+ log.warn("No discovery group with name '" + config.getDiscoveryGroupName() + "'. The bridge will not be deployed.");
+ }
+
+ cluster = new ClusterImpl(new SimpleString(config.getName()),
+ new SimpleString(config.getAddress()),
+ config.getBridgeConfig(),
+ config.isDuplicateDetection(),
+ executorFactory,
+ storageManager,
+ postOffice,
+ scheduledExecutor,
+ queueFactory,
+ dg);
+ }
+
+ managementService.registerCluster(cluster, config);
+
+ clusters.put(config.getName(), cluster);
+ }
+
private Transformer instantiateTransformer(final String transformerClassName)
{
Transformer transformer = null;
Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/FlowBindingImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/FlowBindingImpl.java 2009-01-23 16:43:39 UTC (rev 5709)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/FlowBindingImpl.java 2009-01-23 18:18:27 UTC (rev 5710)
@@ -22,6 +22,8 @@
package org.jboss.messaging.core.server.cluster.impl;
+import static org.jboss.messaging.core.message.impl.MessageImpl.HDR_FROM_CLUSTER;
+
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.HashSet;
@@ -55,6 +57,8 @@
private int consumerCount;
private final boolean duplicateDetection;
+
+ private final SimpleString headerName;
public FlowBindingImpl(final SimpleString address,
final SimpleString uniqueName,
@@ -65,10 +69,17 @@
super(address, uniqueName, routingName, bindable, false, false);
this.duplicateDetection = duplicateDetection;
+
+ headerName = MessageImpl.HDR_ROUTE_TO_PREFIX.concat(routingName);
}
public boolean accept(final ServerMessage message) throws Exception
{
+ if (message.containsProperty(MessageImpl.HDR_FROM_CLUSTER))
+ {
+ return false;
+ }
+
if (consumerCount == 0)
{
return false;
@@ -111,6 +122,11 @@
}
}
+
+ message.putBooleanProperty(headerName, Boolean.valueOf(true));
+
+ message.putBooleanProperty(HDR_FROM_CLUSTER, Boolean.valueOf(true));
+
return accepted;
}
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2009-01-23 16:43:39 UTC (rev 5709)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2009-01-23 18:18:27 UTC (rev 5710)
@@ -370,7 +370,8 @@
postOffice,
scheduledExecutor,
managementService,
- configuration);
+ configuration,
+ queueFactory);
clusterManager.start();
}
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2009-01-23 16:43:39 UTC (rev 5709)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2009-01-23 18:18:27 UTC (rev 5710)
@@ -114,6 +114,8 @@
private int consumersToFailover = -1;
+ private final SimpleString routeToPropertyName;
+
public QueueImpl(final long persistenceID,
final SimpleString name,
final Filter filter,
@@ -139,6 +141,8 @@
this.storageManager = storageManager;
this.queueSettingsRepository = queueSettingsRepository;
+
+ this.routeToPropertyName = MessageImpl.HDR_ROUTE_TO_PREFIX.concat(name);
if (postOffice == null)
{
@@ -158,6 +162,14 @@
public boolean accept(final ServerMessage message) throws Exception
{
+// if (message.containsProperty(MessageImpl.HDR_FROM_CLUSTER))
+// {
+// if (message.removeProperty(routeToPropertyName) == null)
+// {
+// return false;
+// }
+// }
+
if (filter != null && !filter.match(message))
{
return false;
More information about the jboss-cvs-commits
mailing list