[jboss-cvs] JBoss Messaging SVN: r5673 - in trunk: src/main/org/jboss/messaging/core/config/impl and 6 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Jan 21 08:29:09 EST 2009
Author: timfox
Date: 2009-01-21 08:29:08 -0500 (Wed, 21 Jan 2009)
New Revision: 5673
Added:
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/bridge/BridgeStartTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/discovery/
trunk/tests/src/org/jboss/messaging/tests/integration/discovery/DiscoveryTest.java
Removed:
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/BasicMessageFlowTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/DiscoveryTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MaxHopsTest.java
Modified:
trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java
trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java
trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java
trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BridgeImpl.java
trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterManagerImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServiceImpl.java
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/bridge/BridgeReconnectTest.java
Log:
more testing and clustering work
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java 2009-01-21 07:27:06 UTC (rev 5672)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java 2009-01-21 13:29:08 UTC (rev 5673)
@@ -357,7 +357,7 @@
this.retryIntervalMultiplier = retryIntervalMultiplier;
this.maxRetriesBeforeFailover = maxRetriesBeforeFailover;
this.maxRetriesAfterFailover = maxRetriesAfterFailover;
-
+
this.initialWaitTimeout = -1;
Pair<TransportConfiguration, TransportConfiguration> pair = new Pair<TransportConfiguration, TransportConfiguration>(connectorConfig,
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java 2009-01-21 07:27:06 UTC (rev 5672)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java 2009-01-21 13:29:08 UTC (rev 5673)
@@ -791,7 +791,7 @@
// We lock the channel to prevent any packets to be added to the resend
// cache during the failover process
channel.lock();
-
+
try
{
channel.transferConnection(backupConnection);
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java 2009-01-21 07:27:06 UTC (rev 5672)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java 2009-01-21 13:29:08 UTC (rev 5673)
@@ -398,6 +398,7 @@
{
if (me.getCode() == MessagingException.OBJECT_CLOSED)
{
+ log.info("The server closed the connection");
// The server has closed the connection. We don't want failover to occur in this case -
// either the server has booted off the connection, or it didn't receive a ping in time
// in either case server side resources on both live and backup will be removed so the client
@@ -421,7 +422,7 @@
// --------------------------------------------------------------------------------------
private RemotingConnection getConnectionForCreateSession() throws MessagingException
- {
+ {
while (true)
{
RemotingConnection connection = getConnection(1);
@@ -430,12 +431,20 @@
{
// Connection is dead - failover/reconnect
boolean failedOver = failover();
-
+
if (!failedOver)
{
//Nothing we can do here
throw new MessagingException(MessagingException.NOT_CONNECTED, "Unabled to create session - server is unavailable and no backup server or backup is unavailable");
}
+
+ try
+ {
+ Thread.sleep(retryInterval);
+ }
+ catch (Exception ignore)
+ {
+ }
}
else
{
@@ -537,11 +546,6 @@
// First try reconnecting to current node if configured to do this
done = reconnect(maxRetriesBeforeFailover);
-
- if (done)
- {
- log.info("reconnected to original node");
- }
}
if (!done)
@@ -756,32 +760,34 @@
private RemotingConnection getConnection(final int count)
{
RemotingConnection conn;
-
-
-
+
if (connections.size() < maxConnections)
{
// Create a new one
DelegatingBufferHandler handler = new DelegatingBufferHandler();
-
- Connector connector = connectorFactory.createConnector(transportParams, handler, this);
-
- connector.start();
-
+
+ Connector connector;
+
Connection tc;
try
{
+ connector = connectorFactory.createConnector(transportParams, handler, this);
+
+ connector.start();
+
tc = connector.createConnection();
}
catch (Exception e)
{
//Sanity catch for badly behaved remoting plugins
- log.warn("connector.create should never throw an exception, implementation is badly behaved");
+ log.warn("connector.create or connectorFactory.createConnector should never throw an exception, implementation is badly behaved, but we'll deal with it anyway.");
tc = null;
+
+ connector = null;
}
if (tc == null)
Modified: trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java 2009-01-21 07:27:06 UTC (rev 5672)
+++ trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java 2009-01-21 13:29:08 UTC (rev 5673)
@@ -27,7 +27,6 @@
import org.jboss.messaging.core.config.cluster.ClusterConfiguration;
import org.jboss.messaging.core.config.cluster.DiscoveryGroupConfiguration;
import org.jboss.messaging.core.config.cluster.DivertConfiguration;
-import org.jboss.messaging.core.config.cluster.MessageFlowConfiguration;
import org.jboss.messaging.core.config.cluster.QueueConfiguration;
import org.jboss.messaging.core.server.JournalType;
import org.jboss.messaging.util.SimpleString;
Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BridgeImpl.java 2009-01-21 07:27:06 UTC (rev 5672)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BridgeImpl.java 2009-01-21 13:29:08 UTC (rev 5673)
@@ -101,11 +101,11 @@
private final Transformer transformer;
- private final ClientSessionFactory csf;
+ private volatile ClientSessionFactory csf;
- private ClientSession session;
+ private volatile ClientSession session;
- private ClientProducer producer;
+ private volatile ClientProducer producer;
private volatile boolean started;
@@ -113,6 +113,18 @@
private final boolean useDuplicateDetection;
+ private volatile boolean active;
+
+ private final Pair<TransportConfiguration, TransportConfiguration> connectorPair;
+
+ private final long retryInterval;
+
+ private final double retryIntervalMultiplier;
+
+ private final int maxRetriesBeforeFailover;
+
+ private final int maxRetriesAfterFailover;
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@@ -133,7 +145,7 @@
final long retryInterval,
final double retryIntervalMultiplier,
final int maxRetriesBeforeFailover,
- final int maxRetriesAfterFailover,
+ final int maxRetriesAfterFailover,
final boolean useDuplicateDetection) throws Exception
{
this.name = name;
@@ -164,14 +176,17 @@
this.transformer = transformer;
this.useDuplicateDetection = useDuplicateDetection;
+
+ this.connectorPair = connectorPair;
+
+ this.retryInterval = retryInterval;
+
+ this.retryIntervalMultiplier = retryIntervalMultiplier;
+
+ this.maxRetriesBeforeFailover = maxRetriesBeforeFailover;
+
+ this.maxRetriesAfterFailover = maxRetriesAfterFailover;
- this.csf = new ClientSessionFactoryImpl(connectorPair.a,
- connectorPair.b,
- retryInterval,
- retryIntervalMultiplier,
- maxRetriesBeforeFailover,
- maxRetriesAfterFailover);
-
if (maxBatchTime != -1)
{
future = scheduledExecutor.scheduleAtFixedRate(new BatchTimeout(),
@@ -191,16 +206,47 @@
{
return;
}
+
+ executor.execute(new CreateObjectsRunnable());
- queue.addConsumer(this);
+ started = true;
+ }
- createTx();
-
- if (createObjects())
+ private class CreateObjectsRunnable implements Runnable
+ {
+ public synchronized void run()
{
- started = true;
+ try
+ {
+ createTx();
- queue.deliverAsync(executor);
+ queue.addConsumer(BridgeImpl.this);
+
+ csf = new ClientSessionFactoryImpl(connectorPair.a,
+ connectorPair.b,
+ retryInterval,
+ retryIntervalMultiplier,
+ maxRetriesBeforeFailover,
+ maxRetriesAfterFailover);
+
+ session = csf.createSession(false, false, false);
+
+ producer = session.createProducer();
+
+ session.addFailureListener(BridgeImpl.this);
+
+ active = true;
+
+ queue.deliverAsync(executor);
+ }
+ catch (Exception e)
+ {
+ log.warn("Unable to connect. Bridge is now disabled.", e);
+
+ active = false;
+
+ started = false;
+ }
}
}
@@ -208,6 +254,8 @@
{
started = false;
+ active = false;
+
queue.removeConsumer(this);
if (future != null)
@@ -227,13 +275,8 @@
{
log.warn("Timed out waiting for batch to be sent");
}
-
- if (session != null)
- {
- session.close();
- }
-
- started = false;
+
+ csf.close();
}
public boolean isStarted()
@@ -244,7 +287,14 @@
// For testing only
public RemotingConnection getForwardingConnection()
{
- return ((ClientSessionImpl)session).getConnection();
+ if (session == null)
+ {
+ return null;
+ }
+ else
+ {
+ return ((ClientSessionImpl)session).getConnection();
+ }
}
// Consumer implementation ---------------------------------------
@@ -263,7 +313,7 @@
synchronized (this)
{
- if (!started)
+ if (!active)
{
return HandleStatus.BUSY;
}
@@ -294,55 +344,43 @@
public synchronized boolean connectionFailed(final MessagingException me)
{
- // By the time this is called
- synchronized (this)
- {
- try
- {
- session.close();
+ fail();
- createObjects();
- }
- catch (Exception e)
- {
- log.error("Failed to reconnect", e);
- }
-
- return true;
- }
+ return true;
}
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- private boolean createObjects() throws Exception
+
+ private void fail()
{
- try
+ if (!started)
{
- session = csf.createSession(false, false, false);
+ return;
}
- catch (MessagingException me)
+
+ log.warn("Bridge connection to target failed. Will try to reconnect");
+
+ try
{
- log.warn("Unable to connect. Message flow is now disabled.");
-
+ tx.rollback();
+
stop();
-
- return false;
}
+ catch (Exception e)
+ {
+ log.error("Failed to stop", e);
+ }
+
+ executor.execute(new CreateObjectsRunnable());
+ }
- session.addFailureListener(this);
+ // Package protected ---------------------------------------------
- producer = session.createProducer();
+ // Protected -----------------------------------------------------
- return true;
- }
+ // Private -------------------------------------------------------
private synchronized void timeoutBatch()
{
- if (!started)
+ if (!active)
{
return;
}
@@ -417,14 +455,7 @@
{
log.error("Failed to forward batch", e);
- try
- {
- tx.rollback();
- }
- catch (Exception e2)
- {
- log.error("Failed to rollback", e2);
- }
+ fail();
}
}
Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterManagerImpl.java 2009-01-21 07:27:06 UTC (rev 5672)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterManagerImpl.java 2009-01-21 13:29:08 UTC (rev 5673)
@@ -267,193 +267,25 @@
group.start();
}
- // private synchronized void deployMessageFlow(final MessageFlowConfiguration config) throws Exception
- // {
- // if (config.getName() == null)
- // {
- // log.warn("Must specify a unique name for each message flow. This one will not be deployed.");
- //
- // return;
- // }
- //
- // if (config.getAddress() == null)
- // {
- // log.warn("Must specify an address each message flow. This one will not be deployed.");
- //
- // return;
- // }
- //
- // if (messageFlows.containsKey(config.getName()))
- // {
- // log.warn("There is already a message-flow with name " + config.getName() +
- // " deployed. This one will not be deployed.");
- //
- // return;
- // }
- //
- // if (config.getMaxBatchTime() == 0 || config.getMaxBatchTime() < -1)
- // {
- // log.warn("Invalid value for max-batch-time. Valid values are -1 or > 0");
- //
- // return;
- // }
- //
- // if (config.getMaxBatchSize() < 1)
- // {
- // log.warn("Invalid value for max-batch-size. Valid values are > 0");
- //
- // return;
- // }
- //
- // Transformer transformer = null;
- //
- // if (config.getTransformerClassName() != null)
- // {
- // ClassLoader loader = Thread.currentThread().getContextClassLoader();
- // try
- // {
- // Class<?> clz = loader.loadClass(config.getTransformerClassName());
- // transformer = (Transformer)clz.newInstance();
- // }
- // catch (Exception e)
- // {
- // throw new IllegalArgumentException("Error instantiating transformer class \"" + config.getTransformerClassName() +
- // "\"",
- // e);
- // }
- // }
- //
- // SimpleString flowName = new SimpleString(config.getName());
- //
- // List<LinkInfo> linkInfos = linkInfoMap.get(flowName);
- //
- // MessageFlow flow;
- //
- // if (config.getDiscoveryGroupName() == null)
- // {
- // // Create message flow with list of static connectors
- //
- // List<Pair<TransportConfiguration, TransportConfiguration>> conns = new ArrayList<Pair<TransportConfiguration,
- // TransportConfiguration>>();
- //
- // for (Pair<String, String> connectorNamePair : config.getConnectorNamePairs())
- // {
- // TransportConfiguration connector = configuration.getConnectorConfigurations().get(connectorNamePair.a);
- //
- // if (connector == null)
- // {
- // log.warn("No connector defined with name '" + connectorNamePair.a +
- // "'. The message flow will not be deployed.");
- //
- // return;
- // }
- //
- // TransportConfiguration backupConnector = null;
- //
- // if (connectorNamePair.b != null)
- // {
- // backupConnector = configuration.getConnectorConfigurations().get(connectorNamePair.b);
- //
- // if (backupConnector == null)
- // {
- // log.warn("No connector defined with name '" + connectorNamePair.b +
- // "'. The message flow will not be deployed.");
- //
- // return;
- // }
- // }
- //
- // conns.add(new Pair<TransportConfiguration, TransportConfiguration>(connector, backupConnector));
- // }
- //
- // flow = new MessageFlowImpl(flowName,
- // new SimpleString(config.getAddress()),
- // new SimpleString(config.getForwardingAddress()),
- // config.getMaxBatchSize(),
- // config.getMaxBatchTime(),
- // config.getFilterString() == null ? null
- // : new SimpleString(config.getFilterString()),
- // executorFactory,
- // storageManager,
- // postOffice,
- // queueSettingsRepository,
- // scheduledExecutor,
- // transformer,
- // config.getRetryInterval(),
- // config.getRetryIntervalMultiplier(),
- // config.getMaxRetriesBeforeFailover(),
- // config.getMaxRetriesAfterFailover(),
- // config.isUseDuplicateDetection(),
- // config.getMaxHops(),
- // config.isUseRemoteQueueInformation(),
- // linkInfos,
- // conns,
- // queueFactory);
- // }
- // else
- // {
- // // Create message flow with connectors from discovery group
- //
- // DiscoveryGroup group = discoveryGroups.get(config.getDiscoveryGroupName());
- //
- // if (group == null)
- // {
- // log.warn("There is no discovery-group with name " + config.getDiscoveryGroupName() +
- // " deployed. This one will not be deployed.");
- //
- // return;
- // }
- //
- // flow = new MessageFlowImpl(flowName,
- // new SimpleString(config.getAddress()),
- // new SimpleString(config.getForwardingAddress()),
- // config.getMaxBatchSize(),
- // config.getMaxBatchTime(),
- // config.getFilterString() == null ? null
- // : new SimpleString(config.getFilterString()),
- // this.executorFactory,
- // storageManager,
- // postOffice,
- // queueSettingsRepository,
- // scheduledExecutor,
- // transformer,
- // config.getRetryInterval(),
- // config.getRetryIntervalMultiplier(),
- // config.getMaxRetriesBeforeFailover(),
- // config.getMaxRetriesAfterFailover(),
- // config.isUseDuplicateDetection(),
- // config.getMaxHops(),
- // config.isUseRemoteQueueInformation(),
- // linkInfos,
- // group,
- // queueFactory);
- // }
- //
- // messageFlows.put(config.getName(), flow);
- // managementService.registerMessageFlow(flow, config);
- //
- // flow.start();
- // }
-
private synchronized void deployBridge(final BridgeConfiguration config) throws Exception
{
if (config.getName() == null)
{
- log.warn("Must specify a unique name for each message flow. This one will not be deployed.");
+ log.warn("Must specify a unique name for each bridge. This one will not be deployed.");
return;
}
if (config.getQueueName() == null)
{
- log.warn("Must specify a queue name for each message flow. This one will not be deployed.");
+ log.warn("Must specify a queue name for each bridge. This one will not be deployed.");
return;
}
if (config.getForwardingAddress() == null)
{
- log.warn("Must specify an forwarding address each message flow. This one will not be deployed.");
+ log.warn("Must specify an forwarding address each bridge. This one will not be deployed.");
return;
}
@@ -533,54 +365,23 @@
config.getMaxBatchSize(),
config.getMaxBatchTime(),
config.getFilterString() == null ? null : new SimpleString(config.getFilterString()),
- new SimpleString(config.getForwardingAddress()),
+ new SimpleString(config.getForwardingAddress()),
storageManager,
scheduledExecutor,
transformer,
config.getRetryInterval(),
config.getRetryIntervalMultiplier(),
config.getMaxRetriesBeforeFailover(),
- config.getMaxRetriesAfterFailover(),
+ config.getMaxRetriesAfterFailover(),
config.isUseDuplicateDetection());
+ log.info("put bridge " + this);
bridges.put(config.getName(), bridge);
managementService.registerBridge(bridge, config);
bridge.start();
}
- // else
- // {
- // DiscoveryGroup group = discoveryGroups.get(config.getDiscoveryGroupName());
- //
- // if (group == null)
- // {
- // log.warn("There is no discovery-group with name " + config.getDiscoveryGroupName() +
- // " deployed. This one will not be deployed.");
- //
- // return;
- // }
- //
- // bridge = new BridgeImpl(new SimpleString(config.getName()),
- // queue,
- // config.getDiscoveryGroupName(),
- // executorFactory.getExecutor(),
- // config.getMaxBatchSize(),
- // config.getMaxBatchTime(),
- // new SimpleString(config.getForwardingAddress()),
- // config.getFilterString() == null ? null
- // : new SimpleString(config.getFilterString()),
- // storageManager,
- // scheduledExecutor,
- // transformer,
- // config.getRetryInterval(),
- // config.getRetryIntervalMultiplier(),
- // config.getMaxRetriesBeforeFailover(),
- // config.getMaxRetriesAfterFailover(),
- // config.getMaxHops(),
- // config.isUseDuplicateDetection());
- // }
-
}
private Transformer instantiateTransformer(final String transformerClassName)
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2009-01-21 07:27:06 UTC (rev 5672)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2009-01-21 13:29:08 UTC (rev 5673)
@@ -215,11 +215,6 @@
throw new IllegalStateException("StorageManager must be started before MessagingServer is started");
}
- if (!remotingService.isStarted())
- {
- throw new IllegalStateException("RemotingService must be started before MessagingServer is started");
- }
-
managementService.start();
// The rest of the components are not pluggable and created and started
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServiceImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServiceImpl.java 2009-01-21 07:27:06 UTC (rev 5672)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServiceImpl.java 2009-01-21 13:29:08 UTC (rev 5673)
@@ -152,9 +152,11 @@
public void start() throws Exception
{
- storageManager.start();
+ storageManager.start();
+ server.start();
+ //Remoting service should always be started last, otherwise create session packets can be received before the message server packet handler has been registered
+ //resulting in create session attempts to "hang" since response will never be sent back.
remotingService.start();
- server.start();
}
public void stop() throws Exception
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/bridge/BridgeReconnectTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/bridge/BridgeReconnectTest.java 2009-01-21 07:27:06 UTC (rev 5672)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/bridge/BridgeReconnectTest.java 2009-01-21 13:29:08 UTC (rev 5673)
@@ -153,10 +153,10 @@
log.info("Simulating failure");
- // Now we will simulate a failure of the message flow connection between server1 and server2
+ // Now we will simulate a failure of the bridge connection between server1 and server2
// And prevent reconnection for a few tries, then it will reconnect without failing over
Bridge bridge = service0.getServer().getClusterManager().getBridges().get(bridgeName);
- RemotingConnection forwardingConnection = ((BridgeImpl)bridge).getForwardingConnection();
+ RemotingConnection forwardingConnection = getForwardingConnection(bridge);
InVMConnector.failOnCreateConnection = true;
InVMConnector.numberOfFailures = retriesBeforeFailover - 1;
forwardingConnection.fail(new MessagingException(MessagingException.NOT_CONNECTED));
@@ -286,10 +286,10 @@
log.info("Simulating failure");
- // Now we will simulate a failure of the message flow connection between server1 and server2
+ // Now we will simulate a failure of the bridge connection between server1 and server2
// And prevent reconnection for a few tries, then it will reconnect without failing over
Bridge bridge = service0.getServer().getClusterManager().getBridges().get(bridgeName);
- RemotingConnection forwardingConnection = ((BridgeImpl)bridge).getForwardingConnection();
+ RemotingConnection forwardingConnection = getForwardingConnection(bridge);
InVMConnector.failOnCreateConnection = true;
InVMConnector.numberOfFailures = retriesBeforeFailover;
forwardingConnection.fail(new MessagingException(MessagingException.NOT_CONNECTED));
@@ -325,6 +325,26 @@
assertEquals(0, service1.getServer().getRemotingService().getConnections().size());
assertEquals(0, service2.getServer().getRemotingService().getConnections().size());
}
+
+ private RemotingConnection getForwardingConnection(final Bridge bridge) throws Exception
+ {
+ long start = System.currentTimeMillis();
+
+ do
+ {
+ RemotingConnection forwardingConnection = ((BridgeImpl)bridge).getForwardingConnection();
+
+ if (forwardingConnection != null)
+ {
+ return forwardingConnection;
+ }
+
+ Thread.sleep(10);
+ }
+ while (System.currentTimeMillis() - start < 50000);
+
+ throw new IllegalStateException("Failed to get forwarding connection");
+ }
public void testFailoverThenReconnectAfterFailover() throws Exception
{
@@ -419,16 +439,16 @@
log.info("Simulating failure");
- // Now we will simulate a failure of the message flow connection between server1 and server2
+ // Now we will simulate a failure of the bridge connection between server1 and server2
// And prevent reconnection for a few tries, then it will reconnect without failing over
Bridge bridge = service0.getServer().getClusterManager().getBridges().get(bridgeName);
- RemotingConnection forwardingConnection = ((BridgeImpl)bridge).getForwardingConnection();
+ RemotingConnection forwardingConnection = getForwardingConnection(bridge);
InVMConnector.failOnCreateConnection = true;
InVMConnector.numberOfFailures = retriesBeforeFailover;
forwardingConnection.fail(new MessagingException(MessagingException.NOT_CONNECTED));
// Now we should be failed over so fail again and should reconnect
- forwardingConnection = ((BridgeImpl)bridge).getForwardingConnection();
+ forwardingConnection = getForwardingConnection(bridge);
InVMConnector.resetFailures();
InVMConnector.failOnCreateConnection = true;
InVMConnector.numberOfFailures = retriesBeforeFailover - 1;
@@ -545,10 +565,10 @@
log.info("Simulating failure");
- // Now we will simulate a failure of the message flow connection between server1 and server2
+ // Now we will simulate a failure of the bridge connection between server1 and server2
// And prevent reconnection for a few tries, then it will reconnect without failing over
Bridge bridge = service0.getServer().getClusterManager().getBridges().get(bridgeName);
- RemotingConnection forwardingConnection = ((BridgeImpl)bridge).getForwardingConnection();
+ RemotingConnection forwardingConnection = getForwardingConnection(bridge);
InVMConnector.failOnCreateConnection = true;
InVMConnector.numberOfFailures = retriesBeforeFailover - 1;
forwardingConnection.fail(new MessagingException(MessagingException.NOT_CONNECTED));
@@ -661,10 +681,10 @@
log.info("Simulating failure");
- // Now we will simulate a failure of the message flow connection between server1 and server2
+ // Now we will simulate a failure of the bridge connection between server1 and server2
// And prevent reconnection for a few tries, then it will reconnect without failing over
Bridge bridge = service0.getServer().getClusterManager().getBridges().get(bridgeName);
- RemotingConnection forwardingConnection = ((BridgeImpl)bridge).getForwardingConnection();
+ RemotingConnection forwardingConnection = getForwardingConnection(bridge);
InVMConnector.failOnCreateConnection = true;
InVMConnector.numberOfFailures = retriesBeforeFailover * 2;
forwardingConnection.fail(new MessagingException(MessagingException.NOT_CONNECTED));
Added: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/bridge/BridgeStartTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/bridge/BridgeStartTest.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/bridge/BridgeStartTest.java 2009-01-21 13:29:08 UTC (rev 5673)
@@ -0,0 +1,701 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.integration.cluster.bridge;
+
+import static org.jboss.messaging.core.remoting.impl.invm.TransportConstants.SERVER_ID_PROP_NAME;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.config.cluster.BridgeConfiguration;
+import org.jboss.messaging.core.config.cluster.QueueConfiguration;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.core.server.cluster.Bridge;
+import org.jboss.messaging.tests.util.ServiceTestBase;
+import org.jboss.messaging.util.Pair;
+import org.jboss.messaging.util.SimpleString;
+
+/**
+ * A BridgeStartTest
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ * Created 14 Jan 2009 14:05:01
+ *
+ *
+ */
+public class BridgeStartTest extends ServiceTestBase
+{
+ private static final Logger log = Logger.getLogger(BridgeStartTest.class);
+
+ public void testStartStop() throws Exception
+ {
+ Map<String, Object> service0Params = new HashMap<String, Object>();
+ MessagingService service0 = createClusteredServiceWithParams(0, false, service0Params);
+
+ Map<String, Object> service1Params = new HashMap<String, Object>();
+ service1Params.put(SERVER_ID_PROP_NAME, 1);
+ MessagingService service1 = createClusteredServiceWithParams(1, false, service1Params);
+
+ final String testAddress = "testAddress";
+ final String queueName0 = "queue0";
+ final String forwardAddress = "forwardAddress";
+ final String queueName1 = "queue1";
+
+ Map<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
+ TransportConfiguration server0tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+ service0Params);
+ TransportConfiguration server1tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+ service1Params);
+ connectors.put(server1tc.getName(), server1tc);
+
+ service0.getServer().getConfiguration().setConnectorConfigurations(connectors);
+
+ Pair<String, String> connectorPair = new Pair<String, String>(server1tc.getName(), null);
+
+ final String bridgeName = "bridge1";
+
+ BridgeConfiguration bridgeConfiguration = new BridgeConfiguration(bridgeName,
+ queueName0,
+ forwardAddress,
+ null,
+ 1,
+ -1,
+ null,
+ 1000,
+ 1d,
+ 0,
+ 0,
+ false,
+ connectorPair);
+
+ List<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();
+ bridgeConfigs.add(bridgeConfiguration);
+ service0.getServer().getConfiguration().setBridgeConfigurations(bridgeConfigs);
+
+ QueueConfiguration queueConfig0 = new QueueConfiguration(testAddress, queueName0, null, true);
+ List<QueueConfiguration> queueConfigs0 = new ArrayList<QueueConfiguration>();
+ queueConfigs0.add(queueConfig0);
+ service0.getServer().getConfiguration().setQueueConfigurations(queueConfigs0);
+
+ QueueConfiguration queueConfig1 = new QueueConfiguration(forwardAddress, queueName1, null, true);
+ List<QueueConfiguration> queueConfigs1 = new ArrayList<QueueConfiguration>();
+ queueConfigs1.add(queueConfig1);
+ service1.getServer().getConfiguration().setQueueConfigurations(queueConfigs1);
+
+ service1.start();
+ service0.start();
+
+ ClientSessionFactory sf0 = new ClientSessionFactoryImpl(server0tc);
+
+ ClientSessionFactory sf1 = new ClientSessionFactoryImpl(server1tc);
+
+ ClientSession session0 = sf0.createSession(false, true, true);
+
+ ClientSession session1 = sf1.createSession(false, true, true);
+
+ ClientProducer producer0 = session0.createProducer(new SimpleString(testAddress));
+
+ ClientConsumer consumer1 = session1.createConsumer(queueName1);
+
+ session1.start();
+
+ final int numMessages = 10;
+
+ final SimpleString propKey = new SimpleString("testkey");
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = session0.createClientMessage(false);
+
+ message.putIntProperty(propKey, i);
+
+ producer0.send(message);
+ }
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = consumer1.receive(200);
+
+ assertNotNull(message);
+
+ assertEquals((Integer)i, (Integer)message.getProperty(propKey));
+
+ message.acknowledge();
+ }
+
+ assertNull(consumer1.receive(200));
+
+ Bridge bridge = service0.getServer().getClusterManager().getBridges().get(bridgeName);
+
+ bridge.stop();
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = session0.createClientMessage(false);
+
+ message.putIntProperty(propKey, i);
+
+ producer0.send(message);
+ }
+
+ assertNull(consumer1.receive(500));
+
+ bridge.start();
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = consumer1.receive(200);
+
+ assertNotNull(message);
+
+ assertEquals((Integer)i, (Integer)message.getProperty(propKey));
+
+ message.acknowledge();
+ }
+
+ assertNull(consumer1.receive(200));
+
+ session0.close();
+
+ session1.close();
+
+ sf0.close();
+
+ sf1.close();
+
+ service0.stop();
+
+ service1.stop();
+ }
+
+ public void testTargetServerUpAndDown() throws Exception
+ {
+ Map<String, Object> service0Params = new HashMap<String, Object>();
+ MessagingService service0 = createClusteredServiceWithParams(0, false, service0Params);
+
+ Map<String, Object> service1Params = new HashMap<String, Object>();
+ service1Params.put(SERVER_ID_PROP_NAME, 1);
+ MessagingService service1 = createClusteredServiceWithParams(1, false, service1Params);
+
+ final String testAddress = "testAddress";
+ final String queueName0 = "queue0";
+ final String forwardAddress = "forwardAddress";
+ final String queueName1 = "queue1";
+
+ Map<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
+ TransportConfiguration server0tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+ service0Params);
+ TransportConfiguration server1tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+ service1Params);
+ connectors.put(server1tc.getName(), server1tc);
+
+ service0.getServer().getConfiguration().setConnectorConfigurations(connectors);
+
+ Pair<String, String> connectorPair = new Pair<String, String>(server1tc.getName(), null);
+
+ final String bridgeName = "bridge1";
+
+ BridgeConfiguration bridgeConfiguration = new BridgeConfiguration(bridgeName,
+ queueName0,
+ forwardAddress,
+ null,
+ 1,
+ -1,
+ null,
+ 1000,
+ 1d,
+ -1,
+ -1,
+ false,
+ connectorPair);
+
+ List<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();
+ bridgeConfigs.add(bridgeConfiguration);
+ service0.getServer().getConfiguration().setBridgeConfigurations(bridgeConfigs);
+
+ QueueConfiguration queueConfig0 = new QueueConfiguration(testAddress, queueName0, null, true);
+ List<QueueConfiguration> queueConfigs0 = new ArrayList<QueueConfiguration>();
+ queueConfigs0.add(queueConfig0);
+ service0.getServer().getConfiguration().setQueueConfigurations(queueConfigs0);
+
+ QueueConfiguration queueConfig1 = new QueueConfiguration(forwardAddress, queueName1, null, true);
+ List<QueueConfiguration> queueConfigs1 = new ArrayList<QueueConfiguration>();
+ queueConfigs1.add(queueConfig1);
+ service1.getServer().getConfiguration().setQueueConfigurations(queueConfigs1);
+
+ //Don't start service 1 yet
+
+ log.info("starting 0");
+ service0.start();
+ log.info("started 0");
+
+ ClientSessionFactory sf0 = new ClientSessionFactoryImpl(server0tc);
+
+ ClientSession session0 = sf0.createSession(false, true, true);
+
+ ClientProducer producer0 = session0.createProducer(new SimpleString(testAddress));
+
+ final int numMessages = 10;
+
+ final SimpleString propKey = new SimpleString("testkey");
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = session0.createClientMessage(false);
+
+ message.putIntProperty(propKey, i);
+
+ producer0.send(message);
+ }
+
+ //Wait a bit
+ Thread.sleep(1000);
+
+ log.info("starting 1");
+ service1.start();
+ log.info("started server 1");
+
+ ClientSessionFactory sf1 = new ClientSessionFactoryImpl(server1tc);
+
+ ClientSession session1 = sf1.createSession(false, true, true);
+
+ ClientConsumer consumer1 = session1.createConsumer(queueName1);
+
+ session1.start();
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = consumer1.receive(1000);
+
+ assertNotNull(message);
+
+ assertEquals((Integer)i, (Integer)message.getProperty(propKey));
+
+ message.acknowledge();
+ }
+
+ assertNull(consumer1.receive(200));
+
+ log.info("consumed messages");
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = session0.createClientMessage(false);
+
+ message.putIntProperty(propKey, i);
+
+ producer0.send(message);
+ }
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = consumer1.receive(1000);
+
+ assertNotNull(message);
+
+ assertEquals((Integer)i, (Integer)message.getProperty(propKey));
+
+ message.acknowledge();
+ }
+
+ assertNull(consumer1.receive(200));
+
+ session1.close();
+
+ sf1.close();
+
+ log.info("stipping 1");
+ service1.stop();
+ log.info("stopped 1");
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = session0.createClientMessage(false);
+
+ message.putIntProperty(propKey, i);
+
+ producer0.send(message);
+ }
+
+ log.info("Sent more messages");
+
+ service1.start();
+
+ log.info("started service1");
+
+ sf1 = new ClientSessionFactoryImpl(server1tc);
+
+ session1 = sf1.createSession(false, true, true);
+
+ consumer1 = session1.createConsumer(queueName1);
+
+ log.info("**** started session");
+ session1.start();
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = consumer1.receive(1000);
+
+ assertNotNull(message);
+
+ assertEquals((Integer)i, (Integer)message.getProperty(propKey));
+
+ message.acknowledge();
+ }
+
+ assertNull(consumer1.receive(200));
+
+ log.info("received all messages again");
+
+ session1.close();
+
+ sf1.close();
+
+ session0.close();
+
+ sf0.close();
+
+ service0.stop();
+
+ service1.stop();
+ }
+
+ public void testTargetServerNotAvailableNoReconnectTries() throws Exception
+ {
+ Map<String, Object> service0Params = new HashMap<String, Object>();
+ MessagingService service0 = createClusteredServiceWithParams(0, false, service0Params);
+
+ Map<String, Object> service1Params = new HashMap<String, Object>();
+ service1Params.put(SERVER_ID_PROP_NAME, 1);
+ MessagingService service1 = createClusteredServiceWithParams(1, false, service1Params);
+
+ final String testAddress = "testAddress";
+ final String queueName0 = "queue0";
+ final String forwardAddress = "forwardAddress";
+ final String queueName1 = "queue1";
+
+ Map<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
+ TransportConfiguration server0tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+ service0Params);
+ TransportConfiguration server1tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+ service1Params);
+ connectors.put(server1tc.getName(), server1tc);
+
+ service0.getServer().getConfiguration().setConnectorConfigurations(connectors);
+
+ Pair<String, String> connectorPair = new Pair<String, String>(server1tc.getName(), null);
+
+ final String bridgeName = "bridge1";
+
+ BridgeConfiguration bridgeConfiguration = new BridgeConfiguration(bridgeName,
+ queueName0,
+ forwardAddress,
+ null,
+ 1,
+ -1,
+ null,
+ 1000,
+ 1d,
+ 0,
+ 0,
+ false,
+ connectorPair);
+
+ List<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();
+ bridgeConfigs.add(bridgeConfiguration);
+ service0.getServer().getConfiguration().setBridgeConfigurations(bridgeConfigs);
+
+ QueueConfiguration queueConfig0 = new QueueConfiguration(testAddress, queueName0, null, true);
+ List<QueueConfiguration> queueConfigs0 = new ArrayList<QueueConfiguration>();
+ queueConfigs0.add(queueConfig0);
+ service0.getServer().getConfiguration().setQueueConfigurations(queueConfigs0);
+
+ QueueConfiguration queueConfig1 = new QueueConfiguration(forwardAddress, queueName1, null, true);
+ List<QueueConfiguration> queueConfigs1 = new ArrayList<QueueConfiguration>();
+ queueConfigs1.add(queueConfig1);
+ service1.getServer().getConfiguration().setQueueConfigurations(queueConfigs1);
+
+ //Don't start service 1 yet
+
+ log.info("starting 0");
+ service0.start();
+ log.info("started 0");
+
+ ClientSessionFactory sf0 = new ClientSessionFactoryImpl(server0tc);
+
+ ClientSession session0 = sf0.createSession(false, true, true);
+
+ ClientProducer producer0 = session0.createProducer(new SimpleString(testAddress));
+
+ final int numMessages = 10;
+
+ final SimpleString propKey = new SimpleString("testkey");
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = session0.createClientMessage(false);
+
+ message.putIntProperty(propKey, i);
+
+ producer0.send(message);
+ }
+
+ //Wait a bit
+ Thread.sleep(1000);
+
+ //Bridge should be stopped since retries = 0
+
+ log.info("starting 1");
+ service1.start();
+ log.info("started server 1");
+
+ ClientSessionFactory sf1 = new ClientSessionFactoryImpl(server1tc);
+
+ ClientSession session1 = sf1.createSession(false, true, true);
+
+ ClientConsumer consumer1 = session1.createConsumer(queueName1);
+
+ session1.start();
+
+ //Won't be received since the bridge was deactivated
+ assertNull(consumer1.receive(200));
+
+ //Now start the bridge manually
+
+ Bridge bridge = service0.getServer().getClusterManager().getBridges().get(bridgeName);
+
+ bridge.start();
+
+ //Messages should now be received
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = consumer1.receive(1000);
+
+ assertNotNull(message);
+
+ assertEquals((Integer)i, (Integer)message.getProperty(propKey));
+
+ message.acknowledge();
+ }
+
+ assertNull(consumer1.receive(200));
+
+ session1.close();
+
+ sf1.close();
+
+ session0.close();
+
+ sf0.close();
+
+ service0.stop();
+
+ service1.stop();
+ }
+
+ public void testManualStopStart() throws Exception
+ {
+ Map<String, Object> service0Params = new HashMap<String, Object>();
+ MessagingService service0 = createClusteredServiceWithParams(0, false, service0Params);
+
+ Map<String, Object> service1Params = new HashMap<String, Object>();
+ service1Params.put(SERVER_ID_PROP_NAME, 1);
+ MessagingService service1 = createClusteredServiceWithParams(1, false, service1Params);
+
+ final String testAddress = "testAddress";
+ final String queueName0 = "queue0";
+ final String forwardAddress = "forwardAddress";
+ final String queueName1 = "queue1";
+
+ Map<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
+ TransportConfiguration server0tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+ service0Params);
+ TransportConfiguration server1tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+ service1Params);
+ connectors.put(server1tc.getName(), server1tc);
+
+ service0.getServer().getConfiguration().setConnectorConfigurations(connectors);
+
+ Pair<String, String> connectorPair = new Pair<String, String>(server1tc.getName(), null);
+
+ final String bridgeName = "bridge1";
+
+ BridgeConfiguration bridgeConfiguration = new BridgeConfiguration(bridgeName,
+ queueName0,
+ forwardAddress,
+ null,
+ 1,
+ -1,
+ null,
+ 1000,
+ 1d,
+ 0,
+ 0,
+ false,
+ connectorPair);
+
+ List<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();
+ bridgeConfigs.add(bridgeConfiguration);
+ service0.getServer().getConfiguration().setBridgeConfigurations(bridgeConfigs);
+
+ QueueConfiguration queueConfig0 = new QueueConfiguration(testAddress, queueName0, null, true);
+ List<QueueConfiguration> queueConfigs0 = new ArrayList<QueueConfiguration>();
+ queueConfigs0.add(queueConfig0);
+ service0.getServer().getConfiguration().setQueueConfigurations(queueConfigs0);
+
+ QueueConfiguration queueConfig1 = new QueueConfiguration(forwardAddress, queueName1, null, true);
+ List<QueueConfiguration> queueConfigs1 = new ArrayList<QueueConfiguration>();
+ queueConfigs1.add(queueConfig1);
+ service1.getServer().getConfiguration().setQueueConfigurations(queueConfigs1);
+
+ service1.start();
+
+ service0.start();
+
+ ClientSessionFactory sf0 = new ClientSessionFactoryImpl(server0tc);
+
+ ClientSession session0 = sf0.createSession(false, true, true);
+
+ ClientProducer producer0 = session0.createProducer(new SimpleString(testAddress));
+
+ final int numMessages = 10;
+
+ final SimpleString propKey = new SimpleString("testkey");
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = session0.createClientMessage(false);
+
+ message.putIntProperty(propKey, i);
+
+ producer0.send(message);
+ }
+
+ ClientSessionFactory sf1 = new ClientSessionFactoryImpl(server1tc);
+
+ ClientSession session1 = sf1.createSession(false, true, true);
+
+ ClientConsumer consumer1 = session1.createConsumer(queueName1);
+
+ session1.start();
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = consumer1.receive(1000);
+
+ assertNotNull(message);
+
+ assertEquals((Integer)i, (Integer)message.getProperty(propKey));
+
+ message.acknowledge();
+ }
+
+ assertNull(consumer1.receive(200));
+
+ //Now stop the bridge manually
+
+ Bridge bridge = service0.getServer().getClusterManager().getBridges().get(bridgeName);
+
+ bridge.stop();
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = session0.createClientMessage(false);
+
+ message.putIntProperty(propKey, i);
+
+ producer0.send(message);
+ }
+
+ assertNull(consumer1.receive(200));
+
+ bridge.start();
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = consumer1.receive(1000);
+
+ assertNotNull(message);
+
+ assertEquals((Integer)i, (Integer)message.getProperty(propKey));
+
+ message.acknowledge();
+ }
+
+ assertNull(consumer1.receive(200));
+
+ bridge.stop();
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = session0.createClientMessage(false);
+
+ message.putIntProperty(propKey, i);
+
+ producer0.send(message);
+ }
+
+ assertNull(consumer1.receive(200));
+
+ bridge.start();
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = consumer1.receive(1000);
+
+ assertNotNull(message);
+
+ assertEquals((Integer)i, (Integer)message.getProperty(propKey));
+
+ message.acknowledge();
+ }
+
+ assertNull(consumer1.receive(200));
+
+ session1.close();
+
+ sf1.close();
+
+ session0.close();
+
+ sf0.close();
+
+ service0.stop();
+
+ service1.stop();
+ }
+
+
+}
Deleted: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/BasicMessageFlowTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/BasicMessageFlowTest.java 2009-01-21 07:27:06 UTC (rev 5672)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/BasicMessageFlowTest.java 2009-01-21 13:29:08 UTC (rev 5673)
@@ -1,337 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source Copyright 2005-2008, Red Hat
- * Middleware LLC, and individual contributors by the @authors tag. See the
- * copyright.txt in the distribution for a full listing of individual
- * contributors.
- *
- * This is free software; you can redistribute it and/or modify it under the
- * terms of the GNU Lesser General Public License as published by the Free
- * Software Foundation; either version 2.1 of the License, or (at your option)
- * any later version.
- *
- * This software is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
- * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
- * details.
- *
- * You should have received a copy of the GNU Lesser General Public License
- * along with this software; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF
- * site: http://www.fsf.org.
- */
-
-package org.jboss.messaging.tests.integration.cluster.distribution;
-
-import org.jboss.messaging.core.remoting.impl.invm.InVMRegistry;
-import org.jboss.messaging.tests.integration.cluster.bridge.BridgeTestBase;
-
-/**
- *
- * A ActivationTimeoutTest
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
- * Created 4 Nov 2008 16:54:50
- *
- *
- */
-public class BasicMessageFlowTest extends BridgeTestBase
-{
-// private static final Logger log = Logger.getLogger(BasicMessageFlowTest.class);
-//
-// // Constants -----------------------------------------------------
-//
-// // Attributes ----------------------------------------------------
-//
-// // Static --------------------------------------------------------
-//
-// // Constructors --------------------------------------------------
-//
-// // Public --------------------------------------------------------
-//
-// public void testMessageFlowsSameName() throws Exception
-// {
-// Map<String, Object> service0Params = new HashMap<String, Object>();
-// MessagingService service0 = createMessagingService(0, service0Params);
-//
-// Map<String, Object> service1Params = new HashMap<String, Object>();
-// MessagingService service1 = createMessagingService(1, service1Params);
-// service1.start();
-//
-// TransportConfiguration server0tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
-// service0Params);
-//
-// Map<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
-//
-// TransportConfiguration server1tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
-// service1Params);
-// connectors.put(server1tc.getName(), server1tc);
-// service0.getServer().getConfiguration().setConnectorConfigurations(connectors);
-//
-// List<Pair<String, String>> connectorNames = new ArrayList<Pair<String, String>>();
-// connectorNames.add(new Pair<String, String>(server1tc.getName(), null));
-//
-// final SimpleString address1 = new SimpleString("testaddress");
-//
-// MessageFlowConfiguration ofconfig1 = new MessageFlowConfiguration("flow1",
-// address1.toString(),
-// "car='saab'",
-// false,
-// 1,
-// -1,
-// null,
-// DEFAULT_RETRY_INTERVAL,
-// DEFAULT_RETRY_INTERVAL_MULTIPLIER,
-// DEFAULT_MAX_RETRIES_BEFORE_FAILOVER,
-// DEFAULT_MAX_RETRIES_AFTER_FAILOVER,
-// DEFAULT_USE_DUPLICATE_DETECTION,
-// DEFAULT_MAX_HOPS,
-// connectorNames);
-// MessageFlowConfiguration ofconfig2 = new MessageFlowConfiguration("flow1",
-// address1.toString(),
-// "car='bmw'",
-// false,
-// 1,
-// -1,
-// null,
-// DEFAULT_RETRY_INTERVAL,
-// DEFAULT_RETRY_INTERVAL_MULTIPLIER,
-// DEFAULT_MAX_RETRIES_BEFORE_FAILOVER,
-// DEFAULT_MAX_RETRIES_AFTER_FAILOVER,
-// DEFAULT_USE_DUPLICATE_DETECTION,
-// DEFAULT_MAX_HOPS,
-// connectorNames);
-//
-// Set<MessageFlowConfiguration> ofconfigs = new HashSet<MessageFlowConfiguration>();
-// ofconfigs.add(ofconfig1);
-// ofconfigs.add(ofconfig2);
-//
-// service0.getServer().getConfiguration().setMessageFlowConfigurations(ofconfigs);
-//
-// // Only one of the flows should be deployed
-// service0.start();
-//
-// ClientSessionFactory csf0 = new ClientSessionFactoryImpl(server0tc);
-// ClientSession session0 = csf0.createSession(false, true, true);
-//
-// ClientSessionFactory csf1 = new ClientSessionFactoryImpl(server1tc);
-// ClientSession session1 = csf1.createSession(false, true, true);
-//
-// session0.createQueue(address1, address1, null, false, false);
-// session1.createQueue(address1, address1, null, false, false);
-// ClientProducer prod0 = session0.createProducer(address1);
-//
-// ClientConsumer cons1 = session1.createConsumer(address1);
-//
-// session1.start();
-//
-// SimpleString propKey = new SimpleString("car");
-//
-// ClientMessage messageSaab = session0.createClientMessage(false);
-// messageSaab.putStringProperty(propKey, new SimpleString("saab"));
-// messageSaab.getBody().flip();
-//
-// ClientMessage messageBMW = session0.createClientMessage(false);
-// messageBMW.putStringProperty(propKey, new SimpleString("bmw"));
-// messageBMW.getBody().flip();
-//
-// prod0.send(messageSaab);
-// prod0.send(messageBMW);
-//
-// ClientMessage r1 = cons1.receive(1000);
-// assertNotNull(r1);
-//
-// SimpleString val = (SimpleString)r1.getProperty(propKey);
-// assertTrue(val.equals(new SimpleString("saab")) || val.equals(new SimpleString("bmw")));
-// r1 = cons1.receiveImmediate();
-// assertNull(r1);
-//
-// session0.close();
-// session1.close();
-//
-// service0.stop();
-// service1.stop();
-//
-// assertEquals(0, service0.getServer().getRemotingService().getConnections().size());
-// assertEquals(0, service1.getServer().getRemotingService().getConnections().size());
-// }
-//
-// public void testMessageNullName() throws Exception
-// {
-// Map<String, Object> service0Params = new HashMap<String, Object>();
-// MessagingService service0 = createMessagingService(0, service0Params);
-//
-// Map<String, Object> service1Params = new HashMap<String, Object>();
-// MessagingService service1 = createMessagingService(1, service1Params);
-// service1.start();
-//
-// TransportConfiguration server0tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
-// service0Params);
-//
-// Map<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
-//
-// TransportConfiguration server1tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
-// service1Params);
-// connectors.put(server1tc.getName(), server1tc);
-// service0.getServer().getConfiguration().setConnectorConfigurations(connectors);
-//
-// List<Pair<String, String>> connectorNames = new ArrayList<Pair<String, String>>();
-// connectorNames.add(new Pair<String, String>(server1tc.getName(), null));
-//
-// final SimpleString address1 = new SimpleString("testaddress");
-//
-// MessageFlowConfiguration ofconfig1 = new MessageFlowConfiguration(null,
-// address1.toString(),
-// null,
-// false,
-// 1,
-// -1,
-// null,
-// DEFAULT_RETRY_INTERVAL,
-// DEFAULT_RETRY_INTERVAL_MULTIPLIER,
-// DEFAULT_MAX_RETRIES_BEFORE_FAILOVER,
-// DEFAULT_MAX_RETRIES_AFTER_FAILOVER,
-// DEFAULT_USE_DUPLICATE_DETECTION,
-// DEFAULT_MAX_HOPS,
-// connectorNames);
-//
-// Set<MessageFlowConfiguration> ofconfigs = new HashSet<MessageFlowConfiguration>();
-// ofconfigs.add(ofconfig1);
-//
-// service0.getServer().getConfiguration().setMessageFlowConfigurations(ofconfigs);
-//
-// service0.start();
-//
-// ClientSessionFactory csf0 = new ClientSessionFactoryImpl(server0tc);
-// ClientSession session0 = csf0.createSession(false, true, true);
-//
-// ClientSessionFactory csf1 = new ClientSessionFactoryImpl(server1tc);
-// ClientSession session1 = csf1.createSession(false, true, true);
-//
-// session0.createQueue(address1, address1, null, false, false);
-// session1.createQueue(address1, address1, null, false, false);
-// ClientProducer prod0 = session0.createProducer(address1);
-//
-// ClientConsumer cons1 = session1.createConsumer(address1);
-//
-// session1.start();
-//
-// SimpleString propKey = new SimpleString("car");
-//
-// ClientMessage message = session0.createClientMessage(false);
-// message.getBody().flip();
-//
-// prod0.send(message);
-//
-// ClientMessage r1 = cons1.receive(1000);
-// assertNull(r1);
-//
-// session0.close();
-// session1.close();
-//
-// service0.stop();
-// service1.stop();
-//
-// assertEquals(0, service0.getServer().getRemotingService().getConnections().size());
-// assertEquals(0, service1.getServer().getRemotingService().getConnections().size());
-// }
-//
-// public void testMessageNullAdress() throws Exception
-// {
-// Map<String, Object> service0Params = new HashMap<String, Object>();
-// MessagingService service0 = createMessagingService(0, service0Params);
-//
-// Map<String, Object> service1Params = new HashMap<String, Object>();
-// MessagingService service1 = createMessagingService(1, service1Params);
-// service1.start();
-//
-// TransportConfiguration server0tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
-// service0Params);
-//
-// Map<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
-//
-// TransportConfiguration server1tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
-// service1Params);
-// connectors.put(server1tc.getName(), server1tc);
-// service0.getServer().getConfiguration().setConnectorConfigurations(connectors);
-//
-// List<Pair<String, String>> connectorNames = new ArrayList<Pair<String, String>>();
-// connectorNames.add(new Pair<String, String>(server1tc.getName(), null));
-//
-// final SimpleString address1 = new SimpleString("testaddress");
-//
-// MessageFlowConfiguration ofconfig1 = new MessageFlowConfiguration("blah",
-// null,
-// null,
-// false,
-// 1,
-// -1,
-// null,
-// DEFAULT_RETRY_INTERVAL,
-// DEFAULT_RETRY_INTERVAL_MULTIPLIER,
-// DEFAULT_MAX_RETRIES_BEFORE_FAILOVER,
-// DEFAULT_MAX_RETRIES_AFTER_FAILOVER,
-// DEFAULT_USE_DUPLICATE_DETECTION,
-// DEFAULT_MAX_HOPS,
-// connectorNames);
-//
-// Set<MessageFlowConfiguration> ofconfigs = new HashSet<MessageFlowConfiguration>();
-// ofconfigs.add(ofconfig1);
-//
-// service0.getServer().getConfiguration().setMessageFlowConfigurations(ofconfigs);
-//
-// service0.start();
-//
-// ClientSessionFactory csf0 = new ClientSessionFactoryImpl(server0tc);
-// ClientSession session0 = csf0.createSession(false, true, true);
-//
-// ClientSessionFactory csf1 = new ClientSessionFactoryImpl(server1tc);
-// ClientSession session1 = csf1.createSession(false, true, true);
-//
-// session0.createQueue(address1, address1, null, false, false);
-// session1.createQueue(address1, address1, null, false, false);
-// ClientProducer prod0 = session0.createProducer(address1);
-//
-// ClientConsumer cons1 = session1.createConsumer(address1);
-//
-// session1.start();
-//
-// SimpleString propKey = new SimpleString("car");
-//
-// ClientMessage message = session0.createClientMessage(false);
-// message.getBody().flip();
-//
-// prod0.send(message);
-//
-// ClientMessage r1 = cons1.receive(1000);
-// assertNull(r1);
-//
-// session0.close();
-// session1.close();
-//
-// service0.stop();
-// service1.stop();
-//
-// assertEquals(0, service0.getServer().getRemotingService().getConnections().size());
-// assertEquals(0, service1.getServer().getRemotingService().getConnections().size());
-// }
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- @Override
- protected void setUp() throws Exception
- {
- }
-
- @Override
- protected void tearDown() throws Exception
- {
- assertEquals(0, InVMRegistry.instance.size());
- }
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-}
Deleted: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/DiscoveryTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/DiscoveryTest.java 2009-01-21 07:27:06 UTC (rev 5672)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/DiscoveryTest.java 2009-01-21 13:29:08 UTC (rev 5673)
@@ -1,811 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-
-package org.jboss.messaging.tests.integration.cluster.distribution;
-
-import static org.jboss.messaging.tests.util.RandomUtil.randomString;
-
-import java.net.InetAddress;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.jboss.messaging.core.cluster.DiscoveryGroup;
-import org.jboss.messaging.core.cluster.DiscoveryListener;
-import org.jboss.messaging.core.cluster.impl.DiscoveryGroupImpl;
-import org.jboss.messaging.core.config.TransportConfiguration;
-import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.server.cluster.BroadcastGroup;
-import org.jboss.messaging.core.server.cluster.impl.BroadcastGroupImpl;
-import org.jboss.messaging.tests.util.UnitTestCase;
-import org.jboss.messaging.util.Pair;
-import org.jboss.messaging.util.UUIDGenerator;
-
-/**
- * A DiscoveryTest
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
- * Created 8 Dec 2008 12:36:26
- *
- *
- *
- *
- */
-public class DiscoveryTest extends UnitTestCase
-{
- private static final Logger log = Logger.getLogger(DiscoveryTest.class);
-
- public void testSimpleBroadcast() throws Exception
- {
- final InetAddress groupAddress = InetAddress.getByName("230.1.2.3");
- final int groupPort = 6745;
- final int timeout = 500;
-
- BroadcastGroup bg = new BroadcastGroupImpl(randomString(), null, -1, groupAddress, groupPort);
-
- bg.start();
-
- TransportConfiguration live1 = generateTC();
-
- TransportConfiguration backup1 = generateTC();
-
- Pair<TransportConfiguration, TransportConfiguration> connectorPair =
- new Pair<TransportConfiguration, TransportConfiguration>(live1, backup1);
-
- bg.addConnectorPair(connectorPair);
-
- DiscoveryGroup dg = new DiscoveryGroupImpl(randomString(), groupAddress, groupPort, timeout);
-
- dg.start();
-
- bg.broadcastConnectors();
-
- boolean ok = dg.waitForBroadcast(1000);
-
- assertTrue(ok);
-
- List<Pair<TransportConfiguration, TransportConfiguration>> connectors = dg.getConnectors();
-
- assertNotNull(connectors);
-
- assertEquals(1, connectors.size());
-
- Pair<TransportConfiguration, TransportConfiguration> receivedPair = connectors.get(0);
-
- assertEquals(connectorPair, receivedPair);
-
- bg.stop();
-
- dg.stop();
-
- }
-
- public void testSimpleBroadcastDifferentAddress() throws Exception
- {
- final InetAddress groupAddress = InetAddress.getByName("230.1.2.3");
- final int groupPort = 6745;
- final int timeout = 500;
-
- BroadcastGroup bg = new BroadcastGroupImpl(randomString(), null, -1, groupAddress, groupPort);
-
- bg.start();
-
- TransportConfiguration live1 = generateTC();
-
- TransportConfiguration backup1 = generateTC();
-
- Pair<TransportConfiguration, TransportConfiguration> connectorPair =
- new Pair<TransportConfiguration, TransportConfiguration>(live1, backup1);
-
- bg.addConnectorPair(connectorPair);
-
- final InetAddress groupAddress2 = InetAddress.getByName("230.1.2.4");
-
- DiscoveryGroup dg = new DiscoveryGroupImpl(randomString(), groupAddress2, groupPort, timeout);
-
- dg.start();
-
- bg.broadcastConnectors();
-
- boolean ok = dg.waitForBroadcast(1000);
-
- assertFalse(ok);
-
- bg.stop();
-
- dg.stop();
- }
-
- public void testSimpleBroadcastDifferentPort() throws Exception
- {
- final InetAddress groupAddress = InetAddress.getByName("230.1.2.3");
- final int groupPort = 6745;
- final int timeout = 500;
-
- BroadcastGroup bg = new BroadcastGroupImpl(randomString(), null, -1, groupAddress, groupPort);
-
- bg.start();
-
- TransportConfiguration live1 = generateTC();
-
- TransportConfiguration backup1 = generateTC();
-
- Pair<TransportConfiguration, TransportConfiguration> connectorPair =
- new Pair<TransportConfiguration, TransportConfiguration>(live1, backup1);
-
- bg.addConnectorPair(connectorPair);
-
- final int port2 = 6746;
-
- DiscoveryGroup dg = new DiscoveryGroupImpl(randomString(), groupAddress, port2, timeout);
-
- dg.start();
-
- bg.broadcastConnectors();
-
- boolean ok = dg.waitForBroadcast(1000);
-
- assertFalse(ok);
-
- bg.stop();
-
- dg.stop();
- }
-
- public void testSimpleBroadcastDifferentAddressAndPort() throws Exception
- {
- final InetAddress groupAddress = InetAddress.getByName("230.1.2.3");
- final int groupPort = 6745;
- final int timeout = 500;
-
- BroadcastGroup bg = new BroadcastGroupImpl(randomString(), null, -1, groupAddress, groupPort);
-
- bg.start();
-
- TransportConfiguration live1 = generateTC();
-
- TransportConfiguration backup1 = generateTC();
-
- Pair<TransportConfiguration, TransportConfiguration> connectorPair =
- new Pair<TransportConfiguration, TransportConfiguration>(live1, backup1);
-
- bg.addConnectorPair(connectorPair);
-
- final InetAddress groupAddress2 = InetAddress.getByName("230.1.2.4");
- final int port2 = 6746;
-
- DiscoveryGroup dg = new DiscoveryGroupImpl(randomString(), groupAddress2, port2, timeout);
-
- dg.start();
-
- bg.broadcastConnectors();
-
- boolean ok = dg.waitForBroadcast(1000);
-
- assertFalse(ok);
-
- bg.stop();
-
- dg.stop();
- }
-
- public void testMultipleGroups() throws Exception
- {
- final InetAddress groupAddress1 = InetAddress.getByName("230.1.2.3");
- final int groupPort1 = 6745;
-
- final InetAddress groupAddress2 = InetAddress.getByName("230.1.2.4");
- final int groupPort2 = 6746;
-
- final InetAddress groupAddress3 = InetAddress.getByName("230.1.2.5");
- final int groupPort3 = 6747;
-
- final int timeout = 500;
-
- BroadcastGroup bg1 = new BroadcastGroupImpl(randomString(), null, -1, groupAddress1, groupPort1);
- bg1.start();
-
- BroadcastGroup bg2 = new BroadcastGroupImpl(randomString(), null, -1, groupAddress2, groupPort2);
- bg2.start();
-
- BroadcastGroup bg3 = new BroadcastGroupImpl(randomString(), null, -1, groupAddress3, groupPort3);
- bg3.start();
-
- TransportConfiguration live1 = generateTC();
- TransportConfiguration backup1 = generateTC();
-
- TransportConfiguration live2 = generateTC();
- TransportConfiguration backup2 = generateTC();
-
- TransportConfiguration live3 = generateTC();
- TransportConfiguration backup3 = generateTC();
-
- Pair<TransportConfiguration, TransportConfiguration> connectorPair1 =
- new Pair<TransportConfiguration, TransportConfiguration>(live1, backup1);
-
- Pair<TransportConfiguration, TransportConfiguration> connectorPair2 =
- new Pair<TransportConfiguration, TransportConfiguration>(live2, backup2);
-
- Pair<TransportConfiguration, TransportConfiguration> connectorPair3 =
- new Pair<TransportConfiguration, TransportConfiguration>(live3, backup3);
-
- bg1.addConnectorPair(connectorPair1);
- bg2.addConnectorPair(connectorPair2);
- bg3.addConnectorPair(connectorPair3);
-
- DiscoveryGroup dg1 = new DiscoveryGroupImpl(randomString(), groupAddress1, groupPort1, timeout);
- dg1.start();
-
- DiscoveryGroup dg2 = new DiscoveryGroupImpl(randomString(), groupAddress2, groupPort2, timeout);
- dg2.start();
-
- DiscoveryGroup dg3 = new DiscoveryGroupImpl(randomString(), groupAddress3, groupPort3, timeout);
- dg3.start();
-
- bg1.broadcastConnectors();
-
- bg2.broadcastConnectors();
-
- bg3.broadcastConnectors();
-
- boolean ok = dg1.waitForBroadcast(1000);
- assertTrue(ok);
- List<Pair<TransportConfiguration, TransportConfiguration>> connectors = dg1.getConnectors();
- assertNotNull(connectors);
- assertEquals(1, connectors.size());
- Pair<TransportConfiguration, TransportConfiguration> receivedPair = connectors.get(0);
- assertEquals(connectorPair1, receivedPair);
-
- ok = dg2.waitForBroadcast(1000);
- assertTrue(ok);
- connectors = dg2.getConnectors();
- assertNotNull(connectors);
- assertEquals(1, connectors.size());
- receivedPair = connectors.get(0);
- assertEquals(connectorPair2, receivedPair);
-
- ok = dg3.waitForBroadcast(1000);
- assertTrue(ok);
- connectors = dg3.getConnectors();
- assertNotNull(connectors);
- assertEquals(1, connectors.size());
- receivedPair = connectors.get(0);
- assertEquals(connectorPair3, receivedPair);
-
- bg1.stop();
- bg2.stop();
- bg3.stop();
-
- dg1.stop();
- dg2.stop();
- dg3.stop();
- }
-
- public void testBroadcastNullBackup() throws Exception
- {
- final InetAddress groupAddress = InetAddress.getByName("230.1.2.3");
- final int groupPort = 6745;
- final int timeout = 500;
-
- BroadcastGroup bg = new BroadcastGroupImpl(randomString(), null, -1, groupAddress, groupPort);
-
- bg.start();
-
- TransportConfiguration live1 = generateTC();
-
- Pair<TransportConfiguration, TransportConfiguration> connectorPair =
- new Pair<TransportConfiguration, TransportConfiguration>(live1, null);
-
- bg.addConnectorPair(connectorPair);
-
- DiscoveryGroup dg = new DiscoveryGroupImpl(randomString(), groupAddress, groupPort, timeout);
-
- dg.start();
-
- bg.broadcastConnectors();
-
- boolean ok = dg.waitForBroadcast(1000);
-
- assertTrue(ok);
-
- List<Pair<TransportConfiguration, TransportConfiguration>> connectors = dg.getConnectors();
-
- assertNotNull(connectors);
-
- assertEquals(1, connectors.size());
-
- Pair<TransportConfiguration, TransportConfiguration> receivedPair = connectors.get(0);
-
- assertEquals(connectorPair, receivedPair);
-
- bg.stop();
-
- dg.stop();
-
- }
-
- public void testDiscoveryListenersCalled() throws Exception
- {
- final InetAddress groupAddress = InetAddress.getByName("230.1.2.3");
- final int groupPort = 6745;
- final int timeout = 500;
-
- BroadcastGroup bg = new BroadcastGroupImpl(randomString(), null, -1, groupAddress, groupPort);
-
- bg.start();
-
- TransportConfiguration live1 = generateTC();
-
- Pair<TransportConfiguration, TransportConfiguration> connectorPair =
- new Pair<TransportConfiguration, TransportConfiguration>(live1, null);
-
- bg.addConnectorPair(connectorPair);
-
- DiscoveryGroup dg = new DiscoveryGroupImpl(randomString(), groupAddress, groupPort, timeout);
-
- MyListener listener1 = new MyListener();
- MyListener listener2 = new MyListener();
- MyListener listener3 = new MyListener();
-
- dg.registerListener(listener1);
- dg.registerListener(listener2);
- dg.registerListener(listener3);
-
- dg.start();
-
- bg.broadcastConnectors();
- boolean ok = dg.waitForBroadcast(1000);
- assertTrue(ok);
-
- assertTrue(listener1.called);
- assertTrue(listener2.called);
- assertTrue(listener3.called);
-
- listener1.called = false;
- listener2.called = false;
- listener3.called = false;
-
- bg.broadcastConnectors();
- ok = dg.waitForBroadcast(1000);
- assertTrue(ok);
-
- //Won't be called since connectors haven't changed
- assertFalse(listener1.called);
- assertFalse(listener2.called);
- assertFalse(listener3.called);
-
- listener1.called = false;
- listener2.called = false;
- listener3.called = false;
-
- TransportConfiguration live2 = generateTC();
-
- Pair<TransportConfiguration, TransportConfiguration> connectorPair2 =
- new Pair<TransportConfiguration, TransportConfiguration>(live2, null);
-
- bg.addConnectorPair(connectorPair2);
-
- dg.unregisterListener(listener1);
-
- bg.broadcastConnectors();
- ok = dg.waitForBroadcast(1000);
- assertTrue(ok);
-
- assertFalse(listener1.called);
- assertTrue(listener2.called);
- assertTrue(listener3.called);
-
- listener1.called = false;
- listener2.called = false;
- listener3.called = false;
-
- dg.unregisterListener(listener2);
-
- bg.broadcastConnectors();
- ok = dg.waitForBroadcast(1000);
- assertTrue(ok);
-
- assertFalse(listener1.called);
- assertFalse(listener2.called);
- assertFalse(listener3.called);
-
- listener1.called = false;
- listener2.called = false;
- listener3.called = false;
-
- TransportConfiguration live4 = generateTC();
-
- Pair<TransportConfiguration, TransportConfiguration> connectorPair4 =
- new Pair<TransportConfiguration, TransportConfiguration>(live4, null);
-
- bg.addConnectorPair(connectorPair4);
-
- dg.unregisterListener(listener3);
-
- bg.broadcastConnectors();
- ok = dg.waitForBroadcast(1000);
- assertTrue(ok);
-
- assertFalse(listener1.called);
- assertFalse(listener2.called);
- assertFalse(listener3.called);
-
- bg.stop();
-
- dg.stop();
-
- }
-
- public void testConnectorsUpdatedMultipleBroadcasters() throws Exception
- {
- final InetAddress groupAddress = InetAddress.getByName("230.1.2.3");
- final int groupPort = 6745;
- final int timeout = 500;
-
- BroadcastGroup bg1 = new BroadcastGroupImpl(randomString(), null, -1, groupAddress, groupPort);
- bg1.start();
-
- BroadcastGroup bg2 = new BroadcastGroupImpl(randomString(), null, -1, groupAddress, groupPort);
- bg2.start();
-
- BroadcastGroup bg3 = new BroadcastGroupImpl(randomString(), null, -1, groupAddress, groupPort);
- bg3.start();
-
- TransportConfiguration live1 = generateTC();
- TransportConfiguration backup1 = generateTC();
- Pair<TransportConfiguration, TransportConfiguration> connectorPair1 =
- new Pair<TransportConfiguration, TransportConfiguration>(live1, backup1);
- bg1.addConnectorPair(connectorPair1);
-
- TransportConfiguration live2 = generateTC();
- TransportConfiguration backup2 = generateTC();
- Pair<TransportConfiguration, TransportConfiguration> connectorPair2 =
- new Pair<TransportConfiguration, TransportConfiguration>(live2, backup2);
- bg2.addConnectorPair(connectorPair2);
-
- TransportConfiguration live3 = generateTC();
- TransportConfiguration backup3 = generateTC();
- Pair<TransportConfiguration, TransportConfiguration> connectorPair3 =
- new Pair<TransportConfiguration, TransportConfiguration>(live3, backup3);
- bg3.addConnectorPair(connectorPair3);
-
- DiscoveryGroup dg = new DiscoveryGroupImpl(randomString(), groupAddress, groupPort, timeout);
-
- MyListener listener1 = new MyListener();
- dg.registerListener(listener1);
- MyListener listener2 = new MyListener();
- dg.registerListener(listener2);
-
- dg.start();
-
- bg1.broadcastConnectors();
- boolean ok = dg.waitForBroadcast(1000);
- assertTrue(ok);
- List<Pair<TransportConfiguration, TransportConfiguration>> connectors = dg.getConnectors();
- assertNotNull(connectors);
- assertEquals(1, connectors.size());
- assertTrue(connectors.contains(connectorPair1));
- assertTrue(listener1.called);
- assertTrue(listener2.called);
- listener1.called = false;
- listener2.called = false;
-
- bg2.broadcastConnectors();
- ok = dg.waitForBroadcast(1000);
- assertTrue(ok);
- connectors = dg.getConnectors();
- assertNotNull(connectors);
- assertEquals(2, connectors.size());
- assertTrue(connectors.contains(connectorPair1));
- assertTrue(connectors.contains(connectorPair2));
- assertTrue(listener1.called);
- assertTrue(listener2.called);
- listener1.called = false;
- listener2.called = false;
-
- bg3.broadcastConnectors();
- ok = dg.waitForBroadcast(1000);
- assertTrue(ok);
- connectors = dg.getConnectors();
- assertNotNull(connectors);
- assertEquals(3, connectors.size());
- assertTrue(connectors.contains(connectorPair1));
- assertTrue(connectors.contains(connectorPair2));
- assertTrue(connectors.contains(connectorPair3));
- assertTrue(listener1.called);
- assertTrue(listener2.called);
- listener1.called = false;
- listener2.called = false;
-
- bg1.broadcastConnectors();
- ok = dg.waitForBroadcast(1000);
- assertTrue(ok);
- connectors = dg.getConnectors();
- assertNotNull(connectors);
- assertEquals(3, connectors.size());
- assertTrue(connectors.contains(connectorPair1));
- assertTrue(connectors.contains(connectorPair2));
- assertTrue(connectors.contains(connectorPair3));
- assertFalse(listener1.called);
- assertFalse(listener2.called);
- listener1.called = false;
- listener2.called = false;
-
- bg2.broadcastConnectors();
- ok = dg.waitForBroadcast(1000);
- assertTrue(ok);
- connectors = dg.getConnectors();
- assertNotNull(connectors);
- assertEquals(3, connectors.size());
- assertTrue(connectors.contains(connectorPair1));
- assertTrue(connectors.contains(connectorPair2));
- assertTrue(connectors.contains(connectorPair3));
- assertFalse(listener1.called);
- assertFalse(listener2.called);
- listener1.called = false;
- listener2.called = false;
-
- bg3.broadcastConnectors();
- ok = dg.waitForBroadcast(1000);
- assertTrue(ok);
- connectors = dg.getConnectors();
- assertNotNull(connectors);
- assertEquals(3, connectors.size());
- assertTrue(connectors.contains(connectorPair1));
- assertTrue(connectors.contains(connectorPair2));
- assertTrue(connectors.contains(connectorPair3));
- assertFalse(listener1.called);
- assertFalse(listener2.called);
- listener1.called = false;
- listener2.called = false;
-
- TransportConfiguration live1_1 = generateTC();
- TransportConfiguration backup1_1 = generateTC();
- Pair<TransportConfiguration, TransportConfiguration> connectorPair1_1 =
- new Pair<TransportConfiguration, TransportConfiguration>(live1_1, backup1_1);
- bg1.addConnectorPair(connectorPair1_1);
- bg1.broadcastConnectors();
- ok = dg.waitForBroadcast(1000);
- assertTrue(ok);
- connectors = dg.getConnectors();
- assertNotNull(connectors);
- assertEquals(4, connectors.size());
- assertTrue(connectors.contains(connectorPair1));
- assertTrue(connectors.contains(connectorPair2));
- assertTrue(connectors.contains(connectorPair3));
- assertTrue(connectors.contains(connectorPair1_1));
- assertTrue(listener1.called);
- assertTrue(listener2.called);
- listener1.called = false;
- listener2.called = false;
-
- bg2.removeConnectorPair(connectorPair2);
- bg2.broadcastConnectors();
- ok = dg.waitForBroadcast(1000);
- assertTrue(ok);
- connectors = dg.getConnectors();
- assertNotNull(connectors);
- assertEquals(4, connectors.size());
- assertTrue(connectors.contains(connectorPair1));
- //Connector2 should still be there since not timed out yet
- assertTrue(connectors.contains(connectorPair2));
- assertTrue(connectors.contains(connectorPair3));
- assertTrue(connectors.contains(connectorPair1_1));
- assertFalse(listener1.called);
- assertFalse(listener2.called);
- listener1.called = false;
- listener2.called = false;
-
- Thread.sleep(timeout);
-
- bg1.broadcastConnectors();
- ok = dg.waitForBroadcast(1000);
- bg2.broadcastConnectors();
- ok = dg.waitForBroadcast(1000);
- bg3.broadcastConnectors();
- ok = dg.waitForBroadcast(1000);
-
- connectors = dg.getConnectors();
- assertNotNull(connectors);
- assertEquals(3, connectors.size());
- assertTrue(connectors.contains(connectorPair1));
- assertTrue(connectors.contains(connectorPair3));
- assertTrue(connectors.contains(connectorPair1_1));
- assertTrue(listener1.called);
- assertTrue(listener2.called);
- listener1.called = false;
- listener2.called = false;
-
- bg1.removeConnectorPair(connectorPair1);
- bg3.removeConnectorPair(connectorPair3);
-
- Thread.sleep(timeout);
-
- bg1.broadcastConnectors();
- ok = dg.waitForBroadcast(1000);
- bg2.broadcastConnectors();
- ok = dg.waitForBroadcast(1000);
- bg3.broadcastConnectors();
- ok = dg.waitForBroadcast(1000);
-
- connectors = dg.getConnectors();
- assertNotNull(connectors);
- assertEquals(1, connectors.size());
- assertTrue(connectors.contains(connectorPair1_1));
- assertTrue(listener1.called);
- assertTrue(listener2.called);
- listener1.called = false;
- listener2.called = false;
-
- bg1.removeConnectorPair(connectorPair1_1);
-
- Thread.sleep(timeout);
-
- bg1.broadcastConnectors();
- ok = dg.waitForBroadcast(1000);
- bg2.broadcastConnectors();
- ok = dg.waitForBroadcast(1000);
- bg3.broadcastConnectors();
- ok = dg.waitForBroadcast(1000);
-
- connectors = dg.getConnectors();
- assertNotNull(connectors);
- assertEquals(0, connectors.size());
- assertTrue(listener1.called);
- assertTrue(listener2.called);
- listener1.called = false;
- listener2.called = false;
-
- bg1.broadcastConnectors();
- ok = dg.waitForBroadcast(1000);
- bg2.broadcastConnectors();
- ok = dg.waitForBroadcast(1000);
- bg3.broadcastConnectors();
- ok = dg.waitForBroadcast(1000);
-
- connectors = dg.getConnectors();
- assertNotNull(connectors);
- assertEquals(0, connectors.size());
- assertFalse(listener1.called);
- assertFalse(listener2.called);
-
- bg1.stop();
- bg2.stop();
- bg3.stop();
-
- dg.stop();
- }
-
- public void testMultipleDiscoveryGroups() throws Exception
- {
- final InetAddress groupAddress = InetAddress.getByName("230.1.2.3");
- final int groupPort = 6745;
- final int timeout = 500;
-
- BroadcastGroup bg = new BroadcastGroupImpl(randomString(), null, -1, groupAddress, groupPort);
-
- bg.start();
-
- TransportConfiguration live1 = generateTC();
- TransportConfiguration backup1 = generateTC();
-
- Pair<TransportConfiguration, TransportConfiguration> connectorPair1=
- new Pair<TransportConfiguration, TransportConfiguration>(live1, backup1);
-
- bg.addConnectorPair(connectorPair1);
-
- DiscoveryGroup dg1 = new DiscoveryGroupImpl(randomString(), groupAddress, groupPort, timeout);
-
- DiscoveryGroup dg2 = new DiscoveryGroupImpl(randomString(), groupAddress, groupPort, timeout);
-
- DiscoveryGroup dg3 = new DiscoveryGroupImpl(randomString(), groupAddress, groupPort, timeout);
-
- dg1.start();
- dg2.start();
- dg3.start();
-
- bg.broadcastConnectors();
-
- boolean ok = dg1.waitForBroadcast(1000);
- assertTrue(ok);
- List<Pair<TransportConfiguration, TransportConfiguration>> connectors = dg1.getConnectors();
- assertNotNull(connectors);
- assertEquals(1, connectors.size());
- assertTrue(connectors.contains(connectorPair1));
-
- ok = dg2.waitForBroadcast(1000);
- assertTrue(ok);
- connectors = dg2.getConnectors();
- assertNotNull(connectors);
- assertEquals(1, connectors.size());
- assertTrue(connectors.contains(connectorPair1));
- ok = dg3.waitForBroadcast(1000);
- assertTrue(ok);
- connectors = dg3.getConnectors();
- assertNotNull(connectors);
- assertEquals(1, connectors.size());
- assertTrue(connectors.contains(connectorPair1));
-
- TransportConfiguration live2 = generateTC();
- TransportConfiguration backup2 = generateTC();
- Pair<TransportConfiguration, TransportConfiguration> connectorPair2=
- new Pair<TransportConfiguration, TransportConfiguration>(live2, backup2);
-
- bg.addConnectorPair(connectorPair2);
-
- bg.broadcastConnectors();
- ok = dg1.waitForBroadcast(1000);
- assertTrue(ok);
- connectors = dg1.getConnectors();
- assertNotNull(connectors);
- assertEquals(2, connectors.size());
- assertTrue(connectors.contains(connectorPair1));
- assertTrue(connectors.contains(connectorPair2));
-
- ok = dg2.waitForBroadcast(1000);
- assertTrue(ok);
- connectors = dg2.getConnectors();
- assertNotNull(connectors);
- assertEquals(2, connectors.size());
- assertTrue(connectors.contains(connectorPair1));
- assertTrue(connectors.contains(connectorPair2));
-
- ok = dg3.waitForBroadcast(1000);
- assertTrue(ok);
- connectors = dg3.getConnectors();
- assertNotNull(connectors);
- assertEquals(2, connectors.size());
- assertTrue(connectors.contains(connectorPair1));
- assertTrue(connectors.contains(connectorPair2));
-
- bg.stop();
-
- dg1.stop();
- dg2.stop();
- dg3.stop();
- }
-
- private TransportConfiguration generateTC()
- {
- String className = "org.foo.bar." + UUIDGenerator.getInstance().generateStringUUID();
- String name = UUIDGenerator.getInstance().generateStringUUID();
- Map<String, Object> params = new HashMap<String, Object>();
- params.put(UUIDGenerator.getInstance().generateStringUUID(), 123);
- params.put(UUIDGenerator.getInstance().generateStringUUID(), UUIDGenerator.getInstance().generateStringUUID());
- params.put(UUIDGenerator.getInstance().generateStringUUID(), 721633.123d);
- TransportConfiguration tc = new TransportConfiguration(className, params, name);
- return tc;
- }
-
- private static class MyListener implements DiscoveryListener
- {
- volatile boolean called;
-
- public void connectorsChanged()
- {
- called = true;
- }
- }
-
-}
Deleted: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MaxHopsTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MaxHopsTest.java 2009-01-21 07:27:06 UTC (rev 5672)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MaxHopsTest.java 2009-01-21 13:29:08 UTC (rev 5673)
@@ -1,450 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source Copyright 2005-2008, Red Hat
- * Middleware LLC, and individual contributors by the @authors tag. See the
- * copyright.txt in the distribution for a full listing of individual
- * contributors.
- *
- * This is free software; you can redistribute it and/or modify it under the
- * terms of the GNU Lesser General Public License as published by the Free
- * Software Foundation; either version 2.1 of the License, or (at your option)
- * any later version.
- *
- * This software is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
- * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
- * details.
- *
- * You should have received a copy of the GNU Lesser General Public License
- * along with this software; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF
- * site: http://www.fsf.org.
- */
-
-package org.jboss.messaging.tests.integration.cluster.distribution;
-
-import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL;
-import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL_MULTIPLIER;
-import static org.jboss.messaging.core.config.impl.ConfigurationImpl.DEFAULT_USE_DUPLICATE_DETECTION;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.jboss.messaging.core.client.ClientConsumer;
-import org.jboss.messaging.core.client.ClientMessage;
-import org.jboss.messaging.core.client.ClientProducer;
-import org.jboss.messaging.core.client.ClientSession;
-import org.jboss.messaging.core.client.ClientSessionFactory;
-import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
-import org.jboss.messaging.core.config.TransportConfiguration;
-import org.jboss.messaging.core.config.cluster.MessageFlowConfiguration;
-import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.remoting.impl.invm.InVMRegistry;
-import org.jboss.messaging.core.remoting.impl.invm.TransportConstants;
-import org.jboss.messaging.core.server.MessagingService;
-import org.jboss.messaging.tests.util.ServiceTestBase;
-import org.jboss.messaging.util.Pair;
-import org.jboss.messaging.util.SimpleString;
-
-/**
- *
- * A MaxHopsTest
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
- * Created 24 Nov 2008 14:26:45
- *
- *
- */
-public class MaxHopsTest extends ServiceTestBase
-{
- private static final Logger log = Logger.getLogger(MaxHopsTest.class);
-
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- // Public --------------------------------------------------------
-
- public void testFoo()
- {
- }
-
-
-//
-// public void testHops() throws Exception
-// {
-// testHops(0, false);
-// testHops(1, false);
-// testHops(2, false);
-// testHops(3, false);
-// testHops(4, true);
-// testHops(5, true);
-// testHops(6, true);
-// testHops(-1, true);
-// }
-//
-// public void testHopsFanout() throws Exception
-// {
-// testHopsFanout(0, false);
-// testHopsFanout(1, false);
-// testHopsFanout(2, true);
-// testHopsFanout(3, true);
-// testHopsFanout(4, true);
-// testHopsFanout(-1, true);
-// }
-//
-// private void testHops(final int maxHops, final boolean shouldReceive) throws Exception
-// {
-// Map<String, Object> service0Params = new HashMap<String, Object>();
-// MessagingService service0 = createClusteredServiceWithParams(0, false, service0Params);
-//
-// Map<String, Object> service1Params = new HashMap<String, Object>();
-// service1Params.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
-// MessagingService service1 = createClusteredServiceWithParams(1, false, service1Params);
-//
-// Map<String, Object> service2Params = new HashMap<String, Object>();
-// service2Params.put(TransportConstants.SERVER_ID_PROP_NAME, 2);
-// MessagingService service2 = createClusteredServiceWithParams(2, false, service2Params);
-//
-// Map<String, Object> service3Params = new HashMap<String, Object>();
-// service3Params.put(TransportConstants.SERVER_ID_PROP_NAME, 3);
-// MessagingService service3 = createClusteredServiceWithParams(3, false, service3Params);
-//
-// Map<String, Object> service4Params = new HashMap<String, Object>();
-// service4Params.put(TransportConstants.SERVER_ID_PROP_NAME, 4);
-// MessagingService service4 = createClusteredServiceWithParams(4, false, service4Params);
-//
-// Map<String, TransportConfiguration> connectors0 = new HashMap<String, TransportConfiguration>();
-// TransportConfiguration server1tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
-// service1Params,
-// "connector1");
-// connectors0.put(server1tc.getName(), server1tc);
-// service0.getServer().getConfiguration().setConnectorConfigurations(connectors0);
-//
-// Map<String, TransportConfiguration> connectors1 = new HashMap<String, TransportConfiguration>();
-// TransportConfiguration server2tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
-// service2Params,
-// "connector1");
-// connectors1.put(server2tc.getName(), server2tc);
-// service1.getServer().getConfiguration().setConnectorConfigurations(connectors1);
-//
-// Map<String, TransportConfiguration> connectors2 = new HashMap<String, TransportConfiguration>();
-// TransportConfiguration server3tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
-// service3Params,
-// "connector1");
-// connectors2.put(server3tc.getName(), server3tc);
-// service2.getServer().getConfiguration().setConnectorConfigurations(connectors2);
-//
-// Map<String, TransportConfiguration> connectors3 = new HashMap<String, TransportConfiguration>();
-// TransportConfiguration server4tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
-// service4Params,
-// "connector1");
-// connectors3.put(server4tc.getName(), server4tc);
-// service3.getServer().getConfiguration().setConnectorConfigurations(connectors3);
-//
-// List<Pair<String, String>> connectorNames = new ArrayList<Pair<String, String>>();
-// connectorNames.add(new Pair<String, String>("connector1", null));
-//
-// final SimpleString testAddress = new SimpleString("testaddress");
-//
-// MessageFlowConfiguration ofconfig = new MessageFlowConfiguration("outflow1",
-// testAddress.toString(),
-// null,
-// false,
-// 1,
-// -1,
-// null,
-// DEFAULT_RETRY_INTERVAL,
-// DEFAULT_RETRY_INTERVAL_MULTIPLIER,
-// 0,
-// 0,
-// DEFAULT_USE_DUPLICATE_DETECTION,
-// maxHops,
-// connectorNames);
-//
-// Set<MessageFlowConfiguration> ofconfigs = new HashSet<MessageFlowConfiguration>();
-// ofconfigs.add(ofconfig);
-//
-// service0.getServer().getConfiguration().setMessageFlowConfigurations(ofconfigs);
-// service1.getServer().getConfiguration().setMessageFlowConfigurations(ofconfigs);
-// service2.getServer().getConfiguration().setMessageFlowConfigurations(ofconfigs);
-// service3.getServer().getConfiguration().setMessageFlowConfigurations(ofconfigs);
-//
-// service4.start();
-// service3.start();
-// service2.start();
-// service1.start();
-// service0.start();
-//
-// log.info("started service");
-//
-// TransportConfiguration server0tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
-// service0Params);
-//
-// ClientSessionFactory csf0 = new ClientSessionFactoryImpl(server0tc);
-// ClientSession session0 = csf0.createSession(false, true, true);
-//
-// ClientSessionFactory csf4 = new ClientSessionFactoryImpl(server4tc);
-// ClientSession session4 = csf4.createSession(false, true, true);
-// session4.createQueue(testAddress, testAddress, null, false, true);
-//
-// ClientProducer prod0 = session0.createProducer(testAddress);
-//
-// ClientConsumer cons4 = session4.createConsumer(testAddress);
-//
-// session4.start();
-//
-// final int numMessages = 10;
-//
-// final SimpleString propKey = new SimpleString("testkey");
-//
-// for (int i = 0; i < numMessages; i++)
-// {
-// ClientMessage message = session0.createClientMessage(true);
-// message.putIntProperty(propKey, i);
-// message.getBody().flip();
-//
-// prod0.send(message);
-// }
-//
-// log.info("sent messages");
-//
-// if (shouldReceive)
-// {
-// for (int i = 0; i < numMessages; i++)
-// {
-// ClientMessage rmessage = cons4.receive(5000);
-// assertNotNull(rmessage);
-// assertEquals(i, rmessage.getProperty(propKey));
-// }
-// }
-//
-// ClientMessage rmessage = cons4.receive(1000);
-//
-// assertNull(rmessage);
-//
-// service0.stop();
-// service1.stop();
-// service2.stop();
-// service3.stop();
-// service4.stop();
-//
-// assertEquals(0, service0.getServer().getRemotingService().getConnections().size());
-// assertEquals(0, service1.getServer().getRemotingService().getConnections().size());
-// assertEquals(0, service2.getServer().getRemotingService().getConnections().size());
-// assertEquals(0, service3.getServer().getRemotingService().getConnections().size());
-// assertEquals(0, service4.getServer().getRemotingService().getConnections().size());
-// }
-//
-// private void testHopsFanout(final int maxHops, final boolean shouldReceive) throws Exception
-// {
-// Map<String, Object> service0Params = new HashMap<String, Object>();
-// MessagingService service0 = createClusteredServiceWithParams(0, false, service0Params);
-//
-// Map<String, Object> service1Params = new HashMap<String, Object>();
-// service1Params.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
-// MessagingService service1 = createClusteredServiceWithParams(1, false, service1Params);
-//
-// Map<String, Object> service2Params = new HashMap<String, Object>();
-// service2Params.put(TransportConstants.SERVER_ID_PROP_NAME, 2);
-// MessagingService service2 = createClusteredServiceWithParams(2, false, service2Params);
-//
-// Map<String, Object> service3Params = new HashMap<String, Object>();
-// service3Params.put(TransportConstants.SERVER_ID_PROP_NAME, 3);
-// MessagingService service3 = createClusteredServiceWithParams(3, false, service3Params);
-//
-// Map<String, Object> service4Params = new HashMap<String, Object>();
-// service4Params.put(TransportConstants.SERVER_ID_PROP_NAME, 4);
-// MessagingService service4 = createClusteredServiceWithParams(4, false, service4Params);
-//
-// Map<String, TransportConfiguration> connectors0 = new HashMap<String, TransportConfiguration>();
-// TransportConfiguration server1tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
-// service1Params,
-// "connector1");
-// connectors0.put(server1tc.getName(), server1tc);
-// service0.getServer().getConfiguration().setConnectorConfigurations(connectors0);
-//
-// Map<String, TransportConfiguration> connectors1 = new HashMap<String, TransportConfiguration>();
-// TransportConfiguration server2tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
-// service2Params,
-// "connector1");
-//
-// TransportConfiguration server3tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
-// service3Params,
-// "connector2");
-//
-// TransportConfiguration server4tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
-// service4Params,
-// "connector3");
-// connectors1.put(server2tc.getName(), server2tc);
-// connectors1.put(server3tc.getName(), server3tc);
-// connectors1.put(server4tc.getName(), server4tc);
-//
-// service1.getServer().getConfiguration().setConnectorConfigurations(connectors1);
-//
-// List<Pair<String, String>> connectorNames1 = new ArrayList<Pair<String, String>>();
-// connectorNames1.add(new Pair<String, String>("connector1", null));
-//
-// final SimpleString testAddress = new SimpleString("testaddress");
-//
-// MessageFlowConfiguration ofconfig1 = new MessageFlowConfiguration("outflow1",
-// testAddress.toString(),
-// null,
-// false,
-// 1,
-// -1,
-// null,
-// DEFAULT_RETRY_INTERVAL,
-// DEFAULT_RETRY_INTERVAL_MULTIPLIER,
-// 0,
-// 0,
-// DEFAULT_USE_DUPLICATE_DETECTION,
-// maxHops,
-// connectorNames1);
-//
-// Set<MessageFlowConfiguration> ofconfigs1 = new HashSet<MessageFlowConfiguration>();
-// ofconfigs1.add(ofconfig1);
-//
-// service0.getServer().getConfiguration().setMessageFlowConfigurations(ofconfigs1);
-//
-// List<Pair<String, String>> connectorNames2 = new ArrayList<Pair<String, String>>();
-// connectorNames2.add(new Pair<String, String>("connector1", null));
-// connectorNames2.add(new Pair<String, String>("connector2", null));
-// connectorNames2.add(new Pair<String, String>("connector3", null));
-//
-// MessageFlowConfiguration ofconfig2 = new MessageFlowConfiguration("outflow2",
-// testAddress.toString(),
-// null,
-// false,
-// 1,
-// -1,
-// null,
-// DEFAULT_RETRY_INTERVAL,
-// DEFAULT_RETRY_INTERVAL_MULTIPLIER,
-// 0,
-// 0,
-// DEFAULT_USE_DUPLICATE_DETECTION,
-// maxHops,
-// connectorNames2);
-//
-// Set<MessageFlowConfiguration> ofconfigs2 = new HashSet<MessageFlowConfiguration>();
-// ofconfigs2.add(ofconfig2);
-//
-// service1.getServer().getConfiguration().setMessageFlowConfigurations(ofconfigs2);
-//
-// service4.start();
-// service3.start();
-// service2.start();
-// service1.start();
-// service0.start();
-//
-// log.info("started service");
-//
-// TransportConfiguration server0tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
-// service0Params);
-//
-// ClientSessionFactory csf0 = new ClientSessionFactoryImpl(server0tc);
-// ClientSession session0 = csf0.createSession(false, true, true);
-//
-// ClientSessionFactory csf2 = new ClientSessionFactoryImpl(server2tc);
-// ClientSession session2 = csf2.createSession(false, true, true);
-// session2.createQueue(testAddress, testAddress, null, false, true);
-//
-// ClientSessionFactory csf3 = new ClientSessionFactoryImpl(server3tc);
-// ClientSession session3 = csf3.createSession(false, true, true);
-// session3.createQueue(testAddress, testAddress, null, false, true);
-//
-// ClientSessionFactory csf4 = new ClientSessionFactoryImpl(server4tc);
-// ClientSession session4 = csf4.createSession(false, true, true);
-// session4.createQueue(testAddress, testAddress, null, false, true);
-//
-// ClientProducer prod0 = session0.createProducer(testAddress);
-//
-// ClientConsumer cons2 = session2.createConsumer(testAddress);
-// ClientConsumer cons3 = session3.createConsumer(testAddress);
-// ClientConsumer cons4 = session4.createConsumer(testAddress);
-//
-// session2.start();
-// session3.start();
-// session4.start();
-//
-// final int numMessages = 1;
-//
-// final SimpleString propKey = new SimpleString("testkey");
-//
-// for (int i = 0; i < numMessages; i++)
-// {
-// ClientMessage message = session0.createClientMessage(true);
-// message.putIntProperty(propKey, i);
-// message.getBody().flip();
-//
-// prod0.send(message);
-// }
-//
-// log.info("sent messages");
-//
-// if (shouldReceive)
-// {
-// for (int i = 0; i < numMessages; i++)
-// {
-// ClientMessage rmessage = cons2.receive(5000);
-// assertNotNull(rmessage);
-// assertEquals(i, rmessage.getProperty(propKey));
-//
-// rmessage = cons3.receive(5000);
-// assertNotNull(rmessage);
-// assertEquals(i, rmessage.getProperty(propKey));
-//
-// rmessage = cons4.receive(5000);
-// assertNotNull(rmessage);
-// assertEquals(i, rmessage.getProperty(propKey));
-// }
-// }
-//
-// ClientMessage rmessage = cons2.receive(1000);
-// rmessage = cons3.receive(1000);
-// rmessage = cons4.receive(1000);
-//
-// assertNull(rmessage);
-//
-// service0.stop();
-// service1.stop();
-// service2.stop();
-// service3.stop();
-// service4.stop();
-//
-// assertEquals(0, service0.getServer().getRemotingService().getConnections().size());
-// assertEquals(0, service1.getServer().getRemotingService().getConnections().size());
-// assertEquals(0, service2.getServer().getRemotingService().getConnections().size());
-// assertEquals(0, service3.getServer().getRemotingService().getConnections().size());
-// assertEquals(0, service4.getServer().getRemotingService().getConnections().size());
-// }
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- @Override
- protected void setUp() throws Exception
- {
- super.clearData();
- }
-
- @Override
- protected void tearDown() throws Exception
- {
- assertEquals(0, InVMRegistry.instance.size());
- }
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-}
Copied: trunk/tests/src/org/jboss/messaging/tests/integration/discovery/DiscoveryTest.java (from rev 5666, trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/DiscoveryTest.java)
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/discovery/DiscoveryTest.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/discovery/DiscoveryTest.java 2009-01-21 13:29:08 UTC (rev 5673)
@@ -0,0 +1,811 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.tests.integration.discovery;
+
+import static org.jboss.messaging.tests.util.RandomUtil.randomString;
+
+import java.net.InetAddress;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.jboss.messaging.core.cluster.DiscoveryGroup;
+import org.jboss.messaging.core.cluster.DiscoveryListener;
+import org.jboss.messaging.core.cluster.impl.DiscoveryGroupImpl;
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.server.cluster.BroadcastGroup;
+import org.jboss.messaging.core.server.cluster.impl.BroadcastGroupImpl;
+import org.jboss.messaging.tests.util.UnitTestCase;
+import org.jboss.messaging.util.Pair;
+import org.jboss.messaging.util.UUIDGenerator;
+
+/**
+ * A DiscoveryTest
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ * Created 8 Dec 2008 12:36:26
+ *
+ *
+ *
+ *
+ */
+public class DiscoveryTest extends UnitTestCase
+{
+ private static final Logger log = Logger.getLogger(DiscoveryTest.class);
+
+ public void testSimpleBroadcast() throws Exception
+ {
+ final InetAddress groupAddress = InetAddress.getByName("230.1.2.3");
+ final int groupPort = 6745;
+ final int timeout = 500;
+
+ BroadcastGroup bg = new BroadcastGroupImpl(randomString(), null, -1, groupAddress, groupPort);
+
+ bg.start();
+
+ TransportConfiguration live1 = generateTC();
+
+ TransportConfiguration backup1 = generateTC();
+
+ Pair<TransportConfiguration, TransportConfiguration> connectorPair =
+ new Pair<TransportConfiguration, TransportConfiguration>(live1, backup1);
+
+ bg.addConnectorPair(connectorPair);
+
+ DiscoveryGroup dg = new DiscoveryGroupImpl(randomString(), groupAddress, groupPort, timeout);
+
+ dg.start();
+
+ bg.broadcastConnectors();
+
+ boolean ok = dg.waitForBroadcast(1000);
+
+ assertTrue(ok);
+
+ List<Pair<TransportConfiguration, TransportConfiguration>> connectors = dg.getConnectors();
+
+ assertNotNull(connectors);
+
+ assertEquals(1, connectors.size());
+
+ Pair<TransportConfiguration, TransportConfiguration> receivedPair = connectors.get(0);
+
+ assertEquals(connectorPair, receivedPair);
+
+ bg.stop();
+
+ dg.stop();
+
+ }
+
+ public void testSimpleBroadcastDifferentAddress() throws Exception
+ {
+ final InetAddress groupAddress = InetAddress.getByName("230.1.2.3");
+ final int groupPort = 6745;
+ final int timeout = 500;
+
+ BroadcastGroup bg = new BroadcastGroupImpl(randomString(), null, -1, groupAddress, groupPort);
+
+ bg.start();
+
+ TransportConfiguration live1 = generateTC();
+
+ TransportConfiguration backup1 = generateTC();
+
+ Pair<TransportConfiguration, TransportConfiguration> connectorPair =
+ new Pair<TransportConfiguration, TransportConfiguration>(live1, backup1);
+
+ bg.addConnectorPair(connectorPair);
+
+ final InetAddress groupAddress2 = InetAddress.getByName("230.1.2.4");
+
+ DiscoveryGroup dg = new DiscoveryGroupImpl(randomString(), groupAddress2, groupPort, timeout);
+
+ dg.start();
+
+ bg.broadcastConnectors();
+
+ boolean ok = dg.waitForBroadcast(1000);
+
+ assertFalse(ok);
+
+ bg.stop();
+
+ dg.stop();
+ }
+
+ public void testSimpleBroadcastDifferentPort() throws Exception
+ {
+ final InetAddress groupAddress = InetAddress.getByName("230.1.2.3");
+ final int groupPort = 6745;
+ final int timeout = 500;
+
+ BroadcastGroup bg = new BroadcastGroupImpl(randomString(), null, -1, groupAddress, groupPort);
+
+ bg.start();
+
+ TransportConfiguration live1 = generateTC();
+
+ TransportConfiguration backup1 = generateTC();
+
+ Pair<TransportConfiguration, TransportConfiguration> connectorPair =
+ new Pair<TransportConfiguration, TransportConfiguration>(live1, backup1);
+
+ bg.addConnectorPair(connectorPair);
+
+ final int port2 = 6746;
+
+ DiscoveryGroup dg = new DiscoveryGroupImpl(randomString(), groupAddress, port2, timeout);
+
+ dg.start();
+
+ bg.broadcastConnectors();
+
+ boolean ok = dg.waitForBroadcast(1000);
+
+ assertFalse(ok);
+
+ bg.stop();
+
+ dg.stop();
+ }
+
+ public void testSimpleBroadcastDifferentAddressAndPort() throws Exception
+ {
+ final InetAddress groupAddress = InetAddress.getByName("230.1.2.3");
+ final int groupPort = 6745;
+ final int timeout = 500;
+
+ BroadcastGroup bg = new BroadcastGroupImpl(randomString(), null, -1, groupAddress, groupPort);
+
+ bg.start();
+
+ TransportConfiguration live1 = generateTC();
+
+ TransportConfiguration backup1 = generateTC();
+
+ Pair<TransportConfiguration, TransportConfiguration> connectorPair =
+ new Pair<TransportConfiguration, TransportConfiguration>(live1, backup1);
+
+ bg.addConnectorPair(connectorPair);
+
+ final InetAddress groupAddress2 = InetAddress.getByName("230.1.2.4");
+ final int port2 = 6746;
+
+ DiscoveryGroup dg = new DiscoveryGroupImpl(randomString(), groupAddress2, port2, timeout);
+
+ dg.start();
+
+ bg.broadcastConnectors();
+
+ boolean ok = dg.waitForBroadcast(1000);
+
+ assertFalse(ok);
+
+ bg.stop();
+
+ dg.stop();
+ }
+
+ public void testMultipleGroups() throws Exception
+ {
+ final InetAddress groupAddress1 = InetAddress.getByName("230.1.2.3");
+ final int groupPort1 = 6745;
+
+ final InetAddress groupAddress2 = InetAddress.getByName("230.1.2.4");
+ final int groupPort2 = 6746;
+
+ final InetAddress groupAddress3 = InetAddress.getByName("230.1.2.5");
+ final int groupPort3 = 6747;
+
+ final int timeout = 500;
+
+ BroadcastGroup bg1 = new BroadcastGroupImpl(randomString(), null, -1, groupAddress1, groupPort1);
+ bg1.start();
+
+ BroadcastGroup bg2 = new BroadcastGroupImpl(randomString(), null, -1, groupAddress2, groupPort2);
+ bg2.start();
+
+ BroadcastGroup bg3 = new BroadcastGroupImpl(randomString(), null, -1, groupAddress3, groupPort3);
+ bg3.start();
+
+ TransportConfiguration live1 = generateTC();
+ TransportConfiguration backup1 = generateTC();
+
+ TransportConfiguration live2 = generateTC();
+ TransportConfiguration backup2 = generateTC();
+
+ TransportConfiguration live3 = generateTC();
+ TransportConfiguration backup3 = generateTC();
+
+ Pair<TransportConfiguration, TransportConfiguration> connectorPair1 =
+ new Pair<TransportConfiguration, TransportConfiguration>(live1, backup1);
+
+ Pair<TransportConfiguration, TransportConfiguration> connectorPair2 =
+ new Pair<TransportConfiguration, TransportConfiguration>(live2, backup2);
+
+ Pair<TransportConfiguration, TransportConfiguration> connectorPair3 =
+ new Pair<TransportConfiguration, TransportConfiguration>(live3, backup3);
+
+ bg1.addConnectorPair(connectorPair1);
+ bg2.addConnectorPair(connectorPair2);
+ bg3.addConnectorPair(connectorPair3);
+
+ DiscoveryGroup dg1 = new DiscoveryGroupImpl(randomString(), groupAddress1, groupPort1, timeout);
+ dg1.start();
+
+ DiscoveryGroup dg2 = new DiscoveryGroupImpl(randomString(), groupAddress2, groupPort2, timeout);
+ dg2.start();
+
+ DiscoveryGroup dg3 = new DiscoveryGroupImpl(randomString(), groupAddress3, groupPort3, timeout);
+ dg3.start();
+
+ bg1.broadcastConnectors();
+
+ bg2.broadcastConnectors();
+
+ bg3.broadcastConnectors();
+
+ boolean ok = dg1.waitForBroadcast(1000);
+ assertTrue(ok);
+ List<Pair<TransportConfiguration, TransportConfiguration>> connectors = dg1.getConnectors();
+ assertNotNull(connectors);
+ assertEquals(1, connectors.size());
+ Pair<TransportConfiguration, TransportConfiguration> receivedPair = connectors.get(0);
+ assertEquals(connectorPair1, receivedPair);
+
+ ok = dg2.waitForBroadcast(1000);
+ assertTrue(ok);
+ connectors = dg2.getConnectors();
+ assertNotNull(connectors);
+ assertEquals(1, connectors.size());
+ receivedPair = connectors.get(0);
+ assertEquals(connectorPair2, receivedPair);
+
+ ok = dg3.waitForBroadcast(1000);
+ assertTrue(ok);
+ connectors = dg3.getConnectors();
+ assertNotNull(connectors);
+ assertEquals(1, connectors.size());
+ receivedPair = connectors.get(0);
+ assertEquals(connectorPair3, receivedPair);
+
+ bg1.stop();
+ bg2.stop();
+ bg3.stop();
+
+ dg1.stop();
+ dg2.stop();
+ dg3.stop();
+ }
+
+ public void testBroadcastNullBackup() throws Exception
+ {
+ final InetAddress groupAddress = InetAddress.getByName("230.1.2.3");
+ final int groupPort = 6745;
+ final int timeout = 500;
+
+ BroadcastGroup bg = new BroadcastGroupImpl(randomString(), null, -1, groupAddress, groupPort);
+
+ bg.start();
+
+ TransportConfiguration live1 = generateTC();
+
+ Pair<TransportConfiguration, TransportConfiguration> connectorPair =
+ new Pair<TransportConfiguration, TransportConfiguration>(live1, null);
+
+ bg.addConnectorPair(connectorPair);
+
+ DiscoveryGroup dg = new DiscoveryGroupImpl(randomString(), groupAddress, groupPort, timeout);
+
+ dg.start();
+
+ bg.broadcastConnectors();
+
+ boolean ok = dg.waitForBroadcast(1000);
+
+ assertTrue(ok);
+
+ List<Pair<TransportConfiguration, TransportConfiguration>> connectors = dg.getConnectors();
+
+ assertNotNull(connectors);
+
+ assertEquals(1, connectors.size());
+
+ Pair<TransportConfiguration, TransportConfiguration> receivedPair = connectors.get(0);
+
+ assertEquals(connectorPair, receivedPair);
+
+ bg.stop();
+
+ dg.stop();
+
+ }
+
+ public void testDiscoveryListenersCalled() throws Exception
+ {
+ final InetAddress groupAddress = InetAddress.getByName("230.1.2.3");
+ final int groupPort = 6745;
+ final int timeout = 500;
+
+ BroadcastGroup bg = new BroadcastGroupImpl(randomString(), null, -1, groupAddress, groupPort);
+
+ bg.start();
+
+ TransportConfiguration live1 = generateTC();
+
+ Pair<TransportConfiguration, TransportConfiguration> connectorPair =
+ new Pair<TransportConfiguration, TransportConfiguration>(live1, null);
+
+ bg.addConnectorPair(connectorPair);
+
+ DiscoveryGroup dg = new DiscoveryGroupImpl(randomString(), groupAddress, groupPort, timeout);
+
+ MyListener listener1 = new MyListener();
+ MyListener listener2 = new MyListener();
+ MyListener listener3 = new MyListener();
+
+ dg.registerListener(listener1);
+ dg.registerListener(listener2);
+ dg.registerListener(listener3);
+
+ dg.start();
+
+ bg.broadcastConnectors();
+ boolean ok = dg.waitForBroadcast(1000);
+ assertTrue(ok);
+
+ assertTrue(listener1.called);
+ assertTrue(listener2.called);
+ assertTrue(listener3.called);
+
+ listener1.called = false;
+ listener2.called = false;
+ listener3.called = false;
+
+ bg.broadcastConnectors();
+ ok = dg.waitForBroadcast(1000);
+ assertTrue(ok);
+
+ //Won't be called since connectors haven't changed
+ assertFalse(listener1.called);
+ assertFalse(listener2.called);
+ assertFalse(listener3.called);
+
+ listener1.called = false;
+ listener2.called = false;
+ listener3.called = false;
+
+ TransportConfiguration live2 = generateTC();
+
+ Pair<TransportConfiguration, TransportConfiguration> connectorPair2 =
+ new Pair<TransportConfiguration, TransportConfiguration>(live2, null);
+
+ bg.addConnectorPair(connectorPair2);
+
+ dg.unregisterListener(listener1);
+
+ bg.broadcastConnectors();
+ ok = dg.waitForBroadcast(1000);
+ assertTrue(ok);
+
+ assertFalse(listener1.called);
+ assertTrue(listener2.called);
+ assertTrue(listener3.called);
+
+ listener1.called = false;
+ listener2.called = false;
+ listener3.called = false;
+
+ dg.unregisterListener(listener2);
+
+ bg.broadcastConnectors();
+ ok = dg.waitForBroadcast(1000);
+ assertTrue(ok);
+
+ assertFalse(listener1.called);
+ assertFalse(listener2.called);
+ assertFalse(listener3.called);
+
+ listener1.called = false;
+ listener2.called = false;
+ listener3.called = false;
+
+ TransportConfiguration live4 = generateTC();
+
+ Pair<TransportConfiguration, TransportConfiguration> connectorPair4 =
+ new Pair<TransportConfiguration, TransportConfiguration>(live4, null);
+
+ bg.addConnectorPair(connectorPair4);
+
+ dg.unregisterListener(listener3);
+
+ bg.broadcastConnectors();
+ ok = dg.waitForBroadcast(1000);
+ assertTrue(ok);
+
+ assertFalse(listener1.called);
+ assertFalse(listener2.called);
+ assertFalse(listener3.called);
+
+ bg.stop();
+
+ dg.stop();
+
+ }
+
+ public void testConnectorsUpdatedMultipleBroadcasters() throws Exception
+ {
+ final InetAddress groupAddress = InetAddress.getByName("230.1.2.3");
+ final int groupPort = 6745;
+ final int timeout = 500;
+
+ BroadcastGroup bg1 = new BroadcastGroupImpl(randomString(), null, -1, groupAddress, groupPort);
+ bg1.start();
+
+ BroadcastGroup bg2 = new BroadcastGroupImpl(randomString(), null, -1, groupAddress, groupPort);
+ bg2.start();
+
+ BroadcastGroup bg3 = new BroadcastGroupImpl(randomString(), null, -1, groupAddress, groupPort);
+ bg3.start();
+
+ TransportConfiguration live1 = generateTC();
+ TransportConfiguration backup1 = generateTC();
+ Pair<TransportConfiguration, TransportConfiguration> connectorPair1 =
+ new Pair<TransportConfiguration, TransportConfiguration>(live1, backup1);
+ bg1.addConnectorPair(connectorPair1);
+
+ TransportConfiguration live2 = generateTC();
+ TransportConfiguration backup2 = generateTC();
+ Pair<TransportConfiguration, TransportConfiguration> connectorPair2 =
+ new Pair<TransportConfiguration, TransportConfiguration>(live2, backup2);
+ bg2.addConnectorPair(connectorPair2);
+
+ TransportConfiguration live3 = generateTC();
+ TransportConfiguration backup3 = generateTC();
+ Pair<TransportConfiguration, TransportConfiguration> connectorPair3 =
+ new Pair<TransportConfiguration, TransportConfiguration>(live3, backup3);
+ bg3.addConnectorPair(connectorPair3);
+
+ DiscoveryGroup dg = new DiscoveryGroupImpl(randomString(), groupAddress, groupPort, timeout);
+
+ MyListener listener1 = new MyListener();
+ dg.registerListener(listener1);
+ MyListener listener2 = new MyListener();
+ dg.registerListener(listener2);
+
+ dg.start();
+
+ bg1.broadcastConnectors();
+ boolean ok = dg.waitForBroadcast(1000);
+ assertTrue(ok);
+ List<Pair<TransportConfiguration, TransportConfiguration>> connectors = dg.getConnectors();
+ assertNotNull(connectors);
+ assertEquals(1, connectors.size());
+ assertTrue(connectors.contains(connectorPair1));
+ assertTrue(listener1.called);
+ assertTrue(listener2.called);
+ listener1.called = false;
+ listener2.called = false;
+
+ bg2.broadcastConnectors();
+ ok = dg.waitForBroadcast(1000);
+ assertTrue(ok);
+ connectors = dg.getConnectors();
+ assertNotNull(connectors);
+ assertEquals(2, connectors.size());
+ assertTrue(connectors.contains(connectorPair1));
+ assertTrue(connectors.contains(connectorPair2));
+ assertTrue(listener1.called);
+ assertTrue(listener2.called);
+ listener1.called = false;
+ listener2.called = false;
+
+ bg3.broadcastConnectors();
+ ok = dg.waitForBroadcast(1000);
+ assertTrue(ok);
+ connectors = dg.getConnectors();
+ assertNotNull(connectors);
+ assertEquals(3, connectors.size());
+ assertTrue(connectors.contains(connectorPair1));
+ assertTrue(connectors.contains(connectorPair2));
+ assertTrue(connectors.contains(connectorPair3));
+ assertTrue(listener1.called);
+ assertTrue(listener2.called);
+ listener1.called = false;
+ listener2.called = false;
+
+ bg1.broadcastConnectors();
+ ok = dg.waitForBroadcast(1000);
+ assertTrue(ok);
+ connectors = dg.getConnectors();
+ assertNotNull(connectors);
+ assertEquals(3, connectors.size());
+ assertTrue(connectors.contains(connectorPair1));
+ assertTrue(connectors.contains(connectorPair2));
+ assertTrue(connectors.contains(connectorPair3));
+ assertFalse(listener1.called);
+ assertFalse(listener2.called);
+ listener1.called = false;
+ listener2.called = false;
+
+ bg2.broadcastConnectors();
+ ok = dg.waitForBroadcast(1000);
+ assertTrue(ok);
+ connectors = dg.getConnectors();
+ assertNotNull(connectors);
+ assertEquals(3, connectors.size());
+ assertTrue(connectors.contains(connectorPair1));
+ assertTrue(connectors.contains(connectorPair2));
+ assertTrue(connectors.contains(connectorPair3));
+ assertFalse(listener1.called);
+ assertFalse(listener2.called);
+ listener1.called = false;
+ listener2.called = false;
+
+ bg3.broadcastConnectors();
+ ok = dg.waitForBroadcast(1000);
+ assertTrue(ok);
+ connectors = dg.getConnectors();
+ assertNotNull(connectors);
+ assertEquals(3, connectors.size());
+ assertTrue(connectors.contains(connectorPair1));
+ assertTrue(connectors.contains(connectorPair2));
+ assertTrue(connectors.contains(connectorPair3));
+ assertFalse(listener1.called);
+ assertFalse(listener2.called);
+ listener1.called = false;
+ listener2.called = false;
+
+ TransportConfiguration live1_1 = generateTC();
+ TransportConfiguration backup1_1 = generateTC();
+ Pair<TransportConfiguration, TransportConfiguration> connectorPair1_1 =
+ new Pair<TransportConfiguration, TransportConfiguration>(live1_1, backup1_1);
+ bg1.addConnectorPair(connectorPair1_1);
+ bg1.broadcastConnectors();
+ ok = dg.waitForBroadcast(1000);
+ assertTrue(ok);
+ connectors = dg.getConnectors();
+ assertNotNull(connectors);
+ assertEquals(4, connectors.size());
+ assertTrue(connectors.contains(connectorPair1));
+ assertTrue(connectors.contains(connectorPair2));
+ assertTrue(connectors.contains(connectorPair3));
+ assertTrue(connectors.contains(connectorPair1_1));
+ assertTrue(listener1.called);
+ assertTrue(listener2.called);
+ listener1.called = false;
+ listener2.called = false;
+
+ bg2.removeConnectorPair(connectorPair2);
+ bg2.broadcastConnectors();
+ ok = dg.waitForBroadcast(1000);
+ assertTrue(ok);
+ connectors = dg.getConnectors();
+ assertNotNull(connectors);
+ assertEquals(4, connectors.size());
+ assertTrue(connectors.contains(connectorPair1));
+ //Connector2 should still be there since not timed out yet
+ assertTrue(connectors.contains(connectorPair2));
+ assertTrue(connectors.contains(connectorPair3));
+ assertTrue(connectors.contains(connectorPair1_1));
+ assertFalse(listener1.called);
+ assertFalse(listener2.called);
+ listener1.called = false;
+ listener2.called = false;
+
+ Thread.sleep(timeout);
+
+ bg1.broadcastConnectors();
+ ok = dg.waitForBroadcast(1000);
+ bg2.broadcastConnectors();
+ ok = dg.waitForBroadcast(1000);
+ bg3.broadcastConnectors();
+ ok = dg.waitForBroadcast(1000);
+
+ connectors = dg.getConnectors();
+ assertNotNull(connectors);
+ assertEquals(3, connectors.size());
+ assertTrue(connectors.contains(connectorPair1));
+ assertTrue(connectors.contains(connectorPair3));
+ assertTrue(connectors.contains(connectorPair1_1));
+ assertTrue(listener1.called);
+ assertTrue(listener2.called);
+ listener1.called = false;
+ listener2.called = false;
+
+ bg1.removeConnectorPair(connectorPair1);
+ bg3.removeConnectorPair(connectorPair3);
+
+ Thread.sleep(timeout);
+
+ bg1.broadcastConnectors();
+ ok = dg.waitForBroadcast(1000);
+ bg2.broadcastConnectors();
+ ok = dg.waitForBroadcast(1000);
+ bg3.broadcastConnectors();
+ ok = dg.waitForBroadcast(1000);
+
+ connectors = dg.getConnectors();
+ assertNotNull(connectors);
+ assertEquals(1, connectors.size());
+ assertTrue(connectors.contains(connectorPair1_1));
+ assertTrue(listener1.called);
+ assertTrue(listener2.called);
+ listener1.called = false;
+ listener2.called = false;
+
+ bg1.removeConnectorPair(connectorPair1_1);
+
+ Thread.sleep(timeout);
+
+ bg1.broadcastConnectors();
+ ok = dg.waitForBroadcast(1000);
+ bg2.broadcastConnectors();
+ ok = dg.waitForBroadcast(1000);
+ bg3.broadcastConnectors();
+ ok = dg.waitForBroadcast(1000);
+
+ connectors = dg.getConnectors();
+ assertNotNull(connectors);
+ assertEquals(0, connectors.size());
+ assertTrue(listener1.called);
+ assertTrue(listener2.called);
+ listener1.called = false;
+ listener2.called = false;
+
+ bg1.broadcastConnectors();
+ ok = dg.waitForBroadcast(1000);
+ bg2.broadcastConnectors();
+ ok = dg.waitForBroadcast(1000);
+ bg3.broadcastConnectors();
+ ok = dg.waitForBroadcast(1000);
+
+ connectors = dg.getConnectors();
+ assertNotNull(connectors);
+ assertEquals(0, connectors.size());
+ assertFalse(listener1.called);
+ assertFalse(listener2.called);
+
+ bg1.stop();
+ bg2.stop();
+ bg3.stop();
+
+ dg.stop();
+ }
+
+ public void testMultipleDiscoveryGroups() throws Exception
+ {
+ final InetAddress groupAddress = InetAddress.getByName("230.1.2.3");
+ final int groupPort = 6745;
+ final int timeout = 500;
+
+ BroadcastGroup bg = new BroadcastGroupImpl(randomString(), null, -1, groupAddress, groupPort);
+
+ bg.start();
+
+ TransportConfiguration live1 = generateTC();
+ TransportConfiguration backup1 = generateTC();
+
+ Pair<TransportConfiguration, TransportConfiguration> connectorPair1=
+ new Pair<TransportConfiguration, TransportConfiguration>(live1, backup1);
+
+ bg.addConnectorPair(connectorPair1);
+
+ DiscoveryGroup dg1 = new DiscoveryGroupImpl(randomString(), groupAddress, groupPort, timeout);
+
+ DiscoveryGroup dg2 = new DiscoveryGroupImpl(randomString(), groupAddress, groupPort, timeout);
+
+ DiscoveryGroup dg3 = new DiscoveryGroupImpl(randomString(), groupAddress, groupPort, timeout);
+
+ dg1.start();
+ dg2.start();
+ dg3.start();
+
+ bg.broadcastConnectors();
+
+ boolean ok = dg1.waitForBroadcast(1000);
+ assertTrue(ok);
+ List<Pair<TransportConfiguration, TransportConfiguration>> connectors = dg1.getConnectors();
+ assertNotNull(connectors);
+ assertEquals(1, connectors.size());
+ assertTrue(connectors.contains(connectorPair1));
+
+ ok = dg2.waitForBroadcast(1000);
+ assertTrue(ok);
+ connectors = dg2.getConnectors();
+ assertNotNull(connectors);
+ assertEquals(1, connectors.size());
+ assertTrue(connectors.contains(connectorPair1));
+ ok = dg3.waitForBroadcast(1000);
+ assertTrue(ok);
+ connectors = dg3.getConnectors();
+ assertNotNull(connectors);
+ assertEquals(1, connectors.size());
+ assertTrue(connectors.contains(connectorPair1));
+
+ TransportConfiguration live2 = generateTC();
+ TransportConfiguration backup2 = generateTC();
+ Pair<TransportConfiguration, TransportConfiguration> connectorPair2=
+ new Pair<TransportConfiguration, TransportConfiguration>(live2, backup2);
+
+ bg.addConnectorPair(connectorPair2);
+
+ bg.broadcastConnectors();
+ ok = dg1.waitForBroadcast(1000);
+ assertTrue(ok);
+ connectors = dg1.getConnectors();
+ assertNotNull(connectors);
+ assertEquals(2, connectors.size());
+ assertTrue(connectors.contains(connectorPair1));
+ assertTrue(connectors.contains(connectorPair2));
+
+ ok = dg2.waitForBroadcast(1000);
+ assertTrue(ok);
+ connectors = dg2.getConnectors();
+ assertNotNull(connectors);
+ assertEquals(2, connectors.size());
+ assertTrue(connectors.contains(connectorPair1));
+ assertTrue(connectors.contains(connectorPair2));
+
+ ok = dg3.waitForBroadcast(1000);
+ assertTrue(ok);
+ connectors = dg3.getConnectors();
+ assertNotNull(connectors);
+ assertEquals(2, connectors.size());
+ assertTrue(connectors.contains(connectorPair1));
+ assertTrue(connectors.contains(connectorPair2));
+
+ bg.stop();
+
+ dg1.stop();
+ dg2.stop();
+ dg3.stop();
+ }
+
+ private TransportConfiguration generateTC()
+ {
+ String className = "org.foo.bar." + UUIDGenerator.getInstance().generateStringUUID();
+ String name = UUIDGenerator.getInstance().generateStringUUID();
+ Map<String, Object> params = new HashMap<String, Object>();
+ params.put(UUIDGenerator.getInstance().generateStringUUID(), 123);
+ params.put(UUIDGenerator.getInstance().generateStringUUID(), UUIDGenerator.getInstance().generateStringUUID());
+ params.put(UUIDGenerator.getInstance().generateStringUUID(), 721633.123d);
+ TransportConfiguration tc = new TransportConfiguration(className, params, name);
+ return tc;
+ }
+
+ private static class MyListener implements DiscoveryListener
+ {
+ volatile boolean called;
+
+ public void connectorsChanged()
+ {
+ called = true;
+ }
+ }
+
+}
More information about the jboss-cvs-commits
mailing list