[jboss-cvs] JBoss Messaging SVN: r7630 - in trunk: src/main/org/jboss/messaging/core/server/cluster/impl and 1 other directory.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue Jul 28 10:27:21 EDT 2009
Author: jmesnil
Date: 2009-07-28 10:27:21 -0400 (Tue, 28 Jul 2009)
New Revision: 7630
Added:
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/bridge/BridgeWithDiscoveryGroupStartTest.java
Modified:
trunk/src/main/org/jboss/messaging/core/filter/impl/FilterImpl.java
trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BridgeImpl.java
trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterConnectionImpl.java
trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterManagerImpl.java
Log:
JBMESSAGING-1550: Core BridgeConfiguration with a discovery group is not deployed
* when discoveryGroupAddress is set, use it to create the bridge's client session factory.
Otherwise, use the connectorPair
* added utility method FilterImpl.createFilter(SimpleString)
Modified: trunk/src/main/org/jboss/messaging/core/filter/impl/FilterImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/filter/impl/FilterImpl.java 2009-07-28 12:14:50 UTC (rev 7629)
+++ trunk/src/main/org/jboss/messaging/core/filter/impl/FilterImpl.java 2009-07-28 14:27:21 UTC (rev 7630)
@@ -106,6 +106,16 @@
Filter filter = filterStr == null ? null : new FilterImpl(new SimpleString(filterStr));
return filter;
}
+
+ /**
+ * @return <code>null</code> if <code>filterStr</code> is <code>null</code>, otherwise a filter created from <code>filterStr</code>
+ * @throws MessagingException if the string does not correspond to a valid filter
+ */
+ public static Filter createFilter(final SimpleString filterStr) throws MessagingException
+ {
+ Filter filter = filterStr == null ? null : new FilterImpl(filterStr);
+ return filter;
+ }
// Constructors ---------------------------------------------------
Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BridgeImpl.java 2009-07-28 12:14:50 UTC (rev 7629)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BridgeImpl.java 2009-07-28 14:27:21 UTC (rev 7630)
@@ -56,7 +56,6 @@
import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateAcknowledgeMessage;
import org.jboss.messaging.core.server.HandleStatus;
import org.jboss.messaging.core.server.MessageReference;
-import org.jboss.messaging.core.server.MessagingServer;
import org.jboss.messaging.core.server.Queue;
import org.jboss.messaging.core.server.ServerMessage;
import org.jboss.messaging.core.server.cluster.Bridge;
@@ -92,8 +91,6 @@
private final Executor executor;
- private final SimpleString filterString;
-
private final Filter filter;
private final SimpleString forwardingAddress;
@@ -116,6 +113,10 @@
private final Pair<TransportConfiguration, TransportConfiguration> connectorPair;
+ private final String discoveryAddress;
+
+ private final int discoveryPort;
+
private final long retryInterval;
private final double retryIntervalMultiplier;
@@ -146,9 +147,16 @@
// Public --------------------------------------------------------
+ /**
+ * discoveryAddress (together with discoveryPort) and connectorPair are mutually exclusive.
+ * If discoveryAddress is not null, it is used to create the bridge's client session factory.
+ * Otherwise, the connectorPair is used.
+ */
public BridgeImpl(final UUID nodeUUID,
final SimpleString name,
final Queue queue,
+ final String discoveryAddress,
+ final int discoveryPort,
final Pair<TransportConfiguration, TransportConfiguration> connectorPair,
final Executor executor,
final SimpleString filterString,
@@ -164,58 +172,11 @@
final SimpleString managementNotificationAddress,
final String clusterUser,
final String clusterPassword,
+ final MessageFlowRecord flowRecord,
final Channel replicatingChannel,
final boolean activated,
final StorageManager storageManager) throws Exception
{
- this(nodeUUID,
- name,
- queue,
- connectorPair,
- executor,
- filterString,
- forwardingAddress,
- scheduledExecutor,
- transformer,
- retryInterval,
- retryIntervalMultiplier,
- reconnectAttempts,
- failoverOnServerShutdown,
- useDuplicateDetection,
- managementAddress,
- managementNotificationAddress,
- clusterUser,
- clusterPassword,
- null,
- replicatingChannel,
- activated,
- storageManager, null);
- }
-
- public BridgeImpl(final UUID nodeUUID,
- final SimpleString name,
- final Queue queue,
- final Pair<TransportConfiguration, TransportConfiguration> connectorPair,
- final Executor executor,
- final SimpleString filterString,
- final SimpleString forwardingAddress,
- final ScheduledExecutorService scheduledExecutor,
- final Transformer transformer,
- final long retryInterval,
- final double retryIntervalMultiplier,
- final int reconnectAttempts,
- final boolean failoverOnServerShutdown,
- final boolean useDuplicateDetection,
- final SimpleString managementAddress,
- final SimpleString managementNotificationAddress,
- final String clusterUser,
- final String clusterPassword,
- final MessageFlowRecord flowRecord,
- final Channel replicatingChannel,
- final boolean activated,
- final StorageManager storageManager,
- MessagingServer server) throws Exception
- {
this.nodeUUID = nodeUUID;
this.name = name;
@@ -224,23 +185,18 @@
this.executor = executor;
- this.filterString = filterString;
+ this.filter = FilterImpl.createFilter(filterString);
- if (this.filterString != null)
- {
- this.filter = new FilterImpl(filterString);
- }
- else
- {
- this.filter = null;
- }
-
this.forwardingAddress = forwardingAddress;
this.transformer = transformer;
this.useDuplicateDetection = useDuplicateDetection;
+ this.discoveryAddress = discoveryAddress;
+
+ this.discoveryPort = discoveryPort;
+
this.connectorPair = connectorPair;
this.retryInterval = retryInterval;
@@ -565,8 +521,16 @@
{
queue.addConsumer(BridgeImpl.this);
- csf = new ClientSessionFactoryImpl(connectorPair.a,
- connectorPair.b);
+ csf = null;
+ if (discoveryAddress != null)
+ {
+ csf = new ClientSessionFactoryImpl(discoveryAddress, discoveryPort);
+ }
+ else
+ {
+ csf = new ClientSessionFactoryImpl(connectorPair.a,
+ connectorPair.b);
+ }
csf.setFailoverOnServerShutdown(failoverOnServerShutdown);
csf.setRetryInterval(retryInterval);
Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterConnectionImpl.java 2009-07-28 12:14:50 UTC (rev 7629)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterConnectionImpl.java 2009-07-28 14:27:21 UTC (rev 7630)
@@ -441,6 +441,8 @@
Bridge bridge = new BridgeImpl(nodeUUID,
queueName,
queue,
+ null,
+ -1,
connectorPair,
executorFactory.getExecutor(),
null,
@@ -459,8 +461,7 @@
record,
replicatingChannel,
!backup,
- server.getStorageManager(),
- server);
+ server.getStorageManager());
record.setBridge(bridge);
Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterManagerImpl.java 2009-07-28 12:14:50 UTC (rev 7629)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterManagerImpl.java 2009-07-28 14:27:21 UTC (rev 7630)
@@ -397,10 +397,45 @@
Queue queue = (Queue)binding.getBindable();
- Bridge bridge;
+ Bridge bridge = null;
- if (connectorNamePair != null)
+ if (config.getDiscoveryGroupName() != null)
{
+ DiscoveryGroupConfiguration discoveryGroupConfiguration = configuration.getDiscoveryGroupConfigurations().get(config.getDiscoveryGroupName());
+ if (discoveryGroupConfiguration == null)
+ {
+ log.warn("No discovery group configured with name '" + config.getDiscoveryGroupName() + "'. The bridge will not be deployed.");
+
+ return;
+ }
+
+ bridge = new BridgeImpl(nodeUUID,
+ new SimpleString(config.getName()),
+ queue,
+ discoveryGroupConfiguration.getGroupAddress(),
+ discoveryGroupConfiguration.getGroupPort(),
+ null,
+ executorFactory.getExecutor(),
+ SimpleString.toSimpleString(config.getFilterString()),
+ new SimpleString(config.getForwardingAddress()),
+ scheduledExecutor,
+ transformer,
+ config.getRetryInterval(),
+ config.getRetryIntervalMultiplier(),
+ config.getReconnectAttempts(),
+ config.isFailoverOnServerShutdown(),
+ config.isUseDuplicateDetection(),
+ managementService.getManagementAddress(),
+ managementService.getManagementNotificationAddress(),
+ managementService.getClusterUser(),
+ managementService.getClusterPassword(),
+ null,
+ replicatingChannel,
+ !backup,
+ server.getStorageManager());
+ }
+ else
+ {
TransportConfiguration connector = configuration.getConnectorConfigurations().get(connectorNamePair.a);
if (connector == null)
@@ -430,9 +465,11 @@
bridge = new BridgeImpl(nodeUUID,
new SimpleString(config.getName()),
queue,
+ null,
+ -1,
pair,
executorFactory.getExecutor(),
- config.getFilterString() == null ? null : new SimpleString(config.getFilterString()),
+ SimpleString.toSimpleString(config.getFilterString()),
new SimpleString(config.getForwardingAddress()),
scheduledExecutor,
transformer,
@@ -445,16 +482,17 @@
managementService.getManagementNotificationAddress(),
managementService.getClusterUser(),
managementService.getClusterPassword(),
+ null,
replicatingChannel,
!backup,
server.getStorageManager());
+ }
- bridges.put(config.getName(), bridge);
+ bridges.put(config.getName(), bridge);
- managementService.registerBridge(bridge, config);
+ managementService.registerBridge(bridge, config);
- bridge.start();
- }
+ bridge.start();
}
private synchronized void deployClusterConnection(final ClusterConnectionConfiguration config) throws Exception
Added: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/bridge/BridgeWithDiscoveryGroupStartTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/bridge/BridgeWithDiscoveryGroupStartTest.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/bridge/BridgeWithDiscoveryGroupStartTest.java 2009-07-28 14:27:21 UTC (rev 7630)
@@ -0,0 +1,213 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.integration.cluster.bridge;
+
+import static org.jboss.messaging.core.remoting.impl.invm.TransportConstants.SERVER_ID_PROP_NAME;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.config.cluster.BridgeConfiguration;
+import org.jboss.messaging.core.config.cluster.BroadcastGroupConfiguration;
+import org.jboss.messaging.core.config.cluster.DiscoveryGroupConfiguration;
+import org.jboss.messaging.core.config.cluster.QueueConfiguration;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory;
+import org.jboss.messaging.core.server.MessagingServer;
+import org.jboss.messaging.core.server.cluster.Bridge;
+import org.jboss.messaging.tests.util.ServiceTestBase;
+import org.jboss.messaging.utils.Pair;
+import org.jboss.messaging.utils.SimpleString;
+
+/**
+ * Tests that a core bridge configured with a discovery group forwards messages and can be stopped and restarted.
+ *
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ *
+ */
+public class BridgeWithDiscoveryGroupStartTest extends ServiceTestBase
+{
+ public void testStartStop() throws Exception
+ {
+ Map<String, Object> server0Params = new HashMap<String, Object>();
+ MessagingServer server0 = createClusteredServerWithParams(0, true, server0Params);
+
+ Map<String, Object> server1Params = new HashMap<String, Object>();
+ server1Params.put(SERVER_ID_PROP_NAME, 1);
+ MessagingServer server1 = createClusteredServerWithParams(1, true, server1Params);
+
+ Map<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
+ TransportConfiguration server0tc = new TransportConfiguration(InVMConnectorFactory.class.getName(), server0Params);
+ TransportConfiguration server1tc = new TransportConfiguration(InVMConnectorFactory.class.getName(), server1Params);
+ connectors.put(server1tc.getName(), server1tc);
+
+ server0.getConfiguration().setConnectorConfigurations(connectors);
+
+ final String testAddress = "testAddress";
+ final String queueName0 = "queue0";
+ final String forwardAddress = "forwardAddress";
+ final String queueName1 = "queue1";
+
+ final String groupAddress = "230.1.2.3";
+ final int port = 6745;
+
+ List<Pair<String, String>> connectorPairs = new ArrayList<Pair<String, String>>();
+ connectorPairs.add(new Pair<String, String>(server1tc.getName(), null));
+
+ BroadcastGroupConfiguration bcConfig = new BroadcastGroupConfiguration("bg1",
+ null,
+ -1,
+ groupAddress,
+ port,
+ 250,
+ connectorPairs);
+
+ server0.getConfiguration().getBroadcastGroupConfigurations().add(bcConfig);
+
+ DiscoveryGroupConfiguration dcConfig = new DiscoveryGroupConfiguration("dg1", groupAddress, port, 5000);
+
+ server0.getConfiguration().getDiscoveryGroupConfigurations().put(dcConfig.getName(), dcConfig);
+
+ final String bridgeName = "bridge1";
+
+ BridgeConfiguration bridgeConfiguration = new BridgeConfiguration(bridgeName,
+ queueName0,
+ forwardAddress,
+ null,
+ null,
+ 1000,
+ 1d,
+ 0,
+ true,
+ true,
+ "dg1");
+
+ List<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();
+ bridgeConfigs.add(bridgeConfiguration);
+ server0.getConfiguration().setBridgeConfigurations(bridgeConfigs);
+
+ QueueConfiguration queueConfig0 = new QueueConfiguration(testAddress, queueName0, null, true);
+ List<QueueConfiguration> queueConfigs0 = new ArrayList<QueueConfiguration>();
+ queueConfigs0.add(queueConfig0);
+ server0.getConfiguration().setQueueConfigurations(queueConfigs0);
+
+ QueueConfiguration queueConfig1 = new QueueConfiguration(forwardAddress, queueName1, null, true);
+ List<QueueConfiguration> queueConfigs1 = new ArrayList<QueueConfiguration>();
+ queueConfigs1.add(queueConfig1);
+ server1.getConfiguration().setQueueConfigurations(queueConfigs1);
+
+ server1.start();
+ server0.start();
+
+ ClientSessionFactory sf0 = new ClientSessionFactoryImpl(server0tc);
+
+ ClientSessionFactory sf1 = new ClientSessionFactoryImpl(server1tc);
+
+ ClientSession session0 = sf0.createSession(false, true, true);
+
+ ClientSession session1 = sf1.createSession(false, true, true);
+
+ ClientProducer producer0 = session0.createProducer(new SimpleString(testAddress));
+
+ ClientConsumer consumer1 = session1.createConsumer(queueName1);
+
+ session1.start();
+
+ final int numMessages = 10;
+
+ final SimpleString propKey = new SimpleString("testkey");
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = session0.createClientMessage(false);
+
+ message.putIntProperty(propKey, i);
+
+ producer0.send(message);
+ }
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = consumer1.receive(200);
+
+ assertNotNull(message);
+
+ assertEquals((Integer)i, (Integer)message.getProperty(propKey));
+
+ message.acknowledge();
+ }
+
+ assertNull(consumer1.receive(200));
+
+ Bridge bridge = server0.getClusterManager().getBridges().get(bridgeName);
+
+ bridge.stop();
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = session0.createClientMessage(false);
+
+ message.putIntProperty(propKey, i);
+
+ producer0.send(message);
+ }
+
+ assertNull(consumer1.receive(500));
+
+ bridge.start();
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = consumer1.receive(1000);
+
+ assertNotNull(message);
+
+ assertEquals((Integer)i, (Integer)message.getProperty(propKey));
+
+ message.acknowledge();
+ }
+
+ assertNull(consumer1.receive(200));
+
+ session0.close();
+
+ session1.close();
+
+ sf0.close();
+
+ sf1.close();
+
+ server0.stop();
+
+ server1.stop();
+ }
+}
More information about the jboss-cvs-commits
mailing list