[jboss-cvs] JBoss Messaging SVN: r7630 - in trunk: src/main/org/jboss/messaging/core/server/cluster/impl and 1 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Tue Jul 28 10:27:21 EDT 2009


Author: jmesnil
Date: 2009-07-28 10:27:21 -0400 (Tue, 28 Jul 2009)
New Revision: 7630

Added:
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/bridge/BridgeWithDiscoveryGroupStartTest.java
Modified:
   trunk/src/main/org/jboss/messaging/core/filter/impl/FilterImpl.java
   trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BridgeImpl.java
   trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterConnectionImpl.java
   trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterManagerImpl.java
Log:
JBMESSAGING-1550: Core BridgeConfiguration with a discovery group is not deployed

* when the bridge configuration names a discovery group, use that group's address and port to create the bridge's client session factory.
  Otherwise, use the connectorPair.
* added utility method FilterImpl.createFilter(SimpleString)


Modified: trunk/src/main/org/jboss/messaging/core/filter/impl/FilterImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/filter/impl/FilterImpl.java	2009-07-28 12:14:50 UTC (rev 7629)
+++ trunk/src/main/org/jboss/messaging/core/filter/impl/FilterImpl.java	2009-07-28 14:27:21 UTC (rev 7630)
@@ -106,6 +106,16 @@
       Filter filter = filterStr == null ? null : new FilterImpl(new SimpleString(filterStr));
       return filter;
    }
+   
+   /**
+    * @return null if <code>filterStr</code> is null, otherwise a filter built from <code>filterStr</code>
+    * @throws MessagingException if the string does not correspond to a valid filter
+    */
+   public static Filter createFilter(final SimpleString filterStr) throws MessagingException
+   {
+      Filter filter = filterStr == null ? null : new FilterImpl(filterStr);
+      return filter;
+   }
 
    // Constructors ---------------------------------------------------
 

Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BridgeImpl.java	2009-07-28 12:14:50 UTC (rev 7629)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BridgeImpl.java	2009-07-28 14:27:21 UTC (rev 7630)
@@ -56,7 +56,6 @@
 import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateAcknowledgeMessage;
 import org.jboss.messaging.core.server.HandleStatus;
 import org.jboss.messaging.core.server.MessageReference;
-import org.jboss.messaging.core.server.MessagingServer;
 import org.jboss.messaging.core.server.Queue;
 import org.jboss.messaging.core.server.ServerMessage;
 import org.jboss.messaging.core.server.cluster.Bridge;
@@ -92,8 +91,6 @@
 
    private final Executor executor;
 
-   private final SimpleString filterString;
-
    private final Filter filter;
 
    private final SimpleString forwardingAddress;
@@ -116,6 +113,10 @@
 
    private final Pair<TransportConfiguration, TransportConfiguration> connectorPair;
 
+   private final String discoveryAddress;
+
+   private final int discoveryPort;
+
    private final long retryInterval;
 
    private final double retryIntervalMultiplier;
@@ -146,9 +147,16 @@
 
    // Public --------------------------------------------------------
 
