[jboss-cvs] JBoss Messaging SVN: r5710 - in trunk/src/main/org/jboss/messaging/core: management and 6 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Fri Jan 23 13:18:28 EST 2009


Author: timfox
Date: 2009-01-23 13:18:27 -0500 (Fri, 23 Jan 2009)
New Revision: 5710

Added:
   trunk/src/main/org/jboss/messaging/core/server/cluster/Cluster.java
Modified:
   trunk/src/main/org/jboss/messaging/core/config/cluster/ClusterConfiguration.java
   trunk/src/main/org/jboss/messaging/core/management/ManagementService.java
   trunk/src/main/org/jboss/messaging/core/management/impl/ManagementServiceImpl.java
   trunk/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java
   trunk/src/main/org/jboss/messaging/core/postoffice/impl/BindingsImpl.java
   trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BridgeImpl.java
   trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterImpl.java
   trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterManagerImpl.java
   trunk/src/main/org/jboss/messaging/core/server/cluster/impl/FlowBindingImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
Log:
more clustering jiggery-pokery

Modified: trunk/src/main/org/jboss/messaging/core/config/cluster/ClusterConfiguration.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/cluster/ClusterConfiguration.java	2009-01-23 16:43:39 UTC (rev 5709)
+++ trunk/src/main/org/jboss/messaging/core/config/cluster/ClusterConfiguration.java	2009-01-23 18:18:27 UTC (rev 5710)
@@ -40,33 +40,50 @@
 {
    private static final long serialVersionUID = 8948303813427795935L;
 
+   private final String name;
+   
    private final String address;
 
    private final BridgeConfiguration bridgeConfig;
+   
+   private final boolean duplicateDetection;
 
    private final List<Pair<String, String>> staticConnectorNamePairs;
 
    private final String discoveryGroupName;
 
-   public ClusterConfiguration(final String address,
+   public ClusterConfiguration(final String name,
+                               final String address,
                                final BridgeConfiguration bridgeConfig,
+                               final boolean duplicateDetection,
                                final List<Pair<String, String>> staticConnectorNamePairs)
    {
+      this.name = name;
       this.address = address;
       this.bridgeConfig = bridgeConfig;
       this.staticConnectorNamePairs = staticConnectorNamePairs;
+      this.duplicateDetection = duplicateDetection;
       this.discoveryGroupName = null;
    }
 
-   public ClusterConfiguration(final String address,
+   public ClusterConfiguration(final String name,
+                               final String address,
                                final BridgeConfiguration bridgeConfig,
+                               final boolean duplicateDetection,
                                final String discoveryGroupName)
    {
+      this.name = name;
       this.address = address;
       this.bridgeConfig = bridgeConfig;
+      this.duplicateDetection = duplicateDetection;
       this.discoveryGroupName = discoveryGroupName;
       this.staticConnectorNamePairs = null;
    }
+   
+   public String getName()
+   {
+      return name;
+   }
 
    public String getAddress()
    {
@@ -77,6 +94,11 @@
    {
       return bridgeConfig;
    }
+   
+   public boolean isDuplicateDetection()
+   {
+      return duplicateDetection;
+   }
 
    public List<Pair<String, String>> getStaticConnectorNamePairs()
    {

Modified: trunk/src/main/org/jboss/messaging/core/management/ManagementService.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/ManagementService.java	2009-01-23 16:43:39 UTC (rev 5709)
+++ trunk/src/main/org/jboss/messaging/core/management/ManagementService.java	2009-01-23 18:18:27 UTC (rev 5710)
@@ -32,6 +32,7 @@
 import org.jboss.messaging.core.config.TransportConfiguration;
 import org.jboss.messaging.core.config.cluster.BridgeConfiguration;
 import org.jboss.messaging.core.config.cluster.BroadcastGroupConfiguration;
+import org.jboss.messaging.core.config.cluster.ClusterConfiguration;
 import org.jboss.messaging.core.config.cluster.DiscoveryGroupConfiguration;
 import org.jboss.messaging.core.messagecounter.MessageCounterManager;
 import org.jboss.messaging.core.persistence.StorageManager;
@@ -46,11 +47,11 @@
 import org.jboss.messaging.core.server.ServerMessage;
 import org.jboss.messaging.core.server.cluster.Bridge;
 import org.jboss.messaging.core.server.cluster.BroadcastGroup;
+import org.jboss.messaging.core.server.cluster.Cluster;
 import org.jboss.messaging.core.settings.HierarchicalRepository;
 import org.jboss.messaging.core.settings.impl.QueueSettings;
 import org.jboss.messaging.core.transaction.ResourceManager;
 import org.jboss.messaging.util.SimpleString;
-import org.jboss.messaging.util.TypedProperties;
 
 /**
  * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
@@ -101,6 +102,10 @@
    void registerBridge(Bridge bridge, BridgeConfiguration configuration) throws Exception;
 
    void unregisterBridge(String name) throws Exception;
+   
+   void registerCluster(Cluster cluster, ClusterConfiguration configuration) throws Exception;
+   
+   void unregisterCluster(String name) throws Exception;
 
    void registerResource(ObjectName objectName, Object resource) throws Exception;
 

Modified: trunk/src/main/org/jboss/messaging/core/management/impl/ManagementServiceImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/impl/ManagementServiceImpl.java	2009-01-23 16:43:39 UTC (rev 5709)
+++ trunk/src/main/org/jboss/messaging/core/management/impl/ManagementServiceImpl.java	2009-01-23 18:18:27 UTC (rev 5710)
@@ -47,6 +47,7 @@
 import org.jboss.messaging.core.config.TransportConfiguration;
 import org.jboss.messaging.core.config.cluster.BridgeConfiguration;
 import org.jboss.messaging.core.config.cluster.BroadcastGroupConfiguration;
+import org.jboss.messaging.core.config.cluster.ClusterConfiguration;
 import org.jboss.messaging.core.config.cluster.DiscoveryGroupConfiguration;
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.management.AcceptorControlMBean;
@@ -76,6 +77,7 @@
 import org.jboss.messaging.core.server.ServerMessage;
 import org.jboss.messaging.core.server.cluster.Bridge;
 import org.jboss.messaging.core.server.cluster.BroadcastGroup;
+import org.jboss.messaging.core.server.cluster.Cluster;
 import org.jboss.messaging.core.server.impl.ServerMessageImpl;
 import org.jboss.messaging.core.settings.HierarchicalRepository;
 import org.jboss.messaging.core.settings.impl.QueueSettings;
@@ -428,6 +430,16 @@
       unregisterFromRegistry(objectName);
       unregisterFromJMX(objectName);
    }
+   
+   public void registerCluster(final Cluster cluster, final ClusterConfiguration configuration) throws Exception
+   {      
+      //TODO
+   }
+   
+   public void unregisterCluster(final String name) throws Exception
+   {      
+      //TODO
+   }
 
    public Object getResource(final ObjectName objectName)
    {

Modified: trunk/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java	2009-01-23 16:43:39 UTC (rev 5709)
+++ trunk/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java	2009-01-23 18:18:27 UTC (rev 5710)
@@ -67,11 +67,11 @@
    
    public static final SimpleString HDR_DUPLICATE_DETECTION_ID = new SimpleString("JBM_DUPL_ID");
    
-   public static final SimpleString HDR_MAX_HOPS = new SimpleString("JBM_MAX_HOPS");
-   
    public static final SimpleString HDR_ROUTE_TO_PREFIX = new SimpleString("JBM_ROUTE_TO:");
-   
+      
    public static final SimpleString HDR_RESET_QUEUE_DATA = new SimpleString("JBM_RESET_QUEUE_DATA");
+      
+   public static final SimpleString HDR_FROM_CLUSTER = new SimpleString("JBM_FROM_CLUSTER");
    
    // Attributes ----------------------------------------------------
 

Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/BindingsImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/BindingsImpl.java	2009-01-23 16:43:39 UTC (rev 5709)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/BindingsImpl.java	2009-01-23 18:18:27 UTC (rev 5710)
@@ -192,11 +192,7 @@
                if (binding.accept(message))
                {
                   chosen.add(binding.getBindable());
-                  
-                  SimpleString headerName = MessageImpl.HDR_ROUTE_TO_PREFIX.concat(binding.getRoutingName());
-                  
-                  message.putBooleanProperty(headerName, Boolean.valueOf(true)); 
-                  
+                   
                   break;
                }
             }

Added: trunk/src/main/org/jboss/messaging/core/server/cluster/Cluster.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/Cluster.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/Cluster.java	2009-01-23 18:18:27 UTC (rev 5710)
@@ -0,0 +1,41 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.core.server.cluster;
+
+import org.jboss.messaging.core.server.MessagingComponent;
+import org.jboss.messaging.util.SimpleString;
+
+/**
+ * A Cluster
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * 
+ * Created 23 Jan 2009 14:51:55
+ *
+ *
+ */
+public interface Cluster extends MessagingComponent
+{
+   SimpleString getName();
+}

Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BridgeImpl.java	2009-01-23 16:43:39 UTC (rev 5709)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BridgeImpl.java	2009-01-23 18:18:27 UTC (rev 5710)
@@ -140,6 +140,8 @@
    private final int maxRetriesAfterFailover;
 
    private final MessageHandler queueInfoMessageHandler;
+   
+   private final String queueDataAddress;
 
    // Static --------------------------------------------------------
 
@@ -163,7 +165,8 @@
                      final int maxRetriesBeforeFailover,
                      final int maxRetriesAfterFailover,
                      final boolean useDuplicateDetection,
-                     final MessageHandler queueInfoMessageHandler) throws Exception
+                     final MessageHandler queueInfoMessageHandler,
+                     final String queueDataAddress) throws Exception
    {
       this.name = name;
 
@@ -205,6 +208,8 @@
       this.maxRetriesAfterFailover = maxRetriesAfterFailover;
 
       this.queueInfoMessageHandler = queueInfoMessageHandler;
+      
+      this.queueDataAddress = queueDataAddress;
 
       if (maxBatchTime != -1)
       {
@@ -260,7 +265,8 @@
 
                SimpleString notifQueueName = UUIDGenerator.getInstance().generateSimpleStringUUID();
 
-               SimpleString filter = new SimpleString(ManagementHelper.HDR_NOTIFICATION_TYPE + " IN (" +
+               SimpleString filter = new SimpleString(ManagementHelper.HDR_ADDRESS + " LIKE '" + queueDataAddress + "' AND " +                                                                                                           
+                                                      ManagementHelper.HDR_NOTIFICATION_TYPE + " IN (" +
                                                       "'" +
                                                       NotificationType.QUEUE_CREATED +
                                                       "'" +

Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterImpl.java	2009-01-23 16:43:39 UTC (rev 5709)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterImpl.java	2009-01-23 18:18:27 UTC (rev 5710)
@@ -48,9 +48,8 @@
 import org.jboss.messaging.core.server.Queue;
 import org.jboss.messaging.core.server.QueueFactory;
 import org.jboss.messaging.core.server.cluster.Bridge;
+import org.jboss.messaging.core.server.cluster.Cluster;
 import org.jboss.messaging.core.server.cluster.FlowBinding;
-import org.jboss.messaging.core.settings.HierarchicalRepository;
-import org.jboss.messaging.core.settings.impl.QueueSettings;
 import org.jboss.messaging.util.ExecutorFactory;
 import org.jboss.messaging.util.Pair;
 import org.jboss.messaging.util.SimpleString;
@@ -66,7 +65,7 @@
  *
  *
  */
-public class ClusterImpl implements DiscoveryListener
+public class ClusterImpl implements Cluster, DiscoveryListener
 {
    private static final Logger log = Logger.getLogger(ClusterImpl.class);
 
@@ -76,8 +75,6 @@
 
    private final PostOffice postOffice;
 
-   private final HierarchicalRepository<QueueSettings> queueSettingsRepository;
-
    private final SimpleString name;
 
    private final SimpleString address;
@@ -86,8 +83,6 @@
 
    private final boolean useDuplicateDetection;
 
-   private final int maxHops;
-   
    private Map<Pair<TransportConfiguration, TransportConfiguration>, MessageFlowRecord> records = new HashMap<Pair<TransportConfiguration, TransportConfiguration>, MessageFlowRecord>();
 
    private final DiscoveryGroup discoveryGroup;
@@ -104,12 +99,10 @@
    public ClusterImpl(final SimpleString name,
                       final SimpleString address,
                       final BridgeConfiguration bridgeConfig,
-                      final boolean useDuplicateDetection,
-                      final int maxHops,
+                      final boolean useDuplicateDetection,                
                       final ExecutorFactory executorFactory,
                       final StorageManager storageManager,
-                      final PostOffice postOffice,
-                      final HierarchicalRepository<QueueSettings> queueSettingsRepository,
+                      final PostOffice postOffice,                     
                       final ScheduledExecutorService scheduledExecutor,
                       final QueueFactory queueFactory,
                       final List<Pair<TransportConfiguration, TransportConfiguration>> connectors) throws Exception
@@ -122,16 +115,12 @@
 
       this.useDuplicateDetection = useDuplicateDetection;
 
-      this.maxHops = maxHops;
-
       this.executorFactory = executorFactory;
 
       this.storageManager = storageManager;
 
       this.postOffice = postOffice;
 
-      this.queueSettingsRepository = queueSettingsRepository;
-
       this.discoveryGroup = null;
 
       this.scheduledExecutor = scheduledExecutor;
@@ -147,12 +136,10 @@
    public ClusterImpl(final SimpleString name,
                       final SimpleString address,
                       final BridgeConfiguration bridgeConfig,
-                      final boolean useDuplicateDetection,
-                      final int maxHops,
+                      final boolean useDuplicateDetection,              
                       final ExecutorFactory executorFactory,
                       final StorageManager storageManager,
-                      final PostOffice postOffice,
-                      final HierarchicalRepository<QueueSettings> queueSettingsRepository,
+                      final PostOffice postOffice,             
                       final ScheduledExecutorService scheduledExecutor,
                       final QueueFactory queueFactory,
                       final DiscoveryGroup discoveryGroup) throws Exception
@@ -169,8 +156,6 @@
 
       this.postOffice = postOffice;
 
-      this.queueSettingsRepository = queueSettingsRepository;
-
       this.scheduledExecutor = scheduledExecutor;
 
       this.queueFactory = queueFactory;
@@ -178,8 +163,6 @@
       this.discoveryGroup = discoveryGroup;
 
       this.useDuplicateDetection = useDuplicateDetection;
-
-      this.maxHops = maxHops;
    }
 
    public synchronized void start() throws Exception
@@ -318,7 +301,8 @@
                                            bridgeConfig.getMaxRetriesBeforeFailover(),
                                            bridgeConfig.getMaxRetriesAfterFailover(),
                                            false,
-                                           record);
+                                           record,
+                                           address.toString());
             
             record.setBridge(bridge);
 

Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterManagerImpl.java	2009-01-23 16:43:39 UTC (rev 5709)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterManagerImpl.java	2009-01-23 18:18:27 UTC (rev 5710)
@@ -25,7 +25,9 @@
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 
 import java.net.InetAddress;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
@@ -36,6 +38,7 @@
 import org.jboss.messaging.core.config.TransportConfiguration;
 import org.jboss.messaging.core.config.cluster.BridgeConfiguration;
 import org.jboss.messaging.core.config.cluster.BroadcastGroupConfiguration;
+import org.jboss.messaging.core.config.cluster.ClusterConfiguration;
 import org.jboss.messaging.core.config.cluster.DiscoveryGroupConfiguration;
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.management.ManagementService;
@@ -43,8 +46,10 @@
 import org.jboss.messaging.core.postoffice.Binding;
 import org.jboss.messaging.core.postoffice.PostOffice;
 import org.jboss.messaging.core.server.Queue;
+import org.jboss.messaging.core.server.QueueFactory;
 import org.jboss.messaging.core.server.cluster.Bridge;
 import org.jboss.messaging.core.server.cluster.BroadcastGroup;
+import org.jboss.messaging.core.server.cluster.Cluster;
 import org.jboss.messaging.core.server.cluster.ClusterManager;
 import org.jboss.messaging.core.server.cluster.Transformer;
 import org.jboss.messaging.util.ExecutorFactory;
@@ -69,6 +74,8 @@
    private final Map<String, DiscoveryGroup> discoveryGroups = new HashMap<String, DiscoveryGroup>();
 
    private final Map<String, Bridge> bridges = new HashMap<String, Bridge>();
+   
+   private final Map<String, Cluster> clusters = new HashMap<String, Cluster>();
 
    private final ExecutorFactory executorFactory;
 
@@ -81,6 +88,8 @@
    private final ManagementService managementService;
 
    private final Configuration configuration;
+   
+   private final QueueFactory queueFactory;
 
    private volatile boolean started;
 
@@ -89,7 +98,8 @@
                              final PostOffice postOffice,
                              final ScheduledExecutorService scheduledExecutor,
                              final ManagementService managementService,
-                             final Configuration configuration)
+                             final Configuration configuration,
+                             final QueueFactory queueFactory)
    {
       this.executorFactory = executorFactory;
 
@@ -102,6 +112,8 @@
       this.managementService = managementService;
 
       this.configuration = configuration;
+      
+      this.queueFactory = queueFactory;
    }
 
    public synchronized void start() throws Exception
@@ -125,6 +137,11 @@
       {
          deployBridge(config);
       }
+      
+      for (ClusterConfiguration config: configuration.getClusterConfigurations())
+      {
+         deployCluster(config);
+      }
 
       started = true;
    }
@@ -153,6 +170,12 @@
          bridge.stop();
          managementService.unregisterBridge(bridge.getName().toString());
       }
+      
+      for (Cluster cluster: clusters.values())
+      {
+         cluster.stop();
+         managementService.unregisterCluster(cluster.getName().toString());
+      }
 
       broadcastGroups.clear();
 
@@ -374,9 +397,9 @@
                                  config.getMaxRetriesBeforeFailover(),
                                  config.getMaxRetriesAfterFailover(),
                                  config.isUseDuplicateDetection(),
+                                 null,
                                  null);
 
