[hornetq-commits] JBoss hornetq SVN: r10308 - in branches/Branch_2_2_EAP: tests/src/org/hornetq/tests/integration/cluster/bridge and 1 other directory.
do-not-reply at jboss.org
do-not-reply at jboss.org
Wed Mar 9 09:23:41 EST 2011
Author: clebert.suconic at jboss.com
Date: 2011-03-09 09:23:40 -0500 (Wed, 09 Mar 2011)
New Revision: 10308
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java
Log:
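Verify that the target destination exists (has bindings) before activating the bridge when the forwarding address is a JMS queue/topic; retry until bindings appear or the configured reconnectAttempts is exhausted.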
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2011-03-08 21:43:15 UTC (rev 10307)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2011-03-09 14:23:40 UTC (rev 10308)
@@ -26,6 +26,7 @@
import org.hornetq.api.core.client.ClientSessionFactory;
import org.hornetq.api.core.client.SendAcknowledgementHandler;
import org.hornetq.api.core.client.SessionFailureListener;
+import org.hornetq.api.core.client.ClientSession.BindingQuery;
import org.hornetq.api.core.management.NotificationType;
import org.hornetq.core.client.impl.ClientSessionInternal;
import org.hornetq.core.client.impl.ServerLocatorInternal;
@@ -65,6 +66,10 @@
private static final Logger log = Logger.getLogger(BridgeImpl.class);
// Attributes ----------------------------------------------------
+
+ private static final SimpleString JMS_QUEUE_ADDRESS_PREFIX = new SimpleString("jms.queue.");
+
+ private static final SimpleString JMS_TOPIC_ADDRESS_PREFIX = new SimpleString("jms.topic.");
protected final ServerLocatorInternal serverLocator;
@@ -95,6 +100,8 @@
private final boolean useDuplicateDetection;
private volatile boolean active;
+
+ private volatile boolean stopping;
private final String user;
@@ -214,6 +221,8 @@
csf.close();
}
}
+
+ stopping = true;
executor.execute(new StopRunnable());
@@ -489,6 +498,7 @@
}
boolean retry = false;
+ int retryCount = 0;
do
{
@@ -500,6 +510,41 @@
// Session is pre-acknowledge
session = (ClientSessionInternal)csf.createSession(user, password, false, true, true, true, 1);
+ if (forwardingAddress != null)
+ {
+ BindingQuery query = session.bindingQuery(forwardingAddress);
+
+ if (forwardingAddress.startsWith(BridgeImpl.JMS_QUEUE_ADDRESS_PREFIX) || forwardingAddress.startsWith(BridgeImpl.JMS_TOPIC_ADDRESS_PREFIX))
+ {
+ if (!query.isExists())
+ {
+ retryCount ++;
+ if (serverLocator.getReconnectAttempts() > 0)
+ {
+ if (retryCount > serverLocator.getReconnectAttempts())
+ {
+ log.warn("Retried " + forwardingAddress + " up to the configured reconnectAttempts(" + serverLocator.getReconnectAttempts() + "). Giving up now. The bridge " + this.getName() + " will not be activated");
+ return false;
+ }
+ }
+
+ log.warn("Address " + forwardingAddress + " doesn't have any bindings yet, retry #(" + retryCount + ")");
+ Thread.sleep(serverLocator.getRetryInterval());
+ retry = true;
+ csf.close();
+ session.close();
+ continue;
+ }
+ }
+ else
+ {
+ if (!query.isExists())
+ {
+ log.info("Bridge " + this.getName() + " connected to fowardingAddress=" + this.getForwardingAddress() + ". " + getForwardingAddress() + " doesn't have any bindings what means messages will be ignored until a binding is created.");
+ }
+ }
+ }
+
if (session == null)
{
// This can happen if the bridge is shutdown
@@ -560,7 +605,7 @@
return false;
}
}
- while (retry);
+ while (retry && started && !stopping);
return false;
}
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java 2011-03-08 21:43:15 UTC (rev 10307)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java 2011-03-09 14:23:40 UTC (rev 10308)
@@ -108,7 +108,7 @@
final String forwardAddress = "forwardAddress";
final String queueName1 = "queue1";
- // Map<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
+ // Map<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
TransportConfiguration server0tc = new TransportConfiguration(getConnector(), server0Params);
TransportConfiguration server1tc = new TransportConfiguration(getConnector(), server1Params);
@@ -117,7 +117,6 @@
connectors.put(server1tc.getName(), server1tc);
server0.getConfiguration().setConnectorConfigurations(connectors);
-
final int messageSize = 1024;
final int numMessages = 10;
@@ -222,7 +221,7 @@
}
finally
{
- if(locator != null)
+ if (locator != null)
{
locator.close();
}
@@ -441,7 +440,7 @@
finally
{
- if(locator != null)
+ if (locator != null)
{
locator.close();
}
@@ -465,7 +464,6 @@
}
-
// Created to verify JBPAPP-6057
public void testStartLater() throws Exception
{
@@ -484,7 +482,7 @@
final String testAddress = "testAddress";
final String queueName0 = "queue0";
- final String forwardAddress = "forwardAddress";
+ final String forwardAddress = "jms.queue.forwardAddress";
final String queueName1 = "forwardAddress";
Map<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
@@ -521,11 +519,6 @@
queueConfigs0.add(queueConfig0);
server0.getConfiguration().setQueueConfigurations(queueConfigs0);
- CoreQueueConfiguration queueConfig1 = new CoreQueueConfiguration(forwardAddress, queueName1, null, true);
- List<CoreQueueConfiguration> queueConfigs1 = new ArrayList<CoreQueueConfiguration>();
- queueConfigs1.add(queueConfig1);
- server1.getConfiguration().setQueueConfigurations(queueConfigs1);
-
server0.start();
locator = HornetQClient.createServerLocatorWithoutHA(server0tc, server1tc);
@@ -540,52 +533,62 @@
final SimpleString propKey = new SimpleString("testkey");
final SimpleString selectorKey = new SimpleString("animal");
-
- for (int starts = 0 ; starts < 5; starts++)
+
+ for (int i = 0; i < numMessages; i++)
{
-
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = session0.createMessage(false);
-
- message.getBodyBuffer().writeBytes(new byte[1024]);
-
- message.putIntProperty(propKey, i);
-
- message.putStringProperty(selectorKey, new SimpleString("monkey" + i));
-
- producer0.send(message);
- }
-
- server1.start();
+ ClientMessage message = session0.createMessage(false);
- ClientSessionFactory sf1 = locator.createSessionFactory(server1tc);
-
- ClientSession session1 = sf1.createSession(false, true, true);
-
- ClientConsumer consumer1 = session1.createConsumer(queueName1);
-
- session1.start();
-
-
- for (int i = 0 ; i < numMessages; i++)
- {
- ClientMessage message = consumer1.receive(5000);
- assertNotNull(message);
- message.acknowledge();
- }
-
- session1.commit();
-
- Assert.assertNull(consumer1.receiveImmediate());
-
- session1.close();
-
- sf1.close();
-
- server1.stop();
+ message.getBodyBuffer().writeBytes(new byte[1024]);
+
+ message.putIntProperty(propKey, i);
+
+ message.putStringProperty(selectorKey, new SimpleString("monkey" + i));
+
+ producer0.send(message);
}
+ server1.start();
+
+ Thread.sleep(1000);
+
+ ClientSessionFactory sf1 = locator.createSessionFactory(server1tc);
+
+ ClientSession session1 = sf1.createSession(false, true, true);
+
+ try
+ {
+ session1.createQueue(forwardAddress, queueName1);
+ }
+ catch (Throwable ignored)
+ {
+ ignored.printStackTrace();
+ }
+
+ ClientConsumer consumer1 = session1.createConsumer(queueName1);
+
+ session1.start();
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = consumer1.receive(5000);
+ assertNotNull(message);
+ message.acknowledge();
+ }
+
+ session1.commit();
+
+ Assert.assertNull(consumer1.receiveImmediate());
+
+ consumer1.close();
+
+ session1.deleteQueue(queueName1);
+
+ session1.close();
+
+ sf1.close();
+
+ server1.stop();
+
session0.close();
sf0.close();
@@ -593,7 +596,7 @@
finally
{
- if(locator != null)
+ if (locator != null)
{
locator.close();
}
@@ -649,7 +652,7 @@
server0.getConfiguration().setConnectorConfigurations(connectors);
ArrayList<String> staticConnectors = new ArrayList<String>();
- staticConnectors.add(server1tc.getName());
+ staticConnectors.add(server1tc.getName());
BridgeConfiguration bridgeConfiguration = new BridgeConfiguration("bridge1",
queueName0,
@@ -662,8 +665,8 @@
false,
1024,
HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
- staticConnectors,
- false,
+ staticConnectors,
+ false,
ConfigurationImpl.DEFAULT_CLUSTER_USER,
ConfigurationImpl.DEFAULT_CLUSTER_PASSWORD);
@@ -879,7 +882,7 @@
}
finally
{
- if(locator != null)
+ if (locator != null)
{
locator.close();
}
@@ -901,7 +904,7 @@
}
}
-
+
public void testNullForwardingAddress() throws Exception
{
HornetQServer server0 = null;
@@ -928,16 +931,18 @@
server0.getConfiguration().setConnectorConfigurations(connectors);
-
final int messageSize = 1024;
final int numMessages = 10;
ArrayList<String> staticConnectors = new ArrayList<String>();
staticConnectors.add(server1tc.getName());
- BridgeConfiguration bridgeConfiguration = new BridgeConfiguration("bridge1",
- queueName0,
- null, // pass a null forwarding address to use messages' original address
+ BridgeConfiguration bridgeConfiguration = new BridgeConfiguration("bridge1", queueName0, null, // pass a null
+ // forwarding
+ // address to
+ // use messages'
+ // original
+ // address
null,
null,
1000,
@@ -962,7 +967,7 @@
queueConfigs0.add(queueConfig0);
server0.getConfiguration().setQueueConfigurations(queueConfigs0);
- // on server #1, we bind queueName1 to same address testAddress
+ // on server #1, we bind queueName1 to same address testAddress
CoreQueueConfiguration queueConfig1 = new CoreQueueConfiguration(testAddress, queueName1, null, true);
List<CoreQueueConfiguration> queueConfigs1 = new ArrayList<CoreQueueConfiguration>();
queueConfigs1.add(queueConfig1);
@@ -1025,7 +1030,7 @@
}
finally
{
- if(locator != null)
+ if (locator != null)
{
locator.close();
}
More information about the hornetq-commits mailing list