+   /**
+    * discoveryAddress (+ discoveryPort) and connectorPair are mutually exclusive.
+    * If discoveryAddress is not null, it will be used to create the bridge's client session factory.
+    * Otherwise, the connectorPair will be used.
+    */
    public BridgeImpl(final UUID nodeUUID,
                      final SimpleString name,
                      final Queue queue,
+                     final String discoveryAddress,
+                     final int discoveryPort,
                      final Pair<TransportConfiguration, TransportConfiguration> connectorPair,
                      final Executor executor,
                      final SimpleString filterString,
@@ -164,58 +172,11 @@
                      final SimpleString managementNotificationAddress,
                      final String clusterUser,
                      final String clusterPassword,
+                     final MessageFlowRecord flowRecord,
                      final Channel replicatingChannel,
                      final boolean activated,
                      final StorageManager storageManager) throws Exception
    {
-      this(nodeUUID,
-           name,
-           queue,
-           connectorPair,
-           executor,
-           filterString,
-           forwardingAddress,
-           scheduledExecutor,
-           transformer,
-           retryInterval,
-           retryIntervalMultiplier,
-           reconnectAttempts,
-           failoverOnServerShutdown,
-           useDuplicateDetection,
-           managementAddress,
-           managementNotificationAddress,
-           clusterUser,
-           clusterPassword,
-           null,
-           replicatingChannel,
-           activated,
-           storageManager, null);
-   }
-
-   public BridgeImpl(final UUID nodeUUID,
-                     final SimpleString name,
-                     final Queue queue,
-                     final Pair<TransportConfiguration, TransportConfiguration> connectorPair,
-                     final Executor executor,
-                     final SimpleString filterString,
-                     final SimpleString forwardingAddress,
-                     final ScheduledExecutorService scheduledExecutor,
-                     final Transformer transformer,
-                     final long retryInterval,
-                     final double retryIntervalMultiplier,
-                     final int reconnectAttempts,
-                     final boolean failoverOnServerShutdown,
-                     final boolean useDuplicateDetection,
-                     final SimpleString managementAddress,
-                     final SimpleString managementNotificationAddress,
-                     final String clusterUser,
-                     final String clusterPassword,
-                     final MessageFlowRecord flowRecord,
-                     final Channel replicatingChannel,
-                     final boolean activated,
-                     final StorageManager storageManager,
-                     MessagingServer server) throws Exception
-   {
       this.nodeUUID = nodeUUID;
 
       this.name = name;
@@ -224,23 +185,18 @@
 
       this.executor = executor;
 
-      this.filterString = filterString;
+      this.filter = FilterImpl.createFilter(filterString);
 
-      if (this.filterString != null)
-      {
-         this.filter = new FilterImpl(filterString);
-      }
-      else
-      {
-         this.filter = null;
-      }
-
       this.forwardingAddress = forwardingAddress;
 
       this.transformer = transformer;
 
       this.useDuplicateDetection = useDuplicateDetection;
 
+      this.discoveryAddress = discoveryAddress;
+      
+      this.discoveryPort = discoveryPort;
+      
       this.connectorPair = connectorPair;
 
       this.retryInterval = retryInterval;
@@ -565,8 +521,16 @@
       {
          queue.addConsumer(BridgeImpl.this);
   
-         csf = new ClientSessionFactoryImpl(connectorPair.a,
-                                            connectorPair.b);
+         csf = null;
+         if (discoveryAddress != null)
+         {
+            csf = new ClientSessionFactoryImpl(discoveryAddress, discoveryPort);
+         }
+         else
+         {
+            csf = new ClientSessionFactoryImpl(connectorPair.a,
+                                         connectorPair.b);
+         }
          
          csf.setFailoverOnServerShutdown(failoverOnServerShutdown);
          csf.setRetryInterval(retryInterval);

Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterConnectionImpl.java	2009-07-28 12:14:50 UTC (rev 7629)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterConnectionImpl.java	2009-07-28 14:27:21 UTC (rev 7630)
@@ -441,6 +441,8 @@
       Bridge bridge = new BridgeImpl(nodeUUID,
                                      queueName,
                                      queue,
+                                     null,
+                                     -1,
                                      connectorPair,
                                      executorFactory.getExecutor(),
                                      null,
@@ -459,8 +461,7 @@
                                      record,
                                      replicatingChannel,
                                      !backup,
-                                     server.getStorageManager(),
-                                     server);
+                                     server.getStorageManager());
 
       record.setBridge(bridge);
 

Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterManagerImpl.java	2009-07-28 12:14:50 UTC (rev 7629)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterManagerImpl.java	2009-07-28 14:27:21 UTC (rev 7630)
@@ -397,10 +397,45 @@
 
       Queue queue = (Queue)binding.getBindable();
 
-      Bridge bridge;
+      Bridge bridge = null;
 
-      if (connectorNamePair != null)
+      if (config.getDiscoveryGroupName() != null)
       {
+         DiscoveryGroupConfiguration discoveryGroupConfiguration = configuration.getDiscoveryGroupConfigurations().get(config.getDiscoveryGroupName());
+         if (discoveryGroupConfiguration == null)
+         {
+            log.warn("No discovery group configured with name '" + config.getDiscoveryGroupName() + "'. The bridge will not be deployed.");
+
+            return;
+         }
+         
+         bridge = new BridgeImpl(nodeUUID,
+                                 new SimpleString(config.getName()),
+                                 queue,
+                                 discoveryGroupConfiguration.getGroupAddress(),
+                                 discoveryGroupConfiguration.getGroupPort(),
+                                 null,
+                                 executorFactory.getExecutor(),
+                                 SimpleString.toSimpleString(config.getFilterString()),
+                                 new SimpleString(config.getForwardingAddress()),
+                                 scheduledExecutor,
+                                 transformer,
+                                 config.getRetryInterval(),
+                                 config.getRetryIntervalMultiplier(),
+                                 config.getReconnectAttempts(),
+                                 config.isFailoverOnServerShutdown(),
+                                 config.isUseDuplicateDetection(),
+                                 managementService.getManagementAddress(),
+                                 managementService.getManagementNotificationAddress(),
+                                 managementService.getClusterUser(),
+                                 managementService.getClusterPassword(),
+                                 null,
+                                 replicatingChannel,
+                                 !backup,
+                                 server.getStorageManager());
+      }
+      else
+      {
          TransportConfiguration connector = configuration.getConnectorConfigurations().get(connectorNamePair.a);
 
          if (connector == null)
@@ -430,9 +465,11 @@
          bridge = new BridgeImpl(nodeUUID,
                                  new SimpleString(config.getName()),
                                  queue,
+                                 null,
+                                 -1,
                                  pair,
                                  executorFactory.getExecutor(),
-                                 config.getFilterString() == null ? null : new SimpleString(config.getFilterString()),
+                                 SimpleString.toSimpleString(config.getFilterString()),
                                  new SimpleString(config.getForwardingAddress()),
                                  scheduledExecutor,
                                  transformer,
@@ -445,16 +482,17 @@
                                  managementService.getManagementNotificationAddress(),
                                  managementService.getClusterUser(),
                                  managementService.getClusterPassword(),
+                                 null,
                                  replicatingChannel,
                                  !backup,
                                  server.getStorageManager());
+      }
 
-         bridges.put(config.getName(), bridge);
+      bridges.put(config.getName(), bridge);
 
-         managementService.registerBridge(bridge, config);
+      managementService.registerBridge(bridge, config);
 
-         bridge.start();
-      }
+      bridge.start();
    }
 
    private synchronized void deployClusterConnection(final ClusterConnectionConfiguration config) throws Exception

