Author: clebert.suconic(a)jboss.com
Date: 2011-12-08 13:22:55 -0500 (Thu, 08 Dec 2011)
New Revision: 11883
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/config/BridgeConfiguration.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java
Log:
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/config/BridgeConfiguration.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/config/BridgeConfiguration.java 2011-12-08
18:16:25 UTC (rev 11882)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/config/BridgeConfiguration.java 2011-12-08
18:22:55 UTC (rev 11883)
@@ -68,6 +68,10 @@
private final long maxRetryInterval;
private final int minLargeMessageSize;
+
+ // At this point this is only changed on testcases
+ // The bridge shouldn't be sending blocking anyways
+ private long callTimeout = HornetQClient.DEFAULT_CALL_TIMEOUT;
/**
* For backward compatibility on the API... no MinLargeMessage on this constructor
@@ -445,4 +449,26 @@
{
this.password = password;
}
+
+
+ /**
+ * @return the callTimeout
+ */
+ public long getCallTimeout()
+ {
+ return callTimeout;
+ }
+
+ /**
+ *
+ * At this point this is only changed on testcases
+ * The bridge shouldn't be sending blocking anyways
+ * @param callTimeout the callTimeout to set
+ */
+ public void setCallTimeout(long callTimeout)
+ {
+ this.callTimeout = callTimeout;
+ }
+
+
}
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2011-12-08
18:16:25 UTC (rev 11882)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2011-12-08
18:22:55 UTC (rev 11883)
@@ -572,11 +572,19 @@
{
producer.send(dest, message);
}
- catch (HornetQException e)
+ catch (final HornetQException e)
{
log.warn("Unable to send message " + ref + ", will try again
once bridge reconnects", e);
refs.remove(ref);
+
+ executor.execute(new Runnable()
+ {
+ public void run()
+ {
+ connectionFailed(e, false);
+ }
+ });
return HandleStatus.BUSY;
}
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-12-08
18:16:25 UTC (rev 11882)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-12-08
18:22:55 UTC (rev 11883)
@@ -471,6 +471,12 @@
serverLocator.setBlockOnDurableSend(!config.isUseDuplicateDetection());
serverLocator.setBlockOnNonDurableSend(!config.isUseDuplicateDetection());
serverLocator.setMinLargeMessageSize(config.getMinLargeMessageSize());
+
+ // This will be set to 30s unless it's changed from embedded / testing
+ // there is no reason to exception the config for this timeout
+ // since the Bridge is supposed to be non-blocking and fast
+ // We may expose this if we find a good use case
+ serverLocator.setCallTimeout(config.getCallTimeout());
if (!config.isUseDuplicateDetection())
{
log.debug("Bridge " + config.getName() +
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java
===================================================================
---
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java 2011-12-08
18:16:25 UTC (rev 11882)
+++
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java 2011-12-08
18:22:55 UTC (rev 11883)
@@ -17,6 +17,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -24,6 +25,8 @@
import junit.framework.Assert;
import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.Interceptor;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClientConsumer;
@@ -39,6 +42,9 @@
import org.hornetq.core.logging.Logger;
import org.hornetq.core.postoffice.DuplicateIDCache;
import org.hornetq.core.postoffice.impl.PostOfficeImpl;
+import org.hornetq.core.protocol.core.Packet;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionSendContinuationMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionSendMessage;
import org.hornetq.core.remoting.impl.invm.TransportConstants;
import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;
import org.hornetq.core.server.HornetQServer;
@@ -46,6 +52,7 @@
import org.hornetq.core.server.Queue;
import org.hornetq.core.server.cluster.impl.BridgeImpl;
import org.hornetq.core.transaction.impl.TransactionImpl;
+import org.hornetq.spi.core.protocol.RemotingConnection;
import org.hornetq.tests.util.RandomUtil;
import org.hornetq.tests.util.ServiceTestBase;
import org.hornetq.tests.util.UnitTestCase;
@@ -263,6 +270,220 @@
}
+
+ public void testLostMessageSimpleMessage() throws Exception
+ {
+ internalTestMessageLoss(false);
+ }
+
+ public void testLostMessageLargeMessage() throws Exception
+ {
+ internalTestMessageLoss(true);
+ }
+
+ /** This test will ignore messages
+ What will cause the bridge to fail with a timeout
+ The bridge should still recover the failure and reconnect on that case */
+ public void internalTestMessageLoss(final boolean largeMessage) throws Exception
+ {
+ class MyInterceptor implements Interceptor
+ {
+ public boolean ignoreSends = true;
+ public CountDownLatch latch;
+
+ MyInterceptor(int numberOfIgnores)
+ {
+ latch = new CountDownLatch(numberOfIgnores);
+ }
+
+ /* (non-Javadoc)
+ * @see
org.hornetq.api.core.Interceptor#intercept(org.hornetq.core.protocol.core.Packet,
org.hornetq.spi.core.protocol.RemotingConnection)
+ */
+ public boolean intercept(Packet packet, RemotingConnection connection) throws
HornetQException
+ {
+ if (ignoreSends && packet instanceof SessionSendMessage ||
+ ignoreSends && packet instanceof SessionSendContinuationMessage
&& !((SessionSendContinuationMessage)packet).isContinues())
+ {
+ System.out.println("Ignored");
+ latch.countDown();
+ return false;
+ }
+ else
+ {
+ return true;
+ }
+ }
+
+ }
+
+ MyInterceptor myInterceptor = new MyInterceptor(3);
+
+ HornetQServer server0 = null;
+ HornetQServer server1 = null;
+ ServerLocator locator = null;
+ try
+ {
+ Map<String, Object> server0Params = new HashMap<String, Object>();
+ server0 = createClusteredServerWithParams(isNetty(), 0, true, server0Params);
+
+ Map<String, Object> server1Params = new HashMap<String, Object>();
+ addTargetParameters(server1Params);
+ server1 = createClusteredServerWithParams(isNetty(), 1, true, server1Params);
+
+ final String testAddress = "testAddress";
+ final String queueName0 = "queue0";
+ final String forwardAddress = "forwardAddress";
+ final String queueName1 = "queue1";
+
+ TransportConfiguration server0tc = new TransportConfiguration(getConnector(),
server0Params);
+
+ TransportConfiguration server1tc = new TransportConfiguration(getConnector(),
server1Params);
+
+ HashMap<String, TransportConfiguration> connectors = new
HashMap<String, TransportConfiguration>();
+ connectors.put(server1tc.getName(), server1tc);
+ server0.getConfiguration().setConnectorConfigurations(connectors);
+
+ final int messageSize = 1024;
+
+ final int numMessages = 1;
+
+ ArrayList<String> connectorConfig = new ArrayList<String>();
+ connectorConfig.add(server1tc.getName());
+ BridgeConfiguration bridgeConfiguration = new
BridgeConfiguration("bridge1",
+ queueName0,
+
forwardAddress,
+ null,
+ null,
+
HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+
HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
+
HornetQClient.DEFAULT_CONNECTION_TTL,
+ 1000,
+
HornetQClient.DEFAULT_MAX_RETRY_INTERVAL,
+ 1d,
+ -1,
+ false,
+ // Choose
confirmation size to make sure acks
+ // are sent
+ numMessages *
messageSize / 2,
+
connectorConfig,
+ false,
+
ConfigurationImpl.DEFAULT_CLUSTER_USER,
+
ConfigurationImpl.DEFAULT_CLUSTER_PASSWORD);
+
+ bridgeConfiguration.setCallTimeout(500);
+
+ List<BridgeConfiguration> bridgeConfigs = new
ArrayList<BridgeConfiguration>();
+ bridgeConfigs.add(bridgeConfiguration);
+ server0.getConfiguration().setBridgeConfigurations(bridgeConfigs);
+
+ CoreQueueConfiguration queueConfig0 = new CoreQueueConfiguration(testAddress,
queueName0, null, true);
+ List<CoreQueueConfiguration> queueConfigs0 = new
ArrayList<CoreQueueConfiguration>();
+ queueConfigs0.add(queueConfig0);
+ server0.getConfiguration().setQueueConfigurations(queueConfigs0);
+
+ CoreQueueConfiguration queueConfig1 = new CoreQueueConfiguration(forwardAddress,
queueName1, null, true);
+ List<CoreQueueConfiguration> queueConfigs1 = new
ArrayList<CoreQueueConfiguration>();
+ queueConfigs1.add(queueConfig1);
+ server1.getConfiguration().setQueueConfigurations(queueConfigs1);
+
+ server1.start();
+
+ server1.getRemotingService().addInterceptor(myInterceptor);
+
+ server0.start();
+ locator = HornetQClient.createServerLocatorWithoutHA(server0tc, server1tc);
+ ClientSessionFactory sf0 = locator.createSessionFactory(server0tc);
+
+ ClientSessionFactory sf1 = locator.createSessionFactory(server1tc);
+
+ ClientSession session0 = sf0.createSession(false, true, true);
+
+ ClientSession session1 = sf1.createSession(false, true, true);
+
+ ClientProducer producer0 = session0.createProducer(new
SimpleString(testAddress));
+
+ ClientConsumer consumer1 = session1.createConsumer(queueName1);
+
+ session1.start();
+
+ final byte[] bytes = new byte[messageSize];
+
+ final SimpleString propKey = new SimpleString("testkey");
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = session0.createMessage(true);
+
+ if (largeMessage)
+ {
+ message.setBodyInputStream(UnitTestCase.createFakeLargeStream(1024 *
1024));
+ }
+
+ message.putIntProperty(propKey, i);
+
+ message.getBodyBuffer().writeBytes(bytes);
+
+ producer0.send(message);
+ }
+
+ myInterceptor.latch.await();
+ myInterceptor.ignoreSends = false;
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = consumer1.receive(30000);
+
+ Assert.assertNotNull(message);
+
+ Assert.assertEquals(i, message.getObjectProperty(propKey));
+
+ if (largeMessage)
+ {
+ readMessages(message);
+ }
+
+ message.acknowledge();
+ }
+
+ Assert.assertNull(consumer1.receiveImmediate());
+
+ session0.close();
+
+ session1.close();
+
+ sf0.close();
+
+ sf1.close();
+
+ }
+ finally
+ {
+ if (locator != null)
+ {
+ locator.close();
+ }
+ try
+ {
+ server0.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+
+ try
+ {
+ server1.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+
+
+ assertEquals(0, loadQueues(server0).size());
+
+ }
+
/**
* @param server1Params
*/