[hornetq-commits] JBoss hornetq SVN: r8514 - trunk/tests/src/org/hornetq/tests/integration/cluster/reattach.

do-not-reply at jboss.org do-not-reply at jboss.org
Thu Dec 3 02:22:05 EST 2009


Author: clebert.suconic at jboss.com
Date: 2009-12-03 02:22:04 -0500 (Thu, 03 Dec 2009)
New Revision: 8514

Modified:
   trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/ReattachTest.java
Log:
Adding the test I used to capture a multi-thread issue with ClientSessionFactory

Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/ReattachTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/ReattachTest.java	2009-12-03 07:06:15 UTC (rev 8513)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/ReattachTest.java	2009-12-03 07:22:04 UTC (rev 8514)
@@ -13,26 +13,25 @@
 
 package org.hornetq.tests.integration.cluster.reattach;
 
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.CountDownLatch;
+
 import org.hornetq.core.client.ClientConsumer;
 import org.hornetq.core.client.ClientMessage;
 import org.hornetq.core.client.ClientProducer;
 import org.hornetq.core.client.ClientSession;
 import org.hornetq.core.client.SessionFailureListener;
-import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
 import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
 import org.hornetq.core.client.impl.ClientSessionInternal;
-import org.hornetq.core.config.Configuration;
-import org.hornetq.core.config.TransportConfiguration;
-import org.hornetq.core.config.impl.ConfigurationImpl;
 import org.hornetq.core.exception.HornetQException;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.remoting.RemotingConnection;
 import org.hornetq.core.remoting.impl.invm.InVMConnector;
 import org.hornetq.core.remoting.impl.invm.InVMRegistry;
-import org.hornetq.core.server.HornetQ;
 import org.hornetq.core.server.HornetQServer;
 import org.hornetq.jms.client.HornetQTextMessage;
-import org.hornetq.tests.util.UnitTestCase;
+import org.hornetq.tests.util.ServiceTestBase;
 import org.hornetq.utils.SimpleString;
 
 /**
@@ -45,7 +44,7 @@
  *
  *
  */
-public class ReattachTest extends UnitTestCase
+public class ReattachTest extends ServiceTestBase
 {
    private static final Logger log = Logger.getLogger(ReattachTest.class);
 
@@ -74,7 +73,7 @@
 
       final int reconnectAttempts = 1;
 
-      ClientSessionFactoryInternal sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory"));
+      ClientSessionFactoryInternal sf = createFactory(false);
 
       sf.setRetryInterval(retryInterval);
       sf.setRetryIntervalMultiplier(retryMultiplier);
@@ -152,7 +151,7 @@
 
       final int reconnectAttempts = -1;
 
-      ClientSessionFactoryInternal sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory"));
+      ClientSessionFactoryInternal sf = createFactory(false);
 
       sf.setRetryInterval(retryInterval);
       sf.setRetryIntervalMultiplier(retryMultiplier);
@@ -242,7 +241,7 @@
 
       final long asyncFailDelay = 2000;
 
-      ClientSessionFactoryInternal sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory"));
+      ClientSessionFactoryInternal sf = createFactory(false);
 
       sf.setRetryInterval(retryInterval);
       sf.setRetryIntervalMultiplier(retryMultiplier);
@@ -261,9 +260,9 @@
          {
             failed = true;
          }
-         
+
          public void beforeReconnect(HornetQException exception)
-         {               
+         {
          }
       }
 
@@ -356,7 +355,7 @@
 
       final int reconnectAttempts = 3;
 
-      ClientSessionFactoryInternal sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory"));
+      ClientSessionFactoryInternal sf = createFactory(false);
 
       sf.setRetryInterval(retryInterval);
       sf.setRetryIntervalMultiplier(retryMultiplier);
@@ -411,12 +410,12 @@
 
       conn.fail(new HornetQException(HornetQException.NOT_CONNECTED));
 
-      //Should throw exception since didn't reconnect
-      
+      // Should throw exception since didn't reconnect
+
       try
       {
          session.start();
-         
+
          fail("Should throw exception");
       }
       catch (HornetQException e)
@@ -431,6 +430,299 @@
       t.join();
    }
 
