[hornetq-commits] JBoss hornetq SVN: r9273 - trunk/tests/src/org/hornetq/tests/integration/cluster/bridge.

do-not-reply at jboss.org do-not-reply at jboss.org
Thu May 27 09:35:34 EDT 2010


Author: jmesnil
Date: 2010-05-27 09:35:33 -0400 (Thu, 27 May 2010)
New Revision: 9273

Added:
   trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeWithPagingTest.java
Log:
https://jira.jboss.org/browse/HORNETQ-382: Core Bridge loses messages when paging

* add (disabled) test which highlights the issue

Added: trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeWithPagingTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeWithPagingTest.java	                        (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeWithPagingTest.java	2010-05-27 13:35:33 UTC (rev 9273)
@@ -0,0 +1,243 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.cluster.bridge;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import junit.framework.Assert;
+
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.Pair;
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.core.config.BridgeConfiguration;
+import org.hornetq.core.config.CoreQueueConfiguration;
+import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.remoting.impl.invm.InVMConnector;
+import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
+import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.cluster.Bridge;
+import org.hornetq.core.server.cluster.impl.BridgeImpl;
+import org.hornetq.core.settings.impl.AddressFullMessagePolicy;
+import org.hornetq.core.settings.impl.AddressSettings;
+import org.hornetq.spi.core.protocol.RemotingConnection;
+
+/**
+ * A BridgeWithPagingTest
+ *
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ */
+public class BridgeWithPagingTest extends BridgeTestBase
+{
+   private static final Logger log = Logger.getLogger(BridgeWithPagingTest.class);
+
+   protected boolean isNetty()
+   {
+      return false;
+   }
+
+   private String getConnector()
+   {
+      if (isNetty())
+      {
+         return NettyConnectorFactory.class.getName();
+      }
+      else
+      {
+         return InVMConnectorFactory.class.getName();
+      }
+   }
+
+   public void testFoo() throws Exception
+   {
+      
+   }
+   
+   // https://jira.jboss.org/browse/HORNETQ-382
+   public void _testReconnectWithPaging() throws Exception
+   {
+      final byte[] content = new byte[2048];
+      for (int i=0; i < content.length; ++i) {
+          content[i] = (byte) i;
+      }
+      
+      Map<String, Object> server0Params = new HashMap<String, Object>();
+      HornetQServer server0 = createHornetQServer(0, isNetty(), server0Params);
+
+      Map<String, Object> server1Params = new HashMap<String, Object>();
+      HornetQServer server1 = createHornetQServer(1, isNetty(), server1Params);
+
+      TransportConfiguration server0tc = new TransportConfiguration(getConnector(), server0Params, "server0tc");
+
+      Map<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
+
+      TransportConfiguration server1tc = new TransportConfiguration(getConnector(), server1Params, "server1tc");
+
+      connectors.put(server1tc.getName(), server1tc);
+
+      server0.getConfiguration().setConnectorConfigurations(connectors);
+      server1.getConfiguration().setConnectorConfigurations(connectors);
+
+      final String bridgeName = "bridge1";
+      final String testAddress = "testAddress";
+      final String queueName0 = "queue0";
+      final String forwardAddress = "forwardAddress";
+
+      final long retryInterval = 50;
+      final double retryIntervalMultiplier = 1d;
+      final int reconnectAttempts = -1;
+      final int confirmationWindowSize = 1024;
+
+      Pair<String, String> connectorPair = new Pair<String, String>(server1tc.getName(), null);
+
+      BridgeConfiguration bridgeConfiguration = new BridgeConfiguration(bridgeName,
+                                                                        queueName0,
+                                                                        forwardAddress,
+                                                                        null,
+                                                                        null,
+                                                                        retryInterval,
+                                                                        retryIntervalMultiplier,
+                                                                        reconnectAttempts,
+                                                                        true,
+                                                                        false,
+                                                                        confirmationWindowSize,
+                                                                        HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
+                                                                        connectorPair,
+                                                                        ConfigurationImpl.DEFAULT_CLUSTER_USER,
+                                                                        ConfigurationImpl.DEFAULT_CLUSTER_PASSWORD);
+
+      List<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();
+      bridgeConfigs.add(bridgeConfiguration);
+      server0.getConfiguration().setBridgeConfigurations(bridgeConfigs);
+
+      CoreQueueConfiguration queueConfig0 = new CoreQueueConfiguration(testAddress, queueName0, null, true);
+      List<CoreQueueConfiguration> queueConfigs0 = new ArrayList<CoreQueueConfiguration>();
+      queueConfigs0.add(queueConfig0);
+      server0.getConfiguration().setQueueConfigurations(queueConfigs0);
+      
+      AddressSettings addressSettings = new AddressSettings();
+      addressSettings.setRedeliveryDelay(0);
+      addressSettings.setMaxSizeBytes(10485760);
+      addressSettings.setPageSizeBytes(1048576);
+      addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
+      
+      server0.getConfiguration().getAddressesSettings().put("#", addressSettings);
+
+      CoreQueueConfiguration queueConfig1 = new CoreQueueConfiguration(forwardAddress, queueName0, null, true);
+      List<CoreQueueConfiguration> queueConfigs1 = new ArrayList<CoreQueueConfiguration>();
+      queueConfigs1.add(queueConfig1);
+      server1.getConfiguration().setQueueConfigurations(queueConfigs1);
+      
+      server1.getConfiguration().getAddressesSettings().put("#", addressSettings);
+
+      server1.start();
+      server0.start();
+
+      ClientSessionFactory csf0 = HornetQClient.createClientSessionFactory(server0tc);
+      ClientSession session0 = csf0.createSession(false, true, true);
+
+      ClientSessionFactory csf1 = HornetQClient.createClientSessionFactory(server1tc);
+      ClientSession session1 = csf1.createSession(false, true, true);
+
+      ClientProducer prod0 = session0.createProducer(testAddress);
+
+      ClientConsumer cons1 = session1.createConsumer(queueName0);
+
+      session1.start();
+
+      // Now we will simulate a failure of the bridge connection between server0 and server1
+      Bridge bridge = server0.getClusterManager().getBridges().get(bridgeName);
+      final RemotingConnection forwardingConnection = getForwardingConnection(bridge);
+      InVMConnector.failOnCreateConnection = true;
+      InVMConnector.numberOfFailures = Integer.MAX_VALUE;
+      
+      Thread t = new Thread()
+      {
+         @Override
+         public void run()
+         {
+            System.out.println("failing...");
+            forwardingConnection.fail(new HornetQException(HornetQException.NOT_CONNECTED));
+            System.out.println("reconnected!!!");
+         }
+      };
+      t.start();
+      
+      final int numMessages = 5000;
+
+      SimpleString propKey = new SimpleString("propkey");
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = session0.createMessage(false);
+         message.putIntProperty(propKey, i);
+         message.getBodyBuffer().writeBytes(content);
+         prod0.send(message);
+         System.out.println(">>>> " + i);
+      }
+      
+      InVMConnector.failOnCreateConnection = false;
+      
+      Thread.sleep(200);
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         System.out.println("<<< " + i);
+         ClientMessage r1 = cons1.receive(1500);
+         Assert.assertNotNull(r1);
+         Assert.assertEquals(i, r1.getObjectProperty(propKey));
+      }
+
+      session0.close();
+      session1.close();
+
+      server0.stop();
+      server1.stop();
+
+      Assert.assertEquals(0, server0.getRemotingService().getConnections().size());
+      Assert.assertEquals(0, server1.getRemotingService().getConnections().size());
+   }
+
+   private RemotingConnection getForwardingConnection(final Bridge bridge) throws Exception
+   {
+      long start = System.currentTimeMillis();
+
+      do
+      {
+         RemotingConnection forwardingConnection = ((BridgeImpl)bridge).getForwardingConnection();
+
+         if (forwardingConnection != null)
+         {
+            return forwardingConnection;
+         }
+
+         Thread.sleep(10);
+      }
+      while (System.currentTimeMillis() - start < 50000);
+
+      throw new IllegalStateException("Failed to get forwarding connection");
+   }
+
+}



More information about the hornetq-commits mailing list