[hornetq-commits] JBoss hornetq SVN: r10308 - in branches/Branch_2_2_EAP: tests/src/org/hornetq/tests/integration/cluster/bridge and 1 other directory.

do-not-reply at jboss.org do-not-reply at jboss.org
Wed Mar 9 09:23:41 EST 2011


Author: clebert.suconic at jboss.com
Date: 2011-03-09 09:23:40 -0500 (Wed, 09 Mar 2011)
New Revision: 10308

Modified:
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java
Log:
Verifying target destinations if the target address is a JMS queue/topic

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java	2011-03-08 21:43:15 UTC (rev 10307)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java	2011-03-09 14:23:40 UTC (rev 10308)
@@ -26,6 +26,7 @@
 import org.hornetq.api.core.client.ClientSessionFactory;
 import org.hornetq.api.core.client.SendAcknowledgementHandler;
 import org.hornetq.api.core.client.SessionFailureListener;
+import org.hornetq.api.core.client.ClientSession.BindingQuery;
 import org.hornetq.api.core.management.NotificationType;
 import org.hornetq.core.client.impl.ClientSessionInternal;
 import org.hornetq.core.client.impl.ServerLocatorInternal;
@@ -65,6 +66,10 @@
    private static final Logger log = Logger.getLogger(BridgeImpl.class);
 
    // Attributes ----------------------------------------------------
+   
+   private static final SimpleString JMS_QUEUE_ADDRESS_PREFIX = new SimpleString("jms.queue.");
+   
+   private static final SimpleString JMS_TOPIC_ADDRESS_PREFIX = new SimpleString("jms.topic.");
 
    protected final ServerLocatorInternal serverLocator;
 
@@ -95,6 +100,8 @@
    private final boolean useDuplicateDetection;
 
    private volatile boolean active;
+   
+   private volatile boolean stopping;
 
    private final String user;
 
@@ -214,6 +221,8 @@
             csf.close();
          }
       }
+      
+      stopping = true;
 
       executor.execute(new StopRunnable());
 
@@ -489,6 +498,7 @@
       }
 
       boolean retry = false;
+      int retryCount = 0;
 
       do
       {
@@ -500,6 +510,41 @@
             // Session is pre-acknowledge
             session = (ClientSessionInternal)csf.createSession(user, password, false, true, true, true, 1);
 
+            if (forwardingAddress != null)
+            {
+               BindingQuery query = session.bindingQuery(forwardingAddress);
+   
+               if (forwardingAddress.startsWith(BridgeImpl.JMS_QUEUE_ADDRESS_PREFIX) || forwardingAddress.startsWith(BridgeImpl.JMS_TOPIC_ADDRESS_PREFIX))
+               {
+                  if (!query.isExists())
+                  {
+                     retryCount ++;
+                     if (serverLocator.getReconnectAttempts() > 0)
+                     {
+                        if (retryCount > serverLocator.getReconnectAttempts())
+                        {
+                           log.warn("Retried " + forwardingAddress + " up to the configured reconnectAttempts(" + serverLocator.getReconnectAttempts() + "). Giving up now. The bridge " + this.getName() + " will not be activated");
+                           return false;
+                        }
+                     }
+   
+                     log.warn("Address " + forwardingAddress + " doesn't have any bindings yet, retry #(" + retryCount + ")");
+                     Thread.sleep(serverLocator.getRetryInterval());
+                     retry = true;
+                     csf.close();
+                     session.close();
+                     continue;
+                  }
+               }
+               else
+               {
+                  if (!query.isExists())
+                  {
+                     log.info("Bridge " + this.getName() + " connected to fowardingAddress=" + this.getForwardingAddress() + ". " + getForwardingAddress() + " doesn't have any bindings what means messages will be ignored until a binding is created.");
+                  }
+               }
+            }
+
             if (session == null)
             {
                // This can happen if the bridge is shutdown
@@ -560,7 +605,7 @@
             return false;
          }
       }
