[jboss-cvs] JBoss Messaging SVN: r5860 - in trunk: src/main/org/jboss/messaging/core/client/impl and 5 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri Feb 13 10:03:51 EST 2009
Author: timfox
Date: 2009-02-13 10:03:51 -0500 (Fri, 13 Feb 2009)
New Revision: 5860
Added:
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/NettySymmetricClusterWithDiscoveryTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/SymmetricClusterWithDiscoveryTest.java
Modified:
trunk/src/config/jbm-configuration.xml
trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java
trunk/src/main/org/jboss/messaging/core/cluster/impl/DiscoveryGroupImpl.java
trunk/src/main/org/jboss/messaging/core/deployers/impl/QueueDeployer.java
trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BridgeImpl.java
trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BroadcastGroupImpl.java
trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterConnectionImpl.java
trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterManagerImpl.java
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/ClusterTestBase.java
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/OnewayTwoNodeClusterTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/SymmetricClusterTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/discovery/DiscoveryTest.java
Log:
more clustering tests and tweaks
Modified: trunk/src/config/jbm-configuration.xml
===================================================================
--- trunk/src/config/jbm-configuration.xml 2009-02-13 11:59:51 UTC (rev 5859)
+++ trunk/src/config/jbm-configuration.xml 2009-02-13 15:03:51 UTC (rev 5860)
@@ -4,9 +4,7 @@
<configuration>
<clustered>false</clustered>
-
-
-
+
<!-- Maximum number of threads to use for scheduled deliveries -->
<scheduled-max-pool-size>30</scheduled-max-pool-size>
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java 2009-02-13 11:59:51 UTC (rev 5859)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java 2009-02-13 15:03:51 UTC (rev 5860)
@@ -29,6 +29,7 @@
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.util.Pair;
+import org.jboss.messaging.util.UUIDGenerator;
/**
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
@@ -171,7 +172,8 @@
{
InetAddress groupAddress = InetAddress.getByName(discoveryGroupAddress);
- discoveryGroup = new DiscoveryGroupImpl(discoveryGroupAddress,
+ discoveryGroup = new DiscoveryGroupImpl(UUIDGenerator.getInstance().generateStringUUID(),
+ discoveryGroupAddress,
groupAddress,
discoveryGroupPort,
discoveryRefreshTimeout);
@@ -232,7 +234,8 @@
{
InetAddress groupAddress = InetAddress.getByName(discoveryGroupAddress);
- discoveryGroup = new DiscoveryGroupImpl(discoveryGroupAddress,
+ discoveryGroup = new DiscoveryGroupImpl(UUIDGenerator.getInstance().generateStringUUID(),
+ discoveryGroupAddress,
groupAddress,
discoveryGroupPort,
discoveryRefreshTimeout);
Modified: trunk/src/main/org/jboss/messaging/core/cluster/impl/DiscoveryGroupImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/cluster/impl/DiscoveryGroupImpl.java 2009-02-13 11:59:51 UTC (rev 5859)
+++ trunk/src/main/org/jboss/messaging/core/cluster/impl/DiscoveryGroupImpl.java 2009-02-13 15:03:51 UTC (rev 5860)
@@ -39,6 +39,7 @@
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.cluster.DiscoveryGroup;
import org.jboss.messaging.util.Pair;
+import org.jboss.messaging.util.SimpleString;
/**
* A DiscoveryGroupImpl
@@ -51,44 +52,52 @@
public class DiscoveryGroupImpl implements Runnable, DiscoveryGroup
{
private static final Logger log = Logger.getLogger(DiscoveryGroupImpl.class);
-
+
private static final int SOCKET_TIMEOUT = 500;
-
+
private MulticastSocket socket;
private final List<DiscoveryListener> listeners = new ArrayList<DiscoveryListener>();
-
+
private final String name;
private final Thread thread;
-
+
private boolean received;
-
+
private final Object waitLock = new Object();
-
+
private final Map<Pair<TransportConfiguration, TransportConfiguration>, Long> connectors = new HashMap<Pair<TransportConfiguration, TransportConfiguration>, Long>();
-
+
private final long timeout;
-
+
private volatile boolean started;
-
- public DiscoveryGroupImpl(final String name, final InetAddress groupAddress, final int groupPort, final long timeout) throws Exception
+
+ private final String nodeID;
+
+ public DiscoveryGroupImpl(final String nodeID,
+ final String name,
+ final InetAddress groupAddress,
+ final int groupPort,
+ final long timeout) throws Exception
{
+ this.nodeID = nodeID;
+
this.name = name;
socket = new MulticastSocket(groupPort);
socket.joinGroup(groupAddress);
-
+
socket.setSoTimeout(SOCKET_TIMEOUT);
-
+
this.timeout = timeout;
-
+
thread = new Thread(this);
-
+
thread.setDaemon(true);
}
-
+
public synchronized void start() throws Exception
{
if (started)
@@ -96,11 +105,11 @@
return;
}
+ started = true;
+
thread.start();
-
- started = true;
}
-
+
public void stop()
{
synchronized (this)
@@ -109,85 +118,85 @@
{
return;
}
-
+
started = false;
}
-
+
try
{
thread.join();
}
catch (InterruptedException e)
- {
+ {
}
-
- socket.close();
+
+ socket.close();
}
-
+
public boolean isStarted()
{
return started;
}
-
+
public String getName()
{
return name;
}
-
+
public synchronized List<Pair<TransportConfiguration, TransportConfiguration>> getConnectors()
{
return new ArrayList<Pair<TransportConfiguration, TransportConfiguration>>(connectors.keySet());
}
-
+
public boolean waitForBroadcast(final long timeout)
- {
+ {
synchronized (waitLock)
- {
+ {
long start = System.currentTimeMillis();
-
+
long toWait = timeout;
-
+
while (!received && toWait > 0)
- {
+ {
try
{
- waitLock.wait(toWait);
+ waitLock.wait(toWait);
}
catch (InterruptedException e)
- {
+ {
}
-
+
long now = System.currentTimeMillis();
-
+
toWait -= now - start;
- start = now;
+ start = now;
}
-
+
boolean ret = received;
-
+
received = false;
-
+
return ret;
- }
+ }
}
public void run()
{
- //TODO - can we use a smaller buffer size?
- final byte[] data = new byte[65535];
-
- final DatagramPacket packet = new DatagramPacket(data, data.length);
-
try
- {
+ {
+ // TODO - can we use a smaller buffer size?
+ final byte[] data = new byte[65535];
+
while (true)
{
if (!started)
{
return;
}
-
+
+ final DatagramPacket packet = new DatagramPacket(data, data.length);
+
try
{
socket.receive(packet);
@@ -203,74 +212,79 @@
continue;
}
}
-
+
ByteArrayInputStream bis = new ByteArrayInputStream(data);
-
+
ObjectInputStream ois = new ObjectInputStream(bis);
-
+
+ String originatingNodeID = ois.readUTF();
+
+ if (nodeID.equals(originatingNodeID))
+ {
+ //Ignore traffic from own node
+ continue;
+ }
+
int size = ois.readInt();
-
+
boolean changed = false;
-
+
synchronized (this)
- {
+ {
for (int i = 0; i < size; i++)
{
TransportConfiguration connector = (TransportConfiguration)ois.readObject();
-
+
boolean existsBackup = ois.readBoolean();
-
+
TransportConfiguration backupConnector = null;
-
+
if (existsBackup)
{
backupConnector = (TransportConfiguration)ois.readObject();
}
-
- Pair<TransportConfiguration, TransportConfiguration> connectorPair =
- new Pair<TransportConfiguration, TransportConfiguration>(connector, backupConnector);
-
+
+ Pair<TransportConfiguration, TransportConfiguration> connectorPair = new Pair<TransportConfiguration, TransportConfiguration>(connector,
+ backupConnector);
+
Long oldVal = connectors.put(connectorPair, System.currentTimeMillis());
-
+
if (oldVal == null)
{
changed = true;
}
}
-
+
long now = System.currentTimeMillis();
-
- Iterator<Map.Entry<Pair<TransportConfiguration, TransportConfiguration>, Long>> iter = connectors.entrySet().iterator();
-
- //Weed out any expired connectors
-
+
+ Iterator<Map.Entry<Pair<TransportConfiguration, TransportConfiguration>, Long>> iter = connectors.entrySet()
+ .iterator();
+ // Weed out any expired connectors
+
while (iter.hasNext())
{
Map.Entry<Pair<TransportConfiguration, TransportConfiguration>, Long> entry = iter.next();
-
+
if (entry.getValue() + timeout <= now)
{
iter.remove();
-
+
changed = true;
}
}
}
-
- packet.setLength(data.length);
-
+
if (changed)
{
callListeners();
}
-
+
synchronized (waitLock)
{
received = true;
-
+
waitLock.notify();
}
-
}
}
catch (Exception e)
@@ -278,20 +292,25 @@
log.error("Failed to receive datagram", e);
}
}
-
+
public synchronized void registerListener(final DiscoveryListener listener)
{
- this.listeners.add(listener);
+ listeners.add(listener);
+
+ if (!connectors.isEmpty())
+ {
+ listener.connectorsChanged();
+ }
}
-
+
public synchronized void unregisterListener(final DiscoveryListener listener)
{
- this.listeners.remove(listener);
+ listeners.remove(listener);
}
-
+
private void callListeners()
- {
- for (DiscoveryListener listener: listeners)
+ {
+ for (DiscoveryListener listener : listeners)
{
try
{
@@ -299,9 +318,46 @@
}
catch (Throwable t)
{
- //Catch it so exception doesn't prevent other listeners from running
+ // Catch it so exception doesn't prevent other listeners from running
log.error("Failed to call discovery listener", t);
}
}
}
+
+ private String replaceWildcardChars(final String str)
+ {
+ return str.replace('.', '-');
+ }
+
+ private SimpleString generateConnectorString(final TransportConfiguration config) throws Exception
+ {
+ StringBuilder str = new StringBuilder(replaceWildcardChars(config.getFactoryClassName()));
+
+ if (config.getParams() != null)
+ {
+ if (!config.getParams().isEmpty())
+ {
+ str.append("?");
+ }
+
+ boolean first = true;
+ for (Map.Entry<String, Object> entry : config.getParams().entrySet())
+ {
+ if (!first)
+ {
+ str.append("&");
+ }
+ String encodedKey = replaceWildcardChars(entry.getKey());
+
+ String val = entry.getValue().toString();
+ String encodedVal = replaceWildcardChars(val);
+
+ str.append(encodedKey).append('=').append(encodedVal);
+
+ first = false;
+ }
+ }
+
+ return new SimpleString(str.toString());
+ }
}
Modified: trunk/src/main/org/jboss/messaging/core/deployers/impl/QueueDeployer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/deployers/impl/QueueDeployer.java 2009-02-13 11:59:51 UTC (rev 5859)
+++ trunk/src/main/org/jboss/messaging/core/deployers/impl/QueueDeployer.java 2009-02-13 15:03:51 UTC (rev 5860)
@@ -18,7 +18,7 @@
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
+ */
package org.jboss.messaging.core.deployers.impl;
@@ -37,23 +37,22 @@
* @author <a href="jmesnil at redhat.com">Jeff Mesnil</a>
*/
public class QueueDeployer extends XmlDeployer
-{
+{
private final Configuration serverConfiguration;
- public QueueDeployer(final DeploymentManager deploymentManager,
- final Configuration configuration)
+ public QueueDeployer(final DeploymentManager deploymentManager, final Configuration configuration)
{
super(deploymentManager);
this.serverConfiguration = configuration;
}
-
+
/**
* the names of the elements to deploy
* @return the names of the elements to deploy
*/
public String[] getElementTagName()
{
- return new String[]{"queue"};
+ return new String[] { "queue" };
}
@Override
@@ -62,9 +61,10 @@
if ("deployment".equals(rootNode.getNodeName()))
{
XMLUtil.validate(rootNode, "jbm-configuration.xsd");
- } else
+ }
+ else
{
- XMLUtil.validate(rootNode, "jbm-queues.xsd");
+ XMLUtil.validate(rootNode, "jbm-queues.xsd");
}
}
@@ -80,7 +80,7 @@
configurations.add(queueConfig);
serverConfiguration.setQueueConfigurations(configurations);
}
-
+
@Override
public void undeploy(Node node) throws Exception
{
@@ -93,19 +93,19 @@
*/
public String[] getConfigFileNames()
{
- return new String[] {"jbm-configuration.xml", "jbm-queues.xml"};
+ return new String[] { "jbm-configuration.xml", "jbm-queues.xml" };
}
private QueueConfiguration parseQueueConfiguration(final Node node)
{
String name = node.getAttributes().getNamedItem("name").getNodeValue();
-
+
String address = node.getAttributes().getNamedItem("address").getNodeValue();
String filterString = null;
Node filterNode = node.getAttributes().getNamedItem("filter");
- if (filterNode !=null)
+ if (filterNode != null)
{
String filterValue = filterNode.getNodeValue();
if (!"".equals(filterValue.trim()))
@@ -120,7 +120,7 @@
{
durable = Boolean.parseBoolean(durableNode.getNodeValue());
}
-
+
return new QueueConfiguration(address, name, filterString, durable);
}
Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BridgeImpl.java 2009-02-13 11:59:51 UTC (rev 5859)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BridgeImpl.java 2009-02-13 15:03:51 UTC (rev 5860)
@@ -280,6 +280,16 @@
public void stop() throws Exception
{
+ if (started)
+ {
+ // We need to close the csf here; otherwise the stop runnable never runs, because the create-objects runnable keeps retrying
+ // in an infinite loop to connect to the target server, which isn't up
+ if (csf != null)
+ {
+ csf.close();
+ }
+ }
+
executor.execute(new StopRunnable());
this.waitForRunnablesToComplete();
@@ -298,10 +308,6 @@
return;
}
- // We close the session factory here - this will cause any connection retries to stop
-
- csf.close();
-
if (session != null)
{
session.close();
@@ -401,7 +407,7 @@
{
return false;
}
-
+
try
{
queue.addConsumer(BridgeImpl.this);
Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BroadcastGroupImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BroadcastGroupImpl.java 2009-02-13 11:59:51 UTC (rev 5859)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BroadcastGroupImpl.java 2009-02-13 15:03:51 UTC (rev 5860)
@@ -29,12 +29,14 @@
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.ScheduledFuture;
import org.jboss.messaging.core.config.TransportConfiguration;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.server.cluster.BroadcastGroup;
import org.jboss.messaging.util.Pair;
+import org.jboss.messaging.util.SimpleString;
/**
* A BroadcastGroupImpl
@@ -48,6 +50,8 @@
{
private static final Logger log = Logger.getLogger(BroadcastGroupImpl.class);
+ private final String nodeID;
+
private final String name;
private final InetAddress localBindAddress;
@@ -59,21 +63,24 @@
private final int groupPort;
private DatagramSocket socket;
-
+
private final List<Pair<TransportConfiguration, TransportConfiguration>> connectorPairs = new ArrayList<Pair<TransportConfiguration, TransportConfiguration>>();
-
+
private boolean started;
-
+
private ScheduledFuture<?> future;
-
- public BroadcastGroupImpl(final String name,
+
+ public BroadcastGroupImpl(final String nodeID,
+ final String name,
final InetAddress localBindAddress,
final int localPort,
final InetAddress groupAddress,
final int groupPort) throws Exception
{
+ this.nodeID = nodeID;
+
this.name = name;
-
+
this.localBindAddress = localBindAddress;
this.localPort = localPort;
@@ -84,43 +91,43 @@
// FIXME - doesn't seem to work when specifying port and address
- // this.socket = new DatagramSocket(localPort, localBindAddress);
+ // this.socket = new DatagramSocket(localPort, localBindAddress);
}
-
+
public synchronized void start() throws Exception
{
if (started)
{
return;
}
-
+
socket = new DatagramSocket();
-
+
started = true;
}
-
+
public synchronized void stop()
{
if (!started)
{
return;
}
-
+
if (future != null)
{
future.cancel(false);
}
-
+
socket.close();
-
+
started = false;
}
-
+
public synchronized boolean isStarted()
{
return started;
}
-
+
public String getName()
{
return name;
@@ -130,7 +137,7 @@
{
connectorPairs.add(connectorPair);
}
-
+
public synchronized void removeConnectorPair(final Pair<TransportConfiguration, TransportConfiguration> connectorPair)
{
connectorPairs.remove(connectorPair);
@@ -140,7 +147,7 @@
{
return connectorPairs.size();
}
-
+
public synchronized void broadcastConnectors() throws Exception
{
// TODO - for now we just use plain serialization to serialize the transport configs
@@ -149,16 +156,18 @@
ObjectOutputStream oos = new ObjectOutputStream(bos);
+ oos.writeUTF(nodeID);
+
oos.writeInt(connectorPairs.size());
-
+
for (Pair<TransportConfiguration, TransportConfiguration> connectorPair : connectorPairs)
{
oos.writeObject(connectorPair.a);
-
+
if (connectorPair.b != null)
{
oos.writeBoolean(true);
-
+
oos.writeObject(connectorPair.b);
}
else
@@ -172,7 +181,7 @@
byte[] data = bos.toByteArray();
DatagramPacket packet = new DatagramPacket(data, data.length, groupAddress, groupPort);
-
+
socket.send(packet);
}
@@ -182,9 +191,9 @@
{
return;
}
-
+
try
- {
+ {
broadcastConnectors();
}
catch (Exception e)
@@ -192,10 +201,47 @@
log.error("Failed to broadcast connector configs");
}
}
-
+
public synchronized void setScheduledFuture(final ScheduledFuture<?> future)
{
this.future = future;
}
+
+ private String replaceWildcardChars(final String str)
+ {
+ return str.replace('.', '-');
+ }
+ private SimpleString generateConnectorString(final TransportConfiguration config) throws Exception
+ {
+ StringBuilder str = new StringBuilder(replaceWildcardChars(config.getFactoryClassName()));
+
+ if (config.getParams() != null)
+ {
+ if (!config.getParams().isEmpty())
+ {
+ str.append("?");
+ }
+
+ boolean first = true;
+ for (Map.Entry<String, Object> entry : config.getParams().entrySet())
+ {
+ if (!first)
+ {
+ str.append("&");
+ }
+ String encodedKey = replaceWildcardChars(entry.getKey());
+
+ String val = entry.getValue().toString();
+ String encodedVal = replaceWildcardChars(val);
+
+ str.append(encodedKey).append('=').append(encodedVal);
+
+ first = false;
+ }
+ }
+
+ return new SimpleString(str.toString());
+ }
+
}
Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterConnectionImpl.java 2009-02-13 11:59:51 UTC (rev 5859)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterConnectionImpl.java 2009-02-13 15:03:51 UTC (rev 5860)
@@ -31,6 +31,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
import org.jboss.messaging.core.client.ClientMessage;
import org.jboss.messaging.core.client.management.impl.ManagementHelper;
@@ -183,7 +184,7 @@
final ManagementService managementService,
final ScheduledExecutorService scheduledExecutor,
final QueueFactory queueFactory,
- final DiscoveryGroup discoveryGroup,
+ final DiscoveryGroup discoveryGroup,
final int maxHops,
final SimpleString nodeID) throws Exception
{
@@ -231,8 +232,6 @@
if (discoveryGroup != null)
{
- updateConnectors(discoveryGroup.getConnectors());
-
discoveryGroup.registerListener(this);
}
@@ -245,7 +244,7 @@
{
return;
}
-
+
if (discoveryGroup != null)
{
discoveryGroup.unregisterListener(this);
@@ -271,7 +270,7 @@
// DiscoveryListener implementation ------------------------------------------------------------------
- public void connectorsChanged()
+ public synchronized void connectorsChanged()
{
try
{
@@ -284,7 +283,7 @@
log.error("Failed to update connectors", e);
}
}
-
+
private void updateConnectors(final List<Pair<TransportConfiguration, TransportConfiguration>> connectors) throws Exception
{
Set<Pair<TransportConfiguration, TransportConfiguration>> connectorSet = new HashSet<Pair<TransportConfiguration, TransportConfiguration>>();
@@ -314,7 +313,7 @@
if (!records.containsKey(connectorPair))
{
SimpleString queueName = generateQueueName(name, connectorPair);
-
+
Binding queueBinding = postOffice.getBinding(queueName);
Queue queue;
Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterManagerImpl.java 2009-02-13 11:59:51 UTC (rev 5859)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterManagerImpl.java 2009-02-13 15:03:51 UTC (rev 5860)
@@ -90,7 +90,7 @@
private final Configuration configuration;
private final QueueFactory queueFactory;
-
+
private final SimpleString nodeID;
private volatile boolean started;
@@ -117,7 +117,7 @@
this.configuration = configuration;
this.queueFactory = queueFactory;
-
+
this.nodeID = nodeID;
}
@@ -215,7 +215,8 @@
InetAddress groupAddress = InetAddress.getByName(config.getGroupAddress());
- BroadcastGroupImpl group = new BroadcastGroupImpl(config.getName(),
+ BroadcastGroupImpl group = new BroadcastGroupImpl(nodeID.toString(),
+ config.getName(),
localBindAddress,
config.getLocalBindPort(),
groupAddress,
@@ -283,7 +284,8 @@
InetAddress groupAddress = InetAddress.getByName(config.getGroupAddress());
- DiscoveryGroup group = new DiscoveryGroupImpl(config.getName(),
+ DiscoveryGroup group = new DiscoveryGroupImpl(nodeID.toString(),
+ config.getName(),
groupAddress,
config.getGroupPort(),
config.getRefreshTimeout());
@@ -375,9 +377,9 @@
bridge = new BridgeImpl(new SimpleString(config.getName()),
queue,
pair,
- executorFactory.getExecutor(),
+ executorFactory.getExecutor(),
config.getFilterString() == null ? null : new SimpleString(config.getFilterString()),
- new SimpleString(config.getForwardingAddress()),
+ new SimpleString(config.getForwardingAddress()),
scheduledExecutor,
transformer,
config.getRetryInterval(),
@@ -466,7 +468,7 @@
managementService,
scheduledExecutor,
queueFactory,
- connectors,
+ connectors,
config.getMaxHops(),
nodeID);
}
@@ -494,7 +496,7 @@
managementService,
scheduledExecutor,
queueFactory,
- dg,
+ dg,
config.getMaxHops(),
nodeID);
}
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/ClusterTestBase.java 2009-02-13 11:59:51 UTC (rev 5859)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/ClusterTestBase.java 2009-02-13 15:03:51 UTC (rev 5860)
@@ -39,8 +39,9 @@
import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
import org.jboss.messaging.core.config.Configuration;
import org.jboss.messaging.core.config.TransportConfiguration;
-import org.jboss.messaging.core.config.cluster.BridgeConfiguration;
+import org.jboss.messaging.core.config.cluster.BroadcastGroupConfiguration;
import org.jboss.messaging.core.config.cluster.ClusterConnectionConfiguration;
+import org.jboss.messaging.core.config.cluster.DiscoveryGroupConfiguration;
import org.jboss.messaging.core.config.impl.ConfigurationImpl;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.postoffice.Binding;
@@ -160,15 +161,15 @@
}
}
- // log.info(node + " binding count " + bindingCount + " consumer Count " + totConsumers);
+ //log.info(node + " binding count " + bindingCount + " consumer Count " + totConsumers);
if (bindingCount == count && totConsumers == consumerCount)
{
- //log.info("Waited " + (System.currentTimeMillis() - start));
+ // log.info("Waited " + (System.currentTimeMillis() - start));
return;
}
- Thread.sleep(10);
+ Thread.sleep(100);
}
while (System.currentTimeMillis() - start < WAIT_TIMEOUT);
@@ -418,7 +419,7 @@
{
verifyReceiveRoundRobinInSomeOrder(true, numMessages, consumerIDs);
}
-
+
protected void verifyReceiveRoundRobinInSomeOrder(boolean ack, int numMessages, int... consumerIDs) throws Exception
{
Map<Integer, Integer> countMap = new HashMap<Integer, Integer>();
@@ -455,7 +456,7 @@
counts.add(count);
countMap.put(i, count);
-
+
if (ack)
{
message.acknowledge();
@@ -470,12 +471,12 @@
assertTrue(counts.contains(i));
}
}
-
+
protected void verifyReceiveRoundRobinInSomeOrderNoAck(int numMessages, int... consumerIDs) throws Exception
{
verifyReceiveRoundRobinInSomeOrder(false, numMessages, consumerIDs);
}
-
+
protected void verifyNotReceive(int... consumerIDs) throws Exception
{
for (int i = 0; i < consumerIDs.length; i++)
@@ -563,6 +564,81 @@
services[node] = service;
}
+ protected void setupServerWithDiscovery(int node, String groupAddress, int port, boolean fileStorage, boolean netty)
+ {
+ if (services[node] != null)
+ {
+ throw new IllegalArgumentException("Already a service at node " + node);
+ }
+
+ Configuration configuration = new ConfigurationImpl();
+
+ configuration.setSecurityEnabled(false);
+ configuration.setBindingsDirectory(getBindingsDir(node));
+ configuration.setJournalMinFiles(2);
+ configuration.setJournalDirectory(getJournalDir(node));
+ configuration.setJournalFileSize(100 * 1024);
+ configuration.setPagingDirectory(getPageDir(node));
+ configuration.setLargeMessagesDirectory(getLargeMessagesDir(node));
+ configuration.setClustered(true);
+
+ configuration.getAcceptorConfigurations().clear();
+
+ Map<String, Object> params = generateParams(node, netty);
+
+ TransportConfiguration invmtc = new TransportConfiguration(INVM_ACCEPTOR_FACTORY, params);
+ configuration.getAcceptorConfigurations().add(invmtc);
+
+ if (netty)
+ {
+ TransportConfiguration nettytc = new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params);
+ configuration.getAcceptorConfigurations().add(nettytc);
+ }
+
+ TransportConfiguration invmtc_c = new TransportConfiguration(INVM_CONNECTOR_FACTORY, params);
+ configuration.getConnectorConfigurations().put(invmtc_c.getName(), invmtc_c);
+
+ List<Pair<String, String>> connectorPairs = new ArrayList<Pair<String, String>>();
+
+ if (netty)
+ {
+ TransportConfiguration nettytc_c = new TransportConfiguration(NETTY_CONNECTOR_FACTORY, params);
+ configuration.getConnectorConfigurations().put(nettytc_c.getName(), nettytc_c);
+
+ connectorPairs.add(new Pair<String, String>(nettytc_c.getName(), null));
+ }
+ else
+ {
+ connectorPairs.add(new Pair<String, String>(invmtc_c.getName(), null));
+ }
+
+ BroadcastGroupConfiguration bcConfig = new BroadcastGroupConfiguration("bg1",
+ null,
+ -1,
+ groupAddress,
+ port,
+ 250,
+ connectorPairs);
+
+ configuration.getBroadcastGroupConfigurations().add(bcConfig);
+
+ DiscoveryGroupConfiguration dcConfig = new DiscoveryGroupConfiguration("dg1", groupAddress, port, 500);
+
+ configuration.getDiscoveryGroupConfigurations().put(dcConfig.getName(), dcConfig);
+
+ MessagingService service;
+
+ if (fileStorage)
+ {
+ service = Messaging.newMessagingService(configuration);
+ }
+ else
+ {
+ service = Messaging.newNullStorageMessagingService(configuration);
+ }
+ services[node] = service;
+ }
+
protected Map<String, Object> generateParams(int node, boolean netty)
{
Map<String, Object> params = new HashMap<String, Object>();
@@ -605,9 +681,9 @@
throw new IllegalStateException("No service at node " + nodeFrom);
}
- Map<String, TransportConfiguration> connectors = serviceFrom.getServer()
- .getConfiguration()
- .getConnectorConfigurations();
+ // Map<String, TransportConfiguration> connectors = serviceFrom.getServer()
+ // .getConfiguration()
+ // .getConnectorConfigurations();
Map<String, Object> params = generateParams(nodeTo, netty);
@@ -622,9 +698,9 @@
serverTotc = new TransportConfiguration(INVM_CONNECTOR_FACTORY, params);
}
- connectors.put(serverTotc.getName(), serverTotc);
+ serviceFrom.getServer().getConfiguration().getConnectorConfigurations().put(serverTotc.getName(), serverTotc);
- serviceFrom.getServer().getConfiguration().setConnectorConfigurations(connectors);
+ // serviceFrom.getServer().getConfiguration().setConnectorConfigurations(connectors);
Pair<String, String> connectorPair = new Pair<String, String>(serverTotc.getName(), null);
@@ -641,15 +717,162 @@
forwardWhenNoConsumers,
maxHops,
pairs);
- List<ClusterConnectionConfiguration> clusterConfs = serviceFrom.getServer()
- .getConfiguration()
- .getClusterConfigurations();
+ serviceFrom.getServer().getConfiguration().getClusterConfigurations().add(clusterConf);
- clusterConfs.add(clusterConf);
+ // clusterConfs.add(clusterConf);
- serviceFrom.getServer().getConfiguration().setClusterConfigurations(clusterConfs);
+ // serviceFrom.getServer().getConfiguration().setClusterConfigurations(clusterConfs);
}
+ // protected void setupClusterConnection(String name,
+ // int nodeFrom,
+ // int nodeTo,
+ // String address,
+ // boolean forwardWhenNoConsumers,
+ // int maxHops,
+ // boolean netty)
+ // {
+ // MessagingService serviceFrom = services[nodeFrom];
+ //
+ // if (serviceFrom == null)
+ // {
+ // throw new IllegalStateException("No service at node " + nodeFrom);
+ // }
+ //
+ // Map<String, TransportConfiguration> connectors = serviceFrom.getServer()
+ // .getConfiguration()
+ // .getConnectorConfigurations();
+ //
+ // Map<String, Object> params = generateParams(nodeTo, netty);
+ //
+ // TransportConfiguration serverTotc;
+ //
+ // if (netty)
+ // {
+ // serverTotc = new TransportConfiguration(NETTY_CONNECTOR_FACTORY, params);
+ // }
+ // else
+ // {
+ // serverTotc = new TransportConfiguration(INVM_CONNECTOR_FACTORY, params);
+ // }
+ //
+ // connectors.put(serverTotc.getName(), serverTotc);
+ //
+ // serviceFrom.getServer().getConfiguration().setConnectorConfigurations(connectors);
+ //
+ // Pair<String, String> connectorPair = new Pair<String, String>(serverTotc.getName(), null);
+ //
+ // List<Pair<String, String>> pairs = new ArrayList<Pair<String, String>>();
+ // pairs.add(connectorPair);
+ //
+ // ClusterConnectionConfiguration clusterConf = new ClusterConnectionConfiguration(name,
+ // address,
+ // 100,
+ // 1d,
+ // -1,
+ // -1,
+ // true,
+ // forwardWhenNoConsumers,
+ // maxHops,
+ // pairs);
+ // List<ClusterConnectionConfiguration> clusterConfs = serviceFrom.getServer()
+ // .getConfiguration()
+ // .getClusterConfigurations();
+ //
+ // clusterConfs.add(clusterConf);
+ //
+ // serviceFrom.getServer().getConfiguration().setClusterConfigurations(clusterConfs);
+ // }
+
+ protected void setupClusterConnection(String name,
+ String address,
+ boolean forwardWhenNoConsumers,
+ int maxHops,
+ boolean netty,
+ int nodeFrom,
+ int... nodesTo)
+ {
+ MessagingService serviceFrom = services[nodeFrom];
+
+ if (serviceFrom == null)
+ {
+ throw new IllegalStateException("No service at node " + nodeFrom);
+ }
+
+ Map<String, TransportConfiguration> connectors = serviceFrom.getServer()
+ .getConfiguration()
+ .getConnectorConfigurations();
+
+ List<Pair<String, String>> pairs = new ArrayList<Pair<String, String>>();
+
+ for (int i = 0; i < nodesTo.length; i++)
+ {
+ Map<String, Object> params = generateParams(nodesTo[i], netty);
+
+ TransportConfiguration serverTotc;
+
+ if (netty)
+ {
+ serverTotc = new TransportConfiguration(NETTY_CONNECTOR_FACTORY, params);
+ }
+ else
+ {
+ serverTotc = new TransportConfiguration(INVM_CONNECTOR_FACTORY, params);
+ }
+
+ connectors.put(serverTotc.getName(), serverTotc);
+
+ Pair<String, String> connectorPair = new Pair<String, String>(serverTotc.getName(), null);
+
+ pairs.add(connectorPair);
+ }
+
+ ClusterConnectionConfiguration clusterConf = new ClusterConnectionConfiguration(name,
+ address,
+ 100,
+ 1d,
+ -1,
+ -1,
+ true,
+ forwardWhenNoConsumers,
+ maxHops,
+ pairs);
+
+ serviceFrom.getServer().getConfiguration().getClusterConfigurations().add(clusterConf);
+ }
+
+ protected void setupDiscoveryClusterConnection(String name,
+ int node,
+ String discoveryGroupName,
+ String address,
+ boolean forwardWhenNoConsumers,
+ int maxHops,
+ boolean netty)
+ {
+ MessagingService service = services[node];
+
+ if (service == null)
+ {
+ throw new IllegalStateException("No service at node " + node);
+ }
+
+ ClusterConnectionConfiguration clusterConf = new ClusterConnectionConfiguration(name,
+ address,
+ 100,
+ 1d,
+ -1,
+ -1,
+ true,
+ forwardWhenNoConsumers,
+ maxHops,
+ discoveryGroupName);
+ List<ClusterConnectionConfiguration> clusterConfs = service.getServer()
+ .getConfiguration()
+ .getClusterConfigurations();
+
+ clusterConfs.add(clusterConf);
+ }
+
protected void startServers(int... nodes) throws Exception
{
for (int i = 0; i < nodes.length; i++)
@@ -661,10 +884,11 @@
protected void stopServers(int... nodes) throws Exception
{
for (int i = 0; i < nodes.length; i++)
- {
- log.info("*** stopping server " + i);
- services[nodes[i]].stop();
- log.info("*** stopped server " + i);
+ {
+ if (services[nodes[i]].isStarted())
+ {
+ services[nodes[i]].stop();
+ }
}
}
Added: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/NettySymmetricClusterWithDiscoveryTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/NettySymmetricClusterWithDiscoveryTest.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/NettySymmetricClusterWithDiscoveryTest.java 2009-02-13 15:03:51 UTC (rev 5860)
@@ -0,0 +1,47 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.tests.integration.cluster.distribution;
+
+/**
+ * Runs the scenarios inherited from {@link SymmetricClusterWithDiscoveryTest} with
+ * {@link #isNetty()} returning {@code true} and {@link #isFileStorage()} returning
+ * {@code false}.
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ * Created 13 Feb 2009 13:52:03
+ */
+public class NettySymmetricClusterWithDiscoveryTest extends SymmetricClusterWithDiscoveryTest
+{
+   /**
+    * @return {@code true}; the inherited setup passes this flag on as its {@code netty} argument
+    */
+   @Override
+   protected boolean isNetty()
+   {
+      return true;
+   }
+
+   /**
+    * @return {@code false}; the inherited setup passes this flag on as its file-storage argument
+    */
+   @Override
+   protected boolean isFileStorage()
+   {
+      return false;
+   }
+}
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/OnewayTwoNodeClusterTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/OnewayTwoNodeClusterTest.java 2009-02-13 11:59:51 UTC (rev 5859)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/OnewayTwoNodeClusterTest.java 2009-02-13 15:03:51 UTC (rev 5860)
@@ -69,6 +69,21 @@
return false;
}
+ /*
+ * Make sure the source server (node 0, the only one started here) can shut down even though the target server is never started.
+ */
+ public void testNeverStartTargetStartSourceThenStopSource() throws Exception
+ {
+ setupClusterConnection("cluster1", 0, 1, "queues", false, 1, isNetty());
+ startServers(0);
+
+ // Give the bridge a little time to try to start before the server is stopped
+ Thread.sleep(2000);
+
+ log.info("Stopping server 0");
+ stopServers(0);
+ }
+
public void testStartTargetServerBeforeSourceServer() throws Exception
{
setupClusterConnection("cluster1", 0, 1, "queues", false, 1, isNetty());
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/SymmetricClusterTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/SymmetricClusterTest.java 2009-02-13 11:59:51 UTC (rev 5859)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/SymmetricClusterTest.java 2009-02-13 15:03:51 UTC (rev 5860)
@@ -42,24 +42,16 @@
@Override
protected void setUp() throws Exception
{
- super.setUp();
-
- setupServer(0, isFileStorage(), isNetty());
- setupServer(1, isFileStorage(), isNetty());
- setupServer(2, isFileStorage(), isNetty());
- setupServer(3, isFileStorage(), isNetty());
- setupServer(4, isFileStorage(), isNetty());
+ super.setUp();
+
+ setupServers();
}
-
+
@Override
protected void tearDown() throws Exception
{
- closeAllConsumers();
+ stopServers();
- closeAllSessionFactories();
-
- stopServers(0, 1, 2, 3, 4);
-
super.tearDown();
}
@@ -1457,37 +1449,68 @@
verifyNotReceive(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27);
}
- private void setupCluster() throws Exception
+ protected void setupCluster() throws Exception
{
setupCluster(false);
}
- private void setupCluster(final boolean forwardWhenNoConsumers) throws Exception
+ protected void setupCluster(final boolean forwardWhenNoConsumers) throws Exception
{
- setupClusterConnection("cluster0-1", 0, 1, "queues", forwardWhenNoConsumers, 1, isNetty());
- setupClusterConnection("cluster0-2", 0, 2, "queues", forwardWhenNoConsumers, 1, isNetty());
- setupClusterConnection("cluster0-3", 0, 3, "queues", forwardWhenNoConsumers, 1, isNetty());
- setupClusterConnection("cluster0-4", 0, 4, "queues", forwardWhenNoConsumers, 1, isNetty());
+ setupClusterConnection("cluster0", "queues", forwardWhenNoConsumers, 1, isNetty(), 0, 1, 2, 3, 4);
- setupClusterConnection("cluster1-0", 1, 0, "queues", forwardWhenNoConsumers, 1, isNetty());
- setupClusterConnection("cluster1-2", 1, 2, "queues", forwardWhenNoConsumers, 1, isNetty());
- setupClusterConnection("cluster1-3", 1, 3, "queues", forwardWhenNoConsumers, 1, isNetty());
- setupClusterConnection("cluster1-4", 1, 4, "queues", forwardWhenNoConsumers, 1, isNetty());
+ setupClusterConnection("cluster1", "queues", forwardWhenNoConsumers, 1, isNetty(), 1, 0, 2, 3, 4);
- setupClusterConnection("cluster2-0", 2, 0, "queues", forwardWhenNoConsumers, 1, isNetty());
- setupClusterConnection("cluster2-1", 2, 1, "queues", forwardWhenNoConsumers, 1, isNetty());
- setupClusterConnection("cluster2-3", 2, 3, "queues", forwardWhenNoConsumers, 1, isNetty());
- setupClusterConnection("cluster2-4", 2, 4, "queues", forwardWhenNoConsumers, 1, isNetty());
+ setupClusterConnection("cluster2", "queues", forwardWhenNoConsumers, 1, isNetty(), 2, 0, 1, 3, 4);
- setupClusterConnection("cluster3-0", 3, 0, "queues", forwardWhenNoConsumers, 1, isNetty());
- setupClusterConnection("cluster3-1", 3, 1, "queues", forwardWhenNoConsumers, 1, isNetty());
- setupClusterConnection("cluster3-2", 3, 2, "queues", forwardWhenNoConsumers, 1, isNetty());
- setupClusterConnection("cluster3-4", 3, 4, "queues", forwardWhenNoConsumers, 1, isNetty());
+ setupClusterConnection("cluster3", "queues", forwardWhenNoConsumers, 1, isNetty(), 3, 0, 1, 2, 4);
- setupClusterConnection("cluster4-0", 4, 0, "queues", forwardWhenNoConsumers, 1, isNetty());
- setupClusterConnection("cluster4-1", 4, 1, "queues", forwardWhenNoConsumers, 1, isNetty());
- setupClusterConnection("cluster4-2", 4, 2, "queues", forwardWhenNoConsumers, 1, isNetty());
- setupClusterConnection("cluster4-3", 4, 3, "queues", forwardWhenNoConsumers, 1, isNetty());
+ setupClusterConnection("cluster4", "queues", forwardWhenNoConsumers, 1, isNetty(), 4, 0, 1, 2, 3);
}
+ /**
+  * Sets up nodes 0 to 4, passing the current {@code isFileStorage()} and {@code isNetty()}
+  * values to {@code setupServer} for each of them.
+  */
+ protected void setupServers() throws Exception
+ {
+    for (int node = 0; node <= 4; node++)
+    {
+       setupServer(node, isFileStorage(), isNetty());
+    }
+ }
+
+ protected void stopServers() throws Exception
+ {
+ closeAllConsumers();
+
+ closeAllSessionFactories();
+
+ stopServers(0, 1, 2, 3, 4);
+ }
+
+// private void setupCluster(final boolean forwardWhenNoConsumers) throws Exception
+// {
+// setupClusterConnection("cluster0-1", 0, 1, "queues", forwardWhenNoConsumers, 1, isNetty());
+// setupClusterConnection("cluster0-2", 0, 2, "queues", forwardWhenNoConsumers, 1, isNetty());
+// setupClusterConnection("cluster0-3", 0, 3, "queues", forwardWhenNoConsumers, 1, isNetty());
+// setupClusterConnection("cluster0-4", 0, 4, "queues", forwardWhenNoConsumers, 1, isNetty());
+//
+// setupClusterConnection("cluster1-0", 1, 0, "queues", forwardWhenNoConsumers, 1, isNetty());
+// setupClusterConnection("cluster1-2", 1, 2, "queues", forwardWhenNoConsumers, 1, isNetty());
+// setupClusterConnection("cluster1-3", 1, 3, "queues", forwardWhenNoConsumers, 1, isNetty());
+// setupClusterConnection("cluster1-4", 1, 4, "queues", forwardWhenNoConsumers, 1, isNetty());
+//
+// setupClusterConnection("cluster2-0", 2, 0, "queues", forwardWhenNoConsumers, 1, isNetty());
+// setupClusterConnection("cluster2-1", 2, 1, "queues", forwardWhenNoConsumers, 1, isNetty());
+// setupClusterConnection("cluster2-3", 2, 3, "queues", forwardWhenNoConsumers, 1, isNetty());
+// setupClusterConnection("cluster2-4", 2, 4, "queues", forwardWhenNoConsumers, 1, isNetty());
+//
+// setupClusterConnection("cluster3-0", 3, 0, "queues", forwardWhenNoConsumers, 1, isNetty());
+// setupClusterConnection("cluster3-1", 3, 1, "queues", forwardWhenNoConsumers, 1, isNetty());
+// setupClusterConnection("cluster3-2", 3, 2, "queues", forwardWhenNoConsumers, 1, isNetty());
+// setupClusterConnection("cluster3-4", 3, 4, "queues", forwardWhenNoConsumers, 1, isNetty());
+//
+// setupClusterConnection("cluster4-0", 4, 0, "queues", forwardWhenNoConsumers, 1, isNetty());
+// setupClusterConnection("cluster4-1", 4, 1, "queues", forwardWhenNoConsumers, 1, isNetty());
+// setupClusterConnection("cluster4-2", 4, 2, "queues", forwardWhenNoConsumers, 1, isNetty());
+// setupClusterConnection("cluster4-3", 4, 3, "queues", forwardWhenNoConsumers, 1, isNetty());
+// }
+
}
Added: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/SymmetricClusterWithDiscoveryTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/SymmetricClusterWithDiscoveryTest.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/SymmetricClusterWithDiscoveryTest.java 2009-02-13 15:03:51 UTC (rev 5860)
@@ -0,0 +1,85 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.integration.cluster.distribution;
+
+import org.jboss.messaging.core.logging.Logger;
+
+/**
+ * Variant of {@link SymmetricClusterTest} in which every node's cluster connection refers to
+ * the discovery group {@code "dg1"} instead of an explicit list of target nodes, and in which
+ * the servers are set up through {@code setupServerWithDiscovery} with a shared group address
+ * and port.
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ * Created 3 Feb 2009 09:10:43
+ */
+public class SymmetricClusterWithDiscoveryTest extends SymmetricClusterTest
+{
+   private static final Logger log = Logger.getLogger(SymmetricClusterWithDiscoveryTest.class);
+
+   // Group address and port handed to setupServerWithDiscovery for every node.
+   private static final String groupAddress = "230.1.2.3";
+
+   private static final int groupPort = 6745;
+
+   // Discovery group name referenced by every cluster connection.
+   private static final String discoveryGroupName = "dg1";
+
+   // The tests use nodes 0 to 4.
+   private static final int nodeCount = 5;
+
+   /**
+    * @return {@code false}; passed as the {@code netty} argument of the setup helpers
+    */
+   @Override
+   protected boolean isNetty()
+   {
+      return false;
+   }
+
+   /**
+    * @return {@code false}; passed as the file-storage argument of {@code setupServerWithDiscovery}
+    */
+   @Override
+   protected boolean isFileStorage()
+   {
+      return false;
+   }
+
+   /**
+    * Sets up the cluster connections with {@code forwardWhenNoConsumers} set to {@code false}.
+    */
+   @Override
+   protected void setupCluster() throws Exception
+   {
+      setupCluster(false);
+   }
+
+   /**
+    * Gives each node {@code n} (0 to 4) a cluster connection named {@code "cluster" + n} on
+    * address {@code "queues"} with a max-hops value of 1, referring to the discovery group
+    * rather than to explicit target nodes.
+    *
+    * @param forwardWhenNoConsumers passed through to every cluster connection
+    */
+   @Override
+   protected void setupCluster(final boolean forwardWhenNoConsumers) throws Exception
+   {
+      for (int node = 0; node < nodeCount; node++)
+      {
+         setupDiscoveryClusterConnection("cluster" + node,
+                                         node,
+                                         discoveryGroupName,
+                                         "queues",
+                                         forwardWhenNoConsumers,
+                                         1,
+                                         isNetty());
+      }
+   }
+
+   /**
+    * Sets up nodes 0 to 4 through {@code setupServerWithDiscovery}, all with the same group
+    * address and port.
+    */
+   @Override
+   protected void setupServers() throws Exception
+   {
+      for (int node = 0; node < nodeCount; node++)
+      {
+         setupServerWithDiscovery(node, groupAddress, groupPort, isFileStorage(), isNetty());
+      }
+   }
+}
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/discovery/DiscoveryTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/discovery/DiscoveryTest.java 2009-02-13 11:59:51 UTC (rev 5859)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/discovery/DiscoveryTest.java 2009-02-13 15:03:51 UTC (rev 5860)
@@ -69,7 +69,7 @@
final int groupPort = 6745;
final int timeout = 500;
- BroadcastGroup bg = new BroadcastGroupImpl(randomString(), null, -1, groupAddress, groupPort);
+ BroadcastGroup bg = new BroadcastGroupImpl(randomString(), randomString(), null, -1, groupAddress, groupPort);
bg.start();
@@ -82,7 +82,7 @@
bg.addConnectorPair(connectorPair);
- DiscoveryGroup dg = new DiscoveryGroupImpl(randomString(), groupAddress, groupPort, timeout);
+ DiscoveryGroup dg = new DiscoveryGroupImpl(randomString(), randomString(), groupAddress, groupPort, timeout);
dg.start();
@@ -107,7 +107,50 @@
dg.stop();
}
+ /** Checks that a discovery group reports nothing for a broadcast sent by a broadcast group created with the same node ID. */
+ public void testIgnoreTrafficFromOwnNode() throws Exception
+ {
+ final InetAddress groupAddress = InetAddress.getByName(address1);
+ final int groupPort = 6745;
+ final int timeout = 500;
+ // The broadcast group and the discovery group below are both created with this node ID
+ String nodeID = randomString();
+ BroadcastGroup bg = new BroadcastGroupImpl(nodeID, randomString(), null, -1, groupAddress, groupPort);
+
+ bg.start();
+
+ TransportConfiguration live1 = generateTC();
+
+ TransportConfiguration backup1 = generateTC();
+
+ Pair<TransportConfiguration, TransportConfiguration> connectorPair = new Pair<TransportConfiguration, TransportConfiguration>(live1,
+ backup1);
+
+ bg.addConnectorPair(connectorPair);
+
+ DiscoveryGroup dg = new DiscoveryGroupImpl(nodeID, randomString(), groupAddress, groupPort, timeout);
+
+ dg.start();
+
+ bg.broadcastConnectors();
+ // The wait is expected to report that no broadcast was received
+ boolean ok = dg.waitForBroadcast(1000);
+
+ assertFalse(ok);
+ // ...and the discovery group must not have recorded any connector pair
+ List<Pair<TransportConfiguration, TransportConfiguration>> connectors = dg.getConnectors();
+
+ assertNotNull(connectors);
+
+ assertEquals(0, connectors.size());
+
+ bg.stop();
+
+ dg.stop();
+
+ }
+
// There is a bug in some OSes where different addresses but *Same port* will receive the traffic - hence this test won't pass
// See http://www.jboss.org/community/docs/DOC-11710 (jboss wiki promiscuous traffic)
@@ -118,7 +161,7 @@
// final int groupPort = 6745;
// final int timeout = 500;
//
-// BroadcastGroup bg = new BroadcastGroupImpl(randomString(), null, -1, groupAddress, groupPort);
+// BroadcastGroup bg = new BroadcastGroupImpl(randomString(), randomString(), null, -1, groupAddress, groupPort);
//
// bg.start();
//
@@ -133,7 +176,7 @@
//
// final InetAddress groupAddress2 = InetAddress.getByName(address2);
//
-// DiscoveryGroup dg = new DiscoveryGroupImpl(randomString(), groupAddress2, groupPort, timeout);
+// DiscoveryGroup dg = new DiscoveryGroupImpl(randomString(), randomString(), groupAddress2, groupPort, timeout);
//
// dg.start();
//
@@ -155,7 +198,7 @@
final int groupPort = 6745;
final int timeout = 500;
- BroadcastGroup bg = new BroadcastGroupImpl(randomString(), null, -1, groupAddress, groupPort);
+ BroadcastGroup bg = new BroadcastGroupImpl(randomString(), randomString(), null, -1, groupAddress, groupPort);
bg.start();
@@ -170,7 +213,7 @@
final int port2 = 6746;
- DiscoveryGroup dg = new DiscoveryGroupImpl(randomString(), groupAddress, port2, timeout);
+ DiscoveryGroup dg = new DiscoveryGroupImpl(randomString(), randomString(), groupAddress, port2, timeout);
dg.start();
@@ -191,7 +234,7 @@
final int groupPort = 6745;
final int timeout = 500;
- BroadcastGroup bg = new BroadcastGroupImpl(randomString(), null, -1, groupAddress, groupPort);
+ BroadcastGroup bg = new BroadcastGroupImpl(randomString(), randomString(), null, -1, groupAddress, groupPort);
bg.start();
@@ -207,7 +250,7 @@
final InetAddress groupAddress2 = InetAddress.getByName(address2);
final int port2 = 6746;
- DiscoveryGroup dg = new DiscoveryGroupImpl(randomString(), groupAddress2, port2, timeout);
+ DiscoveryGroup dg = new DiscoveryGroupImpl(randomString(), randomString(), groupAddress2, port2, timeout);
dg.start();
@@ -235,13 +278,13 @@
final int timeout = 500;
- BroadcastGroup bg1 = new BroadcastGroupImpl(randomString(), null, -1, groupAddress1, groupPort1);
+ BroadcastGroup bg1 = new BroadcastGroupImpl(randomString(), randomString(), null, -1, groupAddress1, groupPort1);
bg1.start();
- BroadcastGroup bg2 = new BroadcastGroupImpl(randomString(), null, -1, groupAddress2, groupPort2);
+ BroadcastGroup bg2 = new BroadcastGroupImpl(randomString(), randomString(), null, -1, groupAddress2, groupPort2);
bg2.start();
- BroadcastGroup bg3 = new BroadcastGroupImpl(randomString(), null, -1, groupAddress3, groupPort3);
+ BroadcastGroup bg3 = new BroadcastGroupImpl(randomString(), randomString(), null, -1, groupAddress3, groupPort3);
bg3.start();
TransportConfiguration live1 = generateTC();
@@ -266,13 +309,13 @@
bg2.addConnectorPair(connectorPair2);
bg3.addConnectorPair(connectorPair3);
- DiscoveryGroup dg1 = new DiscoveryGroupImpl(randomString(), groupAddress1, groupPort1, timeout);
+ DiscoveryGroup dg1 = new DiscoveryGroupImpl(randomString(), randomString(), groupAddress1, groupPort1, timeout);
dg1.start();
- DiscoveryGroup dg2 = new DiscoveryGroupImpl(randomString(), groupAddress2, groupPort2, timeout);
+ DiscoveryGroup dg2 = new DiscoveryGroupImpl(randomString(), randomString(), groupAddress2, groupPort2, timeout);
dg2.start();
- DiscoveryGroup dg3 = new DiscoveryGroupImpl(randomString(), groupAddress3, groupPort3, timeout);
+ DiscoveryGroup dg3 = new DiscoveryGroupImpl(randomString(), randomString(), groupAddress3, groupPort3, timeout);
dg3.start();
bg1.broadcastConnectors();
@@ -320,7 +363,7 @@
final int groupPort = 6745;
final int timeout = 500;
- BroadcastGroup bg = new BroadcastGroupImpl(randomString(), null, -1, groupAddress, groupPort);
+ BroadcastGroup bg = new BroadcastGroupImpl(randomString(), randomString(), null, -1, groupAddress, groupPort);
bg.start();
@@ -331,7 +374,7 @@
bg.addConnectorPair(connectorPair);
- DiscoveryGroup dg = new DiscoveryGroupImpl(randomString(), groupAddress, groupPort, timeout);
+ DiscoveryGroup dg = new DiscoveryGroupImpl(randomString(), randomString(), groupAddress, groupPort, timeout);
dg.start();
@@ -363,7 +406,7 @@
final int groupPort = 6745;
final int timeout = 500;
- BroadcastGroup bg = new BroadcastGroupImpl(randomString(), null, -1, groupAddress, groupPort);
+ BroadcastGroup bg = new BroadcastGroupImpl(randomString(), randomString(), null, -1, groupAddress, groupPort);
bg.start();
@@ -374,7 +417,7 @@
bg.addConnectorPair(connectorPair);
- DiscoveryGroup dg = new DiscoveryGroupImpl(randomString(), groupAddress, groupPort, timeout);
+ DiscoveryGroup dg = new DiscoveryGroupImpl(randomString(), randomString(), groupAddress, groupPort, timeout);
MyListener listener1 = new MyListener();
MyListener listener2 = new MyListener();
@@ -475,13 +518,13 @@
final int groupPort = 6745;
final int timeout = 500;
- BroadcastGroup bg1 = new BroadcastGroupImpl(randomString(), null, -1, groupAddress, groupPort);
+ BroadcastGroup bg1 = new BroadcastGroupImpl(randomString(), randomString(), null, -1, groupAddress, groupPort);
bg1.start();
- BroadcastGroup bg2 = new BroadcastGroupImpl(randomString(), null, -1, groupAddress, groupPort);
+ BroadcastGroup bg2 = new BroadcastGroupImpl(randomString(), randomString(), null, -1, groupAddress, groupPort);
bg2.start();
- BroadcastGroup bg3 = new BroadcastGroupImpl(randomString(), null, -1, groupAddress, groupPort);
+ BroadcastGroup bg3 = new BroadcastGroupImpl(randomString(), randomString(), null, -1, groupAddress, groupPort);
bg3.start();
TransportConfiguration live1 = generateTC();
@@ -502,7 +545,7 @@
backup3);
bg3.addConnectorPair(connectorPair3);
- DiscoveryGroup dg = new DiscoveryGroupImpl(randomString(), groupAddress, groupPort, timeout);
+ DiscoveryGroup dg = new DiscoveryGroupImpl(randomString(), randomString(), groupAddress, groupPort, timeout);
MyListener listener1 = new MyListener();
dg.registerListener(listener1);
@@ -715,7 +758,7 @@
final int groupPort = 6745;
final int timeout = 500;
- BroadcastGroup bg = new BroadcastGroupImpl(randomString(), null, -1, groupAddress, groupPort);
+ BroadcastGroup bg = new BroadcastGroupImpl(randomString(), randomString(), null, -1, groupAddress, groupPort);
bg.start();
@@ -727,11 +770,11 @@
bg.addConnectorPair(connectorPair1);
- DiscoveryGroup dg1 = new DiscoveryGroupImpl(randomString(), groupAddress, groupPort, timeout);
+ DiscoveryGroup dg1 = new DiscoveryGroupImpl(randomString(), randomString(), groupAddress, groupPort, timeout);
- DiscoveryGroup dg2 = new DiscoveryGroupImpl(randomString(), groupAddress, groupPort, timeout);
+ DiscoveryGroup dg2 = new DiscoveryGroupImpl(randomString(), randomString(), groupAddress, groupPort, timeout);
- DiscoveryGroup dg3 = new DiscoveryGroupImpl(randomString(), groupAddress, groupPort, timeout);
+ DiscoveryGroup dg3 = new DiscoveryGroupImpl(randomString(), randomString(), groupAddress, groupPort, timeout);
dg1.start();
dg2.start();
More information about the jboss-cvs-commits
mailing list