[hornetq-commits] JBoss hornetq SVN: r8253 - trunk/tests/src/org/hornetq/tests/integration/jms.

do-not-reply at jboss.org do-not-reply at jboss.org
Mon Nov 9 10:26:13 EST 2009


Author: jmesnil
Date: 2009-11-09 10:26:12 -0500 (Mon, 09 Nov 2009)
New Revision: 8253

Modified:
   trunk/tests/src/org/hornetq/tests/integration/jms/ManualReconnectionToSingleServerTest.java
Log:
rewrite ManualReconnectionToSingleServerTest

* check manual reconnection for consumer only

Modified: trunk/tests/src/org/hornetq/tests/integration/jms/ManualReconnectionToSingleServerTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/ManualReconnectionToSingleServerTest.java	2009-11-09 09:47:36 UTC (rev 8252)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/ManualReconnectionToSingleServerTest.java	2009-11-09 15:26:12 UTC (rev 8253)
@@ -13,33 +13,13 @@
 package org.hornetq.tests.integration.jms;
 
 import static java.util.concurrent.TimeUnit.SECONDS;
-import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_ACK_BATCH_SIZE;
-import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_AUTO_GROUP;
-import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_ACKNOWLEDGE;
-import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_NON_PERSISTENT_SEND;
-import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_PERSISTENT_SEND;
-import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT;
-import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD;
-import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME;
-import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONNECTION_TTL;
-import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONSUMER_MAX_RATE;
-import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONSUMER_WINDOW_SIZE;
-import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE;
-import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_PRE_ACKNOWLEDGE;
-import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_PRODUCER_MAX_RATE;
-import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONFIRMATION_WINDOW_SIZE;
-import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE;
-import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_THREAD_POOL_MAX_SIZE;
-import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_USE_GLOBAL_POOLS;
-import static org.hornetq.tests.util.RandomUtil.randomString;
 
-import java.util.ArrayList;
 import java.util.Date;
-import java.util.List;
 import java.util.concurrent.CountDownLatch;
 
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
 import javax.jms.ExceptionListener;
 import javax.jms.JMSException;
 import javax.jms.Message;
@@ -58,10 +38,15 @@
 import org.hornetq.core.server.HornetQServer;
 import org.hornetq.integration.transports.netty.NettyAcceptorFactory;
 import org.hornetq.integration.transports.netty.NettyConnectorFactory;
+import org.hornetq.jms.server.JMSServerManager;
+import org.hornetq.jms.server.config.ConnectionFactoryConfiguration;
+import org.hornetq.jms.server.config.JMSConfiguration;
+import org.hornetq.jms.server.config.impl.ConnectionFactoryConfigurationImpl;
+import org.hornetq.jms.server.config.impl.JMSConfigurationImpl;
+import org.hornetq.jms.server.config.impl.QueueConfigurationImpl;
 import org.hornetq.jms.server.impl.JMSServerManagerImpl;
 import org.hornetq.tests.unit.util.InVMContext;
 import org.hornetq.tests.util.UnitTestCase;
