Author: clebert.suconic(a)jboss.com
Date: 2009-12-03 02:22:04 -0500 (Thu, 03 Dec 2009)
New Revision: 8514
Modified:
trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/ReattachTest.java
Log:
Adding test I used to capture multi-thread issue with ClientSessionFactory
Modified:
trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/ReattachTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/ReattachTest.java 2009-12-03 07:06:15 UTC (rev 8513)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/ReattachTest.java 2009-12-03 07:22:04 UTC (rev 8514)
@@ -13,26 +13,25 @@
package org.hornetq.tests.integration.cluster.reattach;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.CountDownLatch;
+
import org.hornetq.core.client.ClientConsumer;
import org.hornetq.core.client.ClientMessage;
import org.hornetq.core.client.ClientProducer;
import org.hornetq.core.client.ClientSession;
import org.hornetq.core.client.SessionFailureListener;
-import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
import org.hornetq.core.client.impl.ClientSessionInternal;
-import org.hornetq.core.config.Configuration;
-import org.hornetq.core.config.TransportConfiguration;
-import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.exception.HornetQException;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.remoting.RemotingConnection;
import org.hornetq.core.remoting.impl.invm.InVMConnector;
import org.hornetq.core.remoting.impl.invm.InVMRegistry;
-import org.hornetq.core.server.HornetQ;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.jms.client.HornetQTextMessage;
-import org.hornetq.tests.util.UnitTestCase;
+import org.hornetq.tests.util.ServiceTestBase;
import org.hornetq.utils.SimpleString;
/**
@@ -45,7 +44,7 @@
*
*
*/
-public class ReattachTest extends UnitTestCase
+public class ReattachTest extends ServiceTestBase
{
private static final Logger log = Logger.getLogger(ReattachTest.class);
@@ -74,7 +73,7 @@
final int reconnectAttempts = 1;
- ClientSessionFactoryInternal sf = new ClientSessionFactoryImpl(new
TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory"));
+ ClientSessionFactoryInternal sf = createFactory(false);
sf.setRetryInterval(retryInterval);
sf.setRetryIntervalMultiplier(retryMultiplier);
@@ -152,7 +151,7 @@
final int reconnectAttempts = -1;
- ClientSessionFactoryInternal sf = new ClientSessionFactoryImpl(new
TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory"));
+ ClientSessionFactoryInternal sf = createFactory(false);
sf.setRetryInterval(retryInterval);
sf.setRetryIntervalMultiplier(retryMultiplier);
@@ -242,7 +241,7 @@
final long asyncFailDelay = 2000;
- ClientSessionFactoryInternal sf = new ClientSessionFactoryImpl(new
TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory"));
+ ClientSessionFactoryInternal sf = createFactory(false);
sf.setRetryInterval(retryInterval);
sf.setRetryIntervalMultiplier(retryMultiplier);
@@ -261,9 +260,9 @@
{
failed = true;
}
-
+
public void beforeReconnect(HornetQException exception)
- {
+ {
}
}
@@ -356,7 +355,7 @@
final int reconnectAttempts = 3;
- ClientSessionFactoryInternal sf = new ClientSessionFactoryImpl(new
TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory"));
+ ClientSessionFactoryInternal sf = createFactory(false);
sf.setRetryInterval(retryInterval);
sf.setRetryIntervalMultiplier(retryMultiplier);
@@ -411,12 +410,12 @@
conn.fail(new HornetQException(HornetQException.NOT_CONNECTED));
- //Should throw exception since didn't reconnect
-
+ // Should throw exception since didn't reconnect
+
try
{
session.start();
-
+
fail("Should throw exception");
}
catch (HornetQException e)
@@ -431,6 +430,299 @@
t.join();
}
+ public void testCreateSessionFailAfterSendSeveralThreads() throws Throwable
+ {
+ // Test: many threads create and close sessions on one factory while a timer repeatedly fails a connection.
+ Timer timer = new Timer();
+ ClientSession session = null;
+
+ try
+ {
+
+ final long retryInterval = 50;
+
+ final double retryMultiplier = 1d;
+
+ final int reconnectAttempts = -1;
+
+ final ClientSessionFactoryInternal sf = createFactory(false);
+
+ // NOTE(review): reconnectAttempts = -1 presumably means "retry without limit" - confirm against the factory API.
+
+ sf.setRetryInterval(retryInterval);
+ sf.setRetryIntervalMultiplier(retryMultiplier);
+ sf.setReconnectAttempts(reconnectAttempts);
+ sf.setConfirmationWindowSize(1024 * 1024);
+
+ // This session is used only to obtain the connection that the timer task fails; it is closed in finally.
+ session = sf.createSession();
+
+ final RemotingConnection connFailure =
((ClientSessionInternal)session).getConnection();
+
+ // Each of the numberOfThreads workers creates and closes numberOfSessionsToCreate sessions on sf.
+
+ int numberOfThreads = 100;
+ final int numberOfSessionsToCreate = 10;
+
+ final CountDownLatch alignLatch = new CountDownLatch(numberOfThreads);
+ final CountDownLatch startFlag = new CountDownLatch(1);
+ // alignLatch is counted down by each worker as it starts running; startFlag is released once by this thread.
+ class CreateSessionThread extends Thread
+ {
+ Throwable failure;
+
+ public void run()
+ {
+ try
+ {
+ alignLatch.countDown();
+ startFlag.await();
+ for (int i = 0 ; i < numberOfSessionsToCreate; i++)
+ {
+ Thread.yield();
+ ClientSession session = sf.createSession(false, true, true);
+
+ session.close();
+ }
+ }
+ catch (Throwable e)
+ {
+ e.printStackTrace();
+ this.failure = e;
+ }
+ }
+ }
+
+ CreateSessionThread threads[] = new CreateSessionThread[numberOfThreads];
+ for (int i = 0; i < numberOfThreads; i++)
+ {
+ threads[i] = new CreateSessionThread();
+ threads[i].start();
+ }
+
+ // Wait until every worker has reached its start point (this test does not sleep; failures come from the timer).
+
+ alignLatch.await();
+ // Fail the connection 10 ms from now and then again every 10 ms until the timer is cancelled.
+ timer.schedule(new TimerTask()
+ {
+ @Override
+ public void run()
+ {
+ try
+ {
+ connFailure.fail(new
HornetQException(HornetQException.NOT_CONNECTED));
+ }
+ catch (Exception e)
+ {
+ log.warn("Error on the timer " + e);
+ }
+ }
+
+ }, 10, 10);
+ // Release all workers at once.
+ startFlag.countDown();
+
+ Throwable failure = null;
+ // Join every worker; if several failed, only the last failure seen is kept and rethrown.
+ for (CreateSessionThread thread : threads)
+ {
+ thread.join();
+ if (thread.failure != null)
+ {
+ System.out.println("Thread " + thread.getName() + " failed
- " + thread.failure);
+ failure = thread.failure;
+ }
+ }
+
+ if (failure != null)
+ {
+ throw failure;
+ }
+ // NOTE(review): sf.close() is skipped when a failure is rethrown above; consider closing sf in finally.
+ sf.close();
+
+ }
+ finally
+ {
+ timer.cancel();
+ // The timer is cancelled first so no further failure tasks are scheduled; then the extra session is closed.
+ if (session != null)
+ {
+ session.close();
+ }
+ }
+ }
+
+ public void testCreateSessionFailBeforeSendSeveralThreads() throws Throwable
+ { // Test: many threads call createSession after failOnCreateConnection is set; a helper thread clears the flag later.
+ final long retryInterval = 500;
+
+ final double retryMultiplier = 1d;
+
+ final int reconnectAttempts = -1;
+
+ final ClientSessionFactoryInternal sf = createFactory(false);
+
+ sf.setRetryInterval(retryInterval);
+ sf.setRetryIntervalMultiplier(retryMultiplier);
+ sf.setReconnectAttempts(reconnectAttempts);
+ sf.setConfirmationWindowSize(1024 * 1024);
+ // Static flag on InVMConnector; presumably makes new in-VM connections fail while true - confirm in InVMConnector.
+ InVMConnector.failOnCreateConnection = true;
+
+ int numberOfThreads = 100;
+
+ final CountDownLatch alignLatch = new CountDownLatch(numberOfThreads);
+ final CountDownLatch startFlag = new CountDownLatch(1);
+ // Each worker signals alignLatch, waits on startFlag, then creates and closes one session on sf.
+ class CreateSessionThread extends Thread
+ {
+ Throwable failure;
+
+ public void run()
+ {
+ try
+ {
+ alignLatch.countDown();
+ startFlag.await();
+ ClientSession session = sf.createSession(false, true, true);
+
+ session.close();
+ }
+ catch (Throwable e)
+ {
+ e.printStackTrace();
+ this.failure = e;
+ }
+ }
+ }
+
+ CreateSessionThread threads[] = new CreateSessionThread[numberOfThreads];
+ for (int i = 0; i < numberOfThreads; i++)
+ {
+ threads[i] = new CreateSessionThread();
+ threads[i].start();
+ }
+
+ // Helper thread: sleeps 3 times retryInterval (intended to allow at least 3 retries), then clears the failure flag.
+
+ Thread t = new Thread()
+ {
+ public void run()
+ {
+ try
+ {
+ Thread.sleep(retryInterval * 3);
+ }
+ catch (InterruptedException ignore)
+ {
+ }
+ // The flag is cleared even if the sleep above was interrupted.
+ InVMConnector.failOnCreateConnection = false;
+ }
+ };
+ // Wait until every worker has started, then start the helper and release the workers.
+ alignLatch.await();
+
+ t.start();
+
+ startFlag.countDown();
+
+ Throwable failure = null;
+ // Join every worker; if several failed, only the last failure seen is kept and rethrown.
+ for (CreateSessionThread thread : threads)
+ {
+ thread.join();
+ if (thread.failure != null)
+ {
+ System.out.println("Thread " + thread.getName() + " failed -
" + thread.failure);
+ failure = thread.failure;
+ }
+ }
+
+ if (failure != null)
+ {
+ throw failure;
+ }
+ // NOTE(review): when a failure is rethrown above, sf.close() and t.join() below are skipped.
+ sf.close();
+
+ t.join();
+ }
+
+ public void testCreateQueue() throws Exception
+ { // Test: fails the session's connection, then creates ten queues on that same session.
+ final long retryInterval = 50;
+
+ final double retryMultiplier = 1d;
+
+ final int reconnectAttempts = -1;
+
+ ClientSessionFactoryInternal sf = createFactory(false);
+
+ sf.setRetryInterval(retryInterval);
+ sf.setRetryIntervalMultiplier(retryMultiplier);
+ sf.setReconnectAttempts(reconnectAttempts);
+ sf.setConfirmationWindowSize(1024 * 1024);
+
+ ClientSession session = sf.createSession(false, true, true);
+
+ // Grab the session's connection so it can be failed below.
+
+ RemotingConnection conn = ((ClientSessionInternal)session).getConnection();
+ // NOTE(review): the flag is set to false here and again in the helper thread, so the helper changes nothing unless other code sets it - confirm whether true was intended.
+ InVMConnector.failOnCreateConnection = false;
+
+ conn.fail(new HornetQException(HornetQException.NOT_CONNECTED));
+ // Helper thread: sleeps 3 times retryInterval, then sets failOnCreateConnection to false.
+ Thread t = new Thread()
+ {
+ public void run()
+ {
+ try
+ {
+ Thread.sleep(retryInterval * 3);
+ }
+ catch (InterruptedException ignore)
+ {
+ }
+
+ InVMConnector.failOnCreateConnection = false;
+ }
+ };
+
+ t.start();
+ // Create queues "queue0" through "queue9" on address "address" after the connection failure.
+ for (int i = 0; i < 10; i++)
+ {
+ session.createQueue("address", "queue" + i);
+ }
+ // The block below is commented out and does not run.
+ //
+ // InVMConnector.failOnCreateConnection = true;
+
+ //
+ // //Should throw exception since didn't reconnect
+ //
+ // try
+ // {
+ // session.start();
+ //
+ // fail("Should throw exception");
+ // }
+ // catch (HornetQException e)
+ // {
+ // assertEquals(HornetQException.OBJECT_CLOSED, e.getCode());
+ // }
+
+ session.close();
+
+ sf.close();
+
+ t.join();
+ }
+
public void testReattachAttemptsSucceedsInReconnecting() throws Exception
{
final long retryInterval = 500;
@@ -439,13 +731,13 @@
final int reconnectAttempts = 10;
- ClientSessionFactoryInternal sf = new ClientSessionFactoryImpl(new
TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory"));
+ ClientSessionFactoryInternal sf = createFactory(false);
sf.setRetryInterval(retryInterval);
sf.setRetryIntervalMultiplier(retryMultiplier);
sf.setReconnectAttempts(reconnectAttempts);
sf.setConfirmationWindowSize(1024 * 1024);
-
+
ClientSession session = sf.createSession(false, true, true);
session.createQueue(ADDRESS, ADDRESS, null, false);
@@ -507,7 +799,7 @@
final int reconnectAttempts = -1;
- ClientSessionFactoryInternal sf = new ClientSessionFactoryImpl(new
TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory"));
+ ClientSessionFactoryInternal sf = createFactory(false);
sf.setRetryInterval(retryInterval);
sf.setRetryIntervalMultiplier(retryMultiplier);
@@ -599,7 +891,7 @@
final int reconnectAttempts = -1;
- ClientSessionFactoryInternal sf = new ClientSessionFactoryImpl(new
TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory"));
+ ClientSessionFactoryInternal sf = createFactory(false);
sf.setRetryInterval(retryInterval);
sf.setRetryIntervalMultiplier(retryMultiplier);
@@ -677,7 +969,7 @@
final long maxRetryInterval = 1000;
- ClientSessionFactoryInternal sf = new ClientSessionFactoryImpl(new
TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory"));
+ ClientSessionFactoryInternal sf = createFactory(false);
sf.setRetryInterval(retryInterval);
sf.setRetryIntervalMultiplier(retryMultiplier);
@@ -757,11 +1049,8 @@
{
super.setUp();
- Configuration liveConf = new ConfigurationImpl();
- liveConf.setSecurityEnabled(false);
- liveConf.getAcceptorConfigurations()
- .add(new
TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory"));
- service = HornetQ.newHornetQServer(liveConf, false);
+ service = createServer(false, false);
+
service.start();
}