JBoss hornetq SVN: r10481 - in trunk/tests: src/org/hornetq/tests and 8 other directories.
by do-not-reply@jboss.org
Author: ataylor
Date: 2011-04-12 06:17:50 -0400 (Tue, 12 Apr 2011)
New Revision: 10481
Added:
trunk/tests/timing-tests/
trunk/tests/timing-tests/pom.xml
trunk/tests/timing-tests/src/
trunk/tests/timing-tests/src/test/
trunk/tests/timing-tests/src/test/java/
trunk/tests/timing-tests/src/test/java/org/
trunk/tests/timing-tests/src/test/java/org/hornetq/
trunk/tests/timing-tests/src/test/java/org/hornetq/tests/
trunk/tests/timing-tests/src/test/java/org/hornetq/tests/timing/
Removed:
trunk/tests/src/org/hornetq/tests/timing/
Modified:
trunk/tests/hornetq-tests.iml
trunk/tests/pom.xml
trunk/tests/timing-tests/src/test/java/org/hornetq/tests/timing/util/UUIDTest.java
Log:
mavenised timing tests
Modified: trunk/tests/hornetq-tests.iml
===================================================================
--- trunk/tests/hornetq-tests.iml 2011-04-12 10:12:15 UTC (rev 10480)
+++ trunk/tests/hornetq-tests.iml 2011-04-12 10:17:50 UTC (rev 10481)
@@ -10,6 +10,7 @@
<sourceFolder url="file://$MODULE_DIR$/integration-tests/src/main/java" isTestSource="true" />
<sourceFolder url="file://$MODULE_DIR$/soak-tests/src/test/java" isTestSource="true" />
<sourceFolder url="file://$MODULE_DIR$/stress-tests/src/test/java" isTestSource="true" />
+ <sourceFolder url="file://$MODULE_DIR$/timing-tests/src/test/java" isTestSource="true" />
<excludeFolder url="file://$MODULE_DIR$/jms-tests" />
<excludeFolder url="file://$MODULE_DIR$/joram-tests" />
<excludeFolder url="file://$MODULE_DIR$/logs" />
Modified: trunk/tests/pom.xml
===================================================================
--- trunk/tests/pom.xml 2011-04-12 10:12:15 UTC (rev 10480)
+++ trunk/tests/pom.xml 2011-04-12 10:17:50 UTC (rev 10481)
@@ -30,5 +30,6 @@
<module>joram-tests</module>
<module>soak-tests</module>
<module>stress-tests</module>
+ <module>timing-tests</module>
</modules>
</project>
Copied: trunk/tests/timing-tests/pom.xml (from rev 10480, trunk/tests/stress-tests/pom.xml)
===================================================================
--- trunk/tests/timing-tests/pom.xml (rev 0)
+++ trunk/tests/timing-tests/pom.xml 2011-04-12 10:17:50 UTC (rev 10481)
@@ -0,0 +1,140 @@
+<!--
+ ~ Copyright 2009 Red Hat, Inc.
+ ~ Red Hat licenses this file to you under the Apache License, version
+ ~ 2.0 (the "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ ~ implied. See the License for the specific language governing
+ ~ permissions and limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.hornetq.tests</groupId>
+ <artifactId>hornetq-tests-pom</artifactId>
+ <version>2.2.3-SNAPSHOT</version>
+ </parent>
+
+ <groupId>org.hornetq.tests</groupId>
+ <artifactId>timing-tests</artifactId>
+ <packaging>jar</packaging>
+ <name>HornetQ timing Tests</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.hornetq.tests</groupId>
+ <artifactId>unit-tests</artifactId>
+ <version>${project.version}</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.hornetq.tests</groupId>
+ <artifactId>integration-tests</artifactId>
+ <version>${project.version}</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.hornetq.tests</groupId>
+ <artifactId>jms-tests</artifactId>
+ <version>${project.version}</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.hornetq</groupId>
+ <artifactId>hornetq-jms</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.hornetq</groupId>
+ <artifactId>hornetq-ra</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.hornetq</groupId>
+ <artifactId>hornetq-bootstrap</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.jboss.javaee</groupId>
+ <artifactId>jboss-jca-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.jboss.security</groupId>
+ <artifactId>jboss-security-spi</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.jboss.security</groupId>
+ <artifactId>jbosssx</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.jboss.naming</groupId>
+ <artifactId>jnpserver</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>jboss.jbossts</groupId>
+ <artifactId>jbossts-common</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>apache-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.jboss.javaee</groupId>
+ <artifactId>jboss-transaction-api</artifactId>
+ </dependency>
+ <!-- this is specifically for the JMS Bridge -->
+ <dependency>
+ <groupId>org.jboss.integration</groupId>
+ <artifactId>jboss-transaction-spi</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.jboss.javaee</groupId>
+ <artifactId>jboss-jaspi-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.jboss.javaee</groupId>
+ <artifactId>jboss-jms-api</artifactId>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <testResources>
+ <testResource>
+ <directory>config</directory>
+ </testResource>
+ </testResources>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <includes>
+ <include>**/*Test.java</include>
+ </includes>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <source>1.6</source>
+ <target>1.6</target>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
Modified: trunk/tests/timing-tests/src/test/java/org/hornetq/tests/timing/util/UUIDTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/timing/util/UUIDTest.java 2011-04-11 12:47:21 UTC (rev 10475)
+++ trunk/tests/timing-tests/src/test/java/org/hornetq/tests/timing/util/UUIDTest.java 2011-04-12 10:17:50 UTC (rev 10481)
@@ -13,12 +13,19 @@
package org.hornetq.tests.timing.util;
+import junit.framework.Assert;
+import org.hornetq.tests.util.UnitTestCase;
+import org.hornetq.utils.UUIDGenerator;
+
+import java.util.HashSet;
+import java.util.Set;
+
/**
*
* @author <a href="mailto:clebert.suconic@jboss.com">Clebert Suconic</a>
*
*/
-public class UUIDTest extends org.hornetq.tests.unit.util.UUIDTest
+public class UUIDTest extends UnitTestCase
{
// Constants -----------------------------------------------------
@@ -31,11 +38,34 @@
// Public --------------------------------------------------------
+ public void testManyUUIDs() throws Exception
+ {
+ final int MANY_TIMES = getTimes();
+ Set<String> uuidsSet = new HashSet<String>();
+
+ UUIDGenerator gen = UUIDGenerator.getInstance();
+ for (int i = 0; i < MANY_TIMES; i++)
+ {
+ uuidsSet.add(gen.generateStringUUID());
+ }
+
+ // collect the UUIDs in a set so that any duplicate shrinks its size
+ Assert.assertEquals(MANY_TIMES, uuidsSet.size());
+ }
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
- @Override
protected int getTimes()
{
return 1000000;
13 years, 8 months
JBoss hornetq SVN: r10480 - in trunk/tests: integration-tests/src/main/java/org/hornetq/tests and 20 other directories.
by do-not-reply@jboss.org
Author: ataylor
Date: 2011-04-12 06:12:15 -0400 (Tue, 12 Apr 2011)
New Revision: 10480
Added:
trunk/tests/integration-tests/src/main/java/org/hornetq/tests/cluster/
trunk/tests/integration-tests/src/main/java/org/hornetq/tests/cluster/reattach/
trunk/tests/integration-tests/src/main/java/org/hornetq/tests/cluster/reattach/MultiThreadRandomReattachTestBase.java
trunk/tests/integration-tests/src/main/java/org/hornetq/tests/cluster/reattach/MultiThreadReattachSupport.java
trunk/tests/integration-tests/src/main/java/org/hornetq/tests/largemessage/
trunk/tests/integration-tests/src/main/java/org/hornetq/tests/largemessage/LargeMessageTestBase.java
trunk/tests/stress-tests/
trunk/tests/stress-tests/pom.xml
trunk/tests/stress-tests/src/
trunk/tests/stress-tests/src/test/
trunk/tests/stress-tests/src/test/java/
trunk/tests/stress-tests/src/test/java/org/
trunk/tests/stress-tests/src/test/java/org/hornetq/
trunk/tests/stress-tests/src/test/java/org/hornetq/tests/
trunk/tests/stress-tests/src/test/java/org/hornetq/tests/stress/
Removed:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/reattach/MultiThreadRandomReattachTestBase.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/reattach/MultiThreadReattachSupport.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/largemessage/LargeMessageTestBase.java
trunk/tests/src/org/hornetq/tests/stress/
Modified:
trunk/tests/hornetq-tests.iml
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/LargeMessageTest.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/reattach/MultiThreadRandomReattachTest.java
trunk/tests/pom.xml
trunk/tests/stress-tests/src/test/java/org/hornetq/tests/stress/chunk/LargeMessageStressTest.java
trunk/tests/stress-tests/src/test/java/org/hornetq/tests/stress/client/SendStressTest.java
trunk/tests/stress-tests/src/test/java/org/hornetq/tests/stress/failover/MultiThreadRandomReattachStressTest.java
trunk/tests/stress-tests/src/test/java/org/hornetq/tests/stress/failover/RandomReattachStressTest.java
trunk/tests/stress-tests/src/test/java/org/hornetq/tests/stress/journal/CompactingStressTest.java
trunk/tests/stress-tests/src/test/java/org/hornetq/tests/stress/journal/JournalCleanupCompactStressTest.java
trunk/tests/stress-tests/src/test/java/org/hornetq/tests/stress/journal/JournalRestartStressTest.java
trunk/tests/stress-tests/src/test/java/org/hornetq/tests/stress/journal/LargeJournalStressTest.java
trunk/tests/stress-tests/src/test/java/org/hornetq/tests/stress/journal/MultiThreadConsumerStressTest.java
trunk/tests/stress-tests/src/test/java/org/hornetq/tests/stress/journal/NIOMultiThreadCompactorStressTest.java
trunk/tests/stress-tests/src/test/java/org/hornetq/tests/stress/paging/PageCursorStressTest.java
trunk/tests/stress-tests/src/test/java/org/hornetq/tests/stress/paging/PageStressTest.java
trunk/tests/stress-tests/src/test/java/org/hornetq/tests/stress/remote/PingStressTest.java
Log:
mavenised stress tests
Modified: trunk/tests/hornetq-tests.iml
===================================================================
--- trunk/tests/hornetq-tests.iml 2011-04-12 09:47:15 UTC (rev 10479)
+++ trunk/tests/hornetq-tests.iml 2011-04-12 10:12:15 UTC (rev 10480)
@@ -9,6 +9,7 @@
<sourceFolder url="file://$MODULE_DIR$/unit-tests/src/main/java" isTestSource="true" />
<sourceFolder url="file://$MODULE_DIR$/integration-tests/src/main/java" isTestSource="true" />
<sourceFolder url="file://$MODULE_DIR$/soak-tests/src/test/java" isTestSource="true" />
+ <sourceFolder url="file://$MODULE_DIR$/stress-tests/src/test/java" isTestSource="true" />
<excludeFolder url="file://$MODULE_DIR$/jms-tests" />
<excludeFolder url="file://$MODULE_DIR$/joram-tests" />
<excludeFolder url="file://$MODULE_DIR$/logs" />
Copied: trunk/tests/integration-tests/src/main/java/org/hornetq/tests/cluster/reattach/MultiThreadRandomReattachTestBase.java (from rev 10475, trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/reattach/MultiThreadRandomReattachTestBase.java)
===================================================================
--- trunk/tests/integration-tests/src/main/java/org/hornetq/tests/cluster/reattach/MultiThreadRandomReattachTestBase.java (rev 0)
+++ trunk/tests/integration-tests/src/main/java/org/hornetq/tests/cluster/reattach/MultiThreadRandomReattachTestBase.java 2011-04-12 10:12:15 UTC (rev 10480)
@@ -0,0 +1,1428 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.cluster.reattach;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import junit.framework.Assert;
+
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.api.core.client.MessageHandler;
+import org.hornetq.api.core.client.ServerLocator;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.remoting.impl.invm.InVMRegistry;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.jms.client.HornetQBytesMessage;
+import org.hornetq.jms.client.HornetQTextMessage;
+
+/**
+ * A MultiThreadRandomReattachTestBase
+ *
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public abstract class MultiThreadRandomReattachTestBase extends MultiThreadReattachSupport
+{
+ private final Logger log = Logger.getLogger(getClass());
+
+ // Constants -----------------------------------------------------
+
+ private static final int RECEIVE_TIMEOUT = 30000;
+
+ private final int LATCH_WAIT = getLatchWait();
+
+ private final int NUM_THREADS = getNumThreads();
+
+ // Attributes ----------------------------------------------------
+ protected static final SimpleString ADDRESS = new SimpleString("FailoverTestAddress");
+
+ protected HornetQServer liveServer;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ public void testA() throws Exception
+ {
+ runTestMultipleThreads(new RunnableT()
+ {
+ @Override
+ public void run(final ClientSessionFactory sf, final int threadNum) throws Exception
+ {
+ doTestA(sf, threadNum);
+ }
+ }, NUM_THREADS, false);
+
+ }
+
+ public void testB() throws Exception
+ {
+ runTestMultipleThreads(new RunnableT()
+ {
+ @Override
+ public void run(final ClientSessionFactory sf, final int threadNum) throws Exception
+ {
+ doTestB(sf, threadNum);
+ }
+ }, NUM_THREADS, false);
+ }
+
+ public void testC() throws Exception
+ {
+ runTestMultipleThreads(new RunnableT()
+ {
+ @Override
+ public void run(final ClientSessionFactory sf, final int threadNum) throws Exception
+ {
+ doTestC(sf, threadNum);
+ }
+ }, NUM_THREADS, false);
+ }
+
+ public void testD() throws Exception
+ {
+ runTestMultipleThreads(new RunnableT()
+ {
+ @Override
+ public void run(final ClientSessionFactory sf, final int threadNum) throws Exception
+ {
+ doTestD(sf, threadNum);
+ }
+ }, NUM_THREADS, false);
+ }
+
+ public void testE() throws Exception
+ {
+ runTestMultipleThreads(new RunnableT()
+ {
+ @Override
+ public void run(final ClientSessionFactory sf, final int threadNum) throws Exception
+ {
+ doTestE(sf, threadNum);
+ }
+ }, NUM_THREADS, false);
+ }
+
+ public void testF() throws Exception
+ {
+ runTestMultipleThreads(new RunnableT()
+ {
+ @Override
+ public void run(final ClientSessionFactory sf, final int threadNum) throws Exception
+ {
+ doTestF(sf, threadNum);
+ }
+ }, NUM_THREADS, false);
+ }
+
+ public void testG() throws Exception
+ {
+ runTestMultipleThreads(new RunnableT()
+ {
+ @Override
+ public void run(final ClientSessionFactory sf, final int threadNum) throws Exception
+ {
+ doTestG(sf, threadNum);
+ }
+ }, NUM_THREADS, false);
+ }
+
+ public void testH() throws Exception
+ {
+ runTestMultipleThreads(new RunnableT()
+ {
+ @Override
+ public void run(final ClientSessionFactory sf, final int threadNum) throws Exception
+ {
+ doTestH(sf, threadNum);
+ }
+ }, NUM_THREADS, false);
+ }
+
+ public void testI() throws Exception
+ {
+ runTestMultipleThreads(new RunnableT()
+ {
+ @Override
+ public void run(final ClientSessionFactory sf, final int threadNum) throws Exception
+ {
+ doTestI(sf, threadNum);
+ }
+ }, NUM_THREADS, false);
+ }
+
+ public void testJ() throws Exception
+ {
+ runTestMultipleThreads(new RunnableT()
+ {
+ @Override
+ public void run(final ClientSessionFactory sf, final int threadNum) throws Exception
+ {
+ doTestJ(sf, threadNum);
+ }
+ }, NUM_THREADS, false);
+ }
+
+ public void testK() throws Exception
+ {
+ runTestMultipleThreads(new RunnableT()
+ {
+ @Override
+ public void run(final ClientSessionFactory sf, final int threadNum) throws Exception
+ {
+ doTestK(sf, threadNum);
+ }
+ }, NUM_THREADS, false);
+ }
+
+ public void testL() throws Exception
+ {
+ runTestMultipleThreads(new RunnableT()
+ {
+ @Override
+ public void run(final ClientSessionFactory sf, final int threadNum) throws Exception
+ {
+ doTestL(sf);
+ }
+ }, NUM_THREADS, true, 10);
+ }
+
+ // public void testM() throws Exception
+ // {
+ // runTestMultipleThreads(new RunnableT()
+ // {
+ // public void run(final ClientSessionFactory sf, final int threadNum) throws Exception
+ // {
+ // doTestM(sf, threadNum);
+ // }
+ // }, NUM_THREADS);
+ // }
+
+ public void testN() throws Exception
+ {
+ runTestMultipleThreads(new RunnableT()
+ {
+ @Override
+ public void run(final ClientSessionFactory sf, final int threadNum) throws Exception
+ {
+ doTestN(sf, threadNum);
+ }
+ }, NUM_THREADS, false);
+ }
+
+ // Added to replicate HORNETQ-264
+ public void testO() throws Exception
+ {
+ runTestMultipleThreads(new RunnableT()
+ {
+ @Override
+ public void run(final ClientSessionFactory sf, final int threadNum) throws Exception
+ {
+ doTestO(sf, threadNum);
+ }
+ }, NUM_THREADS, false);
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ @Override
+ protected abstract void start() throws Exception;
+
+ protected abstract void setBody(ClientMessage message) throws Exception;
+
+ protected abstract boolean checkSize(ClientMessage message);
+
+ protected ClientSession createAutoCommitSession(final ClientSessionFactory sf) throws Exception
+ {
+ return sf.createSession(false, true, true);
+ }
+
+ protected ClientSession createTransactionalSession(final ClientSessionFactory sf) throws Exception
+ {
+ return sf.createSession(false, false, false);
+ }
+
+ protected void doTestA(final ClientSessionFactory sf, final int threadNum, final ClientSession session2) throws Exception
+ {
+ SimpleString subName = new SimpleString("sub" + threadNum);
+
+ ClientSession session = sf.createSession(false, true, true);
+
+ session.createQueue(MultiThreadRandomReattachTestBase.ADDRESS, subName, null, false);
+
+ ClientProducer producer = session.createProducer(MultiThreadRandomReattachTestBase.ADDRESS);
+
+ ClientConsumer consumer = session.createConsumer(subName);
+
+ final int numMessages = 100;
+
+ sendMessages(session, producer, numMessages, threadNum);
+
+ session.start();
+
+ MyHandler handler = new MyHandler(threadNum, numMessages);
+
+ consumer.setMessageHandler(handler);
+
+ boolean ok = handler.latch.await(LATCH_WAIT, TimeUnit.MILLISECONDS);
+
+ if (!ok)
+ {
+ throw new Exception("Timed out waiting for messages on handler " + System.identityHashCode(handler) +
+ " threadnum " +
+ threadNum);
+ }
+
+ if (handler.failure != null)
+ {
+ throw new Exception("Handler failed: " + handler.failure);
+ }
+
+ producer.close();
+
+ consumer.close();
+
+ session.deleteQueue(subName);
+
+ session.close();
+ }
+
+ protected void doTestA(final ClientSessionFactory sf, final int threadNum) throws Exception
+ {
+ long start = System.currentTimeMillis();
+
+ ClientSession s = sf.createSession(false, false, false);
+
+ final int numMessages = 100;
+
+ final int numSessions = 10;
+
+ Set<ClientConsumer> consumers = new HashSet<ClientConsumer>();
+ Set<ClientSession> sessions = new HashSet<ClientSession>();
+
+ for (int i = 0; i < numSessions; i++)
+ {
+ SimpleString subName = new SimpleString(threadNum + "sub" + i);
+
+ ClientSession sessConsume = createAutoCommitSession(sf);
+
+ sessConsume.start();
+
+ sessConsume.createQueue(MultiThreadRandomReattachTestBase.ADDRESS, subName, null, false);
+
+ ClientConsumer consumer = sessConsume.createConsumer(subName);
+
+ consumers.add(consumer);
+
+ sessions.add(sessConsume);
+
+ }
+
+ ClientSession sessSend = sf.createSession(false, true, true);
+
+ ClientProducer producer = sessSend.createProducer(MultiThreadRandomReattachTestBase.ADDRESS);
+
+ sendMessages(sessSend, producer, numMessages, threadNum);
+
+ Set<MyHandler> handlers = new HashSet<MyHandler>();
+
+ for (ClientConsumer consumer : consumers)
+ {
+ MyHandler handler = new MyHandler(threadNum, numMessages);
+
+ consumer.setMessageHandler(handler);
+
+ handlers.add(handler);
+ }
+
+ for (MyHandler handler : handlers)
+ {
+ boolean ok = handler.latch.await(LATCH_WAIT, TimeUnit.MILLISECONDS);
+
+ if (!ok)
+ {
+ throw new Exception("Timed out waiting for messages on handler " + System.identityHashCode(handler) +
+ " threadnum " +
+ threadNum);
+ }
+
+ if (handler.failure != null)
+ {
+ throw new Exception("Handler failed: " + handler.failure);
+ }
+ }
+
+ sessSend.close();
+
+ for (ClientSession session : sessions)
+ {
+ session.close();
+ }
+
+ for (int i = 0; i < numSessions; i++)
+ {
+ SimpleString subName = new SimpleString(threadNum + "sub" + i);
+
+ s.deleteQueue(subName);
+ }
+
+ s.close();
+
+ long end = System.currentTimeMillis();
+
+ log.info("duration " + (end - start));
+ }
+
+ protected void doTestB(final ClientSessionFactory sf, final int threadNum) throws Exception
+ {
+ long start = System.currentTimeMillis();
+
+ ClientSession s = sf.createSession(false, false, false);
+
+ final int numMessages = 100;
+
+ final int numSessions = 10;
+
+ Set<ClientConsumer> consumers = new HashSet<ClientConsumer>();
+ Set<ClientSession> sessions = new HashSet<ClientSession>();
+
+ for (int i = 0; i < numSessions; i++)
+ {
+ SimpleString subName = new SimpleString(threadNum + "sub" + i);
+
+ ClientSession sessConsume = createAutoCommitSession(sf);
+
+ sessConsume.createQueue(MultiThreadRandomReattachTestBase.ADDRESS, subName, null, false);
+
+ ClientConsumer consumer = sessConsume.createConsumer(subName);
+
+ consumers.add(consumer);
+
+ sessions.add(sessConsume);
+ }
+
+ ClientSession sessSend = sf.createSession(false, true, true);
+
+ ClientProducer producer = sessSend.createProducer(MultiThreadRandomReattachTestBase.ADDRESS);
+
+ sendMessages(sessSend, producer, numMessages, threadNum);
+
+ for (ClientSession session : sessions)
+ {
+ session.start();
+ }
+
+ Set<MyHandler> handlers = new HashSet<MyHandler>();
+
+ for (ClientConsumer consumer : consumers)
+ {
+ MyHandler handler = new MyHandler(threadNum, numMessages);
+
+ consumer.setMessageHandler(handler);
+
+ handlers.add(handler);
+ }
+
+ for (MyHandler handler : handlers)
+ {
+ boolean ok = handler.latch.await(LATCH_WAIT, TimeUnit.MILLISECONDS);
+
+ if (!ok)
+ {
+ throw new Exception("Timed out waiting for messages on handler " + System.identityHashCode(handler) +
+ " threadnum " +
+ threadNum);
+ }
+
+ if (handler.failure != null)
+ {
+ throw new Exception("Handler failed: " + handler.failure);
+ }
+ }
+
+ sessSend.close();
+
+ for (ClientSession session : sessions)
+ {
+ session.close();
+ }
+
+ for (int i = 0; i < numSessions; i++)
+ {
+ SimpleString subName = new SimpleString(threadNum + "sub" + i);
+
+ s.deleteQueue(subName);
+ }
+
+ s.close();
+
+ long end = System.currentTimeMillis();
+
+ log.info("duration " + (end - start));
+
+ }
+
+ protected void doTestC(final ClientSessionFactory sf, final int threadNum) throws Exception
+ {
+ long start = System.currentTimeMillis();
+
+ ClientSession s = sf.createSession(false, false, false);
+
+ final int numMessages = 100;
+
+ final int numSessions = 10;
+
+ Set<ClientConsumer> consumers = new HashSet<ClientConsumer>();
+ Set<ClientSession> sessions = new HashSet<ClientSession>();
+
+ for (int i = 0; i < numSessions; i++)
+ {
+ SimpleString subName = new SimpleString(threadNum + "sub" + i);
+
+ ClientSession sessConsume = createTransactionalSession(sf);
+
+ sessConsume.start();
+
+ sessConsume.createQueue(MultiThreadRandomReattachTestBase.ADDRESS, subName, null, false);
+
+ ClientConsumer consumer = sessConsume.createConsumer(subName);
+
+ consumers.add(consumer);
+
+ sessions.add(sessConsume);
+ }
+
+ ClientSession sessSend = sf.createSession(false, false, false);
+
+ ClientProducer producer = sessSend.createProducer(MultiThreadRandomReattachTestBase.ADDRESS);
+
+ sendMessages(sessSend, producer, numMessages, threadNum);
+
+ sessSend.rollback();
+
+ sendMessages(sessSend, producer, numMessages, threadNum);
+
+ sessSend.commit();
+
+ Set<MyHandler> handlers = new HashSet<MyHandler>();
+
+ for (ClientConsumer consumer : consumers)
+ {
+ MyHandler handler = new MyHandler(threadNum, numMessages);
+
+ consumer.setMessageHandler(handler);
+
+ handlers.add(handler);
+ }
+
+ for (MyHandler handler : handlers)
+ {
+ boolean ok = handler.latch.await(LATCH_WAIT, TimeUnit.MILLISECONDS);
+
+ if (!ok)
+ {
+ throw new Exception("Timed out waiting for messages on handler " + System.identityHashCode(handler) +
+ " threadnum " +
+ threadNum);
+ }
+
+ if (handler.failure != null)
+ {
+ throw new Exception("Handler failed: " + handler.failure);
+ }
+
+ handler.reset();
+ }
+
+ for (ClientSession session : sessions)
+ {
+ session.rollback();
+ }
+
+ for (MyHandler handler : handlers)
+ {
+ boolean ok = handler.latch.await(LATCH_WAIT, TimeUnit.MILLISECONDS);
+
+ Assert.assertTrue(ok);
+ }
+
+ for (ClientSession session : sessions)
+ {
+ session.commit();
+ }
+
+ sessSend.close();
+ for (ClientSession session : sessions)
+ {
+ session.close();
+ }
+
+ for (int i = 0; i < numSessions; i++)
+ {
+ SimpleString subName = new SimpleString(threadNum + "sub" + i);
+
+ s.deleteQueue(subName);
+ }
+
+ s.close();
+
+ long end = System.currentTimeMillis();
+
+ log.info("duration " + (end - start));
+ }
+
+ protected void doTestD(final ClientSessionFactory sf, final int threadNum) throws Exception
+ {
+ long start = System.currentTimeMillis();
+
+ ClientSession s = sf.createSession(false, false, false);
+
+ final int numMessages = 100;
+
+ final int numSessions = 10;
+
+ Set<ClientConsumer> consumers = new HashSet<ClientConsumer>();
+ Set<ClientSession> sessions = new HashSet<ClientSession>();
+
+ for (int i = 0; i < numSessions; i++)
+ {
+ SimpleString subName = new SimpleString(threadNum + " sub" + i);
+
+ ClientSession sessConsume = sf.createSession(false, false, false);
+
+ sessConsume.createQueue(MultiThreadRandomReattachTestBase.ADDRESS, subName, null, false);
+
+ ClientConsumer consumer = sessConsume.createConsumer(subName);
+
+ consumers.add(consumer);
+
+ sessions.add(sessConsume);
+ }
+
+ ClientSession sessSend = sf.createSession(false, false, false);
+
+ ClientProducer producer = sessSend.createProducer(MultiThreadRandomReattachTestBase.ADDRESS);
+
+ sendMessages(sessSend, producer, numMessages, threadNum);
+
+ sessSend.rollback();
+
+ sendMessages(sessSend, producer, numMessages, threadNum);
+
+ sessSend.commit();
+
+ for (ClientSession session : sessions)
+ {
+ session.start();
+ }
+
+ Set<MyHandler> handlers = new HashSet<MyHandler>();
+
+ for (ClientConsumer consumer : consumers)
+ {
+ MyHandler handler = new MyHandler(threadNum, numMessages);
+
+ consumer.setMessageHandler(handler);
+
+ handlers.add(handler);
+ }
+
+ for (MyHandler handler : handlers)
+ {
+ boolean ok = handler.latch.await(LATCH_WAIT, TimeUnit.MILLISECONDS);
+
+ if (!ok)
+ {
+ throw new Exception("Timed out waiting for messages on handler " + System.identityHashCode(handler) +
+ " threadnum " +
+ threadNum);
+ }
+
+ if (handler.failure != null)
+ {
+ throw new Exception("Handler failed: " + handler.failure);
+ }
+ }
+
+ handlers.clear();
+
+ // Set handlers to null
+ for (ClientConsumer consumer : consumers)
+ {
+ consumer.setMessageHandler(null);
+ }
+
+ for (ClientSession session : sessions)
+ {
+ session.rollback();
+ }
+
+ // New handlers
+ for (ClientConsumer consumer : consumers)
+ {
+ MyHandler handler = new MyHandler(threadNum, numMessages);
+
+ consumer.setMessageHandler(handler);
+
+ handlers.add(handler);
+ }
+
+ for (MyHandler handler : handlers)
+ {
+ boolean ok = handler.latch.await(LATCH_WAIT, TimeUnit.MILLISECONDS);
+
+ if (!ok)
+ {
+ throw new Exception("Timed out waiting for messages on handler " + System.identityHashCode(handler) +
+ " threadnum " +
+ threadNum);
+ }
+
+ if (handler.failure != null)
+ {
+ throw new Exception("Handler failed on rollback: " + handler.failure);
+ }
+ }
+
+ for (ClientSession session : sessions)
+ {
+ session.commit();
+ }
+
+ sessSend.close();
+ for (ClientSession session : sessions)
+ {
+ session.close();
+ }
+
+ for (int i = 0; i < numSessions; i++)
+ {
+ SimpleString subName = new SimpleString(threadNum + " sub" + i);
+
+ s.deleteQueue(subName);
+ }
+
+ s.close();
+
+ long end = System.currentTimeMillis();
+
+ log.info("duration " + (end - start));
+ }
+
+ // Now with synchronous receive()
+
+ /**
+  * Synchronous-receive scenario with consumer sessions that are started up front.
+  * <p>
+  * Creates ten sessions via {@code createSession(false, true, true)}, starts each one, gives each
+  * its own queue ({@code threadNum + "sub" + i}) on the shared address plus one consumer, then
+  * sends 100 messages from a separate session and drains every consumer with
+  * {@code consumeMessages}. Finally all sessions are closed and the queues deleted.
+  *
+  * @param sf factory used to create every session of the scenario
+  * @param threadNum number of the calling test thread; prefixes the queue names and is passed on
+  *        to the send/consume helpers
+  * @throws Exception if any session operation or the consumption check fails
+  */
+ protected void doTestE(final ClientSessionFactory sf, final int threadNum) throws Exception
+ {
+    final long startedAt = System.currentTimeMillis();
+
+    // Only used at the very end, to delete the queues again
+    final ClientSession adminSession = sf.createSession(false, false, false);
+
+    final int numMessages = 100;
+
+    final int numSessions = 10;
+
+    final SimpleString[] queueNames = new SimpleString[numSessions];
+    for (int q = 0; q < numSessions; q++)
+    {
+       queueNames[q] = new SimpleString(threadNum + "sub" + q);
+    }
+
+    final Set<ClientConsumer> consumers = new HashSet<ClientConsumer>();
+    final Set<ClientSession> consumerSessions = new HashSet<ClientSession>();
+
+    for (SimpleString queueName : queueNames)
+    {
+       ClientSession consumerSession = sf.createSession(false, true, true);
+
+       // Started before the queue and the consumer exist
+       consumerSession.start();
+
+       consumerSession.createQueue(MultiThreadRandomReattachTestBase.ADDRESS, queueName, null, false);
+
+       consumers.add(consumerSession.createConsumer(queueName));
+
+       consumerSessions.add(consumerSession);
+    }
+
+    final ClientSession senderSession = sf.createSession(false, true, true);
+
+    final ClientProducer sender = senderSession.createProducer(MultiThreadRandomReattachTestBase.ADDRESS);
+
+    sendMessages(senderSession, sender, numMessages, threadNum);
+
+    consumeMessages(consumers, numMessages, threadNum);
+
+    senderSession.close();
+    for (ClientSession consumerSession : consumerSessions)
+    {
+       consumerSession.close();
+    }
+
+    for (SimpleString queueName : queueNames)
+    {
+       adminSession.deleteQueue(queueName);
+    }
+
+    adminSession.close();
+
+    final long elapsed = System.currentTimeMillis() - startedAt;
+
+    log.info("duration " + elapsed);
+ }
+
+ /**
+  * Synchronous-receive scenario in which the consumer sessions are started only after sending.
+  * <p>
+  * Creates ten sessions via {@code createSession(false, true, true)}, each with its own queue
+  * ({@code threadNum + "sub" + i}) and consumer, sends 100 messages from a separate session,
+  * then starts every consumer session and drains the consumers with {@code consumeMessages}.
+  * Finally all sessions are closed and the queues deleted.
+  *
+  * @param sf factory used to create every session of the scenario
+  * @param threadNum number of the calling test thread; prefixes the queue names and is passed on
+  *        to the send/consume helpers
+  * @throws Exception if any session operation or the consumption check fails
+  */
+ protected void doTestF(final ClientSessionFactory sf, final int threadNum) throws Exception
+ {
+    final long startedAt = System.currentTimeMillis();
+
+    // Only used at the very end, to delete the queues again
+    final ClientSession adminSession = sf.createSession(false, false, false);
+
+    final int numMessages = 100;
+
+    final int numSessions = 10;
+
+    final SimpleString[] queueNames = new SimpleString[numSessions];
+    for (int q = 0; q < numSessions; q++)
+    {
+       queueNames[q] = new SimpleString(threadNum + "sub" + q);
+    }
+
+    final Set<ClientConsumer> consumers = new HashSet<ClientConsumer>();
+    final Set<ClientSession> consumerSessions = new HashSet<ClientSession>();
+
+    for (SimpleString queueName : queueNames)
+    {
+       ClientSession consumerSession = sf.createSession(false, true, true);
+
+       consumerSession.createQueue(MultiThreadRandomReattachTestBase.ADDRESS, queueName, null, false);
+
+       consumers.add(consumerSession.createConsumer(queueName));
+
+       consumerSessions.add(consumerSession);
+    }
+
+    final ClientSession senderSession = sf.createSession(false, true, true);
+
+    final ClientProducer sender = senderSession.createProducer(MultiThreadRandomReattachTestBase.ADDRESS);
+
+    sendMessages(senderSession, sender, numMessages, threadNum);
+
+    // Delivery is switched on only now, after everything has been sent
+    for (ClientSession consumerSession : consumerSessions)
+    {
+       consumerSession.start();
+    }
+
+    consumeMessages(consumers, numMessages, threadNum);
+
+    senderSession.close();
+    for (ClientSession consumerSession : consumerSessions)
+    {
+       consumerSession.close();
+    }
+
+    for (SimpleString queueName : queueNames)
+    {
+       adminSession.deleteQueue(queueName);
+    }
+
+    adminSession.close();
+
+    final long elapsed = System.currentTimeMillis() - startedAt;
+
+    log.info("duration " + elapsed);
+ }
+
+ /**
+  * Synchronous-receive scenario with explicit rollback/commit on both the sending and the
+  * consuming side; consumer sessions are started up front.
+  * <p>
+  * All sessions are created via {@code createSession(false, false, false)}. The sender sends 100
+  * messages, rolls back, sends 100 again and commits. The consumers drain the messages, every
+  * consumer session is rolled back, the messages are drained a second time and the sessions are
+  * committed. Finally all sessions are closed and the queues deleted.
+  *
+  * @param sf factory used to create every session of the scenario
+  * @param threadNum number of the calling test thread; prefixes the queue names and is passed on
+  *        to the send/consume helpers
+  * @throws Exception if any session operation or the consumption check fails
+  */
+ protected void doTestG(final ClientSessionFactory sf, final int threadNum) throws Exception
+ {
+    final long startedAt = System.currentTimeMillis();
+
+    // Only used at the very end, to delete the queues again
+    final ClientSession adminSession = sf.createSession(false, false, false);
+
+    final int numMessages = 100;
+
+    final int numSessions = 10;
+
+    final SimpleString[] queueNames = new SimpleString[numSessions];
+    for (int q = 0; q < numSessions; q++)
+    {
+       queueNames[q] = new SimpleString(threadNum + "sub" + q);
+    }
+
+    final Set<ClientConsumer> consumers = new HashSet<ClientConsumer>();
+    final Set<ClientSession> consumerSessions = new HashSet<ClientSession>();
+
+    for (SimpleString queueName : queueNames)
+    {
+       ClientSession consumerSession = sf.createSession(false, false, false);
+
+       // Started before the queue and the consumer exist
+       consumerSession.start();
+
+       consumerSession.createQueue(MultiThreadRandomReattachTestBase.ADDRESS, queueName, null, false);
+
+       consumers.add(consumerSession.createConsumer(queueName));
+
+       consumerSessions.add(consumerSession);
+    }
+
+    final ClientSession senderSession = sf.createSession(false, false, false);
+
+    final ClientProducer sender = senderSession.createProducer(MultiThreadRandomReattachTestBase.ADDRESS);
+
+    // First batch is rolled back, second batch is the one that gets committed
+    sendMessages(senderSession, sender, numMessages, threadNum);
+
+    senderSession.rollback();
+
+    sendMessages(senderSession, sender, numMessages, threadNum);
+
+    senderSession.commit();
+
+    // First consumption pass is rolled back ...
+    consumeMessages(consumers, numMessages, threadNum);
+
+    for (ClientSession consumerSession : consumerSessions)
+    {
+       consumerSession.rollback();
+    }
+
+    // ... so the same messages are consumed again and then committed
+    consumeMessages(consumers, numMessages, threadNum);
+
+    for (ClientSession consumerSession : consumerSessions)
+    {
+       consumerSession.commit();
+    }
+
+    senderSession.close();
+    for (ClientSession consumerSession : consumerSessions)
+    {
+       consumerSession.close();
+    }
+
+    for (SimpleString queueName : queueNames)
+    {
+       adminSession.deleteQueue(queueName);
+    }
+
+    adminSession.close();
+
+    final long elapsed = System.currentTimeMillis() - startedAt;
+
+    log.info("duration " + elapsed);
+ }
+
+ /**
+  * Synchronous-receive scenario with explicit rollback/commit on both sides, in which the
+  * consumer sessions are started only after the sender has committed.
+  * <p>
+  * All sessions are created via {@code createSession(false, false, false)}. The sender sends 100
+  * messages, rolls back, sends 100 again and commits. Then every consumer session is started,
+  * the messages are drained, the consumer sessions are rolled back, the messages are drained a
+  * second time and the sessions are committed. Finally all sessions are closed and the queues
+  * deleted.
+  *
+  * @param sf factory used to create every session of the scenario
+  * @param threadNum number of the calling test thread; prefixes the queue names and is passed on
+  *        to the send/consume helpers
+  * @throws Exception if any session operation or the consumption check fails
+  */
+ protected void doTestH(final ClientSessionFactory sf, final int threadNum) throws Exception
+ {
+    final long startedAt = System.currentTimeMillis();
+
+    // Only used at the very end, to delete the queues again
+    final ClientSession adminSession = sf.createSession(false, false, false);
+
+    final int numMessages = 100;
+
+    final int numSessions = 10;
+
+    final SimpleString[] queueNames = new SimpleString[numSessions];
+    for (int q = 0; q < numSessions; q++)
+    {
+       queueNames[q] = new SimpleString(threadNum + "sub" + q);
+    }
+
+    final Set<ClientConsumer> consumers = new HashSet<ClientConsumer>();
+    final Set<ClientSession> consumerSessions = new HashSet<ClientSession>();
+
+    for (SimpleString queueName : queueNames)
+    {
+       ClientSession consumerSession = sf.createSession(false, false, false);
+
+       consumerSession.createQueue(MultiThreadRandomReattachTestBase.ADDRESS, queueName, null, false);
+
+       consumers.add(consumerSession.createConsumer(queueName));
+
+       consumerSessions.add(consumerSession);
+    }
+
+    final ClientSession senderSession = sf.createSession(false, false, false);
+
+    final ClientProducer sender = senderSession.createProducer(MultiThreadRandomReattachTestBase.ADDRESS);
+
+    // First batch is rolled back, second batch is the one that gets committed
+    sendMessages(senderSession, sender, numMessages, threadNum);
+
+    senderSession.rollback();
+
+    sendMessages(senderSession, sender, numMessages, threadNum);
+
+    senderSession.commit();
+
+    // Delivery is switched on only now, after the commit
+    for (ClientSession consumerSession : consumerSessions)
+    {
+       consumerSession.start();
+    }
+
+    // First consumption pass is rolled back ...
+    consumeMessages(consumers, numMessages, threadNum);
+
+    for (ClientSession consumerSession : consumerSessions)
+    {
+       consumerSession.rollback();
+    }
+
+    // ... so the same messages are consumed again and then committed
+    consumeMessages(consumers, numMessages, threadNum);
+
+    for (ClientSession consumerSession : consumerSessions)
+    {
+       consumerSession.commit();
+    }
+
+    senderSession.close();
+    for (ClientSession consumerSession : consumerSessions)
+    {
+       consumerSession.close();
+    }
+
+    for (SimpleString queueName : queueNames)
+    {
+       adminSession.deleteQueue(queueName);
+    }
+
+    adminSession.close();
+
+    final long elapsed = System.currentTimeMillis() - startedAt;
+
+    log.info("duration " + elapsed);
+ }
+
+ protected void doTestI(final ClientSessionFactory sf, final int threadNum) throws Exception
+ {
+ ClientSession sessCreate = sf.createSession(false, true, true);
+
+ sessCreate.createQueue(MultiThreadRandomReattachTestBase.ADDRESS,
+ new SimpleString(threadNum + MultiThreadRandomReattachTestBase.ADDRESS.toString()),
+ null,
+ false);
+
+ ClientSession sess = sf.createSession(false, true, true);
+
+ sess.start();
+
+ ClientConsumer consumer = sess.createConsumer(new SimpleString(threadNum + MultiThreadRandomReattachTestBase.ADDRESS.toString()));
+
+ ClientProducer producer = sess.createProducer(MultiThreadRandomReattachTestBase.ADDRESS);
+
+ ClientMessage message = sess.createMessage(HornetQTextMessage.TYPE, false, 0, System.currentTimeMillis(), (byte)1);
+ producer.send(message);
+
+ ClientMessage message2 = consumer.receive(MultiThreadRandomReattachTestBase.RECEIVE_TIMEOUT);
+
+ Assert.assertNotNull(message2);
+
+ message2.acknowledge();
+
+ sess.close();
+
+ sessCreate.deleteQueue(new SimpleString(threadNum + MultiThreadRandomReattachTestBase.ADDRESS.toString()));
+
+ sessCreate.close();
+ }
+
+ protected void doTestJ(final ClientSessionFactory sf, final int threadNum) throws Exception
+ {
+ ClientSession sessCreate = sf.createSession(false, true, true);
+
+ sessCreate.createQueue(MultiThreadRandomReattachTestBase.ADDRESS,
+ new SimpleString(threadNum + MultiThreadRandomReattachTestBase.ADDRESS.toString()),
+ null,
+ false);
+
+ ClientSession sess = sf.createSession(false, true, true);
+
+ sess.start();
+
+ ClientConsumer consumer = sess.createConsumer(new SimpleString(threadNum + MultiThreadRandomReattachTestBase.ADDRESS.toString()));
+
+ ClientProducer producer = sess.createProducer(MultiThreadRandomReattachTestBase.ADDRESS);
+
+ ClientMessage message = sess.createMessage(HornetQTextMessage.TYPE, false, 0, System.currentTimeMillis(), (byte)1);
+ producer.send(message);
+
+ ClientMessage message2 = consumer.receive(MultiThreadRandomReattachTestBase.RECEIVE_TIMEOUT);
+
+ Assert.assertNotNull(message2);
+
+ message2.acknowledge();
+
+ sess.close();
+
+ sessCreate.deleteQueue(new SimpleString(threadNum + MultiThreadRandomReattachTestBase.ADDRESS.toString()));
+
+ sessCreate.close();
+ }
+
+ /**
+  * Consumer churn: on a single session, creates a queue named {@code threadNum + ADDRESS}, then
+  * creates and immediately closes a consumer on it 100 times before deleting the queue.
+  *
+  * @param sf factory used to create the session
+  * @param threadNum number of the calling test thread; prefixes the queue name
+  * @throws Exception if a session or consumer operation fails
+  */
+ protected void doTestK(final ClientSessionFactory sf, final int threadNum) throws Exception
+ {
+    final SimpleString queueName = new SimpleString(threadNum + MultiThreadRandomReattachTestBase.ADDRESS.toString());
+
+    final ClientSession session = sf.createSession(false, false, false);
+
+    session.createQueue(MultiThreadRandomReattachTestBase.ADDRESS, queueName, null, false);
+
+    final int numConsumers = 100;
+
+    for (int remaining = numConsumers; remaining > 0; remaining--)
+    {
+       session.createConsumer(queueName).close();
+    }
+
+    session.deleteQueue(queueName);
+
+    session.close();
+ }
+
+ /**
+  * This test tests failure during create connection: it creates a session and closes it
+  * straight away, 100 times in a row.
+  *
+  * @param sf factory used to create the sessions
+  * @throws Exception if creating or closing a session fails
+  */
+ protected void doTestL(final ClientSessionFactory sf) throws Exception
+ {
+    final int numSessions = 100;
+
+    for (int remaining = numSessions; remaining > 0; remaining--)
+    {
+       sf.createSession(false, false, false).close();
+    }
+ }
+
+ protected void doTestN(final ClientSessionFactory sf, final int threadNum) throws Exception
+ {
+ ClientSession sessCreate = sf.createSession(false, true, true);
+
+ sessCreate.createQueue(MultiThreadRandomReattachTestBase.ADDRESS,
+ new SimpleString(threadNum + MultiThreadRandomReattachTestBase.ADDRESS.toString()),
+ null,
+ false);
+
+ ClientSession sess = sf.createSession(false, true, true);
+
+ sess.stop();
+
+ sess.start();
+
+ sess.stop();
+
+ ClientConsumer consumer = sess.createConsumer(new SimpleString(threadNum + MultiThreadRandomReattachTestBase.ADDRESS.toString()));
+
+ ClientProducer producer = sess.createProducer(MultiThreadRandomReattachTestBase.ADDRESS);
+
+ ClientMessage message = sess.createMessage(HornetQTextMessage.TYPE, false, 0, System.currentTimeMillis(), (byte)1);
+ producer.send(message);
+
+ sess.start();
+
+ ClientMessage message2 = consumer.receive(MultiThreadRandomReattachTestBase.RECEIVE_TIMEOUT);
+
+ Assert.assertNotNull(message2);
+
+ message2.acknowledge();
+
+ sess.stop();
+
+ sess.start();
+
+ sess.close();
+
+ sessCreate.deleteQueue(new SimpleString(threadNum + MultiThreadRandomReattachTestBase.ADDRESS.toString()));
+
+ sessCreate.close();
+ }
+
+ protected void doTestO(final ClientSessionFactory sf, final int threadNum) throws Exception
+ {
+ ClientSession sessCreate = sf.createSession(false, true, true);
+
+ sessCreate.createQueue(MultiThreadRandomReattachTestBase.ADDRESS,
+ new SimpleString(threadNum + MultiThreadRandomReattachTestBase.ADDRESS.toString()),
+ null,
+ false);
+
+ ClientSession sess = sf.createSession(false, true, true);
+
+ sess.start();
+
+ ClientConsumer consumer = sess.createConsumer(new SimpleString(threadNum + MultiThreadRandomReattachTestBase.ADDRESS.toString()));
+
+ for (int i = 0; i < 100; i++)
+ {
+ Assert.assertNull(consumer.receiveImmediate());
+ }
+
+ sess.close();
+
+ sessCreate.deleteQueue(new SimpleString(threadNum + MultiThreadRandomReattachTestBase.ADDRESS.toString()));
+
+ sessCreate.close();
+ }
+
+ protected int getLatchWait() // Overridable latch wait budget for this test base
+ {
+ return 60000; // NOTE(review): presumably milliseconds (LATCH_WAIT is awaited with TimeUnit.MILLISECONDS) - confirm
+ }
+
+ protected int getNumIterations() // Overridable iteration count; runTestMultipleThreads passes it to runMultipleThreadsFailoverTest
+ {
+ return 2; // default: two iterations per scenario
+ }
+
+ protected int getNumThreads() // Overridable thread count; NOTE(review): callers are outside this excerpt - confirm usage
+ {
+ return 10; // default: ten threads
+ }
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ log.info("************ Starting test " + getName());
+ }
+
+ /**
+  * Stops the live server if it is still running, forgets it, and runs the parent tear-down.
+  * <p>
+  * The parent tear-down is executed in a {@code finally} block so that it still runs (and the
+  * server reference is still cleared) when stopping the server throws; otherwise resources
+  * owned by the parent class would be left behind for the following tests.
+  *
+  * @throws Exception if stopping the server or the parent tear-down fails
+  */
+ @Override
+ protected void tearDown() throws Exception
+ {
+    try
+    {
+       if (liveServer != null && liveServer.isStarted())
+       {
+          liveServer.stop();
+       }
+    }
+    finally
+    {
+       liveServer = null;
+
+       super.tearDown();
+    }
+ }
+
+ // Private -------------------------------------------------------
+
+ private void runTestMultipleThreads(final RunnableT runnable,
+ final int numThreads,
+ final boolean failOnCreateConnection) throws Exception
+ {
+ runTestMultipleThreads(runnable, numThreads, failOnCreateConnection, 1000);
+ }
+
+ private void runTestMultipleThreads(final RunnableT runnable,
+ final int numThreads,
+ final boolean failOnCreateConnection,
+ final long failDelay) throws Exception
+ {
+
+ runMultipleThreadsFailoverTest(runnable, numThreads, getNumIterations(), failOnCreateConnection, failDelay);
+ }
+
+ /**
+ * @return
+ */
+ @Override
+ protected ServerLocator createLocator() throws Exception
+ {
+ ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory"));
+ locator.setReconnectAttempts(-1);
+ locator.setConfirmationWindowSize(1024 * 1024);
+ return locator;
+ }
+
+ @Override
+ protected void stop() throws Exception
+ {
+ liveServer.stop();
+
+ System.gc();
+
+ Assert.assertEquals(0, InVMRegistry.instance.size());
+ }
+
+ private void sendMessages(final ClientSession sessSend,
+ final ClientProducer producer,
+ final int numMessages,
+ final int threadNum) throws Exception
+ {
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = sessSend.createMessage(HornetQBytesMessage.TYPE,
+ false,
+ 0,
+ System.currentTimeMillis(),
+ (byte)1);
+ message.putIntProperty(new SimpleString("threadnum"), threadNum);
+ message.putIntProperty(new SimpleString("count"), i);
+ setBody(message);
+ producer.send(message);
+ }
+ }
+
+ private void consumeMessages(final Set<ClientConsumer> consumers, final int numMessages, final int threadNum) throws Exception
+ {
+ // We make sure the messages arrive in the order they were sent from a particular producer
+ Map<ClientConsumer, Map<Integer, Integer>> counts = new HashMap<ClientConsumer, Map<Integer, Integer>>();
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ for (ClientConsumer consumer : consumers)
+ {
+ Map<Integer, Integer> consumerCounts = counts.get(consumer);
+
+ if (consumerCounts == null)
+ {
+ consumerCounts = new HashMap<Integer, Integer>();
+ counts.put(consumer, consumerCounts);
+ }
+
+ ClientMessage msg = consumer.receive(MultiThreadRandomReattachTestBase.RECEIVE_TIMEOUT);
+
+ Assert.assertNotNull(msg);
+
+ int tn = (Integer)msg.getObjectProperty(new SimpleString("threadnum"));
+ int cnt = (Integer)msg.getObjectProperty(new SimpleString("count"));
+
+ Integer c = consumerCounts.get(tn);
+ if (c == null)
+ {
+ c = new Integer(cnt);
+ }
+
+ if (tn == threadNum && cnt != c.intValue())
+ {
+ throw new Exception("Invalid count, expected " + tn + ": " + c + " got " + cnt);
+ }
+
+ c++;
+
+ // Wrap
+ if (c == numMessages)
+ {
+ c = 0;
+ }
+
+ consumerCounts.put(tn, c);
+
+ msg.acknowledge();
+ }
+ }
+ }
+
+ // Inner classes -------------------------------------------------
+
+ private class MyHandler implements MessageHandler
+ {
+ CountDownLatch latch = new CountDownLatch(1);
+
+ private final Map<Integer, Integer> counts = new HashMap<Integer, Integer>();
+
+ volatile String failure;
+
+ final int tn;
+
+ final int numMessages;
+
+ volatile boolean done;
+
+ synchronized void reset()
+ {
+ counts.clear();
+
+ done = false;
+
+ failure = null;
+
+ latch = new CountDownLatch(1);
+ }
+
+ MyHandler(final int threadNum, final int numMessages)
+ {
+ tn = threadNum;
+
+ this.numMessages = numMessages;
+ }
+
+ public synchronized void onMessage(final ClientMessage message)
+ {
+ try
+ {
+ message.acknowledge();
+ }
+ catch (HornetQException me)
+ {
+ log.error("Failed to process", me);
+ }
+
+ if (done)
+ {
+ return;
+ }
+
+ int threadNum = (Integer)message.getObjectProperty(new SimpleString("threadnum"));
+ int cnt = (Integer)message.getObjectProperty(new SimpleString("count"));
+
+ Integer c = counts.get(threadNum);
+ if (c == null)
+ {
+ c = new Integer(cnt);
+ }
+
+ if (tn == threadNum && cnt != c.intValue())
+ {
+ failure = "Invalid count, expected " + threadNum + ":" + c + " got " + cnt;
+ log.error(failure);
+
+ latch.countDown();
+ }
+
+ if (!checkSize(message))
+ {
+ failure = "Invalid size on message";
+ log.error(failure);
+ latch.countDown();
+ }
+
+ if (tn == threadNum && c == numMessages - 1)
+ {
+ done = true;
+ latch.countDown();
+ }
+
+ c++;
+ // Wrap around at numMessages
+ if (c == numMessages)
+ {
+ c = 0;
+ }
+
+ counts.put(threadNum, c);
+
+ }
+ }
+}
\ No newline at end of file
Copied: trunk/tests/integration-tests/src/main/java/org/hornetq/tests/cluster/reattach/MultiThreadReattachSupport.java (from rev 10475, trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/reattach/MultiThreadReattachSupport.java)
===================================================================
--- trunk/tests/integration-tests/src/main/java/org/hornetq/tests/cluster/reattach/MultiThreadReattachSupport.java (rev 0)
+++ trunk/tests/integration-tests/src/main/java/org/hornetq/tests/cluster/reattach/MultiThreadReattachSupport.java 2011-04-12 10:12:15 UTC (rev 10480)
@@ -0,0 +1,281 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.cluster.reattach;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
+
+import junit.framework.Assert;
+
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.ServerLocator;
+import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
+import org.hornetq.core.client.impl.ClientSessionInternal;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.protocol.core.impl.RemotingConnectionImpl;
+import org.hornetq.core.remoting.impl.invm.InVMConnector;
+import org.hornetq.tests.util.ServiceTestBase;
+import org.hornetq.tests.util.UnitTestCase;
+
+/**
+ * A MultiThreadReattachSupport
+ * <p>
+ * Base class for tests that run one scenario on several threads at the same time while a
+ * timer task (the "failer") fails the client connection underneath them. Subclasses supply
+ * the server life cycle ({@link #start()}, {@link #stop()}) and the locator
+ * ({@link #createLocator()}).
+ *
+ * @author <a href="mailto:time.fox@jboss.org">Tim Fox</a>
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ * Created Mar 17, 2009 11:15:02 AM
+ *
+ *
+ */
+public abstract class MultiThreadReattachSupport extends ServiceTestBase
+{
+
+   // Constants -----------------------------------------------------
+
+   private final Logger log = Logger.getLogger(this.getClass());
+
+   // Attributes ----------------------------------------------------
+
+   /** Timer that runs the failer; created in {@link #setUp()} and cancelled in {@link #tearDown()}. */
+   private Timer timer;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   /** Called at the beginning of every iteration, before the locator is created. */
+   protected abstract void start() throws Exception;
+
+   /** Called at the end of every successful iteration, after all client resources were closed. */
+   protected abstract void stop() throws Exception;
+
+   /** Creates the locator from which the session factory of an iteration is obtained. */
+   protected abstract ServerLocator createLocator() throws Exception;
+
+   @Override
+   protected void setUp() throws Exception
+   {
+      super.setUp();
+      timer = new Timer();
+   }
+
+   @Override
+   protected void tearDown() throws Exception
+   {
+      try
+      {
+         // Guard against a subclass whose setUp() never reached ours
+         if (timer != null)
+         {
+            timer.cancel();
+         }
+         timer = null;
+      }
+      finally
+      {
+         super.tearDown();
+      }
+   }
+
+   /**
+    * Whether the failer is scheduled at all. Returning {@code false} (useful for debugging)
+    * makes every iteration run the threads exactly once without any induced failure.
+    */
+   protected boolean shouldFail()
+   {
+      return true;
+   }
+
+   /**
+    * Runs {@code runnable} on {@code numThreads} threads, {@code numIts} times over, while a
+    * failer is scheduled to break the connection.
+    * <p>
+    * Within one iteration the batch of threads is started again and again until the failer
+    * has executed (or just once when {@link #shouldFail()} is {@code false}). All threads of a
+    * batch are joined before the first recorded failure is reported, so no runner is left
+    * behind, and the failer is disarmed and the in-VM connector failure flags are reset even
+    * when the iteration ends with an exception.
+    *
+    * @param runnable scenario executed by every thread
+    * @param numThreads number of threads per batch
+    * @param numIts number of iterations (each with its own server start/stop)
+    * @param failOnCreateConnection if {@code true} the failer makes the in-VM connector fail on
+    *        connection creation instead of failing the existing connection
+    * @param failDelay upper bound of the random delay before the failer first runs
+    * @throws Exception if a runner thread threw, the scenario reported a failure, or any
+    *         set-up/clean-up step failed
+    */
+   protected void runMultipleThreadsFailoverTest(final RunnableT runnable,
+                                                 final int numThreads,
+                                                 final int numIts,
+                                                 final boolean failOnCreateConnection,
+                                                 final long failDelay) throws Exception
+   {
+      for (int its = 0; its < numIts; its++)
+      {
+         log.info("Beginning iteration " + its);
+
+         start();
+
+         final ServerLocator locator = createLocator();
+
+         final ClientSessionFactoryInternal sf = (ClientSessionFactoryInternal) locator.createSessionFactory();
+
+         final ClientSession session = sf.createSession(false, true, true);
+
+         Failer failer = startFailer(failDelay, session, failOnCreateConnection);
+
+         class Runner extends Thread
+         {
+            private volatile Throwable throwable;
+
+            private final RunnableT test;
+
+            private final int threadNum;
+
+            Runner(final RunnableT test, final int threadNum)
+            {
+               this.test = test;
+
+               this.threadNum = threadNum;
+            }
+
+            @Override
+            public void run()
+            {
+               try
+               {
+                  test.run(sf, threadNum);
+               }
+               catch (Throwable t)
+               {
+                  throwable = t;
+
+                  log.error("Failed to run test", t);
+
+                  // Case a failure happened here, it should print the Thread dump
+                  // Sending it to System.out, as it would show on the Tests report
+                  System.out.println(UnitTestCase.threadDump(" - fired by MultiThreadRandomReattachTestBase::runTestMultipleThreads (" + t.getLocalizedMessage() +
+                                                             ")"));
+               }
+            }
+         }
+
+         try
+         {
+            do
+            {
+               List<Runner> threads = new ArrayList<Runner>();
+
+               for (int i = 0; i < numThreads; i++)
+               {
+                  Runner runner = new Runner(runnable, i);
+
+                  threads.add(runner);
+
+                  runner.start();
+               }
+
+               // Wait for the whole batch first so that no runner keeps using the
+               // factory while the failure is being reported and resources are torn down
+               Runner firstFailed = null;
+
+               for (Runner thread : threads)
+               {
+                  thread.join();
+
+                  if (firstFailed == null && thread.throwable != null)
+                  {
+                     firstFailed = thread;
+                  }
+               }
+
+               if (firstFailed != null)
+               {
+                  throw new Exception("Exception on thread " + firstFailed, firstFailed.throwable);
+               }
+
+               runnable.checkFail();
+
+            }
+            // Without a scheduled failer isExecuted() would never become true
+            while (shouldFail() && !failer.isExecuted());
+         }
+         finally
+         {
+            // Make sure a pending failer cannot fire later and that the static
+            // failure flags do not leak into the next iteration or test
+            failer.disarm();
+
+            InVMConnector.resetFailures();
+         }
+
+         session.close();
+
+         locator.close();
+
+         Assert.assertEquals(0, sf.numSessions());
+
+         Assert.assertEquals(0, sf.numConnections());
+
+         sf.close();
+
+         stop();
+      }
+   }
+
+   // Private -------------------------------------------------------
+
+   /**
+    * Creates the failer and, when {@link #shouldFail()} allows it, schedules it to first run
+    * after a random delay below {@code time} and then every 100 ms until it cancels itself.
+    */
+   private Failer startFailer(final long time, final ClientSession session, final boolean failOnCreateConnection)
+   {
+      Failer failer = new Failer(session, failOnCreateConnection);
+
+      // This is useful for debugging.. just change shouldFail to return false, and Failer will not be executed
+      if (shouldFail())
+      {
+         timer.schedule(failer, (long)(time * Math.random()), 100);
+      }
+
+      return failer;
+   }
+
+   // Inner classes -------------------------------------------------
+
+   /**
+    * A scenario that is run concurrently by several threads. A scenario can additionally
+    * record a failure through {@link #setFailed(String, Throwable)}; it is reported by
+    * {@link #checkFail()} after all threads of a batch have finished.
+    */
+   protected abstract class RunnableT extends Thread
+   {
+      private volatile String failReason;
+
+      private volatile Throwable throwable;
+
+      public void setFailed(final String reason, final Throwable throwable)
+      {
+         failReason = reason;
+         this.throwable = throwable;
+      }
+
+      /**
+       * Fails the test if a failure was recorded, whether it was recorded with a reason, a
+       * throwable, or both.
+       */
+      public void checkFail()
+      {
+         final String reason = failReason;
+         final Throwable cause = throwable;
+
+         if (cause != null)
+         {
+            log.error("Test failed: " + reason, cause);
+         }
+         if (reason != null)
+         {
+            Assert.fail(reason);
+         }
+         if (cause != null)
+         {
+            // A throwable recorded without a reason must still fail the test
+            Assert.fail("Test failed: " + cause);
+         }
+      }
+
+      public abstract void run(final ClientSessionFactory sf, final int threadNum) throws Exception;
+   }
+
+   /**
+    * Timer task that induces the failure once and then cancels itself. {@link #run()},
+    * {@link #isExecuted()} and {@link #disarm()} are synchronized on the task.
+    */
+   private class Failer extends TimerTask
+   {
+      private final ClientSession session;
+
+      private final boolean failOnCreateConnection;
+
+      private boolean executed;
+
+      // Set by disarm(); a disarmed failer does nothing when the timer still fires it
+      private boolean disarmed;
+
+      public Failer(final ClientSession session, final boolean failOnCreateConnection)
+      {
+         this.session = session;
+
+         this.failOnCreateConnection = failOnCreateConnection;
+      }
+
+      @Override
+      public synchronized void run()
+      {
+         if (disarmed)
+         {
+            return;
+         }
+
+         log.info("** Failing connection");
+
+         if (failOnCreateConnection)
+         {
+            InVMConnector.numberOfFailures = 1;
+            InVMConnector.failOnCreateConnection = true;
+         }
+         else
+         {
+            RemotingConnectionImpl conn = (RemotingConnectionImpl)((ClientSessionInternal)session).getConnection();
+
+            conn.fail(new HornetQException(HornetQException.NOT_CONNECTED, "blah"));
+         }
+
+         log.info("** Fail complete");
+
+         cancel();
+
+         executed = true;
+      }
+
+      public synchronized boolean isExecuted()
+      {
+         return executed;
+      }
+
+      /**
+       * Cancels the task and prevents a run that is already queued from doing anything. When
+       * this returns, the failer has either completed a run or will never perform one.
+       */
+      synchronized void disarm()
+      {
+         disarmed = true;
+
+         cancel();
+      }
+   }
+
+}
Copied: trunk/tests/integration-tests/src/main/java/org/hornetq/tests/largemessage/LargeMessageTestBase.java (from rev 10475, trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/largemessage/LargeMessageTestBase.java)
===================================================================
--- trunk/tests/integration-tests/src/main/java/org/hornetq/tests/largemessage/LargeMessageTestBase.java (rev 0)
+++ trunk/tests/integration-tests/src/main/java/org/hornetq/tests/largemessage/LargeMessageTestBase.java 2011-04-12 10:12:15 UTC (rev 10480)
@@ -0,0 +1,702 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.largemessage;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
+import junit.framework.Assert;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.HornetQBuffers;
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.Message;
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.client.*;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.Queue;
+import org.hornetq.tests.util.ServiceTestBase;
+import org.hornetq.tests.util.UnitTestCase;
+import org.hornetq.utils.DataConstants;
+
+/**
+ * A LargeMessageTestBase
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ * Created Oct 29, 2008 11:43:52 AM
+ *
+ *
+ */
+public abstract class LargeMessageTestBase extends ServiceTestBase
+{
+
+ // Constants -----------------------------------------------------
+ private static final Logger log = Logger.getLogger(LargeMessageTestBase.class);
+
+ protected final SimpleString ADDRESS = new SimpleString("SimpleAddress");
+
+ protected HornetQServer server;
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ if (server != null && server.isStarted())
+ {
+ try
+ {
+ server.stop();
+ }
+ catch (Exception e)
+ {
+ LargeMessageTestBase.log.warn(e.getMessage(), e);
+ }
+ }
+
+ server = null;
+
+ super.tearDown();
+ }
+
+ protected void testChunks(final boolean isXA,
+ final boolean restartOnXA,
+ final boolean rollbackFirstSend,
+ final boolean useStreamOnConsume,
+ final boolean realFiles,
+ final boolean preAck,
+ final boolean sendingBlocking,
+ final boolean testBrowser,
+ final boolean useMessageConsumer,
+ final int numberOfMessages,
+ final long numberOfBytes,
+ final int waitOnConsumer,
+ final long delayDelivery) throws Exception
+ {
+ testChunks(isXA,
+ restartOnXA,
+ rollbackFirstSend,
+ useStreamOnConsume,
+ realFiles,
+ preAck,
+ sendingBlocking,
+ testBrowser,
+ useMessageConsumer,
+ numberOfMessages,
+ numberOfBytes,
+ waitOnConsumer,
+ delayDelivery,
+ -1,
+ 10 * 1024);
+ }
+
+ protected void testChunks(final boolean isXA,
+ final boolean restartOnXA,
+ final boolean rollbackFirstSend,
+ final boolean useStreamOnConsume,
+ final boolean realFiles,
+ final boolean preAck,
+ final boolean sendingBlocking,
+ final boolean testBrowser,
+ final boolean useMessageConsumer,
+ final int numberOfMessages,
+ final long numberOfBytes,
+ final int waitOnConsumer,
+ final long delayDelivery,
+ final int producerWindow,
+ final int minSize) throws Exception
+ {
+ clearData();
+
+ server = createServer(realFiles);
+ server.start();
+
+ ServerLocator locator = createInVMNonHALocator();
+ try
+ {
+
+ if (sendingBlocking)
+ {
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setBlockOnAcknowledge(true);
+ }
+
+ if (producerWindow > 0)
+ {
+ locator.setConfirmationWindowSize(producerWindow);
+ }
+
+ locator.setMinLargeMessageSize(minSize);
+
+ ClientSessionFactory sf = locator.createSessionFactory();
+
+ ClientSession session;
+
+ Xid xid = null;
+ session = sf.createSession(null, null, isXA, false, false, preAck, 0);
+
+ if (isXA)
+ {
+ xid = newXID();
+ session.start(xid, XAResource.TMNOFLAGS);
+ }
+
+ session.createQueue(ADDRESS, ADDRESS, null, true);
+
+ ClientProducer producer = session.createProducer(ADDRESS);
+
+ if (rollbackFirstSend)
+ {
+ sendMessages(numberOfMessages, numberOfBytes, delayDelivery, session, producer);
+
+ if (isXA)
+ {
+ session.end(xid, XAResource.TMSUCCESS);
+ session.prepare(xid);
+
+ session.close();
+
+ if (realFiles && restartOnXA)
+ {
+ server.stop();
+ server.start();
+ sf = locator.createSessionFactory();
+ }
+
+ session = sf.createSession(null, null, isXA, false, false, preAck, 0);
+
+ Xid[] xids = session.recover(XAResource.TMSTARTRSCAN);
+ Assert.assertEquals(1, xids.length);
+ Assert.assertEquals(xid, xids[0]);
+
+ session.rollback(xid);
+ producer = session.createProducer(ADDRESS);
+ xid = newXID();
+ session.start(xid, XAResource.TMNOFLAGS);
+ }
+ else
+ {
+ session.rollback();
+ }
+
+ validateNoFilesOnLargeDir();
+ }
+
+ sendMessages(numberOfMessages, numberOfBytes, delayDelivery, session, producer);
+
+ if (isXA)
+ {
+ session.end(xid, XAResource.TMSUCCESS);
+ session.prepare(xid);
+
+ session.close();
+
+ if (realFiles && restartOnXA)
+ {
+ server.stop();
+ server.start();
+ //we need to recreate sf's
+ sf = locator.createSessionFactory();
+ }
+
+ session = sf.createSession(null, null, isXA, false, false, preAck, 0);
+
+ Xid[] xids = session.recover(XAResource.TMSTARTRSCAN);
+ Assert.assertEquals(1, xids.length);
+ Assert.assertEquals(xid, xids[0]);
+
+ producer = session.createProducer(ADDRESS);
+
+ session.commit(xid, false);
+ xid = newXID();
+ session.start(xid, XAResource.TMNOFLAGS);
+ }
+ else
+ {
+ session.commit();
+ }
+
+ session.close();
+
+ if (realFiles)
+ {
+ server.stop();
+
+ server = createServer(realFiles);
+ server.start();
+
+ sf = locator.createSessionFactory();
+ }
+
+ session = sf.createSession(null, null, isXA, false, false, preAck, 0);
+
+ if (isXA)
+ {
+ xid = newXID();
+ session.start(xid, XAResource.TMNOFLAGS);
+ }
+
+ ClientConsumer consumer = null;
+
+ for (int iteration = testBrowser ? 0 : 1; iteration < 2; iteration++)
+ {
+ session.stop();
+
+ // first time with a browser
+ consumer = session.createConsumer(ADDRESS, null, iteration == 0);
+
+ if (useMessageConsumer)
+ {
+ final CountDownLatch latchDone = new CountDownLatch(numberOfMessages);
+ final AtomicInteger errors = new AtomicInteger(0);
+
+ MessageHandler handler = new MessageHandler()
+ {
+ int msgCounter;
+
+ public void onMessage(final ClientMessage message)
+ {
+ try
+ {
+ if (delayDelivery > 0)
+ {
+ long originalTime = (Long)message.getObjectProperty(new SimpleString("original-time"));
+ Assert.assertTrue(System.currentTimeMillis() - originalTime + "<" + delayDelivery,
+ System.currentTimeMillis() - originalTime >= delayDelivery);
+ }
+
+ if (!preAck)
+ {
+ message.acknowledge();
+ }
+
+ Assert.assertNotNull(message);
+
+ if (delayDelivery <= 0)
+ {
+ // right now there is no guarantee of ordered delivered on multiple scheduledMessages with
+ // the same
+ // scheduled delivery time
+ Assert.assertEquals(msgCounter,
+ ((Integer)message.getObjectProperty(new SimpleString("counter-message"))).intValue());
+ }
+
+ if (useStreamOnConsume)
+ {
+ final AtomicLong bytesRead = new AtomicLong(0);
+ message.saveToOutputStream(new OutputStream()
+ {
+
+ @Override
+ public void write(final byte b[]) throws IOException
+ {
+ if (b[0] == UnitTestCase.getSamplebyte(bytesRead.get()))
+ {
+ bytesRead.addAndGet(b.length);
+ LargeMessageTestBase.log.debug("Read position " + bytesRead.get() + " on consumer");
+ }
+ else
+ {
+ LargeMessageTestBase.log.warn("Received invalid packet at position " + bytesRead.get());
+ }
+ }
+
+ @Override
+ public void write(final int b) throws IOException
+ {
+ if (b == UnitTestCase.getSamplebyte(bytesRead.get()))
+ {
+ bytesRead.incrementAndGet();
+ }
+ else
+ {
+ LargeMessageTestBase.log.warn("byte not as expected!");
+ }
+ }
+ });
+
+ Assert.assertEquals(numberOfBytes, bytesRead.get());
+ }
+ else
+ {
+
+ HornetQBuffer buffer = message.getBodyBuffer();
+ buffer.resetReaderIndex();
+ for (long b = 0; b < numberOfBytes; b++)
+ {
+ if (b % (1024l * 1024l) == 0)
+ {
+ LargeMessageTestBase.log.debug("Read " + b + " bytes");
+ }
+
+ Assert.assertEquals(UnitTestCase.getSamplebyte(b), buffer.readByte());
+ }
+
+ try
+ {
+ buffer.readByte();
+ Assert.fail("Supposed to throw an exception");
+ }
+ catch (Exception e)
+ {
+ }
+ }
+ }
+ catch (Throwable e)
+ {
+ e.printStackTrace();
+ LargeMessageTestBase.log.warn("Got an error", e);
+ errors.incrementAndGet();
+ }
+ finally
+ {
+ latchDone.countDown();
+ msgCounter++;
+ }
+ }
+ };
+
+ session.start();
+
+ consumer.setMessageHandler(handler);
+
+ Assert.assertTrue(latchDone.await(waitOnConsumer, TimeUnit.SECONDS));
+ Assert.assertEquals(0, errors.get());
+ }
+ else
+ {
+
+ session.start();
+
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ System.currentTimeMillis();
+
+ ClientMessage message = consumer.receive(waitOnConsumer + delayDelivery);
+
+ Assert.assertNotNull(message);
+
+ System.currentTimeMillis();
+
+ if (delayDelivery > 0)
+ {
+ long originalTime = (Long)message.getObjectProperty(new SimpleString("original-time"));
+ Assert.assertTrue(System.currentTimeMillis() - originalTime + "<" + delayDelivery,
+ System.currentTimeMillis() - originalTime >= delayDelivery);
+ }
+
+ if (!preAck)
+ {
+ message.acknowledge();
+ }
+
+ Assert.assertNotNull(message);
+
+ if (delayDelivery <= 0)
+ {
+ // right now there is no guarantee of ordered delivered on multiple scheduledMessages with the same
+ // scheduled delivery time
+ Assert.assertEquals(i,
+ ((Integer)message.getObjectProperty(new SimpleString("counter-message"))).intValue());
+ }
+
+ if (useStreamOnConsume)
+ {
+ final AtomicLong bytesRead = new AtomicLong(0);
+ message.saveToOutputStream(new OutputStream()
+ {
+
+ @Override
+ public void write(final byte b[]) throws IOException
+ {
+ if (b[0] == UnitTestCase.getSamplebyte(bytesRead.get()))
+ {
+ bytesRead.addAndGet(b.length);
+ }
+ else
+ {
+ LargeMessageTestBase.log.warn("Received invalid packet at position " + bytesRead.get());
+ }
+
+ }
+
+ @Override
+ public void write(final int b) throws IOException
+ {
+ if (bytesRead.get() % (1024l * 1024l) == 0)
+ {
+ LargeMessageTestBase.log.debug("Read " + bytesRead.get() + " bytes");
+ }
+ if (b == (byte)'a')
+ {
+ bytesRead.incrementAndGet();
+ }
+ else
+ {
+ LargeMessageTestBase.log.warn("byte not as expected!");
+ }
+ }
+ });
+
+ Assert.assertEquals(numberOfBytes, bytesRead.get());
+ }
+ else
+ {
+ HornetQBuffer buffer = message.getBodyBuffer();
+ buffer.resetReaderIndex();
+
+ for (long b = 0; b < numberOfBytes; b++)
+ {
+ if (b % (1024l * 1024l) == 0l)
+ {
+ LargeMessageTestBase.log.debug("Read " + b + " bytes");
+ }
+ Assert.assertEquals(UnitTestCase.getSamplebyte(b), buffer.readByte());
+ }
+ }
+
+ }
+
+ }
+ consumer.close();
+
+ if (iteration == 0)
+ {
+ if (isXA)
+ {
+ session.end(xid, XAResource.TMSUCCESS);
+ session.rollback(xid);
+ xid = newXID();
+ session.start(xid, XAResource.TMNOFLAGS);
+ }
+ else
+ {
+ session.rollback();
+ }
+ }
+ else
+ {
+ if (isXA)
+ {
+ session.end(xid, XAResource.TMSUCCESS);
+ session.commit(xid, true);
+ xid = newXID();
+ session.start(xid, XAResource.TMNOFLAGS);
+ }
+ else
+ {
+ session.commit();
+ }
+ }
+ }
+
+ session.close();
+
+ Assert.assertEquals(0, ((Queue)server.getPostOffice().getBinding(ADDRESS).getBindable()).getDeliveringCount());
+ Assert.assertEquals(0, ((Queue)server.getPostOffice().getBinding(ADDRESS).getBindable()).getMessageCount());
+
+ validateNoFilesOnLargeDir();
+
+ }
+ finally
+ {
+ locator.close();
+ try
+ {
+ server.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+ }
+
+ /**
+  * Sends {@code numberOfMessages} messages through {@code producer}, each created with
+  * {@code session.createMessage(true)} and carrying {@code numberOfBytes} sample bytes.
+  * <p>
+  * The body is attached as a fake input stream when it is larger than 1 MiB or when the message
+  * index is even; otherwise it is materialised as a byte array and written to the body buffer.
+  * Every message carries its index in the {@code counter-message} property. When
+  * {@code delayDelivery} is positive the message is also stamped with {@code original-time}
+  * (the current time) and scheduled for delivery {@code delayDelivery} later.
+  *
+  * @param numberOfMessages how many messages to send
+  * @param numberOfBytes size of each message body
+  * @param delayDelivery offset added to the current time to form the scheduled delivery time;
+  *                      no scheduling properties are set when this is not positive
+  * @param session session used to create the messages
+  * @param producer producer the messages are sent through
+  * @throws Exception if creating or sending a message fails
+  */
+ private void sendMessages(final int numberOfMessages,
+                           final long numberOfBytes,
+                           final long delayDelivery,
+                           final ClientSession session,
+                           final ClientProducer producer) throws Exception
+ {
+    LargeMessageTestBase.log.debug("NumberOfBytes = " + numberOfBytes);
+
+    for (int msgIndex = 0; msgIndex < numberOfMessages; msgIndex++)
+    {
+       final ClientMessage outgoing = session.createMessage(true);
+
+       // Bodies above 1M are always streamed, since an in-memory array would require too much
+       // memory from the test; smaller bodies alternate between stream and array.
+       final boolean streamBody = numberOfBytes > 1024 * 1024 || msgIndex % 2 == 0;
+
+       if (!streamBody)
+       {
+          LargeMessageTestBase.log.debug("Sending message (array)" + msgIndex);
+          final byte[] payload = new byte[(int)numberOfBytes];
+          int pos = 0;
+          while (pos < payload.length)
+          {
+             payload[pos] = UnitTestCase.getSamplebyte(pos);
+             pos++;
+          }
+          outgoing.getBodyBuffer().writeBytes(payload);
+       }
+       else
+       {
+          LargeMessageTestBase.log.debug("Sending message (stream)" + msgIndex);
+          outgoing.setBodyInputStream(UnitTestCase.createFakeLargeStream(numberOfBytes));
+       }
+
+       outgoing.putIntProperty(new SimpleString("counter-message"), msgIndex);
+
+       if (delayDelivery > 0)
+       {
+          // Consumers compare against "original-time" to verify the delay was honoured.
+          final long now = System.currentTimeMillis();
+          outgoing.putLongProperty(new SimpleString("original-time"), now);
+          outgoing.putLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME, now + delayDelivery);
+       }
+
+       producer.send(outgoing);
+    }
+ }
+
+ /**
+  * Builds a fixed buffer sized for {@code numberOfIntegers} ints and fills it with the
+  * values {@code 0 .. numberOfIntegers - 1} in ascending order.
+  *
+  * @param numberOfIntegers how many consecutive integers to write
+  * @return the populated buffer
+  */
+ protected HornetQBuffer createLargeBuffer(final int numberOfIntegers)
+ {
+    final int capacity = DataConstants.SIZE_INT * numberOfIntegers;
+    final HornetQBuffer buffer = HornetQBuffers.fixedBuffer(capacity);
+
+    int value = 0;
+    while (value < numberOfIntegers)
+    {
+       buffer.writeInt(value);
+       value++;
+    }
+
+    return buffer;
+ }
+
+ /**
+  * Convenience overload: delegates to the {@code (session, long, boolean)} variant, passing
+  * {@code true} as the final flag.
+  *
+  * @param session session used to create the message
+  * @param numberOfBytes size of the streamed body
+  * @return the message created by the delegate
+  * @throws Exception if the delegate fails
+  */
+ protected ClientMessage createLargeClientMessage(final ClientSession session, final int numberOfBytes) throws Exception
+ {
+    final boolean persistent = true;
+    return createLargeClientMessage(session, numberOfBytes, persistent);
+ }
+
+ /**
+  * Creates a message on {@code session} and writes {@code buffer} into its body buffer.
+  *
+  * @param session session used to create the message
+  * @param buffer bytes written to the message body
+  * @param durable flag passed straight to {@code session.createMessage}
+  * @return the message with its body written
+  * @throws Exception if creating the message or writing the body fails
+  */
+ protected ClientMessage createLargeClientMessage (final ClientSession session, final byte[] buffer, final boolean durable) throws Exception
+ {
+    final ClientMessage message = session.createMessage(durable);
+    final HornetQBuffer body = message.getBodyBuffer();
+    body.writeBytes(buffer);
+    return message;
+ }
+
+ /**
+  * Creates a message on {@code session} whose body is supplied by a fake large input stream
+  * of {@code numberOfBytes} bytes, obtained from {@code UnitTestCase.createFakeLargeStream}.
+  *
+  * @param session session used to create the message
+  * @param numberOfBytes length requested from the fake stream
+  * @param persistent flag passed straight to {@code session.createMessage}
+  * @return the message with the stream attached as its body
+  * @throws Exception if creating the message or attaching the stream fails
+  */
+ protected ClientMessage createLargeClientMessage(final ClientSession session,
+                                                  final long numberOfBytes,
+                                                  final boolean persistent) throws Exception
+ {
+    final ClientMessage streamedMessage = session.createMessage(persistent);
+    streamedMessage.setBodyInputStream(UnitTestCase.createFakeLargeStream(numberOfBytes));
+    return streamedMessage;
+ }
+
+ /**
+  * Starts {@code session}, receives a single message from {@code queueToRead} via
+  * {@code receive(5000)}, asserts that one arrived, acknowledges it, commits the session and
+  * finally closes the consumer. The message body is not inspected.
+  *
+  * @param session session to start, consume on and commit
+  * @param queueToRead queue the consumer is created on
+  * @param numberOfBytes not used by this method
+  * @throws HornetQException declared by the signature; raised by the HornetQ calls
+  * @throws FileNotFoundException declared by the signature
+  * @throws IOException declared by the signature
+  */
+ protected void readMessage(final ClientSession session, final SimpleString queueToRead, final int numberOfBytes) throws HornetQException,
+                                                                                                                     FileNotFoundException,
+                                                                                                                     IOException
+ {
+    session.start();
+
+    final ClientConsumer reader = session.createConsumer(queueToRead);
+    final ClientMessage received = reader.receive(5000);
+
+    // receive() yielding null means nothing arrived within the timeout.
+    Assert.assertNotNull(received);
+
+    received.acknowledge();
+    session.commit();
+
+    reader.close();
+ }
+
+ /**
+  * Creates an {@link OutputStream} that discards every byte written to it, logging progress
+  * at debug level once per MiB, and that rejects writes after it has been closed.
+  *
+  * @return a byte-counting sink stream
+  * @throws Exception declared for subclasses; this implementation does not throw
+  */
+ protected OutputStream createFakeOutputStream() throws Exception
+ {
+    return new OutputStream()
+    {
+       private boolean closed = false;
+
+       // long rather than int: an int counter would wrap after 2 GiB of writes.
+       private long count;
+
+       @Override
+       public void close() throws IOException
+       {
+          super.close();
+          closed = true;
+       }
+
+       @Override
+       public void write(final int b) throws IOException
+       {
+          // The parentheses matter: '%' and '*' have equal precedence and associate
+          // left-to-right, so "count++ % 1024 * 1024" is (count++ % 1024) * 1024 and
+          // would log every 1 KiB instead of every 1 MiB.
+          if (count++ % (1024 * 1024) == 0)
+          {
+             LargeMessageTestBase.log.debug("OutputStream received " + count + " bytes");
+          }
+          if (closed)
+          {
+             throw new IOException("Stream was closed");
+          }
+       }
+    };
+ }
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+}
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/LargeMessageTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/LargeMessageTest.java 2011-04-12 09:47:15 UTC (rev 10479)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/LargeMessageTest.java 2011-04-12 10:12:15 UTC (rev 10480)
@@ -31,7 +31,7 @@
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.Queue;
import org.hornetq.core.settings.impl.AddressSettings;
-import org.hornetq.tests.integration.largemessage.LargeMessageTestBase;
+import org.hornetq.tests.largemessage.LargeMessageTestBase;
import org.hornetq.tests.util.RandomUtil;
import org.hornetq.tests.util.UnitTestCase;
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/reattach/MultiThreadRandomReattachTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/reattach/MultiThreadRandomReattachTest.java 2011-04-12 09:47:15 UTC (rev 10479)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/reattach/MultiThreadRandomReattachTest.java 2011-04-12 10:12:15 UTC (rev 10480)
@@ -16,9 +16,9 @@
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClientMessage;
import org.hornetq.core.config.Configuration;
-import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.server.HornetQServers;
+import org.hornetq.tests.cluster.reattach.MultiThreadRandomReattachTestBase;
/**
*
Deleted: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/reattach/MultiThreadRandomReattachTestBase.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/reattach/MultiThreadRandomReattachTestBase.java 2011-04-12 09:47:15 UTC (rev 10479)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/reattach/MultiThreadRandomReattachTestBase.java 2011-04-12 10:12:15 UTC (rev 10480)
@@ -1,1428 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.tests.integration.cluster.reattach;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import junit.framework.Assert;
-
-import org.hornetq.api.core.HornetQException;
-import org.hornetq.api.core.SimpleString;
-import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.client.ClientConsumer;
-import org.hornetq.api.core.client.ClientMessage;
-import org.hornetq.api.core.client.ClientProducer;
-import org.hornetq.api.core.client.ClientSession;
-import org.hornetq.api.core.client.ClientSessionFactory;
-import org.hornetq.api.core.client.HornetQClient;
-import org.hornetq.api.core.client.MessageHandler;
-import org.hornetq.api.core.client.ServerLocator;
-import org.hornetq.core.logging.Logger;
-import org.hornetq.core.remoting.impl.invm.InVMRegistry;
-import org.hornetq.core.server.HornetQServer;
-import org.hornetq.jms.client.HornetQBytesMessage;
-import org.hornetq.jms.client.HornetQTextMessage;
-
-/**
- * A MultiThreadRandomReattachTestBase
- *
- * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
- * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
- *
- *
- */
-public abstract class MultiThreadRandomReattachTestBase extends MultiThreadReattachSupport
-{
- private final Logger log = Logger.getLogger(getClass());
-
- // Constants -----------------------------------------------------
-
- private static final int RECEIVE_TIMEOUT = 30000;
-
- private final int LATCH_WAIT = getLatchWait();
-
- private final int NUM_THREADS = getNumThreads();
-
- // Attributes ----------------------------------------------------
- protected static final SimpleString ADDRESS = new SimpleString("FailoverTestAddress");
-
- protected HornetQServer liveServer;
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- // Public --------------------------------------------------------
-
- public void testA() throws Exception
- {
- runTestMultipleThreads(new RunnableT()
- {
- @Override
- public void run(final ClientSessionFactory sf, final int threadNum) throws Exception
- {
- doTestA(sf, threadNum);
- }
- }, NUM_THREADS, false);
-
- }
-
- public void testB() throws Exception
- {
- runTestMultipleThreads(new RunnableT()
- {
- @Override
- public void run(final ClientSessionFactory sf, final int threadNum) throws Exception
- {
- doTestB(sf, threadNum);
- }
- }, NUM_THREADS, false);
- }
-
- public void testC() throws Exception
- {
- runTestMultipleThreads(new RunnableT()
- {
- @Override
- public void run(final ClientSessionFactory sf, final int threadNum) throws Exception
- {
- doTestC(sf, threadNum);
- }
- }, NUM_THREADS, false);
- }
-
- public void testD() throws Exception
- {
- runTestMultipleThreads(new RunnableT()
- {
- @Override
- public void run(final ClientSessionFactory sf, final int threadNum) throws Exception
- {
- doTestD(sf, threadNum);
- }
- }, NUM_THREADS, false);
- }
-
- public void testE() throws Exception
- {
- runTestMultipleThreads(new RunnableT()
- {
- @Override
- public void run(final ClientSessionFactory sf, final int threadNum) throws Exception
- {
- doTestE(sf, threadNum);
- }
- }, NUM_THREADS, false);
- }
-
- public void testF() throws Exception
- {
- runTestMultipleThreads(new RunnableT()
- {
- @Override
- public void run(final ClientSessionFactory sf, final int threadNum) throws Exception
- {
- doTestF(sf, threadNum);
- }
- }, NUM_THREADS, false);
- }
-
- public void testG() throws Exception
- {
- runTestMultipleThreads(new RunnableT()
- {
- @Override
- public void run(final ClientSessionFactory sf, final int threadNum) throws Exception
- {
- doTestG(sf, threadNum);
- }
- }, NUM_THREADS, false);
- }
-
- public void testH() throws Exception
- {
- runTestMultipleThreads(new RunnableT()
- {
- @Override
- public void run(final ClientSessionFactory sf, final int threadNum) throws Exception
- {
- doTestH(sf, threadNum);
- }
- }, NUM_THREADS, false);
- }
-
- public void testI() throws Exception
- {
- runTestMultipleThreads(new RunnableT()
- {
- @Override
- public void run(final ClientSessionFactory sf, final int threadNum) throws Exception
- {
- doTestI(sf, threadNum);
- }
- }, NUM_THREADS, false);
- }
-
- public void testJ() throws Exception
- {
- runTestMultipleThreads(new RunnableT()
- {
- @Override
- public void run(final ClientSessionFactory sf, final int threadNum) throws Exception
- {
- doTestJ(sf, threadNum);
- }
- }, NUM_THREADS, false);
- }
-
- public void testK() throws Exception
- {
- runTestMultipleThreads(new RunnableT()
- {
- @Override
- public void run(final ClientSessionFactory sf, final int threadNum) throws Exception
- {
- doTestK(sf, threadNum);
- }
- }, NUM_THREADS, false);
- }
-
- public void testL() throws Exception
- {
- runTestMultipleThreads(new RunnableT()
- {
- @Override
- public void run(final ClientSessionFactory sf, final int threadNum) throws Exception
- {
- doTestL(sf);
- }
- }, NUM_THREADS, true, 10);
- }
-
- // public void testM() throws Exception
- // {
- // runTestMultipleThreads(new RunnableT()
- // {
- // public void run(final ClientSessionFactory sf, final int threadNum) throws Exception
- // {
- // doTestM(sf, threadNum);
- // }
- // }, NUM_THREADS);
- // }
-
- public void testN() throws Exception
- {
- runTestMultipleThreads(new RunnableT()
- {
- @Override
- public void run(final ClientSessionFactory sf, final int threadNum) throws Exception
- {
- doTestN(sf, threadNum);
- }
- }, NUM_THREADS, false);
- }
-
- // Added do replicate HORNETQ-264
- public void testO() throws Exception
- {
- runTestMultipleThreads(new RunnableT()
- {
- @Override
- public void run(final ClientSessionFactory sf, final int threadNum) throws Exception
- {
- doTestO(sf, threadNum);
- }
- }, NUM_THREADS, false);
- }
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- @Override
- protected abstract void start() throws Exception;
-
- protected abstract void setBody(ClientMessage message) throws Exception;
-
- protected abstract boolean checkSize(ClientMessage message);
-
- protected ClientSession createAutoCommitSession(final ClientSessionFactory sf) throws Exception
- {
- return sf.createSession(false, true, true);
- }
-
- protected ClientSession createTransactionalSession(final ClientSessionFactory sf) throws Exception
- {
- return sf.createSession(false, false, false);
- }
-
- protected void doTestA(final ClientSessionFactory sf, final int threadNum, final ClientSession session2) throws Exception
- {
- SimpleString subName = new SimpleString("sub" + threadNum);
-
- ClientSession session = sf.createSession(false, true, true);
-
- session.createQueue(MultiThreadRandomReattachTestBase.ADDRESS, subName, null, false);
-
- ClientProducer producer = session.createProducer(MultiThreadRandomReattachTestBase.ADDRESS);
-
- ClientConsumer consumer = session.createConsumer(subName);
-
- final int numMessages = 100;
-
- sendMessages(session, producer, numMessages, threadNum);
-
- session.start();
-
- MyHandler handler = new MyHandler(threadNum, numMessages);
-
- consumer.setMessageHandler(handler);
-
- boolean ok = handler.latch.await(LATCH_WAIT, TimeUnit.MILLISECONDS);
-
- if (!ok)
- {
- throw new Exception("Timed out waiting for messages on handler " + System.identityHashCode(handler) +
- " threadnum " +
- threadNum);
- }
-
- if (handler.failure != null)
- {
- throw new Exception("Handler failed: " + handler.failure);
- }
-
- producer.close();
-
- consumer.close();
-
- session.deleteQueue(subName);
-
- session.close();
- }
-
- protected void doTestA(final ClientSessionFactory sf, final int threadNum) throws Exception
- {
- long start = System.currentTimeMillis();
-
- ClientSession s = sf.createSession(false, false, false);
-
- final int numMessages = 100;
-
- final int numSessions = 10;
-
- Set<ClientConsumer> consumers = new HashSet<ClientConsumer>();
- Set<ClientSession> sessions = new HashSet<ClientSession>();
-
- for (int i = 0; i < numSessions; i++)
- {
- SimpleString subName = new SimpleString(threadNum + "sub" + i);
-
- ClientSession sessConsume = createAutoCommitSession(sf);
-
- sessConsume.start();
-
- sessConsume.createQueue(MultiThreadRandomReattachTestBase.ADDRESS, subName, null, false);
-
- ClientConsumer consumer = sessConsume.createConsumer(subName);
-
- consumers.add(consumer);
-
- sessions.add(sessConsume);
-
- }
-
- ClientSession sessSend = sf.createSession(false, true, true);
-
- ClientProducer producer = sessSend.createProducer(MultiThreadRandomReattachTestBase.ADDRESS);
-
- sendMessages(sessSend, producer, numMessages, threadNum);
-
- Set<MyHandler> handlers = new HashSet<MyHandler>();
-
- for (ClientConsumer consumer : consumers)
- {
- MyHandler handler = new MyHandler(threadNum, numMessages);
-
- consumer.setMessageHandler(handler);
-
- handlers.add(handler);
- }
-
- for (MyHandler handler : handlers)
- {
- boolean ok = handler.latch.await(LATCH_WAIT, TimeUnit.MILLISECONDS);
-
- if (!ok)
- {
- throw new Exception("Timed out waiting for messages on handler " + System.identityHashCode(handler) +
- " threadnum " +
- threadNum);
- }
-
- if (handler.failure != null)
- {
- throw new Exception("Handler failed: " + handler.failure);
- }
- }
-
- sessSend.close();
-
- for (ClientSession session : sessions)
- {
- session.close();
- }
-
- for (int i = 0; i < numSessions; i++)
- {
- SimpleString subName = new SimpleString(threadNum + "sub" + i);
-
- s.deleteQueue(subName);
- }
-
- s.close();
-
- long end = System.currentTimeMillis();
-
- log.info("duration " + (end - start));
- }
-
- protected void doTestB(final ClientSessionFactory sf, final int threadNum) throws Exception
- {
- long start = System.currentTimeMillis();
-
- ClientSession s = sf.createSession(false, false, false);
-
- final int numMessages = 100;
-
- final int numSessions = 10;
-
- Set<ClientConsumer> consumers = new HashSet<ClientConsumer>();
- Set<ClientSession> sessions = new HashSet<ClientSession>();
-
- for (int i = 0; i < numSessions; i++)
- {
- SimpleString subName = new SimpleString(threadNum + "sub" + i);
-
- ClientSession sessConsume = createAutoCommitSession(sf);
-
- sessConsume.createQueue(MultiThreadRandomReattachTestBase.ADDRESS, subName, null, false);
-
- ClientConsumer consumer = sessConsume.createConsumer(subName);
-
- consumers.add(consumer);
-
- sessions.add(sessConsume);
- }
-
- ClientSession sessSend = sf.createSession(false, true, true);
-
- ClientProducer producer = sessSend.createProducer(MultiThreadRandomReattachTestBase.ADDRESS);
-
- sendMessages(sessSend, producer, numMessages, threadNum);
-
- for (ClientSession session : sessions)
- {
- session.start();
- }
-
- Set<MyHandler> handlers = new HashSet<MyHandler>();
-
- for (ClientConsumer consumer : consumers)
- {
- MyHandler handler = new MyHandler(threadNum, numMessages);
-
- consumer.setMessageHandler(handler);
-
- handlers.add(handler);
- }
-
- for (MyHandler handler : handlers)
- {
- boolean ok = handler.latch.await(LATCH_WAIT, TimeUnit.MILLISECONDS);
-
- if (!ok)
- {
- throw new Exception("Timed out waiting for messages on handler " + System.identityHashCode(handler) +
- " threadnum " +
- threadNum);
- }
-
- if (handler.failure != null)
- {
- throw new Exception("Handler failed: " + handler.failure);
- }
- }
-
- sessSend.close();
-
- for (ClientSession session : sessions)
- {
- session.close();
- }
-
- for (int i = 0; i < numSessions; i++)
- {
- SimpleString subName = new SimpleString(threadNum + "sub" + i);
-
- s.deleteQueue(subName);
- }
-
- s.close();
-
- long end = System.currentTimeMillis();
-
- log.info("duration " + (end - start));
-
- }
-
- protected void doTestC(final ClientSessionFactory sf, final int threadNum) throws Exception
- {
- long start = System.currentTimeMillis();
-
- ClientSession s = sf.createSession(false, false, false);
-
- final int numMessages = 100;
-
- final int numSessions = 10;
-
- Set<ClientConsumer> consumers = new HashSet<ClientConsumer>();
- Set<ClientSession> sessions = new HashSet<ClientSession>();
-
- for (int i = 0; i < numSessions; i++)
- {
- SimpleString subName = new SimpleString(threadNum + "sub" + i);
-
- ClientSession sessConsume = createTransactionalSession(sf);
-
- sessConsume.start();
-
- sessConsume.createQueue(MultiThreadRandomReattachTestBase.ADDRESS, subName, null, false);
-
- ClientConsumer consumer = sessConsume.createConsumer(subName);
-
- consumers.add(consumer);
-
- sessions.add(sessConsume);
- }
-
- ClientSession sessSend = sf.createSession(false, false, false);
-
- ClientProducer producer = sessSend.createProducer(MultiThreadRandomReattachTestBase.ADDRESS);
-
- sendMessages(sessSend, producer, numMessages, threadNum);
-
- sessSend.rollback();
-
- sendMessages(sessSend, producer, numMessages, threadNum);
-
- sessSend.commit();
-
- Set<MyHandler> handlers = new HashSet<MyHandler>();
-
- for (ClientConsumer consumer : consumers)
- {
- MyHandler handler = new MyHandler(threadNum, numMessages);
-
- consumer.setMessageHandler(handler);
-
- handlers.add(handler);
- }
-
- for (MyHandler handler : handlers)
- {
- boolean ok = handler.latch.await(LATCH_WAIT, TimeUnit.MILLISECONDS);
-
- if (!ok)
- {
- throw new Exception("Timed out waiting for messages on handler " + System.identityHashCode(handler) +
- " threadnum " +
- threadNum);
- }
-
- if (handler.failure != null)
- {
- throw new Exception("Handler failed: " + handler.failure);
- }
-
- handler.reset();
- }
-
- for (ClientSession session : sessions)
- {
- session.rollback();
- }
-
- for (MyHandler handler : handlers)
- {
- boolean ok = handler.latch.await(LATCH_WAIT, TimeUnit.MILLISECONDS);
-
- Assert.assertTrue(ok);
- }
-
- for (ClientSession session : sessions)
- {
- session.commit();
- }
-
- sessSend.close();
- for (ClientSession session : sessions)
- {
- session.close();
- }
-
- for (int i = 0; i < numSessions; i++)
- {
- SimpleString subName = new SimpleString(threadNum + "sub" + i);
-
- s.deleteQueue(subName);
- }
-
- s.close();
-
- long end = System.currentTimeMillis();
-
- log.info("duration " + (end - start));
- }
-
- protected void doTestD(final ClientSessionFactory sf, final int threadNum) throws Exception
- {
- long start = System.currentTimeMillis();
-
- ClientSession s = sf.createSession(false, false, false);
-
- final int numMessages = 100;
-
- final int numSessions = 10;
-
- Set<ClientConsumer> consumers = new HashSet<ClientConsumer>();
- Set<ClientSession> sessions = new HashSet<ClientSession>();
-
- for (int i = 0; i < numSessions; i++)
- {
- SimpleString subName = new SimpleString(threadNum + " sub" + i);
-
- ClientSession sessConsume = sf.createSession(false, false, false);
-
- sessConsume.createQueue(MultiThreadRandomReattachTestBase.ADDRESS, subName, null, false);
-
- ClientConsumer consumer = sessConsume.createConsumer(subName);
-
- consumers.add(consumer);
-
- sessions.add(sessConsume);
- }
-
- ClientSession sessSend = sf.createSession(false, false, false);
-
- ClientProducer producer = sessSend.createProducer(MultiThreadRandomReattachTestBase.ADDRESS);
-
- sendMessages(sessSend, producer, numMessages, threadNum);
-
- sessSend.rollback();
-
- sendMessages(sessSend, producer, numMessages, threadNum);
-
- sessSend.commit();
-
- for (ClientSession session : sessions)
- {
- session.start();
- }
-
- Set<MyHandler> handlers = new HashSet<MyHandler>();
-
- for (ClientConsumer consumer : consumers)
- {
- MyHandler handler = new MyHandler(threadNum, numMessages);
-
- consumer.setMessageHandler(handler);
-
- handlers.add(handler);
- }
-
- for (MyHandler handler : handlers)
- {
- boolean ok = handler.latch.await(LATCH_WAIT, TimeUnit.MILLISECONDS);
-
- if (!ok)
- {
- throw new Exception("Timed out waiting for messages on handler " + System.identityHashCode(handler) +
- " threadnum " +
- threadNum);
- }
-
- if (handler.failure != null)
- {
- throw new Exception("Handler failed: " + handler.failure);
- }
- }
-
- handlers.clear();
-
- // Set handlers to null
- for (ClientConsumer consumer : consumers)
- {
- consumer.setMessageHandler(null);
- }
-
- for (ClientSession session : sessions)
- {
- session.rollback();
- }
-
- // New handlers
- for (ClientConsumer consumer : consumers)
- {
- MyHandler handler = new MyHandler(threadNum, numMessages);
-
- consumer.setMessageHandler(handler);
-
- handlers.add(handler);
- }
-
- for (MyHandler handler : handlers)
- {
- boolean ok = handler.latch.await(LATCH_WAIT, TimeUnit.MILLISECONDS);
-
- if (!ok)
- {
- throw new Exception("Timed out waiting for messages on handler " + System.identityHashCode(handler) +
- " threadnum " +
- threadNum);
- }
-
- if (handler.failure != null)
- {
- throw new Exception("Handler failed on rollback: " + handler.failure);
- }
- }
-
- for (ClientSession session : sessions)
- {
- session.commit();
- }
-
- sessSend.close();
- for (ClientSession session : sessions)
- {
- session.close();
- }
-
- for (int i = 0; i < numSessions; i++)
- {
- SimpleString subName = new SimpleString(threadNum + " sub" + i);
-
- s.deleteQueue(subName);
- }
-
- s.close();
-
- long end = System.currentTimeMillis();
-
- log.info("duration " + (end - start));
- }
-
- // Now with synchronous receive()
-
- protected void doTestE(final ClientSessionFactory sf, final int threadNum) throws Exception
- {
- long start = System.currentTimeMillis();
-
- ClientSession s = sf.createSession(false, false, false);
-
- final int numMessages = 100;
-
- final int numSessions = 10;
-
- Set<ClientConsumer> consumers = new HashSet<ClientConsumer>();
- Set<ClientSession> sessions = new HashSet<ClientSession>();
-
- for (int i = 0; i < numSessions; i++)
- {
- SimpleString subName = new SimpleString(threadNum + "sub" + i);
-
- ClientSession sessConsume = sf.createSession(false, true, true);
-
- sessConsume.start();
-
- sessConsume.createQueue(MultiThreadRandomReattachTestBase.ADDRESS, subName, null, false);
-
- ClientConsumer consumer = sessConsume.createConsumer(subName);
-
- consumers.add(consumer);
-
- sessions.add(sessConsume);
- }
-
- ClientSession sessSend = sf.createSession(false, true, true);
-
- ClientProducer producer = sessSend.createProducer(MultiThreadRandomReattachTestBase.ADDRESS);
-
- sendMessages(sessSend, producer, numMessages, threadNum);
-
- consumeMessages(consumers, numMessages, threadNum);
-
- sessSend.close();
- for (ClientSession session : sessions)
- {
- session.close();
- }
-
- for (int i = 0; i < numSessions; i++)
- {
- SimpleString subName = new SimpleString(threadNum + "sub" + i);
-
- s.deleteQueue(subName);
- }
-
- s.close();
-
- long end = System.currentTimeMillis();
-
- log.info("duration " + (end - start));
- }
-
- protected void doTestF(final ClientSessionFactory sf, final int threadNum) throws Exception
- {
- long start = System.currentTimeMillis();
-
- ClientSession s = sf.createSession(false, false, false);
-
- final int numMessages = 100;
-
- final int numSessions = 10;
-
- Set<ClientConsumer> consumers = new HashSet<ClientConsumer>();
- Set<ClientSession> sessions = new HashSet<ClientSession>();
-
- for (int i = 0; i < numSessions; i++)
- {
- SimpleString subName = new SimpleString(threadNum + "sub" + i);
-
- ClientSession sessConsume = sf.createSession(false, true, true);
-
- sessConsume.createQueue(MultiThreadRandomReattachTestBase.ADDRESS, subName, null, false);
-
- ClientConsumer consumer = sessConsume.createConsumer(subName);
-
- consumers.add(consumer);
-
- sessions.add(sessConsume);
- }
-
- ClientSession sessSend = sf.createSession(false, true, true);
-
- ClientProducer producer = sessSend.createProducer(MultiThreadRandomReattachTestBase.ADDRESS);
-
- sendMessages(sessSend, producer, numMessages, threadNum);
-
- for (ClientSession session : sessions)
- {
- session.start();
- }
-
- consumeMessages(consumers, numMessages, threadNum);
-
- sessSend.close();
- for (ClientSession session : sessions)
- {
- session.close();
- }
-
- for (int i = 0; i < numSessions; i++)
- {
- SimpleString subName = new SimpleString(threadNum + "sub" + i);
-
- s.deleteQueue(subName);
- }
-
- s.close();
-
- long end = System.currentTimeMillis();
-
- log.info("duration " + (end - start));
- }
-
- protected void doTestG(final ClientSessionFactory sf, final int threadNum) throws Exception
- {
- long start = System.currentTimeMillis();
-
- ClientSession s = sf.createSession(false, false, false);
-
- final int numMessages = 100;
-
- final int numSessions = 10;
-
- Set<ClientConsumer> consumers = new HashSet<ClientConsumer>();
- Set<ClientSession> sessions = new HashSet<ClientSession>();
-
- for (int i = 0; i < numSessions; i++)
- {
- SimpleString subName = new SimpleString(threadNum + "sub" + i);
-
- ClientSession sessConsume = sf.createSession(false, false, false);
-
- sessConsume.start();
-
- sessConsume.createQueue(MultiThreadRandomReattachTestBase.ADDRESS, subName, null, false);
-
- ClientConsumer consumer = sessConsume.createConsumer(subName);
-
- consumers.add(consumer);
-
- sessions.add(sessConsume);
- }
-
- ClientSession sessSend = sf.createSession(false, false, false);
-
- ClientProducer producer = sessSend.createProducer(MultiThreadRandomReattachTestBase.ADDRESS);
-
- sendMessages(sessSend, producer, numMessages, threadNum);
-
- sessSend.rollback();
-
- sendMessages(sessSend, producer, numMessages, threadNum);
-
- sessSend.commit();
-
- consumeMessages(consumers, numMessages, threadNum);
-
- for (ClientSession session : sessions)
- {
- session.rollback();
- }
-
- consumeMessages(consumers, numMessages, threadNum);
-
- for (ClientSession session : sessions)
- {
- session.commit();
- }
-
- sessSend.close();
- for (ClientSession session : sessions)
- {
- session.close();
- }
-
- for (int i = 0; i < numSessions; i++)
- {
- SimpleString subName = new SimpleString(threadNum + "sub" + i);
-
- s.deleteQueue(subName);
- }
-
- s.close();
-
- long end = System.currentTimeMillis();
-
- log.info("duration " + (end - start));
- }
-
- protected void doTestH(final ClientSessionFactory sf, final int threadNum) throws Exception
- {
- long start = System.currentTimeMillis();
-
- ClientSession s = sf.createSession(false, false, false);
-
- final int numMessages = 100;
-
- final int numSessions = 10;
-
- Set<ClientConsumer> consumers = new HashSet<ClientConsumer>();
- Set<ClientSession> sessions = new HashSet<ClientSession>();
-
- for (int i = 0; i < numSessions; i++)
- {
- SimpleString subName = new SimpleString(threadNum + "sub" + i);
-
- ClientSession sessConsume = sf.createSession(false, false, false);
-
- sessConsume.createQueue(MultiThreadRandomReattachTestBase.ADDRESS, subName, null, false);
-
- ClientConsumer consumer = sessConsume.createConsumer(subName);
-
- consumers.add(consumer);
-
- sessions.add(sessConsume);
- }
-
- ClientSession sessSend = sf.createSession(false, false, false);
-
- ClientProducer producer = sessSend.createProducer(MultiThreadRandomReattachTestBase.ADDRESS);
-
- sendMessages(sessSend, producer, numMessages, threadNum);
-
- sessSend.rollback();
-
- sendMessages(sessSend, producer, numMessages, threadNum);
-
- sessSend.commit();
-
- for (ClientSession session : sessions)
- {
- session.start();
- }
-
- consumeMessages(consumers, numMessages, threadNum);
-
- for (ClientSession session : sessions)
- {
- session.rollback();
- }
-
- consumeMessages(consumers, numMessages, threadNum);
-
- for (ClientSession session : sessions)
- {
- session.commit();
- }
-
- sessSend.close();
- for (ClientSession session : sessions)
- {
- session.close();
- }
-
- for (int i = 0; i < numSessions; i++)
- {
- SimpleString subName = new SimpleString(threadNum + "sub" + i);
-
- s.deleteQueue(subName);
- }
-
- s.close();
-
- long end = System.currentTimeMillis();
-
- log.info("duration " + (end - start));
- }
-
- protected void doTestI(final ClientSessionFactory sf, final int threadNum) throws Exception
- {
- ClientSession sessCreate = sf.createSession(false, true, true);
-
- sessCreate.createQueue(MultiThreadRandomReattachTestBase.ADDRESS,
- new SimpleString(threadNum + MultiThreadRandomReattachTestBase.ADDRESS.toString()),
- null,
- false);
-
- ClientSession sess = sf.createSession(false, true, true);
-
- sess.start();
-
- ClientConsumer consumer = sess.createConsumer(new SimpleString(threadNum + MultiThreadRandomReattachTestBase.ADDRESS.toString()));
-
- ClientProducer producer = sess.createProducer(MultiThreadRandomReattachTestBase.ADDRESS);
-
- ClientMessage message = sess.createMessage(HornetQTextMessage.TYPE, false, 0, System.currentTimeMillis(), (byte)1);
- producer.send(message);
-
- ClientMessage message2 = consumer.receive(MultiThreadRandomReattachTestBase.RECEIVE_TIMEOUT);
-
- Assert.assertNotNull(message2);
-
- message2.acknowledge();
-
- sess.close();
-
- sessCreate.deleteQueue(new SimpleString(threadNum + MultiThreadRandomReattachTestBase.ADDRESS.toString()));
-
- sessCreate.close();
- }
-
- protected void doTestJ(final ClientSessionFactory sf, final int threadNum) throws Exception
- {
- ClientSession sessCreate = sf.createSession(false, true, true);
-
- sessCreate.createQueue(MultiThreadRandomReattachTestBase.ADDRESS,
- new SimpleString(threadNum + MultiThreadRandomReattachTestBase.ADDRESS.toString()),
- null,
- false);
-
- ClientSession sess = sf.createSession(false, true, true);
-
- sess.start();
-
- ClientConsumer consumer = sess.createConsumer(new SimpleString(threadNum + MultiThreadRandomReattachTestBase.ADDRESS.toString()));
-
- ClientProducer producer = sess.createProducer(MultiThreadRandomReattachTestBase.ADDRESS);
-
- ClientMessage message = sess.createMessage(HornetQTextMessage.TYPE, false, 0, System.currentTimeMillis(), (byte)1);
- producer.send(message);
-
- ClientMessage message2 = consumer.receive(MultiThreadRandomReattachTestBase.RECEIVE_TIMEOUT);
-
- Assert.assertNotNull(message2);
-
- message2.acknowledge();
-
- sess.close();
-
- sessCreate.deleteQueue(new SimpleString(threadNum + MultiThreadRandomReattachTestBase.ADDRESS.toString()));
-
- sessCreate.close();
- }
-
- protected void doTestK(final ClientSessionFactory sf, final int threadNum) throws Exception
- {
- ClientSession s = sf.createSession(false, false, false);
-
- s.createQueue(MultiThreadRandomReattachTestBase.ADDRESS,
- new SimpleString(threadNum + MultiThreadRandomReattachTestBase.ADDRESS.toString()),
- null,
- false);
-
- final int numConsumers = 100;
-
- for (int i = 0; i < numConsumers; i++)
- {
- ClientConsumer consumer = s.createConsumer(new SimpleString(threadNum + MultiThreadRandomReattachTestBase.ADDRESS.toString()));
-
- consumer.close();
- }
-
- s.deleteQueue(new SimpleString(threadNum + MultiThreadRandomReattachTestBase.ADDRESS.toString()));
-
- s.close();
- }
-
- /*
- * This test tests failure during create connection
- */
- protected void doTestL(final ClientSessionFactory sf) throws Exception
- {
- final int numSessions = 100;
-
- for (int i = 0; i < numSessions; i++)
- {
- ClientSession session = sf.createSession(false, false, false);
-
- session.close();
- }
- }
-
- protected void doTestN(final ClientSessionFactory sf, final int threadNum) throws Exception
- {
- ClientSession sessCreate = sf.createSession(false, true, true);
-
- sessCreate.createQueue(MultiThreadRandomReattachTestBase.ADDRESS,
- new SimpleString(threadNum + MultiThreadRandomReattachTestBase.ADDRESS.toString()),
- null,
- false);
-
- ClientSession sess = sf.createSession(false, true, true);
-
- sess.stop();
-
- sess.start();
-
- sess.stop();
-
- ClientConsumer consumer = sess.createConsumer(new SimpleString(threadNum + MultiThreadRandomReattachTestBase.ADDRESS.toString()));
-
- ClientProducer producer = sess.createProducer(MultiThreadRandomReattachTestBase.ADDRESS);
-
- ClientMessage message = sess.createMessage(HornetQTextMessage.TYPE, false, 0, System.currentTimeMillis(), (byte)1);
- producer.send(message);
-
- sess.start();
-
- ClientMessage message2 = consumer.receive(MultiThreadRandomReattachTestBase.RECEIVE_TIMEOUT);
-
- Assert.assertNotNull(message2);
-
- message2.acknowledge();
-
- sess.stop();
-
- sess.start();
-
- sess.close();
-
- sessCreate.deleteQueue(new SimpleString(threadNum + MultiThreadRandomReattachTestBase.ADDRESS.toString()));
-
- sessCreate.close();
- }
-
- protected void doTestO(final ClientSessionFactory sf, final int threadNum) throws Exception
- {
- ClientSession sessCreate = sf.createSession(false, true, true);
-
- sessCreate.createQueue(MultiThreadRandomReattachTestBase.ADDRESS,
- new SimpleString(threadNum + MultiThreadRandomReattachTestBase.ADDRESS.toString()),
- null,
- false);
-
- ClientSession sess = sf.createSession(false, true, true);
-
- sess.start();
-
- ClientConsumer consumer = sess.createConsumer(new SimpleString(threadNum + MultiThreadRandomReattachTestBase.ADDRESS.toString()));
-
- for (int i = 0; i < 100; i++)
- {
- Assert.assertNull(consumer.receiveImmediate());
- }
-
- sess.close();
-
- sessCreate.deleteQueue(new SimpleString(threadNum + MultiThreadRandomReattachTestBase.ADDRESS.toString()));
-
- sessCreate.close();
- }
-
- protected int getLatchWait()
- {
- return 60000;
- }
-
- protected int getNumIterations()
- {
- return 2;
- }
-
- protected int getNumThreads()
- {
- return 10;
- }
-
- @Override
- protected void setUp() throws Exception
- {
- super.setUp();
-
- log.info("************ Starting test " + getName());
- }
-
- @Override
- protected void tearDown() throws Exception
- {
- if (liveServer != null && liveServer.isStarted())
- {
- liveServer.stop();
- }
-
- liveServer = null;
-
- super.tearDown();
- }
-
- // Private -------------------------------------------------------
-
- private void runTestMultipleThreads(final RunnableT runnable,
- final int numThreads,
- final boolean failOnCreateConnection) throws Exception
- {
- runTestMultipleThreads(runnable, numThreads, failOnCreateConnection, 1000);
- }
-
- private void runTestMultipleThreads(final RunnableT runnable,
- final int numThreads,
- final boolean failOnCreateConnection,
- final long failDelay) throws Exception
- {
-
- runMultipleThreadsFailoverTest(runnable, numThreads, getNumIterations(), failOnCreateConnection, failDelay);
- }
-
- /**
- * @return
- */
- @Override
- protected ServerLocator createLocator() throws Exception
- {
- ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory"));
- locator.setReconnectAttempts(-1);
- locator.setConfirmationWindowSize(1024 * 1024);
- return locator;
- }
-
- @Override
- protected void stop() throws Exception
- {
- liveServer.stop();
-
- System.gc();
-
- Assert.assertEquals(0, InVMRegistry.instance.size());
- }
-
- private void sendMessages(final ClientSession sessSend,
- final ClientProducer producer,
- final int numMessages,
- final int threadNum) throws Exception
- {
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = sessSend.createMessage(HornetQBytesMessage.TYPE,
- false,
- 0,
- System.currentTimeMillis(),
- (byte)1);
- message.putIntProperty(new SimpleString("threadnum"), threadNum);
- message.putIntProperty(new SimpleString("count"), i);
- setBody(message);
- producer.send(message);
- }
- }
-
- private void consumeMessages(final Set<ClientConsumer> consumers, final int numMessages, final int threadNum) throws Exception
- {
- // We make sure the messages arrive in the order they were sent from a particular producer
- Map<ClientConsumer, Map<Integer, Integer>> counts = new HashMap<ClientConsumer, Map<Integer, Integer>>();
-
- for (int i = 0; i < numMessages; i++)
- {
- for (ClientConsumer consumer : consumers)
- {
- Map<Integer, Integer> consumerCounts = counts.get(consumer);
-
- if (consumerCounts == null)
- {
- consumerCounts = new HashMap<Integer, Integer>();
- counts.put(consumer, consumerCounts);
- }
-
- ClientMessage msg = consumer.receive(MultiThreadRandomReattachTestBase.RECEIVE_TIMEOUT);
-
- Assert.assertNotNull(msg);
-
- int tn = (Integer)msg.getObjectProperty(new SimpleString("threadnum"));
- int cnt = (Integer)msg.getObjectProperty(new SimpleString("count"));
-
- Integer c = consumerCounts.get(tn);
- if (c == null)
- {
- c = new Integer(cnt);
- }
-
- if (tn == threadNum && cnt != c.intValue())
- {
- throw new Exception("Invalid count, expected " + tn + ": " + c + " got " + cnt);
- }
-
- c++;
-
- // Wrap
- if (c == numMessages)
- {
- c = 0;
- }
-
- consumerCounts.put(tn, c);
-
- msg.acknowledge();
- }
- }
- }
-
- // Inner classes -------------------------------------------------
-
- private class MyHandler implements MessageHandler
- {
- CountDownLatch latch = new CountDownLatch(1);
-
- private final Map<Integer, Integer> counts = new HashMap<Integer, Integer>();
-
- volatile String failure;
-
- final int tn;
-
- final int numMessages;
-
- volatile boolean done;
-
- synchronized void reset()
- {
- counts.clear();
-
- done = false;
-
- failure = null;
-
- latch = new CountDownLatch(1);
- }
-
- MyHandler(final int threadNum, final int numMessages)
- {
- tn = threadNum;
-
- this.numMessages = numMessages;
- }
-
- public synchronized void onMessage(final ClientMessage message)
- {
- try
- {
- message.acknowledge();
- }
- catch (HornetQException me)
- {
- log.error("Failed to process", me);
- }
-
- if (done)
- {
- return;
- }
-
- int threadNum = (Integer)message.getObjectProperty(new SimpleString("threadnum"));
- int cnt = (Integer)message.getObjectProperty(new SimpleString("count"));
-
- Integer c = counts.get(threadNum);
- if (c == null)
- {
- c = new Integer(cnt);
- }
-
- if (tn == threadNum && cnt != c.intValue())
- {
- failure = "Invalid count, expected " + threadNum + ":" + c + " got " + cnt;
- log.error(failure);
-
- latch.countDown();
- }
-
- if (!checkSize(message))
- {
- failure = "Invalid size on message";
- log.error(failure);
- latch.countDown();
- }
-
- if (tn == threadNum && c == numMessages - 1)
- {
- done = true;
- latch.countDown();
- }
-
- c++;
- // Wrap around at numMessages
- if (c == numMessages)
- {
- c = 0;
- }
-
- counts.put(threadNum, c);
-
- }
- }
-}
\ No newline at end of file
Deleted: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/reattach/MultiThreadReattachSupport.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/reattach/MultiThreadReattachSupport.java 2011-04-12 09:47:15 UTC (rev 10479)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/reattach/MultiThreadReattachSupport.java 2011-04-12 10:12:15 UTC (rev 10480)
@@ -1,281 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.tests.integration.cluster.reattach;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Timer;
-import java.util.TimerTask;
-
-import junit.framework.Assert;
-
-import org.hornetq.api.core.HornetQException;
-import org.hornetq.api.core.client.ClientSession;
-import org.hornetq.api.core.client.ClientSessionFactory;
-import org.hornetq.api.core.client.ServerLocator;
-import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
-import org.hornetq.core.client.impl.ClientSessionInternal;
-import org.hornetq.core.logging.Logger;
-import org.hornetq.core.protocol.core.impl.RemotingConnectionImpl;
-import org.hornetq.core.remoting.impl.invm.InVMConnector;
-import org.hornetq.tests.util.ServiceTestBase;
-import org.hornetq.tests.util.UnitTestCase;
-
-/**
- * A MultiThreadFailoverSupport
- *
- * @author <a href="mailto:time.fox@jboss.org">Tim Fox</a>
- * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
- *
- * Created Mar 17, 2009 11:15:02 AM
- *
- *
- */
-public abstract class MultiThreadReattachSupport extends ServiceTestBase
-{
-
- // Constants -----------------------------------------------------
-
- private final Logger log = Logger.getLogger(this.getClass());
-
- // Attributes ----------------------------------------------------
-
- private Timer timer;
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- // Public --------------------------------------------------------
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- protected abstract void start() throws Exception;
-
- protected abstract void stop() throws Exception;
-
- protected abstract ServerLocator createLocator() throws Exception;
-
- @Override
- protected void setUp() throws Exception
- {
- super.setUp();
- timer = new Timer();
- }
-
- @Override
- protected void tearDown() throws Exception
- {
- timer.cancel();
- timer = null;
- super.tearDown();
- }
-
- protected boolean shouldFail()
- {
- return true;
- }
-
- protected void runMultipleThreadsFailoverTest(final RunnableT runnable,
- final int numThreads,
- final int numIts,
- final boolean failOnCreateConnection,
- final long failDelay) throws Exception
- {
- for (int its = 0; its < numIts; its++)
- {
- log.info("Beginning iteration " + its);
-
- start();
-
- final ServerLocator locator = createLocator();
-
- final ClientSessionFactoryInternal sf = (ClientSessionFactoryInternal) locator.createSessionFactory();
-
- final ClientSession session = sf.createSession(false, true, true);
-
- Failer failer = startFailer(failDelay, session, failOnCreateConnection);
-
- class Runner extends Thread
- {
- private volatile Throwable throwable;
-
- private final RunnableT test;
-
- private final int threadNum;
-
- Runner(final RunnableT test, final int threadNum)
- {
- this.test = test;
-
- this.threadNum = threadNum;
- }
-
- @Override
- public void run()
- {
- try
- {
- test.run(sf, threadNum);
- }
- catch (Throwable t)
- {
- throwable = t;
-
- log.error("Failed to run test", t);
-
- // Case a failure happened here, it should print the Thread dump
- // Sending it to System.out, as it would show on the Tests report
- System.out.println(UnitTestCase.threadDump(" - fired by MultiThreadRandomReattachTestBase::runTestMultipleThreads (" + t.getLocalizedMessage() +
- ")"));
- }
- }
- }
-
- do
- {
- List<Runner> threads = new ArrayList<Runner>();
-
- for (int i = 0; i < numThreads; i++)
- {
- Runner runner = new Runner(runnable, i);
-
- threads.add(runner);
-
- runner.start();
- }
-
- for (Runner thread : threads)
- {
- thread.join();
-
- if (thread.throwable != null)
- {
- throw new Exception("Exception on thread " + thread, thread.throwable);
- }
- }
-
- runnable.checkFail();
-
- }
- while (!failer.isExecuted());
-
- InVMConnector.resetFailures();
-
- session.close();
-
- locator.close();
-
- Assert.assertEquals(0, sf.numSessions());
-
- Assert.assertEquals(0, sf.numConnections());
-
- sf.close();
-
- stop();
- }
- }
-
- // Private -------------------------------------------------------
-
- private Failer startFailer(final long time, final ClientSession session, final boolean failOnCreateConnection)
- {
- Failer failer = new Failer(session, failOnCreateConnection);
-
- // This is useful for debugging.. just change shouldFail to return false, and Failer will not be executed
- if (shouldFail())
- {
- timer.schedule(failer, (long)(time * Math.random()), 100);
- }
-
- return failer;
- }
-
- // Inner classes -------------------------------------------------
-
- protected abstract class RunnableT extends Thread
- {
- private volatile String failReason;
-
- private volatile Throwable throwable;
-
- public void setFailed(final String reason, final Throwable throwable)
- {
- failReason = reason;
- this.throwable = throwable;
- }
-
- public void checkFail()
- {
- if (throwable != null)
- {
- log.error("Test failed: " + failReason, throwable);
- }
- if (failReason != null)
- {
- Assert.fail(failReason);
- }
- }
-
- public abstract void run(final ClientSessionFactory sf, final int threadNum) throws Exception;
- }
-
- private class Failer extends TimerTask
- {
- private final ClientSession session;
-
- private boolean executed;
-
- private final boolean failOnCreateConnection;
-
- public Failer(final ClientSession session, final boolean failOnCreateConnection)
- {
- this.session = session;
-
- this.failOnCreateConnection = failOnCreateConnection;
- }
-
- @Override
- public synchronized void run()
- {
- log.info("** Failing connection");
-
- RemotingConnectionImpl conn = (RemotingConnectionImpl)((ClientSessionInternal)session).getConnection();
-
- if (failOnCreateConnection)
- {
- InVMConnector.numberOfFailures = 1;
- InVMConnector.failOnCreateConnection = true;
- }
- else
- {
- conn.fail(new HornetQException(HornetQException.NOT_CONNECTED, "blah"));
- }
-
- log.info("** Fail complete");
-
- cancel();
-
- executed = true;
- }
-
- public synchronized boolean isExecuted()
- {
- return executed;
- }
- }
-
-}
Deleted: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/largemessage/LargeMessageTestBase.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/largemessage/LargeMessageTestBase.java 2011-04-12 09:47:15 UTC (rev 10479)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/largemessage/LargeMessageTestBase.java 2011-04-12 10:12:15 UTC (rev 10480)
@@ -1,702 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.tests.integration.largemessage;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
-import javax.transaction.xa.XAResource;
-import javax.transaction.xa.Xid;
-
-import junit.framework.Assert;
-
-import org.hornetq.api.core.HornetQBuffer;
-import org.hornetq.api.core.HornetQBuffers;
-import org.hornetq.api.core.HornetQException;
-import org.hornetq.api.core.Message;
-import org.hornetq.api.core.SimpleString;
-import org.hornetq.api.core.client.*;
-import org.hornetq.core.logging.Logger;
-import org.hornetq.core.server.HornetQServer;
-import org.hornetq.core.server.Queue;
-import org.hornetq.tests.util.ServiceTestBase;
-import org.hornetq.tests.util.UnitTestCase;
-import org.hornetq.utils.DataConstants;
-
-/**
- * A LargeMessageTestBase
- *
- * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
- *
- * Created Oct 29, 2008 11:43:52 AM
- *
- *
- */
-public abstract class LargeMessageTestBase extends ServiceTestBase
-{
-
- // Constants -----------------------------------------------------
- private static final Logger log = Logger.getLogger(LargeMessageTestBase.class);
-
- protected final SimpleString ADDRESS = new SimpleString("SimpleAddress");
-
- protected HornetQServer server;
-
- // Attributes ----------------------------------------------------
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- // Public --------------------------------------------------------
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- @Override
- protected void tearDown() throws Exception
- {
- if (server != null && server.isStarted())
- {
- try
- {
- server.stop();
- }
- catch (Exception e)
- {
- LargeMessageTestBase.log.warn(e.getMessage(), e);
- }
- }
-
- server = null;
-
- super.tearDown();
- }
-
- protected void testChunks(final boolean isXA,
- final boolean restartOnXA,
- final boolean rollbackFirstSend,
- final boolean useStreamOnConsume,
- final boolean realFiles,
- final boolean preAck,
- final boolean sendingBlocking,
- final boolean testBrowser,
- final boolean useMessageConsumer,
- final int numberOfMessages,
- final long numberOfBytes,
- final int waitOnConsumer,
- final long delayDelivery) throws Exception
- {
- testChunks(isXA,
- restartOnXA,
- rollbackFirstSend,
- useStreamOnConsume,
- realFiles,
- preAck,
- sendingBlocking,
- testBrowser,
- useMessageConsumer,
- numberOfMessages,
- numberOfBytes,
- waitOnConsumer,
- delayDelivery,
- -1,
- 10 * 1024);
- }
-
- protected void testChunks(final boolean isXA,
- final boolean restartOnXA,
- final boolean rollbackFirstSend,
- final boolean useStreamOnConsume,
- final boolean realFiles,
- final boolean preAck,
- final boolean sendingBlocking,
- final boolean testBrowser,
- final boolean useMessageConsumer,
- final int numberOfMessages,
- final long numberOfBytes,
- final int waitOnConsumer,
- final long delayDelivery,
- final int producerWindow,
- final int minSize) throws Exception
- {
- clearData();
-
- server = createServer(realFiles);
- server.start();
-
- ServerLocator locator = createInVMNonHALocator();
- try
- {
-
- if (sendingBlocking)
- {
- locator.setBlockOnNonDurableSend(true);
- locator.setBlockOnDurableSend(true);
- locator.setBlockOnAcknowledge(true);
- }
-
- if (producerWindow > 0)
- {
- locator.setConfirmationWindowSize(producerWindow);
- }
-
- locator.setMinLargeMessageSize(minSize);
-
- ClientSessionFactory sf = locator.createSessionFactory();
-
- ClientSession session;
-
- Xid xid = null;
- session = sf.createSession(null, null, isXA, false, false, preAck, 0);
-
- if (isXA)
- {
- xid = newXID();
- session.start(xid, XAResource.TMNOFLAGS);
- }
-
- session.createQueue(ADDRESS, ADDRESS, null, true);
-
- ClientProducer producer = session.createProducer(ADDRESS);
-
- if (rollbackFirstSend)
- {
- sendMessages(numberOfMessages, numberOfBytes, delayDelivery, session, producer);
-
- if (isXA)
- {
- session.end(xid, XAResource.TMSUCCESS);
- session.prepare(xid);
-
- session.close();
-
- if (realFiles && restartOnXA)
- {
- server.stop();
- server.start();
- sf = locator.createSessionFactory();
- }
-
- session = sf.createSession(null, null, isXA, false, false, preAck, 0);
-
- Xid[] xids = session.recover(XAResource.TMSTARTRSCAN);
- Assert.assertEquals(1, xids.length);
- Assert.assertEquals(xid, xids[0]);
-
- session.rollback(xid);
- producer = session.createProducer(ADDRESS);
- xid = newXID();
- session.start(xid, XAResource.TMNOFLAGS);
- }
- else
- {
- session.rollback();
- }
-
- validateNoFilesOnLargeDir();
- }
-
- sendMessages(numberOfMessages, numberOfBytes, delayDelivery, session, producer);
-
- if (isXA)
- {
- session.end(xid, XAResource.TMSUCCESS);
- session.prepare(xid);
-
- session.close();
-
- if (realFiles && restartOnXA)
- {
- server.stop();
- server.start();
- //we need to recreate sf's
- sf = locator.createSessionFactory();
- }
-
- session = sf.createSession(null, null, isXA, false, false, preAck, 0);
-
- Xid[] xids = session.recover(XAResource.TMSTARTRSCAN);
- Assert.assertEquals(1, xids.length);
- Assert.assertEquals(xid, xids[0]);
-
- producer = session.createProducer(ADDRESS);
-
- session.commit(xid, false);
- xid = newXID();
- session.start(xid, XAResource.TMNOFLAGS);
- }
- else
- {
- session.commit();
- }
-
- session.close();
-
- if (realFiles)
- {
- server.stop();
-
- server = createServer(realFiles);
- server.start();
-
- sf = locator.createSessionFactory();
- }
-
- session = sf.createSession(null, null, isXA, false, false, preAck, 0);
-
- if (isXA)
- {
- xid = newXID();
- session.start(xid, XAResource.TMNOFLAGS);
- }
-
- ClientConsumer consumer = null;
-
- for (int iteration = testBrowser ? 0 : 1; iteration < 2; iteration++)
- {
- session.stop();
-
- // first time with a browser
- consumer = session.createConsumer(ADDRESS, null, iteration == 0);
-
- if (useMessageConsumer)
- {
- final CountDownLatch latchDone = new CountDownLatch(numberOfMessages);
- final AtomicInteger errors = new AtomicInteger(0);
-
- MessageHandler handler = new MessageHandler()
- {
- int msgCounter;
-
- public void onMessage(final ClientMessage message)
- {
- try
- {
- if (delayDelivery > 0)
- {
- long originalTime = (Long)message.getObjectProperty(new SimpleString("original-time"));
- Assert.assertTrue(System.currentTimeMillis() - originalTime + "<" + delayDelivery,
- System.currentTimeMillis() - originalTime >= delayDelivery);
- }
-
- if (!preAck)
- {
- message.acknowledge();
- }
-
- Assert.assertNotNull(message);
-
- if (delayDelivery <= 0)
- {
- // right now there is no guarantee of ordered delivered on multiple scheduledMessages with
- // the same
- // scheduled delivery time
- Assert.assertEquals(msgCounter,
- ((Integer)message.getObjectProperty(new SimpleString("counter-message"))).intValue());
- }
-
- if (useStreamOnConsume)
- {
- final AtomicLong bytesRead = new AtomicLong(0);
- message.saveToOutputStream(new OutputStream()
- {
-
- @Override
- public void write(final byte b[]) throws IOException
- {
- if (b[0] == UnitTestCase.getSamplebyte(bytesRead.get()))
- {
- bytesRead.addAndGet(b.length);
- LargeMessageTestBase.log.debug("Read position " + bytesRead.get() + " on consumer");
- }
- else
- {
- LargeMessageTestBase.log.warn("Received invalid packet at position " + bytesRead.get());
- }
- }
-
- @Override
- public void write(final int b) throws IOException
- {
- if (b == UnitTestCase.getSamplebyte(bytesRead.get()))
- {
- bytesRead.incrementAndGet();
- }
- else
- {
- LargeMessageTestBase.log.warn("byte not as expected!");
- }
- }
- });
-
- Assert.assertEquals(numberOfBytes, bytesRead.get());
- }
- else
- {
-
- HornetQBuffer buffer = message.getBodyBuffer();
- buffer.resetReaderIndex();
- for (long b = 0; b < numberOfBytes; b++)
- {
- if (b % (1024l * 1024l) == 0)
- {
- LargeMessageTestBase.log.debug("Read " + b + " bytes");
- }
-
- Assert.assertEquals(UnitTestCase.getSamplebyte(b), buffer.readByte());
- }
-
- try
- {
- buffer.readByte();
- Assert.fail("Supposed to throw an exception");
- }
- catch (Exception e)
- {
- }
- }
- }
- catch (Throwable e)
- {
- e.printStackTrace();
- LargeMessageTestBase.log.warn("Got an error", e);
- errors.incrementAndGet();
- }
- finally
- {
- latchDone.countDown();
- msgCounter++;
- }
- }
- };
-
- session.start();
-
- consumer.setMessageHandler(handler);
-
- Assert.assertTrue(latchDone.await(waitOnConsumer, TimeUnit.SECONDS));
- Assert.assertEquals(0, errors.get());
- }
- else
- {
-
- session.start();
-
- for (int i = 0; i < numberOfMessages; i++)
- {
- System.currentTimeMillis();
-
- ClientMessage message = consumer.receive(waitOnConsumer + delayDelivery);
-
- Assert.assertNotNull(message);
-
- System.currentTimeMillis();
-
- if (delayDelivery > 0)
- {
- long originalTime = (Long)message.getObjectProperty(new SimpleString("original-time"));
- Assert.assertTrue(System.currentTimeMillis() - originalTime + "<" + delayDelivery,
- System.currentTimeMillis() - originalTime >= delayDelivery);
- }
-
- if (!preAck)
- {
- message.acknowledge();
- }
-
- Assert.assertNotNull(message);
-
- if (delayDelivery <= 0)
- {
- // right now there is no guarantee of ordered delivered on multiple scheduledMessages with the same
- // scheduled delivery time
- Assert.assertEquals(i,
- ((Integer)message.getObjectProperty(new SimpleString("counter-message"))).intValue());
- }
-
- if (useStreamOnConsume)
- {
- final AtomicLong bytesRead = new AtomicLong(0);
- message.saveToOutputStream(new OutputStream()
- {
-
- @Override
- public void write(final byte b[]) throws IOException
- {
- if (b[0] == UnitTestCase.getSamplebyte(bytesRead.get()))
- {
- bytesRead.addAndGet(b.length);
- }
- else
- {
- LargeMessageTestBase.log.warn("Received invalid packet at position " + bytesRead.get());
- }
-
- }
-
- @Override
- public void write(final int b) throws IOException
- {
- if (bytesRead.get() % (1024l * 1024l) == 0)
- {
- LargeMessageTestBase.log.debug("Read " + bytesRead.get() + " bytes");
- }
- if (b == (byte)'a')
- {
- bytesRead.incrementAndGet();
- }
- else
- {
- LargeMessageTestBase.log.warn("byte not as expected!");
- }
- }
- });
-
- Assert.assertEquals(numberOfBytes, bytesRead.get());
- }
- else
- {
- HornetQBuffer buffer = message.getBodyBuffer();
- buffer.resetReaderIndex();
-
- for (long b = 0; b < numberOfBytes; b++)
- {
- if (b % (1024l * 1024l) == 0l)
- {
- LargeMessageTestBase.log.debug("Read " + b + " bytes");
- }
- Assert.assertEquals(UnitTestCase.getSamplebyte(b), buffer.readByte());
- }
- }
-
- }
-
- }
- consumer.close();
-
- if (iteration == 0)
- {
- if (isXA)
- {
- session.end(xid, XAResource.TMSUCCESS);
- session.rollback(xid);
- xid = newXID();
- session.start(xid, XAResource.TMNOFLAGS);
- }
- else
- {
- session.rollback();
- }
- }
- else
- {
- if (isXA)
- {
- session.end(xid, XAResource.TMSUCCESS);
- session.commit(xid, true);
- xid = newXID();
- session.start(xid, XAResource.TMNOFLAGS);
- }
- else
- {
- session.commit();
- }
- }
- }
-
- session.close();
-
- Assert.assertEquals(0, ((Queue)server.getPostOffice().getBinding(ADDRESS).getBindable()).getDeliveringCount());
- Assert.assertEquals(0, ((Queue)server.getPostOffice().getBinding(ADDRESS).getBindable()).getMessageCount());
-
- validateNoFilesOnLargeDir();
-
- }
- finally
- {
- locator.close();
- try
- {
- server.stop();
- }
- catch (Throwable ignored)
- {
- }
- }
- }
-
- /**
- * Sends {@code numberOfMessages} messages through {@code producer}; each message is created with
- * {@code session.createMessage(true)}, carries a body of {@code numberOfBytes} bytes and an int
- * property "counter-message" holding its index.
- * <p>
- * The body is attached as an input stream when {@code numberOfBytes} exceeds 1024 * 1024 or the index
- * is even; otherwise it is built as an in-memory array and written to the body buffer.
- * When {@code delayDelivery > 0} the message additionally carries "original-time"
- * ({@code System.currentTimeMillis()} at send time) and a scheduled delivery time of that value plus
- * {@code delayDelivery}.
- *
- * @param numberOfMessages how many messages to send
- * @param numberOfBytes body size of each message, in bytes
- * @param delayDelivery offset added to the current time in milliseconds to schedule delivery; values
- * less than or equal to zero send the message without scheduling
- * @param session session used to create the messages
- * @param producer producer used to send the messages
- * @throws Exception declared by the signature; NOTE(review): presumably propagated from the client calls
- */
- private void sendMessages(final int numberOfMessages,
- final long numberOfBytes,
- final long delayDelivery,
- final ClientSession session,
- final ClientProducer producer) throws Exception
- {
- LargeMessageTestBase.log.debug("NumberOfBytes = " + numberOfBytes);
- for (int i = 0; i < numberOfMessages; i++)
- {
- ClientMessage message = session.createMessage(true);
-
- // If the test uses more than 1M, only streaming is used, as building the body in memory would require
- // too much memory from the test. Below that size, stream and array bodies alternate by index.
- if (numberOfBytes > 1024 * 1024 || i % 2 == 0)
- {
- LargeMessageTestBase.log.debug("Sending message (stream)" + i);
- message.setBodyInputStream(UnitTestCase.createFakeLargeStream(numberOfBytes));
- }
- else
- {
- LargeMessageTestBase.log.debug("Sending message (array)" + i);
- byte[] bytes = new byte[(int)numberOfBytes]; // cast cannot overflow: this branch only runs when numberOfBytes <= 1024 * 1024
- for (int j = 0; j < bytes.length; j++)
- {
- bytes[j] = UnitTestCase.getSamplebyte(j);
- }
- message.getBodyBuffer().writeBytes(bytes);
- }
- message.putIntProperty(new SimpleString("counter-message"), i);
- if (delayDelivery > 0)
- {
- long time = System.currentTimeMillis();
- message.putLongProperty(new SimpleString("original-time"), time);
- message.putLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME, time + delayDelivery);
-
- producer.send(message);
- }
- else
- {
- producer.send(message);
- }
- }
- }
-
- protected HornetQBuffer createLargeBuffer(final int numberOfIntegers)
- {
- HornetQBuffer body = HornetQBuffers.fixedBuffer(DataConstants.SIZE_INT * numberOfIntegers);
-
- for (int i = 0; i < numberOfIntegers; i++)
- {
- body.writeInt(i);
- }
-
- return body;
-
- }
-
- protected ClientMessage createLargeClientMessage(final ClientSession session, final int numberOfBytes) throws Exception
- {
- return createLargeClientMessage(session, numberOfBytes, true);
- }
-
- protected ClientMessage createLargeClientMessage (final ClientSession session, final byte[] buffer, final boolean durable) throws Exception
- {
- ClientMessage msgs = session.createMessage(durable);
- msgs.getBodyBuffer().writeBytes(buffer);
- return msgs;
- }
-
- protected ClientMessage createLargeClientMessage(final ClientSession session,
- final long numberOfBytes,
- final boolean persistent) throws Exception
- {
-
- ClientMessage clientMessage = session.createMessage(persistent);
-
- clientMessage.setBodyInputStream(UnitTestCase.createFakeLargeStream(numberOfBytes));
-
- return clientMessage;
- }
-
- /**
- * Starts {@code session}, receives a single message from {@code queueToRead} via
- * {@code consumer.receive(5000)}, asserts that a message arrived, acknowledges it, commits the session
- * and closes the consumer.
- *
- * @param session session used to create the consumer and to commit the acknowledgement
- * @param queueToRead name of the queue to consume from
- * @param numberOfBytes not read by this method; the body of the received message is not inspected here
- * @throws HornetQException declared by the signature
- * @throws FileNotFoundException declared by the signature
- * @throws IOException declared by the signature
- */
- protected void readMessage(final ClientSession session, final SimpleString queueToRead, final int numberOfBytes) throws HornetQException,
- FileNotFoundException,
- IOException
- {
- session.start();
-
- ClientConsumer consumer = session.createConsumer(queueToRead);
-
- ClientMessage clientMessage = consumer.receive(5000);
-
- Assert.assertNotNull(clientMessage);
-
- clientMessage.acknowledge();
-
- session.commit();
-
- consumer.close();
- }
-
- /**
- * Creates an {@link OutputStream} that discards every byte written to it.
- * <p>
- * The stream counts the single-byte writes it receives and emits a debug log entry once per
- * 1024 * 1024 writes. Writing after {@link OutputStream#close()} fails with an
- * {@link IOException} carrying the message "Stream was closed".
- *
- * @return a new discarding stream
- * @throws Exception declared by the signature; nothing in this method body throws
- */
- protected OutputStream createFakeOutputStream() throws Exception
- {
-
- return new OutputStream()
- {
- private boolean closed = false;
-
- private int count;
-
- @Override
- public void close() throws IOException
- {
- super.close();
- closed = true;
- }
-
- @Override
- public void write(final int b) throws IOException
- {
- // '%' and '*' share precedence and associate left-to-right, so the modulus operand must be
- // parenthesised; without the parentheses the check was true every 1024 bytes, not every 1 MiB.
- if (count++ % (1024 * 1024) == 0)
- {
- LargeMessageTestBase.log.debug("OutputStream received " + count + " bytes");
- }
- // The write is counted before the closed check, so rejected writes are counted too.
- if (closed)
- {
- throw new IOException("Stream was closed");
- }
- }
-
- };
-
- }
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-}
Modified: trunk/tests/pom.xml
===================================================================
--- trunk/tests/pom.xml 2011-04-12 09:47:15 UTC (rev 10479)
+++ trunk/tests/pom.xml 2011-04-12 10:12:15 UTC (rev 10480)
@@ -29,5 +29,6 @@
<module>jms-tests</module>
<module>joram-tests</module>
<module>soak-tests</module>
+ <module>stress-tests</module>
</modules>
</project>
Copied: trunk/tests/stress-tests/pom.xml (from rev 10479, trunk/tests/soak-tests/pom.xml)
===================================================================
--- trunk/tests/stress-tests/pom.xml (rev 0)
+++ trunk/tests/stress-tests/pom.xml 2011-04-12 10:12:15 UTC (rev 10480)
@@ -0,0 +1,140 @@
+<!--
+ ~ Copyright 2009 Red Hat, Inc.
+ ~ Red Hat licenses this file to you under the Apache License, version
+ ~ 2.0 (the "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ ~ implied. See the License for the specific language governing
+ ~ permissions and limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.hornetq.tests</groupId>
+ <artifactId>hornetq-tests-pom</artifactId>
+ <version>2.2.3-SNAPSHOT</version>
+ </parent>
+
+ <groupId>org.hornetq.tests</groupId>
+ <artifactId>stress-tests</artifactId>
+ <packaging>jar</packaging>
+ <name>HornetQ stress Tests</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.hornetq.tests</groupId>
+ <artifactId>unit-tests</artifactId>
+ <version>${project.version}</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.hornetq.tests</groupId>
+ <artifactId>integration-tests</artifactId>
+ <version>${project.version}</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.hornetq.tests</groupId>
+ <artifactId>jms-tests</artifactId>
+ <version>${project.version}</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.hornetq</groupId>
+ <artifactId>hornetq-jms</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.hornetq</groupId>
+ <artifactId>hornetq-ra</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.hornetq</groupId>
+ <artifactId>hornetq-bootstrap</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.jboss.javaee</groupId>
+ <artifactId>jboss-jca-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.jboss.security</groupId>
+ <artifactId>jboss-security-spi</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.jboss.security</groupId>
+ <artifactId>jbosssx</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.jboss.naming</groupId>
+ <artifactId>jnpserver</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>jboss.jbossts</groupId>
+ <artifactId>jbossts-common</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>apache-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.jboss.javaee</groupId>
+ <artifactId>jboss-transaction-api</artifactId>
+ </dependency>
+ <!-- this is specifically for the JMS Bridge -->
+ <dependency>
+ <groupId>org.jboss.integration</groupId>
+ <artifactId>jboss-transaction-spi</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.jboss.javaee</groupId>
+ <artifactId>jboss-jaspi-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.jboss.javaee</groupId>
+ <artifactId>jboss-jms-api</artifactId>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <testResources>
+ <testResource>
+ <directory>config</directory>
+ </testResource>
+ </testResources>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <includes>
+ <include>**/*Test.java</include>
+ </includes>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <source>1.6</source>
+ <target>1.6</target>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
Modified: trunk/tests/stress-tests/src/test/java/org/hornetq/tests/stress/chunk/LargeMessageStressTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/stress/chunk/LargeMessageStressTest.java 2011-04-11 12:47:21 UTC (rev 10475)
+++ trunk/tests/stress-tests/src/test/java/org/hornetq/tests/stress/chunk/LargeMessageStressTest.java 2011-04-12 10:12:15 UTC (rev 10480)
@@ -13,7 +13,7 @@
package org.hornetq.tests.stress.chunk;
-import org.hornetq.tests.integration.largemessage.LargeMessageTestBase;
+import org.hornetq.tests.largemessage.LargeMessageTestBase;
/**
* A MessageChunkSoakTest
Modified: trunk/tests/stress-tests/src/test/java/org/hornetq/tests/stress/client/SendStressTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/stress/client/SendStressTest.java 2011-04-11 12:47:21 UTC (rev 10475)
+++ trunk/tests/stress-tests/src/test/java/org/hornetq/tests/stress/client/SendStressTest.java 2011-04-12 10:12:15 UTC (rev 10480)
@@ -17,7 +17,7 @@
import org.hornetq.api.core.client.*;
import org.hornetq.core.server.HornetQServer;
-import org.hornetq.tests.unit.util.ServiceTestBase;
+import org.hornetq.tests.util.ServiceTestBase;
/**
* A SendStressTest
Modified: trunk/tests/stress-tests/src/test/java/org/hornetq/tests/stress/failover/MultiThreadRandomReattachStressTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/stress/failover/MultiThreadRandomReattachStressTest.java 2011-04-11 12:47:21 UTC (rev 10475)
+++ trunk/tests/stress-tests/src/test/java/org/hornetq/tests/stress/failover/MultiThreadRandomReattachStressTest.java 2011-04-12 10:12:15 UTC (rev 10480)
@@ -13,7 +13,12 @@
package org.hornetq.tests.stress.failover;
-import org.hornetq.tests.integration.cluster.reattach.MultiThreadRandomReattachTest;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.server.HornetQServers;
+import org.hornetq.tests.cluster.reattach.MultiThreadRandomReattachTestBase;
/**
* A MultiThreadRandomFailoverStressTest
@@ -21,9 +26,41 @@
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
* @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
*/
-public class MultiThreadRandomReattachStressTest extends MultiThreadRandomReattachTest
+public class MultiThreadRandomReattachStressTest extends MultiThreadRandomReattachTestBase
{
+ private static final Logger log = Logger.getLogger(MultiThreadRandomReattachStressTest.class);
+
@Override
+ protected void start() throws Exception // creates and starts the server stored in liveServer
+ {
+ Configuration liveConf = createDefaultConfig();
+ liveConf.setSecurityEnabled(false); // security is switched off for this server
+ liveConf.getAcceptorConfigurations()
+ .add(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory")); // register the in-VM acceptor factory
+ liveServer = HornetQServers.newHornetQServer(liveConf, false); // NOTE(review): the second argument is presumably the persistence flag - confirm
+ liveServer.start();
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.tests.cluster.reattach.MultiThreadRandomReattachTestBase#setBody(org.hornetq.api.core.client.ClientMessage)
+ */
+ @Override
+ protected void setBody(final ClientMessage message) throws Exception
+ {
+ // Give each message a 250-byte zero-filled body; checkSize(ClientMessage) expects exactly this length
+ message.getBodyBuffer().writeBytes(new byte[250]);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.tests.cluster.reattach.MultiThreadRandomReattachTestBase#checkSize(org.hornetq.api.core.client.ClientMessage)
+ *
+ * Returns true when the body buffer has exactly 250 readable bytes, the length of the array that
+ * setBody(ClientMessage) writes.
+ */
+ @Override
+ protected boolean checkSize(final ClientMessage message)
+ {
+ return message.getBodyBuffer().readableBytes() == 250;
+ }
+
+ @Override
protected int getNumIterations()
{
return 100;
Modified: trunk/tests/stress-tests/src/test/java/org/hornetq/tests/stress/failover/RandomReattachStressTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/stress/failover/RandomReattachStressTest.java 2011-04-11 12:47:21 UTC (rev 10475)
+++ trunk/tests/stress-tests/src/test/java/org/hornetq/tests/stress/failover/RandomReattachStressTest.java 2011-04-12 10:12:15 UTC (rev 10480)
@@ -13,8 +13,26 @@
package org.hornetq.tests.stress.failover;
-import org.hornetq.tests.integration.cluster.reattach.RandomReattachTest;
+import junit.framework.Assert;
+import junit.framework.AssertionFailedError;
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.*;
+import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
+import org.hornetq.core.client.impl.ClientSessionInternal;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.remoting.impl.invm.InVMRegistry;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.HornetQServers;
+import org.hornetq.jms.client.HornetQTextMessage;
+import org.hornetq.tests.util.ServiceTestBase;
+import java.util.*;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
/**
* A RandomFailoverStressTest
*
@@ -24,24 +42,1530 @@
*
*
*/
-public class RandomReattachStressTest extends RandomReattachTest
+public class RandomReattachStressTest extends ServiceTestBase
{
+ private static final Logger log = Logger.getLogger(RandomReattachStressTest.class);
+
// Constants -----------------------------------------------------
+ private static final int RECEIVE_TIMEOUT = 10000;
+
// Attributes ----------------------------------------------------
+ private static final SimpleString ADDRESS = new SimpleString("FailoverTestAddress");
+
+ private HornetQServer liveService;
+
+ private Timer timer;
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
// Public --------------------------------------------------------
+ public void testA() throws Exception
+ {
+ runTest(new RunnableT()
+ {
+ @Override
+ public void run(final ClientSessionFactory sf) throws Exception
+ {
+ doTestA(sf);
+ }
+ });
+ }
+
+ public void testB() throws Exception
+ {
+ runTest(new RunnableT()
+ {
+ @Override
+ public void run(final ClientSessionFactory sf) throws Exception
+ {
+ doTestB(sf);
+ }
+ });
+ }
+
+ public void testC() throws Exception
+ {
+ runTest(new RunnableT()
+ {
+ @Override
+ public void run(final ClientSessionFactory sf) throws Exception
+ {
+ doTestC(sf);
+ }
+ });
+ }
+
+ public void testD() throws Exception
+ {
+ runTest(new RunnableT()
+ {
+ @Override
+ public void run(final ClientSessionFactory sf) throws Exception
+ {
+ doTestD(sf);
+ }
+ });
+ }
+
+ public void testE() throws Exception
+ {
+ runTest(new RunnableT()
+ {
+ @Override
+ public void run(final ClientSessionFactory sf) throws Exception
+ {
+ doTestE(sf);
+ }
+ });
+ }
+
+ public void testF() throws Exception
+ {
+ runTest(new RunnableT()
+ {
+ @Override
+ public void run(final ClientSessionFactory sf) throws Exception
+ {
+ doTestF(sf);
+ }
+ });
+ }
+
+ public void testG() throws Exception
+ {
+ runTest(new RunnableT()
+ {
+ @Override
+ public void run(final ClientSessionFactory sf) throws Exception
+ {
+ doTestG(sf);
+ }
+ });
+ }
+
+ public void testH() throws Exception
+ {
+ runTest(new RunnableT()
+ {
+ @Override
+ public void run(final ClientSessionFactory sf) throws Exception
+ {
+ doTestH(sf);
+ }
+ });
+ }
+
+ public void testI() throws Exception
+ {
+ runTest(new RunnableT()
+ {
+ @Override
+ public void run(final ClientSessionFactory sf) throws Exception
+ {
+ doTestI(sf);
+ }
+ });
+ }
+
+ public void testJ() throws Exception
+ {
+ runTest(new RunnableT()
+ {
+ @Override
+ public void run(final ClientSessionFactory sf) throws Exception
+ {
+ doTestJ(sf);
+ }
+ });
+ }
+
+ public void testK() throws Exception
+ {
+ runTest(new RunnableT()
+ {
+ @Override
+ public void run(final ClientSessionFactory sf) throws Exception
+ {
+ doTestK(sf);
+ }
+ });
+ }
+
+ public void testL() throws Exception
+ {
+ runTest(new RunnableT()
+ {
+ @Override
+ public void run(final ClientSessionFactory sf) throws Exception
+ {
+ doTestL(sf);
+ }
+ });
+ }
+
+ public void testN() throws Exception
+ {
+ runTest(new RunnableT()
+ {
+ @Override
+ public void run(final ClientSessionFactory sf) throws Exception
+ {
+ doTestN(sf);
+ }
+ });
+ }
+
+ public void runTest(final RunnableT runnable) throws Exception
+ {
+ final int numIts = getNumIterations();
+ // Every iteration is bracketed by start()/stop() and builds its own locator, session factory and session.
+ for (int its = 0; its < numIts; its++)
+ {
+ RandomReattachStressTest.log.info("####" + getName() + " iteration #" + its);
+ start();
+ ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(ServiceTestBase.INVM_CONNECTOR_FACTORY));
+
+ locator.setReconnectAttempts(-1);
+ locator.setConfirmationWindowSize(1024 * 1024);
+
+ ClientSessionFactoryImpl sf = (ClientSessionFactoryImpl) locator.createSessionFactory();
+
+
+ ClientSession session = sf.createSession(false, false, false);
+ // NOTE(review): startFailer is defined outside this hunk; 1000 is presumably a delay in milliseconds - confirm
+ Failer failer = startFailer(1000, session);
+ // The scenario always runs at least once and is repeated until the failer reports that it has executed.
+ do
+ {
+ runnable.run(sf);
+ }
+ while (!failer.isExecuted());
+ // Close the session and the locator first, then verify the factory holds no sessions or connections.
+ session.close();
+
+ locator.close();
+
+ Assert.assertEquals(0, sf.numSessions());
+
+ Assert.assertEquals(0, sf.numConnections());
+
+ stop();
+ }
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
+ protected void doTestA(final ClientSessionFactory sf) throws Exception
+ {
+ long start = System.currentTimeMillis();
+
+ ClientSession s = sf.createSession(false, false, false);
+
+ final int numMessages = 100;
+
+ final int numSessions = 10;
+
+ Set<ClientConsumer> consumers = new HashSet<ClientConsumer>();
+ Set<ClientSession> sessions = new HashSet<ClientSession>();
+
+ for (int i = 0; i < numSessions; i++)
+ {
+ SimpleString subName = new SimpleString("sub" + i);
+
+ ClientSession sessConsume = sf.createSession(false, true, true);
+
+ sessConsume.start();
+
+ sessConsume.createQueue(RandomReattachStressTest.ADDRESS, subName, null, false);
+
+ ClientConsumer consumer = sessConsume.createConsumer(subName);
+
+ consumers.add(consumer);
+
+ sessions.add(sessConsume);
+ }
+
+ ClientSession sessSend = sf.createSession(false, true, true);
+
+ ClientProducer producer = sessSend.createProducer(RandomReattachStressTest.ADDRESS);
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = sessSend.createMessage(HornetQTextMessage.TYPE,
+ false,
+ 0,
+ System.currentTimeMillis(),
+ (byte)1);
+ message.putIntProperty(new SimpleString("count"), i);
+ producer.send(message);
+ }
+
+ class MyHandler extends AssertionCheckMessageHandler
+ {
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ volatile int count;
+
+ public void onMessageAssert(final ClientMessage message)
+ {
+ if (count == numMessages)
+ {
+ Assert.fail("Too many messages");
+ }
+
+ Assert.assertEquals(count, message.getObjectProperty(new SimpleString("count")));
+
+ count++;
+
+ try
+ {
+ message.acknowledge();
+ }
+ catch (HornetQException me)
+ {
+ RandomReattachStressTest.log.error("Failed to process", me);
+ }
+
+ if (count == numMessages)
+ {
+ latch.countDown();
+ }
+ }
+ }
+
+ Set<MyHandler> handlers = new HashSet<MyHandler>();
+
+ for (ClientConsumer consumer : consumers)
+ {
+ MyHandler handler = new MyHandler();
+
+ consumer.setMessageHandler(handler);
+
+ handlers.add(handler);
+ }
+
+ for (MyHandler handler : handlers)
+ {
+ boolean ok = handler.latch.await(5000, TimeUnit.MILLISECONDS);
+
+ handler.checkAssertions();
+
+ Assert.assertTrue("Didn't receive all messages", ok);
+ }
+
+ sessSend.close();
+ for (ClientSession session : sessions)
+ {
+ session.close();
+ }
+
+ for (int i = 0; i < numSessions; i++)
+ {
+ SimpleString subName = new SimpleString("sub" + i);
+
+ s.deleteQueue(subName);
+ }
+
+ s.close();
+
+ long end = System.currentTimeMillis();
+
+ RandomReattachStressTest.log.info("duration " + (end - start));
+ }
+
+ protected void doTestB(final ClientSessionFactory sf) throws Exception
+ {
+ long start = System.currentTimeMillis();
+
+ ClientSession s = sf.createSession(false, false, false);
+
+ final int numMessages = 100;
+
+ final int numSessions = 50;
+
+ Set<ClientConsumer> consumers = new HashSet<ClientConsumer>();
+ Set<ClientSession> sessions = new HashSet<ClientSession>();
+
+ for (int i = 0; i < numSessions; i++)
+ {
+ SimpleString subName = new SimpleString("sub" + i);
+
+ ClientSession sessConsume = sf.createSession(false, true, true);
+
+ sessConsume.createQueue(RandomReattachStressTest.ADDRESS, subName, null, false);
+
+ ClientConsumer consumer = sessConsume.createConsumer(subName);
+
+ consumers.add(consumer);
+
+ sessions.add(sessConsume);
+ }
+
+ ClientSession sessSend = sf.createSession(false, true, true);
+
+ ClientProducer producer = sessSend.createProducer(RandomReattachStressTest.ADDRESS);
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = sessSend.createMessage(HornetQTextMessage.TYPE,
+ false,
+ 0,
+ System.currentTimeMillis(),
+ (byte)1);
+ message.putIntProperty(new SimpleString("count"), i);
+ producer.send(message);
+ }
+
+ for (ClientSession session : sessions)
+ {
+ session.start();
+ }
+
+ class MyHandler extends AssertionCheckMessageHandler
+ {
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ volatile int count;
+
+ public void onMessageAssert(final ClientMessage message)
+ {
+ if (count == numMessages)
+ {
+ Assert.fail("Too many messages");
+ }
+
+ Assert.assertEquals(count, message.getObjectProperty(new SimpleString("count")));
+
+ count++;
+
+ if (count == numMessages)
+ {
+ latch.countDown();
+ }
+ }
+ }
+
+ Set<MyHandler> handlers = new HashSet<MyHandler>();
+
+ for (ClientConsumer consumer : consumers)
+ {
+ MyHandler handler = new MyHandler();
+
+ consumer.setMessageHandler(handler);
+
+ handlers.add(handler);
+ }
+
+ for (MyHandler handler : handlers)
+ {
+ boolean ok = handler.latch.await(10000, TimeUnit.MILLISECONDS);
+
+ handler.checkAssertions();
+
+ Assert.assertTrue(ok);
+ }
+
+ sessSend.close();
+
+ for (ClientSession session : sessions)
+ {
+ session.close();
+ }
+
+ for (int i = 0; i < numSessions; i++)
+ {
+ SimpleString subName = new SimpleString("sub" + i);
+
+ s.deleteQueue(subName);
+ }
+
+ s.close();
+
+ long end = System.currentTimeMillis();
+
+ RandomReattachStressTest.log.info("duration " + (end - start));
+
+ }
+
+ protected void doTestC(final ClientSessionFactory sf) throws Exception
+ {
+ long start = System.currentTimeMillis();
+
+ ClientSession s = sf.createSession(false, false, false);
+
+ final int numMessages = 100;
+
+ final int numSessions = 1;
+
+ Set<ClientConsumer> consumers = new HashSet<ClientConsumer>();
+ Set<ClientSession> sessions = new HashSet<ClientSession>();
+
+ for (int i = 0; i < numSessions; i++)
+ {
+ SimpleString subName = new SimpleString("sub" + i);
+
+ ClientSession sessConsume = sf.createSession(false, false, false);
+
+ sessConsume.start();
+
+ sessConsume.createQueue(RandomReattachStressTest.ADDRESS, subName, null, false);
+
+ ClientConsumer consumer = sessConsume.createConsumer(subName);
+
+ consumers.add(consumer);
+
+ sessions.add(sessConsume);
+ }
+
+ ClientSession sessSend = sf.createSession(false, false, true);
+
+ ClientProducer producer = sessSend.createProducer(RandomReattachStressTest.ADDRESS);
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = sessSend.createMessage(HornetQTextMessage.TYPE,
+ false,
+ 0,
+ System.currentTimeMillis(),
+ (byte)1);
+ message.putIntProperty(new SimpleString("count"), i);
+ producer.send(message);
+ }
+
+ sessSend.rollback();
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = sessSend.createMessage(HornetQTextMessage.TYPE,
+ false,
+ 0,
+ System.currentTimeMillis(),
+ (byte)1);
+ message.putIntProperty(new SimpleString("count"), i);
+ producer.send(message);
+ }
+
+ sessSend.commit();
+
+ class MyHandler extends AssertionCheckMessageHandler
+ {
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ volatile int count;
+
+ public void onMessageAssert(final ClientMessage message)
+ {
+ if (count == numMessages)
+ {
+ Assert.fail("Too many messages, expected " + count);
+ }
+
+ Assert.assertEquals(count, message.getObjectProperty(new SimpleString("count")));
+
+ count++;
+
+ try
+ {
+ message.acknowledge();
+ }
+ catch (HornetQException e)
+ {
+ e.printStackTrace();
+ throw new RuntimeException (e.getMessage(), e);
+ }
+
+ if (count == numMessages)
+ {
+ latch.countDown();
+ }
+ }
+ }
+
+ Set<MyHandler> handlers = new HashSet<MyHandler>();
+
+ for (ClientConsumer consumer : consumers)
+ {
+ MyHandler handler = new MyHandler();
+
+ consumer.setMessageHandler(handler);
+
+ handlers.add(handler);
+ }
+
+ for (MyHandler handler : handlers)
+ {
+ boolean ok = handler.latch.await(10000, TimeUnit.MILLISECONDS);
+
+ Assert.assertTrue(ok);
+
+ handler.checkAssertions();
+ }
+
+ handlers.clear();
+
+ // New handlers
+ for (ClientConsumer consumer : consumers)
+ {
+ MyHandler handler = new MyHandler();
+
+ consumer.setMessageHandler(handler);
+
+ handlers.add(handler);
+ }
+
+ for (ClientSession session : sessions)
+ {
+ session.rollback();
+ }
+
+ for (MyHandler handler : handlers)
+ {
+ boolean ok = handler.latch.await(10000, TimeUnit.MILLISECONDS);
+
+ Assert.assertTrue(ok);
+
+ handler.checkAssertions();
+ }
+
+ for (ClientSession session : sessions)
+ {
+ session.commit();
+ }
+
+ sessSend.close();
+ for (ClientSession session : sessions)
+ {
+ session.close();
+ }
+
+ for (int i = 0; i < numSessions; i++)
+ {
+ SimpleString subName = new SimpleString("sub" + i);
+
+ s.deleteQueue(subName);
+ }
+
+ s.close();
+
+ long end = System.currentTimeMillis();
+
+ RandomReattachStressTest.log.info("duration " + (end - start));
+ }
+
+ protected void doTestD(final ClientSessionFactory sf) throws Exception
+ {
+ long start = System.currentTimeMillis();
+
+ ClientSession s = sf.createSession(false, false, false);
+
+ final int numMessages = 100;
+
+ final int numSessions = 10;
+
+ Set<ClientConsumer> consumers = new HashSet<ClientConsumer>();
+ Set<ClientSession> sessions = new HashSet<ClientSession>();
+
+ for (int i = 0; i < numSessions; i++)
+ {
+ SimpleString subName = new SimpleString("sub" + i);
+
+ ClientSession sessConsume = sf.createSession(false, false, false);
+
+ sessConsume.createQueue(RandomReattachStressTest.ADDRESS, subName, null, false);
+
+ ClientConsumer consumer = sessConsume.createConsumer(subName);
+
+ consumers.add(consumer);
+
+ sessions.add(sessConsume);
+ }
+
+ ClientSession sessSend = sf.createSession(false, false, true);
+
+ ClientProducer producer = sessSend.createProducer(RandomReattachStressTest.ADDRESS);
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = sessSend.createMessage(HornetQTextMessage.TYPE,
+ false,
+ 0,
+ System.currentTimeMillis(),
+ (byte)1);
+ message.putIntProperty(new SimpleString("count"), i);
+ producer.send(message);
+ }
+
+ sessSend.rollback();
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = sessSend.createMessage(HornetQTextMessage.TYPE,
+ false,
+ 0,
+ System.currentTimeMillis(),
+ (byte)1);
+ message.putIntProperty(new SimpleString("count"), i);
+ producer.send(message);
+ }
+
+ sessSend.commit();
+
+ for (ClientSession session : sessions)
+ {
+ session.start();
+ }
+
+ class MyHandler extends AssertionCheckMessageHandler
+ {
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ volatile int count;
+
+ public void onMessageAssert(final ClientMessage message)
+ {
+ if (count == numMessages)
+ {
+ Assert.fail("Too many messages, " + count);
+ }
+
+ Assert.assertEquals(count, message.getObjectProperty(new SimpleString("count")));
+
+ count++;
+
+ if (count == numMessages)
+ {
+ latch.countDown();
+ }
+ }
+ }
+
+ Set<MyHandler> handlers = new HashSet<MyHandler>();
+
+ for (ClientConsumer consumer : consumers)
+ {
+ MyHandler handler = new MyHandler();
+
+ consumer.setMessageHandler(handler);
+
+ handlers.add(handler);
+ }
+
+ for (MyHandler handler : handlers)
+ {
+ boolean ok = handler.latch.await(20000, TimeUnit.MILLISECONDS);
+
+ Assert.assertTrue(ok);
+
+ handler.checkAssertions();
+ }
+
+ handlers.clear();
+
+ // New handlers
+ for (ClientConsumer consumer : consumers)
+ {
+ MyHandler handler = new MyHandler();
+
+ consumer.setMessageHandler(handler);
+
+ handlers.add(handler);
+ }
+
+ for (ClientSession session : sessions)
+ {
+ session.rollback();
+ }
+
+ for (MyHandler handler : handlers)
+ {
+ boolean ok = handler.latch.await(10000, TimeUnit.MILLISECONDS);
+
+ Assert.assertTrue(ok);
+
+ handler.checkAssertions();
+ }
+
+ for (ClientSession session : sessions)
+ {
+ session.commit();
+ }
+
+ sessSend.close();
+ for (ClientSession session : sessions)
+ {
+ session.close();
+ }
+
+ for (int i = 0; i < numSessions; i++)
+ {
+ SimpleString subName = new SimpleString("sub" + i);
+
+ s.deleteQueue(subName);
+ }
+
+ s.close();
+
+ long end = System.currentTimeMillis();
+
+ RandomReattachStressTest.log.info("duration " + (end - start));
+ }
+
+ // Now with synchronous receive()
+
+ /**
+  * Synchronous-receive scenario: ten consumer sessions (auto-commit sends and acks), each
+  * started before its queue and consumer exist, drain 100 non-durable messages through
+  * blocking receive() calls.
+  *
+  * @param sf factory used to create every session of this scenario
+  * @throws Exception if any client operation fails
+  */
+ protected void doTestE(final ClientSessionFactory sf) throws Exception
+ {
+    final long startedAt = System.currentTimeMillis();
+
+    // Administrative session: only used at the very end to delete the queues.
+    ClientSession adminSession = sf.createSession(false, false, false);
+
+    final int numMessages = 100;
+    final int numSessions = 10;
+
+    Set<ClientConsumer> consumers = new HashSet<ClientConsumer>();
+    Set<ClientSession> sessions = new HashSet<ClientSession>();
+
+    int created = 0;
+    while (created < numSessions)
+    {
+       ClientSession consumerSession = sf.createSession(false, true, true);
+       consumerSession.start();
+
+       SimpleString queueName = new SimpleString("sub" + created);
+       consumerSession.createQueue(ADDRESS, queueName, null, false);
+
+       consumers.add(consumerSession.createConsumer(queueName));
+       sessions.add(consumerSession);
+
+       created++;
+    }
+
+    ClientSession sendSession = sf.createSession(false, true, true);
+    ClientProducer producer = sendSession.createProducer(ADDRESS);
+
+    for (int count = 0; count < numMessages; count++)
+    {
+       ClientMessage outgoing = sendSession.createMessage(HornetQTextMessage.TYPE,
+                                                          false,
+                                                          0,
+                                                          System.currentTimeMillis(),
+                                                          (byte)1);
+       outgoing.putIntProperty(new SimpleString("count"), count);
+       producer.send(outgoing);
+    }
+
+    // Each consumer has to observe the "count" property in send order.
+    for (int expected = 0; expected < numMessages; expected++)
+    {
+       for (ClientConsumer consumer : consumers)
+       {
+          ClientMessage received = consumer.receive(RECEIVE_TIMEOUT);
+          Assert.assertNotNull(received);
+          Assert.assertEquals(expected, received.getObjectProperty(new SimpleString("count")));
+          received.acknowledge();
+       }
+    }
+
+    // After that, no consumer may have anything left to receive.
+    for (int pass = 0; pass < numMessages; pass++)
+    {
+       for (ClientConsumer consumer : consumers)
+       {
+          Assert.assertNull(consumer.receiveImmediate());
+       }
+    }
+
+    sendSession.close();
+    for (ClientSession consumerSession : sessions)
+    {
+       consumerSession.close();
+    }
+
+    for (int index = 0; index < numSessions; index++)
+    {
+       adminSession.deleteQueue(new SimpleString("sub" + index));
+    }
+
+    adminSession.close();
+
+    final long finishedAt = System.currentTimeMillis();
+    log.info("duration " + (finishedAt - startedAt));
+ }
+
+ /**
+  * Synchronous-receive scenario with delayed start: ten consumer sessions (auto-commit sends
+  * and acks) are created but only started after all 100 non-durable messages have been sent.
+  * Finishes by checking that the factory reports exactly one remaining session.
+  *
+  * @param sf factory used to create every session; cast to ClientSessionFactoryImpl for the final check
+  * @throws IllegalStateException if a consumer times out waiting for a message
+  * @throws Exception if any other client operation fails
+  */
+ protected void doTestF(final ClientSessionFactory sf) throws Exception
+ {
+    long start = System.currentTimeMillis();
+
+    // Administrative session: only used at the end to delete the queues.
+    ClientSession s = sf.createSession(false, false, false);
+
+    final int numMessages = 100;
+
+    final int numSessions = 10;
+
+    Set<ClientConsumer> consumers = new HashSet<ClientConsumer>();
+    Set<ClientSession> sessions = new HashSet<ClientSession>();
+
+    for (int i = 0; i < numSessions; i++)
+    {
+       SimpleString subName = new SimpleString("sub" + i);
+
+       ClientSession sessConsume = sf.createSession(false, true, true);
+
+       sessConsume.createQueue(RandomReattachStressTest.ADDRESS, subName, null, false);
+
+       ClientConsumer consumer = sessConsume.createConsumer(subName);
+
+       consumers.add(consumer);
+
+       sessions.add(sessConsume);
+    }
+
+    ClientSession sessSend = sf.createSession(false, true, true);
+
+    ClientProducer producer = sessSend.createProducer(RandomReattachStressTest.ADDRESS);
+
+    for (int i = 0; i < numMessages; i++)
+    {
+       ClientMessage message = sessSend.createMessage(HornetQTextMessage.TYPE,
+                                                      false,
+                                                      0,
+                                                      System.currentTimeMillis(),
+                                                      (byte)1);
+       message.putIntProperty(new SimpleString("count"), i);
+       producer.send(message);
+    }
+
+    // Delivery only begins here, after every message has already been sent.
+    for (ClientSession session : sessions)
+    {
+       session.start();
+    }
+
+    for (int i = 0; i < numMessages; i++)
+    {
+       for (ClientConsumer consumer : consumers)
+       {
+          ClientMessage msg = consumer.receive(RandomReattachStressTest.RECEIVE_TIMEOUT);
+
+          // The explicit guard reports which message index was missed; a further
+          // assertNotNull after it could never fail, so none is needed.
+          if (msg == null)
+          {
+             throw new IllegalStateException("Failed to receive message " + i);
+          }
+
+          Assert.assertEquals(i, msg.getObjectProperty(new SimpleString("count")));
+
+          msg.acknowledge();
+       }
+    }
+
+    // No consumer may have anything left to receive.
+    for (int i = 0; i < numMessages; i++)
+    {
+       for (ClientConsumer consumer : consumers)
+       {
+          ClientMessage msg = consumer.receiveImmediate();
+
+          Assert.assertNull(msg);
+       }
+    }
+
+    sessSend.close();
+    for (ClientSession session : sessions)
+    {
+       session.close();
+    }
+
+    for (int i = 0; i < numSessions; i++)
+    {
+       SimpleString subName = new SimpleString("sub" + i);
+
+       s.deleteQueue(subName);
+    }
+
+    s.close();
+
+    // NOTE(review): one session is still expected after closing everything created here;
+    // presumably it is held by the caller - confirm against runTest.
+    Assert.assertEquals(1, ((ClientSessionFactoryImpl)sf).numSessions());
+
+    long end = System.currentTimeMillis();
+
+    RandomReattachStressTest.log.info("duration " + (end - start));
+ }
+
+ /**
+  * Transacted synchronous-receive scenario: ten manually committed consumer sessions are
+  * started up front. The producer sends 100 messages and rolls back, then sends 100 again and
+  * commits. Consumers receive and acknowledge everything, roll back, receive everything a
+  * second time and finally commit.
+  *
+  * @param sf factory used to create every session of this scenario
+  * @throws Exception if any client operation fails
+  */
+ protected void doTestG(final ClientSessionFactory sf) throws Exception
+ {
+    final long startedAt = System.currentTimeMillis();
+
+    // Administrative session: only used at the end to delete the queues.
+    ClientSession adminSession = sf.createSession(false, false, false);
+
+    final int numMessages = 100;
+    final int numSessions = 10;
+
+    Set<ClientConsumer> consumers = new HashSet<ClientConsumer>();
+    Set<ClientSession> sessions = new HashSet<ClientSession>();
+
+    int created = 0;
+    while (created < numSessions)
+    {
+       ClientSession consumerSession = sf.createSession(false, false, false);
+       consumerSession.start();
+
+       SimpleString queueName = new SimpleString("sub" + created);
+       consumerSession.createQueue(ADDRESS, queueName, null, false);
+
+       consumers.add(consumerSession.createConsumer(queueName));
+       sessions.add(consumerSession);
+
+       created++;
+    }
+
+    ClientSession sendSession = sf.createSession(false, false, false);
+    ClientProducer producer = sendSession.createProducer(ADDRESS);
+
+    // Round 0 is rolled back, round 1 is committed.
+    for (int round = 0; round < 2; round++)
+    {
+       for (int count = 0; count < numMessages; count++)
+       {
+          ClientMessage outgoing = sendSession.createMessage(HornetQTextMessage.TYPE,
+                                                             false,
+                                                             0,
+                                                             System.currentTimeMillis(),
+                                                             (byte)1);
+          outgoing.putIntProperty(new SimpleString("count"), count);
+          producer.send(outgoing);
+       }
+
+       if (round == 0)
+       {
+          sendSession.rollback();
+       }
+       else
+       {
+          sendSession.commit();
+       }
+    }
+
+    // First delivery: every consumer sees the "count" property in send order.
+    for (int expected = 0; expected < numMessages; expected++)
+    {
+       for (ClientConsumer consumer : consumers)
+       {
+          ClientMessage received = consumer.receive(RECEIVE_TIMEOUT);
+          Assert.assertNotNull(received);
+          Assert.assertEquals(expected, received.getObjectProperty(new SimpleString("count")));
+          received.acknowledge();
+       }
+    }
+
+    // A single pass here: nothing else may be pending before the rollback.
+    for (ClientConsumer consumer : consumers)
+    {
+       Assert.assertNull(consumer.receiveImmediate());
+    }
+
+    for (ClientSession consumerSession : sessions)
+    {
+       consumerSession.rollback();
+    }
+
+    // Second delivery after the rollback, again in send order.
+    for (int expected = 0; expected < numMessages; expected++)
+    {
+       for (ClientConsumer consumer : consumers)
+       {
+          ClientMessage received = consumer.receive(RECEIVE_TIMEOUT);
+          Assert.assertNotNull(received);
+          Assert.assertEquals(expected, received.getObjectProperty(new SimpleString("count")));
+          received.acknowledge();
+       }
+    }
+
+    for (int pass = 0; pass < numMessages; pass++)
+    {
+       for (ClientConsumer consumer : consumers)
+       {
+          Assert.assertNull(consumer.receiveImmediate());
+       }
+    }
+
+    for (ClientSession consumerSession : sessions)
+    {
+       consumerSession.commit();
+    }
+
+    sendSession.close();
+    for (ClientSession consumerSession : sessions)
+    {
+       consumerSession.close();
+    }
+
+    for (int index = 0; index < numSessions; index++)
+    {
+       adminSession.deleteQueue(new SimpleString("sub" + index));
+    }
+
+    adminSession.close();
+
+    final long finishedAt = System.currentTimeMillis();
+    log.info("duration " + (finishedAt - startedAt));
+ }
+
+ /**
+  * Transacted synchronous-receive scenario with delayed start: like the transacted scenario
+  * that starts its sessions up front, except that the ten consumer sessions are started only
+  * after the producer has committed its second batch of 100 messages.
+  *
+  * @param sf factory used to create every session of this scenario
+  * @throws Exception if any client operation fails
+  */
+ protected void doTestH(final ClientSessionFactory sf) throws Exception
+ {
+    final long startedAt = System.currentTimeMillis();
+
+    // Administrative session: only used at the end to delete the queues.
+    ClientSession adminSession = sf.createSession(false, false, false);
+
+    final int numMessages = 100;
+    final int numSessions = 10;
+
+    Set<ClientConsumer> consumers = new HashSet<ClientConsumer>();
+    Set<ClientSession> sessions = new HashSet<ClientSession>();
+
+    int created = 0;
+    while (created < numSessions)
+    {
+       ClientSession consumerSession = sf.createSession(false, false, false);
+
+       SimpleString queueName = new SimpleString("sub" + created);
+       consumerSession.createQueue(ADDRESS, queueName, null, false);
+
+       consumers.add(consumerSession.createConsumer(queueName));
+       sessions.add(consumerSession);
+
+       created++;
+    }
+
+    ClientSession sendSession = sf.createSession(false, false, false);
+    ClientProducer producer = sendSession.createProducer(ADDRESS);
+
+    // Send round 0 is rolled back, send round 1 is committed.
+    for (int round = 0; round < 2; round++)
+    {
+       for (int count = 0; count < numMessages; count++)
+       {
+          ClientMessage outgoing = sendSession.createMessage(HornetQTextMessage.TYPE,
+                                                             false,
+                                                             0,
+                                                             System.currentTimeMillis(),
+                                                             (byte)1);
+          outgoing.putIntProperty(new SimpleString("count"), count);
+          producer.send(outgoing);
+       }
+
+       if (round == 0)
+       {
+          sendSession.rollback();
+       }
+       else
+       {
+          sendSession.commit();
+       }
+    }
+
+    // Delivery only begins here, after the committed batch is complete.
+    for (ClientSession consumerSession : sessions)
+    {
+       consumerSession.start();
+    }
+
+    // Receive round 0 ends with a rollback (forcing redelivery), round 1 with a commit.
+    for (int round = 0; round < 2; round++)
+    {
+       for (int expected = 0; expected < numMessages; expected++)
+       {
+          for (ClientConsumer consumer : consumers)
+          {
+             ClientMessage received = consumer.receive(RECEIVE_TIMEOUT);
+             Assert.assertNotNull(received);
+             Assert.assertEquals(expected, received.getObjectProperty(new SimpleString("count")));
+             received.acknowledge();
+          }
+       }
+
+       for (int pass = 0; pass < numMessages; pass++)
+       {
+          for (ClientConsumer consumer : consumers)
+          {
+             Assert.assertNull(consumer.receiveImmediate());
+          }
+       }
+
+       for (ClientSession consumerSession : sessions)
+       {
+          if (round == 0)
+          {
+             consumerSession.rollback();
+          }
+          else
+          {
+             consumerSession.commit();
+          }
+       }
+    }
+
+    sendSession.close();
+    for (ClientSession consumerSession : sessions)
+    {
+       consumerSession.close();
+    }
+
+    for (int index = 0; index < numSessions; index++)
+    {
+       adminSession.deleteQueue(new SimpleString("sub" + index));
+    }
+
+    adminSession.close();
+
+    final long finishedAt = System.currentTimeMillis();
+    log.info("duration " + (finishedAt - startedAt));
+ }
+
+ /**
+  * Minimal round trip: one session creates a queue named after ADDRESS, a second started
+  * session sends a single non-durable message to it and receives it back synchronously.
+  *
+  * @param sf factory used to create both sessions
+  * @throws Exception if any client operation fails
+  */
+ protected void doTestI(final ClientSessionFactory sf) throws Exception
+ {
+    // Owns the queue's lifecycle: creates it first and deletes it last.
+    ClientSession queueOwner = sf.createSession(false, true, true);
+    queueOwner.createQueue(ADDRESS, ADDRESS, null, false);
+
+    ClientSession workSession = sf.createSession(false, true, true);
+    workSession.start();
+
+    ClientConsumer consumer = workSession.createConsumer(ADDRESS);
+    ClientProducer producer = workSession.createProducer(ADDRESS);
+
+    ClientMessage outgoing = workSession.createMessage(HornetQTextMessage.TYPE,
+                                                       false,
+                                                       0,
+                                                       System.currentTimeMillis(),
+                                                       (byte)1);
+    producer.send(outgoing);
+
+    ClientMessage received = consumer.receive(RECEIVE_TIMEOUT);
+    Assert.assertNotNull(received);
+    received.acknowledge();
+
+    workSession.close();
+
+    queueOwner.deleteQueue(ADDRESS);
+    queueOwner.close();
+ }
+
+ protected void doTestJ(final ClientSessionFactory sf) throws Exception
+ {
+ ClientSession sessCreate = sf.createSession(false, true, true);
+
+ sessCreate.createQueue(RandomReattachStressTest.ADDRESS, RandomReattachStressTest.ADDRESS, null, false);
+
+ ClientSession sess = sf.createSession(false, true, true);
+
+ sess.start();
+
+ ClientConsumer consumer = sess.createConsumer(RandomReattachStressTest.ADDRESS);
+
+ ClientProducer producer = sess.createProducer(RandomReattachStressTest.ADDRESS);
+
+ ClientMessage message = sess.createMessage(HornetQTextMessage.TYPE,
+ false,
+ 0,
+ System.currentTimeMillis(),
+ (byte)1);
+ producer.send(message);
+
+ ClientMessage message2 = consumer.receive(RandomReattachStressTest.RECEIVE_TIMEOUT);
+
+ Assert.assertNotNull(message2);
+
+ message2.acknowledge();
+
+ sess.close();
+
+ sessCreate.deleteQueue(RandomReattachStressTest.ADDRESS);
+
+ sessCreate.close();
+ }
+
+ /**
+  * Consumer churn: on a single session, creates and immediately closes 100 consumers on a
+  * queue named after ADDRESS, then deletes the queue.
+  *
+  * @param sf factory used to create the session
+  * @throws Exception if any client operation fails
+  */
+ protected void doTestK(final ClientSessionFactory sf) throws Exception
+ {
+    final int numConsumers = 100;
+
+    ClientSession session = sf.createSession(false, false, false);
+    session.createQueue(ADDRESS, ADDRESS, null, false);
+
+    int remaining = numConsumers;
+    while (remaining > 0)
+    {
+       session.createConsumer(ADDRESS).close();
+       remaining--;
+    }
+
+    session.deleteQueue(ADDRESS);
+    session.close();
+ }
+
+ /**
+  * Session churn: creates and immediately closes ten sessions, one after another.
+  *
+  * @param sf factory used to create the sessions
+  * @throws Exception if creating or closing a session fails
+  */
+ protected void doTestL(final ClientSessionFactory sf) throws Exception
+ {
+    final int numSessions = 10;
+
+    int remaining = numSessions;
+    while (remaining > 0)
+    {
+       sf.createSession(false, false, false).close();
+       remaining--;
+    }
+ }
+
+ /**
+  * Start/stop toggling: a session is stopped, started and stopped again before its consumer
+  * and producer are created; one non-durable message is sent while stopped and received
+  * after the session is started, followed by one more stop/start before closing.
+  *
+  * @param sf factory used to create both sessions
+  * @throws Exception if any client operation fails
+  */
+ protected void doTestN(final ClientSessionFactory sf) throws Exception
+ {
+    ClientSession queueOwner = sf.createSession(false, true, true);
+
+    // The queue name is a fresh SimpleString built from the address text each time it is needed.
+    SimpleString createName = new SimpleString(ADDRESS.toString());
+    queueOwner.createQueue(ADDRESS, createName, null, false);
+
+    ClientSession toggled = sf.createSession(false, true, true);
+
+    toggled.stop();
+    toggled.start();
+    toggled.stop();
+
+    SimpleString consumeName = new SimpleString(ADDRESS.toString());
+    ClientConsumer consumer = toggled.createConsumer(consumeName);
+    ClientProducer producer = toggled.createProducer(ADDRESS);
+
+    ClientMessage outgoing = toggled.createMessage(HornetQTextMessage.TYPE,
+                                                   false,
+                                                   0,
+                                                   System.currentTimeMillis(),
+                                                   (byte)1);
+    producer.send(outgoing);
+
+    // The session is started only after the message has been sent.
+    toggled.start();
+
+    ClientMessage received = consumer.receive(RECEIVE_TIMEOUT);
+    Assert.assertNotNull(received);
+    received.acknowledge();
+
+    toggled.stop();
+    toggled.start();
+    toggled.close();
+
+    SimpleString deleteName = new SimpleString(ADDRESS.toString());
+    queueOwner.deleteQueue(deleteName);
+
+    queueOwner.close();
+ }
+
@Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ timer = new Timer(true);
+ }
+
+ /**
+  * Cancels the per-test timer and clears the in-VM registry, then delegates to the
+  * superclass. The superclass tear-down runs even if the local cleanup throws.
+  *
+  * @throws Exception if the local cleanup or the superclass tear-down fails
+  */
+ @Override
+ protected void tearDown() throws Exception
+ {
+    try
+    {
+       timer.cancel();
+
+       InVMRegistry.instance.clear();
+    }
+    finally
+    {
+       // Always give the base class its chance to clean up, whatever happened above.
+       super.tearDown();
+    }
+ }
+
+ // Private -------------------------------------------------------
+
+ private Failer startFailer(final long time, final ClientSession session)
+ {
+ Failer failer = new Failer((ClientSessionInternal)session);
+
+ timer.schedule(failer, (long)(time * Math.random()), 100);
+
+ return failer;
+ }
+
+ private void start() throws Exception
+ {
+ Configuration liveConf = createDefaultConfig();
+ liveConf.setSecurityEnabled(false);
+ liveConf.getAcceptorConfigurations()
+ .add(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory"));
+ liveService = HornetQServers.newHornetQServer(liveConf, false);
+ liveService.start();
+ }
+
+ private void stop() throws Exception
+ {
+ liveService.stop();
+
+ Assert.assertEquals(0, InVMRegistry.instance.size());
+
+ liveService = null;
+ }
+
+ // Inner classes -------------------------------------------------
+
+ /**
+  * Timer task that fails the connection of one session, cancels itself and records that it ran.
+  */
+ class Failer extends TimerTask
+ {
+    private final ClientSessionInternal session;
+
+    private boolean executed;
+
+    public Failer(final ClientSessionInternal session)
+    {
+       this.session = session;
+    }
+
+    /**
+     * Fails the session's connection with a NOT_CONNECTED error, then cancels this task so a
+     * periodic schedule does not fire it again.
+     */
+    @Override
+    public synchronized void run()
+    {
+       log.info("** Failing connection");
+
+       final HornetQException cause = new HornetQException(HornetQException.NOT_CONNECTED, "oops");
+       session.getConnection().fail(cause);
+
+       log.info("** Fail complete");
+
+       cancel();
+       this.executed = true;
+    }
+
+    /**
+     * @return true once run() has completed
+     */
+    public synchronized boolean isExecuted()
+    {
+       return this.executed;
+    }
+ }
+
+ public abstract class RunnableT // callback type: one unit of test work executed against a session factory
+ {
+ abstract void run(final ClientSessionFactory sf) throws Exception; // sf is the factory the work should use; any Exception may propagate to the caller
+ }
+
+ /**
+  * Message handler base class that lets subclasses use JUnit assertions inside onMessage.
+  * Assertion failures raised by onMessageAssert are collected instead of propagating, and
+  * are rethrown to whoever calls checkAssertions().
+  */
+ static abstract class AssertionCheckMessageHandler implements MessageHandler
+ {
+    private final ArrayList<AssertionFailedError> errors = new ArrayList<AssertionFailedError>();
+
+    /**
+     * Rethrows the first assertion failure recorded so far; does nothing if there is none.
+     *
+     * @throws AssertionFailedError the earliest failure captured by onMessage
+     */
+    public void checkAssertions()
+    {
+       if (!errors.isEmpty())
+       {
+          throw errors.get(0);
+       }
+    }
+
+    /* (non-Javadoc)
+     * @see org.hornetq.api.core.client.MessageHandler#onMessage(org.hornetq.api.core.client.ClientMessage)
+     */
+    public void onMessage(ClientMessage message)
+    {
+       try
+       {
+          onMessageAssert(message);
+       }
+       catch (AssertionFailedError e)
+       {
+          e.printStackTrace(); // System.out -> junit reports
+          errors.add(e);
+       }
+    }
+
+    /**
+     * Handles one message; may throw AssertionFailedError, which onMessage records.
+     *
+     * @param message the delivered message
+     */
+    public abstract void onMessageAssert(ClientMessage message);
+
+ }
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
protected int getNumIterations()
{
return 100;
Modified: trunk/tests/stress-tests/src/test/java/org/hornetq/tests/stress/journal/CompactingStressTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/stress/journal/CompactingStressTest.java 2011-04-11 12:47:21 UTC (rev 10475)
+++ trunk/tests/stress-tests/src/test/java/org/hornetq/tests/stress/journal/CompactingStressTest.java 2011-04-12 10:12:15 UTC (rev 10480)
@@ -26,7 +26,7 @@
import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.JournalType;
-import org.hornetq.tests.unit.util.ServiceTestBase;
+import org.hornetq.tests.util.ServiceTestBase;
/**
* A CompactingTest
Modified: trunk/tests/stress-tests/src/test/java/org/hornetq/tests/stress/journal/JournalCleanupCompactStressTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/stress/journal/JournalCleanupCompactStressTest.java 2011-04-11 12:47:21 UTC (rev 10475)
+++ trunk/tests/stress-tests/src/test/java/org/hornetq/tests/stress/journal/JournalCleanupCompactStressTest.java 2011-04-12 10:12:15 UTC (rev 10480)
@@ -40,7 +40,7 @@
import org.hornetq.core.journal.impl.JournalImpl;
import org.hornetq.core.journal.impl.NIOSequentialFileFactory;
import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
-import org.hornetq.tests.unit.util.ServiceTestBase;
+import org.hornetq.tests.util.ServiceTestBase;
import org.hornetq.tests.util.RandomUtil;
import org.hornetq.utils.HornetQThreadFactory;
import org.hornetq.utils.OrderedExecutorFactory;
Modified: trunk/tests/stress-tests/src/test/java/org/hornetq/tests/stress/journal/JournalRestartStressTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/stress/journal/JournalRestartStressTest.java 2011-04-11 12:47:21 UTC (rev 10475)
+++ trunk/tests/stress-tests/src/test/java/org/hornetq/tests/stress/journal/JournalRestartStressTest.java 2011-04-12 10:12:15 UTC (rev 10480)
@@ -25,7 +25,7 @@
import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.tests.util.RandomUtil;
-import org.hornetq.tests.unit.util.ServiceTestBase;
+import org.hornetq.tests.util.ServiceTestBase;
/**
* Simulates the journal being updated, compacted cleared up,
Modified: trunk/tests/stress-tests/src/test/java/org/hornetq/tests/stress/journal/LargeJournalStressTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/stress/journal/LargeJournalStressTest.java 2011-04-11 12:47:21 UTC (rev 10475)
+++ trunk/tests/stress-tests/src/test/java/org/hornetq/tests/stress/journal/LargeJournalStressTest.java 2011-04-12 10:12:15 UTC (rev 10480)
@@ -24,7 +24,7 @@
import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.JournalType;
-import org.hornetq.tests.unit.util.ServiceTestBase;
+import org.hornetq.tests.util.ServiceTestBase;
/**
* A LargeJournalStressTest
Modified: trunk/tests/stress-tests/src/test/java/org/hornetq/tests/stress/journal/MultiThreadConsumerStressTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/stress/journal/MultiThreadConsumerStressTest.java 2011-04-11 12:47:21 UTC (rev 10475)
+++ trunk/tests/stress-tests/src/test/java/org/hornetq/tests/stress/journal/MultiThreadConsumerStressTest.java 2011-04-12 10:12:15 UTC (rev 10480)
@@ -25,7 +25,7 @@
import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.JournalType;
-import org.hornetq.tests.unit.util.ServiceTestBase;
+import org.hornetq.tests.util.ServiceTestBase;
/**
* A MultiThreadConsumerStressTest
Modified: trunk/tests/stress-tests/src/test/java/org/hornetq/tests/stress/journal/NIOMultiThreadCompactorStressTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/stress/journal/NIOMultiThreadCompactorStressTest.java 2011-04-11 12:47:21 UTC (rev 10475)
+++ trunk/tests/stress-tests/src/test/java/org/hornetq/tests/stress/journal/NIOMultiThreadCompactorStressTest.java 2011-04-12 10:12:15 UTC (rev 10480)
@@ -35,7 +35,7 @@
import org.hornetq.core.journal.impl.NIOSequentialFileFactory;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.JournalType;
-import org.hornetq.tests.unit.util.ServiceTestBase;
+import org.hornetq.tests.util.ServiceTestBase;
/**
* A MultiThreadConsumerStressTest
Modified: trunk/tests/stress-tests/src/test/java/org/hornetq/tests/stress/paging/PageCursorStressTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/stress/paging/PageCursorStressTest.java 2011-04-11 12:47:21 UTC (rev 10475)
+++ trunk/tests/stress-tests/src/test/java/org/hornetq/tests/stress/paging/PageCursorStressTest.java 2011-04-12 10:12:15 UTC (rev 10480)
@@ -50,7 +50,7 @@
import org.hornetq.core.transaction.impl.TransactionImpl;
import org.hornetq.tests.unit.core.postoffice.impl.FakeQueue;
import org.hornetq.tests.util.RandomUtil;
-import org.hornetq.tests.unit.util.ServiceTestBase;
+import org.hornetq.tests.util.ServiceTestBase;
import org.hornetq.utils.LinkedListIterator;
/**
Modified: trunk/tests/stress-tests/src/test/java/org/hornetq/tests/stress/paging/PageStressTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/stress/paging/PageStressTest.java 2011-04-11 12:47:21 UTC (rev 10475)
+++ trunk/tests/stress-tests/src/test/java/org/hornetq/tests/stress/paging/PageStressTest.java 2011-04-12 10:12:15 UTC (rev 10480)
@@ -28,7 +28,7 @@
import org.hornetq.core.config.Configuration;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.settings.impl.AddressSettings;
-import org.hornetq.tests.unit.util.ServiceTestBase;
+import org.hornetq.tests.util.ServiceTestBase;
/**
* This is an integration-tests that will take some time to run.
Modified: trunk/tests/stress-tests/src/test/java/org/hornetq/tests/stress/remote/PingStressTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/stress/remote/PingStressTest.java 2011-04-11 12:47:21 UTC (rev 10475)
+++ trunk/tests/stress-tests/src/test/java/org/hornetq/tests/stress/remote/PingStressTest.java 2011-04-12 10:12:15 UTC (rev 10480)
@@ -29,9 +29,8 @@
import org.hornetq.core.protocol.core.impl.PacketImpl;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.spi.core.protocol.RemotingConnection;
-import org.hornetq.tests.integration.remoting.PingTest;
import org.hornetq.tests.util.RandomUtil;
-import org.hornetq.tests.unit.util.ServiceTestBase;
+import org.hornetq.tests.util.ServiceTestBase;
/**
* @author <a href="mailto:clebert.suconic@jboss.com">Clebert Suconic</a>
@@ -42,7 +41,7 @@
{
// Constants -----------------------------------------------------
- private static final Logger log = Logger.getLogger(PingTest.class);
+ private static final Logger log = Logger.getLogger(PingStressTest.class);
private static final long PING_INTERVAL = 500;
13 years, 8 months
JBoss hornetq SVN: r10479 - in trunk/tests: soak-tests and 10 other directories.
by do-not-reply@jboss.org
Author: ataylor
Date: 2011-04-12 05:47:15 -0400 (Tue, 12 Apr 2011)
New Revision: 10479
Added:
trunk/tests/soak-tests/
trunk/tests/soak-tests/pom.xml
trunk/tests/soak-tests/src/
trunk/tests/soak-tests/src/test/
trunk/tests/soak-tests/src/test/java/
trunk/tests/soak-tests/src/test/java/org/
trunk/tests/soak-tests/src/test/java/org/hornetq/
trunk/tests/soak-tests/src/test/java/org/hornetq/tests/
trunk/tests/soak-tests/src/test/java/org/hornetq/tests/soak/
Removed:
trunk/tests/src/org/hornetq/tests/soak/
Modified:
trunk/tests/hornetq-tests.iml
trunk/tests/pom.xml
trunk/tests/soak-tests/src/test/java/org/hornetq/tests/soak/client/ClientNonDivertedSoakTest.java
trunk/tests/soak-tests/src/test/java/org/hornetq/tests/soak/client/ClientSoakTest.java
trunk/tests/soak-tests/src/test/java/org/hornetq/tests/soak/client/SimpleSendReceiveSoakTest.java
trunk/tests/soak-tests/src/test/java/org/hornetq/tests/soak/failover/RandomFailoverSoakTest.java
trunk/tests/soak-tests/src/test/java/org/hornetq/tests/soak/journal/JournalCleanupCompactSoakTest.java
Log:
mavenised soak tests
Modified: trunk/tests/hornetq-tests.iml
===================================================================
--- trunk/tests/hornetq-tests.iml 2011-04-11 23:03:32 UTC (rev 10478)
+++ trunk/tests/hornetq-tests.iml 2011-04-12 09:47:15 UTC (rev 10479)
@@ -8,6 +8,7 @@
<sourceFolder url="file://$MODULE_DIR$/integration-tests/src/test/java" isTestSource="true" />
<sourceFolder url="file://$MODULE_DIR$/unit-tests/src/main/java" isTestSource="true" />
<sourceFolder url="file://$MODULE_DIR$/integration-tests/src/main/java" isTestSource="true" />
+ <sourceFolder url="file://$MODULE_DIR$/soak-tests/src/test/java" isTestSource="true" />
<excludeFolder url="file://$MODULE_DIR$/jms-tests" />
<excludeFolder url="file://$MODULE_DIR$/joram-tests" />
<excludeFolder url="file://$MODULE_DIR$/logs" />
Modified: trunk/tests/pom.xml
===================================================================
--- trunk/tests/pom.xml 2011-04-11 23:03:32 UTC (rev 10478)
+++ trunk/tests/pom.xml 2011-04-12 09:47:15 UTC (rev 10479)
@@ -28,5 +28,6 @@
<module>integration-tests</module>
<module>jms-tests</module>
<module>joram-tests</module>
+ <module>soak-tests</module>
</modules>
</project>
Added: trunk/tests/soak-tests/pom.xml
===================================================================
--- trunk/tests/soak-tests/pom.xml (rev 0)
+++ trunk/tests/soak-tests/pom.xml 2011-04-12 09:47:15 UTC (rev 10479)
@@ -0,0 +1,140 @@
+<!--
+ ~ Copyright 2009 Red Hat, Inc.
+ ~ Red Hat licenses this file to you under the Apache License, version
+ ~ 2.0 (the "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ ~ implied. See the License for the specific language governing
+ ~ permissions and limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.hornetq.tests</groupId>
+ <artifactId>hornetq-tests-pom</artifactId>
+ <version>2.2.3-SNAPSHOT</version>
+ </parent>
+
+ <groupId>org.hornetq.tests</groupId>
+ <artifactId>soak-tests</artifactId>
+ <packaging>jar</packaging>
+ <name>HornetQ soak Tests</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.hornetq.tests</groupId>
+ <artifactId>unit-tests</artifactId>
+ <version>${project.version}</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.hornetq.tests</groupId>
+ <artifactId>integration-tests</artifactId>
+ <version>${project.version}</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.hornetq.tests</groupId>
+ <artifactId>jms-tests</artifactId>
+ <version>${project.version}</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.hornetq</groupId>
+ <artifactId>hornetq-jms</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.hornetq</groupId>
+ <artifactId>hornetq-ra</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.hornetq</groupId>
+ <artifactId>hornetq-bootstrap</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.jboss.javaee</groupId>
+ <artifactId>jboss-jca-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.jboss.security</groupId>
+ <artifactId>jboss-security-spi</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.jboss.security</groupId>
+ <artifactId>jbosssx</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.jboss.naming</groupId>
+ <artifactId>jnpserver</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>jboss.jbossts</groupId>
+ <artifactId>jbossts-common</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>apache-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.jboss.javaee</groupId>
+ <artifactId>jboss-transaction-api</artifactId>
+ </dependency>
+ <!--this specifically for the JMS Bridge-->
+ <dependency>
+ <groupId>org.jboss.integration</groupId>
+ <artifactId>jboss-transaction-spi</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.jboss.javaee</groupId>
+ <artifactId>jboss-jaspi-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.jboss.javaee</groupId>
+ <artifactId>jboss-jms-api</artifactId>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <testResources>
+ <testResource>
+ <directory>config</directory>
+ </testResource>
+ </testResources>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <includes>
+ <include>**/*Test.java</include>
+ </includes>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <source>1.6</source>
+ <target>1.6</target>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
Modified: trunk/tests/soak-tests/src/test/java/org/hornetq/tests/soak/client/ClientNonDivertedSoakTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/soak/client/ClientNonDivertedSoakTest.java 2011-04-11 12:47:21 UTC (rev 10475)
+++ trunk/tests/soak-tests/src/test/java/org/hornetq/tests/soak/client/ClientNonDivertedSoakTest.java 2011-04-12 09:47:15 UTC (rev 10479)
@@ -25,7 +25,7 @@
import org.hornetq.core.config.Configuration;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.settings.impl.AddressSettings;
-import org.hornetq.tests.unit.util.ServiceTestBase;
+import org.hornetq.tests.util.ServiceTestBase;
/**
* A ClientSoakTest
Modified: trunk/tests/soak-tests/src/test/java/org/hornetq/tests/soak/client/ClientSoakTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/soak/client/ClientSoakTest.java 2011-04-11 12:47:21 UTC (rev 10475)
+++ trunk/tests/soak-tests/src/test/java/org/hornetq/tests/soak/client/ClientSoakTest.java 2011-04-12 09:47:15 UTC (rev 10479)
@@ -27,7 +27,7 @@
import org.hornetq.core.config.DivertConfiguration;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.settings.impl.AddressSettings;
-import org.hornetq.tests.unit.util.ServiceTestBase;
+import org.hornetq.tests.util.ServiceTestBase;
/**
* A ClientSoakTest
Modified: trunk/tests/soak-tests/src/test/java/org/hornetq/tests/soak/client/SimpleSendReceiveSoakTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/soak/client/SimpleSendReceiveSoakTest.java 2011-04-11 12:47:21 UTC (rev 10475)
+++ trunk/tests/soak-tests/src/test/java/org/hornetq/tests/soak/client/SimpleSendReceiveSoakTest.java 2011-04-12 09:47:15 UTC (rev 10479)
@@ -25,7 +25,7 @@
import org.hornetq.core.config.Configuration;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.settings.impl.AddressSettings;
-import org.hornetq.tests.unit.util.ServiceTestBase;
+import org.hornetq.tests.util.ServiceTestBase;
/**
* A ClientSoakTest
Modified: trunk/tests/soak-tests/src/test/java/org/hornetq/tests/soak/failover/RandomFailoverSoakTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/soak/failover/RandomFailoverSoakTest.java 2011-04-11 12:47:21 UTC (rev 10475)
+++ trunk/tests/soak-tests/src/test/java/org/hornetq/tests/soak/failover/RandomFailoverSoakTest.java 2011-04-12 09:47:15 UTC (rev 10479)
@@ -13,8 +13,27 @@
package org.hornetq.tests.soak.failover;
-import org.hornetq.tests.integration.cluster.reattach.RandomReattachTest;
+import junit.framework.Assert;
+import junit.framework.AssertionFailedError;
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.*;
+import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
+import org.hornetq.core.client.impl.ClientSessionInternal;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.remoting.impl.invm.InVMRegistry;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.HornetQServers;
+import org.hornetq.jms.client.HornetQTextMessage;
+import org.hornetq.tests.util.ServiceTestBase;
+import org.hornetq.tests.util.UnitTestCase;
+import java.util.*;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
/**
* A RandomFailoverSoakTest
*
@@ -24,10 +43,1515 @@
*
*
*/
-public class RandomFailoverSoakTest extends RandomReattachTest
+public class RandomFailoverSoakTest extends UnitTestCase
{
+ // Class-wide logger; used for iteration banners, durations and failure reports.
+ private static final Logger log = Logger.getLogger(RandomFailoverSoakTest.class);
+ // Constants -----------------------------------------------------
+
+ // Timeout handed to ClientConsumer.receive(...) by the synchronous-receive scenarios
+ // (presumably milliseconds -- confirm against the ClientConsumer API).
+ private static final int RECEIVE_TIMEOUT = 10000;
+
+ // Attributes ----------------------------------------------------
+
+ // Address that every scenario binds its queues to and sends its messages to.
+ private static final SimpleString ADDRESS = new SimpleString("FailoverTestAddress");
+
+ // Server under test; assigned in start() and reset to null in stop().
+ private HornetQServer liveService;
+
+ // Created in setUp(), cancelled in tearDown(); startFailer() schedules Failer tasks on it.
+ private Timer timer;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ /**
+  * Drives {@link #doTestA} through {@link #runTest}.
+  */
+ public void testA() throws Exception
+ {
+    final RunnableT scenario = new RunnableT()
+    {
+       @Override
+       public void run(final ClientSessionFactory factory) throws Exception
+       {
+          doTestA(factory);
+       }
+    };
+
+    runTest(scenario);
+ }
+
+ /**
+  * Drives {@link #doTestB} through {@link #runTest}.
+  */
+ public void testB() throws Exception
+ {
+    final RunnableT scenario = new RunnableT()
+    {
+       @Override
+       public void run(final ClientSessionFactory factory) throws Exception
+       {
+          doTestB(factory);
+       }
+    };
+
+    runTest(scenario);
+ }
+
+ /**
+  * Drives {@link #doTestC} through {@link #runTest}.
+  */
+ public void testC() throws Exception
+ {
+    final RunnableT scenario = new RunnableT()
+    {
+       @Override
+       public void run(final ClientSessionFactory factory) throws Exception
+       {
+          doTestC(factory);
+       }
+    };
+
+    runTest(scenario);
+ }
+
+ /**
+  * Drives {@link #doTestD} through {@link #runTest}.
+  */
+ public void testD() throws Exception
+ {
+    final RunnableT scenario = new RunnableT()
+    {
+       @Override
+       public void run(final ClientSessionFactory factory) throws Exception
+       {
+          doTestD(factory);
+       }
+    };
+
+    runTest(scenario);
+ }
+
+ /**
+  * Drives {@link #doTestE} through {@link #runTest}.
+  */
+ public void testE() throws Exception
+ {
+    final RunnableT scenario = new RunnableT()
+    {
+       @Override
+       public void run(final ClientSessionFactory factory) throws Exception
+       {
+          doTestE(factory);
+       }
+    };
+
+    runTest(scenario);
+ }
+
+ /**
+  * Drives {@link #doTestF} through {@link #runTest}.
+  */
+ public void testF() throws Exception
+ {
+    final RunnableT scenario = new RunnableT()
+    {
+       @Override
+       public void run(final ClientSessionFactory factory) throws Exception
+       {
+          doTestF(factory);
+       }
+    };
+
+    runTest(scenario);
+ }
+
+ /**
+  * Drives {@link #doTestG} through {@link #runTest}.
+  */
+ public void testG() throws Exception
+ {
+    final RunnableT scenario = new RunnableT()
+    {
+       @Override
+       public void run(final ClientSessionFactory factory) throws Exception
+       {
+          doTestG(factory);
+       }
+    };
+
+    runTest(scenario);
+ }
+
+ /**
+  * Drives {@link #doTestH} through {@link #runTest}.
+  */
+ public void testH() throws Exception
+ {
+    final RunnableT scenario = new RunnableT()
+    {
+       @Override
+       public void run(final ClientSessionFactory factory) throws Exception
+       {
+          doTestH(factory);
+       }
+    };
+
+    runTest(scenario);
+ }
+
+ /**
+  * Drives {@link #doTestI} through {@link #runTest}.
+  */
+ public void testI() throws Exception
+ {
+    final RunnableT scenario = new RunnableT()
+    {
+       @Override
+       public void run(final ClientSessionFactory factory) throws Exception
+       {
+          doTestI(factory);
+       }
+    };
+
+    runTest(scenario);
+ }
+
+ /**
+  * Drives {@link #doTestJ} through {@link #runTest}.
+  */
+ public void testJ() throws Exception
+ {
+    final RunnableT scenario = new RunnableT()
+    {
+       @Override
+       public void run(final ClientSessionFactory factory) throws Exception
+       {
+          doTestJ(factory);
+       }
+    };
+
+    runTest(scenario);
+ }
+
+ /**
+  * Drives {@link #doTestK} through {@link #runTest}.
+  */
+ public void testK() throws Exception
+ {
+    final RunnableT scenario = new RunnableT()
+    {
+       @Override
+       public void run(final ClientSessionFactory factory) throws Exception
+       {
+          doTestK(factory);
+       }
+    };
+
+    runTest(scenario);
+ }
+
+ /**
+  * Drives {@link #doTestL} through {@link #runTest}.
+  */
+ public void testL() throws Exception
+ {
+    final RunnableT scenario = new RunnableT()
+    {
+       @Override
+       public void run(final ClientSessionFactory factory) throws Exception
+       {
+          doTestL(factory);
+       }
+    };
+
+    runTest(scenario);
+ }
+
+ /**
+  * Drives {@link #doTestN} through {@link #runTest}.
+  */
+ public void testN() throws Exception
+ {
+    final RunnableT scenario = new RunnableT()
+    {
+       @Override
+       public void run(final ClientSessionFactory factory) throws Exception
+       {
+          doTestN(factory);
+       }
+    };
+
+    runTest(scenario);
+ }
+
+ public void runTest(final RunnableT runnable) throws Exception
+ {
+ final int numIts = getNumIterations();
+
+ for (int its = 0; its < numIts; its++)
+ {
+ RandomFailoverSoakTest.log.info("####" + getName() + " iteration #" + its);
+ start();
+ ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(ServiceTestBase.INVM_CONNECTOR_FACTORY));
+
+ locator.setReconnectAttempts(-1);
+ locator.setConfirmationWindowSize(1024 * 1024);
+
+ ClientSessionFactoryImpl sf = (ClientSessionFactoryImpl) locator.createSessionFactory();
+
+
+ ClientSession session = sf.createSession(false, false, false);
+
+ Failer failer = startFailer(1000, session);
+
+ do
+ {
+ runnable.run(sf);
+ }
+ while (!failer.isExecuted());
+
+ session.close();
+
+ locator.close();
+
+ Assert.assertEquals(0, sf.numSessions());
+
+ Assert.assertEquals(0, sf.numConnections());
+
+ stop();
+ }
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ protected void doTestA(final ClientSessionFactory sf) throws Exception
+ {
+ long start = System.currentTimeMillis();
+
+ ClientSession s = sf.createSession(false, false, false);
+
+ final int numMessages = 100;
+
+ final int numSessions = 10;
+
+ Set<ClientConsumer> consumers = new HashSet<ClientConsumer>();
+ Set<ClientSession> sessions = new HashSet<ClientSession>();
+
+ for (int i = 0; i < numSessions; i++)
+ {
+ SimpleString subName = new SimpleString("sub" + i);
+
+ ClientSession sessConsume = sf.createSession(false, true, true);
+
+ sessConsume.start();
+
+ sessConsume.createQueue(RandomFailoverSoakTest.ADDRESS, subName, null, false);
+
+ ClientConsumer consumer = sessConsume.createConsumer(subName);
+
+ consumers.add(consumer);
+
+ sessions.add(sessConsume);
+ }
+
+ ClientSession sessSend = sf.createSession(false, true, true);
+
+ ClientProducer producer = sessSend.createProducer(RandomFailoverSoakTest.ADDRESS);
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = sessSend.createMessage(HornetQTextMessage.TYPE,
+ false,
+ 0,
+ System.currentTimeMillis(),
+ (byte)1);
+ message.putIntProperty(new SimpleString("count"), i);
+ producer.send(message);
+ }
+
+ class MyHandler extends AssertionCheckMessageHandler
+ {
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ volatile int count;
+
+ public void onMessageAssert(final ClientMessage message)
+ {
+ if (count == numMessages)
+ {
+ Assert.fail("Too many messages");
+ }
+
+ Assert.assertEquals(count, message.getObjectProperty(new SimpleString("count")));
+
+ count++;
+
+ try
+ {
+ message.acknowledge();
+ }
+ catch (HornetQException me)
+ {
+ RandomFailoverSoakTest.log.error("Failed to process", me);
+ }
+
+ if (count == numMessages)
+ {
+ latch.countDown();
+ }
+ }
+ }
+
+ Set<MyHandler> handlers = new HashSet<MyHandler>();
+
+ for (ClientConsumer consumer : consumers)
+ {
+ MyHandler handler = new MyHandler();
+
+ consumer.setMessageHandler(handler);
+
+ handlers.add(handler);
+ }
+
+ for (MyHandler handler : handlers)
+ {
+ boolean ok = handler.latch.await(5000, TimeUnit.MILLISECONDS);
+
+ handler.checkAssertions();
+
+ Assert.assertTrue("Didn't receive all messages", ok);
+ }
+
+ sessSend.close();
+ for (ClientSession session : sessions)
+ {
+ session.close();
+ }
+
+ for (int i = 0; i < numSessions; i++)
+ {
+ SimpleString subName = new SimpleString("sub" + i);
+
+ s.deleteQueue(subName);
+ }
+
+ s.close();
+
+ long end = System.currentTimeMillis();
+
+ RandomFailoverSoakTest.log.info("duration " + (end - start));
+ }
+
+ /**
+  * Scenario B: 50 consumer sessions (created with {@code (false, true, true)}), each with its
+  * own queue {@code sub<i>} on {@link #ADDRESS}, are started only after 100 messages have been
+  * sent; message handlers then check the "count" property ordering without acknowledging.
+  *
+  * @param sf factory used to create every session of the scenario
+  */
+ protected void doTestB(final ClientSessionFactory sf) throws Exception
+ {
+    final long startedAt = System.currentTimeMillis();
+
+    final ClientSession adminSession = sf.createSession(false, false, false);
+
+    final int numMessages = 100;
+    final int numSessions = 50;
+
+    final Set<ClientConsumer> consumers = new HashSet<ClientConsumer>();
+    final Set<ClientSession> sessions = new HashSet<ClientSession>();
+
+    for (int idx = 0; idx < numSessions; idx++)
+    {
+       final SimpleString queueName = new SimpleString("sub" + idx);
+
+       final ClientSession consumerSession = sf.createSession(false, true, true);
+       consumerSession.createQueue(ADDRESS, queueName, null, false);
+
+       consumers.add(consumerSession.createConsumer(queueName));
+       sessions.add(consumerSession);
+    }
+
+    final ClientSession senderSession = sf.createSession(false, true, true);
+    final ClientProducer sender = senderSession.createProducer(ADDRESS);
+
+    int sent = 0;
+    while (sent < numMessages)
+    {
+       final ClientMessage outgoing = senderSession.createMessage(HornetQTextMessage.TYPE,
+                                                                  false,
+                                                                  0,
+                                                                  System.currentTimeMillis(),
+                                                                  (byte)1);
+       outgoing.putIntProperty(new SimpleString("count"), sent);
+       sender.send(outgoing);
+       sent++;
+    }
+
+    // Delivery starts only now, after everything has been sent.
+    for (ClientSession open : sessions)
+    {
+       open.start();
+    }
+
+    // Handler that expects exactly numMessages messages, in "count" order.
+    class CountingHandler extends AssertionCheckMessageHandler
+    {
+       final CountDownLatch done = new CountDownLatch(1);
+
+       volatile int received;
+
+       public void onMessageAssert(final ClientMessage delivered)
+       {
+          if (received == numMessages)
+          {
+             Assert.fail("Too many messages");
+          }
+
+          Assert.assertEquals(received, delivered.getObjectProperty(new SimpleString("count")));
+
+          received++;
+
+          if (received == numMessages)
+          {
+             done.countDown();
+          }
+       }
+    }
+
+    final Set<CountingHandler> handlers = new HashSet<CountingHandler>();
+
+    for (ClientConsumer each : consumers)
+    {
+       final CountingHandler counting = new CountingHandler();
+       each.setMessageHandler(counting);
+       handlers.add(counting);
+    }
+
+    for (CountingHandler counting : handlers)
+    {
+       final boolean completed = counting.done.await(10000, TimeUnit.MILLISECONDS);
+
+       counting.checkAssertions();
+
+       Assert.assertTrue(completed);
+    }
+
+    senderSession.close();
+
+    for (ClientSession open : sessions)
+    {
+       open.close();
+    }
+
+    for (int idx = 0; idx < numSessions; idx++)
+    {
+       adminSession.deleteQueue(new SimpleString("sub" + idx));
+    }
+
+    adminSession.close();
+
+    log.info("duration " + (System.currentTimeMillis() - startedAt));
+ }
+
+ /**
+  * Scenario C: a single consumer session (created with {@code (false, false, false)} and
+  * started up front) on queue {@code sub0}. The sender sends 100 messages and rolls back, then
+  * sends 100 again and commits. Handlers consume and acknowledge all 100 messages; the consumer
+  * session is then rolled back, fresh handlers must see the same 100 messages again, and the
+  * session is finally committed.
+  *
+  * @param sf factory used to create every session of the scenario
+  */
+ protected void doTestC(final ClientSessionFactory sf) throws Exception
+ {
+    long start = System.currentTimeMillis();
+
+    ClientSession s = sf.createSession(false, false, false);
+
+    final int numMessages = 100;
+
+    final int numSessions = 1;
+
+    Set<ClientConsumer> consumers = new HashSet<ClientConsumer>();
+    Set<ClientSession> sessions = new HashSet<ClientSession>();
+
+    for (int i = 0; i < numSessions; i++)
+    {
+       SimpleString subName = new SimpleString("sub" + i);
+
+       ClientSession sessConsume = sf.createSession(false, false, false);
+
+       sessConsume.start();
+
+       sessConsume.createQueue(RandomFailoverSoakTest.ADDRESS, subName, null, false);
+
+       ClientConsumer consumer = sessConsume.createConsumer(subName);
+
+       consumers.add(consumer);
+
+       sessions.add(sessConsume);
+    }
+
+    ClientSession sessSend = sf.createSession(false, false, true);
+
+    ClientProducer producer = sessSend.createProducer(RandomFailoverSoakTest.ADDRESS);
+
+    // First batch is rolled back and must never reach the consumers.
+    for (int i = 0; i < numMessages; i++)
+    {
+       ClientMessage message = sessSend.createMessage(HornetQTextMessage.TYPE,
+                                                      false,
+                                                      0,
+                                                      System.currentTimeMillis(),
+                                                      (byte)1);
+       message.putIntProperty(new SimpleString("count"), i);
+       producer.send(message);
+    }
+
+    sessSend.rollback();
+
+    // Second batch is committed and is the one the handlers count.
+    for (int i = 0; i < numMessages; i++)
+    {
+       ClientMessage message = sessSend.createMessage(HornetQTextMessage.TYPE,
+                                                      false,
+                                                      0,
+                                                      System.currentTimeMillis(),
+                                                      (byte)1);
+       message.putIntProperty(new SimpleString("count"), i);
+       producer.send(message);
+    }
+
+    sessSend.commit();
+
+    class MyHandler extends AssertionCheckMessageHandler
+    {
+       final CountDownLatch latch = new CountDownLatch(1);
+
+       volatile int count;
+
+       public void onMessageAssert(final ClientMessage message)
+       {
+          if (count == numMessages)
+          {
+             Assert.fail("Too many messages, expected " + count);
+          }
+
+          Assert.assertEquals(count, message.getObjectProperty(new SimpleString("count")));
+
+          count++;
+
+          try
+          {
+             message.acknowledge();
+          }
+          catch (HornetQException e)
+          {
+             // Report through the class logger (as doTestA does) instead of printStackTrace(),
+             // then propagate with the cause preserved.
+             RandomFailoverSoakTest.log.error("Failed to acknowledge message", e);
+             throw new RuntimeException (e.getMessage(), e);
+          }
+
+          if (count == numMessages)
+          {
+             latch.countDown();
+          }
+       }
+    }
+
+    Set<MyHandler> handlers = new HashSet<MyHandler>();
+
+    for (ClientConsumer consumer : consumers)
+    {
+       MyHandler handler = new MyHandler();
+
+       consumer.setMessageHandler(handler);
+
+       handlers.add(handler);
+    }
+
+    for (MyHandler handler : handlers)
+    {
+       boolean ok = handler.latch.await(10000, TimeUnit.MILLISECONDS);
+
+       Assert.assertTrue(ok);
+
+       handler.checkAssertions();
+    }
+
+    handlers.clear();
+
+    // New handlers
+    for (ClientConsumer consumer : consumers)
+    {
+       MyHandler handler = new MyHandler();
+
+       consumer.setMessageHandler(handler);
+
+       handlers.add(handler);
+    }
+
+    // Rolling back the consuming sessions makes the fresh handlers see the messages again.
+    for (ClientSession session : sessions)
+    {
+       session.rollback();
+    }
+
+    for (MyHandler handler : handlers)
+    {
+       boolean ok = handler.latch.await(10000, TimeUnit.MILLISECONDS);
+
+       Assert.assertTrue(ok);
+
+       handler.checkAssertions();
+    }
+
+    for (ClientSession session : sessions)
+    {
+       session.commit();
+    }
+
+    sessSend.close();
+    for (ClientSession session : sessions)
+    {
+       session.close();
+    }
+
+    for (int i = 0; i < numSessions; i++)
+    {
+       SimpleString subName = new SimpleString("sub" + i);
+
+       s.deleteQueue(subName);
+    }
+
+    s.close();
+
+    long end = System.currentTimeMillis();
+
+    RandomFailoverSoakTest.log.info("duration " + (end - start));
+ }
+
+ /**
+  * Scenario D: 10 consumer sessions (created with {@code (false, false, false)}), each with
+  * its own queue {@code sub<i>}, are started only after the sender has sent 100 messages, rolled
+  * back, sent 100 again and committed. Handlers count 100 messages without acknowledging; the
+  * consumer sessions are rolled back, fresh handlers must count 100 again, then the sessions are
+  * committed.
+  *
+  * @param sf factory used to create every session of the scenario
+  */
+ protected void doTestD(final ClientSessionFactory sf) throws Exception
+ {
+    final long startedAt = System.currentTimeMillis();
+
+    final ClientSession adminSession = sf.createSession(false, false, false);
+
+    final int numMessages = 100;
+    final int numSessions = 10;
+
+    final Set<ClientConsumer> consumers = new HashSet<ClientConsumer>();
+    final Set<ClientSession> sessions = new HashSet<ClientSession>();
+
+    for (int idx = 0; idx < numSessions; idx++)
+    {
+       final SimpleString queueName = new SimpleString("sub" + idx);
+
+       final ClientSession consumerSession = sf.createSession(false, false, false);
+       consumerSession.createQueue(ADDRESS, queueName, null, false);
+
+       consumers.add(consumerSession.createConsumer(queueName));
+       sessions.add(consumerSession);
+    }
+
+    final ClientSession senderSession = sf.createSession(false, false, true);
+    final ClientProducer sender = senderSession.createProducer(ADDRESS);
+
+    // First batch: sent and rolled back.
+    int sent = 0;
+    while (sent < numMessages)
+    {
+       final ClientMessage outgoing = senderSession.createMessage(HornetQTextMessage.TYPE,
+                                                                  false,
+                                                                  0,
+                                                                  System.currentTimeMillis(),
+                                                                  (byte)1);
+       outgoing.putIntProperty(new SimpleString("count"), sent);
+       sender.send(outgoing);
+       sent++;
+    }
+
+    senderSession.rollback();
+
+    // Second batch: sent and committed.
+    sent = 0;
+    while (sent < numMessages)
+    {
+       final ClientMessage outgoing = senderSession.createMessage(HornetQTextMessage.TYPE,
+                                                                  false,
+                                                                  0,
+                                                                  System.currentTimeMillis(),
+                                                                  (byte)1);
+       outgoing.putIntProperty(new SimpleString("count"), sent);
+       sender.send(outgoing);
+       sent++;
+    }
+
+    senderSession.commit();
+
+    for (ClientSession open : sessions)
+    {
+       open.start();
+    }
+
+    // Handler that expects exactly numMessages messages, in "count" order.
+    class CountingHandler extends AssertionCheckMessageHandler
+    {
+       final CountDownLatch done = new CountDownLatch(1);
+
+       volatile int received;
+
+       public void onMessageAssert(final ClientMessage delivered)
+       {
+          if (received == numMessages)
+          {
+             Assert.fail("Too many messages, " + received);
+          }
+
+          Assert.assertEquals(received, delivered.getObjectProperty(new SimpleString("count")));
+
+          received++;
+
+          if (received == numMessages)
+          {
+             done.countDown();
+          }
+       }
+    }
+
+    final Set<CountingHandler> handlers = new HashSet<CountingHandler>();
+
+    for (ClientConsumer each : consumers)
+    {
+       final CountingHandler counting = new CountingHandler();
+       each.setMessageHandler(counting);
+       handlers.add(counting);
+    }
+
+    for (CountingHandler counting : handlers)
+    {
+       Assert.assertTrue(counting.done.await(20000, TimeUnit.MILLISECONDS));
+
+       counting.checkAssertions();
+    }
+
+    handlers.clear();
+
+    // Second round: fresh handlers, installed before the consuming sessions are rolled back.
+    for (ClientConsumer each : consumers)
+    {
+       final CountingHandler counting = new CountingHandler();
+       each.setMessageHandler(counting);
+       handlers.add(counting);
+    }
+
+    for (ClientSession open : sessions)
+    {
+       open.rollback();
+    }
+
+    for (CountingHandler counting : handlers)
+    {
+       Assert.assertTrue(counting.done.await(10000, TimeUnit.MILLISECONDS));
+
+       counting.checkAssertions();
+    }
+
+    for (ClientSession open : sessions)
+    {
+       open.commit();
+    }
+
+    senderSession.close();
+    for (ClientSession open : sessions)
+    {
+       open.close();
+    }
+
+    for (int idx = 0; idx < numSessions; idx++)
+    {
+       adminSession.deleteQueue(new SimpleString("sub" + idx));
+    }
+
+    adminSession.close();
+
+    log.info("duration " + (System.currentTimeMillis() - startedAt));
+ }
+
+ // Now with synchronous receive()
+
+ /**
+  * Scenario E: like scenario A but with blocking {@code receive} calls instead of handlers.
+  * 10 started consumer sessions (created with {@code (false, true, true)}) each receive and
+  * acknowledge 100 messages in "count" order; afterwards every consumer must have nothing left.
+  *
+  * @param sf factory used to create every session of the scenario
+  */
+ protected void doTestE(final ClientSessionFactory sf) throws Exception
+ {
+    final long startedAt = System.currentTimeMillis();
+
+    final ClientSession adminSession = sf.createSession(false, false, false);
+
+    final int numMessages = 100;
+    final int numSessions = 10;
+
+    final Set<ClientConsumer> consumers = new HashSet<ClientConsumer>();
+    final Set<ClientSession> sessions = new HashSet<ClientSession>();
+
+    for (int idx = 0; idx < numSessions; idx++)
+    {
+       final SimpleString queueName = new SimpleString("sub" + idx);
+
+       final ClientSession consumerSession = sf.createSession(false, true, true);
+       consumerSession.start();
+       consumerSession.createQueue(ADDRESS, queueName, null, false);
+
+       consumers.add(consumerSession.createConsumer(queueName));
+       sessions.add(consumerSession);
+    }
+
+    final ClientSession senderSession = sf.createSession(false, true, true);
+    final ClientProducer sender = senderSession.createProducer(ADDRESS);
+
+    int sent = 0;
+    while (sent < numMessages)
+    {
+       final ClientMessage outgoing = senderSession.createMessage(HornetQTextMessage.TYPE,
+                                                                  false,
+                                                                  0,
+                                                                  System.currentTimeMillis(),
+                                                                  (byte)1);
+       outgoing.putIntProperty(new SimpleString("count"), sent);
+       sender.send(outgoing);
+       sent++;
+    }
+
+    // Every consumer must see message number `expected` before any consumer sees the next one.
+    for (int expected = 0; expected < numMessages; expected++)
+    {
+       for (ClientConsumer each : consumers)
+       {
+          final ClientMessage delivered = each.receive(RECEIVE_TIMEOUT);
+
+          Assert.assertNotNull(delivered);
+          Assert.assertEquals(expected, delivered.getObjectProperty(new SimpleString("count")));
+
+          delivered.acknowledge();
+       }
+    }
+
+    // Nothing may be left on any queue; polled numMessages times per consumer.
+    for (int round = 0; round < numMessages; round++)
+    {
+       for (ClientConsumer each : consumers)
+       {
+          Assert.assertNull(each.receiveImmediate());
+       }
+    }
+
+    senderSession.close();
+    for (ClientSession open : sessions)
+    {
+       open.close();
+    }
+
+    for (int idx = 0; idx < numSessions; idx++)
+    {
+       adminSession.deleteQueue(new SimpleString("sub" + idx));
+    }
+
+    adminSession.close();
+
+    log.info("duration " + (System.currentTimeMillis() - startedAt));
+ }
+
+ /**
+  * Scenario F: like scenario E, but the 10 consumer sessions (created with
+  * {@code (false, true, true)}) are started only after the 100 messages have been sent. Each
+  * consumer receives and acknowledges the messages in "count" order, must then have nothing
+  * left, and at the end exactly one session is expected to remain on the factory.
+  *
+  * @param sf factory used to create every session of the scenario; cast to
+  *           {@code ClientSessionFactoryImpl} for the final session-count check
+  * @throws IllegalStateException if a consumer receives nothing within {@link #RECEIVE_TIMEOUT}
+  */
+ protected void doTestF(final ClientSessionFactory sf) throws Exception
+ {
+    long start = System.currentTimeMillis();
+
+    ClientSession s = sf.createSession(false, false, false);
+
+    final int numMessages = 100;
+
+    final int numSessions = 10;
+
+    Set<ClientConsumer> consumers = new HashSet<ClientConsumer>();
+    Set<ClientSession> sessions = new HashSet<ClientSession>();
+
+    for (int i = 0; i < numSessions; i++)
+    {
+       SimpleString subName = new SimpleString("sub" + i);
+
+       ClientSession sessConsume = sf.createSession(false, true, true);
+
+       sessConsume.createQueue(RandomFailoverSoakTest.ADDRESS, subName, null, false);
+
+       ClientConsumer consumer = sessConsume.createConsumer(subName);
+
+       consumers.add(consumer);
+
+       sessions.add(sessConsume);
+    }
+
+    ClientSession sessSend = sf.createSession(false, true, true);
+
+    ClientProducer producer = sessSend.createProducer(RandomFailoverSoakTest.ADDRESS);
+
+    for (int i = 0; i < numMessages; i++)
+    {
+       ClientMessage message = sessSend.createMessage(HornetQTextMessage.TYPE,
+                                                      false,
+                                                      0,
+                                                      System.currentTimeMillis(),
+                                                      (byte)1);
+       message.putIntProperty(new SimpleString("count"), i);
+       producer.send(message);
+    }
+
+    // Delivery starts only now, after everything has been sent.
+    for (ClientSession session : sessions)
+    {
+       session.start();
+    }
+
+    for (int i = 0; i < numMessages; i++)
+    {
+       for (ClientConsumer consumer : consumers)
+       {
+          ClientMessage msg = consumer.receive(RandomFailoverSoakTest.RECEIVE_TIMEOUT);
+
+          // The explicit check reports which message index went missing; the former
+          // assertNotNull(msg) that followed it could never fire and has been removed.
+          if (msg == null)
+          {
+             throw new IllegalStateException("Failed to receive message " + i);
+          }
+
+          Assert.assertEquals(i, msg.getObjectProperty(new SimpleString("count")));
+
+          msg.acknowledge();
+       }
+    }
+
+    // Nothing may be left on any queue; polled numMessages times per consumer.
+    for (int i = 0; i < numMessages; i++)
+    {
+       for (ClientConsumer consumer : consumers)
+       {
+          ClientMessage msg = consumer.receiveImmediate();
+
+          Assert.assertNull(msg);
+       }
+    }
+
+    sessSend.close();
+    for (ClientSession session : sessions)
+    {
+       session.close();
+    }
+
+    for (int i = 0; i < numSessions; i++)
+    {
+       SimpleString subName = new SimpleString("sub" + i);
+
+       s.deleteQueue(subName);
+    }
+
+    s.close();
+
+    // Presumably the one remaining session is the failer's session opened by runTest -- confirm.
+    Assert.assertEquals(1, ((ClientSessionFactoryImpl)sf).numSessions());
+
+    long end = System.currentTimeMillis();
+
+    RandomFailoverSoakTest.log.info("duration " + (end - start));
+ }
+
+ /**
+  * Scenario G: blocking receives on sessions created with {@code (false, false, false)}.
+  * 10 started consumer sessions each own a queue {@code sub<i>}. The sender sends 100 messages
+  * and rolls back, then sends 100 again and commits. Consumers receive and acknowledge all 100,
+  * roll back, receive and acknowledge the same 100 again, and commit.
+  *
+  * @param sf factory used to create every session of the scenario
+  */
+ protected void doTestG(final ClientSessionFactory sf) throws Exception
+ {
+    final long startedAt = System.currentTimeMillis();
+
+    final ClientSession adminSession = sf.createSession(false, false, false);
+
+    final int numMessages = 100;
+    final int numSessions = 10;
+
+    final Set<ClientConsumer> consumers = new HashSet<ClientConsumer>();
+    final Set<ClientSession> sessions = new HashSet<ClientSession>();
+
+    for (int idx = 0; idx < numSessions; idx++)
+    {
+       final SimpleString queueName = new SimpleString("sub" + idx);
+
+       final ClientSession consumerSession = sf.createSession(false, false, false);
+       consumerSession.start();
+       consumerSession.createQueue(ADDRESS, queueName, null, false);
+
+       consumers.add(consumerSession.createConsumer(queueName));
+       sessions.add(consumerSession);
+    }
+
+    final ClientSession senderSession = sf.createSession(false, false, false);
+    final ClientProducer sender = senderSession.createProducer(ADDRESS);
+
+    // First batch: sent and rolled back.
+    int sent = 0;
+    while (sent < numMessages)
+    {
+       final ClientMessage outgoing = senderSession.createMessage(HornetQTextMessage.TYPE,
+                                                                  false,
+                                                                  0,
+                                                                  System.currentTimeMillis(),
+                                                                  (byte)1);
+       outgoing.putIntProperty(new SimpleString("count"), sent);
+       sender.send(outgoing);
+       sent++;
+    }
+
+    senderSession.rollback();
+
+    // Second batch: sent and committed.
+    sent = 0;
+    while (sent < numMessages)
+    {
+       final ClientMessage outgoing = senderSession.createMessage(HornetQTextMessage.TYPE,
+                                                                  false,
+                                                                  0,
+                                                                  System.currentTimeMillis(),
+                                                                  (byte)1);
+       outgoing.putIntProperty(new SimpleString("count"), sent);
+       sender.send(outgoing);
+       sent++;
+    }
+
+    senderSession.commit();
+
+    // First pass over the committed batch.
+    for (int expected = 0; expected < numMessages; expected++)
+    {
+       for (ClientConsumer each : consumers)
+       {
+          final ClientMessage delivered = each.receive(RECEIVE_TIMEOUT);
+
+          Assert.assertNotNull(delivered);
+          Assert.assertEquals(expected, delivered.getObjectProperty(new SimpleString("count")));
+
+          delivered.acknowledge();
+       }
+    }
+
+    // Single emptiness poll per consumer before the rollback.
+    for (ClientConsumer each : consumers)
+    {
+       Assert.assertNull(each.receiveImmediate());
+    }
+
+    for (ClientSession open : sessions)
+    {
+       open.rollback();
+    }
+
+    // Second pass: the rolled-back messages must arrive again, in the same order.
+    for (int expected = 0; expected < numMessages; expected++)
+    {
+       for (ClientConsumer each : consumers)
+       {
+          final ClientMessage delivered = each.receive(RECEIVE_TIMEOUT);
+
+          Assert.assertNotNull(delivered);
+          Assert.assertEquals(expected, delivered.getObjectProperty(new SimpleString("count")));
+
+          delivered.acknowledge();
+       }
+    }
+
+    for (int round = 0; round < numMessages; round++)
+    {
+       for (ClientConsumer each : consumers)
+       {
+          Assert.assertNull(each.receiveImmediate());
+       }
+    }
+
+    for (ClientSession open : sessions)
+    {
+       open.commit();
+    }
+
+    senderSession.close();
+    for (ClientSession open : sessions)
+    {
+       open.close();
+    }
+
+    for (int idx = 0; idx < numSessions; idx++)
+    {
+       adminSession.deleteQueue(new SimpleString("sub" + idx));
+    }
+
+    adminSession.close();
+
+    log.info("duration " + (System.currentTimeMillis() - startedAt));
+ }
+
+ /**
+  * Scenario H: like scenario G, but the 10 consumer sessions (created with
+  * {@code (false, false, false)}) are started only after the sender has committed its second
+  * batch of 100 messages. Consumers receive and acknowledge all 100, roll back, receive and
+  * acknowledge the same 100 again, and commit.
+  *
+  * @param sf factory used to create every session of the scenario
+  */
+ protected void doTestH(final ClientSessionFactory sf) throws Exception
+ {
+    final long startedAt = System.currentTimeMillis();
+
+    final ClientSession adminSession = sf.createSession(false, false, false);
+
+    final int numMessages = 100;
+    final int numSessions = 10;
+
+    final Set<ClientConsumer> consumers = new HashSet<ClientConsumer>();
+    final Set<ClientSession> sessions = new HashSet<ClientSession>();
+
+    for (int idx = 0; idx < numSessions; idx++)
+    {
+       final SimpleString queueName = new SimpleString("sub" + idx);
+
+       final ClientSession consumerSession = sf.createSession(false, false, false);
+       consumerSession.createQueue(ADDRESS, queueName, null, false);
+
+       consumers.add(consumerSession.createConsumer(queueName));
+       sessions.add(consumerSession);
+    }
+
+    final ClientSession senderSession = sf.createSession(false, false, false);
+    final ClientProducer sender = senderSession.createProducer(ADDRESS);
+
+    // First batch: sent and rolled back.
+    int sent = 0;
+    while (sent < numMessages)
+    {
+       final ClientMessage outgoing = senderSession.createMessage(HornetQTextMessage.TYPE,
+                                                                  false,
+                                                                  0,
+                                                                  System.currentTimeMillis(),
+                                                                  (byte)1);
+       outgoing.putIntProperty(new SimpleString("count"), sent);
+       sender.send(outgoing);
+       sent++;
+    }
+
+    senderSession.rollback();
+
+    // Second batch: sent and committed.
+    sent = 0;
+    while (sent < numMessages)
+    {
+       final ClientMessage outgoing = senderSession.createMessage(HornetQTextMessage.TYPE,
+                                                                  false,
+                                                                  0,
+                                                                  System.currentTimeMillis(),
+                                                                  (byte)1);
+       outgoing.putIntProperty(new SimpleString("count"), sent);
+       sender.send(outgoing);
+       sent++;
+    }
+
+    senderSession.commit();
+
+    for (ClientSession open : sessions)
+    {
+       open.start();
+    }
+
+    // First pass over the committed batch.
+    for (int expected = 0; expected < numMessages; expected++)
+    {
+       for (ClientConsumer each : consumers)
+       {
+          final ClientMessage delivered = each.receive(RECEIVE_TIMEOUT);
+
+          Assert.assertNotNull(delivered);
+          Assert.assertEquals(expected, delivered.getObjectProperty(new SimpleString("count")));
+
+          delivered.acknowledge();
+       }
+    }
+
+    for (int round = 0; round < numMessages; round++)
+    {
+       for (ClientConsumer each : consumers)
+       {
+          Assert.assertNull(each.receiveImmediate());
+       }
+    }
+
+    for (ClientSession open : sessions)
+    {
+       open.rollback();
+    }
+
+    // Second pass: the rolled-back messages must arrive again, in the same order.
+    for (int expected = 0; expected < numMessages; expected++)
+    {
+       for (ClientConsumer each : consumers)
+       {
+          final ClientMessage delivered = each.receive(RECEIVE_TIMEOUT);
+
+          Assert.assertNotNull(delivered);
+          Assert.assertEquals(expected, delivered.getObjectProperty(new SimpleString("count")));
+
+          delivered.acknowledge();
+       }
+    }
+
+    for (int round = 0; round < numMessages; round++)
+    {
+       for (ClientConsumer each : consumers)
+       {
+          Assert.assertNull(each.receiveImmediate());
+       }
+    }
+
+    for (ClientSession open : sessions)
+    {
+       open.commit();
+    }
+
+    senderSession.close();
+    for (ClientSession open : sessions)
+    {
+       open.close();
+    }
+
+    for (int idx = 0; idx < numSessions; idx++)
+    {
+       adminSession.deleteQueue(new SimpleString("sub" + idx));
+    }
+
+    adminSession.close();
+
+    log.info("duration " + (System.currentTimeMillis() - startedAt));
+ }
+
+ protected void doTestI(final ClientSessionFactory sf) throws Exception
+ {
+ ClientSession sessCreate = sf.createSession(false, true, true);
+
+ sessCreate.createQueue(RandomFailoverSoakTest.ADDRESS, RandomFailoverSoakTest.ADDRESS, null, false);
+
+ ClientSession sess = sf.createSession(false, true, true);
+
+ sess.start();
+
+ ClientConsumer consumer = sess.createConsumer(RandomFailoverSoakTest.ADDRESS);
+
+ ClientProducer producer = sess.createProducer(RandomFailoverSoakTest.ADDRESS);
+
+ ClientMessage message = sess.createMessage(HornetQTextMessage.TYPE,
+ false,
+ 0,
+ System.currentTimeMillis(),
+ (byte)1);
+ producer.send(message);
+
+ ClientMessage message2 = consumer.receive(RandomFailoverSoakTest.RECEIVE_TIMEOUT);
+
+ Assert.assertNotNull(message2);
+
+ message2.acknowledge();
+
+ sess.close();
+
+ sessCreate.deleteQueue(RandomFailoverSoakTest.ADDRESS);
+
+ sessCreate.close();
+ }
+
+ protected void doTestJ(final ClientSessionFactory sf) throws Exception
+ {
+ ClientSession sessCreate = sf.createSession(false, true, true);
+
+ sessCreate.createQueue(RandomFailoverSoakTest.ADDRESS, RandomFailoverSoakTest.ADDRESS, null, false);
+
+ ClientSession sess = sf.createSession(false, true, true);
+
+ sess.start();
+
+ ClientConsumer consumer = sess.createConsumer(RandomFailoverSoakTest.ADDRESS);
+
+ ClientProducer producer = sess.createProducer(RandomFailoverSoakTest.ADDRESS);
+
+ ClientMessage message = sess.createMessage(HornetQTextMessage.TYPE,
+ false,
+ 0,
+ System.currentTimeMillis(),
+ (byte)1);
+ producer.send(message);
+
+ ClientMessage message2 = consumer.receive(RandomFailoverSoakTest.RECEIVE_TIMEOUT);
+
+ Assert.assertNotNull(message2);
+
+ message2.acknowledge();
+
+ sess.close();
+
+ sessCreate.deleteQueue(RandomFailoverSoakTest.ADDRESS);
+
+ sessCreate.close();
+ }
+
+ /**
+  * Scenario K: creates and immediately closes 100 consumers, one after another, on a single
+  * queue named after {@link #ADDRESS}, then deletes the queue.
+  *
+  * @param sf factory used to create the scenario's session
+  */
+ protected void doTestK(final ClientSessionFactory sf) throws Exception
+ {
+    final int numConsumers = 100;
+
+    final ClientSession session = sf.createSession(false, false, false);
+    session.createQueue(ADDRESS, ADDRESS, null, false);
+
+    int created = 0;
+    while (created < numConsumers)
+    {
+       session.createConsumer(ADDRESS).close();
+       created++;
+    }
+
+    session.deleteQueue(ADDRESS);
+    session.close();
+ }
+
+ /**
+  * Scenario L: opens and immediately closes 10 sessions, one after another.
+  *
+  * @param sf factory used to create the sessions
+  */
+ protected void doTestL(final ClientSessionFactory sf) throws Exception
+ {
+    final int numSessions = 10;
+
+    int opened = 0;
+    while (opened < numSessions)
+    {
+       sf.createSession(false, false, false).close();
+       opened++;
+    }
+ }
+
+ protected void doTestN(final ClientSessionFactory sf) throws Exception
+ {
+ ClientSession sessCreate = sf.createSession(false, true, true);
+
+ sessCreate.createQueue(RandomFailoverSoakTest.ADDRESS,
+ new SimpleString(RandomFailoverSoakTest.ADDRESS.toString()),
+ null,
+ false);
+
+ ClientSession sess = sf.createSession(false, true, true);
+
+ sess.stop();
+
+ sess.start();
+
+ sess.stop();
+
+ ClientConsumer consumer = sess.createConsumer(new SimpleString(RandomFailoverSoakTest.ADDRESS.toString()));
+
+ ClientProducer producer = sess.createProducer(RandomFailoverSoakTest.ADDRESS);
+
+ ClientMessage message = sess.createMessage(HornetQTextMessage.TYPE,
+ false,
+ 0,
+ System.currentTimeMillis(),
+ (byte)1);
+ producer.send(message);
+
+ sess.start();
+
+ ClientMessage message2 = consumer.receive(RandomFailoverSoakTest.RECEIVE_TIMEOUT);
+
+ Assert.assertNotNull(message2);
+
+ message2.acknowledge();
+
+ sess.stop();
+
+ sess.start();
+
+ sess.close();
+
+ sessCreate.deleteQueue(new SimpleString(RandomFailoverSoakTest.ADDRESS.toString()));
+
+ sessCreate.close();
+ }
+
@Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ // Fresh timer per test; startFailer() schedules the Failer tasks on it and tearDown() cancels it.
+ // NOTE(review): presumably java.util.Timer, in which case 'true' makes its thread a daemon - confirm against the imports.
+ timer = new Timer(true);
+ }
+
+ /**
+ * Cancels the failer timer and clears the in-VM registry before delegating to the base class.
+ */
+ @Override
+ protected void tearDown() throws Exception
+ {
+ // no further Failer tasks may fire once the test is over
+ timer.cancel();
+
+ InVMRegistry.instance.clear();
+
+ super.tearDown();
+ }
+
+ // Private -------------------------------------------------------
+
+ private Failer startFailer(final long time, final ClientSession session)
+ {
+ Failer failer = new Failer((ClientSessionInternal)session);
+
+ timer.schedule(failer, (long)(time * Math.random()), 100);
+
+ return failer;
+ }
+
+ /**
+ * Creates and starts the live server from the default configuration, with security disabled
+ * and an in-VM acceptor added.
+ */
+ private void start() throws Exception
+ {
+ Configuration liveConf = createDefaultConfig();
+ liveConf.setSecurityEnabled(false);
+ liveConf.getAcceptorConfigurations()
+ .add(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory"));
+ liveService = HornetQServers.newHornetQServer(liveConf, false);
+ liveService.start();
+ }
+
+ /**
+ * Stops the live server, asserts that the in-VM registry is empty afterwards and drops the
+ * server reference.
+ */
+ private void stop() throws Exception
+ {
+ liveService.stop();
+
+ // stopping the server is expected to leave nothing registered in the in-VM registry
+ Assert.assertEquals(0, InVMRegistry.instance.size());
+
+ liveService = null;
+ }
+
+ // Inner classes -------------------------------------------------
+
+ /**
+ * Timer task that fails the connection of one session with a NOT_CONNECTED exception,
+ * then cancels itself and records that it has run.
+ */
+ class Failer extends TimerTask
+ {
+ private final ClientSessionInternal session;
+
+ // set once run() has completed; read through the synchronized isExecuted()
+ private boolean executed;
+
+ public Failer(final ClientSessionInternal session)
+ {
+ this.session = session;
+ }
+
+ @Override
+ public synchronized void run()
+ {
+ RandomFailoverSoakTest.log.info("** Failing connection");
+
+ session.getConnection().fail(new HornetQException(HornetQException.NOT_CONNECTED, "oops"));
+
+ RandomFailoverSoakTest.log.info("** Fail complete");
+
+ // the task is scheduled periodically by startFailer(); cancelling here stops further runs
+ cancel();
+
+ executed = true;
+ }
+
+ /** @return true once run() has finished failing the connection */
+ public synchronized boolean isExecuted()
+ {
+ return executed;
+ }
+ }
+
+ /**
+ * A unit of test work that runs against a session factory and may throw any exception.
+ */
+ public abstract class RunnableT
+ {
+ /** Executes the work using the supplied session factory. */
+ abstract void run(final ClientSessionFactory sf) throws Exception;
+ }
+
+ /**
+  * Message handler that lets subclasses use JUnit assertions inside onMessageAssert():
+  * assertion failures are collected instead of propagating out of onMessage(), and the
+  * first one can be rethrown later through checkAssertions().
+  */
+ static abstract class AssertionCheckMessageHandler implements MessageHandler
+ {
+    /** Assertion failures caught in onMessage(), in the order they occurred. */
+    private ArrayList<AssertionFailedError> errors = new ArrayList<AssertionFailedError>();
+
+    /** Rethrows the first recorded assertion failure; returns normally when there is none. */
+    public void checkAssertions()
+    {
+       if (!errors.isEmpty())
+       {
+          throw errors.get(0);
+       }
+    }
+
+    /* (non-Javadoc)
+     * @see org.hornetq.api.core.client.MessageHandler#onMessage(org.hornetq.api.core.client.ClientMessage)
+     */
+    public void onMessage(ClientMessage message)
+    {
+       try
+       {
+          onMessageAssert(message);
+       }
+       catch (AssertionFailedError failure)
+       {
+          // print it right away and keep it for checkAssertions()
+          failure.printStackTrace();
+          errors.add(failure);
+       }
+    }
+
+    /** Per-message checks supplied by the subclass; may throw AssertionFailedError. */
+    public abstract void onMessageAssert(ClientMessage message);
+ }
+
protected int getNumIterations()
{
return 500;
Modified: trunk/tests/soak-tests/src/test/java/org/hornetq/tests/soak/journal/JournalCleanupCompactSoakTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/soak/journal/JournalCleanupCompactSoakTest.java 2011-04-11 12:47:21 UTC (rev 10475)
+++ trunk/tests/soak-tests/src/test/java/org/hornetq/tests/soak/journal/JournalCleanupCompactSoakTest.java 2011-04-12 09:47:15 UTC (rev 10479)
@@ -13,9 +13,27 @@
package org.hornetq.tests.soak.journal;
-import java.util.concurrent.TimeUnit;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
-import org.hornetq.tests.stress.journal.JournalCleanupCompactStressTest;
+import org.hornetq.core.asyncio.impl.AsynchronousFileImpl;
+import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.journal.*;
+import org.hornetq.core.journal.impl.AIOSequentialFileFactory;
+import org.hornetq.core.journal.impl.JournalImpl;
+import org.hornetq.core.journal.impl.NIOSequentialFileFactory;
+import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
+import org.hornetq.tests.util.RandomUtil;
+import org.hornetq.tests.util.ServiceTestBase;
+import org.hornetq.utils.HornetQThreadFactory;
+import org.hornetq.utils.OrderedExecutorFactory;
+import org.hornetq.utils.SimpleIDGenerator;
/**
* A JournalCleanupCompactSoakTest
@@ -24,9 +42,538 @@
*
*
*/
-public class JournalCleanupCompactSoakTest extends JournalCleanupCompactStressTest
+public class JournalCleanupCompactSoakTest extends ServiceTestBase
{
+
+ public static SimpleIDGenerator idGen = new SimpleIDGenerator(1);
+
+ private static final int MAX_WRITES = 20000;
+
+ private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
+
+ // We want to maximize the difference between appends and deles, or we could get out of memory
+ public Semaphore maxRecords;
+
+ private volatile boolean running;
+
+ private AtomicInteger errors = new AtomicInteger(0);
+
+ private AtomicInteger numberOfRecords = new AtomicInteger(0);
+
+ private AtomicInteger numberOfUpdates = new AtomicInteger(0);
+
+ private AtomicInteger numberOfDeletes = new AtomicInteger(0);
+
+ private JournalImpl journal;
+
+ ThreadFactory tFactory = new HornetQThreadFactory("SoakTest" + System.identityHashCode(this),
+ false,
+ JournalCleanupCompactSoakTest.class.getClassLoader());
+
+ private ExecutorService threadPool;
+
+ private OrderedExecutorFactory executorFactory = new OrderedExecutorFactory(threadPool);
+
+ Executor testExecutor;
+
+
+ /**
+ * Creates the worker pool and executors, resets the record semaphore and error counter,
+ * and starts a journal in the temporary directory (AIO when the native library is loaded,
+ * NIO otherwise). The journal is subclassed so that every compaction start triggers an
+ * extra add / move-next-file / delete sequence on the test executor.
+ */
+ @Override
+ public void setUp() throws Exception
+ {
+ super.setUp();
+
+ threadPool = Executors.newFixedThreadPool(20, tFactory);
+ // the field initializer ran while threadPool was still null, so the factory is rebuilt here
+ executorFactory = new OrderedExecutorFactory(threadPool);
+ testExecutor = executorFactory.getExecutor();
+
+ maxRecords = new Semaphore(MAX_WRITES);
+
+ errors.set(0);
+
+ File dir = new File(getTemporaryDir());
+ dir.mkdirs();
+
+ SequentialFileFactory factory;
+
+ int maxAIO;
+ if (AsynchronousFileImpl.isLoaded())
+ {
+ factory = new AIOSequentialFileFactory(dir.getPath());
+ maxAIO = ConfigurationImpl.DEFAULT_JOURNAL_MAX_IO_AIO;
+ }
+ else
+ {
+ factory = new NIOSequentialFileFactory(dir.getPath(), true);
+ maxAIO = ConfigurationImpl.DEFAULT_JOURNAL_MAX_IO_NIO;
+ }
+
+ journal = new JournalImpl(50 * 1024,
+ 20,
+ 50,
+ ConfigurationImpl.DEFAULT_JOURNAL_COMPACT_PERCENTAGE,
+ factory,
+ "hornetq-data",
+ "hq",
+ maxAIO)
+ {
+ protected void onCompactLock() throws Exception
+ {
+ }
+
+ protected void onCompactStart() throws Exception
+ {
+ // runs asynchronously on the test executor rather than on the calling thread
+ testExecutor.execute(new Runnable()
+ {
+ public void run()
+ {
+ try
+ {
+ // System.out.println("OnCompactStart enter");
+ if (running)
+ {
+ long id = idGen.generateID();
+ journal.appendAddRecord(id, (byte)0, new byte[] { 1, 2, 3 }, false);
+ journal.forceMoveNextFile();
+ // NOTE(review): the second argument is presumably the sync flag, true only for id 20 - confirm intent
+ journal.appendDeleteRecord(id, id == 20);
+ }
+ // System.out.println("OnCompactStart leave");
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ // counted so that reloadJournal() fails the test
+ errors.incrementAndGet();
+ }
+ }
+ });
+
+ }
+
+ };
+
+ journal.start();
+ journal.loadInternalOnly();
+
+ }
+
+ /**
+  * Stops the journal if it is still started, shuts the worker pool down and always runs the
+  * base-class tear down, mirroring the super.setUp() call made in setUp().
+  */
+ @Override
+ public void tearDown() throws Exception
+ {
+    try
+    {
+       try
+       {
+          if (journal.isStarted())
+          {
+             journal.stop();
+          }
+       }
+       catch (Exception ignored)
+       {
+          // best effort: a failure while stopping the journal must not hide the test outcome
+       }
+
+       threadPool.shutdown();
+    }
+    finally
+    {
+       // run the base-class cleanup even if the local cleanup above failed
+       super.tearDown();
+    }
+ }
+
+ /**
+  * Runs one slow non-transactional appender plus five transactional appender/updater pairs
+  * against the journal for getTotalTimeMilliseconds(). Every ten seconds the journal is stopped,
+  * restarted and reloaded under the write lock, which keeps the workers (read-lock holders) out.
+  * When the time is up the workers are stopped, all remaining records are deleted and the test
+  * asserts that no data files are left.
+  */
+ public void testAppend() throws Exception
+ {
+
+    running = true;
+    SlowAppenderNoTX t1 = new SlowAppenderNoTX();
+
+    int NTHREADS = 5;
+
+    FastAppenderTx appenders[] = new FastAppenderTx[NTHREADS];
+    FastUpdateTx updaters[] = new FastUpdateTx[NTHREADS];
+
+    for (int i = 0; i < NTHREADS; i++)
+    {
+       appenders[i] = new FastAppenderTx();
+       // each updater consumes the ids produced by its paired appender
+       updaters[i] = new FastUpdateTx(appenders[i].queue);
+    }
+
+    t1.start();
+
+    Thread.sleep(1000);
+
+    for (int i = 0; i < NTHREADS; i++)
+    {
+       appenders[i].start();
+       updaters[i].start();
+    }
+
+    long timeToEnd = System.currentTimeMillis() + getTotalTimeMilliseconds();
+
+    try
+    {
+       while (System.currentTimeMillis() < timeToEnd)
+       {
+          System.out.println("Append = " + numberOfRecords +
+                             ", Update = " +
+                             numberOfUpdates +
+                             ", Delete = " +
+                             numberOfDeletes +
+                             ", liveRecords = " +
+                             (numberOfRecords.get() - numberOfDeletes.get()));
+          Thread.sleep(TimeUnit.SECONDS.toMillis(10));
+          rwLock.writeLock().lock();
+          try
+          {
+             System.out.println("Restarting server");
+             journal.stop();
+             journal.start();
+             reloadJournal();
+          }
+          finally
+          {
+             // always hand the lock back, otherwise a failed restart/reload would leave every
+             // worker blocked on the read lock forever
+             rwLock.writeLock().unlock();
+          }
+       }
+    }
+    finally
+    {
+       // also reached when the loop fails, so the worker threads are told to stop either way
+       running = false;
+
+       // Release Semaphore after setting running to false or the threads may never finish
+       maxRecords.release(MAX_WRITES - maxRecords.availablePermits());
+    }
+
+    for (Thread t : appenders)
+    {
+       t.join();
+    }
+
+    for (Thread t : updaters)
+    {
+       t.join();
+    }
+
+    t1.join();
+
+    // wait until everything queued on the test executor before this point has run
+    final CountDownLatch latchExecutorDone = new CountDownLatch(1);
+    testExecutor.execute(new Runnable()
+    {
+       public void run()
+       {
+          latchExecutorDone.countDown();
+       }
+    });
+
+    latchExecutorDone.await();
+
+    journal.stop();
+
+    journal.start();
+
+    reloadJournal();
+
+    Collection<Long> records = journal.getRecords().keySet();
+
+    System.out.println("Deleting everything!");
+    for (Long delInfo : records)
+    {
+       journal.appendDeleteRecord(delInfo, false);
+    }
+
+    journal.forceMoveNextFile();
+
+    journal.checkReclaimStatus();
+
+    Thread.sleep(5000);
+
+    assertEquals(0, journal.getDataFilesCount());
+
+    journal.stop();
+ }
+
+ /**
+  * Asserts that no worker reported an error, loads the journal and checks that the number of
+  * committed non-update records equals the appended-minus-deleted count kept by the test.
+  *
+  * @throws Exception if loading the journal fails
+  */
+ private void reloadJournal() throws Exception
+ {
+    assertEquals(0, errors.get());
+
+    final ArrayList<RecordInfo> committed = new ArrayList<RecordInfo>();
+    final ArrayList<PreparedTransactionInfo> prepared = new ArrayList<PreparedTransactionInfo>();
+
+    // failed transactions are deliberately ignored by this callback
+    final TransactionFailureCallback ignoreFailures = new TransactionFailureCallback()
+    {
+       public void failedTransaction(long transactionID, List<RecordInfo> records, List<RecordInfo> recordsToDelete)
+       {
+       }
+    };
+    journal.load(committed, prepared, ignoreFailures);
+
+    long appends = 0;
+    for (RecordInfo info : committed)
+    {
+       if (!info.isUpdate)
+       {
+          appends++;
+       }
+    }
+
+    assertEquals(numberOfRecords.get() - numberOfDeletes.get(), appends);
+ }
+
+ private byte[] generateRecord()
+ {
+ int size = RandomUtil.randomPositiveInt() % 10000;
+ if (size == 0)
+ {
+ size = 10000;
+ }
+ return RandomUtil.randomBytes(size);
+ }
+
+ /**
+ * Worker that, while the test is running, keeps writing transactions of random size:
+ * one rolled-back add followed by a committed batch of adds. Once the commit completes,
+ * the ids are published on {@link #queue} for the paired updater thread.
+ * The read lock is held while writing and briefly released between transactions.
+ */
+ class FastAppenderTx extends Thread
+ {
+ // ids of committed records, handed to the FastUpdateTx constructed with this queue
+ LinkedBlockingDeque<Long> queue = new LinkedBlockingDeque<Long>();
+
+ OperationContextImpl ctx = new OperationContextImpl(executorFactory.getExecutor());
+
+ public FastAppenderTx()
+ {
+ super("FastAppenderTX");
+ }
+
+ @Override
+ public void run()
+ {
+ rwLock.readLock().lock();
+
+ try
+ {
+ while (running)
+ {
+ // NOTE(review): ids[0] below requires txSize >= 1; assumes RandomUtil.randomMax never returns 0 - confirm
+ final int txSize = RandomUtil.randomMax(100);
+
+ long txID = JournalCleanupCompactSoakTest.idGen.generateID();
+
+ long rollbackTXID = JournalCleanupCompactSoakTest.idGen.generateID();
+
+ final long ids[] = new long[txSize];
+
+ for (int i = 0; i < txSize; i++)
+ {
+ ids[i] = JournalCleanupCompactSoakTest.idGen.generateID();
+ }
+
+ // a transaction that is rolled back straight away, reusing the first id
+ journal.appendAddRecordTransactional(rollbackTXID, ids[0], (byte)0, generateRecord());
+ journal.appendRollbackRecord(rollbackTXID, true);
+
+ for (int i = 0; i < txSize; i++)
+ {
+ long id = ids[i];
+ journal.appendAddRecordTransactional(txID, id, (byte)0, generateRecord());
+ // one permit per live record; DeleteTask releases it when the record is deleted
+ maxRecords.acquire();
+ }
+ journal.appendCommitRecord(txID, true, ctx);
+
+ ctx.executeOnCompletion(new IOAsyncTask()
+ {
+
+ public void onError(final int errorCode, final String errorMessage)
+ {
+ }
+
+ public void done()
+ {
+ // counted and published only once the commit has completed
+ numberOfRecords.addAndGet(txSize);
+ for (Long id : ids)
+ {
+ queue.add(id);
+ }
+ }
+ });
+
+ // give testAppend() a chance to take the write lock between transactions
+ rwLock.readLock().unlock();
+
+ Thread.yield();
+
+ rwLock.readLock().lock();
+
+
+ }
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ running = false;
+ errors.incrementAndGet();
+ }
+ finally
+ {
+ rwLock.readLock().unlock();
+ }
+ }
+ }
+
+ /**
+ * Worker that takes record ids from the queue of an appender, writes a transactional update
+ * for each and commits them in batches of random size. After each commit a DeleteTask is
+ * registered on the operation context to delete the updated records.
+ */
+ class FastUpdateTx extends Thread
+ {
+ final LinkedBlockingDeque<Long> queue;
+
+ OperationContextImpl ctx = new OperationContextImpl(executorFactory.getExecutor());
+
+ public FastUpdateTx(final LinkedBlockingDeque<Long> queue)
+ {
+ super("FastUpdateTX");
+ this.queue = queue;
+ }
+
+ @Override
+ public void run()
+ {
+
+ rwLock.readLock().lock();
+
+ try
+ {
+ int txSize = RandomUtil.randomMax(100);
+ int txCount = 0;
+ // slots beyond txCount stay 0; DeleteTask skips ids equal to 0
+ long ids[] = new long[txSize];
+
+ long txID = JournalCleanupCompactSoakTest.idGen.generateID();
+
+ while (running)
+ {
+
+ // NOTE(review): this waits up to 10 seconds while the read lock is held, which can delay the write lock in testAppend()
+ Long id = queue.poll(10, TimeUnit.SECONDS);
+ if (id != null)
+ {
+ ids[txCount++] = id;
+ journal.appendUpdateRecordTransactional(txID, id, (byte)0, generateRecord());
+ }
+ // commit when the batch is full or when the queue ran dry
+ if (txCount == txSize || id == null)
+ {
+ if (txCount > 0)
+ {
+ journal.appendCommitRecord(txID, true, ctx);
+ ctx.executeOnCompletion(new DeleteTask(ids));
+ }
+
+ rwLock.readLock().unlock();
+
+ Thread.yield();
+
+ rwLock.readLock().lock();
+
+ // start a new batch with a fresh transaction id and id array
+ txCount = 0;
+ txSize = RandomUtil.randomMax(100);
+ txID = JournalCleanupCompactSoakTest.idGen.generateID();
+ ids = new long[txSize];
+ }
+ }
+
+ // commit whatever is pending when the test stops; no DeleteTask is registered for this last batch
+ if (txCount > 0)
+ {
+ journal.appendCommitRecord(txID, true);
+ }
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ running = false;
+ errors.incrementAndGet();
+ }
+ finally
+ {
+ rwLock.readLock().unlock();
+ }
+ }
+ }
+
+ class DeleteTask implements IOAsyncTask
+ {
+ final long ids[];
+
+ DeleteTask(final long ids[])
+ {
+ this.ids = ids;
+ }
+
+ public void done()
+ {
+ rwLock.readLock().lock();
+ numberOfUpdates.addAndGet(ids.length);
+ try
+ {
+ for (long id : ids)
+ {
+ if (id != 0)
+ {
+ journal.appendDeleteRecord(id, false);
+ maxRecords.release();
+ numberOfDeletes.incrementAndGet();
+ }
+ }
+ }
+ catch (Exception e)
+ {
+ System.err.println("Can't delete id");
+ e.printStackTrace();
+ running = false;
+ errors.incrementAndGet();
+ }
+ finally
+ {
+ rwLock.readLock().unlock();
+ }
+ }
+
+ public void onError(final int errorCode, final String errorMessage)
+ {
+ }
+
+ }
+
+ /** Adds stuff to the journal, but it will take a long time to remove them.
+  *  This will cause cleanup and compacting to happen more often.
+  *  <p>
+  *  Records are appended non-transactionally in groups of five, sleeping 50 seconds after each
+  *  append with the read lock released, and are then deleted again. A failure stops the test run
+  *  and is counted in {@code errors}, in the same way as the other worker threads.
+  */
+ class SlowAppenderNoTX extends Thread
+ {
+
+    public SlowAppenderNoTX()
+    {
+       super("SlowAppender");
+    }
+
+    @Override
+    public void run()
+    {
+       rwLock.readLock().lock();
+       try
+       {
+          while (running)
+          {
+             long ids[] = new long[5];
+             // Append
+             for (int i = 0; running && i < ids.length; i++)
+             {
+                System.out.println("append slow");
+                ids[i] = JournalCleanupCompactSoakTest.idGen.generateID();
+                maxRecords.acquire();
+                journal.appendAddRecord(ids[i], (byte)1, generateRecord(), true);
+                numberOfRecords.incrementAndGet();
+
+                rwLock.readLock().unlock();
+                try
+                {
+                   Thread.sleep(TimeUnit.SECONDS.toMillis(50));
+                }
+                finally
+                {
+                   // re-acquire even if the sleep is interrupted, so the unlock in the outer
+                   // finally block always releases a lock this thread really holds
+                   rwLock.readLock().lock();
+                }
+             }
+             // Delete
+             for (int i = 0; running && i < ids.length; i++)
+             {
+                System.out.println("Deleting");
+                maxRecords.release();
+                journal.appendDeleteRecord(ids[i], false);
+                numberOfDeletes.incrementAndGet();
+             }
+          }
+       }
+       catch (Exception e)
+       {
+          e.printStackTrace();
+          // report the failure like the other workers do instead of killing the test JVM;
+          // reloadJournal() asserts that the error counter is still zero
+          running = false;
+          errors.incrementAndGet();
+       }
+       finally
+       {
+          rwLock.readLock().unlock();
+       }
+    }
+ }
// Constants -----------------------------------------------------
// Attributes ----------------------------------------------------
13 years, 8 months
JBoss hornetq SVN: r10478 - branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/stress/remote.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-04-11 19:03:32 -0400 (Mon, 11 Apr 2011)
New Revision: 10478
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/stress/remote/PingStressTest.java
Log:
small tweak on test
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/stress/remote/PingStressTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/stress/remote/PingStressTest.java 2011-04-11 20:52:34 UTC (rev 10477)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/stress/remote/PingStressTest.java 2011-04-11 23:03:32 UTC (rev 10478)
@@ -227,6 +227,8 @@
}
csf1.close();
+
+ locator.close();
}
13 years, 8 months
JBoss hornetq SVN: r10477 - branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/wireformat.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-04-11 16:52:34 -0400 (Mon, 11 Apr 2011)
New Revision: 10477
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/wireformat/Ping.java
Log:
just javadoc, no code changes
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/wireformat/Ping.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/wireformat/Ping.java 2011-04-11 17:24:31 UTC (rev 10476)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/wireformat/Ping.java 2011-04-11 20:52:34 UTC (rev 10477)
@@ -14,12 +14,18 @@
package org.hornetq.core.protocol.core.impl.wireformat;
import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
import org.hornetq.core.protocol.core.impl.PacketImpl;
+import org.hornetq.core.remoting.server.impl.RemotingServiceImpl;
+import org.hornetq.spi.core.protocol.RemotingConnection;
/**
*
- * A Ping
+ * Ping is sent from the client side by {@link ClientSessionFactoryImpl}.
+ * On the server side it is handled by {@link RemotingServiceImpl}.
*
+ * @see RemotingConnection#checkDataReceived()
+ *
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
*
*/
13 years, 8 months
JBoss hornetq SVN: r10476 - in trunk/tests: integration-tests/src and 12 other directories.
by do-not-reply@jboss.org
Author: ataylor
Date: 2011-04-11 13:24:31 -0400 (Mon, 11 Apr 2011)
New Revision: 10476
Added:
trunk/tests/integration-tests/src/main/
trunk/tests/integration-tests/src/main/java/
trunk/tests/integration-tests/src/main/java/org/
trunk/tests/integration-tests/src/main/java/org/hornetq/
trunk/tests/integration-tests/src/main/java/org/hornetq/tests/
trunk/tests/integration-tests/src/main/java/org/hornetq/tests/util/
trunk/tests/integration-tests/src/main/java/org/hornetq/tests/util/SpawnedVMSupport.java
trunk/tests/joram-tests/pom.xml
trunk/tests/joram-tests/src/test/
trunk/tests/joram-tests/src/test/java/
trunk/tests/joram-tests/src/test/java/org/
Removed:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/util/SpawnedVMSupport.java
trunk/tests/joram-tests/src/org/
Modified:
trunk/tests/hornetq-tests.iml
trunk/tests/joram-tests/hornetq-joram-tests.iml
trunk/tests/joram-tests/src/test/java/org/objectweb/jtests/jms/framework/JMSTestCase.java
trunk/tests/joram-tests/src/test/java/org/objectweb/jtests/jms/framework/TestConfig.java
trunk/tests/pom.xml
Log:
mavenised joram tests
Modified: trunk/tests/hornetq-tests.iml
===================================================================
--- trunk/tests/hornetq-tests.iml 2011-04-11 12:47:21 UTC (rev 10475)
+++ trunk/tests/hornetq-tests.iml 2011-04-11 17:24:31 UTC (rev 10476)
@@ -7,6 +7,7 @@
<sourceFolder url="file://$MODULE_DIR$/unit-tests/src/test/java" isTestSource="true" />
<sourceFolder url="file://$MODULE_DIR$/integration-tests/src/test/java" isTestSource="true" />
<sourceFolder url="file://$MODULE_DIR$/unit-tests/src/main/java" isTestSource="true" />
+ <sourceFolder url="file://$MODULE_DIR$/integration-tests/src/main/java" isTestSource="true" />
<excludeFolder url="file://$MODULE_DIR$/jms-tests" />
<excludeFolder url="file://$MODULE_DIR$/joram-tests" />
<excludeFolder url="file://$MODULE_DIR$/logs" />
Copied: trunk/tests/integration-tests/src/main/java/org/hornetq/tests/util/SpawnedVMSupport.java (from rev 10475, trunk/tests/integration-tests/src/test/java/org/hornetq/tests/util/SpawnedVMSupport.java)
===================================================================
--- trunk/tests/integration-tests/src/main/java/org/hornetq/tests/util/SpawnedVMSupport.java (rev 0)
+++ trunk/tests/integration-tests/src/main/java/org/hornetq/tests/util/SpawnedVMSupport.java 2011-04-11 17:24:31 UTC (rev 10476)
@@ -0,0 +1,254 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.util;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeoutException;
+
+import junit.framework.Assert;
+
+import org.hornetq.core.logging.Logger;
+
+/**
+ * @author <a href="mailto:ovidiu@feodorov.com">Ovidiu Feodorov</a>
+ * @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
+ * @author <a href="mailto:csuconic@redhat.com">Clebert Suconic</a>
+ *
+ * @version <tt>$Revision$</tt>
+ *
+ */
+public class SpawnedVMSupport
+{
+ // Constants -----------------------------------------------------
+
+ private static final Logger log = Logger.getLogger(SpawnedVMSupport.class);
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ /**
+ * Spawns a VM for the given main class with no extra VM arguments and with standard-output
+ * logging enabled.
+ */
+ public static Process spawnVM(final String className, final String... args) throws Exception
+ {
+ return SpawnedVMSupport.spawnVM(className, new String[0], true, args);
+ }
+
+ /**
+ * Spawns a VM for the given main class with no extra VM arguments; <code>logOutput</code>
+ * decides whether the standard output of the child is echoed.
+ */
+ public static Process spawnVM(final String className, final boolean logOutput, final String... args) throws Exception
+ {
+ return SpawnedVMSupport.spawnVM(className, new String[0], logOutput, args);
+ }
+
+ /**
+ * Spawns a VM for the given main class with the given VM arguments and with standard-output
+ * logging enabled.
+ */
+ public static Process spawnVM(final String className, final String[] vmargs, final String... args) throws Exception
+ {
+ return SpawnedVMSupport.spawnVM(className, vmargs, true, args);
+ }
+
+ /**
+ * Spawns a VM with the fixed memory settings <code>-Xms512m -Xmx512m</code> and without
+ * echoing the standard error of the child.
+ */
+ public static Process spawnVM(final String className,
+ final String[] vmargs,
+ final boolean logOutput,
+ final String... args) throws Exception
+ {
+ return SpawnedVMSupport.spawnVM(className, "-Xms512m -Xmx512m ", vmargs, logOutput, false, args);
+ }
+
+ /**
+ * Builds a <code>java</code> command line and launches it as a child process.
+ * The child inherits this VM's class path, <code>java.io.tmpdir</code>,
+ * <code>java.library.path</code> and, when set, the logging configuration properties.
+ *
+ * @param className main class to run in the child VM
+ * @param memoryArgs memory options appended verbatim right after "java "; no separator is
+ * added after them, so the string has to end with a space
+ * @param vmargs additional VM arguments, each followed by a space
+ * @param logOutput whether the standard output of the child is echoed via startLogger
+ * @param logErrorOutput whether the standard error of the child is echoed; the stream is
+ * always read, even when it is not echoed
+ * @param args program arguments passed to the main class
+ * @return the started process
+ */
+ public static Process spawnVM(final String className,
+ final String memoryArgs,
+ final String[] vmargs,
+ final boolean logOutput,
+ final boolean logErrorOutput,
+ final String... args) throws Exception
+ {
+ StringBuffer sb = new StringBuffer();
+
+ sb.append("java").append(' ');
+
+ sb.append(memoryArgs);
+
+ for (String vmarg : vmargs)
+ {
+ sb.append(vmarg).append(' ');
+ }
+
+ String classPath = System.getProperty("java.class.path");
+
+ String osName = System.getProperty("os.name");
+ osName = (osName != null) ? osName.toLowerCase() : "";
+ boolean isWindows = osName.contains("win");
+ // only the Windows branch wraps the class path in double quotes
+ if (isWindows)
+ {
+ sb.append("-cp").append(" \"").append(classPath).append("\" ");
+ }
+ else
+ {
+ sb.append("-cp").append(" ").append(classPath).append(" ");
+ }
+
+ sb.append("-Djava.io.tmpdir=" + System.getProperty("java.io.tmpdir", "./tmp")).append(" ");
+
+ sb.append("-Djava.library.path=").append(System.getProperty("java.library.path", "./native/bin")).append(" ");
+
+ String loggingConfigFile = System.getProperty("java.util.logging.config.file");
+ if (loggingConfigFile != null)
+ {
+ sb.append(" -Djava.util.logging.config.file=" + loggingConfigFile + " ");
+ }
+ String loggingPlugin = System.getProperty("org.jboss.logging.Logger.pluginClass");
+ if (loggingPlugin != null)
+ {
+ sb.append(" -Dorg.jboss.logging.Logger.pluginClass=" + loggingPlugin + " ");
+ }
+
+ // sb.append("-Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000 ");
+ sb.append(className).append(' ');
+
+ for (String arg : args)
+ {
+ sb.append(arg).append(' ');
+ }
+
+ String commandLine = sb.toString();
+
+ SpawnedVMSupport.log.trace("command line: " + commandLine);
+
+ // NOTE(review): Runtime.exec(String) splits the command on whitespace, so paths or arguments
+ // containing spaces are presumably broken up - verify; ProcessBuilder with an argument list avoids this
+ Process process = Runtime.getRuntime().exec(commandLine);
+
+ SpawnedVMSupport.log.trace("process: " + process);
+
+ // NOTE(review): when logOutput is false nothing reads the child's standard output here;
+ // a child that prints a lot could block unless the caller drains the stream - confirm
+ if (logOutput)
+ {
+ SpawnedVMSupport.startLogger(className, process);
+
+ }
+
+ // Adding a reader to System.err, so the VM won't hang on a System.err.println as identified on this forum thread:
+ // http://www.jboss.org/index.html?module=bb&op=viewtopic&t=151815
+ ProcessLogger errorLogger = new ProcessLogger(logErrorOutput, process.getErrorStream(), className);
+ errorLogger.start();
+
+ return process;
+ }
+
+ /**
+ * Starts a daemon thread that reads the standard output of the process and prints every
+ * line to System.out, prefixed with the class name.
+ *
+ * @param className prefix used for each printed line
+ * @param process process whose standard output is read
+ * @throws ClassNotFoundException declared by the ProcessLogger constructor
+ */
+ public static void startLogger(final String className, final Process process) throws ClassNotFoundException
+ {
+ ProcessLogger outputLogger = new ProcessLogger(true, process.getInputStream(), className);
+ outputLogger.start();
+ }
+
+ /**
+ * Assert that a process exits with the expected value (or not depending if
+ * the <code>sameValue</code> is expected or not). The method waits 5
+ * seconds for the process to exit, then an Exception is thrown. In any case,
+ * the process is destroyed before the method returns.
+ */
+ public static void assertProcessExits(final boolean sameValue, final int value, final Process p) throws InterruptedException,
+ ExecutionException,
+ TimeoutException
+ {
+ ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
+ Future<Integer> future = executor.submit(new Callable<Integer>()
+ {
+
+ public Integer call() throws Exception
+ {
+ p.waitFor();
+ return p.exitValue();
+ }
+ });
+ try
+ {
+ int exitValue = future.get(10, SECONDS);
+ if (sameValue)
+ {
+ Assert.assertSame(value, exitValue);
+ }
+ else
+ {
+ Assert.assertNotSame(value, exitValue);
+ }
+ }
+ finally
+ {
+ p.destroy();
+ }
+ }
+
+ /**
+  * Daemon thread that drains an input stream line by line until end of stream; each line is
+  * printed to System.out, prefixed with the class name, when printing is enabled.
+  */
+ static class ProcessLogger extends Thread
+ {
+    private final InputStream is;
+
+    private final String className;
+
+    private final boolean print;
+
+    ProcessLogger(final boolean print, final InputStream is, final String className) throws ClassNotFoundException
+    {
+       this.is = is;
+       this.print = print;
+       this.className = className;
+       setDaemon(true);
+    }
+
+    @Override
+    public void run()
+    {
+       final BufferedReader reader = new BufferedReader(new InputStreamReader(is));
+       try
+       {
+          // keep reading even when nothing is printed, so the stream is always consumed
+          for (String line = reader.readLine(); line != null; line = reader.readLine())
+          {
+             if (print)
+             {
+                System.out.println(className + ":" + line);
+             }
+          }
+       }
+       catch (IOException ioe)
+       {
+          ioe.printStackTrace();
+       }
+    }
+ }
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Deleted: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/util/SpawnedVMSupport.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/util/SpawnedVMSupport.java 2011-04-11 12:47:21 UTC (rev 10475)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/util/SpawnedVMSupport.java 2011-04-11 17:24:31 UTC (rev 10476)
@@ -1,254 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.tests.util;
-
-import static java.util.concurrent.TimeUnit.SECONDS;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeoutException;
-
-import junit.framework.Assert;
-
-import org.hornetq.core.logging.Logger;
-
-/**
- * @author <a href="mailto:ovidiu@feodorov.com">Ovidiu Feodorov</a>
- * @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
- * @author <a href="mailto:csuconic@redhat.com">Clebert Suconic</a>
- *
- * @version <tt>$Revision$</tt>
- *
- */
-public class SpawnedVMSupport
-{
- // Constants -----------------------------------------------------
-
- private static final Logger log = Logger.getLogger(SpawnedVMSupport.class);
-
- // Attributes ----------------------------------------------------
-
- // Static --------------------------------------------------------
-
- public static Process spawnVM(final String className, final String... args) throws Exception
- {
- return SpawnedVMSupport.spawnVM(className, new String[0], true, args);
- }
-
- public static Process spawnVM(final String className, final boolean logOutput, final String... args) throws Exception
- {
- return SpawnedVMSupport.spawnVM(className, new String[0], logOutput, args);
- }
-
- public static Process spawnVM(final String className, final String[] vmargs, final String... args) throws Exception
- {
- return SpawnedVMSupport.spawnVM(className, vmargs, true, args);
- }
-
- public static Process spawnVM(final String className,
- final String[] vmargs,
- final boolean logOutput,
- final String... args) throws Exception
- {
- return SpawnedVMSupport.spawnVM(className, "-Xms512m -Xmx512m ", vmargs, logOutput, false, args);
- }
-
- public static Process spawnVM(final String className,
- final String memoryArgs,
- final String[] vmargs,
- final boolean logOutput,
- final boolean logErrorOutput,
- final String... args) throws Exception
- {
- StringBuffer sb = new StringBuffer();
-
- sb.append("java").append(' ');
-
- sb.append(memoryArgs);
-
- for (String vmarg : vmargs)
- {
- sb.append(vmarg).append(' ');
- }
-
- String classPath = System.getProperty("java.class.path");
-
- String osName = System.getProperty("os.name");
- osName = (osName != null) ? osName.toLowerCase() : "";
- boolean isWindows = osName.contains("win");
- if (isWindows)
- {
- sb.append("-cp").append(" \"").append(classPath).append("\" ");
- }
- else
- {
- sb.append("-cp").append(" ").append(classPath).append(" ");
- }
-
- sb.append("-Djava.io.tmpdir=" + System.getProperty("java.io.tmpdir", "./tmp")).append(" ");
-
- sb.append("-Djava.library.path=").append(System.getProperty("java.library.path", "./native/bin")).append(" ");
-
- String loggingConfigFile = System.getProperty("java.util.logging.config.file");
- if (loggingConfigFile != null)
- {
- sb.append(" -Djava.util.logging.config.file=" + loggingConfigFile + " ");
- }
- String loggingPlugin = System.getProperty("org.jboss.logging.Logger.pluginClass");
- if (loggingPlugin != null)
- {
- sb.append(" -Dorg.jboss.logging.Logger.pluginClass=" + loggingPlugin + " ");
- }
-
- // sb.append("-Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000 ");
- sb.append(className).append(' ');
-
- for (String arg : args)
- {
- sb.append(arg).append(' ');
- }
-
- String commandLine = sb.toString();
-
- SpawnedVMSupport.log.trace("command line: " + commandLine);
-
- Process process = Runtime.getRuntime().exec(commandLine);
-
- SpawnedVMSupport.log.trace("process: " + process);
-
- if (logOutput)
- {
- SpawnedVMSupport.startLogger(className, process);
-
- }
-
- // Adding a reader to System.err, so the VM won't hang on a System.err.println as identified on this forum thread:
- // http://www.jboss.org/index.html?module=bb&op=viewtopic&t=151815
- ProcessLogger errorLogger = new ProcessLogger(logErrorOutput, process.getErrorStream(), className);
- errorLogger.start();
-
- return process;
- }
-
- /**
- * @param className
- * @param process
- * @throws ClassNotFoundException
- */
- public static void startLogger(final String className, final Process process) throws ClassNotFoundException
- {
- ProcessLogger outputLogger = new ProcessLogger(true, process.getInputStream(), className);
- outputLogger.start();
- }
-
- /**
- * Assert that a process exits with the expected value (or not depending if
- * the <code>sameValue</code> is expected or not). The method waits 5
- * seconds for the process to exit, then an Exception is thrown. In any case,
- * the process is destroyed before the method returns.
- */
- public static void assertProcessExits(final boolean sameValue, final int value, final Process p) throws InterruptedException,
- ExecutionException,
- TimeoutException
- {
- ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
- Future<Integer> future = executor.submit(new Callable<Integer>()
- {
-
- public Integer call() throws Exception
- {
- p.waitFor();
- return p.exitValue();
- }
- });
- try
- {
- int exitValue = future.get(10, SECONDS);
- if (sameValue)
- {
- Assert.assertSame(value, exitValue);
- }
- else
- {
- Assert.assertNotSame(value, exitValue);
- }
- }
- finally
- {
- p.destroy();
- }
- }
-
- /**
- * Redirect the input stream to a logger (as debug logs)
- */
- static class ProcessLogger extends Thread
- {
- private final InputStream is;
-
- private final String className;
-
- private final boolean print;
-
- ProcessLogger(final boolean print, final InputStream is, final String className) throws ClassNotFoundException
- {
- this.is = is;
- this.print = print;
- this.className = className;
- setDaemon(true);
- }
-
- @Override
- public void run()
- {
- try
- {
- InputStreamReader isr = new InputStreamReader(is);
- BufferedReader br = new BufferedReader(isr);
- String line = null;
- while ((line = br.readLine()) != null)
- {
- if (print)
- {
- System.out.println(className + ":" + line);
- }
- }
- }
- catch (IOException ioe)
- {
- ioe.printStackTrace();
- }
- }
- }
-
- // Constructors --------------------------------------------------
-
- // Public --------------------------------------------------------
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-
-}
Modified: trunk/tests/joram-tests/hornetq-joram-tests.iml
===================================================================
--- trunk/tests/joram-tests/hornetq-joram-tests.iml 2011-04-11 12:47:21 UTC (rev 10475)
+++ trunk/tests/joram-tests/hornetq-joram-tests.iml 2011-04-11 17:24:31 UTC (rev 10476)
@@ -3,7 +3,6 @@
<component name="NewModuleRootManager" inherit-compiler-output="true">
<exclude-output />
<content url="file://$MODULE_DIR$">
- <sourceFolder url="file://$MODULE_DIR$/src" isTestSource="true" />
<excludeFolder url="file://$MODULE_DIR$/output" />
</content>
<orderEntry type="inheritedJdk" />
Copied: trunk/tests/joram-tests/pom.xml (from rev 10474, trunk/tests/jms-tests/pom.xml)
===================================================================
--- trunk/tests/joram-tests/pom.xml (rev 0)
+++ trunk/tests/joram-tests/pom.xml 2011-04-11 17:24:31 UTC (rev 10476)
@@ -0,0 +1,134 @@
+<!--
+ ~ Copyright 2009 Red Hat, Inc.
+ ~ Red Hat licenses this file to you under the Apache License, version
+ ~ 2.0 (the "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ ~ implied. See the License for the specific language governing
+ ~ permissions and limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.hornetq.tests</groupId>
+ <artifactId>hornetq-tests-pom</artifactId>
+ <version>2.2.3-SNAPSHOT</version>
+ </parent>
+
+ <groupId>org.hornetq.tests</groupId>
+ <artifactId>joram-tests</artifactId>
+ <packaging>jar</packaging>
+ <name>HornetQ Unit Tests</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.hornetq.tests</groupId>
+ <artifactId>unit-tests</artifactId>
+ <version>${project.version}</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.hornetq.tests</groupId>
+ <artifactId>integration-tests</artifactId>
+ <version>${project.version}</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.hornetq</groupId>
+ <artifactId>hornetq-jms</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.hornetq</groupId>
+ <artifactId>hornetq-ra</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.hornetq</groupId>
+ <artifactId>hornetq-bootstrap</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.jboss.javaee</groupId>
+ <artifactId>jboss-jca-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.jboss.security</groupId>
+ <artifactId>jboss-security-spi</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.jboss.security</groupId>
+ <artifactId>jbosssx</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.jboss.naming</groupId>
+ <artifactId>jnpserver</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>jboss.jbossts</groupId>
+ <artifactId>jbossts-common</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>apache-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.jboss.javaee</groupId>
+ <artifactId>jboss-transaction-api</artifactId>
+ </dependency>
+ <!--this specifically for the JMS Bridge-->
+ <dependency>
+ <groupId>org.jboss.integration</groupId>
+ <artifactId>jboss-transaction-spi</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.jboss.javaee</groupId>
+ <artifactId>jboss-jaspi-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.jboss.javaee</groupId>
+ <artifactId>jboss-jms-api</artifactId>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <testResources>
+ <testResource>
+ <directory>config</directory>
+ </testResource>
+ </testResources>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <includes>
+ <include>**/*Test.java</include>
+ </includes>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <source>1.6</source>
+ <target>1.6</target>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
Modified: trunk/tests/joram-tests/src/test/java/org/objectweb/jtests/jms/framework/JMSTestCase.java
===================================================================
--- trunk/tests/joram-tests/src/org/objectweb/jtests/jms/framework/JMSTestCase.java 2011-04-08 20:22:53 UTC (rev 10473)
+++ trunk/tests/joram-tests/src/test/java/org/objectweb/jtests/jms/framework/JMSTestCase.java 2011-04-11 17:24:31 UTC (rev 10476)
@@ -35,7 +35,7 @@
*/
public abstract class JMSTestCase extends TestCase
{
- private static final String PROP_FILE_NAME = "provider.properties";
+ private static final String PROP_FILE_NAME = "config/provider.properties";
protected Admin admin;
Modified: trunk/tests/joram-tests/src/test/java/org/objectweb/jtests/jms/framework/TestConfig.java
===================================================================
--- trunk/tests/joram-tests/src/org/objectweb/jtests/jms/framework/TestConfig.java 2011-04-08 20:22:53 UTC (rev 10473)
+++ trunk/tests/joram-tests/src/test/java/org/objectweb/jtests/jms/framework/TestConfig.java 2011-04-11 17:24:31 UTC (rev 10476)
@@ -24,7 +24,7 @@
public class TestConfig
{
// name of the configuration file
- private static final String PROP_FILE_NAME = "test.properties";
+ private static final String PROP_FILE_NAME = "config/test.properties";
// name of the timeout property
private static final String PROP_NAME = "timeout";
Modified: trunk/tests/pom.xml
===================================================================
--- trunk/tests/pom.xml 2011-04-11 12:47:21 UTC (rev 10475)
+++ trunk/tests/pom.xml 2011-04-11 17:24:31 UTC (rev 10476)
@@ -27,5 +27,6 @@
<module>unit-tests</module>
<module>integration-tests</module>
<module>jms-tests</module>
+ <module>joram-tests</module>
</modules>
</project>
13 years, 8 months
JBoss hornetq SVN: r10475 - trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/clientcrash.
by do-not-reply@jboss.org
Author: ataylor
Date: 2011-04-11 08:47:21 -0400 (Mon, 11 Apr 2011)
New Revision: 10475
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/clientcrash/ClientCrashTest.java
Log:
changed receive timeout
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/clientcrash/ClientCrashTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/clientcrash/ClientCrashTest.java 2011-04-11 11:16:05 UTC (rev 10474)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/clientcrash/ClientCrashTest.java 2011-04-11 12:47:21 UTC (rev 10475)
@@ -1,14 +1,14 @@
/*
* Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
*/
package org.hornetq.tests.integration.clientcrash;
@@ -76,7 +76,7 @@
session.start();
// receive a message from the queue
- Message messageFromClient = consumer.receive(500000);
+ Message messageFromClient = consumer.receive(10000);
Assert.assertNotNull("no message received", messageFromClient);
Assert.assertEquals(ClientCrashTest.MESSAGE_TEXT_FROM_CLIENT, messageFromClient.getBodyBuffer().readString());
13 years, 8 months
JBoss hornetq SVN: r10473 - branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-04-08 16:22:53 -0400 (Fri, 08 Apr 2011)
New Revision: 10473
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/MessagePriorityTest.java
Log:
Just an additional test. I wrote this test to check a user's report of a bug that didn't really exist. So, why not commit it? There are never enough tests.
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/MessagePriorityTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/MessagePriorityTest.java 2011-04-08 13:16:19 UTC (rev 10472)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/MessagePriorityTest.java 2011-04-08 20:22:53 UTC (rev 10473)
@@ -17,12 +17,16 @@
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.client.*;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.core.config.Configuration;
-import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
-import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.HornetQServers;
import org.hornetq.tests.util.RandomUtil;
@@ -264,6 +268,60 @@
}
+ public void testManyMessages() throws Exception
+ {
+ SimpleString queue = RandomUtil.randomSimpleString();
+ SimpleString address = RandomUtil.randomSimpleString();
+
+ session.createQueue(address, queue, false);
+
+ ClientProducer producer = session.createProducer(address);
+
+ for (int i = 0 ; i < 777; i++)
+ {
+ ClientMessage msg = session.createMessage(true);
+ msg.setPriority((byte)5);
+ msg.putBooleanProperty("fast", false);
+ producer.send(msg);
+ }
+
+ for (int i = 0 ; i < 333; i++)
+ {
+ ClientMessage msg = session.createMessage(true);
+ msg.setPriority((byte)6);
+ msg.putBooleanProperty("fast", true);
+ producer.send(msg);
+ }
+
+ ClientConsumer consumer = session.createConsumer(queue);
+
+ session.start();
+
+
+ for (int i = 0 ; i < 333; i++)
+ {
+ ClientMessage msg = consumer.receive(5000);
+ assertNotNull(msg);
+ msg.acknowledge();
+ assertTrue(msg.getBooleanProperty("fast"));
+ }
+
+ for (int i = 0 ; i < 777; i++)
+ {
+ ClientMessage msg = consumer.receive(5000);
+ assertNotNull(msg);
+ msg.acknowledge();
+ assertFalse(msg.getBooleanProperty("fast"));
+ }
+
+ consumer.close();
+
+ session.deleteQueue(queue);
+
+ session.close();
+ }
+
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
13 years, 8 months
JBoss hornetq SVN: r10472 - branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-04-08 09:16:19 -0400 (Fri, 08 Apr 2011)
New Revision: 10472
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
Log:
https://issues.jboss.org/browse/JBPAPP-6237 - just moving a test to the public section
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java 2011-04-08 13:11:08 UTC (rev 10471)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java 2011-04-08 13:16:19 UTC (rev 10472)
@@ -2449,7 +2449,159 @@
}
}
}
+
+ public void testPageOnLargeMessageMultipleQueues() throws Exception
+ {
+ Configuration config = createDefaultConfig(isNetty());
+ final int PAGE_MAX = 20 * 1024;
+
+ final int PAGE_SIZE = 10 * 1024;
+
+ HashMap<String, AddressSettings> map = new HashMap<String, AddressSettings>();
+
+ AddressSettings value = new AddressSettings();
+ map.put(LargeMessageTest.ADDRESS.toString(), value);
+ server = createServer(true, config, PAGE_SIZE, PAGE_MAX, map);
+ server.start();
+
+ final int numberOfBytes = 1024;
+
+ final int numberOfBytesBigMessage = 400000;
+
+ try
+ {
+
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setBlockOnAcknowledge(true);
+
+ ClientSessionFactory sf = locator.createSessionFactory();
+
+ ClientSession session = sf.createSession(null, null, false, true, true, false, 0);
+
+ session.createQueue(LargeMessageTest.ADDRESS, LargeMessageTest.ADDRESS.concat("-0"), null, true);
+ session.createQueue(LargeMessageTest.ADDRESS, LargeMessageTest.ADDRESS.concat("-1"), null, true);
+
+ ClientProducer producer = session.createProducer(LargeMessageTest.ADDRESS);
+
+ ClientMessage message = null;
+
+ for (int i = 0; i < 100; i++)
+ {
+ message = session.createMessage(true);
+
+ message.getBodyBuffer().writerIndex(0);
+
+ message.getBodyBuffer().writeBytes(new byte[numberOfBytes]);
+
+ for (int j = 1; j <= numberOfBytes; j++)
+ {
+ message.getBodyBuffer().writeInt(j);
+ }
+
+ producer.send(message);
+ }
+
+ ClientMessage clientFile = createLargeClientMessage(session, numberOfBytesBigMessage);
+ clientFile.putBooleanProperty("TestLarge", true);
+ producer.send(clientFile);
+
+ for (int i = 0; i < 100; i++)
+ {
+ message = session.createMessage(true);
+
+ message.getBodyBuffer().writeBytes(new byte[numberOfBytes]);
+
+ producer.send(message);
+ }
+
+ session.close();
+
+ server.stop();
+
+ server = createServer(true, config, PAGE_SIZE, PAGE_MAX, map);
+ server.start();
+
+ sf = locator.createSessionFactory();
+
+ for (int ad = 0; ad < 2; ad++)
+ {
+ session = sf.createSession(false, false, false);
+
+ ClientConsumer consumer = session.createConsumer(LargeMessageTest.ADDRESS.concat("-" + ad));
+
+ session.start();
+
+ for (int i = 0; i < 100; i++)
+ {
+ ClientMessage message2 = consumer.receive(LargeMessageTest.RECEIVE_WAIT_TIME);
+
+ Assert.assertNotNull(message2);
+
+ message2.acknowledge();
+
+ Assert.assertNotNull(message2);
+ }
+
+ session.commit();
+
+ for (int i = 0; i < 5; i++)
+ {
+ ClientMessage messageLarge = consumer.receive(RECEIVE_WAIT_TIME);
+
+ assertTrue(messageLarge.getBooleanProperty("TestLarge"));
+
+ Assert.assertNotNull(messageLarge);
+
+ ByteArrayOutputStream bout = new ByteArrayOutputStream();
+ messageLarge.acknowledge();
+ messageLarge.saveToOutputStream(bout);
+ byte[] body = bout.toByteArray();
+ assertEquals(numberOfBytesBigMessage, body.length);
+ for (int bi = 0; bi < body.length; bi++)
+ {
+ assertEquals(getSamplebyte(bi), body[bi]);
+ }
+
+ if (i < 4)
+ session.rollback();
+ else
+ session.commit();
+ }
+
+ for (int i = 0; i < 100; i++)
+ {
+ ClientMessage message2 = consumer.receive(LargeMessageTest.RECEIVE_WAIT_TIME);
+
+ Assert.assertNotNull(message2);
+
+ message2.acknowledge();
+
+ Assert.assertNotNull(message2);
+ }
+
+ session.commit();
+
+ consumer.close();
+
+ session.close();
+
+ }
+ }
+ finally
+ {
+ try
+ {
+ server.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+
+ }
+
public void testSendStreamingSingleMessage() throws Exception
{
ClientSession session = null;
@@ -2813,158 +2965,6 @@
}
- public void testPageOnLargeMessageMultipleQueues() throws Exception
- {
- Configuration config = createDefaultConfig(isNetty());
-
- final int PAGE_MAX = 20 * 1024;
-
- final int PAGE_SIZE = 10 * 1024;
-
- HashMap<String, AddressSettings> map = new HashMap<String, AddressSettings>();
-
- AddressSettings value = new AddressSettings();
- map.put(LargeMessageTest.ADDRESS.toString(), value);
- server = createServer(true, config, PAGE_SIZE, PAGE_MAX, map);
- server.start();
-
- final int numberOfBytes = 1024;
-
- final int numberOfBytesBigMessage = 400000;
-
- try
- {
-
- locator.setBlockOnNonDurableSend(true);
- locator.setBlockOnDurableSend(true);
- locator.setBlockOnAcknowledge(true);
-
- ClientSessionFactory sf = locator.createSessionFactory();
-
- ClientSession session = sf.createSession(null, null, false, true, true, false, 0);
-
- session.createQueue(LargeMessageTest.ADDRESS, LargeMessageTest.ADDRESS.concat("-0"), null, true);
- session.createQueue(LargeMessageTest.ADDRESS, LargeMessageTest.ADDRESS.concat("-1"), null, true);
-
- ClientProducer producer = session.createProducer(LargeMessageTest.ADDRESS);
-
- ClientMessage message = null;
-
- for (int i = 0; i < 100; i++)
- {
- message = session.createMessage(true);
-
- message.getBodyBuffer().writerIndex(0);
-
- message.getBodyBuffer().writeBytes(new byte[numberOfBytes]);
-
- for (int j = 1; j <= numberOfBytes; j++)
- {
- message.getBodyBuffer().writeInt(j);
- }
-
- producer.send(message);
- }
-
- ClientMessage clientFile = createLargeClientMessage(session, numberOfBytesBigMessage);
- clientFile.putBooleanProperty("TestLarge", true);
- producer.send(clientFile);
-
- for (int i = 0; i < 100; i++)
- {
- message = session.createMessage(true);
-
- message.getBodyBuffer().writeBytes(new byte[numberOfBytes]);
-
- producer.send(message);
- }
-
- session.close();
-
- server.stop();
-
- server = createServer(true, config, PAGE_SIZE, PAGE_MAX, map);
- server.start();
-
- sf = locator.createSessionFactory();
-
- for (int ad = 0; ad < 2; ad++)
- {
- session = sf.createSession(false, false, false);
-
- ClientConsumer consumer = session.createConsumer(LargeMessageTest.ADDRESS.concat("-" + ad));
-
- session.start();
-
- for (int i = 0; i < 100; i++)
- {
- ClientMessage message2 = consumer.receive(LargeMessageTest.RECEIVE_WAIT_TIME);
-
- Assert.assertNotNull(message2);
-
- message2.acknowledge();
-
- Assert.assertNotNull(message2);
- }
-
- session.commit();
-
- for (int i = 0; i < 5; i++)
- {
- ClientMessage messageLarge = consumer.receive(RECEIVE_WAIT_TIME);
-
- assertTrue(messageLarge.getBooleanProperty("TestLarge"));
-
- Assert.assertNotNull(messageLarge);
-
- ByteArrayOutputStream bout = new ByteArrayOutputStream();
- messageLarge.acknowledge();
- messageLarge.saveToOutputStream(bout);
- byte[] body = bout.toByteArray();
- assertEquals(numberOfBytesBigMessage, body.length);
- for (int bi = 0; bi < body.length; bi++)
- {
- assertEquals(getSamplebyte(bi), body[bi]);
- }
-
- if (i < 4)
- session.rollback();
- else
- session.commit();
- }
-
- for (int i = 0; i < 100; i++)
- {
- ClientMessage message2 = consumer.receive(LargeMessageTest.RECEIVE_WAIT_TIME);
-
- Assert.assertNotNull(message2);
-
- message2.acknowledge();
-
- Assert.assertNotNull(message2);
- }
-
- session.commit();
-
- consumer.close();
-
- session.close();
-
- }
- }
- finally
- {
- try
- {
- server.stop();
- }
- catch (Throwable ignored)
- {
- }
- }
-
- }
-
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------
13 years, 8 months
JBoss hornetq SVN: r10471 - branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-04-08 09:11:08 -0400 (Fri, 08 Apr 2011)
New Revision: 10471
Added:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/NettyAsynchronousReattachTest.java
Log:
Adding a test to validate coming back to the live server when the backup is not active yet
Added: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/NettyAsynchronousReattachTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/NettyAsynchronousReattachTest.java (rev 0)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/NettyAsynchronousReattachTest.java 2011-04-08 13:11:08 UTC (rev 10471)
@@ -0,0 +1,59 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.cluster.failover;
+
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.core.client.impl.ClientSessionInternal;
+
+/**
+ * A NettyAsynchronousReattachTest
+ *
+ * @author clebertsuconic
+ *
+ *
+ */
+public class NettyAsynchronousReattachTest extends NettyAsynchronousFailoverTest
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+
+ protected void crash(final ClientSession... sessions) throws Exception
+ {
+ for (ClientSession session : sessions)
+ {
+ ClientSessionInternal internalSession = (ClientSessionInternal) session;
+ internalSession.getConnection().fail(new HornetQException(HornetQException.NOT_CONNECTED, "oops"));
+ }
+ }
+
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
13 years, 8 months