-      while (retry);
+      while (retry && started && !stopping);
 
       return false;
    }

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java	2011-03-08 21:43:15 UTC (rev 10307)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java	2011-03-09 14:23:40 UTC (rev 10308)
@@ -108,7 +108,7 @@
          final String forwardAddress = "forwardAddress";
          final String queueName1 = "queue1";
 
-        // Map<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
+         // Map<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
          TransportConfiguration server0tc = new TransportConfiguration(getConnector(), server0Params);
 
          TransportConfiguration server1tc = new TransportConfiguration(getConnector(), server1Params);
@@ -117,7 +117,6 @@
          connectors.put(server1tc.getName(), server1tc);
          server0.getConfiguration().setConnectorConfigurations(connectors);
 
-
          final int messageSize = 1024;
 
          final int numMessages = 10;
@@ -222,7 +221,7 @@
       }
       finally
       {
-         if(locator != null)
+         if (locator != null)
          {
             locator.close();
          }
@@ -441,7 +440,7 @@
 
       finally
       {
-         if(locator != null)
+         if (locator != null)
          {
             locator.close();
          }
@@ -465,7 +464,6 @@
 
    }
 
-
    // Created to verify JBPAPP-6057
    public void testStartLater() throws Exception
    {
@@ -484,7 +482,7 @@
 
          final String testAddress = "testAddress";
          final String queueName0 = "queue0";
-         final String forwardAddress = "forwardAddress";
+         final String forwardAddress = "jms.queue.forwardAddress";
          final String queueName1 = "forwardAddress";
 
          Map<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
@@ -521,11 +519,6 @@
          queueConfigs0.add(queueConfig0);
          server0.getConfiguration().setQueueConfigurations(queueConfigs0);
 
-         CoreQueueConfiguration queueConfig1 = new CoreQueueConfiguration(forwardAddress, queueName1, null, true);
-         List<CoreQueueConfiguration> queueConfigs1 = new ArrayList<CoreQueueConfiguration>();
-         queueConfigs1.add(queueConfig1);
-         server1.getConfiguration().setQueueConfigurations(queueConfigs1);
-
          server0.start();
 
          locator = HornetQClient.createServerLocatorWithoutHA(server0tc, server1tc);
@@ -540,52 +533,62 @@
          final SimpleString propKey = new SimpleString("testkey");
 
          final SimpleString selectorKey = new SimpleString("animal");
-         
-         for (int starts = 0 ; starts < 5; starts++)
+
+         for (int i = 0; i < numMessages; i++)
          {
-   
-            for (int i = 0; i < numMessages; i++)
-            {
-               ClientMessage message = session0.createMessage(false);
-               
-               message.getBodyBuffer().writeBytes(new byte[1024]);
-   
-               message.putIntProperty(propKey, i);
-   
-               message.putStringProperty(selectorKey, new SimpleString("monkey" + i));
-   
-               producer0.send(message);
-            }
-            
-            server1.start();
+            ClientMessage message = session0.createMessage(false);
 
-            ClientSessionFactory sf1 = locator.createSessionFactory(server1tc);
-   
-            ClientSession session1 = sf1.createSession(false, true, true);
-   
-            ClientConsumer consumer1 = session1.createConsumer(queueName1);
-            
-            session1.start();
-            
-            
-            for (int i = 0 ; i < numMessages; i++)
-            {
-               ClientMessage message = consumer1.receive(5000);
-               assertNotNull(message);
-               message.acknowledge();
-            }
-            
-            session1.commit();
-            
-            Assert.assertNull(consumer1.receiveImmediate());
-            
-            session1.close();
-            
-            sf1.close();
-            
-            server1.stop();
+            message.getBodyBuffer().writeBytes(new byte[1024]);
+
+            message.putIntProperty(propKey, i);
+
+            message.putStringProperty(selectorKey, new SimpleString("monkey" + i));
+
+            producer0.send(message);
          }
 
+         server1.start();
+
+         Thread.sleep(1000);
+
+         ClientSessionFactory sf1 = locator.createSessionFactory(server1tc);
+
+         ClientSession session1 = sf1.createSession(false, true, true);
+
+         try
+         {
+            session1.createQueue(forwardAddress, queueName1);
+         }
+         catch (Throwable ignored)
+         {
+            ignored.printStackTrace();
+         }
+
+         ClientConsumer consumer1 = session1.createConsumer(queueName1);
+
+         session1.start();
+
+         for (int i = 0; i < numMessages; i++)
+         {
+            ClientMessage message = consumer1.receive(5000);
+            assertNotNull(message);
+            message.acknowledge();
+         }
+
+         session1.commit();
+
+         Assert.assertNull(consumer1.receiveImmediate());
+
+         consumer1.close();
+
+         session1.deleteQueue(queueName1);
+
+         session1.close();
+
+         sf1.close();
+
+         server1.stop();
+
          session0.close();
 
          sf0.close();
@@ -593,7 +596,7 @@
 
       finally
       {
-         if(locator != null)
+         if (locator != null)
          {
             locator.close();
          }
@@ -649,7 +652,7 @@
       server0.getConfiguration().setConnectorConfigurations(connectors);
 
       ArrayList<String> staticConnectors = new ArrayList<String>();
-         staticConnectors.add(server1tc.getName());
+      staticConnectors.add(server1tc.getName());
 
       BridgeConfiguration bridgeConfiguration = new BridgeConfiguration("bridge1",
                                                                         queueName0,
@@ -662,8 +665,8 @@
                                                                         false,
                                                                         1024,
                                                                         HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
-                                                                           staticConnectors,
-                                                                           false,
+                                                                        staticConnectors,
+                                                                        false,
                                                                         ConfigurationImpl.DEFAULT_CLUSTER_USER,
                                                                         ConfigurationImpl.DEFAULT_CLUSTER_PASSWORD);
 
@@ -879,7 +882,7 @@
       }
       finally
       {
-         if(locator != null)
+         if (locator != null)
          {
             locator.close();
          }
@@ -901,7 +904,7 @@
       }
 
    }
-   
+
    public void testNullForwardingAddress() throws Exception
    {
       HornetQServer server0 = null;
@@ -928,16 +931,18 @@
 
          server0.getConfiguration().setConnectorConfigurations(connectors);
 
-
          final int messageSize = 1024;
 
          final int numMessages = 10;
 
          ArrayList<String> staticConnectors = new ArrayList<String>();
          staticConnectors.add(server1tc.getName());
-         BridgeConfiguration bridgeConfiguration = new BridgeConfiguration("bridge1",
-                                                                           queueName0,
-                                                                           null, // pass a null forwarding address to use messages' original address
+         BridgeConfiguration bridgeConfiguration = new BridgeConfiguration("bridge1", queueName0, null, // pass a null
+                                                                                                        // forwarding
+                                                                                                        // address to
+                                                                                                        // use messages'
+                                                                                                        // original
+                                                                                                        // address
                                                                            null,
                                                                            null,
                                                                            1000,
@@ -962,7 +967,7 @@
          queueConfigs0.add(queueConfig0);
          server0.getConfiguration().setQueueConfigurations(queueConfigs0);
 
-         // on server #1, we bind queueName1 to same address testAddress 
+         // on server #1, we bind queueName1 to same address testAddress
          CoreQueueConfiguration queueConfig1 = new CoreQueueConfiguration(testAddress, queueName1, null, true);
          List<CoreQueueConfiguration> queueConfigs1 = new ArrayList<CoreQueueConfiguration>();
          queueConfigs1.add(queueConfig1);
@@ -1025,7 +1030,7 @@
       }
       finally
       {
-         if(locator != null)
+         if (locator != null)
          {
             locator.close();
          }



More information about the hornetq-commits mailing list