-import org.hornetq.utils.Pair;
 
 /**
  * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
@@ -74,34 +59,22 @@
 
    private Connection connection;
 
-   private Session session;
-
-   private MessageProducer producer;
-
    private MessageConsumer consumer;
 
    private CountDownLatch exceptionLatch = new CountDownLatch(1);
 
    private CountDownLatch reconnectionLatch = new CountDownLatch(1);
 
-   private volatile boolean afterRestart = false;
+   private CountDownLatch allMessagesReceived = new CountDownLatch(1);
 
-   private volatile boolean receivedMessagesAfterRestart = false;
-   
-   private int callTimeout;
+   private JMSServerManager serverManager;
 
-   private MessageListener listener = new MessageListener()
-   {
-      public void onMessage(Message msg)
-      {
-         if (afterRestart)
-         {
-            receivedMessagesAfterRestart = true;
-         }
-         log.info(receivedMessagesAfterRestart + " " + msg);
-      }
-   };
+   private InVMContext context;
 
+   private final String queueName = "ManualReconnectionToSingleServerTest.queue";
+
+   private final int num = 20;
+
    private ExceptionListener exceptionListener = new ExceptionListener()
    {
       public void onException(JMSException e)
@@ -113,14 +86,8 @@
       }
    };
 
-   private HornetQServer server;
+   private Listener listener;
 
-   private JMSServerManagerImpl serverManager;
-
-   private InVMContext context;
-
-   private final String queueName = randomString();
-
    // Static --------------------------------------------------------
 
    // Attributes ----------------------------------------------------
@@ -133,48 +100,48 @@
 
    public void testExceptionListener() throws Exception
    {
-      long start = System.currentTimeMillis();
-      
       connect();
 
-      int num = 20;
+      ConnectionFactory cf = (ConnectionFactory)context.lookup("/cf");
+      Destination dest = (Destination)context.lookup(queueName);
+      Connection conn = cf.createConnection();
+      Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      MessageProducer prod = sess.createProducer(dest);
+
       for (int i = 0; i < num; i++)
       {
-         try
-         {
-            Message message = session.createTextMessage((new Date()).toString());
-            producer.send(message);
-            Thread.sleep(500);
-         }
-         catch (Exception e)
-         {
-            e.printStackTrace();
-         }
+         Message message = sess.createTextMessage((new Date()).toString());
+         message.setIntProperty("counter", i + 1);
+         prod.send(message);
 
          if (i == num / 2)
          {
-            killServer();
+            conn.close();
+            serverManager.stop();
             Thread.sleep(5000);
-            startServer();
-            afterRestart = true;
+            serverManager.start();
+            cf = (ConnectionFactory)context.lookup("/cf");
+            dest = (Destination)context.lookup(queueName);
+            conn = cf.createConnection();
+            sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            prod = sess.createProducer(dest);
          }
       }
 
+      conn.close();
+
       boolean gotException = exceptionLatch.await(10, SECONDS);
       assertTrue(gotException);
 
       boolean clientReconnected = reconnectionLatch.await(10, SECONDS);
+
       assertTrue("client did not reconnect after server was restarted", clientReconnected);
 
-      assertTrue(receivedMessagesAfterRestart);
+      boolean gotAllMessages = allMessagesReceived.await(10, SECONDS);
+      assertTrue(gotAllMessages);
+
       connection.close();
-      
-      long end = System.currentTimeMillis();
-      
-      log.info("That took " + (end - start));
-      
-      //Make sure it doesn't pass by just timing out on blocking send
-      assertTrue(end - start < callTimeout);      
+
    }
 
    // Package protected ---------------------------------------------
@@ -190,26 +157,42 @@
       conf.setSecurityEnabled(false);
       conf.setJMXManagementEnabled(true);
       conf.getAcceptorConfigurations().add(new TransportConfiguration(NettyAcceptorFactory.class.getName()));
-      server = HornetQ.newHornetQServer(conf, false);
-      server.start();
+      HornetQServer server = HornetQ.newHornetQServer(conf, false);
 
-      serverManager = new JMSServerManagerImpl(server);
-      startServer();
+      JMSConfiguration configuration = new JMSConfigurationImpl();
+      context = new InVMContext();
+      configuration.setContext(context);
+      configuration.getQueueConfigurations().add(new QueueConfigurationImpl(queueName, null, false, queueName));
+
+      ConnectionFactoryConfiguration cfConfig = new ConnectionFactoryConfigurationImpl("cf",
+                                                                                       new TransportConfiguration(NettyConnectorFactory.class.getName()),
+                                                                                       "/cf");
+      cfConfig.setRetryInterval(1000);
+      cfConfig.setRetryIntervalMultiplier(1.0);
+      cfConfig.setReconnectAttempts(-1);
+      cfConfig.setFailoverOnServerShutdown(true);
+      configuration.getConnectionFactoryConfigurations().add(cfConfig);
+      serverManager = new JMSServerManagerImpl(server, configuration);
+      serverManager.start();
+      
+      listener = new Listener();
+      
+      exceptionLatch = new CountDownLatch(1);
+      reconnectionLatch = new CountDownLatch(1);
+      allMessagesReceived = new CountDownLatch(1);
    }
 
    @Override
    protected void tearDown() throws Exception
-   {      
-      server.stop();
-      
-      server = null;
-      
+   {
+      serverManager.stop();
+
       serverManager = null;
-      
+
       connection = null;
 
       super.tearDown();
-      
+
       System.gc();
    }
 
@@ -217,68 +200,6 @@
 
    // Inner classes -------------------------------------------------
 
-   private void startServer() throws Exception
-   {
-      serverManager.start();
-      serverManager.activated();
-      context = new InVMContext();
-      serverManager.setContext(context);      
-      serverManager.createQueue(queueName, queueName, null, false);
-      registerConnectionFactory();
-   }
-
-   private void killServer() throws Exception
-   {
-      context = null;
-      serverManager.stop();
-   }
-
-   private void registerConnectionFactory() throws Exception
-   {
-      int retryInterval = 1000;
-      double retryIntervalMultiplier = 1.0;
-      int reconnectAttempts = -1;
-      boolean failoverOnServerShutdown = true;
-      callTimeout = 30000;
-
-      List<Pair<TransportConfiguration, TransportConfiguration>> connectorConfigs = new ArrayList<Pair<TransportConfiguration, TransportConfiguration>>();
-      connectorConfigs.add(new Pair<TransportConfiguration, TransportConfiguration>(new TransportConfiguration(NettyConnectorFactory.class.getName()),
-                                                                                    null));
-
-      List<String> jndiBindings = new ArrayList<String>();
-      jndiBindings.add("/cf");
-
-      serverManager.createConnectionFactory("ManualReconnectionToSingleServerTest",
-                                            connectorConfigs,
-                                            null,
-                                            DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
-                                            DEFAULT_CONNECTION_TTL,
-                                            callTimeout,                                            
-                                            DEFAULT_CACHE_LARGE_MESSAGE_CLIENT,
-                                            DEFAULT_MIN_LARGE_MESSAGE_SIZE,
-                                            DEFAULT_CONSUMER_WINDOW_SIZE,
-                                            DEFAULT_CONSUMER_MAX_RATE,
-                                            DEFAULT_CONFIRMATION_WINDOW_SIZE,
-                                            DEFAULT_PRODUCER_MAX_RATE,
-                                            DEFAULT_BLOCK_ON_ACKNOWLEDGE,
-                                            DEFAULT_BLOCK_ON_PERSISTENT_SEND,
-                                            DEFAULT_BLOCK_ON_NON_PERSISTENT_SEND,
-                                            DEFAULT_AUTO_GROUP,
-                                            DEFAULT_PRE_ACKNOWLEDGE,
-                                            DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME,
-                                            DEFAULT_ACK_BATCH_SIZE,
-                                            DEFAULT_ACK_BATCH_SIZE,
-                                            DEFAULT_USE_GLOBAL_POOLS,
-                                            DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE,
-                                            DEFAULT_THREAD_POOL_MAX_SIZE,                                      
-                                            retryInterval,
-                                            retryIntervalMultiplier,
-                                            1000,
-                                            reconnectAttempts,
-                                            failoverOnServerShutdown,
-                                            jndiBindings);
-   }
-
    protected void disconnect()
    {
       log.info("calling disconnect");
@@ -294,6 +215,7 @@
          log.info("closing the connection");
          connection.close();
          connection = null;
+         log.info("connection closed");
       }
       catch (Exception e)
       {
@@ -314,7 +236,7 @@
          Queue queue;
          ConnectionFactory cf;
          while (true)
-         {            
+         {
             try
             {
                queue = (Queue)initialContext.lookup(queueName);
@@ -323,19 +245,16 @@
             }
             catch (Exception e)
             {
-               //retry until server is up
+               // retry until server is up
                Thread.sleep(100);
             }
          }
          connection = cf.createConnection();
          connection.setExceptionListener(exceptionListener);
-         session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-         producer = session.createProducer(queue);
-         System.out.println("creating consumer");
+         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
          consumer = session.createConsumer(queue);
          consumer.setMessageListener(listener);
          connection.start();
-         System.out.println("started new connection");
       }
       catch (Exception e)
       {
@@ -352,4 +271,28 @@
          }
       }
    }
+
+   private class Listener implements MessageListener
+   {
+      private int count = 0;
+
+      public void onMessage(Message msg)
+      {
+         count++;
+
+         try
+         {
+            int counter = msg.getIntProperty("counter");
+         }
+         catch (JMSException e)
+         {
+            e.printStackTrace();
+         }
+         if (count == num)
+         {
+            allMessagesReceived.countDown();
+         }
+      }
+   };
+
 }



More information about the hornetq-commits mailing list