Author: clebert.suconic(a)jboss.com
Date: 2011-03-08 16:43:15 -0500 (Tue, 08 Mar 2011)
New Revision: 10307
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java
Log:
added test to validate JBPAPP-6057
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java
===================================================================
---
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java 2011-03-08
15:32:17 UTC (rev 10306)
+++
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java 2011-03-08
21:43:15 UTC (rev 10307)
@@ -20,10 +20,15 @@
import junit.framework.Assert;
-import org.hornetq.api.core.Pair;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.client.*;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.core.config.BridgeConfiguration;
import org.hornetq.core.config.CoreQueueConfiguration;
import org.hornetq.core.config.impl.ConfigurationImpl;
@@ -460,6 +465,158 @@
}
+
+ // Created to verify JBPAPP-6057
+ public void testStartLater() throws Exception
+ {
+ HornetQServer server0 = null;
+ HornetQServer server1 = null;
+ ServerLocator locator = null;
+ try
+ {
+
+ Map<String, Object> server0Params = new HashMap<String, Object>();
+ server0 = createClusteredServerWithParams(isNetty(), 0, true, server0Params);
+
+ Map<String, Object> server1Params = new HashMap<String, Object>();
+ addTargetParameters(server1Params);
+ server1 = createClusteredServerWithParams(isNetty(), 1, true, server1Params);
+
+ final String testAddress = "testAddress";
+ final String queueName0 = "queue0";
+ final String forwardAddress = "forwardAddress";
+ final String queueName1 = "forwardAddress";
+
+ Map<String, TransportConfiguration> connectors = new HashMap<String,
TransportConfiguration>();
+ TransportConfiguration server0tc = new TransportConfiguration(getConnector(),
server0Params);
+ TransportConfiguration server1tc = new TransportConfiguration(getConnector(),
server1Params);
+ connectors.put(server1tc.getName(), server1tc);
+
+ server0.getConfiguration().setConnectorConfigurations(connectors);
+
+ ArrayList<String> staticConnectors = new ArrayList<String>();
+ staticConnectors.add(server1tc.getName());
+ BridgeConfiguration bridgeConfiguration = new
BridgeConfiguration("bridge1",
+ queueName0,
+
forwardAddress,
+ null,
+ null,
+ 100,
+ 1d,
+ -1,
+ false,
+ 1024,
+
HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
+
staticConnectors,
+ false,
+
ConfigurationImpl.DEFAULT_CLUSTER_USER,
+
ConfigurationImpl.DEFAULT_CLUSTER_PASSWORD);
+
+ List<BridgeConfiguration> bridgeConfigs = new
ArrayList<BridgeConfiguration>();
+ bridgeConfigs.add(bridgeConfiguration);
+ server0.getConfiguration().setBridgeConfigurations(bridgeConfigs);
+
+ CoreQueueConfiguration queueConfig0 = new CoreQueueConfiguration(testAddress,
queueName0, null, true);
+ List<CoreQueueConfiguration> queueConfigs0 = new
ArrayList<CoreQueueConfiguration>();
+ queueConfigs0.add(queueConfig0);
+ server0.getConfiguration().setQueueConfigurations(queueConfigs0);
+
+ CoreQueueConfiguration queueConfig1 = new CoreQueueConfiguration(forwardAddress,
queueName1, null, true);
+ List<CoreQueueConfiguration> queueConfigs1 = new
ArrayList<CoreQueueConfiguration>();
+ queueConfigs1.add(queueConfig1);
+ server1.getConfiguration().setQueueConfigurations(queueConfigs1);
+
+ server0.start();
+
+ locator = HornetQClient.createServerLocatorWithoutHA(server0tc, server1tc);
+ ClientSessionFactory sf0 = locator.createSessionFactory(server0tc);
+
+ ClientSession session0 = sf0.createSession(false, true, true);
+
+ ClientProducer producer0 = session0.createProducer(new
SimpleString(testAddress));
+
+ final int numMessages = 100;
+
+ final SimpleString propKey = new SimpleString("testkey");
+
+ final SimpleString selectorKey = new SimpleString("animal");
+
+ for (int starts = 0 ; starts < 5; starts++)
+ {
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = session0.createMessage(false);
+
+ message.getBodyBuffer().writeBytes(new byte[1024]);
+
+ message.putIntProperty(propKey, i);
+
+ message.putStringProperty(selectorKey, new SimpleString("monkey"
+ i));
+
+ producer0.send(message);
+ }
+
+ server1.start();
+
+ ClientSessionFactory sf1 = locator.createSessionFactory(server1tc);
+
+ ClientSession session1 = sf1.createSession(false, true, true);
+
+ ClientConsumer consumer1 = session1.createConsumer(queueName1);
+
+ session1.start();
+
+
+ for (int i = 0 ; i < numMessages; i++)
+ {
+ ClientMessage message = consumer1.receive(5000);
+ assertNotNull(message);
+ message.acknowledge();
+ }
+
+ session1.commit();
+
+ Assert.assertNull(consumer1.receiveImmediate());
+
+ session1.close();
+
+ sf1.close();
+
+ server1.stop();
+ }
+
+ session0.close();
+
+ sf0.close();
+ }
+
+ finally
+ {
+ if(locator != null)
+ {
+ locator.close();
+ }
+ try
+ {
+ server0.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+
+ try
+ {
+ server1.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+
+ }
+
+ }
+
public void testWithTransformer() throws Exception
{
internaltestWithTransformer(false);