-         log.info("put bridge " + this);
          bridges.put(config.getName(), bridge);
 
          managementService.registerBridge(bridge, config);
@@ -384,7 +407,98 @@
          bridge.start();
       }
    }
+   
+   private synchronized void deployCluster(final ClusterConfiguration config) throws Exception
+   {
+      if (config.getName() == null)
+      {
+         log.warn("Must specify a unique name for each cluster. This one will not be deployed.");
 
+         return;
+      }
+
+      if (config.getAddress() == null)
+      {
+         log.warn("Must specify an address for each cluster. This one will not be deployed.");
+
+         return;
+      }
+      
+      Cluster cluster;
+     
+      List<Pair<TransportConfiguration, TransportConfiguration>> connectors = new ArrayList<Pair<TransportConfiguration, TransportConfiguration>>();
+      
+      if (config.getStaticConnectorNamePairs() != null)
+      {
+         for (Pair<String, String> connectorNamePair: config.getStaticConnectorNamePairs())
+         {                    
+            TransportConfiguration connector = configuration.getConnectorConfigurations().get(connectorNamePair.a);
+   
+            if (connector == null)
+            {
+               log.warn("No connector defined with name '" + connectorNamePair.a + "'. The bridge will not be deployed.");
+   
+               return;
+            }
+   
+            TransportConfiguration backupConnector = null;
+   
+            if (connectorNamePair.b != null)
+            {
+               backupConnector = configuration.getConnectorConfigurations().get(connectorNamePair.b);
+   
+               if (backupConnector == null)
+               {
+                  log.warn("No connector defined with name '" + connectorNamePair.b +
+                           "'. The bridge will not be deployed.");
+   
+                  return;
+               }
+            }
+   
+            Pair<TransportConfiguration, TransportConfiguration> pair = new Pair<TransportConfiguration, TransportConfiguration>(connector,
+                                                                                                                                 backupConnector);
+            
+            connectors.add(pair);
+         }
+         
+         cluster = new ClusterImpl(new SimpleString(config.getName()),
+                                           new SimpleString(config.getAddress()),
+                                           config.getBridgeConfig(),
+                                           config.isDuplicateDetection(),
+                                           executorFactory,
+                                           storageManager,
+                                           postOffice,
+                                           scheduledExecutor,
+                                           queueFactory,
+                                           connectors);
+      }
+      else
+      {
+         DiscoveryGroup dg = discoveryGroups.get(config.getDiscoveryGroupName());
+         
+         if (dg == null)
+         {
+            log.warn("No discovery group with name '" + config.getDiscoveryGroupName() + "'. The bridge will not be deployed.");
+         }
+         
+         cluster = new ClusterImpl(new SimpleString(config.getName()),
+                                           new SimpleString(config.getAddress()),
+                                           config.getBridgeConfig(),
+                                           config.isDuplicateDetection(),
+                                           executorFactory,
+                                           storageManager,
+                                           postOffice,
+                                           scheduledExecutor,
+                                           queueFactory,
+                                           dg);
+      }
+
+      managementService.registerCluster(cluster, config);
+
+      clusters.put(config.getName(), cluster);
+   }
+
    private Transformer instantiateTransformer(final String transformerClassName)
    {
       Transformer transformer = null;

Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/FlowBindingImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/FlowBindingImpl.java	2009-01-23 16:43:39 UTC (rev 5709)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/FlowBindingImpl.java	2009-01-23 18:18:27 UTC (rev 5710)
@@ -22,6 +22,8 @@
 
 package org.jboss.messaging.core.server.cluster.impl;
 
+import static org.jboss.messaging.core.message.impl.MessageImpl.HDR_FROM_CLUSTER;
+
 import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -55,6 +57,8 @@
    private int consumerCount;
    
    private final boolean duplicateDetection;
+   
+   private final SimpleString headerName;
 
    public FlowBindingImpl(final SimpleString address,
                           final SimpleString uniqueName,
@@ -65,10 +69,17 @@
       super(address, uniqueName, routingName, bindable, false, false);
       
       this.duplicateDetection = duplicateDetection;
+      
+      headerName = MessageImpl.HDR_ROUTE_TO_PREFIX.concat(routingName);
    }
 
    public boolean accept(final ServerMessage message) throws Exception
    {
+      if (message.containsProperty(MessageImpl.HDR_FROM_CLUSTER))
+      {
+         return false;
+      }
+      
       if (consumerCount == 0)
       {
          return false;
@@ -111,6 +122,11 @@
          }
       }
       
+            
+      message.putBooleanProperty(headerName, Boolean.valueOf(true)); 
+      
+      message.putBooleanProperty(HDR_FROM_CLUSTER, Boolean.valueOf(true));
+           
       return accepted;
    }
 

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2009-01-23 16:43:39 UTC (rev 5709)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2009-01-23 18:18:27 UTC (rev 5710)
@@ -370,7 +370,8 @@
                                                  postOffice,                                        
                                                  scheduledExecutor,
                                                  managementService,
-                                                 configuration);
+                                                 configuration,
+                                                 queueFactory);
 
          clusterManager.start();
       }

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2009-01-23 16:43:39 UTC (rev 5709)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2009-01-23 18:18:27 UTC (rev 5710)
@@ -114,6 +114,8 @@
 
    private int consumersToFailover = -1;
 
+   private final SimpleString routeToPropertyName;  
+      
    public QueueImpl(final long persistenceID,
                     final SimpleString name,
                     final Filter filter,
@@ -139,6 +141,8 @@
       this.storageManager = storageManager;
 
       this.queueSettingsRepository = queueSettingsRepository;
+      
+      this.routeToPropertyName = MessageImpl.HDR_ROUTE_TO_PREFIX.concat(name);
 
       if (postOffice == null)
       {
@@ -158,6 +162,14 @@
 
    public boolean accept(final ServerMessage message) throws Exception
    {
+//      if (message.containsProperty(MessageImpl.HDR_FROM_CLUSTER))
+//      {
+//         if (message.removeProperty(routeToPropertyName) == null)
+//         {
+//            return false;
+//         }                  
+//      }
+            
       if (filter != null && !filter.match(message))
       {
          return false;




More information about the jboss-cvs-commits mailing list