Author: jmesnil
Date: 2009-11-09 10:26:12 -0500 (Mon, 09 Nov 2009)
New Revision: 8253
Modified:
trunk/tests/src/org/hornetq/tests/integration/jms/ManualReconnectionToSingleServerTest.java
Log:
rewrite ManualReconnectionToSingleServerTest
* check manual reconnection for consumer only
Modified:
trunk/tests/src/org/hornetq/tests/integration/jms/ManualReconnectionToSingleServerTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/integration/jms/ManualReconnectionToSingleServerTest.java 2009-11-09
09:47:36 UTC (rev 8252)
+++
trunk/tests/src/org/hornetq/tests/integration/jms/ManualReconnectionToSingleServerTest.java 2009-11-09
15:26:12 UTC (rev 8253)
@@ -13,33 +13,13 @@
package org.hornetq.tests.integration.jms;
import static java.util.concurrent.TimeUnit.SECONDS;
-import static
org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_ACK_BATCH_SIZE;
-import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_AUTO_GROUP;
-import static
org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_ACKNOWLEDGE;
-import static
org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_NON_PERSISTENT_SEND;
-import static
org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_PERSISTENT_SEND;
-import static
org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT;
-import static
org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD;
-import static
org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME;
-import static
org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONNECTION_TTL;
-import static
org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONSUMER_MAX_RATE;
-import static
org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONSUMER_WINDOW_SIZE;
-import static
org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE;
-import static
org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_PRE_ACKNOWLEDGE;
-import static
org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_PRODUCER_MAX_RATE;
-import static
org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONFIRMATION_WINDOW_SIZE;
-import static
org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE;
-import static
org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_THREAD_POOL_MAX_SIZE;
-import static
org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_USE_GLOBAL_POOLS;
-import static org.hornetq.tests.util.RandomUtil.randomString;
-import java.util.ArrayList;
import java.util.Date;
-import java.util.List;
import java.util.concurrent.CountDownLatch;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
@@ -58,10 +38,15 @@
import org.hornetq.core.server.HornetQServer;
import org.hornetq.integration.transports.netty.NettyAcceptorFactory;
import org.hornetq.integration.transports.netty.NettyConnectorFactory;
+import org.hornetq.jms.server.JMSServerManager;
+import org.hornetq.jms.server.config.ConnectionFactoryConfiguration;
+import org.hornetq.jms.server.config.JMSConfiguration;
+import org.hornetq.jms.server.config.impl.ConnectionFactoryConfigurationImpl;
+import org.hornetq.jms.server.config.impl.JMSConfigurationImpl;
+import org.hornetq.jms.server.config.impl.QueueConfigurationImpl;
import org.hornetq.jms.server.impl.JMSServerManagerImpl;
import org.hornetq.tests.unit.util.InVMContext;
import org.hornetq.tests.util.UnitTestCase;
-import org.hornetq.utils.Pair;
/**
* @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
@@ -74,34 +59,22 @@
private Connection connection;
- private Session session;
-
- private MessageProducer producer;
-
private MessageConsumer consumer;
private CountDownLatch exceptionLatch = new CountDownLatch(1);
private CountDownLatch reconnectionLatch = new CountDownLatch(1);
- private volatile boolean afterRestart = false;
+ private CountDownLatch allMessagesReceived = new CountDownLatch(1);
- private volatile boolean receivedMessagesAfterRestart = false;
-
- private int callTimeout;
+ private JMSServerManager serverManager;
- private MessageListener listener = new MessageListener()
- {
- public void onMessage(Message msg)
- {
- if (afterRestart)
- {
- receivedMessagesAfterRestart = true;
- }
- log.info(receivedMessagesAfterRestart + " " + msg);
- }
- };
+ private InVMContext context;
+ private final String queueName =
"ManualReconnectionToSingleServerTest.queue";
+
+ private final int num = 20;
+
private ExceptionListener exceptionListener = new ExceptionListener()
{
public void onException(JMSException e)
@@ -113,14 +86,8 @@
}
};
- private HornetQServer server;
+ private Listener listener;
- private JMSServerManagerImpl serverManager;
-
- private InVMContext context;
-
- private final String queueName = randomString();
-
// Static --------------------------------------------------------
// Attributes ----------------------------------------------------
@@ -133,48 +100,48 @@
public void testExceptionListener() throws Exception
{
- long start = System.currentTimeMillis();
-
connect();
- int num = 20;
+ ConnectionFactory cf = (ConnectionFactory)context.lookup("/cf");
+ Destination dest = (Destination)context.lookup(queueName);
+ Connection conn = cf.createConnection();
+ Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer prod = sess.createProducer(dest);
+
for (int i = 0; i < num; i++)
{
- try
- {
- Message message = session.createTextMessage((new Date()).toString());
- producer.send(message);
- Thread.sleep(500);
- }
- catch (Exception e)
- {
- e.printStackTrace();
- }
+ Message message = sess.createTextMessage((new Date()).toString());
+ message.setIntProperty("counter", i + 1);
+ prod.send(message);
if (i == num / 2)
{
- killServer();
+ conn.close();
+ serverManager.stop();
Thread.sleep(5000);
- startServer();
- afterRestart = true;
+ serverManager.start();
+ cf = (ConnectionFactory)context.lookup("/cf");
+ dest = (Destination)context.lookup(queueName);
+ conn = cf.createConnection();
+ sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ prod = sess.createProducer(dest);
}
}
+ conn.close();
+
boolean gotException = exceptionLatch.await(10, SECONDS);
assertTrue(gotException);
boolean clientReconnected = reconnectionLatch.await(10, SECONDS);
+
assertTrue("client did not reconnect after server was restarted",
clientReconnected);
- assertTrue(receivedMessagesAfterRestart);
+ boolean gotAllMessages = allMessagesReceived.await(10, SECONDS);
+ assertTrue(gotAllMessages);
+
connection.close();
-
- long end = System.currentTimeMillis();
-
- log.info("That took " + (end - start));
-
- //Make sure it doesn't pass by just timing out on blocking send
- assertTrue(end - start < callTimeout);
+
}
// Package protected ---------------------------------------------
@@ -190,26 +157,42 @@
conf.setSecurityEnabled(false);
conf.setJMXManagementEnabled(true);
conf.getAcceptorConfigurations().add(new
TransportConfiguration(NettyAcceptorFactory.class.getName()));
- server = HornetQ.newHornetQServer(conf, false);
- server.start();
+ HornetQServer server = HornetQ.newHornetQServer(conf, false);
- serverManager = new JMSServerManagerImpl(server);
- startServer();
+ JMSConfiguration configuration = new JMSConfigurationImpl();
+ context = new InVMContext();
+ configuration.setContext(context);
+ configuration.getQueueConfigurations().add(new QueueConfigurationImpl(queueName,
null, false, queueName));
+
+ ConnectionFactoryConfiguration cfConfig = new
ConnectionFactoryConfigurationImpl("cf",
+
new TransportConfiguration(NettyConnectorFactory.class.getName()),
+
"/cf");
+ cfConfig.setRetryInterval(1000);
+ cfConfig.setRetryIntervalMultiplier(1.0);
+ cfConfig.setReconnectAttempts(-1);
+ cfConfig.setFailoverOnServerShutdown(true);
+ configuration.getConnectionFactoryConfigurations().add(cfConfig);
+ serverManager = new JMSServerManagerImpl(server, configuration);
+ serverManager.start();
+
+ listener = new Listener();
+
+ exceptionLatch = new CountDownLatch(1);
+ reconnectionLatch = new CountDownLatch(1);
+ allMessagesReceived = new CountDownLatch(1);
}
@Override
protected void tearDown() throws Exception
- {
- server.stop();
-
- server = null;
-
+ {
+ serverManager.stop();
+
serverManager = null;
-
+
connection = null;
super.tearDown();
-
+
System.gc();
}
@@ -217,68 +200,6 @@
// Inner classes -------------------------------------------------
- private void startServer() throws Exception
- {
- serverManager.start();
- serverManager.activated();
- context = new InVMContext();
- serverManager.setContext(context);
- serverManager.createQueue(queueName, queueName, null, false);
- registerConnectionFactory();
- }
-
- private void killServer() throws Exception
- {
- context = null;
- serverManager.stop();
- }
-
- private void registerConnectionFactory() throws Exception
- {
- int retryInterval = 1000;
- double retryIntervalMultiplier = 1.0;
- int reconnectAttempts = -1;
- boolean failoverOnServerShutdown = true;
- callTimeout = 30000;
-
- List<Pair<TransportConfiguration, TransportConfiguration>>
connectorConfigs = new ArrayList<Pair<TransportConfiguration,
TransportConfiguration>>();
- connectorConfigs.add(new Pair<TransportConfiguration,
TransportConfiguration>(new
TransportConfiguration(NettyConnectorFactory.class.getName()),
-
null));
-
- List<String> jndiBindings = new ArrayList<String>();
- jndiBindings.add("/cf");
-
-
serverManager.createConnectionFactory("ManualReconnectionToSingleServerTest",
- connectorConfigs,
- null,
- DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
- DEFAULT_CONNECTION_TTL,
- callTimeout,
- DEFAULT_CACHE_LARGE_MESSAGE_CLIENT,
- DEFAULT_MIN_LARGE_MESSAGE_SIZE,
- DEFAULT_CONSUMER_WINDOW_SIZE,
- DEFAULT_CONSUMER_MAX_RATE,
- DEFAULT_CONFIRMATION_WINDOW_SIZE,
- DEFAULT_PRODUCER_MAX_RATE,
- DEFAULT_BLOCK_ON_ACKNOWLEDGE,
- DEFAULT_BLOCK_ON_PERSISTENT_SEND,
- DEFAULT_BLOCK_ON_NON_PERSISTENT_SEND,
- DEFAULT_AUTO_GROUP,
- DEFAULT_PRE_ACKNOWLEDGE,
-
DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME,
- DEFAULT_ACK_BATCH_SIZE,
- DEFAULT_ACK_BATCH_SIZE,
- DEFAULT_USE_GLOBAL_POOLS,
- DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE,
- DEFAULT_THREAD_POOL_MAX_SIZE,
- retryInterval,
- retryIntervalMultiplier,
- 1000,
- reconnectAttempts,
- failoverOnServerShutdown,
- jndiBindings);
- }
-
protected void disconnect()
{
log.info("calling disconnect");
@@ -294,6 +215,7 @@
log.info("closing the connection");
connection.close();
connection = null;
+ log.info("connection closed");
}
catch (Exception e)
{
@@ -314,7 +236,7 @@
Queue queue;
ConnectionFactory cf;
while (true)
- {
+ {
try
{
queue = (Queue)initialContext.lookup(queueName);
@@ -323,19 +245,16 @@
}
catch (Exception e)
{
- //retry until server is up
+ // retry until server is up
Thread.sleep(100);
}
}
connection = cf.createConnection();
connection.setExceptionListener(exceptionListener);
- session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- producer = session.createProducer(queue);
- System.out.println("creating consumer");
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
consumer = session.createConsumer(queue);
consumer.setMessageListener(listener);
connection.start();
- System.out.println("started new connection");
}
catch (Exception e)
{
@@ -352,4 +271,28 @@
}
}
}
+
+ private class Listener implements MessageListener
+ {
+ private int count = 0;
+
+ public void onMessage(Message msg)
+ {
+ count++;
+
+ try
+ {
+ int counter = msg.getIntProperty("counter");
+ }
+ catch (JMSException e)
+ {
+ e.printStackTrace();
+ }
+ if (count == num)
+ {
+ allMessagesReceived.countDown();
+ }
+ }
+ };
+
}