[hornetq-commits] JBoss hornetq SVN: r11883 - in branches/Branch_2_2_EAP: src/main/org/hornetq/core/server/cluster/impl and 1 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Thu Dec 8 13:22:55 EST 2011


Author: clebert.suconic at jboss.com
Date: 2011-12-08 13:22:55 -0500 (Thu, 08 Dec 2011)
New Revision: 11883

Modified:
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/config/BridgeConfiguration.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java
Log:


Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/config/BridgeConfiguration.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/config/BridgeConfiguration.java	2011-12-08 18:16:25 UTC (rev 11882)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/config/BridgeConfiguration.java	2011-12-08 18:22:55 UTC (rev 11883)
@@ -68,6 +68,10 @@
    private final long maxRetryInterval;
 
    private final int minLargeMessageSize;
+   
+   // Currently this is only changed by test cases.
+   // The bridge shouldn't be sending blocking calls anyway.
+   private long callTimeout = HornetQClient.DEFAULT_CALL_TIMEOUT;
 
    /**
     *  For backward compatibility on the API... no MinLargeMessage on this constructor
@@ -445,4 +449,26 @@
    {
       this.password = password;
    }
+   
+
+   /**
+    * @return the callTimeout configured for this bridge; HornetQClient.DEFAULT_CALL_TIMEOUT unless setCallTimeout was called
+    */
+   public long getCallTimeout()
+   {
+      return callTimeout;
+   }
+
+   /**
+    * Overrides the call timeout, which otherwise stays at HornetQClient.DEFAULT_CALL_TIMEOUT.
+    * Currently this is only changed by test cases, because the
+    * bridge is not supposed to be sending blocking calls anyway.
+    * @param callTimeout the callTimeout to set (presumably in milliseconds -- TODO confirm)
+    */
+   public void setCallTimeout(long callTimeout)
+   {
+      this.callTimeout = callTimeout;
+   }
+
+
 }

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java	2011-12-08 18:16:25 UTC (rev 11882)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java	2011-12-08 18:22:55 UTC (rev 11883)
@@ -572,11 +572,19 @@
          {
             producer.send(dest, message);
          }
-         catch (HornetQException e)
+         catch (final HornetQException e)
          {
             log.warn("Unable to send message " + ref + ", will try again once bridge reconnects", e);
 
             refs.remove(ref);
+            
+            executor.execute(new Runnable()
+            {
+               public void run()
+               {
+                  connectionFailed(e, false);
+               }
+            });
 
             return HandleStatus.BUSY;
          }

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java	2011-12-08 18:16:25 UTC (rev 11882)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java	2011-12-08 18:22:55 UTC (rev 11883)
@@ -471,6 +471,12 @@
       serverLocator.setBlockOnDurableSend(!config.isUseDuplicateDetection());
       serverLocator.setBlockOnNonDurableSend(!config.isUseDuplicateDetection());
       serverLocator.setMinLargeMessageSize(config.getMinLargeMessageSize());