Added: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/bridge/BridgeWithDiscoveryGroupStartTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/bridge/BridgeWithDiscoveryGroupStartTest.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/bridge/BridgeWithDiscoveryGroupStartTest.java	2009-07-28 14:27:21 UTC (rev 7630)
@@ -0,0 +1,213 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.integration.cluster.bridge;
+
+import static org.jboss.messaging.core.remoting.impl.invm.TransportConstants.SERVER_ID_PROP_NAME;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.config.cluster.BridgeConfiguration;
+import org.jboss.messaging.core.config.cluster.BroadcastGroupConfiguration;
+import org.jboss.messaging.core.config.cluster.DiscoveryGroupConfiguration;
+import org.jboss.messaging.core.config.cluster.QueueConfiguration;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory;
+import org.jboss.messaging.core.server.MessagingServer;
+import org.jboss.messaging.core.server.cluster.Bridge;
+import org.jboss.messaging.tests.util.ServiceTestBase;
+import org.jboss.messaging.utils.Pair;
+import org.jboss.messaging.utils.SimpleString;
+
+/**
+ * Tests that a core bridge configured with a discovery group forwards messages and can be stopped and restarted.
+ *
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ * 
+ */
+public class BridgeWithDiscoveryGroupStartTest extends ServiceTestBase
+{
+   public void testStartStop() throws Exception
+   {
+      Map<String, Object> server0Params = new HashMap<String, Object>();
+      MessagingServer server0 = createClusteredServerWithParams(0, true, server0Params);
+
+      Map<String, Object> server1Params = new HashMap<String, Object>();
+      server1Params.put(SERVER_ID_PROP_NAME, 1);
+      MessagingServer server1 = createClusteredServerWithParams(1, true, server1Params);
+
+      Map<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
+      TransportConfiguration server0tc = new TransportConfiguration(InVMConnectorFactory.class.getName(), server0Params);
+      TransportConfiguration server1tc = new TransportConfiguration(InVMConnectorFactory.class.getName(), server1Params);
+      connectors.put(server1tc.getName(), server1tc);
+
+      server0.getConfiguration().setConnectorConfigurations(connectors);
+
+      final String testAddress = "testAddress";
+      final String queueName0 = "queue0";
+      final String forwardAddress = "forwardAddress";
+      final String queueName1 = "queue1";
+
+      final String groupAddress = "230.1.2.3";
+      final int port = 6745;
+
+      List<Pair<String, String>> connectorPairs = new ArrayList<Pair<String, String>>();
+      connectorPairs.add(new Pair<String, String>(server1tc.getName(), null));
+
+      BroadcastGroupConfiguration bcConfig = new BroadcastGroupConfiguration("bg1",
+                                                                             null,
+                                                                             -1,
+                                                                             groupAddress,
+                                                                             port,
+                                                                             250,
+                                                                             connectorPairs);
+
+      server0.getConfiguration().getBroadcastGroupConfigurations().add(bcConfig);
+
+      DiscoveryGroupConfiguration dcConfig = new DiscoveryGroupConfiguration("dg1", groupAddress, port, 5000);
+
+      server0.getConfiguration().getDiscoveryGroupConfigurations().put(dcConfig.getName(), dcConfig);
+
+      final String bridgeName = "bridge1";
+
+      BridgeConfiguration bridgeConfiguration = new BridgeConfiguration(bridgeName,
+                                                                        queueName0,
+                                                                        forwardAddress,
+                                                                        null,
+                                                                        null,
+                                                                        1000,
+                                                                        1d,
+                                                                        0,
+                                                                        true,
+                                                                        true,
+                                                                        "dg1");
+
+      List<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();
+      bridgeConfigs.add(bridgeConfiguration);
+      server0.getConfiguration().setBridgeConfigurations(bridgeConfigs);
+
+      QueueConfiguration queueConfig0 = new QueueConfiguration(testAddress, queueName0, null, true);
+      List<QueueConfiguration> queueConfigs0 = new ArrayList<QueueConfiguration>();
+      queueConfigs0.add(queueConfig0);
+      server0.getConfiguration().setQueueConfigurations(queueConfigs0);
+
+      QueueConfiguration queueConfig1 = new QueueConfiguration(forwardAddress, queueName1, null, true);
+      List<QueueConfiguration> queueConfigs1 = new ArrayList<QueueConfiguration>();
+      queueConfigs1.add(queueConfig1);
+      server1.getConfiguration().setQueueConfigurations(queueConfigs1);
+
+      server1.start();
+      server0.start();
+
+      ClientSessionFactory sf0 = new ClientSessionFactoryImpl(server0tc);
+
+      ClientSessionFactory sf1 = new ClientSessionFactoryImpl(server1tc);
+
+      ClientSession session0 = sf0.createSession(false, true, true);
+
+      ClientSession session1 = sf1.createSession(false, true, true);
+
+      ClientProducer producer0 = session0.createProducer(new SimpleString(testAddress));
+
+      ClientConsumer consumer1 = session1.createConsumer(queueName1);
+
+      session1.start();
+
+      final int numMessages = 10;
+
+      final SimpleString propKey = new SimpleString("testkey");
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = session0.createClientMessage(false);
+
+         message.putIntProperty(propKey, i);
+
+         producer0.send(message);
+      }
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = consumer1.receive(200);
+
+         assertNotNull(message);
+
+         assertEquals((Integer)i, (Integer)message.getProperty(propKey));
+
+         message.acknowledge();
+      }
+
+      assertNull(consumer1.receive(200));
+
+      Bridge bridge = server0.getClusterManager().getBridges().get(bridgeName);
+
+      bridge.stop();
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = session0.createClientMessage(false);
+
+         message.putIntProperty(propKey, i);
+
+         producer0.send(message);
+      }
+
+      assertNull(consumer1.receive(500));
+
+      bridge.start();
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = consumer1.receive(1000);
+
+         assertNotNull(message);
+
+         assertEquals((Integer)i, (Integer)message.getProperty(propKey));
+
+         message.acknowledge();
+      }
+
+      assertNull(consumer1.receive(200));
+
+      session0.close();
+
+      session1.close();
+
+      sf0.close();
+
+      sf1.close();
+
+      server0.stop();
+
+      server1.stop();
+   }
+}




More information about the jboss-cvs-commits mailing list