Author: ataylor
Date: 2011-04-12 05:47:15 -0400 (Tue, 12 Apr 2011)
New Revision: 10479
Added:
trunk/tests/soak-tests/
trunk/tests/soak-tests/pom.xml
trunk/tests/soak-tests/src/
trunk/tests/soak-tests/src/test/
trunk/tests/soak-tests/src/test/java/
trunk/tests/soak-tests/src/test/java/org/
trunk/tests/soak-tests/src/test/java/org/hornetq/
trunk/tests/soak-tests/src/test/java/org/hornetq/tests/
trunk/tests/soak-tests/src/test/java/org/hornetq/tests/soak/
Removed:
trunk/tests/src/org/hornetq/tests/soak/
Modified:
trunk/tests/hornetq-tests.iml
trunk/tests/pom.xml
trunk/tests/soak-tests/src/test/java/org/hornetq/tests/soak/client/ClientNonDivertedSoakTest.java
trunk/tests/soak-tests/src/test/java/org/hornetq/tests/soak/client/ClientSoakTest.java
trunk/tests/soak-tests/src/test/java/org/hornetq/tests/soak/client/SimpleSendReceiveSoakTest.java
trunk/tests/soak-tests/src/test/java/org/hornetq/tests/soak/failover/RandomFailoverSoakTest.java
trunk/tests/soak-tests/src/test/java/org/hornetq/tests/soak/journal/JournalCleanupCompactSoakTest.java
Log:
mavenised soak tests
Modified: trunk/tests/hornetq-tests.iml
===================================================================
--- trunk/tests/hornetq-tests.iml 2011-04-11 23:03:32 UTC (rev 10478)
+++ trunk/tests/hornetq-tests.iml 2011-04-12 09:47:15 UTC (rev 10479)
@@ -8,6 +8,7 @@
<sourceFolder
url="file://$MODULE_DIR$/integration-tests/src/test/java"
isTestSource="true" />
<sourceFolder url="file://$MODULE_DIR$/unit-tests/src/main/java"
isTestSource="true" />
<sourceFolder
url="file://$MODULE_DIR$/integration-tests/src/main/java"
isTestSource="true" />
+ <sourceFolder url="file://$MODULE_DIR$/soak-tests/src/test/java"
isTestSource="true" />
<excludeFolder url="file://$MODULE_DIR$/jms-tests" />
<excludeFolder url="file://$MODULE_DIR$/joram-tests" />
<excludeFolder url="file://$MODULE_DIR$/logs" />
Modified: trunk/tests/pom.xml
===================================================================
--- trunk/tests/pom.xml 2011-04-11 23:03:32 UTC (rev 10478)
+++ trunk/tests/pom.xml 2011-04-12 09:47:15 UTC (rev 10479)
@@ -28,5 +28,6 @@
<module>integration-tests</module>
<module>jms-tests</module>
<module>joram-tests</module>
+ <module>soak-tests</module>
</modules>
</project>
Added: trunk/tests/soak-tests/pom.xml
===================================================================
--- trunk/tests/soak-tests/pom.xml (rev 0)
+++ trunk/tests/soak-tests/pom.xml 2011-04-12 09:47:15 UTC (rev 10479)
@@ -0,0 +1,140 @@
+<!--
+ ~ Copyright 2009 Red Hat, Inc.
+ ~ Red Hat licenses this file to you under the Apache License, version
+ ~ 2.0 (the "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ ~ implied. See the License for the specific language governing
+ ~ permissions and limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.hornetq.tests</groupId>
+ <artifactId>hornetq-tests-pom</artifactId>
+ <version>2.2.3-SNAPSHOT</version>
+ </parent>
+
+ <groupId>org.hornetq.tests</groupId>
+ <artifactId>soak-tests</artifactId>
+ <packaging>jar</packaging>
+ <name>HornetQ soak Tests</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.hornetq.tests</groupId>
+ <artifactId>unit-tests</artifactId>
+ <version>${project.version}</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.hornetq.tests</groupId>
+ <artifactId>integration-tests</artifactId>
+ <version>${project.version}</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.hornetq.tests</groupId>
+ <artifactId>jms-tests</artifactId>
+ <version>${project.version}</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.hornetq</groupId>
+ <artifactId>hornetq-jms</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.hornetq</groupId>
+ <artifactId>hornetq-ra</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.hornetq</groupId>
+ <artifactId>hornetq-bootstrap</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.jboss.javaee</groupId>
+ <artifactId>jboss-jca-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.jboss.security</groupId>
+ <artifactId>jboss-security-spi</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.jboss.security</groupId>
+ <artifactId>jbosssx</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.jboss.naming</groupId>
+ <artifactId>jnpserver</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>jboss.jbossts</groupId>
+ <artifactId>jbossts-common</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>apache-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.jboss.javaee</groupId>
+ <artifactId>jboss-transaction-api</artifactId>
+ </dependency>
+ <!--this specifically for the JMS Bridge-->
+ <dependency>
+ <groupId>org.jboss.integration</groupId>
+ <artifactId>jboss-transaction-spi</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.jboss.javaee</groupId>
+ <artifactId>jboss-jaspi-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.jboss.javaee</groupId>
+ <artifactId>jboss-jms-api</artifactId>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <testResources>
+ <testResource>
+ <directory>config</directory>
+ </testResource>
+ </testResources>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <includes>
+ <include>**/*Test.java</include>
+ </includes>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <source>1.6</source>
+ <target>1.6</target>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
Modified:
trunk/tests/soak-tests/src/test/java/org/hornetq/tests/soak/client/ClientNonDivertedSoakTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/soak/client/ClientNonDivertedSoakTest.java 2011-04-11
12:47:21 UTC (rev 10475)
+++
trunk/tests/soak-tests/src/test/java/org/hornetq/tests/soak/client/ClientNonDivertedSoakTest.java 2011-04-12
09:47:15 UTC (rev 10479)
@@ -25,7 +25,7 @@
import org.hornetq.core.config.Configuration;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.settings.impl.AddressSettings;
-import org.hornetq.tests.unit.util.ServiceTestBase;
+import org.hornetq.tests.util.ServiceTestBase;
/**
* A ClientSoakTest
Modified:
trunk/tests/soak-tests/src/test/java/org/hornetq/tests/soak/client/ClientSoakTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/soak/client/ClientSoakTest.java 2011-04-11 12:47:21
UTC (rev 10475)
+++
trunk/tests/soak-tests/src/test/java/org/hornetq/tests/soak/client/ClientSoakTest.java 2011-04-12
09:47:15 UTC (rev 10479)
@@ -27,7 +27,7 @@
import org.hornetq.core.config.DivertConfiguration;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.settings.impl.AddressSettings;
-import org.hornetq.tests.unit.util.ServiceTestBase;
+import org.hornetq.tests.util.ServiceTestBase;
/**
* A ClientSoakTest
Modified:
trunk/tests/soak-tests/src/test/java/org/hornetq/tests/soak/client/SimpleSendReceiveSoakTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/soak/client/SimpleSendReceiveSoakTest.java 2011-04-11
12:47:21 UTC (rev 10475)
+++
trunk/tests/soak-tests/src/test/java/org/hornetq/tests/soak/client/SimpleSendReceiveSoakTest.java 2011-04-12
09:47:15 UTC (rev 10479)
@@ -25,7 +25,7 @@
import org.hornetq.core.config.Configuration;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.settings.impl.AddressSettings;
-import org.hornetq.tests.unit.util.ServiceTestBase;
+import org.hornetq.tests.util.ServiceTestBase;
/**
* A ClientSoakTest
Modified:
trunk/tests/soak-tests/src/test/java/org/hornetq/tests/soak/failover/RandomFailoverSoakTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/soak/failover/RandomFailoverSoakTest.java 2011-04-11
12:47:21 UTC (rev 10475)
+++
trunk/tests/soak-tests/src/test/java/org/hornetq/tests/soak/failover/RandomFailoverSoakTest.java 2011-04-12
09:47:15 UTC (rev 10479)
@@ -13,8 +13,27 @@
package org.hornetq.tests.soak.failover;
-import org.hornetq.tests.integration.cluster.reattach.RandomReattachTest;
+import junit.framework.Assert;
+import junit.framework.AssertionFailedError;
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.*;
+import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
+import org.hornetq.core.client.impl.ClientSessionInternal;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.remoting.impl.invm.InVMRegistry;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.HornetQServers;
+import org.hornetq.jms.client.HornetQTextMessage;
+import org.hornetq.tests.util.ServiceTestBase;
+import org.hornetq.tests.util.UnitTestCase;
+import java.util.*;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
/**
* A RandomFailoverSoakTest
*
@@ -24,10 +43,1515 @@
*
*
*/
-public class RandomFailoverSoakTest extends RandomReattachTest
+public class RandomFailoverSoakTest extends UnitTestCase
{
+   // Constants -----------------------------------------------------
+
+   /** Logger used by the scenarios to report iteration banners, durations and errors. */
+   private static final Logger log = Logger.getLogger(RandomFailoverSoakTest.class);
+
+   /** Timeout handed to {@code ClientConsumer.receive(long)} by the synchronous-receive scenarios. */
+   private static final int RECEIVE_TIMEOUT = 10000;
+
+   /** Address every scenario creates its queues on and sends its messages to. */
+   private static final SimpleString ADDRESS = new SimpleString("FailoverTestAddress");
+
+   // Attributes ----------------------------------------------------
+
+   /** Server under test; not touched by the methods visible here (presumably managed by start()/stop() - verify). */
+   private HornetQServer liveService;
+
+   /** Timer; not touched by the methods visible here (presumably schedules the Failer - verify). */
+   private Timer timer;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+   /**
+    * Drives {@link #doTestA(ClientSessionFactory)} through {@link #runTest(RunnableT)}.
+    *
+    * @throws Exception if the scenario or the surrounding harness fails
+    */
+   public void testA() throws Exception
+   {
+      final RunnableT scenario = new RunnableT()
+      {
+         @Override
+         public void run(final ClientSessionFactory factory) throws Exception
+         {
+            doTestA(factory);
+         }
+      };
+
+      runTest(scenario);
+   }
+
+   /**
+    * Drives {@link #doTestB(ClientSessionFactory)} through {@link #runTest(RunnableT)}.
+    *
+    * @throws Exception if the scenario or the surrounding harness fails
+    */
+   public void testB() throws Exception
+   {
+      final RunnableT scenario = new RunnableT()
+      {
+         @Override
+         public void run(final ClientSessionFactory factory) throws Exception
+         {
+            doTestB(factory);
+         }
+      };
+
+      runTest(scenario);
+   }
+
+   /**
+    * Drives {@link #doTestC(ClientSessionFactory)} through {@link #runTest(RunnableT)}.
+    *
+    * @throws Exception if the scenario or the surrounding harness fails
+    */
+   public void testC() throws Exception
+   {
+      final RunnableT scenario = new RunnableT()
+      {
+         @Override
+         public void run(final ClientSessionFactory factory) throws Exception
+         {
+            doTestC(factory);
+         }
+      };
+
+      runTest(scenario);
+   }
+
+   /**
+    * Drives {@link #doTestD(ClientSessionFactory)} through {@link #runTest(RunnableT)}.
+    *
+    * @throws Exception if the scenario or the surrounding harness fails
+    */
+   public void testD() throws Exception
+   {
+      final RunnableT scenario = new RunnableT()
+      {
+         @Override
+         public void run(final ClientSessionFactory factory) throws Exception
+         {
+            doTestD(factory);
+         }
+      };
+
+      runTest(scenario);
+   }
+
+   /**
+    * Drives {@link #doTestE(ClientSessionFactory)} through {@link #runTest(RunnableT)}.
+    *
+    * @throws Exception if the scenario or the surrounding harness fails
+    */
+   public void testE() throws Exception
+   {
+      final RunnableT scenario = new RunnableT()
+      {
+         @Override
+         public void run(final ClientSessionFactory factory) throws Exception
+         {
+            doTestE(factory);
+         }
+      };
+
+      runTest(scenario);
+   }
+
+   /**
+    * Drives {@link #doTestF(ClientSessionFactory)} through {@link #runTest(RunnableT)}.
+    *
+    * @throws Exception if the scenario or the surrounding harness fails
+    */
+   public void testF() throws Exception
+   {
+      final RunnableT scenario = new RunnableT()
+      {
+         @Override
+         public void run(final ClientSessionFactory factory) throws Exception
+         {
+            doTestF(factory);
+         }
+      };
+
+      runTest(scenario);
+   }
+
+   /**
+    * Drives {@link #doTestG(ClientSessionFactory)} through {@link #runTest(RunnableT)}.
+    *
+    * @throws Exception if the scenario or the surrounding harness fails
+    */
+   public void testG() throws Exception
+   {
+      final RunnableT scenario = new RunnableT()
+      {
+         @Override
+         public void run(final ClientSessionFactory factory) throws Exception
+         {
+            doTestG(factory);
+         }
+      };
+
+      runTest(scenario);
+   }
+
+   /**
+    * Drives {@link #doTestH(ClientSessionFactory)} through {@link #runTest(RunnableT)}.
+    *
+    * @throws Exception if the scenario or the surrounding harness fails
+    */
+   public void testH() throws Exception
+   {
+      final RunnableT scenario = new RunnableT()
+      {
+         @Override
+         public void run(final ClientSessionFactory factory) throws Exception
+         {
+            doTestH(factory);
+         }
+      };
+
+      runTest(scenario);
+   }
+
+   /**
+    * Drives {@code doTestI(sf)} through {@link #runTest(RunnableT)}.
+    *
+    * @throws Exception if the scenario or the surrounding harness fails
+    */
+   public void testI() throws Exception
+   {
+      final RunnableT scenario = new RunnableT()
+      {
+         @Override
+         public void run(final ClientSessionFactory factory) throws Exception
+         {
+            doTestI(factory);
+         }
+      };
+
+      runTest(scenario);
+   }
+
+   /**
+    * Drives {@code doTestJ(sf)} through {@link #runTest(RunnableT)}.
+    *
+    * @throws Exception if the scenario or the surrounding harness fails
+    */
+   public void testJ() throws Exception
+   {
+      final RunnableT scenario = new RunnableT()
+      {
+         @Override
+         public void run(final ClientSessionFactory factory) throws Exception
+         {
+            doTestJ(factory);
+         }
+      };
+
+      runTest(scenario);
+   }
+
+   /**
+    * Drives {@code doTestK(sf)} through {@link #runTest(RunnableT)}.
+    *
+    * @throws Exception if the scenario or the surrounding harness fails
+    */
+   public void testK() throws Exception
+   {
+      final RunnableT scenario = new RunnableT()
+      {
+         @Override
+         public void run(final ClientSessionFactory factory) throws Exception
+         {
+            doTestK(factory);
+         }
+      };
+
+      runTest(scenario);
+   }
+
+   /**
+    * Drives {@code doTestL(sf)} through {@link #runTest(RunnableT)}.
+    *
+    * @throws Exception if the scenario or the surrounding harness fails
+    */
+   public void testL() throws Exception
+   {
+      final RunnableT scenario = new RunnableT()
+      {
+         @Override
+         public void run(final ClientSessionFactory factory) throws Exception
+         {
+            doTestL(factory);
+         }
+      };
+
+      runTest(scenario);
+   }
+
+   /**
+    * Drives {@code doTestN(sf)} through {@link #runTest(RunnableT)}.
+    *
+    * @throws Exception if the scenario or the surrounding harness fails
+    */
+   public void testN() throws Exception
+   {
+      final RunnableT scenario = new RunnableT()
+      {
+         @Override
+         public void run(final ClientSessionFactory factory) throws Exception
+         {
+            doTestN(factory);
+         }
+      };
+
+      runTest(scenario);
+   }
+
+   /**
+    * Runs a scenario for {@code getNumIterations()} iterations.
+    * <p>
+    * Each iteration calls {@code start()}, creates an in-VM non-HA locator with unlimited
+    * reconnect attempts and a 1 MiB confirmation window, opens a session that is handed to
+    * {@code startFailer(1000, session)}, and then repeats the scenario until the failer
+    * reports that it has executed. Afterwards the session and locator are closed, the factory
+    * is checked to hold no sessions and no connections, and {@code stop()} is called.
+    * <p>
+    * If the scenario or one of the checks throws, the locator is still closed and
+    * {@code stop()} is still called so that a failed iteration does not leave client or server
+    * resources behind; problems during that cleanup are only logged so that the original
+    * failure is the one reported.
+    *
+    * @param runnable the scenario to execute against the session factory
+    * @throws Exception if the scenario, a post-condition check, or the normal shutdown fails
+    */
+   public void runTest(final RunnableT runnable) throws Exception
+   {
+      final int numIts = getNumIterations();
+
+      for (int its = 0; its < numIts; its++)
+      {
+         RandomFailoverSoakTest.log.info("####" + getName() + " iteration #" + its);
+
+         start();
+
+         ServerLocator locator = null;
+
+         boolean completed = false;
+
+         try
+         {
+            locator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(ServiceTestBase.INVM_CONNECTOR_FACTORY));
+
+            locator.setReconnectAttempts(-1);
+            locator.setConfirmationWindowSize(1024 * 1024);
+
+            ClientSessionFactoryImpl sf = (ClientSessionFactoryImpl)locator.createSessionFactory();
+
+            ClientSession session = sf.createSession(false, false, false);
+
+            Failer failer = startFailer(1000, session);
+
+            // Always run the scenario at least once, then keep going until the failer has fired
+            do
+            {
+               runnable.run(sf);
+            }
+            while (!failer.isExecuted());
+
+            session.close();
+
+            locator.close();
+
+            Assert.assertEquals(0, sf.numSessions());
+
+            Assert.assertEquals(0, sf.numConnections());
+
+            completed = true;
+         }
+         finally
+         {
+            if (!completed)
+            {
+               // Failure path only: release what this iteration acquired without masking the
+               // original error. Closing the locator is expected to tear down the factory and
+               // the sessions created from it - TODO confirm against ServerLocator.close().
+               try
+               {
+                  if (locator != null)
+                  {
+                     locator.close();
+                  }
+               }
+               catch (Throwable t)
+               {
+                  RandomFailoverSoakTest.log.error("Failed to close locator after failed iteration", t);
+               }
+
+               try
+               {
+                  stop();
+               }
+               catch (Throwable t)
+               {
+                  RandomFailoverSoakTest.log.error("Failed to stop server after failed iteration", t);
+               }
+            }
+         }
+
+         // Normal path: same shutdown as before, and its failures are still propagated
+         stop();
+      }
+   }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+   /**
+    * Scenario A: message handlers on consumer sessions that are started before anything is sent.
+    * <p>
+    * Ten consumer sessions are created with {@code createSession(false, true, true)}; each is
+    * started and then creates and consumes from its own queue {@code sub<i>} on
+    * {@link #ADDRESS}. 100 messages carrying an int {@code count} property are sent, after
+    * which one handler per consumer checks that the counts arrive as 0..99 and acknowledges
+    * every message (an acknowledge failure is only logged). Each handler gets 5 seconds to see
+    * all messages. Finally every session is closed and the queues are deleted.
+    *
+    * @param sf factory used to create every session of the scenario
+    * @throws Exception if a client call fails or an assertion does not hold
+    */
+   protected void doTestA(final ClientSessionFactory sf) throws Exception
+   {
+      final long startedAt = System.currentTimeMillis();
+
+      final int messageCount = 100;
+
+      final int sessionCount = 10;
+
+      // Session used only for deleting the queues at the end
+      ClientSession adminSession = sf.createSession(false, false, false);
+
+      Set<ClientConsumer> consumers = new HashSet<ClientConsumer>();
+      Set<ClientSession> consumerSessions = new HashSet<ClientSession>();
+
+      for (int index = 0; index < sessionCount; index++)
+      {
+         SimpleString queueName = new SimpleString("sub" + index);
+
+         ClientSession consumerSession = sf.createSession(false, true, true);
+
+         consumerSession.start();
+
+         consumerSession.createQueue(RandomFailoverSoakTest.ADDRESS, queueName, null, false);
+
+         consumers.add(consumerSession.createConsumer(queueName));
+
+         consumerSessions.add(consumerSession);
+      }
+
+      ClientSession producerSession = sf.createSession(false, true, true);
+
+      ClientProducer producer = producerSession.createProducer(RandomFailoverSoakTest.ADDRESS);
+
+      for (int sequence = 0; sequence < messageCount; sequence++)
+      {
+         ClientMessage outgoing = producerSession.createMessage(HornetQTextMessage.TYPE,
+                                                                false,
+                                                                0,
+                                                                System.currentTimeMillis(),
+                                                                (byte)1);
+         outgoing.putIntProperty(new SimpleString("count"), sequence);
+         producer.send(outgoing);
+      }
+
+      // Verifies ordering, acknowledges, and releases its latch once messageCount messages were seen
+      class CountingHandler extends AssertionCheckMessageHandler
+      {
+         final CountDownLatch latch = new CountDownLatch(1);
+
+         volatile int count;
+
+         public void onMessageAssert(final ClientMessage received)
+         {
+            if (count == messageCount)
+            {
+               Assert.fail("Too many messages");
+            }
+
+            Assert.assertEquals(count, received.getObjectProperty(new SimpleString("count")));
+
+            count++;
+
+            try
+            {
+               received.acknowledge();
+            }
+            catch (HornetQException me)
+            {
+               RandomFailoverSoakTest.log.error("Failed to process", me);
+            }
+
+            if (count == messageCount)
+            {
+               latch.countDown();
+            }
+         }
+      }
+
+      Set<CountingHandler> handlers = new HashSet<CountingHandler>();
+
+      for (ClientConsumer consumer : consumers)
+      {
+         CountingHandler handler = new CountingHandler();
+
+         consumer.setMessageHandler(handler);
+
+         handlers.add(handler);
+      }
+
+      for (CountingHandler handler : handlers)
+      {
+         boolean allSeen = handler.latch.await(5000, TimeUnit.MILLISECONDS);
+
+         // Checked before the timeout assertion, as in the original ordering
+         handler.checkAssertions();
+
+         Assert.assertTrue("Didn't receive all messages", allSeen);
+      }
+
+      producerSession.close();
+      for (ClientSession consumerSession : consumerSessions)
+      {
+         consumerSession.close();
+      }
+
+      for (int index = 0; index < sessionCount; index++)
+      {
+         adminSession.deleteQueue(new SimpleString("sub" + index));
+      }
+
+      adminSession.close();
+
+      final long finishedAt = System.currentTimeMillis();
+
+      log.info("duration " + (finishedAt - startedAt));
+   }
+
+   /**
+    * Scenario B: message handlers on consumer sessions that are started only after sending.
+    * <p>
+    * Fifty consumer sessions are created with {@code createSession(false, true, true)}, each
+    * with its own queue {@code sub<i>} on {@link #ADDRESS}. 100 messages carrying an int
+    * {@code count} property are sent, the consumer sessions are started, and one handler per
+    * consumer checks that the counts arrive as 0..99 (no explicit acknowledge). Each handler
+    * gets 10 seconds. Finally every session is closed and the queues are deleted.
+    *
+    * @param sf factory used to create every session of the scenario
+    * @throws Exception if a client call fails or an assertion does not hold
+    */
+   protected void doTestB(final ClientSessionFactory sf) throws Exception
+   {
+      final long startedAt = System.currentTimeMillis();
+
+      final int messageCount = 100;
+
+      final int sessionCount = 50;
+
+      // Session used only for deleting the queues at the end
+      ClientSession adminSession = sf.createSession(false, false, false);
+
+      Set<ClientConsumer> consumers = new HashSet<ClientConsumer>();
+      Set<ClientSession> consumerSessions = new HashSet<ClientSession>();
+
+      for (int index = 0; index < sessionCount; index++)
+      {
+         SimpleString queueName = new SimpleString("sub" + index);
+
+         ClientSession consumerSession = sf.createSession(false, true, true);
+
+         consumerSession.createQueue(RandomFailoverSoakTest.ADDRESS, queueName, null, false);
+
+         consumers.add(consumerSession.createConsumer(queueName));
+
+         consumerSessions.add(consumerSession);
+      }
+
+      ClientSession producerSession = sf.createSession(false, true, true);
+
+      ClientProducer producer = producerSession.createProducer(RandomFailoverSoakTest.ADDRESS);
+
+      for (int sequence = 0; sequence < messageCount; sequence++)
+      {
+         ClientMessage outgoing = producerSession.createMessage(HornetQTextMessage.TYPE,
+                                                                false,
+                                                                0,
+                                                                System.currentTimeMillis(),
+                                                                (byte)1);
+         outgoing.putIntProperty(new SimpleString("count"), sequence);
+         producer.send(outgoing);
+      }
+
+      // Delivery is only enabled once everything has been sent
+      for (ClientSession consumerSession : consumerSessions)
+      {
+         consumerSession.start();
+      }
+
+      // Verifies ordering and releases its latch once messageCount messages were seen
+      class CountingHandler extends AssertionCheckMessageHandler
+      {
+         final CountDownLatch latch = new CountDownLatch(1);
+
+         volatile int count;
+
+         public void onMessageAssert(final ClientMessage received)
+         {
+            if (count == messageCount)
+            {
+               Assert.fail("Too many messages");
+            }
+
+            Assert.assertEquals(count, received.getObjectProperty(new SimpleString("count")));
+
+            count++;
+
+            if (count == messageCount)
+            {
+               latch.countDown();
+            }
+         }
+      }
+
+      Set<CountingHandler> handlers = new HashSet<CountingHandler>();
+
+      for (ClientConsumer consumer : consumers)
+      {
+         CountingHandler handler = new CountingHandler();
+
+         consumer.setMessageHandler(handler);
+
+         handlers.add(handler);
+      }
+
+      for (CountingHandler handler : handlers)
+      {
+         boolean allSeen = handler.latch.await(10000, TimeUnit.MILLISECONDS);
+
+         // Checked before the timeout assertion, as in the original ordering
+         handler.checkAssertions();
+
+         Assert.assertTrue(allSeen);
+      }
+
+      producerSession.close();
+
+      for (ClientSession consumerSession : consumerSessions)
+      {
+         consumerSession.close();
+      }
+
+      for (int index = 0; index < sessionCount; index++)
+      {
+         adminSession.deleteQueue(new SimpleString("sub" + index));
+      }
+
+      adminSession.close();
+
+      final long finishedAt = System.currentTimeMillis();
+
+      log.info("duration " + (finishedAt - startedAt));
+
+   }
+
+   /**
+    * Scenario C: handlers on a started consumer session, with rollback on both sides.
+    * <p>
+    * One consumer session is created with {@code createSession(false, false, false)}, started,
+    * and given its own queue {@code sub0} on {@link #ADDRESS}. The producer session
+    * ({@code createSession(false, false, true)}) sends 100 messages and rolls back, then sends
+    * 100 again and commits. A handler checks that counts 0..99 arrive in order and acknowledges
+    * each message; fresh handlers are then installed, the consumer session is rolled back, and
+    * the same 100 messages are expected a second time before the session is committed. Each
+    * wait is limited to 10 seconds. Finally every session is closed and the queue is deleted.
+    *
+    * @param sf factory used to create every session of the scenario
+    * @throws Exception if a client call fails or an assertion does not hold
+    */
+   protected void doTestC(final ClientSessionFactory sf) throws Exception
+   {
+      final long startedAt = System.currentTimeMillis();
+
+      final int messageCount = 100;
+
+      final int sessionCount = 1;
+
+      // Session used only for deleting the queues at the end
+      ClientSession adminSession = sf.createSession(false, false, false);
+
+      Set<ClientConsumer> consumers = new HashSet<ClientConsumer>();
+      Set<ClientSession> consumerSessions = new HashSet<ClientSession>();
+
+      for (int index = 0; index < sessionCount; index++)
+      {
+         SimpleString queueName = new SimpleString("sub" + index);
+
+         ClientSession consumerSession = sf.createSession(false, false, false);
+
+         consumerSession.start();
+
+         consumerSession.createQueue(RandomFailoverSoakTest.ADDRESS, queueName, null, false);
+
+         consumers.add(consumerSession.createConsumer(queueName));
+
+         consumerSessions.add(consumerSession);
+      }
+
+      ClientSession producerSession = sf.createSession(false, false, true);
+
+      ClientProducer producer = producerSession.createProducer(RandomFailoverSoakTest.ADDRESS);
+
+      // First batch is rolled back
+      for (int sequence = 0; sequence < messageCount; sequence++)
+      {
+         ClientMessage outgoing = producerSession.createMessage(HornetQTextMessage.TYPE,
+                                                                false,
+                                                                0,
+                                                                System.currentTimeMillis(),
+                                                                (byte)1);
+         outgoing.putIntProperty(new SimpleString("count"), sequence);
+         producer.send(outgoing);
+      }
+
+      producerSession.rollback();
+
+      // Second batch is committed
+      for (int sequence = 0; sequence < messageCount; sequence++)
+      {
+         ClientMessage outgoing = producerSession.createMessage(HornetQTextMessage.TYPE,
+                                                                false,
+                                                                0,
+                                                                System.currentTimeMillis(),
+                                                                (byte)1);
+         outgoing.putIntProperty(new SimpleString("count"), sequence);
+         producer.send(outgoing);
+      }
+
+      producerSession.commit();
+
+      // Verifies ordering, acknowledges, and releases its latch once messageCount messages were seen
+      class CountingHandler extends AssertionCheckMessageHandler
+      {
+         final CountDownLatch latch = new CountDownLatch(1);
+
+         volatile int count;
+
+         public void onMessageAssert(final ClientMessage received)
+         {
+            if (count == messageCount)
+            {
+               Assert.fail("Too many messages, expected " + count);
+            }
+
+            Assert.assertEquals(count, received.getObjectProperty(new SimpleString("count")));
+
+            count++;
+
+            try
+            {
+               received.acknowledge();
+            }
+            catch (HornetQException e)
+            {
+               e.printStackTrace();
+               throw new RuntimeException(e.getMessage(), e);
+            }
+
+            if (count == messageCount)
+            {
+               latch.countDown();
+            }
+         }
+      }
+
+      Set<CountingHandler> handlers = new HashSet<CountingHandler>();
+
+      for (ClientConsumer consumer : consumers)
+      {
+         CountingHandler handler = new CountingHandler();
+
+         consumer.setMessageHandler(handler);
+
+         handlers.add(handler);
+      }
+
+      for (CountingHandler handler : handlers)
+      {
+         boolean allSeen = handler.latch.await(10000, TimeUnit.MILLISECONDS);
+
+         Assert.assertTrue(allSeen);
+
+         handler.checkAssertions();
+      }
+
+      handlers.clear();
+
+      // Fresh handlers (counters back at zero) for the redelivery that follows the rollback
+      for (ClientConsumer consumer : consumers)
+      {
+         CountingHandler handler = new CountingHandler();
+
+         consumer.setMessageHandler(handler);
+
+         handlers.add(handler);
+      }
+
+      for (ClientSession consumerSession : consumerSessions)
+      {
+         consumerSession.rollback();
+      }
+
+      for (CountingHandler handler : handlers)
+      {
+         boolean allSeen = handler.latch.await(10000, TimeUnit.MILLISECONDS);
+
+         Assert.assertTrue(allSeen);
+
+         handler.checkAssertions();
+      }
+
+      for (ClientSession consumerSession : consumerSessions)
+      {
+         consumerSession.commit();
+      }
+
+      producerSession.close();
+      for (ClientSession consumerSession : consumerSessions)
+      {
+         consumerSession.close();
+      }
+
+      for (int index = 0; index < sessionCount; index++)
+      {
+         adminSession.deleteQueue(new SimpleString("sub" + index));
+      }
+
+      adminSession.close();
+
+      final long finishedAt = System.currentTimeMillis();
+
+      log.info("duration " + (finishedAt - startedAt));
+   }
+
+   /**
+    * Scenario D: handlers on consumer sessions started after sending, with rollback on both sides.
+    * <p>
+    * Ten consumer sessions are created with {@code createSession(false, false, false)}, each
+    * with its own queue {@code sub<i>} on {@link #ADDRESS}. The producer session
+    * ({@code createSession(false, false, true)}) sends 100 messages and rolls back, then sends
+    * 100 again and commits; only then are the consumer sessions started. Handlers check that
+    * counts 0..99 arrive in order (no explicit acknowledge) within 20 seconds; fresh handlers
+    * are installed, the consumer sessions are rolled back, and the same 100 messages are
+    * expected again within 10 seconds before the sessions are committed. Finally every session
+    * is closed and the queues are deleted.
+    *
+    * @param sf factory used to create every session of the scenario
+    * @throws Exception if a client call fails or an assertion does not hold
+    */
+   protected void doTestD(final ClientSessionFactory sf) throws Exception
+   {
+      final long startedAt = System.currentTimeMillis();
+
+      final int messageCount = 100;
+
+      final int sessionCount = 10;
+
+      // Session used only for deleting the queues at the end
+      ClientSession adminSession = sf.createSession(false, false, false);
+
+      Set<ClientConsumer> consumers = new HashSet<ClientConsumer>();
+      Set<ClientSession> consumerSessions = new HashSet<ClientSession>();
+
+      for (int index = 0; index < sessionCount; index++)
+      {
+         SimpleString queueName = new SimpleString("sub" + index);
+
+         ClientSession consumerSession = sf.createSession(false, false, false);
+
+         consumerSession.createQueue(RandomFailoverSoakTest.ADDRESS, queueName, null, false);
+
+         consumers.add(consumerSession.createConsumer(queueName));
+
+         consumerSessions.add(consumerSession);
+      }
+
+      ClientSession producerSession = sf.createSession(false, false, true);
+
+      ClientProducer producer = producerSession.createProducer(RandomFailoverSoakTest.ADDRESS);
+
+      // First batch is rolled back
+      for (int sequence = 0; sequence < messageCount; sequence++)
+      {
+         ClientMessage outgoing = producerSession.createMessage(HornetQTextMessage.TYPE,
+                                                                false,
+                                                                0,
+                                                                System.currentTimeMillis(),
+                                                                (byte)1);
+         outgoing.putIntProperty(new SimpleString("count"), sequence);
+         producer.send(outgoing);
+      }
+
+      producerSession.rollback();
+
+      // Second batch is committed
+      for (int sequence = 0; sequence < messageCount; sequence++)
+      {
+         ClientMessage outgoing = producerSession.createMessage(HornetQTextMessage.TYPE,
+                                                                false,
+                                                                0,
+                                                                System.currentTimeMillis(),
+                                                                (byte)1);
+         outgoing.putIntProperty(new SimpleString("count"), sequence);
+         producer.send(outgoing);
+      }
+
+      producerSession.commit();
+
+      // Delivery is only enabled once everything has been committed
+      for (ClientSession consumerSession : consumerSessions)
+      {
+         consumerSession.start();
+      }
+
+      // Verifies ordering and releases its latch once messageCount messages were seen
+      class CountingHandler extends AssertionCheckMessageHandler
+      {
+         final CountDownLatch latch = new CountDownLatch(1);
+
+         volatile int count;
+
+         public void onMessageAssert(final ClientMessage received)
+         {
+            if (count == messageCount)
+            {
+               Assert.fail("Too many messages, " + count);
+            }
+
+            Assert.assertEquals(count, received.getObjectProperty(new SimpleString("count")));
+
+            count++;
+
+            if (count == messageCount)
+            {
+               latch.countDown();
+            }
+         }
+      }
+
+      Set<CountingHandler> handlers = new HashSet<CountingHandler>();
+
+      for (ClientConsumer consumer : consumers)
+      {
+         CountingHandler handler = new CountingHandler();
+
+         consumer.setMessageHandler(handler);
+
+         handlers.add(handler);
+      }
+
+      for (CountingHandler handler : handlers)
+      {
+         boolean allSeen = handler.latch.await(20000, TimeUnit.MILLISECONDS);
+
+         Assert.assertTrue(allSeen);
+
+         handler.checkAssertions();
+      }
+
+      handlers.clear();
+
+      // Fresh handlers (counters back at zero) for the redelivery that follows the rollback
+      for (ClientConsumer consumer : consumers)
+      {
+         CountingHandler handler = new CountingHandler();
+
+         consumer.setMessageHandler(handler);
+
+         handlers.add(handler);
+      }
+
+      for (ClientSession consumerSession : consumerSessions)
+      {
+         consumerSession.rollback();
+      }
+
+      for (CountingHandler handler : handlers)
+      {
+         boolean allSeen = handler.latch.await(10000, TimeUnit.MILLISECONDS);
+
+         Assert.assertTrue(allSeen);
+
+         handler.checkAssertions();
+      }
+
+      for (ClientSession consumerSession : consumerSessions)
+      {
+         consumerSession.commit();
+      }
+
+      producerSession.close();
+      for (ClientSession consumerSession : consumerSessions)
+      {
+         consumerSession.close();
+      }
+
+      for (int index = 0; index < sessionCount; index++)
+      {
+         adminSession.deleteQueue(new SimpleString("sub" + index));
+      }
+
+      adminSession.close();
+
+      final long finishedAt = System.currentTimeMillis();
+
+      log.info("duration " + (finishedAt - startedAt));
+   }
+
+ // Now with synchronous receive()
+
+   /**
+    * Scenario E: synchronous {@code receive()} on consumer sessions started before sending.
+    * <p>
+    * Ten consumer sessions are created with {@code createSession(false, true, true)}; each is
+    * started and then creates and consumes from its own queue {@code sub<i>} on
+    * {@link #ADDRESS}. 100 messages carrying an int {@code count} property are sent. Every
+    * consumer must then receive counts 0..99 in order (each receive bounded by
+    * {@link #RECEIVE_TIMEOUT}) and acknowledges each message, after which
+    * {@code receiveImmediate()} must return {@code null} for every consumer. Finally every
+    * session is closed and the queues are deleted.
+    *
+    * @param sf factory used to create every session of the scenario
+    * @throws Exception if a client call fails or an assertion does not hold
+    */
+   protected void doTestE(final ClientSessionFactory sf) throws Exception
+   {
+      final long startedAt = System.currentTimeMillis();
+
+      final int messageCount = 100;
+
+      final int sessionCount = 10;
+
+      // Session used only for deleting the queues at the end
+      ClientSession adminSession = sf.createSession(false, false, false);
+
+      Set<ClientConsumer> consumers = new HashSet<ClientConsumer>();
+      Set<ClientSession> consumerSessions = new HashSet<ClientSession>();
+
+      for (int index = 0; index < sessionCount; index++)
+      {
+         SimpleString queueName = new SimpleString("sub" + index);
+
+         ClientSession consumerSession = sf.createSession(false, true, true);
+
+         consumerSession.start();
+
+         consumerSession.createQueue(RandomFailoverSoakTest.ADDRESS, queueName, null, false);
+
+         consumers.add(consumerSession.createConsumer(queueName));
+
+         consumerSessions.add(consumerSession);
+      }
+
+      ClientSession producerSession = sf.createSession(false, true, true);
+
+      ClientProducer producer = producerSession.createProducer(RandomFailoverSoakTest.ADDRESS);
+
+      for (int sequence = 0; sequence < messageCount; sequence++)
+      {
+         ClientMessage outgoing = producerSession.createMessage(HornetQTextMessage.TYPE,
+                                                                false,
+                                                                0,
+                                                                System.currentTimeMillis(),
+                                                                (byte)1);
+         outgoing.putIntProperty(new SimpleString("count"), sequence);
+         producer.send(outgoing);
+      }
+
+      // Every consumer must see message number 'expected' before any consumer moves on
+      for (int expected = 0; expected < messageCount; expected++)
+      {
+         for (ClientConsumer consumer : consumers)
+         {
+            ClientMessage received = consumer.receive(RandomFailoverSoakTest.RECEIVE_TIMEOUT);
+
+            Assert.assertNotNull(received);
+
+            Assert.assertEquals(expected, received.getObjectProperty(new SimpleString("count")));
+
+            received.acknowledge();
+         }
+      }
+
+      // Nothing may be left over on any queue
+      for (int attempt = 0; attempt < messageCount; attempt++)
+      {
+         for (ClientConsumer consumer : consumers)
+         {
+            ClientMessage leftover = consumer.receiveImmediate();
+
+            Assert.assertNull(leftover);
+         }
+      }
+
+      producerSession.close();
+      for (ClientSession consumerSession : consumerSessions)
+      {
+         consumerSession.close();
+      }
+
+      for (int index = 0; index < sessionCount; index++)
+      {
+         adminSession.deleteQueue(new SimpleString("sub" + index));
+      }
+
+      adminSession.close();
+
+      final long finishedAt = System.currentTimeMillis();
+
+      log.info("duration " + (finishedAt - startedAt));
+   }
+
+   /**
+    * Scenario F: synchronous {@code receive()} on consumer sessions started after sending.
+    * <p>
+    * Ten consumer sessions are created with {@code createSession(false, true, true)}, each with
+    * its own queue {@code sub<i>} on {@link #ADDRESS}. 100 messages carrying an int
+    * {@code count} property are sent and only then are the consumer sessions started. Every
+    * consumer must receive counts 0..99 in order (each receive bounded by
+    * {@link #RECEIVE_TIMEOUT}) and acknowledges each message, after which
+    * {@code receiveImmediate()} must return {@code null} for every consumer. Every session is
+    * then closed, the queues are deleted, and the factory is expected to report exactly one
+    * remaining session.
+    *
+    * @param sf factory used to create every session of the scenario; must be a
+    *           {@code ClientSessionFactoryImpl} because of the final session-count check
+    * @throws Exception if a client call fails or an assertion does not hold
+    */
+   protected void doTestF(final ClientSessionFactory sf) throws Exception
+   {
+      final long startedAt = System.currentTimeMillis();
+
+      final int messageCount = 100;
+
+      final int sessionCount = 10;
+
+      // Session used only for deleting the queues at the end
+      ClientSession adminSession = sf.createSession(false, false, false);
+
+      Set<ClientConsumer> consumers = new HashSet<ClientConsumer>();
+      Set<ClientSession> consumerSessions = new HashSet<ClientSession>();
+
+      for (int index = 0; index < sessionCount; index++)
+      {
+         SimpleString queueName = new SimpleString("sub" + index);
+
+         ClientSession consumerSession = sf.createSession(false, true, true);
+
+         consumerSession.createQueue(RandomFailoverSoakTest.ADDRESS, queueName, null, false);
+
+         consumers.add(consumerSession.createConsumer(queueName));
+
+         consumerSessions.add(consumerSession);
+      }
+
+      ClientSession producerSession = sf.createSession(false, true, true);
+
+      ClientProducer producer = producerSession.createProducer(RandomFailoverSoakTest.ADDRESS);
+
+      for (int sequence = 0; sequence < messageCount; sequence++)
+      {
+         ClientMessage outgoing = producerSession.createMessage(HornetQTextMessage.TYPE,
+                                                                false,
+                                                                0,
+                                                                System.currentTimeMillis(),
+                                                                (byte)1);
+         outgoing.putIntProperty(new SimpleString("count"), sequence);
+         producer.send(outgoing);
+      }
+
+      // Delivery is only enabled once everything has been sent
+      for (ClientSession consumerSession : consumerSessions)
+      {
+         consumerSession.start();
+      }
+
+      // Every consumer must see message number 'expected' before any consumer moves on
+      for (int expected = 0; expected < messageCount; expected++)
+      {
+         for (ClientConsumer consumer : consumers)
+         {
+            ClientMessage received = consumer.receive(RandomFailoverSoakTest.RECEIVE_TIMEOUT);
+
+            if (received == null)
+            {
+               throw new IllegalStateException("Failed to receive message " + expected);
+            }
+
+            // Cannot fail after the guard above; retained from the original
+            Assert.assertNotNull(received);
+
+            Assert.assertEquals(expected, received.getObjectProperty(new SimpleString("count")));
+
+            received.acknowledge();
+         }
+      }
+
+      // Nothing may be left over on any queue
+      for (int attempt = 0; attempt < messageCount; attempt++)
+      {
+         for (ClientConsumer consumer : consumers)
+         {
+            ClientMessage leftover = consumer.receiveImmediate();
+
+            Assert.assertNull(leftover);
+         }
+      }
+
+      producerSession.close();
+      for (ClientSession consumerSession : consumerSessions)
+      {
+         consumerSession.close();
+      }
+
+      for (int index = 0; index < sessionCount; index++)
+      {
+         adminSession.deleteQueue(new SimpleString("sub" + index));
+      }
+
+      adminSession.close();
+
+      // One session is expected to remain (presumably the one runTest hands to the failer - verify)
+      Assert.assertEquals(1, ((ClientSessionFactoryImpl)sf).numSessions());
+
+      final long finishedAt = System.currentTimeMillis();
+
+      log.info("duration " + (finishedAt - startedAt));
+   }
+
+   /**
+    * Scenario G: synchronous {@code receive()} with rollback on both sides.
+    * <p>
+    * Ten consumer sessions are created with {@code createSession(false, false, false)}; each is
+    * started and then creates and consumes from its own queue {@code sub<i>} on
+    * {@link #ADDRESS}. The producer session (also {@code createSession(false, false, false)})
+    * sends 100 messages and rolls back, then sends 100 again and commits. Every consumer must
+    * receive counts 0..99 in order and acknowledge them, after which nothing more may be
+    * available. The consumer sessions are rolled back, the same 100 messages are received and
+    * acknowledged again, nothing more may be available, and the sessions are committed.
+    * Finally every session is closed and the queues are deleted.
+    *
+    * @param sf factory used to create every session of the scenario
+    * @throws Exception if a client call fails or an assertion does not hold
+    */
+   protected void doTestG(final ClientSessionFactory sf) throws Exception
+   {
+      final long startedAt = System.currentTimeMillis();
+
+      final int messageCount = 100;
+
+      final int sessionCount = 10;
+
+      // Session used only for deleting the queues at the end
+      ClientSession adminSession = sf.createSession(false, false, false);
+
+      Set<ClientConsumer> consumers = new HashSet<ClientConsumer>();
+      Set<ClientSession> consumerSessions = new HashSet<ClientSession>();
+
+      for (int index = 0; index < sessionCount; index++)
+      {
+         SimpleString queueName = new SimpleString("sub" + index);
+
+         ClientSession consumerSession = sf.createSession(false, false, false);
+
+         consumerSession.start();
+
+         consumerSession.createQueue(RandomFailoverSoakTest.ADDRESS, queueName, null, false);
+
+         consumers.add(consumerSession.createConsumer(queueName));
+
+         consumerSessions.add(consumerSession);
+      }
+
+      ClientSession producerSession = sf.createSession(false, false, false);
+
+      ClientProducer producer = producerSession.createProducer(RandomFailoverSoakTest.ADDRESS);
+
+      // First batch is rolled back
+      for (int sequence = 0; sequence < messageCount; sequence++)
+      {
+         ClientMessage outgoing = producerSession.createMessage(HornetQTextMessage.TYPE,
+                                                                false,
+                                                                0,
+                                                                System.currentTimeMillis(),
+                                                                (byte)1);
+         outgoing.putIntProperty(new SimpleString("count"), sequence);
+         producer.send(outgoing);
+      }
+
+      producerSession.rollback();
+
+      // Second batch is committed
+      for (int sequence = 0; sequence < messageCount; sequence++)
+      {
+         ClientMessage outgoing = producerSession.createMessage(HornetQTextMessage.TYPE,
+                                                                false,
+                                                                0,
+                                                                System.currentTimeMillis(),
+                                                                (byte)1);
+         outgoing.putIntProperty(new SimpleString("count"), sequence);
+         producer.send(outgoing);
+      }
+
+      producerSession.commit();
+
+      // First pass: every consumer must see message number 'expected' before any consumer moves on
+      for (int expected = 0; expected < messageCount; expected++)
+      {
+         for (ClientConsumer consumer : consumers)
+         {
+            ClientMessage received = consumer.receive(RandomFailoverSoakTest.RECEIVE_TIMEOUT);
+
+            Assert.assertNotNull(received);
+
+            Assert.assertEquals(expected, received.getObjectProperty(new SimpleString("count")));
+
+            received.acknowledge();
+         }
+      }
+
+      for (ClientConsumer consumer : consumers)
+      {
+         ClientMessage leftover = consumer.receiveImmediate();
+
+         Assert.assertNull(leftover);
+      }
+
+      // Rolling back the consumer sessions is expected to make the same messages available again
+      for (ClientSession consumerSession : consumerSessions)
+      {
+         consumerSession.rollback();
+      }
+
+      // Second pass over the redelivered messages
+      for (int expected = 0; expected < messageCount; expected++)
+      {
+         for (ClientConsumer consumer : consumers)
+         {
+            ClientMessage received = consumer.receive(RandomFailoverSoakTest.RECEIVE_TIMEOUT);
+
+            Assert.assertNotNull(received);
+
+            Assert.assertEquals(expected, received.getObjectProperty(new SimpleString("count")));
+
+            received.acknowledge();
+         }
+      }
+
+      for (int attempt = 0; attempt < messageCount; attempt++)
+      {
+         for (ClientConsumer consumer : consumers)
+         {
+            ClientMessage leftover = consumer.receiveImmediate();
+
+            Assert.assertNull(leftover);
+         }
+      }
+
+      for (ClientSession consumerSession : consumerSessions)
+      {
+         consumerSession.commit();
+      }
+
+      producerSession.close();
+      for (ClientSession consumerSession : consumerSessions)
+      {
+         consumerSession.close();
+      }
+
+      for (int index = 0; index < sessionCount; index++)
+      {
+         adminSession.deleteQueue(new SimpleString("sub" + index));
+      }
+
+      adminSession.close();
+
+      final long finishedAt = System.currentTimeMillis();
+
+      log.info("duration " + (finishedAt - startedAt));
+   }
+
+ /**
+  * Transacted fan-out scenario in which the consumer sessions are started only after the
+  * messages have been committed.
+  * <p>
+  * Ten queues ("sub0".."sub9") are bound to the test address, each with its own transacted
+  * session and consumer. One batch of 100 messages is sent and rolled back, a second batch is
+  * sent and committed, and only then are the consumer sessions started. Every consumer must see
+  * the messages in order, twice: once before its session is rolled back and once more
+  * afterwards, before the final commit.
+  *
+  * @param sf factory used to create every session of the scenario
+  * @throws Exception on any messaging failure
+  */
+ protected void doTestH(final ClientSessionFactory sf) throws Exception
+ {
+    final long startTime = System.currentTimeMillis();
+
+    ClientSession adminSession = sf.createSession(false, false, false);
+
+    final int numMessages = 100;
+    final int numSessions = 10;
+    final SimpleString countKey = new SimpleString("count");
+
+    Set<ClientConsumer> consumers = new HashSet<ClientConsumer>();
+    Set<ClientSession> sessions = new HashSet<ClientSession>();
+
+    for (int sub = 0; sub < numSessions; sub++)
+    {
+       SimpleString queueName = new SimpleString("sub" + sub);
+
+       ClientSession consumerSession = sf.createSession(false, false, false);
+       consumerSession.createQueue(RandomFailoverSoakTest.ADDRESS, queueName, null, false);
+
+       consumers.add(consumerSession.createConsumer(queueName));
+       sessions.add(consumerSession);
+    }
+
+    ClientSession sendSession = sf.createSession(false, false, false);
+    ClientProducer producer = sendSession.createProducer(RandomFailoverSoakTest.ADDRESS);
+
+    // First batch: discarded by the rollback below.
+    for (int n = 0; n < numMessages; n++)
+    {
+       ClientMessage outbound = sendSession.createMessage(HornetQTextMessage.TYPE,
+                                                          false,
+                                                          0,
+                                                          System.currentTimeMillis(),
+                                                          (byte)1);
+       outbound.putIntProperty(countKey, n);
+       producer.send(outbound);
+    }
+    sendSession.rollback();
+
+    // Second batch: this is the one the consumers are expected to see.
+    for (int n = 0; n < numMessages; n++)
+    {
+       ClientMessage outbound = sendSession.createMessage(HornetQTextMessage.TYPE,
+                                                          false,
+                                                          0,
+                                                          System.currentTimeMillis(),
+                                                          (byte)1);
+       outbound.putIntProperty(countKey, n);
+       producer.send(outbound);
+    }
+    sendSession.commit();
+
+    // Delivery only begins now that the messages are already committed.
+    for (ClientSession each : sessions)
+    {
+       each.start();
+    }
+
+    // First delivery pass.
+    for (int n = 0; n < numMessages; n++)
+    {
+       for (ClientConsumer each : consumers)
+       {
+          ClientMessage received = each.receive(RandomFailoverSoakTest.RECEIVE_TIMEOUT);
+          Assert.assertNotNull(received);
+          Assert.assertEquals(n, received.getObjectProperty(countKey));
+          received.acknowledge();
+       }
+    }
+
+    for (int n = 0; n < numMessages; n++)
+    {
+       for (ClientConsumer each : consumers)
+       {
+          Assert.assertNull(each.receiveImmediate());
+       }
+    }
+
+    // Rolling back the consumer sessions must make everything available again.
+    for (ClientSession each : sessions)
+    {
+       each.rollback();
+    }
+
+    // Second delivery pass.
+    for (int n = 0; n < numMessages; n++)
+    {
+       for (ClientConsumer each : consumers)
+       {
+          ClientMessage received = each.receive(RandomFailoverSoakTest.RECEIVE_TIMEOUT);
+          Assert.assertNotNull(received);
+          Assert.assertEquals(n, received.getObjectProperty(countKey));
+          received.acknowledge();
+       }
+    }
+
+    for (int n = 0; n < numMessages; n++)
+    {
+       for (ClientConsumer each : consumers)
+       {
+          Assert.assertNull(each.receiveImmediate());
+       }
+    }
+
+    for (ClientSession each : sessions)
+    {
+       each.commit();
+    }
+
+    sendSession.close();
+    for (ClientSession each : sessions)
+    {
+       each.close();
+    }
+
+    for (int sub = 0; sub < numSessions; sub++)
+    {
+       adminSession.deleteQueue(new SimpleString("sub" + sub));
+    }
+
+    adminSession.close();
+
+    final long endTime = System.currentTimeMillis();
+
+    RandomFailoverSoakTest.log.info("duration " + (endTime - startTime));
+ }
+
+ /**
+  * Single send/receive round trip: creates a queue named after the test address, sends one
+  * message, receives and acknowledges it on the same session, then deletes the queue.
+  *
+  * @param sf factory used to create the two sessions
+  * @throws Exception on any messaging failure
+  */
+ protected void doTestI(final ClientSessionFactory sf) throws Exception
+ {
+    ClientSession adminSession = sf.createSession(false, true, true);
+    adminSession.createQueue(RandomFailoverSoakTest.ADDRESS, RandomFailoverSoakTest.ADDRESS, null, false);
+
+    ClientSession workSession = sf.createSession(false, true, true);
+    workSession.start();
+
+    ClientConsumer consumer = workSession.createConsumer(RandomFailoverSoakTest.ADDRESS);
+    ClientProducer producer = workSession.createProducer(RandomFailoverSoakTest.ADDRESS);
+
+    ClientMessage outbound = workSession.createMessage(HornetQTextMessage.TYPE,
+                                                       false,
+                                                       0,
+                                                       System.currentTimeMillis(),
+                                                       (byte)1);
+    producer.send(outbound);
+
+    ClientMessage inbound = consumer.receive(RandomFailoverSoakTest.RECEIVE_TIMEOUT);
+    Assert.assertNotNull(inbound);
+    inbound.acknowledge();
+
+    workSession.close();
+
+    adminSession.deleteQueue(RandomFailoverSoakTest.ADDRESS);
+    adminSession.close();
+ }
+
+ /**
+  * Same single-message round trip as the other small scenarios: one session creates the queue
+  * on the test address, a second session sends one message to it and consumes it back, and the
+  * queue is removed at the end.
+  *
+  * @param sf factory used to create the two sessions
+  * @throws Exception on any messaging failure
+  */
+ protected void doTestJ(final ClientSessionFactory sf) throws Exception
+ {
+    ClientSession queueOwner = sf.createSession(false, true, true);
+    queueOwner.createQueue(RandomFailoverSoakTest.ADDRESS, RandomFailoverSoakTest.ADDRESS, null, false);
+
+    ClientSession session = sf.createSession(false, true, true);
+    session.start();
+
+    ClientConsumer receiver = session.createConsumer(RandomFailoverSoakTest.ADDRESS);
+    ClientProducer sender = session.createProducer(RandomFailoverSoakTest.ADDRESS);
+
+    ClientMessage sent = session.createMessage(HornetQTextMessage.TYPE,
+                                               false,
+                                               0,
+                                               System.currentTimeMillis(),
+                                               (byte)1);
+    sender.send(sent);
+
+    ClientMessage got = receiver.receive(RandomFailoverSoakTest.RECEIVE_TIMEOUT);
+    Assert.assertNotNull(got);
+    got.acknowledge();
+
+    session.close();
+
+    queueOwner.deleteQueue(RandomFailoverSoakTest.ADDRESS);
+    queueOwner.close();
+ }
+
+ /**
+  * Creates and immediately closes 100 consumers, one after another, on a single queue.
+  *
+  * @param sf factory used to create the session
+  * @throws Exception on any messaging failure
+  */
+ protected void doTestK(final ClientSessionFactory sf) throws Exception
+ {
+    final int numConsumers = 100;
+
+    ClientSession session = sf.createSession(false, false, false);
+    session.createQueue(RandomFailoverSoakTest.ADDRESS, RandomFailoverSoakTest.ADDRESS, null, false);
+
+    int created = 0;
+    while (created < numConsumers)
+    {
+       session.createConsumer(RandomFailoverSoakTest.ADDRESS).close();
+       created++;
+    }
+
+    session.deleteQueue(RandomFailoverSoakTest.ADDRESS);
+    session.close();
+ }
+
+ /**
+  * Opens and immediately closes ten sessions, one after another.
+  *
+  * @param sf factory used to create the sessions
+  * @throws Exception if a session cannot be created or closed
+  */
+ protected void doTestL(final ClientSessionFactory sf) throws Exception
+ {
+    final int numSessions = 10;
+
+    int opened = 0;
+    while (opened < numSessions)
+    {
+       sf.createSession(false, false, false).close();
+       opened++;
+    }
+ }
+
+ protected void doTestN(final ClientSessionFactory sf) throws Exception
+ {
+ ClientSession sessCreate = sf.createSession(false, true, true);
+
+ sessCreate.createQueue(RandomFailoverSoakTest.ADDRESS,
+ new
SimpleString(RandomFailoverSoakTest.ADDRESS.toString()),
+ null,
+ false);
+
+ ClientSession sess = sf.createSession(false, true, true);
+
+ sess.stop();
+
+ sess.start();
+
+ sess.stop();
+
+ ClientConsumer consumer = sess.createConsumer(new
SimpleString(RandomFailoverSoakTest.ADDRESS.toString()));
+
+ ClientProducer producer = sess.createProducer(RandomFailoverSoakTest.ADDRESS);
+
+ ClientMessage message = sess.createMessage(HornetQTextMessage.TYPE,
+ false,
+ 0,
+ System.currentTimeMillis(),
+ (byte)1);
+ producer.send(message);
+
+ sess.start();
+
+ ClientMessage message2 = consumer.receive(RandomFailoverSoakTest.RECEIVE_TIMEOUT);
+
+ Assert.assertNotNull(message2);
+
+ message2.acknowledge();
+
+ sess.stop();
+
+ sess.start();
+
+ sess.close();
+
+ sessCreate.deleteQueue(new
SimpleString(RandomFailoverSoakTest.ADDRESS.toString()));
+
+ sessCreate.close();
+ }
+
@Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ timer = new Timer(true);
+ }
+
+ /**
+  * Cancels the failure timer, clears the in-VM registry and then runs the parent tear-down.
+  */
+ @Override
+ protected void tearDown() throws Exception
+ {
+ // Stop any pending Failer tasks before the infrastructure is torn down.
+ timer.cancel();
+
+ InVMRegistry.instance.clear();
+
+ super.tearDown();
+ }
+
+ // Private -------------------------------------------------------
+
+ private Failer startFailer(final long time, final ClientSession session)
+ {
+ Failer failer = new Failer((ClientSessionInternal)session);
+
+ timer.schedule(failer, (long)(time * Math.random()), 100);
+
+ return failer;
+ }
+
+ private void start() throws Exception
+ {
+ Configuration liveConf = createDefaultConfig();
+ liveConf.setSecurityEnabled(false);
+ liveConf.getAcceptorConfigurations()
+ .add(new
TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory"));
+ liveService = HornetQServers.newHornetQServer(liveConf, false);
+ liveService.start();
+ }
+
+ private void stop() throws Exception
+ {
+ liveService.stop();
+
+ Assert.assertEquals(0, InVMRegistry.instance.size());
+
+ liveService = null;
+ }
+
+ // Inner classes -------------------------------------------------
+
+ /**
+  * Timer task that fails the connection of one session with a NOT_CONNECTED error and then
+  * cancels itself, so it runs only once even though it is scheduled with a period.
+  */
+ class Failer extends TimerTask
+ {
+    private final ClientSessionInternal session;
+
+    private boolean executed;
+
+    public Failer(final ClientSessionInternal session)
+    {
+       this.session = session;
+    }
+
+    @Override
+    public synchronized void run()
+    {
+       RandomFailoverSoakTest.log.info("** Failing connection");
+
+       final HornetQException cause = new HornetQException(HornetQException.NOT_CONNECTED, "oops");
+       session.getConnection().fail(cause);
+
+       RandomFailoverSoakTest.log.info("** Fail complete");
+
+       // One failure per task.
+       cancel();
+
+       executed = true;
+    }
+
+    /** @return true once {@link #run()} has completed */
+    public synchronized boolean isExecuted()
+    {
+       return executed;
+    }
+ }
+
+ /**
+  * A unit of test work that is handed the session factory to run against and, unlike
+  * {@link Runnable}, is allowed to throw checked exceptions.
+  */
+ public abstract class RunnableT
+ {
+ abstract void run(final ClientSessionFactory sf) throws Exception;
+ }
+
+ /**
+  * Message handler that lets subclasses use JUnit assertions inside
+  * {@link #onMessageAssert(ClientMessage)}. A failed assertion is printed and remembered instead
+  * of propagating out of {@link #onMessage(ClientMessage)}; the test then calls
+  * {@link #checkAssertions()} to rethrow the first recorded failure.
+  */
+ static abstract class AssertionCheckMessageHandler implements MessageHandler
+ {
+    private ArrayList<AssertionFailedError> errors = new ArrayList<AssertionFailedError>();
+
+    /* (non-Javadoc)
+     * @see org.hornetq.api.core.client.MessageHandler#onMessage(org.hornetq.api.core.client.ClientMessage)
+     */
+    public void onMessage(ClientMessage message)
+    {
+       try
+       {
+          onMessageAssert(message);
+       }
+       catch (AssertionFailedError failure)
+       {
+          // Print it so the failure appears in the test output, then keep it for checkAssertions().
+          failure.printStackTrace();
+          errors.add(failure);
+       }
+    }
+
+    /** Rethrows the first recorded assertion failure, if there is one. */
+    public void checkAssertions()
+    {
+       if (!errors.isEmpty())
+       {
+          throw errors.get(0);
+       }
+    }
+
+    /** Per-message checks; may throw {@link AssertionFailedError}. */
+    public abstract void onMessageAssert(ClientMessage message);
+ }
+
protected int getNumIterations()
{
return 500;
Modified:
trunk/tests/soak-tests/src/test/java/org/hornetq/tests/soak/journal/JournalCleanupCompactSoakTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/soak/journal/JournalCleanupCompactSoakTest.java 2011-04-11
12:47:21 UTC (rev 10475)
+++
trunk/tests/soak-tests/src/test/java/org/hornetq/tests/soak/journal/JournalCleanupCompactSoakTest.java 2011-04-12
09:47:15 UTC (rev 10479)
@@ -13,9 +13,27 @@
package org.hornetq.tests.soak.journal;
-import java.util.concurrent.TimeUnit;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
-import org.hornetq.tests.stress.journal.JournalCleanupCompactStressTest;
+import org.hornetq.core.asyncio.impl.AsynchronousFileImpl;
+import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.journal.*;
+import org.hornetq.core.journal.impl.AIOSequentialFileFactory;
+import org.hornetq.core.journal.impl.JournalImpl;
+import org.hornetq.core.journal.impl.NIOSequentialFileFactory;
+import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
+import org.hornetq.tests.util.RandomUtil;
+import org.hornetq.tests.util.ServiceTestBase;
+import org.hornetq.utils.HornetQThreadFactory;
+import org.hornetq.utils.OrderedExecutorFactory;
+import org.hornetq.utils.SimpleIDGenerator;
/**
* A JournalCleanupCompactSoakTest
@@ -24,9 +42,538 @@
*
*
*/
-public class JournalCleanupCompactSoakTest extends JournalCleanupCompactStressTest
+public class JournalCleanupCompactSoakTest extends ServiceTestBase
{
+
+ public static SimpleIDGenerator idGen = new SimpleIDGenerator(1);
+
+ private static final int MAX_WRITES = 20000;
+
+ private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
+
+ // We want to limit the difference between appends and deletes, or we could run out of memory
+ public Semaphore maxRecords;
+
+ private volatile boolean running;
+
+ private AtomicInteger errors = new AtomicInteger(0);
+
+ private AtomicInteger numberOfRecords = new AtomicInteger(0);
+
+ private AtomicInteger numberOfUpdates = new AtomicInteger(0);
+
+ private AtomicInteger numberOfDeletes = new AtomicInteger(0);
+
+ private JournalImpl journal;
+
+ ThreadFactory tFactory = new HornetQThreadFactory("SoakTest" +
System.identityHashCode(this),
+ false,
+
JournalCleanupCompactSoakTest.class.getClassLoader());
+
+ private ExecutorService threadPool;
+
+ private OrderedExecutorFactory executorFactory = new
OrderedExecutorFactory(threadPool);
+
+ Executor testExecutor;
+
+
+ /**
+  * Builds the thread pool, the ordered test executor, the record-limiting semaphore and a
+  * journal in the temporary directory (AIO when the native library is loaded, NIO otherwise),
+  * then starts the journal and loads it.
+  * <p>
+  * The journal is subclassed so that every compaction start queues, on the test executor, an
+  * add / forced file switch / delete sequence.
+  */
+ @Override
+ public void setUp() throws Exception
+ {
+    super.setUp();
+
+    threadPool = Executors.newFixedThreadPool(20, tFactory);
+    executorFactory = new OrderedExecutorFactory(threadPool);
+    testExecutor = executorFactory.getExecutor();
+
+    maxRecords = new Semaphore(MAX_WRITES);
+
+    errors.set(0);
+
+    final File journalDir = new File(getTemporaryDir());
+    journalDir.mkdirs();
+
+    final SequentialFileFactory fileFactory;
+    final int maxIO;
+    if (AsynchronousFileImpl.isLoaded())
+    {
+       fileFactory = new AIOSequentialFileFactory(journalDir.getPath());
+       maxIO = ConfigurationImpl.DEFAULT_JOURNAL_MAX_IO_AIO;
+    }
+    else
+    {
+       fileFactory = new NIOSequentialFileFactory(journalDir.getPath(), true);
+       maxIO = ConfigurationImpl.DEFAULT_JOURNAL_MAX_IO_NIO;
+    }
+
+    journal = new JournalImpl(50 * 1024,
+                              20,
+                              50,
+                              ConfigurationImpl.DEFAULT_JOURNAL_COMPACT_PERCENTAGE,
+                              fileFactory,
+                              "hornetq-data",
+                              "hq",
+                              maxIO)
+    {
+       protected void onCompactLock() throws Exception
+       {
+       }
+
+       protected void onCompactStart() throws Exception
+       {
+          // Write to the journal while it is compacting; done asynchronously on the test executor.
+          testExecutor.execute(new Runnable()
+          {
+             public void run()
+             {
+                try
+                {
+                   if (!running)
+                   {
+                      return;
+                   }
+
+                   final long recordId = idGen.generateID();
+                   journal.appendAddRecord(recordId, (byte)0, new byte[] { 1, 2, 3 }, false);
+                   journal.forceMoveNextFile();
+                   journal.appendDeleteRecord(recordId, recordId == 20);
+                }
+                catch (Exception e)
+                {
+                   e.printStackTrace();
+                   errors.incrementAndGet();
+                }
+             }
+          });
+       }
+    };
+
+    journal.start();
+    journal.loadInternalOnly();
+ }
+
+ /**
+  * Stops the journal if it is still running (best effort) and shuts the thread pool down.
+  */
+ @Override
+ public void tearDown() throws Exception
+ {
+    try
+    {
+       final boolean stillStarted = journal.isStarted();
+       if (stillStarted)
+       {
+          journal.stop();
+       }
+    }
+    catch (Exception ignored)
+    {
+       // A failure to stop the journal is deliberately ignored during clean-up.
+    }
+
+    threadPool.shutdown();
+ }
+
+ /**
+  * Soak scenario: one slow non-transactional appender plus five pairs of transactional
+  * appender/updater threads write to the journal while this thread, every ten seconds, takes
+  * the write lock and restarts and reloads the journal. After the configured run time the
+  * workers are stopped, every remaining record is deleted and the journal is expected to end
+  * up with no data files.
+  *
+  * @throws Exception if the journal fails or a consistency assertion does not hold
+  */
+ public void testAppend() throws Exception
+ {
+    running = true;
+    SlowAppenderNoTX t1 = new SlowAppenderNoTX();
+
+    final int numThreads = 5;
+
+    FastAppenderTx[] appenders = new FastAppenderTx[numThreads];
+    FastUpdateTx[] updaters = new FastUpdateTx[numThreads];
+
+    for (int i = 0; i < numThreads; i++)
+    {
+       appenders[i] = new FastAppenderTx();
+       updaters[i] = new FastUpdateTx(appenders[i].queue);
+    }
+
+    try
+    {
+       t1.start();
+
+       Thread.sleep(1000);
+
+       for (int i = 0; i < numThreads; i++)
+       {
+          appenders[i].start();
+          updaters[i].start();
+       }
+
+       long timeToEnd = System.currentTimeMillis() + getTotalTimeMilliseconds();
+
+       while (System.currentTimeMillis() < timeToEnd)
+       {
+          System.out.println("Append = " + numberOfRecords +
+                             ", Update = " +
+                             numberOfUpdates +
+                             ", Delete = " +
+                             numberOfDeletes +
+                             ", liveRecords = " +
+                             (numberOfRecords.get() - numberOfDeletes.get()));
+          Thread.sleep(TimeUnit.SECONDS.toMillis(10));
+
+          // The write lock keeps every worker (which holds the read lock while writing) out of
+          // the journal during the restart. It must be released even when the reload assertions
+          // fail, otherwise the workers block on the read lock forever.
+          rwLock.writeLock().lock();
+          try
+          {
+             System.out.println("Restarting server");
+             journal.stop();
+             journal.start();
+             reloadJournal();
+          }
+          finally
+          {
+             rwLock.writeLock().unlock();
+          }
+       }
+    }
+    finally
+    {
+       // Also reached on failure, so the non-daemon worker threads are always told to stop.
+       running = false;
+
+       // Release Semaphore after setting running to false or the threads may never finish
+       int missingPermits = MAX_WRITES - maxRecords.availablePermits();
+       if (missingPermits > 0)
+       {
+          maxRecords.release(missingPermits);
+       }
+    }
+
+    for (Thread t : appenders)
+    {
+       t.join();
+    }
+
+    for (Thread t : updaters)
+    {
+       t.join();
+    }
+
+    t1.join();
+
+    // Wait until everything already queued on the ordered test executor has run.
+    final CountDownLatch latchExecutorDone = new CountDownLatch(1);
+    testExecutor.execute(new Runnable()
+    {
+       public void run()
+       {
+          latchExecutorDone.countDown();
+       }
+    });
+
+    latchExecutorDone.await();
+
+    journal.stop();
+
+    journal.start();
+
+    reloadJournal();
+
+    Collection<Long> records = journal.getRecords().keySet();
+
+    System.out.println("Deleting everything!");
+    for (Long delInfo : records)
+    {
+       journal.appendDeleteRecord(delInfo, false);
+    }
+
+    journal.forceMoveNextFile();
+
+    journal.checkReclaimStatus();
+
+    // Give the reclaim triggered above time to complete before checking the file count.
+    Thread.sleep(5000);
+
+    assertEquals(0, journal.getDataFilesCount());
+
+    journal.stop();
+ }
+
+ /**
+  * Loads the journal and verifies it against the test's own counters: no worker may have
+  * reported an error, and the number of committed non-update records must equal the number of
+  * records appended minus the number deleted.
+  *
+  * @throws Exception if loading the journal fails
+  */
+ private void reloadJournal() throws Exception
+ {
+    assertEquals(0, errors.get());
+
+    ArrayList<RecordInfo> committed = new ArrayList<RecordInfo>();
+    ArrayList<PreparedTransactionInfo> prepared = new ArrayList<PreparedTransactionInfo>();
+
+    // Failed transactions are of no interest here, so the callback does nothing.
+    TransactionFailureCallback ignoreFailures = new TransactionFailureCallback()
+    {
+       public void failedTransaction(long transactionID, List<RecordInfo> records, List<RecordInfo> recordsToDelete)
+       {
+       }
+    };
+
+    journal.load(committed, prepared, ignoreFailures);
+
+    long appends = 0;
+    for (RecordInfo info : committed)
+    {
+       if (!info.isUpdate)
+       {
+          appends++;
+       }
+    }
+
+    assertEquals(numberOfRecords.get() - numberOfDeletes.get(), appends);
+ }
+
+ private byte[] generateRecord()
+ {
+ int size = RandomUtil.randomPositiveInt() % 10000;
+ if (size == 0)
+ {
+ size = 10000;
+ }
+ return RandomUtil.randomBytes(size);
+ }
+
+ /**
+  * Worker thread that keeps committing transactions of random size to the journal while
+  * {@code running} is true. Once a commit completes, the ids of its records are put on
+  * {@link #queue}.
+  * <p>
+  * The thread holds the read lock while it writes and gives it up briefly after every
+  * transaction.
+  */
+ class FastAppenderTx extends Thread
+ {
+ // Ids of committed records, filled from the commit-completion callback below.
+ LinkedBlockingDeque<Long> queue = new LinkedBlockingDeque<Long>();
+
+ OperationContextImpl ctx = new
OperationContextImpl(executorFactory.getExecutor());
+
+ public FastAppenderTx()
+ {
+ super("FastAppenderTX");
+ }
+
+ @Override
+ public void run()
+ {
+ rwLock.readLock().lock();
+
+ try
+ {
+ while (running)
+ {
+ final int txSize = RandomUtil.randomMax(100);
+
+ long txID = JournalCleanupCompactSoakTest.idGen.generateID();
+
+ long rollbackTXID = JournalCleanupCompactSoakTest.idGen.generateID();
+
+ final long ids[] = new long[txSize];
+
+ for (int i = 0; i < txSize; i++)
+ {
+ ids[i] = JournalCleanupCompactSoakTest.idGen.generateID();
+ }
+
+ // Add one record under a separate transaction and roll it back straight away.
+ // NOTE(review): ids[0] assumes randomMax(100) never returns 0 - TODO confirm.
+ journal.appendAddRecordTransactional(rollbackTXID, ids[0], (byte)0,
generateRecord());
+ journal.appendRollbackRecord(rollbackTXID, true);
+
+ // The real transaction: one permit of maxRecords is taken per record added.
+ for (int i = 0; i < txSize; i++)
+ {
+ long id = ids[i];
+ journal.appendAddRecordTransactional(txID, id, (byte)0,
generateRecord());
+ maxRecords.acquire();
+ }
+ journal.appendCommitRecord(txID, true, ctx);
+
+ // Count the records and publish their ids only after the context reports completion.
+ ctx.executeOnCompletion(new IOAsyncTask()
+ {
+
+ public void onError(final int errorCode, final String errorMessage)
+ {
+ }
+
+ public void done()
+ {
+ numberOfRecords.addAndGet(txSize);
+ for (Long id : ids)
+ {
+ queue.add(id);
+ }
+ }
+ });
+
+ // Give up the read lock for a moment so a thread waiting for the write lock can get in.
+ rwLock.readLock().unlock();
+
+ Thread.yield();
+
+ rwLock.readLock().lock();
+
+
+ }
+ }
+ catch (Exception e)
+ {
+ // Any failure stops the whole test run and is counted as an error.
+ e.printStackTrace();
+ running = false;
+ errors.incrementAndGet();
+ }
+ finally
+ {
+ rwLock.readLock().unlock();
+ }
+ }
+ }
+
+ /**
+  * Worker thread that takes record ids from a queue, writes a transactional update for each
+  * one and commits the updates in batches of random size. When a batch commit completes, a
+  * {@link DeleteTask} is run for the ids of that batch.
+  * <p>
+  * The thread holds the read lock while it writes and gives it up briefly after every batch.
+  */
+ class FastUpdateTx extends Thread
+ {
+ // Source of record ids; supplied by the creator of this thread.
+ final LinkedBlockingDeque<Long> queue;
+
+ OperationContextImpl ctx = new
OperationContextImpl(executorFactory.getExecutor());
+
+ public FastUpdateTx(final LinkedBlockingDeque<Long> queue)
+ {
+ super("FastUpdateTX");
+ this.queue = queue;
+ }
+
+ @Override
+ public void run()
+ {
+
+ rwLock.readLock().lock();
+
+ try
+ {
+ int txSize = RandomUtil.randomMax(100);
+ int txCount = 0;
+ long ids[] = new long[txSize];
+
+ long txID = JournalCleanupCompactSoakTest.idGen.generateID();
+
+ while (running)
+ {
+
+ // Wait up to ten seconds for an id; null means nothing arrived in that time.
+ Long id = queue.poll(10, TimeUnit.SECONDS);
+ if (id != null)
+ {
+ ids[txCount++] = id;
+ journal.appendUpdateRecordTransactional(txID, id, (byte)0,
generateRecord());
+ }
+ // Close the batch when it is full or when the queue ran dry.
+ if (txCount == txSize || id == null)
+ {
+ if (txCount > 0)
+ {
+ journal.appendCommitRecord(txID, true, ctx);
+ // Unused slots of ids stay 0; DeleteTask skips ids equal to 0.
+ ctx.executeOnCompletion(new DeleteTask(ids));
+ }
+
+ // Give up the read lock for a moment so a thread waiting for the write lock can get in.
+ rwLock.readLock().unlock();
+
+ Thread.yield();
+
+ rwLock.readLock().lock();
+
+ // Start a fresh batch with a new size, transaction id and id array.
+ txCount = 0;
+ txSize = RandomUtil.randomMax(100);
+ txID = JournalCleanupCompactSoakTest.idGen.generateID();
+ ids = new long[txSize];
+ }
+ }
+
+ // Commit whatever is left of the last batch; no DeleteTask is scheduled for it.
+ if (txCount > 0)
+ {
+ journal.appendCommitRecord(txID, true);
+ }
+ }
+ catch (Exception e)
+ {
+ // Any failure stops the whole test run and is counted as an error.
+ e.printStackTrace();
+ running = false;
+ errors.incrementAndGet();
+ }
+ finally
+ {
+ rwLock.readLock().unlock();
+ }
+ }
+ }
+
+ /**
+  * Completion callback that deletes a batch of records. For every non-zero id it appends a
+  * delete record, returns one permit to {@code maxRecords} and counts the delete; the whole
+  * array length is added to the update counter first.
+  */
+ class DeleteTask implements IOAsyncTask
+ {
+    final long ids[];
+
+    DeleteTask(final long ids[])
+    {
+       this.ids = ids;
+    }
+
+    public void done()
+    {
+       rwLock.readLock().lock();
+       numberOfUpdates.addAndGet(ids.length);
+       try
+       {
+          for (long recordId : ids)
+          {
+             // Zero marks an unused slot of the array.
+             if (recordId == 0)
+             {
+                continue;
+             }
+
+             journal.appendDeleteRecord(recordId, false);
+             maxRecords.release();
+             numberOfDeletes.incrementAndGet();
+          }
+       }
+       catch (Exception failure)
+       {
+          System.err.println("Can't delete id");
+          failure.printStackTrace();
+          running = false;
+          errors.incrementAndGet();
+       }
+       finally
+       {
+          rwLock.readLock().unlock();
+       }
+    }
+
+    public void onError(final int errorCode, final String errorMessage)
+    {
+    }
+ }
+
+ /** Adds stuff to the journal, but it will take a long time to remove them.
+ * This will cause cleanup and compacting to happen more often
+ */
+ class SlowAppenderNoTX extends Thread
+ {
+
+ public SlowAppenderNoTX()
+ {
+ super("SlowAppender");
+ }
+
+ @Override
+ public void run()
+ {
+ rwLock.readLock().lock();
+ try
+ {
+ while (running)
+ {
+ long ids[] = new long[5];
+ // Append
+ for (int i = 0; running & i < ids.length; i++)
+ {
+ System.out.println("append slow");
+ ids[i] = JournalCleanupCompactSoakTest.idGen.generateID();
+ maxRecords.acquire();
+ journal.appendAddRecord(ids[i], (byte)1, generateRecord(), true);
+ numberOfRecords.incrementAndGet();
+
+ rwLock.readLock().unlock();
+
+ Thread.sleep(TimeUnit.SECONDS.toMillis(50));
+
+ rwLock.readLock().lock();
+ }
+ // Delete
+ for (int i = 0; running & i < ids.length; i++)
+ {
+ System.out.println("Deleting");
+ maxRecords.release();
+ journal.appendDeleteRecord(ids[i], false);
+ numberOfDeletes.incrementAndGet();
+ }
+ }
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ System.exit(-1);
+ }
+ finally
+ {
+ rwLock.readLock().unlock();
+ }
+ }
+ }
// Constants -----------------------------------------------------
// Attributes ----------------------------------------------------