+
+      // This stays at the default (30s) unless it is changed by embedded / test code.
+      // There is no reason to expose this timeout in the config,
+      // since the Bridge is supposed to be non-blocking and fast.
+      // We may expose this if we find a good use case.
+      serverLocator.setCallTimeout(config.getCallTimeout());
       if (!config.isUseDuplicateDetection())
       {
          log.debug("Bridge " + config.getName() +

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java	2011-12-08 18:16:25 UTC (rev 11882)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java	2011-12-08 18:22:55 UTC (rev 11883)
@@ -17,6 +17,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -24,6 +25,8 @@
 import junit.framework.Assert;
 
 import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.Interceptor;
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.api.core.TransportConfiguration;
 import org.hornetq.api.core.client.ClientConsumer;
@@ -39,6 +42,9 @@
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.postoffice.DuplicateIDCache;
 import org.hornetq.core.postoffice.impl.PostOfficeImpl;
+import org.hornetq.core.protocol.core.Packet;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionSendContinuationMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionSendMessage;
 import org.hornetq.core.remoting.impl.invm.TransportConstants;
 import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;
 import org.hornetq.core.server.HornetQServer;
@@ -46,6 +52,7 @@
 import org.hornetq.core.server.Queue;
 import org.hornetq.core.server.cluster.impl.BridgeImpl;
 import org.hornetq.core.transaction.impl.TransactionImpl;
+import org.hornetq.spi.core.protocol.RemotingConnection;
 import org.hornetq.tests.util.RandomUtil;
 import org.hornetq.tests.util.ServiceTestBase;
 import org.hornetq.tests.util.UnitTestCase;
@@ -263,6 +270,220 @@
 
    }
 
+
+   public void testLostMessageSimpleMessage() throws Exception
+   {
+      internalTestMessageLoss(false);
+   }
+   
+   public void testLostMessageLargeMessage() throws Exception
+   {
+      internalTestMessageLoss(true);
+   }
+   
+   /** This test will ignore messages,
+       which will cause the bridge to fail with a timeout.
+       The bridge should still recover from the failure and reconnect in that case. */
+   public void internalTestMessageLoss(final boolean largeMessage) throws Exception
+   {
+      // Drops message-completing send packets while ignoreSends is set; every drop counts down the latch.
+      class MyInterceptor implements Interceptor
+      {
+         // volatile: cleared by the test thread, read in intercept(), which presumably runs on a server thread
+         public volatile boolean ignoreSends = true;
+         public final CountDownLatch latch;
+
+         MyInterceptor(int numberOfIgnores)
+         {
+            latch = new CountDownLatch(numberOfIgnores);
+         }
+
+         /* (non-Javadoc)
+          * @see org.hornetq.api.core.Interceptor#intercept(org.hornetq.core.protocol.core.Packet, org.hornetq.spi.core.protocol.RemotingConnection)
+          */
+         public boolean intercept(Packet packet, RemotingConnection connection) throws HornetQException
+         {
+            // a continuation packet is only ignored when it is the last one (isContinues() == false)
+            boolean lastChunk = packet instanceof SessionSendContinuationMessage &&
+                                !((SessionSendContinuationMessage)packet).isContinues();
+            if (ignoreSends && (packet instanceof SessionSendMessage || lastChunk))
+            {
+               System.out.println("Ignored");
+               latch.countDown();
+               return false;
+            }
+            return true;
+         }
+      }
+      
+      MyInterceptor myInterceptor = new MyInterceptor(3);
+      
+      HornetQServer server0 = null;
+      HornetQServer server1 = null;
+      ServerLocator locator = null;
+      try
+      {
+         Map<String, Object> server0Params = new HashMap<String, Object>();
+         server0 = createClusteredServerWithParams(isNetty(), 0, true, server0Params);
+
+         Map<String, Object> server1Params = new HashMap<String, Object>();
+         addTargetParameters(server1Params);
+         server1 = createClusteredServerWithParams(isNetty(), 1, true, server1Params);
+
+         final String testAddress = "testAddress";
+         final String queueName0 = "queue0";
+         final String forwardAddress = "forwardAddress";
+         final String queueName1 = "queue1";
+
+         TransportConfiguration server0tc = new TransportConfiguration(getConnector(), server0Params);
+
+         TransportConfiguration server1tc = new TransportConfiguration(getConnector(), server1Params);
+
+         HashMap<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
+         connectors.put(server1tc.getName(), server1tc);
+         server0.getConfiguration().setConnectorConfigurations(connectors);
+
+         final int messageSize = 1024;
+
+         final int numMessages = 1;
+
+         ArrayList<String> connectorConfig = new ArrayList<String>();
+         connectorConfig.add(server1tc.getName());
+         BridgeConfiguration bridgeConfiguration = new BridgeConfiguration("bridge1",
+                                                                           queueName0,
+                                                                           forwardAddress,
+                                                                           null,
+                                                                           null,
+                                                                           HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+                                                                           HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
+                                                                           HornetQClient.DEFAULT_CONNECTION_TTL,
+                                                                           1000,
+                                                                           HornetQClient.DEFAULT_MAX_RETRY_INTERVAL,
+                                                                           1d,
+                                                                           -1,
+                                                                           false,
+                                                                           // Choose confirmation size to make sure acks
+                                                                           // are sent
+                                                                           numMessages * messageSize / 2,
+                                                                           connectorConfig,
+                                                                           false,
+                                                                           ConfigurationImpl.DEFAULT_CLUSTER_USER,
+                                                                           ConfigurationImpl.DEFAULT_CLUSTER_PASSWORD);
+         
+         bridgeConfiguration.setCallTimeout(500);
+
+         List<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();
+         bridgeConfigs.add(bridgeConfiguration);
+         server0.getConfiguration().setBridgeConfigurations(bridgeConfigs);
+
+         CoreQueueConfiguration queueConfig0 = new CoreQueueConfiguration(testAddress, queueName0, null, true);
+         List<CoreQueueConfiguration> queueConfigs0 = new ArrayList<CoreQueueConfiguration>();
+         queueConfigs0.add(queueConfig0);
+         server0.getConfiguration().setQueueConfigurations(queueConfigs0);
+
+         CoreQueueConfiguration queueConfig1 = new CoreQueueConfiguration(forwardAddress, queueName1, null, true);
+         List<CoreQueueConfiguration> queueConfigs1 = new ArrayList<CoreQueueConfiguration>();
+         queueConfigs1.add(queueConfig1);
+         server1.getConfiguration().setQueueConfigurations(queueConfigs1);
+
+         server1.start();
+         
+         server1.getRemotingService().addInterceptor(myInterceptor);
+
+         server0.start();
+         locator = HornetQClient.createServerLocatorWithoutHA(server0tc, server1tc);
+         ClientSessionFactory sf0 = locator.createSessionFactory(server0tc);
+
+         ClientSessionFactory sf1 = locator.createSessionFactory(server1tc);
+
+         ClientSession session0 = sf0.createSession(false, true, true);
+
+         ClientSession session1 = sf1.createSession(false, true, true);
+
+         ClientProducer producer0 = session0.createProducer(new SimpleString(testAddress));
+
+         ClientConsumer consumer1 = session1.createConsumer(queueName1);
+
+         session1.start();
+
+         final byte[] bytes = new byte[messageSize];
+
+         final SimpleString propKey = new SimpleString("testkey");
+
+         for (int i = 0; i < numMessages; i++)
+         {
+            ClientMessage message = session0.createMessage(true);
+
+            if (largeMessage)
+            {
+               message.setBodyInputStream(UnitTestCase.createFakeLargeStream(1024 * 1024));
+            }
+
+            message.putIntProperty(propKey, i);
+
+            message.getBodyBuffer().writeBytes(bytes);
+
+            producer0.send(message);
+         }
+         
+         myInterceptor.latch.await();
+         myInterceptor.ignoreSends = false;
+
+         for (int i = 0; i < numMessages; i++)
+         {
+            ClientMessage message = consumer1.receive(30000);
+
+            Assert.assertNotNull(message);
+
+            Assert.assertEquals(i, message.getObjectProperty(propKey));
+
+            if (largeMessage)
+            {
+               readMessages(message);
+            }
+
+            message.acknowledge();
+         }
+
+         Assert.assertNull(consumer1.receiveImmediate());
+
+         session0.close();
+
+         session1.close();
+
+         sf0.close();
+
+         sf1.close();
+
+      }
+      finally
+      {
+         if (locator != null)
+         {
+            locator.close();
+         }
+         try
+         {
+            server0.stop();
+         }
+         catch (Throwable ignored)
+         {
+         }
+
+         try
+         {
+            server1.stop();
+         }
+         catch (Throwable ignored)
+         {
+         }
+      }
+      
+      
+      assertEquals(0, loadQueues(server0).size());
+
+   }
+
    /**
     * @param server1Params
     */



More information about the hornetq-commits mailing list