+   public void testCreateSessionFailAfterSendSeveralThreads() throws Throwable
+   {
+      // Stress test: many threads create and close sessions on one shared factory while a timer keeps failing a connection.
+      Timer timer = new Timer();
+      ClientSession session = null;
+      
+      try
+      {
+
+         final long retryInterval = 50;
+
+         final double retryMultiplier = 1d;
+         // NOTE(review): -1 presumably means "retry forever" - confirm against ClientSessionFactory.
+         final int reconnectAttempts = -1;
+
+         final ClientSessionFactoryInternal sf = createFactory(false);
+         
+
+
+         sf.setRetryInterval(retryInterval);
+         sf.setRetryIntervalMultiplier(retryMultiplier);
+         sf.setReconnectAttempts(reconnectAttempts);
+         sf.setConfirmationWindowSize(1024 * 1024);
+         
+         // This session is used to obtain the connection that the timer below fails repeatedly.
+         session = sf.createSession();
+
+         final RemotingConnection connFailure = ((ClientSessionInternal)session).getConnection();
+         
+
+
+         int numberOfThreads = 100;
+         final int numberOfSessionsToCreate = 10;
+         // alignLatch: the main thread waits on it until every worker is ready; startFlag: releases all workers at once.
+         final CountDownLatch alignLatch = new CountDownLatch(numberOfThreads);
+         final CountDownLatch startFlag = new CountDownLatch(1);
+         // Worker: creates and closes numberOfSessionsToCreate sessions, recording any Throwable in 'failure'.
+         class CreateSessionThread extends Thread
+         {
+            Throwable failure;
+
+            public void run()
+            {
+               try
+               {
+                  alignLatch.countDown();
+                  startFlag.await();
+                  for (int i = 0 ; i < numberOfSessionsToCreate; i++)
+                  {
+                     Thread.yield();
+                     ClientSession session = sf.createSession(false, true, true);
+   
+                     session.close();
+                  }
+               }
+               catch (Throwable e)
+               {
+                  e.printStackTrace();
+                  this.failure = e;
+               }
+            }
+         }
+
+         CreateSessionThread threads[] = new CreateSessionThread[numberOfThreads];
+         for (int i = 0; i < numberOfThreads; i++)
+         {
+            threads[i] = new CreateSessionThread();
+            threads[i].start();
+         }
+
+         // Wait until every worker has counted down alignLatch before scheduling the failure timer
+
+         alignLatch.await();
+         // After a 10 ms delay, fail the connection every 10 ms until the timer is cancelled in the finally block.
+         timer.schedule(new TimerTask()
+         {
+            @Override
+            public void run()
+            {
+               try
+               {
+                  connFailure.fail(new HornetQException(HornetQException.NOT_CONNECTED));
+               }
+               catch (Exception e)
+               {
+                  log.warn("Error on the timer " + e);
+               }
+            }
+            
+         }, 10, 10);
+         // Release all worker threads at once.
+         startFlag.countDown();
+
+         Throwable failure = null;
+         // Join every worker; if several failed, the last failure seen is the one rethrown below.
+         for (CreateSessionThread thread : threads)
+         {
+            thread.join();
+            if (thread.failure != null)
+            {
+               System.out.println("Thread " + thread.getName() + " failed - " + thread.failure);
+               failure = thread.failure;
+            }
+         }
+
+         if (failure != null)
+         {
+            throw failure;
+         }
+         // NOTE(review): sf.close() is skipped when a failure is rethrown above - confirm whether that leak matters.
+         sf.close();
+
+      }
+      finally
+      {
+         timer.cancel();
+         
+         if (session != null)
+         {
+            session.close();
+         }
+      }
+   }
+
+   public void testCreateSessionFailBeforeSendSeveralThreads() throws Throwable
+   {
+      final long retryInterval = 500;
+      // Test: 100 threads call createSession on one shared factory while InVMConnector.failOnCreateConnection is set.
+      final double retryMultiplier = 1d;
+
+      final int reconnectAttempts = -1;
+
+      final ClientSessionFactoryInternal sf = createFactory(false);
+
+      sf.setRetryInterval(retryInterval);
+      sf.setRetryIntervalMultiplier(retryMultiplier);
+      sf.setReconnectAttempts(reconnectAttempts);
+      sf.setConfirmationWindowSize(1024 * 1024);
+      // NOTE(review): presumably makes in-VM connection attempts fail - confirm in InVMConnector. Cleared by thread t below.
+      InVMConnector.failOnCreateConnection = true;
+
+      int numberOfThreads = 100;
+      // alignLatch: the main thread waits on it until every worker is ready; startFlag: releases all workers at once.
+      final CountDownLatch alignLatch = new CountDownLatch(numberOfThreads);
+      final CountDownLatch startFlag = new CountDownLatch(1);
+      // Worker: creates and closes a single session, recording any Throwable in 'failure'.
+      class CreateSessionThread extends Thread
+      {
+         Throwable failure;
+
+         public void run()
+         {
+            try
+            {
+               alignLatch.countDown();
+               startFlag.await();
+               ClientSession session = sf.createSession(false, true, true);
+
+               session.close();
+            }
+            catch (Throwable e)
+            {
+               e.printStackTrace();
+               this.failure = e;
+            }
+         }
+      }
+
+      CreateSessionThread threads[] = new CreateSessionThread[numberOfThreads];
+      for (int i = 0; i < numberOfThreads; i++)
+      {
+         threads[i] = new CreateSessionThread();
+         threads[i].start();
+      }
+
+      // Sleep 3 times retryInterval, so it should at least have 3 retries
+
+      Thread t = new Thread()
+      {
+         public void run()
+         {
+            try
+            {
+               Thread.sleep(retryInterval * 3);
+            }
+            catch (InterruptedException ignore)
+            {
+            }
+
+            InVMConnector.failOnCreateConnection = false;
+         }
+      };
+      // Start the helper thread only after every worker has counted down alignLatch, then release the workers.
+      alignLatch.await();
+
+      t.start();
+
+      startFlag.countDown();
+
+      Throwable failure = null;
+      // Join every worker; if several failed, the last failure seen is the one rethrown below.
+      for (CreateSessionThread thread : threads)
+      {
+         thread.join();
+         if (thread.failure != null)
+         {
+            System.out.println("Thread " + thread.getName() + " failed - " + thread.failure);
+            failure = thread.failure;
+         }
+      }
+
+      if (failure != null)
+      {
+         throw failure;
+      }
+      // NOTE(review): sf.close() and t.join() are skipped when a failure is rethrown above - confirm that is acceptable.
+      sf.close();
+
+      t.join();
+   }
+
+   public void testCreateQueue() throws Exception
+   {
+      final long retryInterval = 50;
+      // Test: fails the session's connection and then creates ten queues through that same session.
+      final double retryMultiplier = 1d;
+
+      final int reconnectAttempts = -1;
+
+      ClientSessionFactoryInternal sf = createFactory(false);
+
+      sf.setRetryInterval(retryInterval);
+      sf.setRetryIntervalMultiplier(retryMultiplier);
+      sf.setReconnectAttempts(reconnectAttempts);
+      sf.setConfirmationWindowSize(1024 * 1024);
+
+      ClientSession session = sf.createSession(false, true, true);
+
+      // Sleep 3 times retryInterval, so it should at least have 3 retries
+
+      RemotingConnection conn = ((ClientSessionInternal)session).getConnection();
+      // NOTE(review): the flag is set to false here and again in thread t, so t does not change it as written - confirm intent.
+      InVMConnector.failOnCreateConnection = false;
+
+      conn.fail(new HornetQException(HornetQException.NOT_CONNECTED));
+      // Helper thread: sleeps 3 times retryInterval, then sets failOnCreateConnection to false.
+      Thread t = new Thread()
+      {
+         public void run()
+         {
+            try
+            {
+               Thread.sleep(retryInterval * 3);
+            }
+            catch (InterruptedException ignore)
+            {
+            }
+
+            InVMConnector.failOnCreateConnection = false;
+         }
+      };
+
+      t.start();
+      // Create queues "queue0" .. "queue9" on "address" through the session whose connection was failed above.
+      for (int i = 0; i < 10; i++)
+      {
+         session.createQueue("address", "queue" + i);
+      }
+      // The lines below are disabled code: a flag change and an expected-failure check on session.start().
+      //
+      // InVMConnector.failOnCreateConnection = true;
+
+      //
+      // //Should throw exception since didn't reconnect
+      //      
+      // try
+      // {
+      // session.start();
+      //         
+      // fail("Should throw exception");
+      // }
+      // catch (HornetQException e)
+      // {
+      // assertEquals(HornetQException.OBJECT_CLOSED, e.getCode());
+      // }
+
+      session.close();
+
+      sf.close();
+
+      t.join();
+   }
+
    public void testReattachAttemptsSucceedsInReconnecting() throws Exception
    {
       final long retryInterval = 500;
@@ -439,13 +731,13 @@
 
       final int reconnectAttempts = 10;
 
-      ClientSessionFactoryInternal sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory"));
+      ClientSessionFactoryInternal sf = createFactory(false);
 
       sf.setRetryInterval(retryInterval);
       sf.setRetryIntervalMultiplier(retryMultiplier);
       sf.setReconnectAttempts(reconnectAttempts);
       sf.setConfirmationWindowSize(1024 * 1024);
-      
+
       ClientSession session = sf.createSession(false, true, true);
 
       session.createQueue(ADDRESS, ADDRESS, null, false);
@@ -507,7 +799,7 @@
 
       final int reconnectAttempts = -1;
 
-      ClientSessionFactoryInternal sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory"));
+      ClientSessionFactoryInternal sf = createFactory(false);
 
       sf.setRetryInterval(retryInterval);
       sf.setRetryIntervalMultiplier(retryMultiplier);
@@ -599,7 +891,7 @@
 
       final int reconnectAttempts = -1;
 
-      ClientSessionFactoryInternal sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory"));
+      ClientSessionFactoryInternal sf = createFactory(false);
 
       sf.setRetryInterval(retryInterval);
       sf.setRetryIntervalMultiplier(retryMultiplier);
@@ -677,7 +969,7 @@
 
       final long maxRetryInterval = 1000;
 
-      ClientSessionFactoryInternal sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory"));
+      ClientSessionFactoryInternal sf = createFactory(false);
 
       sf.setRetryInterval(retryInterval);
       sf.setRetryIntervalMultiplier(retryMultiplier);
@@ -757,11 +1049,8 @@
    {
       super.setUp();
 
-      Configuration liveConf = new ConfigurationImpl();
-      liveConf.setSecurityEnabled(false);
-      liveConf.getAcceptorConfigurations()
-              .add(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory"));
-      service = HornetQ.newHornetQServer(liveConf, false);
+      service = createServer(false, false);
+
       service.start();
    }
 



More information about the hornetq-commits mailing list