Author: ataylor
Date: 2011-04-12 06:12:15 -0400 (Tue, 12 Apr 2011)
New Revision: 10480
Added:
trunk/tests/integration-tests/src/main/java/org/hornetq/tests/cluster/
trunk/tests/integration-tests/src/main/java/org/hornetq/tests/cluster/reattach/
trunk/tests/integration-tests/src/main/java/org/hornetq/tests/cluster/reattach/MultiThreadRandomReattachTestBase.java
trunk/tests/integration-tests/src/main/java/org/hornetq/tests/cluster/reattach/MultiThreadReattachSupport.java
trunk/tests/integration-tests/src/main/java/org/hornetq/tests/largemessage/
trunk/tests/integration-tests/src/main/java/org/hornetq/tests/largemessage/LargeMessageTestBase.java
trunk/tests/stress-tests/
trunk/tests/stress-tests/pom.xml
trunk/tests/stress-tests/src/
trunk/tests/stress-tests/src/test/
trunk/tests/stress-tests/src/test/java/
trunk/tests/stress-tests/src/test/java/org/
trunk/tests/stress-tests/src/test/java/org/hornetq/
trunk/tests/stress-tests/src/test/java/org/hornetq/tests/
trunk/tests/stress-tests/src/test/java/org/hornetq/tests/stress/
Removed:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/reattach/MultiThreadRandomReattachTestBase.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/reattach/MultiThreadReattachSupport.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/largemessage/LargeMessageTestBase.java
trunk/tests/src/org/hornetq/tests/stress/
Modified:
trunk/tests/hornetq-tests.iml
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/LargeMessageTest.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/reattach/MultiThreadRandomReattachTest.java
trunk/tests/pom.xml
trunk/tests/stress-tests/src/test/java/org/hornetq/tests/stress/chunk/LargeMessageStressTest.java
trunk/tests/stress-tests/src/test/java/org/hornetq/tests/stress/client/SendStressTest.java
trunk/tests/stress-tests/src/test/java/org/hornetq/tests/stress/failover/MultiThreadRandomReattachStressTest.java
trunk/tests/stress-tests/src/test/java/org/hornetq/tests/stress/failover/RandomReattachStressTest.java
trunk/tests/stress-tests/src/test/java/org/hornetq/tests/stress/journal/CompactingStressTest.java
trunk/tests/stress-tests/src/test/java/org/hornetq/tests/stress/journal/JournalCleanupCompactStressTest.java
trunk/tests/stress-tests/src/test/java/org/hornetq/tests/stress/journal/JournalRestartStressTest.java
trunk/tests/stress-tests/src/test/java/org/hornetq/tests/stress/journal/LargeJournalStressTest.java
trunk/tests/stress-tests/src/test/java/org/hornetq/tests/stress/journal/MultiThreadConsumerStressTest.java
trunk/tests/stress-tests/src/test/java/org/hornetq/tests/stress/journal/NIOMultiThreadCompactorStressTest.java
trunk/tests/stress-tests/src/test/java/org/hornetq/tests/stress/paging/PageCursorStressTest.java
trunk/tests/stress-tests/src/test/java/org/hornetq/tests/stress/paging/PageStressTest.java
trunk/tests/stress-tests/src/test/java/org/hornetq/tests/stress/remote/PingStressTest.java
Log:
mavenised stress tests
Modified: trunk/tests/hornetq-tests.iml
===================================================================
--- trunk/tests/hornetq-tests.iml 2011-04-12 09:47:15 UTC (rev 10479)
+++ trunk/tests/hornetq-tests.iml 2011-04-12 10:12:15 UTC (rev 10480)
@@ -9,6 +9,7 @@
<sourceFolder url="file://$MODULE_DIR$/unit-tests/src/main/java"
isTestSource="true" />
<sourceFolder
url="file://$MODULE_DIR$/integration-tests/src/main/java"
isTestSource="true" />
<sourceFolder url="file://$MODULE_DIR$/soak-tests/src/test/java"
isTestSource="true" />
+ <sourceFolder url="file://$MODULE_DIR$/stress-tests/src/test/java"
isTestSource="true" />
<excludeFolder url="file://$MODULE_DIR$/jms-tests" />
<excludeFolder url="file://$MODULE_DIR$/joram-tests" />
<excludeFolder url="file://$MODULE_DIR$/logs" />
Copied:
trunk/tests/integration-tests/src/main/java/org/hornetq/tests/cluster/reattach/MultiThreadRandomReattachTestBase.java
(from rev 10475,
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/reattach/MultiThreadRandomReattachTestBase.java)
===================================================================
---
trunk/tests/integration-tests/src/main/java/org/hornetq/tests/cluster/reattach/MultiThreadRandomReattachTestBase.java
(rev 0)
+++
trunk/tests/integration-tests/src/main/java/org/hornetq/tests/cluster/reattach/MultiThreadRandomReattachTestBase.java 2011-04-12
10:12:15 UTC (rev 10480)
@@ -0,0 +1,1428 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.cluster.reattach;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import junit.framework.Assert;
+
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.api.core.client.MessageHandler;
+import org.hornetq.api.core.client.ServerLocator;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.remoting.impl.invm.InVMRegistry;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.jms.client.HornetQBytesMessage;
+import org.hornetq.jms.client.HornetQTextMessage;
+
+/**
+ * A MultiThreadRandomReattachTestBase
+ *
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert
Suconic</a>
+ *
+ *
+ */
+public abstract class MultiThreadRandomReattachTestBase extends
MultiThreadReattachSupport
+{
+ private final Logger log = Logger.getLogger(getClass()); // per-instance logger named after the concrete subclass (getClass())
+
+ // Constants -----------------------------------------------------
+
+ private static final int RECEIVE_TIMEOUT = 30000; // passed to consumer.receive(...) in the single-message scenarios; presumably milliseconds - confirm against ClientConsumer.receive
+
+ private final int LATCH_WAIT = getLatchWait(); // how long a handler latch is awaited (used with TimeUnit.MILLISECONDS); getLatchWait() is defined outside this excerpt
+
+ private final int NUM_THREADS = getNumThreads(); // thread count handed to runTestMultipleThreads; getNumThreads() is defined outside this excerpt
+
+ // Attributes ----------------------------------------------------
+ protected static final SimpleString ADDRESS = new
SimpleString("FailoverTestAddress"); // address every scenario binds its queues to and creates its producers on
+
+ protected HornetQServer liveServer; // not referenced in this excerpt; presumably assigned by start() in subclasses - TODO confirm
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ /**
+  * Scenario A entry point: wraps {@link #doTestA(ClientSessionFactory, int)} in a
+  * {@code RunnableT} and hands it to {@code runTestMultipleThreads} together with
+  * {@code NUM_THREADS} and a {@code false} flag (the flag's meaning is defined by that
+  * helper, which lies outside this excerpt).
+  */
+ public void testA() throws Exception
+ {
+    final RunnableT scenario = new RunnableT()
+    {
+       @Override
+       public void run(final ClientSessionFactory factory, final int workerId) throws Exception
+       {
+          doTestA(factory, workerId);
+       }
+    };
+
+    runTestMultipleThreads(scenario, NUM_THREADS, false);
+ }
+
+ /**
+  * Scenario B entry point: passes {@link #doTestB(ClientSessionFactory, int)} to
+  * {@code runTestMultipleThreads} with {@code NUM_THREADS} and a {@code false} flag.
+  */
+ public void testB() throws Exception
+ {
+    final RunnableT scenario = new RunnableT()
+    {
+       @Override
+       public void run(final ClientSessionFactory factory, final int workerId) throws Exception
+       {
+          doTestB(factory, workerId);
+       }
+    };
+
+    runTestMultipleThreads(scenario, NUM_THREADS, false);
+ }
+
+ /**
+  * Scenario C entry point: passes {@link #doTestC(ClientSessionFactory, int)} to
+  * {@code runTestMultipleThreads} with {@code NUM_THREADS} and a {@code false} flag.
+  */
+ public void testC() throws Exception
+ {
+    final RunnableT scenario = new RunnableT()
+    {
+       @Override
+       public void run(final ClientSessionFactory factory, final int workerId) throws Exception
+       {
+          doTestC(factory, workerId);
+       }
+    };
+
+    runTestMultipleThreads(scenario, NUM_THREADS, false);
+ }
+
+ /**
+  * Scenario D entry point: passes {@link #doTestD(ClientSessionFactory, int)} to
+  * {@code runTestMultipleThreads} with {@code NUM_THREADS} and a {@code false} flag.
+  */
+ public void testD() throws Exception
+ {
+    final RunnableT scenario = new RunnableT()
+    {
+       @Override
+       public void run(final ClientSessionFactory factory, final int workerId) throws Exception
+       {
+          doTestD(factory, workerId);
+       }
+    };
+
+    runTestMultipleThreads(scenario, NUM_THREADS, false);
+ }
+
+ /**
+  * Scenario E entry point: passes {@link #doTestE(ClientSessionFactory, int)} to
+  * {@code runTestMultipleThreads} with {@code NUM_THREADS} and a {@code false} flag.
+  */
+ public void testE() throws Exception
+ {
+    final RunnableT scenario = new RunnableT()
+    {
+       @Override
+       public void run(final ClientSessionFactory factory, final int workerId) throws Exception
+       {
+          doTestE(factory, workerId);
+       }
+    };
+
+    runTestMultipleThreads(scenario, NUM_THREADS, false);
+ }
+
+ /**
+  * Scenario F entry point: passes {@link #doTestF(ClientSessionFactory, int)} to
+  * {@code runTestMultipleThreads} with {@code NUM_THREADS} and a {@code false} flag.
+  */
+ public void testF() throws Exception
+ {
+    final RunnableT scenario = new RunnableT()
+    {
+       @Override
+       public void run(final ClientSessionFactory factory, final int workerId) throws Exception
+       {
+          doTestF(factory, workerId);
+       }
+    };
+
+    runTestMultipleThreads(scenario, NUM_THREADS, false);
+ }
+
+ /**
+  * Scenario G entry point: passes {@link #doTestG(ClientSessionFactory, int)} to
+  * {@code runTestMultipleThreads} with {@code NUM_THREADS} and a {@code false} flag.
+  */
+ public void testG() throws Exception
+ {
+    final RunnableT scenario = new RunnableT()
+    {
+       @Override
+       public void run(final ClientSessionFactory factory, final int workerId) throws Exception
+       {
+          doTestG(factory, workerId);
+       }
+    };
+
+    runTestMultipleThreads(scenario, NUM_THREADS, false);
+ }
+
+ /**
+  * Scenario H entry point: passes {@link #doTestH(ClientSessionFactory, int)} to
+  * {@code runTestMultipleThreads} with {@code NUM_THREADS} and a {@code false} flag.
+  */
+ public void testH() throws Exception
+ {
+    final RunnableT scenario = new RunnableT()
+    {
+       @Override
+       public void run(final ClientSessionFactory factory, final int workerId) throws Exception
+       {
+          doTestH(factory, workerId);
+       }
+    };
+
+    runTestMultipleThreads(scenario, NUM_THREADS, false);
+ }
+
+ /**
+  * Scenario I entry point: passes {@link #doTestI(ClientSessionFactory, int)} to
+  * {@code runTestMultipleThreads} with {@code NUM_THREADS} and a {@code false} flag.
+  */
+ public void testI() throws Exception
+ {
+    final RunnableT scenario = new RunnableT()
+    {
+       @Override
+       public void run(final ClientSessionFactory factory, final int workerId) throws Exception
+       {
+          doTestI(factory, workerId);
+       }
+    };
+
+    runTestMultipleThreads(scenario, NUM_THREADS, false);
+ }
+
+ /**
+  * Scenario J entry point: passes {@link #doTestJ(ClientSessionFactory, int)} to
+  * {@code runTestMultipleThreads} with {@code NUM_THREADS} and a {@code false} flag.
+  */
+ public void testJ() throws Exception
+ {
+    final RunnableT scenario = new RunnableT()
+    {
+       @Override
+       public void run(final ClientSessionFactory factory, final int workerId) throws Exception
+       {
+          doTestJ(factory, workerId);
+       }
+    };
+
+    runTestMultipleThreads(scenario, NUM_THREADS, false);
+ }
+
+ /**
+  * Scenario K entry point: passes {@link #doTestK(ClientSessionFactory, int)} to
+  * {@code runTestMultipleThreads} with {@code NUM_THREADS} and a {@code false} flag.
+  */
+ public void testK() throws Exception
+ {
+    final RunnableT scenario = new RunnableT()
+    {
+       @Override
+       public void run(final ClientSessionFactory factory, final int workerId) throws Exception
+       {
+          doTestK(factory, workerId);
+       }
+    };
+
+    runTestMultipleThreads(scenario, NUM_THREADS, false);
+ }
+
+ /**
+  * Scenario L entry point: passes {@link #doTestL(ClientSessionFactory)} to the four-argument
+  * {@code runTestMultipleThreads} overload with {@code NUM_THREADS}, a {@code true} flag and
+  * the value {@code 10}. Unlike the other scenarios, the worker number is not forwarded.
+  */
+ public void testL() throws Exception
+ {
+    final RunnableT scenario = new RunnableT()
+    {
+       @Override
+       public void run(final ClientSessionFactory factory, final int workerId) throws Exception
+       {
+          doTestL(factory);
+       }
+    };
+
+    runTestMultipleThreads(scenario, NUM_THREADS, true, 10);
+ }
+
+ // public void testM() throws Exception
+ // {
+ // runTestMultipleThreads(new RunnableT()
+ // {
+ // public void run(final ClientSessionFactory sf, final int threadNum) throws
Exception
+ // {
+ // doTestM(sf, threadNum);
+ // }
+ // }, NUM_THREADS);
+ // }
+
+ /**
+  * Scenario N entry point: passes {@link #doTestN(ClientSessionFactory, int)} to
+  * {@code runTestMultipleThreads} with {@code NUM_THREADS} and a {@code false} flag.
+  */
+ public void testN() throws Exception
+ {
+    final RunnableT scenario = new RunnableT()
+    {
+       @Override
+       public void run(final ClientSessionFactory factory, final int workerId) throws Exception
+       {
+          doTestN(factory, workerId);
+       }
+    };
+
+    runTestMultipleThreads(scenario, NUM_THREADS, false);
+ }
+
+ /**
+  * Scenario O entry point, added to replicate HORNETQ-264: passes
+  * {@code doTestO(ClientSessionFactory, int)} to {@code runTestMultipleThreads} with
+  * {@code NUM_THREADS} and a {@code false} flag.
+  */
+ public void testO() throws Exception
+ {
+    final RunnableT scenario = new RunnableT()
+    {
+       @Override
+       public void run(final ClientSessionFactory factory, final int workerId) throws Exception
+       {
+          doTestO(factory, workerId);
+       }
+    };
+
+    runTestMultipleThreads(scenario, NUM_THREADS, false);
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ @Override
+ protected abstract void start() throws Exception; // re-declared abstract so concrete subclasses must implement it; presumably brings up the server under test - TODO confirm
+
+ protected abstract void setBody(ClientMessage message) throws Exception; // subclass hook; presumably fills in the body of an outgoing test message - callers are outside this excerpt
+
+ protected abstract boolean checkSize(ClientMessage message); // subclass hook; presumably validates the size of a received message's body - callers are outside this excerpt
+
+ protected ClientSession createAutoCommitSession(final ClientSessionFactory sf) throws
Exception
+ {
+ return sf.createSession(false, true, true);
+ }
+
+ protected ClientSession createTransactionalSession(final ClientSessionFactory sf)
throws Exception
+ {
+ return sf.createSession(false, false, false);
+ }
+
+ /**
+  * Single-queue variant of scenario A: one auto-commit session creates the queue
+  * {@code "sub" + threadNum}, calls {@code sendMessages} for 100 messages, starts the
+  * session, and then waits for a {@code MyHandler} to release its latch.
+  * <p>
+  * {@code session2} is accepted but never used by this method.
+  *
+  * @param sf factory used to open the session
+  * @param threadNum worker number; part of the queue name and passed to sendMessages / MyHandler
+  * @param session2 unused
+  * @throws Exception if the handler latch is not released within LATCH_WAIT milliseconds,
+  *            if the handler recorded a failure, or if any client call fails
+  */
+ protected void doTestA(final ClientSessionFactory sf, final int threadNum, final ClientSession session2) throws Exception
+ {
+    final SimpleString queueName = new SimpleString("sub" + threadNum);
+
+    final ClientSession autoCommit = sf.createSession(false, true, true);
+    autoCommit.createQueue(ADDRESS, queueName, null, false);
+    final ClientProducer sender = autoCommit.createProducer(ADDRESS);
+    final ClientConsumer receiver = autoCommit.createConsumer(queueName);
+
+    final int messageCount = 100;
+
+    // Sending happens before the session is started.
+    sendMessages(autoCommit, sender, messageCount, threadNum);
+    autoCommit.start();
+
+    final MyHandler listener = new MyHandler(threadNum, messageCount);
+    receiver.setMessageHandler(listener);
+
+    if (!listener.latch.await(LATCH_WAIT, TimeUnit.MILLISECONDS))
+    {
+       throw new Exception("Timed out waiting for messages on handler " + System.identityHashCode(listener) +
+                           " threadnum " +
+                           threadNum);
+    }
+    if (listener.failure != null)
+    {
+       throw new Exception("Handler failed: " + listener.failure);
+    }
+
+    // Tear down in the same order as always: producer, consumer, queue, session.
+    sender.close();
+    receiver.close();
+    autoCommit.deleteQueue(queueName);
+    autoCommit.close();
+ }
+
+ /**
+  * Scenario A: ten consumer sessions from {@link #createAutoCommitSession}, each started
+  * <em>before</em> its queue and consumer are created, receive through {@code MyHandler}
+  * callbacks after {@code sendMessages} has been called for 100 messages on an
+  * auto-commit session.
+  *
+  * @param sf factory for every session opened here
+  * @param threadNum worker number; prefixes the queue names and is passed to sendMessages / MyHandler
+  * @throws Exception on latch timeout (LATCH_WAIT ms), on a handler-reported failure,
+  *            or on any client error
+  */
+ protected void doTestA(final ClientSessionFactory sf, final int threadNum) throws Exception
+ {
+    final long startedAt = System.currentTimeMillis();
+
+    // Extra session that is only used at the very end to delete the queues.
+    final ClientSession cleanupSession = sf.createSession(false, false, false);
+
+    final int numMessages = 100;
+    final int numSessions = 10;
+
+    final Set<ClientConsumer> consumers = new HashSet<ClientConsumer>();
+    final Set<ClientSession> consumerSessions = new HashSet<ClientSession>();
+
+    for (int n = 0; n < numSessions; n++)
+    {
+       final SimpleString queueName = new SimpleString(threadNum + "sub" + n);
+       final ClientSession consumerSession = createAutoCommitSession(sf);
+
+       consumerSession.start();
+       consumerSession.createQueue(ADDRESS, queueName, null, false);
+
+       consumers.add(consumerSession.createConsumer(queueName));
+       consumerSessions.add(consumerSession);
+    }
+
+    final ClientSession sendSession = sf.createSession(false, true, true);
+    final ClientProducer sender = sendSession.createProducer(ADDRESS);
+    sendMessages(sendSession, sender, numMessages, threadNum);
+
+    // One handler per consumer; each handler owns the latch awaited below.
+    final Set<MyHandler> handlers = new HashSet<MyHandler>();
+    for (ClientConsumer consumer : consumers)
+    {
+       final MyHandler handler = new MyHandler(threadNum, numMessages);
+       consumer.setMessageHandler(handler);
+       handlers.add(handler);
+    }
+
+    for (MyHandler handler : handlers)
+    {
+       if (!handler.latch.await(LATCH_WAIT, TimeUnit.MILLISECONDS))
+       {
+          throw new Exception("Timed out waiting for messages on handler " + System.identityHashCode(handler) +
+                              " threadnum " +
+                              threadNum);
+       }
+       if (handler.failure != null)
+       {
+          throw new Exception("Handler failed: " + handler.failure);
+       }
+    }
+
+    sendSession.close();
+    for (ClientSession consumerSession : consumerSessions)
+    {
+       consumerSession.close();
+    }
+    for (int n = 0; n < numSessions; n++)
+    {
+       cleanupSession.deleteQueue(new SimpleString(threadNum + "sub" + n));
+    }
+    cleanupSession.close();
+
+    log.info("duration " + (System.currentTimeMillis() - startedAt));
+ }
+
+ /**
+  * Scenario B: same shape as scenario A (ten sessions from
+  * {@link #createAutoCommitSession}, {@code MyHandler} callbacks, 100 messages), except
+  * that the consumer sessions are started only <em>after</em> {@code sendMessages} has run.
+  *
+  * @param sf factory for every session opened here
+  * @param threadNum worker number; prefixes the queue names and is passed to sendMessages / MyHandler
+  * @throws Exception on latch timeout (LATCH_WAIT ms), on a handler-reported failure,
+  *            or on any client error
+  */
+ protected void doTestB(final ClientSessionFactory sf, final int threadNum) throws Exception
+ {
+    final long startedAt = System.currentTimeMillis();
+
+    // Extra session that is only used at the very end to delete the queues.
+    final ClientSession cleanupSession = sf.createSession(false, false, false);
+
+    final int numMessages = 100;
+    final int numSessions = 10;
+
+    final Set<ClientConsumer> consumers = new HashSet<ClientConsumer>();
+    final Set<ClientSession> consumerSessions = new HashSet<ClientSession>();
+
+    for (int n = 0; n < numSessions; n++)
+    {
+       final SimpleString queueName = new SimpleString(threadNum + "sub" + n);
+       final ClientSession consumerSession = createAutoCommitSession(sf);
+
+       consumerSession.createQueue(ADDRESS, queueName, null, false);
+
+       consumers.add(consumerSession.createConsumer(queueName));
+       consumerSessions.add(consumerSession);
+    }
+
+    final ClientSession sendSession = sf.createSession(false, true, true);
+    final ClientProducer sender = sendSession.createProducer(ADDRESS);
+    sendMessages(sendSession, sender, numMessages, threadNum);
+
+    // Deferred start: the sessions are started only now that sending is done.
+    for (ClientSession consumerSession : consumerSessions)
+    {
+       consumerSession.start();
+    }
+
+    final Set<MyHandler> handlers = new HashSet<MyHandler>();
+    for (ClientConsumer consumer : consumers)
+    {
+       final MyHandler handler = new MyHandler(threadNum, numMessages);
+       consumer.setMessageHandler(handler);
+       handlers.add(handler);
+    }
+
+    for (MyHandler handler : handlers)
+    {
+       if (!handler.latch.await(LATCH_WAIT, TimeUnit.MILLISECONDS))
+       {
+          throw new Exception("Timed out waiting for messages on handler " + System.identityHashCode(handler) +
+                              " threadnum " +
+                              threadNum);
+       }
+       if (handler.failure != null)
+       {
+          throw new Exception("Handler failed: " + handler.failure);
+       }
+    }
+
+    sendSession.close();
+    for (ClientSession consumerSession : consumerSessions)
+    {
+       consumerSession.close();
+    }
+    for (int n = 0; n < numSessions; n++)
+    {
+       cleanupSession.deleteQueue(new SimpleString(threadNum + "sub" + n));
+    }
+    cleanupSession.close();
+
+    log.info("duration " + (System.currentTimeMillis() - startedAt));
+ }
+
+ /**
+  * Scenario C: ten consumer sessions from {@link #createTransactionalSession}, started
+  * before their queues are created, receive through {@code MyHandler} callbacks.
+  * <p>
+  * The sending session first sends and rolls back, then sends again and commits. Once
+  * every handler has released its latch the handlers are reset, the consumer sessions are
+  * rolled back, and the handlers are awaited a second time before the sessions commit.
+  * <p>
+  * Both passes treat a non-null {@code handler.failure} as fatal. The first pass checks
+  * it even after a successful latch wait, so the latch alone does not prove that delivery
+  * was clean; the second pass applies the same check.
+  *
+  * @param sf factory for every session opened here
+  * @param threadNum worker number; prefixes the queue names and is passed to sendMessages / MyHandler
+  * @throws Exception on latch timeout in the first pass (LATCH_WAIT ms), on a
+  *            handler-reported failure in either pass, or on any client error; a latch
+  *            timeout in the second pass is reported as a JUnit assertion failure
+  */
+ protected void doTestC(final ClientSessionFactory sf, final int threadNum) throws Exception
+ {
+    long start = System.currentTimeMillis();
+
+    // Extra session that is only used at the very end to delete the queues.
+    ClientSession s = sf.createSession(false, false, false);
+
+    final int numMessages = 100;
+
+    final int numSessions = 10;
+
+    Set<ClientConsumer> consumers = new HashSet<ClientConsumer>();
+    Set<ClientSession> sessions = new HashSet<ClientSession>();
+
+    for (int i = 0; i < numSessions; i++)
+    {
+       SimpleString subName = new SimpleString(threadNum + "sub" + i);
+
+       ClientSession sessConsume = createTransactionalSession(sf);
+
+       sessConsume.start();
+
+       sessConsume.createQueue(MultiThreadRandomReattachTestBase.ADDRESS, subName, null, false);
+
+       ClientConsumer consumer = sessConsume.createConsumer(subName);
+
+       consumers.add(consumer);
+
+       sessions.add(sessConsume);
+    }
+
+    ClientSession sessSend = sf.createSession(false, false, false);
+
+    ClientProducer producer = sessSend.createProducer(MultiThreadRandomReattachTestBase.ADDRESS);
+
+    // The first batch is rolled back; only the second, committed batch is expected to arrive.
+    sendMessages(sessSend, producer, numMessages, threadNum);
+
+    sessSend.rollback();
+
+    sendMessages(sessSend, producer, numMessages, threadNum);
+
+    sessSend.commit();
+
+    Set<MyHandler> handlers = new HashSet<MyHandler>();
+
+    for (ClientConsumer consumer : consumers)
+    {
+       MyHandler handler = new MyHandler(threadNum, numMessages);
+
+       consumer.setMessageHandler(handler);
+
+       handlers.add(handler);
+    }
+
+    // First pass: initial delivery.
+    for (MyHandler handler : handlers)
+    {
+       boolean ok = handler.latch.await(LATCH_WAIT, TimeUnit.MILLISECONDS);
+
+       if (!ok)
+       {
+          throw new Exception("Timed out waiting for messages on handler " + System.identityHashCode(handler) +
+                              " threadnum " +
+                              threadNum);
+       }
+
+       if (handler.failure != null)
+       {
+          throw new Exception("Handler failed: " + handler.failure);
+       }
+
+       // Re-arm the handler so it can be awaited again after the rollback below.
+       handler.reset();
+    }
+
+    for (ClientSession session : sessions)
+    {
+       session.rollback();
+    }
+
+    // Second pass: delivery after the consumer sessions were rolled back.
+    for (MyHandler handler : handlers)
+    {
+       boolean ok = handler.latch.await(LATCH_WAIT, TimeUnit.MILLISECONDS);
+
+       Assert.assertTrue("Timed out waiting for messages after rollback on handler " + System.identityHashCode(handler) +
+                         " threadnum " +
+                         threadNum, ok);
+
+       // Previously skipped: a handler that failed during redelivery went unnoticed.
+       if (handler.failure != null)
+       {
+          throw new Exception("Handler failed on rollback: " + handler.failure);
+       }
+    }
+
+    for (ClientSession session : sessions)
+    {
+       session.commit();
+    }
+
+    sessSend.close();
+    for (ClientSession session : sessions)
+    {
+       session.close();
+    }
+
+    for (int i = 0; i < numSessions; i++)
+    {
+       SimpleString subName = new SimpleString(threadNum + "sub" + i);
+
+       s.deleteQueue(subName);
+    }
+
+    s.close();
+
+    long end = System.currentTimeMillis();
+
+    log.info("duration " + (end - start));
+ }
+
+ /**
+  * Scenario D: ten consumer sessions opened with {@code createSession(false, false, false)}
+  * and started only after the sending session has sent, rolled back, re-sent and committed.
+  * <p>
+  * A first set of {@code MyHandler}s is awaited. Then the handlers are detached, the
+  * consumer sessions are rolled back, and a fresh set of handlers is attached and awaited
+  * before the sessions commit. Queue names here contain a space:
+  * {@code threadNum + " sub" + i}.
+  *
+  * @param sf factory for every session opened here
+  * @param threadNum worker number; prefixes the queue names and is passed to sendMessages / MyHandler
+  * @throws Exception on latch timeout (LATCH_WAIT ms), on a handler-reported failure in
+  *            either pass, or on any client error
+  */
+ protected void doTestD(final ClientSessionFactory sf, final int threadNum) throws Exception
+ {
+    final long startedAt = System.currentTimeMillis();
+
+    // Extra session that is only used at the very end to delete the queues.
+    final ClientSession cleanupSession = sf.createSession(false, false, false);
+
+    final int numMessages = 100;
+    final int numSessions = 10;
+
+    final Set<ClientConsumer> consumers = new HashSet<ClientConsumer>();
+    final Set<ClientSession> consumerSessions = new HashSet<ClientSession>();
+
+    for (int n = 0; n < numSessions; n++)
+    {
+       final SimpleString queueName = new SimpleString(threadNum + " sub" + n);
+       final ClientSession consumerSession = sf.createSession(false, false, false);
+
+       consumerSession.createQueue(ADDRESS, queueName, null, false);
+
+       consumers.add(consumerSession.createConsumer(queueName));
+       consumerSessions.add(consumerSession);
+    }
+
+    final ClientSession sendSession = sf.createSession(false, false, false);
+    final ClientProducer sender = sendSession.createProducer(ADDRESS);
+
+    // First batch is rolled back, second batch is committed.
+    sendMessages(sendSession, sender, numMessages, threadNum);
+    sendSession.rollback();
+    sendMessages(sendSession, sender, numMessages, threadNum);
+    sendSession.commit();
+
+    for (ClientSession consumerSession : consumerSessions)
+    {
+       consumerSession.start();
+    }
+
+    final Set<MyHandler> handlers = new HashSet<MyHandler>();
+    for (ClientConsumer consumer : consumers)
+    {
+       final MyHandler handler = new MyHandler(threadNum, numMessages);
+       consumer.setMessageHandler(handler);
+       handlers.add(handler);
+    }
+
+    // First pass: initial delivery.
+    for (MyHandler handler : handlers)
+    {
+       if (!handler.latch.await(LATCH_WAIT, TimeUnit.MILLISECONDS))
+       {
+          throw new Exception("Timed out waiting for messages on handler " + System.identityHashCode(handler) +
+                              " threadnum " +
+                              threadNum);
+       }
+       if (handler.failure != null)
+       {
+          throw new Exception("Handler failed: " + handler.failure);
+       }
+    }
+
+    handlers.clear();
+
+    // Detach the old handlers before rolling back ...
+    for (ClientConsumer consumer : consumers)
+    {
+       consumer.setMessageHandler(null);
+    }
+    for (ClientSession consumerSession : consumerSessions)
+    {
+       consumerSession.rollback();
+    }
+
+    // ... and attach brand-new ones for the second pass.
+    for (ClientConsumer consumer : consumers)
+    {
+       final MyHandler handler = new MyHandler(threadNum, numMessages);
+       consumer.setMessageHandler(handler);
+       handlers.add(handler);
+    }
+
+    for (MyHandler handler : handlers)
+    {
+       if (!handler.latch.await(LATCH_WAIT, TimeUnit.MILLISECONDS))
+       {
+          throw new Exception("Timed out waiting for messages on handler " + System.identityHashCode(handler) +
+                              " threadnum " +
+                              threadNum);
+       }
+       if (handler.failure != null)
+       {
+          throw new Exception("Handler failed on rollback: " + handler.failure);
+       }
+    }
+
+    for (ClientSession consumerSession : consumerSessions)
+    {
+       consumerSession.commit();
+    }
+
+    sendSession.close();
+    for (ClientSession consumerSession : consumerSessions)
+    {
+       consumerSession.close();
+    }
+    for (int n = 0; n < numSessions; n++)
+    {
+       cleanupSession.deleteQueue(new SimpleString(threadNum + " sub" + n));
+    }
+    cleanupSession.close();
+
+    log.info("duration " + (System.currentTimeMillis() - startedAt));
+ }
+
+ // Now with synchronous receive()
+
+ /**
+  * Scenario E: ten auto-commit consumer sessions, each started before its queue and
+  * consumer are created; after {@code sendMessages} the consumers are drained with
+  * {@code consumeMessages} instead of message handlers.
+  *
+  * @param sf factory for every session opened here
+  * @param threadNum worker number; prefixes the queue names and is passed to sendMessages / consumeMessages
+  * @throws Exception if any client call or either helper fails
+  */
+ protected void doTestE(final ClientSessionFactory sf, final int threadNum) throws Exception
+ {
+    final long startedAt = System.currentTimeMillis();
+
+    // Extra session that is only used at the very end to delete the queues.
+    final ClientSession cleanupSession = sf.createSession(false, false, false);
+
+    final int numMessages = 100;
+    final int numSessions = 10;
+
+    final Set<ClientConsumer> consumers = new HashSet<ClientConsumer>();
+    final Set<ClientSession> consumerSessions = new HashSet<ClientSession>();
+
+    for (int n = 0; n < numSessions; n++)
+    {
+       final SimpleString queueName = new SimpleString(threadNum + "sub" + n);
+       final ClientSession consumerSession = sf.createSession(false, true, true);
+
+       consumerSession.start();
+       consumerSession.createQueue(ADDRESS, queueName, null, false);
+
+       consumers.add(consumerSession.createConsumer(queueName));
+       consumerSessions.add(consumerSession);
+    }
+
+    final ClientSession sendSession = sf.createSession(false, true, true);
+    final ClientProducer sender = sendSession.createProducer(ADDRESS);
+
+    sendMessages(sendSession, sender, numMessages, threadNum);
+    consumeMessages(consumers, numMessages, threadNum);
+
+    sendSession.close();
+    for (ClientSession consumerSession : consumerSessions)
+    {
+       consumerSession.close();
+    }
+    for (int n = 0; n < numSessions; n++)
+    {
+       cleanupSession.deleteQueue(new SimpleString(threadNum + "sub" + n));
+    }
+    cleanupSession.close();
+
+    log.info("duration " + (System.currentTimeMillis() - startedAt));
+ }
+
+ /**
+  * Scenario F: like scenario E (auto-commit sessions drained with
+  * {@code consumeMessages}), except that the consumer sessions are started only after
+  * {@code sendMessages} has run.
+  *
+  * @param sf factory for every session opened here
+  * @param threadNum worker number; prefixes the queue names and is passed to sendMessages / consumeMessages
+  * @throws Exception if any client call or either helper fails
+  */
+ protected void doTestF(final ClientSessionFactory sf, final int threadNum) throws Exception
+ {
+    final long startedAt = System.currentTimeMillis();
+
+    // Extra session that is only used at the very end to delete the queues.
+    final ClientSession cleanupSession = sf.createSession(false, false, false);
+
+    final int numMessages = 100;
+    final int numSessions = 10;
+
+    final Set<ClientConsumer> consumers = new HashSet<ClientConsumer>();
+    final Set<ClientSession> consumerSessions = new HashSet<ClientSession>();
+
+    for (int n = 0; n < numSessions; n++)
+    {
+       final SimpleString queueName = new SimpleString(threadNum + "sub" + n);
+       final ClientSession consumerSession = sf.createSession(false, true, true);
+
+       consumerSession.createQueue(ADDRESS, queueName, null, false);
+
+       consumers.add(consumerSession.createConsumer(queueName));
+       consumerSessions.add(consumerSession);
+    }
+
+    final ClientSession sendSession = sf.createSession(false, true, true);
+    final ClientProducer sender = sendSession.createProducer(ADDRESS);
+
+    sendMessages(sendSession, sender, numMessages, threadNum);
+
+    // Deferred start: the sessions are started only now that sending is done.
+    for (ClientSession consumerSession : consumerSessions)
+    {
+       consumerSession.start();
+    }
+
+    consumeMessages(consumers, numMessages, threadNum);
+
+    sendSession.close();
+    for (ClientSession consumerSession : consumerSessions)
+    {
+       consumerSession.close();
+    }
+    for (int n = 0; n < numSessions; n++)
+    {
+       cleanupSession.deleteQueue(new SimpleString(threadNum + "sub" + n));
+    }
+    cleanupSession.close();
+
+    log.info("duration " + (System.currentTimeMillis() - startedAt));
+ }
+
+ /**
+  * Scenario G: ten consumer sessions opened with {@code createSession(false, false, false)}
+  * and started before their queues are created. The sender sends, rolls back, re-sends
+  * and commits; the consumers are then drained with {@code consumeMessages}, rolled
+  * back, drained a second time and finally committed.
+  *
+  * @param sf factory for every session opened here
+  * @param threadNum worker number; prefixes the queue names and is passed to sendMessages / consumeMessages
+  * @throws Exception if any client call or either helper fails
+  */
+ protected void doTestG(final ClientSessionFactory sf, final int threadNum) throws Exception
+ {
+    final long startedAt = System.currentTimeMillis();
+
+    // Extra session that is only used at the very end to delete the queues.
+    final ClientSession cleanupSession = sf.createSession(false, false, false);
+
+    final int numMessages = 100;
+    final int numSessions = 10;
+
+    final Set<ClientConsumer> consumers = new HashSet<ClientConsumer>();
+    final Set<ClientSession> consumerSessions = new HashSet<ClientSession>();
+
+    for (int n = 0; n < numSessions; n++)
+    {
+       final SimpleString queueName = new SimpleString(threadNum + "sub" + n);
+       final ClientSession consumerSession = sf.createSession(false, false, false);
+
+       consumerSession.start();
+       consumerSession.createQueue(ADDRESS, queueName, null, false);
+
+       consumers.add(consumerSession.createConsumer(queueName));
+       consumerSessions.add(consumerSession);
+    }
+
+    final ClientSession sendSession = sf.createSession(false, false, false);
+    final ClientProducer sender = sendSession.createProducer(ADDRESS);
+
+    // First batch is rolled back, second batch is committed.
+    sendMessages(sendSession, sender, numMessages, threadNum);
+    sendSession.rollback();
+    sendMessages(sendSession, sender, numMessages, threadNum);
+    sendSession.commit();
+
+    // Consume, roll back, consume again, then commit.
+    consumeMessages(consumers, numMessages, threadNum);
+    for (ClientSession consumerSession : consumerSessions)
+    {
+       consumerSession.rollback();
+    }
+    consumeMessages(consumers, numMessages, threadNum);
+    for (ClientSession consumerSession : consumerSessions)
+    {
+       consumerSession.commit();
+    }
+
+    sendSession.close();
+    for (ClientSession consumerSession : consumerSessions)
+    {
+       consumerSession.close();
+    }
+    for (int n = 0; n < numSessions; n++)
+    {
+       cleanupSession.deleteQueue(new SimpleString(threadNum + "sub" + n));
+    }
+    cleanupSession.close();
+
+    log.info("duration " + (System.currentTimeMillis() - startedAt));
+ }
+
+ /**
+  * Scenario H: like scenario G (non-auto-commit sessions, send/rollback/send/commit,
+  * then consume / rollback / consume / commit via {@code consumeMessages}), except that
+  * the consumer sessions are started only after the sender has committed.
+  *
+  * @param sf factory for every session opened here
+  * @param threadNum worker number; prefixes the queue names and is passed to sendMessages / consumeMessages
+  * @throws Exception if any client call or either helper fails
+  */
+ protected void doTestH(final ClientSessionFactory sf, final int threadNum) throws Exception
+ {
+    final long startedAt = System.currentTimeMillis();
+
+    // Extra session that is only used at the very end to delete the queues.
+    final ClientSession cleanupSession = sf.createSession(false, false, false);
+
+    final int numMessages = 100;
+    final int numSessions = 10;
+
+    final Set<ClientConsumer> consumers = new HashSet<ClientConsumer>();
+    final Set<ClientSession> consumerSessions = new HashSet<ClientSession>();
+
+    for (int n = 0; n < numSessions; n++)
+    {
+       final SimpleString queueName = new SimpleString(threadNum + "sub" + n);
+       final ClientSession consumerSession = sf.createSession(false, false, false);
+
+       consumerSession.createQueue(ADDRESS, queueName, null, false);
+
+       consumers.add(consumerSession.createConsumer(queueName));
+       consumerSessions.add(consumerSession);
+    }
+
+    final ClientSession sendSession = sf.createSession(false, false, false);
+    final ClientProducer sender = sendSession.createProducer(ADDRESS);
+
+    // First batch is rolled back, second batch is committed.
+    sendMessages(sendSession, sender, numMessages, threadNum);
+    sendSession.rollback();
+    sendMessages(sendSession, sender, numMessages, threadNum);
+    sendSession.commit();
+
+    // Deferred start: the sessions are started only now that the send is committed.
+    for (ClientSession consumerSession : consumerSessions)
+    {
+       consumerSession.start();
+    }
+
+    // Consume, roll back, consume again, then commit.
+    consumeMessages(consumers, numMessages, threadNum);
+    for (ClientSession consumerSession : consumerSessions)
+    {
+       consumerSession.rollback();
+    }
+    consumeMessages(consumers, numMessages, threadNum);
+    for (ClientSession consumerSession : consumerSessions)
+    {
+       consumerSession.commit();
+    }
+
+    sendSession.close();
+    for (ClientSession consumerSession : consumerSessions)
+    {
+       consumerSession.close();
+    }
+    for (int n = 0; n < numSessions; n++)
+    {
+       cleanupSession.deleteQueue(new SimpleString(threadNum + "sub" + n));
+    }
+    cleanupSession.close();
+
+    log.info("duration " + (System.currentTimeMillis() - startedAt));
+ }
+
+ protected void doTestI(final ClientSessionFactory sf, final int threadNum) throws
Exception
+ {
+ ClientSession sessCreate = sf.createSession(false, true, true);
+
+ sessCreate.createQueue(MultiThreadRandomReattachTestBase.ADDRESS,
+ new SimpleString(threadNum +
MultiThreadRandomReattachTestBase.ADDRESS.toString()),
+ null,
+ false);
+
+ ClientSession sess = sf.createSession(false, true, true);
+
+ sess.start();
+
+ ClientConsumer consumer = sess.createConsumer(new SimpleString(threadNum +
MultiThreadRandomReattachTestBase.ADDRESS.toString()));
+
+ ClientProducer producer =
sess.createProducer(MultiThreadRandomReattachTestBase.ADDRESS);
+
+ ClientMessage message = sess.createMessage(HornetQTextMessage.TYPE, false, 0,
System.currentTimeMillis(), (byte)1);
+ producer.send(message);
+
+ ClientMessage message2 =
consumer.receive(MultiThreadRandomReattachTestBase.RECEIVE_TIMEOUT);
+
+ Assert.assertNotNull(message2);
+
+ message2.acknowledge();
+
+ sess.close();
+
+ sessCreate.deleteQueue(new SimpleString(threadNum +
MultiThreadRandomReattachTestBase.ADDRESS.toString()));
+
+ sessCreate.close();
+ }
+
+ protected void doTestJ(final ClientSessionFactory sf, final int threadNum) throws
Exception
+ {
+ ClientSession sessCreate = sf.createSession(false, true, true);
+
+ sessCreate.createQueue(MultiThreadRandomReattachTestBase.ADDRESS,
+ new SimpleString(threadNum +
MultiThreadRandomReattachTestBase.ADDRESS.toString()),
+ null,
+ false);
+
+ ClientSession sess = sf.createSession(false, true, true);
+
+ sess.start();
+
+ ClientConsumer consumer = sess.createConsumer(new SimpleString(threadNum +
MultiThreadRandomReattachTestBase.ADDRESS.toString()));
+
+ ClientProducer producer =
sess.createProducer(MultiThreadRandomReattachTestBase.ADDRESS);
+
+ ClientMessage message = sess.createMessage(HornetQTextMessage.TYPE, false, 0,
System.currentTimeMillis(), (byte)1);
+ producer.send(message);
+
+ ClientMessage message2 =
consumer.receive(MultiThreadRandomReattachTestBase.RECEIVE_TIMEOUT);
+
+ Assert.assertNotNull(message2);
+
+ message2.acknowledge();
+
+ sess.close();
+
+ sessCreate.deleteQueue(new SimpleString(threadNum +
MultiThreadRandomReattachTestBase.ADDRESS.toString()));
+
+ sessCreate.close();
+ }
+
+ /**
+  * Scenario K: on a single session, creates the queue {@code threadNum + ADDRESS}, then
+  * creates and immediately closes 100 consumers on it, one after another, before deleting
+  * the queue and closing the session. No messages are sent.
+  *
+  * @param sf factory for the session
+  * @param threadNum worker number; prefixes the queue name
+  * @throws Exception if any client call fails
+  */
+ protected void doTestK(final ClientSessionFactory sf, final int threadNum) throws Exception
+ {
+    final SimpleString queueName = new SimpleString(threadNum + ADDRESS.toString());
+
+    final ClientSession session = sf.createSession(false, false, false);
+    session.createQueue(ADDRESS, queueName, null, false);
+
+    final int numConsumers = 100;
+    int opened = 0;
+    while (opened < numConsumers)
+    {
+       session.createConsumer(queueName).close();
+       opened++;
+    }
+
+    session.deleteQueue(queueName);
+    session.close();
+ }
+
+ /**
+  * Scenario L: exercises failure while connections are being created by opening a session
+  * with {@code createSession(false, false, false)} and closing it straight away, 100
+  * times in a row.
+  *
+  * @param sf factory the sessions are created from
+  * @throws Exception if creating or closing a session fails
+  */
+ protected void doTestL(final ClientSessionFactory sf) throws Exception
+ {
+    final int numSessions = 100;
+
+    int remaining = numSessions;
+    while (remaining > 0)
+    {
+       sf.createSession(false, false, false).close();
+       remaining--;
+    }
+ }
+
+ protected void doTestN(final ClientSessionFactory sf, final int threadNum) throws
Exception
+ {
+ ClientSession sessCreate = sf.createSession(false, true, true);
+
+ sessCreate.createQueue(MultiThreadRandomReattachTestBase.ADDRESS,
+ new SimpleString(threadNum +
MultiThreadRandomReattachTestBase.ADDRESS.toString()),
+ null,
+ false);
+
+ ClientSession sess = sf.createSession(false, true, true);
+
+ sess.stop();
+
+ sess.start();
+
+ sess.stop();
+
+ ClientConsumer consumer = sess.createConsumer(new SimpleString(threadNum +
MultiThreadRandomReattachTestBase.ADDRESS.toString()));
+
+ ClientProducer producer =
sess.createProducer(MultiThreadRandomReattachTestBase.ADDRESS);
+
+ ClientMessage message = sess.createMessage(HornetQTextMessage.TYPE, false, 0,
System.currentTimeMillis(), (byte)1);
+ producer.send(message);
+
+ sess.start();
+
+ ClientMessage message2 =
consumer.receive(MultiThreadRandomReattachTestBase.RECEIVE_TIMEOUT);
+
+ Assert.assertNotNull(message2);
+
+ message2.acknowledge();
+
+ sess.stop();
+
+ sess.start();
+
+ sess.close();
+
+ sessCreate.deleteQueue(new SimpleString(threadNum +
MultiThreadRandomReattachTestBase.ADDRESS.toString()));
+
+ sessCreate.close();
+ }
+
+ protected void doTestO(final ClientSessionFactory sf, final int threadNum) throws
Exception
+ {
+ ClientSession sessCreate = sf.createSession(false, true, true);
+
+ sessCreate.createQueue(MultiThreadRandomReattachTestBase.ADDRESS,
+ new SimpleString(threadNum +
MultiThreadRandomReattachTestBase.ADDRESS.toString()),
+ null,
+ false);
+
+ ClientSession sess = sf.createSession(false, true, true);
+
+ sess.start();
+
+ ClientConsumer consumer = sess.createConsumer(new SimpleString(threadNum +
MultiThreadRandomReattachTestBase.ADDRESS.toString()));
+
+ for (int i = 0; i < 100; i++)
+ {
+ Assert.assertNull(consumer.receiveImmediate());
+ }
+
+ sess.close();
+
+ sessCreate.deleteQueue(new SimpleString(threadNum +
MultiThreadRandomReattachTestBase.ADDRESS.toString()));
+
+ sessCreate.close();
+ }
+
+ protected int getLatchWait()
+ {
+ return 60000;
+ }
+
+ protected int getNumIterations()
+ {
+ return 2;
+ }
+
+ protected int getNumThreads()
+ {
+ return 10;
+ }
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ log.info("************ Starting test " + getName());
+ }
+
+ /**
+  * Stops the live server if it is still running, then runs the parent tear-down.
+  * <p>
+  * The parent tear-down runs in a finally block so that it is not skipped when stopping the
+  * server throws.
+  *
+  * @throws Exception if stopping the server or the parent tear-down fails
+  */
+ @Override
+ protected void tearDown() throws Exception
+ {
+    try
+    {
+       if (liveServer != null && liveServer.isStarted())
+       {
+          liveServer.stop();
+       }
+    }
+    finally
+    {
+       liveServer = null;
+
+       super.tearDown();
+    }
+ }
+
+ // Private -------------------------------------------------------
+
+ private void runTestMultipleThreads(final RunnableT runnable,
+ final int numThreads,
+ final boolean failOnCreateConnection) throws
Exception
+ {
+ runTestMultipleThreads(runnable, numThreads, failOnCreateConnection, 1000);
+ }
+
+ private void runTestMultipleThreads(final RunnableT runnable,
+ final int numThreads,
+ final boolean failOnCreateConnection,
+ final long failDelay) throws Exception
+ {
+
+ runMultipleThreadsFailoverTest(runnable, numThreads, getNumIterations(),
failOnCreateConnection, failDelay);
+ }
+
+ /**
+ * @return
+ */
+ @Override
+ protected ServerLocator createLocator() throws Exception
+ {
+ ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(new
TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory"));
+ locator.setReconnectAttempts(-1);
+ locator.setConfirmationWindowSize(1024 * 1024);
+ return locator;
+ }
+
+ @Override
+ protected void stop() throws Exception
+ {
+ liveServer.stop();
+
+ System.gc();
+
+ Assert.assertEquals(0, InVMRegistry.instance.size());
+ }
+
+ private void sendMessages(final ClientSession sessSend,
+ final ClientProducer producer,
+ final int numMessages,
+ final int threadNum) throws Exception
+ {
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = sessSend.createMessage(HornetQBytesMessage.TYPE,
+ false,
+ 0,
+ System.currentTimeMillis(),
+ (byte)1);
+ message.putIntProperty(new SimpleString("threadnum"), threadNum);
+ message.putIntProperty(new SimpleString("count"), i);
+ setBody(message);
+ producer.send(message);
+ }
+ }
+
+ private void consumeMessages(final Set<ClientConsumer> consumers, final int
numMessages, final int threadNum) throws Exception
+ {
+ // We make sure the messages arrive in the order they were sent from a particular
producer
+ Map<ClientConsumer, Map<Integer, Integer>> counts = new
HashMap<ClientConsumer, Map<Integer, Integer>>();
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ for (ClientConsumer consumer : consumers)
+ {
+ Map<Integer, Integer> consumerCounts = counts.get(consumer);
+
+ if (consumerCounts == null)
+ {
+ consumerCounts = new HashMap<Integer, Integer>();
+ counts.put(consumer, consumerCounts);
+ }
+
+ ClientMessage msg =
consumer.receive(MultiThreadRandomReattachTestBase.RECEIVE_TIMEOUT);
+
+ Assert.assertNotNull(msg);
+
+ int tn = (Integer)msg.getObjectProperty(new
SimpleString("threadnum"));
+ int cnt = (Integer)msg.getObjectProperty(new
SimpleString("count"));
+
+ Integer c = consumerCounts.get(tn);
+ if (c == null)
+ {
+ c = new Integer(cnt);
+ }
+
+ if (tn == threadNum && cnt != c.intValue())
+ {
+ throw new Exception("Invalid count, expected " + tn + ":
" + c + " got " + cnt);
+ }
+
+ c++;
+
+ // Wrap
+ if (c == numMessages)
+ {
+ c = 0;
+ }
+
+ consumerCounts.put(tn, c);
+
+ msg.acknowledge();
+ }
+ }
+ }
+
+ // Inner classes -------------------------------------------------
+
+ /**
+  * Message handler that acknowledges every message and checks that the messages sent by thread
+  * tn arrive with consecutive "count" values.
+  * <p>
+  * The latch is counted down when an out-of-order count or an invalid size is detected (failure
+  * is then set), or when the message with count numMessages - 1 from thread tn has been handled.
+  */
+ private class MyHandler implements MessageHandler
+ {
+    // NOTE(review): reset() replaces this latch while holding the handler lock; confirm that
+    // code waiting on it outside this class always sees the current instance.
+    CountDownLatch latch = new CountDownLatch(1);
+
+    // Next expected "count" value, keyed by the sending thread number.
+    private final Map<Integer, Integer> counts = new HashMap<Integer, Integer>();
+
+    volatile String failure;
+
+    final int tn;
+
+    final int numMessages;
+
+    volatile boolean done;
+
+    /** Clears all recorded state and installs a fresh latch so the handler can be reused. */
+    synchronized void reset()
+    {
+       counts.clear();
+
+       done = false;
+
+       failure = null;
+
+       latch = new CountDownLatch(1);
+    }
+
+    /**
+     * @param threadNum number of the thread whose messages are order-checked
+     * @param numMessages number of messages expected from that thread
+     */
+    MyHandler(final int threadNum, final int numMessages)
+    {
+       tn = threadNum;
+
+       this.numMessages = numMessages;
+    }
+
+    /**
+     * Acknowledges the message and, unless the handler is already done, validates its count and size.
+     */
+    public synchronized void onMessage(final ClientMessage message)
+    {
+       try
+       {
+          message.acknowledge();
+       }
+       catch (HornetQException me)
+       {
+          log.error("Failed to process", me);
+       }
+
+       if (done)
+       {
+          return;
+       }
+
+       final int threadNum = (Integer)message.getObjectProperty(new SimpleString("threadnum"));
+       final int cnt = (Integer)message.getObjectProperty(new SimpleString("count"));
+
+       // The first message seen from a sender defines where that sender's sequence starts.
+       final Integer previous = counts.get(threadNum);
+       final int expected = previous == null ? cnt : previous.intValue();
+
+       if (tn == threadNum && cnt != expected)
+       {
+          failure = "Invalid count, expected " + threadNum + ":" + expected + " got " + cnt;
+          log.error(failure);
+
+          latch.countDown();
+       }
+
+       if (!checkSize(message))
+       {
+          failure = "Invalid size on message";
+          log.error(failure);
+          latch.countDown();
+       }
+
+       if (tn == threadNum && expected == numMessages - 1)
+       {
+          done = true;
+          latch.countDown();
+       }
+
+       // Wrap around at numMessages
+       int next = expected + 1;
+       if (next == numMessages)
+       {
+          next = 0;
+       }
+
+       counts.put(threadNum, next);
+    }
+ }
+}
\ No newline at end of file
Copied:
trunk/tests/integration-tests/src/main/java/org/hornetq/tests/cluster/reattach/MultiThreadReattachSupport.java
(from rev 10475,
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/reattach/MultiThreadReattachSupport.java)
===================================================================
---
trunk/tests/integration-tests/src/main/java/org/hornetq/tests/cluster/reattach/MultiThreadReattachSupport.java
(rev 0)
+++
trunk/tests/integration-tests/src/main/java/org/hornetq/tests/cluster/reattach/MultiThreadReattachSupport.java 2011-04-12
10:12:15 UTC (rev 10480)
@@ -0,0 +1,281 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.cluster.reattach;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
+
+import junit.framework.Assert;
+
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.ServerLocator;
+import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
+import org.hornetq.core.client.impl.ClientSessionInternal;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.protocol.core.impl.RemotingConnectionImpl;
+import org.hornetq.core.remoting.impl.invm.InVMConnector;
+import org.hornetq.tests.util.ServiceTestBase;
+import org.hornetq.tests.util.UnitTestCase;
+
+/**
+ * A MultiThreadReattachSupport
+ *
+ * @author <a href="mailto:time.fox@jboss.org">Tim Fox</a>
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert
Suconic</a>
+ *
+ * Created Mar 17, 2009 11:15:02 AM
+ *
+ *
+ */
+public abstract class MultiThreadReattachSupport extends ServiceTestBase
+{
+
+ // Constants -----------------------------------------------------
+
+ private final Logger log = Logger.getLogger(this.getClass());
+
+ // Attributes ----------------------------------------------------
+
+ private Timer timer;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+ /** Hook invoked by runMultipleThreadsFailoverTest at the beginning of every iteration, before the locator is created. */
+ protected abstract void start() throws Exception;
+ /** Hook invoked by runMultipleThreadsFailoverTest at the end of every iteration, after the session factory has been closed. */
+ protected abstract void stop() throws Exception;
+ /** Supplies the locator from which every iteration creates its session factory. */
+ protected abstract ServerLocator createLocator() throws Exception;
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ timer = new Timer();
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ timer.cancel();
+ timer = null;
+ super.tearDown();
+ }
+
+ protected boolean shouldFail()
+ {
+ return true;
+ }
+
+ /**
+  * Runs the given work item on numThreads threads, numIts times over, while a Failer task fails
+  * the connection (or arms a create-connection failure) at a random point.
+  * <p>
+  * Each iteration calls start(), creates a locator, a session factory and a session, and then
+  * keeps running batches of threads until the Failer has executed. Afterwards it resets the
+  * in-VM failures, closes the session and the locator, asserts that the factory holds no
+  * sessions or connections, closes the factory and calls stop().
+  *
+  * @param runnable the work every thread executes; the same instance is shared by all threads
+  * @param numThreads how many threads run the work in each batch
+  * @param numIts how many times the whole cycle is repeated
+  * @param failOnCreateConnection passed to the Failer; selects which kind of failure it triggers
+  * @param failDelay upper bound of the random delay before the Failer first runs
+  * @throws Exception if a thread fails, the work item reports a failure, or a client call fails
+  */
+ protected void runMultipleThreadsFailoverTest(final RunnableT runnable,
+                                               final int numThreads,
+                                               final int numIts,
+                                               final boolean failOnCreateConnection,
+                                               final long failDelay) throws Exception
+ {
+    for (int its = 0; its < numIts; its++)
+    {
+       log.info("Beginning iteration " + its);
+
+       start();
+
+       final ServerLocator locator = createLocator();
+
+       final ClientSessionFactoryInternal sf = (ClientSessionFactoryInternal)locator.createSessionFactory();
+
+       final ClientSession session = sf.createSession(false, true, true);
+
+       final Failer failer = startFailer(failDelay, session, failOnCreateConnection);
+
+       class Runner extends Thread
+       {
+          private volatile Throwable throwable;
+
+          private final RunnableT test;
+
+          private final int threadNum;
+
+          Runner(final RunnableT test, final int threadNum)
+          {
+             this.test = test;
+
+             this.threadNum = threadNum;
+          }
+
+          @Override
+          public void run()
+          {
+             try
+             {
+                test.run(sf, threadNum);
+             }
+             catch (Throwable t)
+             {
+                throwable = t;
+
+                log.error("Failed to run test", t);
+
+                // Case a failure happened here, it should print the Thread dump
+                // Sending it to System.out, as it would show on the Tests report
+                System.out.println(UnitTestCase.threadDump(" - fired by MultiThreadRandomReattachTestBase::runTestMultipleThreads (" + t.getLocalizedMessage() +
+                                                           ")"));
+             }
+          }
+       }
+
+       try
+       {
+          do
+          {
+             final List<Runner> threads = new ArrayList<Runner>();
+
+             for (int i = 0; i < numThreads; i++)
+             {
+                final Runner runner = new Runner(runnable, i);
+
+                threads.add(runner);
+
+                runner.start();
+             }
+
+             for (Runner thread : threads)
+             {
+                thread.join();
+
+                if (thread.throwable != null)
+                {
+                   throw new Exception("Exception on thread " + thread, thread.throwable);
+                }
+             }
+
+             runnable.checkFail();
+          }
+          // When shouldFail() is false the Failer is never scheduled and so never reports itself
+          // as executed; without this guard the loop would never end.
+          while (shouldFail() && !failer.isExecuted());
+       }
+       finally
+       {
+          // No effect once the Failer has run; on the failure path it keeps a still-pending
+          // Failer from firing after this iteration has been abandoned.
+          failer.cancel();
+       }
+
+       InVMConnector.resetFailures();
+
+       session.close();
+
+       locator.close();
+
+       Assert.assertEquals(0, sf.numSessions());
+
+       Assert.assertEquals(0, sf.numConnections());
+
+       sf.close();
+
+       stop();
+    }
+ }
+
+ // Private -------------------------------------------------------
+
+ private Failer startFailer(final long time, final ClientSession session, final boolean
failOnCreateConnection)
+ {
+ Failer failer = new Failer(session, failOnCreateConnection);
+
+ // This is useful for debugging.. just change shouldFail to return false, and
Failer will not be executed
+ if (shouldFail())
+ {
+ timer.schedule(failer, (long)(time * Math.random()), 100);
+ }
+
+ return failer;
+ }
+
+ // Inner classes -------------------------------------------------
+ /**
+  * Work item that runMultipleThreadsFailoverTest executes on every runner thread through
+  * run(sf, threadNum). One instance is shared by all runner threads. Although the class extends
+  * Thread, the code visible here only calls run(sf, threadNum) and checkFail() on it.
+  */
+ protected abstract class RunnableT extends Thread
+ {
+ private volatile String failReason;
+ // Optional cause stored together with failReason; checkFail() only uses it for logging.
+ private volatile Throwable throwable;
+ /** Records a failure so that a later checkFail() call reports it. */
+ public void setFailed(final String reason, final Throwable throwable)
+ {
+ failReason = reason;
+ this.throwable = throwable;
+ }
+ /** Logs the recorded throwable, if any, and fails the test when a failure reason has been recorded. */
+ public void checkFail()
+ {
+ if (throwable != null)
+ {
+ log.error("Test failed: " + failReason, throwable);
+ }
+ if (failReason != null)
+ {
+ Assert.fail(failReason);
+ }
+ }
+ /** The work to perform; sf is the shared session factory and threadNum the number of the runner thread. */
+ public abstract void run(final ClientSessionFactory sf, final int threadNum) throws
Exception;
+ }
+
+ /**
+  * Timer task that triggers a single failure: it either arms one create-connection failure on
+  * the in-VM connector or fails the session's connection directly. It cancels itself on its
+  * first run and then reports itself as executed.
+  */
+ private class Failer extends TimerTask
+ {
+    private final ClientSession session;
+
+    private final boolean failOnCreateConnection;
+
+    private boolean executed;
+
+    public Failer(final ClientSession session, final boolean failOnCreateConnection)
+    {
+       this.failOnCreateConnection = failOnCreateConnection;
+
+       this.session = session;
+    }
+
+    @Override
+    public synchronized void run()
+    {
+       log.info("** Failing connection");
+
+       final ClientSessionInternal internalSession = (ClientSessionInternal)session;
+       final RemotingConnectionImpl connection = (RemotingConnectionImpl)internalSession.getConnection();
+
+       if (!failOnCreateConnection)
+       {
+          connection.fail(new HornetQException(HornetQException.NOT_CONNECTED, "blah"));
+       }
+       else
+       {
+          InVMConnector.numberOfFailures = 1;
+          InVMConnector.failOnCreateConnection = true;
+       }
+
+       log.info("** Fail complete");
+
+       // The task is scheduled with a period, so it cancels itself after the first run.
+       cancel();
+
+       executed = true;
+    }
+
+    /** @return true once run() has completed */
+    public synchronized boolean isExecuted()
+    {
+       return executed;
+    }
+ }
+
+}
Copied:
trunk/tests/integration-tests/src/main/java/org/hornetq/tests/largemessage/LargeMessageTestBase.java
(from rev 10475,
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/largemessage/LargeMessageTestBase.java)
===================================================================
---
trunk/tests/integration-tests/src/main/java/org/hornetq/tests/largemessage/LargeMessageTestBase.java
(rev 0)
+++
trunk/tests/integration-tests/src/main/java/org/hornetq/tests/largemessage/LargeMessageTestBase.java 2011-04-12
10:12:15 UTC (rev 10480)
@@ -0,0 +1,702 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.largemessage;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
+import junit.framework.Assert;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.HornetQBuffers;
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.Message;
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.client.*;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.Queue;
+import org.hornetq.tests.util.ServiceTestBase;
+import org.hornetq.tests.util.UnitTestCase;
+import org.hornetq.utils.DataConstants;
+
+/**
+ * A LargeMessageTestBase
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert
Suconic</a>
+ *
+ * Created Oct 29, 2008 11:43:52 AM
+ *
+ *
+ */
+public abstract class LargeMessageTestBase extends ServiceTestBase
+{
+
+ // Constants -----------------------------------------------------
+ private static final Logger log = Logger.getLogger(LargeMessageTestBase.class);
+
+ protected final SimpleString ADDRESS = new SimpleString("SimpleAddress");
+
+ protected HornetQServer server;
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ /**
+  * Stops the server if it is still started, logging rather than propagating any exception from
+  * the stop, then clears the reference and runs the parent tear-down.
+  */
+ @Override
+ protected void tearDown() throws Exception
+ {
+    if (server != null)
+    {
+       if (server.isStarted())
+       {
+          try
+          {
+             server.stop();
+          }
+          catch (Exception stopFailure)
+          {
+             // Best effort: a failing stop must not hide the outcome of the test itself.
+             LargeMessageTestBase.log.warn(stopFailure.getMessage(), stopFailure);
+          }
+       }
+    }
+
+    server = null;
+
+    super.tearDown();
+ }
+
+ protected void testChunks(final boolean isXA,
+ final boolean restartOnXA,
+ final boolean rollbackFirstSend,
+ final boolean useStreamOnConsume,
+ final boolean realFiles,
+ final boolean preAck,
+ final boolean sendingBlocking,
+ final boolean testBrowser,
+ final boolean useMessageConsumer,
+ final int numberOfMessages,
+ final long numberOfBytes,
+ final int waitOnConsumer,
+ final long delayDelivery) throws Exception
+ {
+ testChunks(isXA,
+ restartOnXA,
+ rollbackFirstSend,
+ useStreamOnConsume,
+ realFiles,
+ preAck,
+ sendingBlocking,
+ testBrowser,
+ useMessageConsumer,
+ numberOfMessages,
+ numberOfBytes,
+ waitOnConsumer,
+ delayDelivery,
+ -1,
+ 10 * 1024);
+ }
+ /**
+  * Sends numberOfMessages messages of numberOfBytes bytes each to ADDRESS, commits them, then
+  * consumes and verifies every message, and finally asserts that the queue is empty and calls
+  * validateNoFilesOnLargeDir().
+  *
+  * @param isXA passed as the XA flag when creating sessions; when true the work is demarcated
+  *        with start/end/prepare/commit (or rollback) on an Xid
+  * @param restartOnXA together with realFiles, stops and starts the server after each prepare
+  * @param rollbackFirstSend sends a first batch and rolls it back before the real batch
+  * @param useStreamOnConsume reads the body through saveToOutputStream instead of the body buffer
+  * @param realFiles passed to createServer; when true the server is also replaced before consuming
+  * @param preAck passed when creating sessions; when true messages are not acknowledged explicitly
+  * @param sendingBlocking turns on blocking for non-durable sends, durable sends and acknowledgements
+  * @param testBrowser adds a first consuming pass whose consumer is created with true as third argument
+  * @param useMessageConsumer consumes through a MessageHandler instead of calling receive()
+  * @param numberOfMessages number of messages per batch
+  * @param numberOfBytes body size of each message
+  * @param waitOnConsumer how long to wait for messages while consuming
+  * @param delayDelivery when greater than 0, the minimum time that must have passed since the
+  *        message's "original-time" property when it is consumed
+  * @param producerWindow when greater than 0, set as the locator's confirmation window size
+  * @param minSize set as the locator's minimum large message size
+  * @throws Exception if any step fails
+  */
+ protected void testChunks(final boolean isXA,
+ final boolean restartOnXA,
+ final boolean rollbackFirstSend,
+ final boolean useStreamOnConsume,
+ final boolean realFiles,
+ final boolean preAck,
+ final boolean sendingBlocking,
+ final boolean testBrowser,
+ final boolean useMessageConsumer,
+ final int numberOfMessages,
+ final long numberOfBytes,
+ final int waitOnConsumer,
+ final long delayDelivery,
+ final int producerWindow,
+ final int minSize) throws Exception
+ {
+ clearData();
+ // Every run works against a newly created and started server.
+ server = createServer(realFiles);
+ server.start();
+
+ ServerLocator locator = createInVMNonHALocator();
+ try
+ {
+
+ if (sendingBlocking)
+ {
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setBlockOnAcknowledge(true);
+ }
+
+ if (producerWindow > 0)
+ {
+ locator.setConfirmationWindowSize(producerWindow);
+ }
+
+ locator.setMinLargeMessageSize(minSize);
+
+ ClientSessionFactory sf = locator.createSessionFactory();
+
+ ClientSession session;
+
+ Xid xid = null;
+ session = sf.createSession(null, null, isXA, false, false, preAck, 0);
+
+ if (isXA)
+ {
+ xid = newXID();
+ session.start(xid, XAResource.TMNOFLAGS);
+ }
+
+ session.createQueue(ADDRESS, ADDRESS, null, true);
+
+ ClientProducer producer = session.createProducer(ADDRESS);
+ // Optional first pass: send a batch, roll it back, then call validateNoFilesOnLargeDir().
+ if (rollbackFirstSend)
+ {
+ sendMessages(numberOfMessages, numberOfBytes, delayDelivery, session,
producer);
+
+ if (isXA)
+ {
+ session.end(xid, XAResource.TMSUCCESS);
+ session.prepare(xid);
+
+ session.close();
+
+ if (realFiles && restartOnXA)
+ {
+ server.stop();
+ server.start();
+ sf = locator.createSessionFactory();
+ }
+
+ session = sf.createSession(null, null, isXA, false, false, preAck, 0);
+
+ Xid[] xids = session.recover(XAResource.TMSTARTRSCAN);
+ Assert.assertEquals(1, xids.length);
+ Assert.assertEquals(xid, xids[0]);
+
+ session.rollback(xid);
+ producer = session.createProducer(ADDRESS);
+ xid = newXID();
+ session.start(xid, XAResource.TMNOFLAGS);
+ }
+ else
+ {
+ session.rollback();
+ }
+
+ validateNoFilesOnLargeDir();
+ }
+ // This is the batch that gets committed and is consumed further down.
+ sendMessages(numberOfMessages, numberOfBytes, delayDelivery, session,
producer);
+
+ if (isXA)
+ {
+ session.end(xid, XAResource.TMSUCCESS);
+ session.prepare(xid);
+
+ session.close();
+
+ if (realFiles && restartOnXA)
+ {
+ server.stop();
+ server.start();
+ //we need to recreate sf's
+ sf = locator.createSessionFactory();
+ }
+
+ session = sf.createSession(null, null, isXA, false, false, preAck, 0);
+
+ Xid[] xids = session.recover(XAResource.TMSTARTRSCAN);
+ Assert.assertEquals(1, xids.length);
+ Assert.assertEquals(xid, xids[0]);
+
+ producer = session.createProducer(ADDRESS);
+
+ session.commit(xid, false);
+ xid = newXID();
+ session.start(xid, XAResource.TMNOFLAGS);
+ }
+ else
+ {
+ session.commit();
+ }
+
+ session.close();
+ // With real files the server is replaced by a new instance before anything is consumed.
+ if (realFiles)
+ {
+ server.stop();
+
+ server = createServer(realFiles);
+ server.start();
+
+ sf = locator.createSessionFactory();
+ }
+
+ session = sf.createSession(null, null, isXA, false, false, preAck, 0);
+
+ if (isXA)
+ {
+ xid = newXID();
+ session.start(xid, XAResource.TMNOFLAGS);
+ }
+
+ ClientConsumer consumer = null;
+ // Iteration 0 runs only when testBrowser is set and is rolled back at its end; iteration 1 is committed.
+ for (int iteration = testBrowser ? 0 : 1; iteration < 2; iteration++)
+ {
+ session.stop();
+
+ // first time with a browser
+ consumer = session.createConsumer(ADDRESS, null, iteration == 0);
+
+ if (useMessageConsumer)
+ {
+ final CountDownLatch latchDone = new CountDownLatch(numberOfMessages);
+ final AtomicInteger errors = new AtomicInteger(0);
+
+ MessageHandler handler = new MessageHandler()
+ {
+ int msgCounter;
+
+ public void onMessage(final ClientMessage message)
+ {
+ try
+ {
+ if (delayDelivery > 0)
+ {
+ long originalTime = (Long)message.getObjectProperty(new
SimpleString("original-time"));
+ Assert.assertTrue(System.currentTimeMillis() - originalTime +
"<" + delayDelivery,
+ System.currentTimeMillis() - originalTime
>= delayDelivery);
+ }
+
+ if (!preAck)
+ {
+ message.acknowledge();
+ }
+
+ Assert.assertNotNull(message);
+
+ if (delayDelivery <= 0)
+ {
+ // right now there is no guarantee of ordered delivered on
multiple scheduledMessages with
+ // the same
+ // scheduled delivery time
+ Assert.assertEquals(msgCounter,
+ ((Integer)message.getObjectProperty(new
SimpleString("counter-message"))).intValue());
+ }
+
+ if (useStreamOnConsume)
+ {
+ final AtomicLong bytesRead = new AtomicLong(0);
+ message.saveToOutputStream(new OutputStream()
+ {
+
+ @Override
+ public void write(final byte b[]) throws IOException
+ {
+ if (b[0] ==
UnitTestCase.getSamplebyte(bytesRead.get()))
+ {
+ bytesRead.addAndGet(b.length);
+ LargeMessageTestBase.log.debug("Read position
" + bytesRead.get() + " on consumer");
+ }
+ else
+ {
+ LargeMessageTestBase.log.warn("Received invalid
packet at position " + bytesRead.get());
+ }
+ }
+
+ @Override
+ public void write(final int b) throws IOException
+ {
+ if (b == UnitTestCase.getSamplebyte(bytesRead.get()))
+ {
+ bytesRead.incrementAndGet();
+ }
+ else
+ {
+ LargeMessageTestBase.log.warn("byte not as
expected!");
+ }
+ }
+ });
+
+ Assert.assertEquals(numberOfBytes, bytesRead.get());
+ }
+ else
+ {
+
+ HornetQBuffer buffer = message.getBodyBuffer();
+ buffer.resetReaderIndex();
+ for (long b = 0; b < numberOfBytes; b++)
+ {
+ if (b % (1024l * 1024l) == 0)
+ {
+ LargeMessageTestBase.log.debug("Read " + b +
" bytes");
+ }
+
+ Assert.assertEquals(UnitTestCase.getSamplebyte(b),
buffer.readByte());
+ }
+
+ try
+ {
+ buffer.readByte();
+ Assert.fail("Supposed to throw an exception");
+ }
+ catch (Exception e)
+ {
+ }
+ }
+ }
+ catch (Throwable e)
+ {
+ e.printStackTrace();
+ LargeMessageTestBase.log.warn("Got an error", e);
+ errors.incrementAndGet();
+ }
+ finally
+ {
+ latchDone.countDown();
+ msgCounter++;
+ }
+ }
+ };
+
+ session.start();
+
+ consumer.setMessageHandler(handler);
+ // NOTE(review): waitOnConsumer is treated as seconds here, but in the receive() branch below it is added to delayDelivery and used as the receive timeout - confirm the intended unit.
+ Assert.assertTrue(latchDone.await(waitOnConsumer, TimeUnit.SECONDS));
+ Assert.assertEquals(0, errors.get());
+ }
+ else
+ {
+
+ session.start();
+
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ System.currentTimeMillis();
+
+ ClientMessage message = consumer.receive(waitOnConsumer +
delayDelivery);
+
+ Assert.assertNotNull(message);
+
+ System.currentTimeMillis();
+
+ if (delayDelivery > 0)
+ {
+ long originalTime = (Long)message.getObjectProperty(new
SimpleString("original-time"));
+ Assert.assertTrue(System.currentTimeMillis() - originalTime +
"<" + delayDelivery,
+ System.currentTimeMillis() - originalTime >=
delayDelivery);
+ }
+
+ if (!preAck)
+ {
+ message.acknowledge();
+ }
+
+ Assert.assertNotNull(message);
+
+ if (delayDelivery <= 0)
+ {
+ // right now there is no guarantee of ordered delivered on multiple
scheduledMessages with the same
+ // scheduled delivery time
+ Assert.assertEquals(i,
+ ((Integer)message.getObjectProperty(new
SimpleString("counter-message"))).intValue());
+ }
+
+ if (useStreamOnConsume)
+ {
+ final AtomicLong bytesRead = new AtomicLong(0);
+ message.saveToOutputStream(new OutputStream()
+ {
+
+ @Override
+ public void write(final byte b[]) throws IOException
+ {
+ if (b[0] == UnitTestCase.getSamplebyte(bytesRead.get()))
+ {
+ bytesRead.addAndGet(b.length);
+ }
+ else
+ {
+ LargeMessageTestBase.log.warn("Received invalid packet
at position " + bytesRead.get());
+ }
+
+ }
+ // NOTE(review): the handler branch above compares single bytes with UnitTestCase.getSamplebyte(...), while this one compares with (byte)'a' - confirm which is intended.
+ @Override
+ public void write(final int b) throws IOException
+ {
+ if (bytesRead.get() % (1024l * 1024l) == 0)
+ {
+ LargeMessageTestBase.log.debug("Read " +
bytesRead.get() + " bytes");
+ }
+ if (b == (byte)'a')
+ {
+ bytesRead.incrementAndGet();
+ }
+ else
+ {
+ LargeMessageTestBase.log.warn("byte not as
expected!");
+ }
+ }
+ });
+
+ Assert.assertEquals(numberOfBytes, bytesRead.get());
+ }
+ else
+ {
+ HornetQBuffer buffer = message.getBodyBuffer();
+ buffer.resetReaderIndex();
+
+ for (long b = 0; b < numberOfBytes; b++)
+ {
+ if (b % (1024l * 1024l) == 0l)
+ {
+ LargeMessageTestBase.log.debug("Read " + b + "
bytes");
+ }
+ Assert.assertEquals(UnitTestCase.getSamplebyte(b),
buffer.readByte());
+ }
+ }
+
+ }
+
+ }
+ consumer.close();
+
+ if (iteration == 0)
+ {
+ if (isXA)
+ {
+ session.end(xid, XAResource.TMSUCCESS);
+ session.rollback(xid);
+ xid = newXID();
+ session.start(xid, XAResource.TMNOFLAGS);
+ }
+ else
+ {
+ session.rollback();
+ }
+ }
+ else
+ {
+ if (isXA)
+ {
+ session.end(xid, XAResource.TMSUCCESS);
+ session.commit(xid, true);
+ xid = newXID();
+ session.start(xid, XAResource.TMNOFLAGS);
+ }
+ else
+ {
+ session.commit();
+ }
+ }
+ }
+
+ session.close();
+ // After the committed pass the queue must report neither delivering nor pending messages.
+ Assert.assertEquals(0,
((Queue)server.getPostOffice().getBinding(ADDRESS).getBindable()).getDeliveringCount());
+ Assert.assertEquals(0,
((Queue)server.getPostOffice().getBinding(ADDRESS).getBindable()).getMessageCount());
+
+ validateNoFilesOnLargeDir();
+
+ }
+ finally
+ {
+ locator.close();
+ try
+ {
+ server.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+ }
+
+ /**
+  * Sends numberOfMessages durable messages of numberOfBytes bytes each, tagging every message
+  * with its index in the "counter-message" property.
+  * <p>
+  * Bodies above 1 MiB, and every even-indexed message, are supplied as an input stream; the
+  * remaining messages are written into the body buffer from a byte array.
+  *
+  * @param numberOfMessages how many messages to send
+  * @param numberOfBytes body size of each message
+  * @param delayDelivery when greater than 0, the message records the current time as
+  *        "original-time" and is scheduled for delivery that much later
+  * @param session session used to create the messages
+  * @param producer producer the messages are sent through
+  * @throws Exception if creating or sending a message fails
+  */
+ private void sendMessages(final int numberOfMessages,
+                           final long numberOfBytes,
+                           final long delayDelivery,
+                           final ClientSession session,
+                           final ClientProducer producer) throws Exception
+ {
+    LargeMessageTestBase.log.debug("NumberOfBytes = " + numberOfBytes);
+    for (int i = 0; i < numberOfMessages; i++)
+    {
+       ClientMessage message = session.createMessage(true);
+
+       // If the test is using more than 1M, we will only use the Streaming, as it require too much memory from the
+       // test
+       if (numberOfBytes > 1024 * 1024 || i % 2 == 0)
+       {
+          LargeMessageTestBase.log.debug("Sending message (stream)" + i);
+          message.setBodyInputStream(UnitTestCase.createFakeLargeStream(numberOfBytes));
+       }
+       else
+       {
+          LargeMessageTestBase.log.debug("Sending message (array)" + i);
+          // The cast is safe: this branch is only reached when numberOfBytes is at most 1024 * 1024.
+          byte[] bytes = new byte[(int)numberOfBytes];
+          for (int j = 0; j < bytes.length; j++)
+          {
+             bytes[j] = UnitTestCase.getSamplebyte(j);
+          }
+          message.getBodyBuffer().writeBytes(bytes);
+       }
+       message.putIntProperty(new SimpleString("counter-message"), i);
+       if (delayDelivery > 0)
+       {
+          long time = System.currentTimeMillis();
+          message.putLongProperty(new SimpleString("original-time"), time);
+          message.putLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME, time + delayDelivery);
+       }
+
+       // The send itself is the same whether or not delivery is delayed.
+       producer.send(message);
+    }
+ }
+
+ protected HornetQBuffer createLargeBuffer(final int numberOfIntegers)
+ {
+ HornetQBuffer body = HornetQBuffers.fixedBuffer(DataConstants.SIZE_INT *
numberOfIntegers);
+
+ for (int i = 0; i < numberOfIntegers; i++)
+ {
+ body.writeInt(i);
+ }
+
+ return body;
+
+ }
+
+ protected ClientMessage createLargeClientMessage(final ClientSession session, final
int numberOfBytes) throws Exception
+ {
+ return createLargeClientMessage(session, numberOfBytes, true);
+ }
+
+ protected ClientMessage createLargeClientMessage (final ClientSession session, final
byte[] buffer, final boolean durable) throws Exception
+ {
+ ClientMessage msgs = session.createMessage(durable);
+ msgs.getBodyBuffer().writeBytes(buffer);
+ return msgs;
+ }
+
+ protected ClientMessage createLargeClientMessage(final ClientSession session,
+ final long numberOfBytes,
+ final boolean persistent) throws
Exception
+ {
+
+ ClientMessage clientMessage = session.createMessage(persistent);
+
+
clientMessage.setBodyInputStream(UnitTestCase.createFakeLargeStream(numberOfBytes));
+
+ return clientMessage;
+ }
+
+ /**
+ * @param session
+ * @param queueToRead
+ * @param numberOfIntegers
+ * @throws HornetQException
+ * @throws FileNotFoundException
+ * @throws IOException
+ */
+ protected void readMessage(final ClientSession session, final SimpleString
queueToRead, final int numberOfBytes) throws HornetQException,
+
FileNotFoundException,
+
IOException
+ {
+ session.start();
+
+ ClientConsumer consumer = session.createConsumer(queueToRead);
+
+ ClientMessage clientMessage = consumer.receive(5000);
+
+ Assert.assertNotNull(clientMessage);
+
+ clientMessage.acknowledge();
+
+ session.commit();
+
+ consumer.close();
+ }
+
+ /**
+  * Creates an output stream that discards everything written to it, logs its progress once per
+  * 1024 * 1024 bytes and rejects writes after it has been closed.
+  *
+  * @return the discarding stream
+  * @throws Exception declared for callers; nothing in this method throws it
+  */
+ protected OutputStream createFakeOutputStream() throws Exception
+ {
+
+    return new OutputStream()
+    {
+       private boolean closed = false;
+
+       private int count;
+
+       @Override
+       public void close() throws IOException
+       {
+          super.close();
+          closed = true;
+       }
+
+       @Override
+       public void write(final int b) throws IOException
+       {
+          // The parentheses matter: "count % 1024 * 1024" is parsed as "(count % 1024) * 1024",
+          // which would log once every 1024 bytes instead of once every 1024 * 1024.
+          if (count % (1024 * 1024) == 0)
+          {
+             LargeMessageTestBase.log.debug("OutputStream received " + count + " bytes");
+          }
+          count++;
+          if (closed)
+          {
+             throw new IOException("Stream was closed");
+          }
+       }
+
+    };
+
+ }
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+}
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/LargeMessageTest.java
===================================================================
---
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/LargeMessageTest.java 2011-04-12
09:47:15 UTC (rev 10479)
+++
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/LargeMessageTest.java 2011-04-12
10:12:15 UTC (rev 10480)
@@ -31,7 +31,7 @@
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.Queue;
import org.hornetq.core.settings.impl.AddressSettings;
-import org.hornetq.tests.integration.largemessage.LargeMessageTestBase;
+import org.hornetq.tests.largemessage.LargeMessageTestBase;
import org.hornetq.tests.util.RandomUtil;
import org.hornetq.tests.util.UnitTestCase;
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/reattach/MultiThreadRandomReattachTest.java
===================================================================
---
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/reattach/MultiThreadRandomReattachTest.java 2011-04-12
09:47:15 UTC (rev 10479)
+++
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/reattach/MultiThreadRandomReattachTest.java 2011-04-12
10:12:15 UTC (rev 10480)
@@ -16,9 +16,9 @@
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClientMessage;
import org.hornetq.core.config.Configuration;
-import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.server.HornetQServers;
+import org.hornetq.tests.cluster.reattach.MultiThreadRandomReattachTestBase;
/**
*
Deleted:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/reattach/MultiThreadRandomReattachTestBase.java
===================================================================
---
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/reattach/MultiThreadRandomReattachTestBase.java 2011-04-12
09:47:15 UTC (rev 10479)
+++
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/reattach/MultiThreadRandomReattachTestBase.java 2011-04-12
10:12:15 UTC (rev 10480)
@@ -1,1428 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.tests.integration.cluster.reattach;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import junit.framework.Assert;
-
-import org.hornetq.api.core.HornetQException;
-import org.hornetq.api.core.SimpleString;
-import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.client.ClientConsumer;
-import org.hornetq.api.core.client.ClientMessage;
-import org.hornetq.api.core.client.ClientProducer;
-import org.hornetq.api.core.client.ClientSession;
-import org.hornetq.api.core.client.ClientSessionFactory;
-import org.hornetq.api.core.client.HornetQClient;
-import org.hornetq.api.core.client.MessageHandler;
-import org.hornetq.api.core.client.ServerLocator;
-import org.hornetq.core.logging.Logger;
-import org.hornetq.core.remoting.impl.invm.InVMRegistry;
-import org.hornetq.core.server.HornetQServer;
-import org.hornetq.jms.client.HornetQBytesMessage;
-import org.hornetq.jms.client.HornetQTextMessage;
-
-/**
- * A MultiThreadRandomReattachTestBase
- *
- * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
- * @author <a href="mailto:clebert.suconic@jboss.org">Clebert
Suconic</a>
- *
- *
- */
-public abstract class MultiThreadRandomReattachTestBase extends
MultiThreadReattachSupport
-{
- private final Logger log = Logger.getLogger(getClass());
-
- // Constants -----------------------------------------------------
-
- private static final int RECEIVE_TIMEOUT = 30000;
-
- private final int LATCH_WAIT = getLatchWait();
-
- private final int NUM_THREADS = getNumThreads();
-
- // Attributes ----------------------------------------------------
- protected static final SimpleString ADDRESS = new
SimpleString("FailoverTestAddress");
-
- protected HornetQServer liveServer;
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- // Public --------------------------------------------------------
-
- public void testA() throws Exception
- {
- runTestMultipleThreads(new RunnableT()
- {
- @Override
- public void run(final ClientSessionFactory sf, final int threadNum) throws
Exception
- {
- doTestA(sf, threadNum);
- }
- }, NUM_THREADS, false);
-
- }
-
- public void testB() throws Exception
- {
- runTestMultipleThreads(new RunnableT()
- {
- @Override
- public void run(final ClientSessionFactory sf, final int threadNum) throws
Exception
- {
- doTestB(sf, threadNum);
- }
- }, NUM_THREADS, false);
- }
-
- public void testC() throws Exception
- {
- runTestMultipleThreads(new RunnableT()
- {
- @Override
- public void run(final ClientSessionFactory sf, final int threadNum) throws
Exception
- {
- doTestC(sf, threadNum);
- }
- }, NUM_THREADS, false);
- }
-
- public void testD() throws Exception
- {
- runTestMultipleThreads(new RunnableT()
- {
- @Override
- public void run(final ClientSessionFactory sf, final int threadNum) throws
Exception
- {
- doTestD(sf, threadNum);
- }
- }, NUM_THREADS, false);
- }
-
- public void testE() throws Exception
- {
- runTestMultipleThreads(new RunnableT()
- {
- @Override
- public void run(final ClientSessionFactory sf, final int threadNum) throws
Exception
- {
- doTestE(sf, threadNum);
- }
- }, NUM_THREADS, false);
- }
-
- public void testF() throws Exception
- {
- runTestMultipleThreads(new RunnableT()
- {
- @Override
- public void run(final ClientSessionFactory sf, final int threadNum) throws
Exception
- {
- doTestF(sf, threadNum);
- }
- }, NUM_THREADS, false);
- }
-
- public void testG() throws Exception
- {
- runTestMultipleThreads(new RunnableT()
- {
- @Override
- public void run(final ClientSessionFactory sf, final int threadNum) throws
Exception
- {
- doTestG(sf, threadNum);
- }
- }, NUM_THREADS, false);
- }
-
- public void testH() throws Exception
- {
- runTestMultipleThreads(new RunnableT()
- {
- @Override
- public void run(final ClientSessionFactory sf, final int threadNum) throws
Exception
- {
- doTestH(sf, threadNum);
- }
- }, NUM_THREADS, false);
- }
-
- public void testI() throws Exception
- {
- runTestMultipleThreads(new RunnableT()
- {
- @Override
- public void run(final ClientSessionFactory sf, final int threadNum) throws
Exception
- {
- doTestI(sf, threadNum);
- }
- }, NUM_THREADS, false);
- }
-
- public void testJ() throws Exception
- {
- runTestMultipleThreads(new RunnableT()
- {
- @Override
- public void run(final ClientSessionFactory sf, final int threadNum) throws
Exception
- {
- doTestJ(sf, threadNum);
- }
- }, NUM_THREADS, false);
- }
-
- public void testK() throws Exception
- {
- runTestMultipleThreads(new RunnableT()
- {
- @Override
- public void run(final ClientSessionFactory sf, final int threadNum) throws
Exception
- {
- doTestK(sf, threadNum);
- }
- }, NUM_THREADS, false);
- }
-
- public void testL() throws Exception
- {
- runTestMultipleThreads(new RunnableT()
- {
- @Override
- public void run(final ClientSessionFactory sf, final int threadNum) throws
Exception
- {
- doTestL(sf);
- }
- }, NUM_THREADS, true, 10);
- }
-
- // public void testM() throws Exception
- // {
- // runTestMultipleThreads(new RunnableT()
- // {
- // public void run(final ClientSessionFactory sf, final int threadNum) throws
Exception
- // {
- // doTestM(sf, threadNum);
- // }
- // }, NUM_THREADS);
- // }
-
- public void testN() throws Exception
- {
- runTestMultipleThreads(new RunnableT()
- {
- @Override
- public void run(final ClientSessionFactory sf, final int threadNum) throws
Exception
- {
- doTestN(sf, threadNum);
- }
- }, NUM_THREADS, false);
- }
-
- // Added do replicate HORNETQ-264
- public void testO() throws Exception
- {
- runTestMultipleThreads(new RunnableT()
- {
- @Override
- public void run(final ClientSessionFactory sf, final int threadNum) throws
Exception
- {
- doTestO(sf, threadNum);
- }
- }, NUM_THREADS, false);
- }
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- @Override
- protected abstract void start() throws Exception;
-
- protected abstract void setBody(ClientMessage message) throws Exception;
-
- protected abstract boolean checkSize(ClientMessage message);
-
- protected ClientSession createAutoCommitSession(final ClientSessionFactory sf) throws
Exception
- {
- return sf.createSession(false, true, true);
- }
-
- protected ClientSession createTransactionalSession(final ClientSessionFactory sf)
throws Exception
- {
- return sf.createSession(false, false, false);
- }
-
- protected void doTestA(final ClientSessionFactory sf, final int threadNum, final
ClientSession session2) throws Exception
- {
- SimpleString subName = new SimpleString("sub" + threadNum);
-
- ClientSession session = sf.createSession(false, true, true);
-
- session.createQueue(MultiThreadRandomReattachTestBase.ADDRESS, subName, null,
false);
-
- ClientProducer producer =
session.createProducer(MultiThreadRandomReattachTestBase.ADDRESS);
-
- ClientConsumer consumer = session.createConsumer(subName);
-
- final int numMessages = 100;
-
- sendMessages(session, producer, numMessages, threadNum);
-
- session.start();
-
- MyHandler handler = new MyHandler(threadNum, numMessages);
-
- consumer.setMessageHandler(handler);
-
- boolean ok = handler.latch.await(LATCH_WAIT, TimeUnit.MILLISECONDS);
-
- if (!ok)
- {
- throw new Exception("Timed out waiting for messages on handler " +
System.identityHashCode(handler) +
- " threadnum " +
- threadNum);
- }
-
- if (handler.failure != null)
- {
- throw new Exception("Handler failed: " + handler.failure);
- }
-
- producer.close();
-
- consumer.close();
-
- session.deleteQueue(subName);
-
- session.close();
- }
-
- protected void doTestA(final ClientSessionFactory sf, final int threadNum) throws
Exception
- {
- long start = System.currentTimeMillis();
-
- ClientSession s = sf.createSession(false, false, false);
-
- final int numMessages = 100;
-
- final int numSessions = 10;
-
- Set<ClientConsumer> consumers = new HashSet<ClientConsumer>();
- Set<ClientSession> sessions = new HashSet<ClientSession>();
-
- for (int i = 0; i < numSessions; i++)
- {
- SimpleString subName = new SimpleString(threadNum + "sub" + i);
-
- ClientSession sessConsume = createAutoCommitSession(sf);
-
- sessConsume.start();
-
- sessConsume.createQueue(MultiThreadRandomReattachTestBase.ADDRESS, subName,
null, false);
-
- ClientConsumer consumer = sessConsume.createConsumer(subName);
-
- consumers.add(consumer);
-
- sessions.add(sessConsume);
-
- }
-
- ClientSession sessSend = sf.createSession(false, true, true);
-
- ClientProducer producer =
sessSend.createProducer(MultiThreadRandomReattachTestBase.ADDRESS);
-
- sendMessages(sessSend, producer, numMessages, threadNum);
-
- Set<MyHandler> handlers = new HashSet<MyHandler>();
-
- for (ClientConsumer consumer : consumers)
- {
- MyHandler handler = new MyHandler(threadNum, numMessages);
-
- consumer.setMessageHandler(handler);
-
- handlers.add(handler);
- }
-
- for (MyHandler handler : handlers)
- {
- boolean ok = handler.latch.await(LATCH_WAIT, TimeUnit.MILLISECONDS);
-
- if (!ok)
- {
- throw new Exception("Timed out waiting for messages on handler " +
System.identityHashCode(handler) +
- " threadnum " +
- threadNum);
- }
-
- if (handler.failure != null)
- {
- throw new Exception("Handler failed: " + handler.failure);
- }
- }
-
- sessSend.close();
-
- for (ClientSession session : sessions)
- {
- session.close();
- }
-
- for (int i = 0; i < numSessions; i++)
- {
- SimpleString subName = new SimpleString(threadNum + "sub" + i);
-
- s.deleteQueue(subName);
- }
-
- s.close();
-
- long end = System.currentTimeMillis();
-
- log.info("duration " + (end - start));
- }
-
- protected void doTestB(final ClientSessionFactory sf, final int threadNum) throws
Exception
- {
- long start = System.currentTimeMillis();
-
- ClientSession s = sf.createSession(false, false, false);
-
- final int numMessages = 100;
-
- final int numSessions = 10;
-
- Set<ClientConsumer> consumers = new HashSet<ClientConsumer>();
- Set<ClientSession> sessions = new HashSet<ClientSession>();
-
- for (int i = 0; i < numSessions; i++)
- {
- SimpleString subName = new SimpleString(threadNum + "sub" + i);
-
- ClientSession sessConsume = createAutoCommitSession(sf);
-
- sessConsume.createQueue(MultiThreadRandomReattachTestBase.ADDRESS, subName,
null, false);
-
- ClientConsumer consumer = sessConsume.createConsumer(subName);
-
- consumers.add(consumer);
-
- sessions.add(sessConsume);
- }
-
- ClientSession sessSend = sf.createSession(false, true, true);
-
- ClientProducer producer =
sessSend.createProducer(MultiThreadRandomReattachTestBase.ADDRESS);
-
- sendMessages(sessSend, producer, numMessages, threadNum);
-
- for (ClientSession session : sessions)
- {
- session.start();
- }
-
- Set<MyHandler> handlers = new HashSet<MyHandler>();
-
- for (ClientConsumer consumer : consumers)
- {
- MyHandler handler = new MyHandler(threadNum, numMessages);
-
- consumer.setMessageHandler(handler);
-
- handlers.add(handler);
- }
-
- for (MyHandler handler : handlers)
- {
- boolean ok = handler.latch.await(LATCH_WAIT, TimeUnit.MILLISECONDS);
-
- if (!ok)
- {
- throw new Exception("Timed out waiting for messages on handler " +
System.identityHashCode(handler) +
- " threadnum " +
- threadNum);
- }
-
- if (handler.failure != null)
- {
- throw new Exception("Handler failed: " + handler.failure);
- }
- }
-
- sessSend.close();
-
- for (ClientSession session : sessions)
- {
- session.close();
- }
-
- for (int i = 0; i < numSessions; i++)
- {
- SimpleString subName = new SimpleString(threadNum + "sub" + i);
-
- s.deleteQueue(subName);
- }
-
- s.close();
-
- long end = System.currentTimeMillis();
-
- log.info("duration " + (end - start));
-
- }
-
- protected void doTestC(final ClientSessionFactory sf, final int threadNum) throws
Exception
- {
- long start = System.currentTimeMillis();
-
- ClientSession s = sf.createSession(false, false, false);
-
- final int numMessages = 100;
-
- final int numSessions = 10;
-
- Set<ClientConsumer> consumers = new HashSet<ClientConsumer>();
- Set<ClientSession> sessions = new HashSet<ClientSession>();
-
- for (int i = 0; i < numSessions; i++)
- {
- SimpleString subName = new SimpleString(threadNum + "sub" + i);
-
- ClientSession sessConsume = createTransactionalSession(sf);
-
- sessConsume.start();
-
- sessConsume.createQueue(MultiThreadRandomReattachTestBase.ADDRESS, subName,
null, false);
-
- ClientConsumer consumer = sessConsume.createConsumer(subName);
-
- consumers.add(consumer);
-
- sessions.add(sessConsume);
- }
-
- ClientSession sessSend = sf.createSession(false, false, false);
-
- ClientProducer producer =
sessSend.createProducer(MultiThreadRandomReattachTestBase.ADDRESS);
-
- sendMessages(sessSend, producer, numMessages, threadNum);
-
- sessSend.rollback();
-
- sendMessages(sessSend, producer, numMessages, threadNum);
-
- sessSend.commit();
-
- Set<MyHandler> handlers = new HashSet<MyHandler>();
-
- for (ClientConsumer consumer : consumers)
- {
- MyHandler handler = new MyHandler(threadNum, numMessages);
-
- consumer.setMessageHandler(handler);
-
- handlers.add(handler);
- }
-
- for (MyHandler handler : handlers)
- {
- boolean ok = handler.latch.await(LATCH_WAIT, TimeUnit.MILLISECONDS);
-
- if (!ok)
- {
- throw new Exception("Timed out waiting for messages on handler " +
System.identityHashCode(handler) +
- " threadnum " +
- threadNum);
- }
-
- if (handler.failure != null)
- {
- throw new Exception("Handler failed: " + handler.failure);
- }
-
- handler.reset();
- }
-
- for (ClientSession session : sessions)
- {
- session.rollback();
- }
-
- for (MyHandler handler : handlers)
- {
- boolean ok = handler.latch.await(LATCH_WAIT, TimeUnit.MILLISECONDS);
-
- Assert.assertTrue(ok);
- }
-
- for (ClientSession session : sessions)
- {
- session.commit();
- }
-
- sessSend.close();
- for (ClientSession session : sessions)
- {
- session.close();
- }
-
- for (int i = 0; i < numSessions; i++)
- {
- SimpleString subName = new SimpleString(threadNum + "sub" + i);
-
- s.deleteQueue(subName);
- }
-
- s.close();
-
- long end = System.currentTimeMillis();
-
- log.info("duration " + (end - start));
- }
-
- protected void doTestD(final ClientSessionFactory sf, final int threadNum) throws
Exception
- {
- long start = System.currentTimeMillis();
-
- ClientSession s = sf.createSession(false, false, false);
-
- final int numMessages = 100;
-
- final int numSessions = 10;
-
- Set<ClientConsumer> consumers = new HashSet<ClientConsumer>();
- Set<ClientSession> sessions = new HashSet<ClientSession>();
-
- for (int i = 0; i < numSessions; i++)
- {
- SimpleString subName = new SimpleString(threadNum + " sub" + i);
-
- ClientSession sessConsume = sf.createSession(false, false, false);
-
- sessConsume.createQueue(MultiThreadRandomReattachTestBase.ADDRESS, subName,
null, false);
-
- ClientConsumer consumer = sessConsume.createConsumer(subName);
-
- consumers.add(consumer);
-
- sessions.add(sessConsume);
- }
-
- ClientSession sessSend = sf.createSession(false, false, false);
-
- ClientProducer producer =
sessSend.createProducer(MultiThreadRandomReattachTestBase.ADDRESS);
-
- sendMessages(sessSend, producer, numMessages, threadNum);
-
- sessSend.rollback();
-
- sendMessages(sessSend, producer, numMessages, threadNum);
-
- sessSend.commit();
-
- for (ClientSession session : sessions)
- {
- session.start();
- }
-
- Set<MyHandler> handlers = new HashSet<MyHandler>();
-
- for (ClientConsumer consumer : consumers)
- {
- MyHandler handler = new MyHandler(threadNum, numMessages);
-
- consumer.setMessageHandler(handler);
-
- handlers.add(handler);
- }
-
- for (MyHandler handler : handlers)
- {
- boolean ok = handler.latch.await(LATCH_WAIT, TimeUnit.MILLISECONDS);
-
- if (!ok)
- {
- throw new Exception("Timed out waiting for messages on handler " +
System.identityHashCode(handler) +
- " threadnum " +
- threadNum);
- }
-
- if (handler.failure != null)
- {
- throw new Exception("Handler failed: " + handler.failure);
- }
- }
-
- handlers.clear();
-
- // Set handlers to null
- for (ClientConsumer consumer : consumers)
- {
- consumer.setMessageHandler(null);
- }
-
- for (ClientSession session : sessions)
- {
- session.rollback();
- }
-
- // New handlers
- for (ClientConsumer consumer : consumers)
- {
- MyHandler handler = new MyHandler(threadNum, numMessages);
-
- consumer.setMessageHandler(handler);
-
- handlers.add(handler);
- }
-
- for (MyHandler handler : handlers)
- {
- boolean ok = handler.latch.await(LATCH_WAIT, TimeUnit.MILLISECONDS);
-
- if (!ok)
- {
- throw new Exception("Timed out waiting for messages on handler " +
System.identityHashCode(handler) +
- " threadnum " +
- threadNum);
- }
-
- if (handler.failure != null)
- {
- throw new Exception("Handler failed on rollback: " +
handler.failure);
- }
- }
-
- for (ClientSession session : sessions)
- {
- session.commit();
- }
-
- sessSend.close();
- for (ClientSession session : sessions)
- {
- session.close();
- }
-
- for (int i = 0; i < numSessions; i++)
- {
- SimpleString subName = new SimpleString(threadNum + " sub" + i);
-
- s.deleteQueue(subName);
- }
-
- s.close();
-
- long end = System.currentTimeMillis();
-
- log.info("duration " + (end - start));
- }
-
- // Now with synchronous receive()
-
- protected void doTestE(final ClientSessionFactory sf, final int threadNum) throws
Exception
- {
- long start = System.currentTimeMillis();
-
- ClientSession s = sf.createSession(false, false, false);
-
- final int numMessages = 100;
-
- final int numSessions = 10;
-
- Set<ClientConsumer> consumers = new HashSet<ClientConsumer>();
- Set<ClientSession> sessions = new HashSet<ClientSession>();
-
- for (int i = 0; i < numSessions; i++)
- {
- SimpleString subName = new SimpleString(threadNum + "sub" + i);
-
- ClientSession sessConsume = sf.createSession(false, true, true);
-
- sessConsume.start();
-
- sessConsume.createQueue(MultiThreadRandomReattachTestBase.ADDRESS, subName,
null, false);
-
- ClientConsumer consumer = sessConsume.createConsumer(subName);
-
- consumers.add(consumer);
-
- sessions.add(sessConsume);
- }
-
- ClientSession sessSend = sf.createSession(false, true, true);
-
- ClientProducer producer =
sessSend.createProducer(MultiThreadRandomReattachTestBase.ADDRESS);
-
- sendMessages(sessSend, producer, numMessages, threadNum);
-
- consumeMessages(consumers, numMessages, threadNum);
-
- sessSend.close();
- for (ClientSession session : sessions)
- {
- session.close();
- }
-
- for (int i = 0; i < numSessions; i++)
- {
- SimpleString subName = new SimpleString(threadNum + "sub" + i);
-
- s.deleteQueue(subName);
- }
-
- s.close();
-
- long end = System.currentTimeMillis();
-
- log.info("duration " + (end - start));
- }
-
- protected void doTestF(final ClientSessionFactory sf, final int threadNum) throws
Exception
- {
- long start = System.currentTimeMillis();
-
- ClientSession s = sf.createSession(false, false, false);
-
- final int numMessages = 100;
-
- final int numSessions = 10;
-
- Set<ClientConsumer> consumers = new HashSet<ClientConsumer>();
- Set<ClientSession> sessions = new HashSet<ClientSession>();
-
- for (int i = 0; i < numSessions; i++)
- {
- SimpleString subName = new SimpleString(threadNum + "sub" + i);
-
- ClientSession sessConsume = sf.createSession(false, true, true);
-
- sessConsume.createQueue(MultiThreadRandomReattachTestBase.ADDRESS, subName,
null, false);
-
- ClientConsumer consumer = sessConsume.createConsumer(subName);
-
- consumers.add(consumer);
-
- sessions.add(sessConsume);
- }
-
- ClientSession sessSend = sf.createSession(false, true, true);
-
- ClientProducer producer =
sessSend.createProducer(MultiThreadRandomReattachTestBase.ADDRESS);
-
- sendMessages(sessSend, producer, numMessages, threadNum);
-
- for (ClientSession session : sessions)
- {
- session.start();
- }
-
- consumeMessages(consumers, numMessages, threadNum);
-
- sessSend.close();
- for (ClientSession session : sessions)
- {
- session.close();
- }
-
- for (int i = 0; i < numSessions; i++)
- {
- SimpleString subName = new SimpleString(threadNum + "sub" + i);
-
- s.deleteQueue(subName);
- }
-
- s.close();
-
- long end = System.currentTimeMillis();
-
- log.info("duration " + (end - start));
- }
-
- protected void doTestG(final ClientSessionFactory sf, final int threadNum) throws
Exception
- {
- long start = System.currentTimeMillis();
-
- ClientSession s = sf.createSession(false, false, false);
-
- final int numMessages = 100;
-
- final int numSessions = 10;
-
- Set<ClientConsumer> consumers = new HashSet<ClientConsumer>();
- Set<ClientSession> sessions = new HashSet<ClientSession>();
-
- for (int i = 0; i < numSessions; i++)
- {
- SimpleString subName = new SimpleString(threadNum + "sub" + i);
-
- ClientSession sessConsume = sf.createSession(false, false, false);
-
- sessConsume.start();
-
- sessConsume.createQueue(MultiThreadRandomReattachTestBase.ADDRESS, subName,
null, false);
-
- ClientConsumer consumer = sessConsume.createConsumer(subName);
-
- consumers.add(consumer);
-
- sessions.add(sessConsume);
- }
-
- ClientSession sessSend = sf.createSession(false, false, false);
-
- ClientProducer producer =
sessSend.createProducer(MultiThreadRandomReattachTestBase.ADDRESS);
-
- sendMessages(sessSend, producer, numMessages, threadNum);
-
- sessSend.rollback();
-
- sendMessages(sessSend, producer, numMessages, threadNum);
-
- sessSend.commit();
-
- consumeMessages(consumers, numMessages, threadNum);
-
- for (ClientSession session : sessions)
- {
- session.rollback();
- }
-
- consumeMessages(consumers, numMessages, threadNum);
-
- for (ClientSession session : sessions)
- {
- session.commit();
- }
-
- sessSend.close();
- for (ClientSession session : sessions)
- {
- session.close();
- }
-
- for (int i = 0; i < numSessions; i++)
- {
- SimpleString subName = new SimpleString(threadNum + "sub" + i);
-
- s.deleteQueue(subName);
- }
-
- s.close();
-
- long end = System.currentTimeMillis();
-
- log.info("duration " + (end - start));
- }
-
- protected void doTestH(final ClientSessionFactory sf, final int threadNum) throws
Exception
- {
- long start = System.currentTimeMillis();
-
- ClientSession s = sf.createSession(false, false, false);
-
- final int numMessages = 100;
-
- final int numSessions = 10;
-
- Set<ClientConsumer> consumers = new HashSet<ClientConsumer>();
- Set<ClientSession> sessions = new HashSet<ClientSession>();
-
- for (int i = 0; i < numSessions; i++)
- {
- SimpleString subName = new SimpleString(threadNum + "sub" + i);
-
- ClientSession sessConsume = sf.createSession(false, false, false);
-
- sessConsume.createQueue(MultiThreadRandomReattachTestBase.ADDRESS, subName,
null, false);
-
- ClientConsumer consumer = sessConsume.createConsumer(subName);
-
- consumers.add(consumer);
-
- sessions.add(sessConsume);
- }
-
- ClientSession sessSend = sf.createSession(false, false, false);
-
- ClientProducer producer =
sessSend.createProducer(MultiThreadRandomReattachTestBase.ADDRESS);
-
- sendMessages(sessSend, producer, numMessages, threadNum);
-
- sessSend.rollback();
-
- sendMessages(sessSend, producer, numMessages, threadNum);
-
- sessSend.commit();
-
- for (ClientSession session : sessions)
- {
- session.start();
- }
-
- consumeMessages(consumers, numMessages, threadNum);
-
- for (ClientSession session : sessions)
- {
- session.rollback();
- }
-
- consumeMessages(consumers, numMessages, threadNum);
-
- for (ClientSession session : sessions)
- {
- session.commit();
- }
-
- sessSend.close();
- for (ClientSession session : sessions)
- {
- session.close();
- }
-
- for (int i = 0; i < numSessions; i++)
- {
- SimpleString subName = new SimpleString(threadNum + "sub" + i);
-
- s.deleteQueue(subName);
- }
-
- s.close();
-
- long end = System.currentTimeMillis();
-
- log.info("duration " + (end - start));
- }
-
- protected void doTestI(final ClientSessionFactory sf, final int threadNum) throws
Exception
- {
- ClientSession sessCreate = sf.createSession(false, true, true);
-
- sessCreate.createQueue(MultiThreadRandomReattachTestBase.ADDRESS,
- new SimpleString(threadNum +
MultiThreadRandomReattachTestBase.ADDRESS.toString()),
- null,
- false);
-
- ClientSession sess = sf.createSession(false, true, true);
-
- sess.start();
-
- ClientConsumer consumer = sess.createConsumer(new SimpleString(threadNum +
MultiThreadRandomReattachTestBase.ADDRESS.toString()));
-
- ClientProducer producer =
sess.createProducer(MultiThreadRandomReattachTestBase.ADDRESS);
-
- ClientMessage message = sess.createMessage(HornetQTextMessage.TYPE, false, 0,
System.currentTimeMillis(), (byte)1);
- producer.send(message);
-
- ClientMessage message2 =
consumer.receive(MultiThreadRandomReattachTestBase.RECEIVE_TIMEOUT);
-
- Assert.assertNotNull(message2);
-
- message2.acknowledge();
-
- sess.close();
-
- sessCreate.deleteQueue(new SimpleString(threadNum +
MultiThreadRandomReattachTestBase.ADDRESS.toString()));
-
- sessCreate.close();
- }
-
- protected void doTestJ(final ClientSessionFactory sf, final int threadNum) throws
Exception
- {
- ClientSession sessCreate = sf.createSession(false, true, true);
-
- sessCreate.createQueue(MultiThreadRandomReattachTestBase.ADDRESS,
- new SimpleString(threadNum +
MultiThreadRandomReattachTestBase.ADDRESS.toString()),
- null,
- false);
-
- ClientSession sess = sf.createSession(false, true, true);
-
- sess.start();
-
- ClientConsumer consumer = sess.createConsumer(new SimpleString(threadNum +
MultiThreadRandomReattachTestBase.ADDRESS.toString()));
-
- ClientProducer producer =
sess.createProducer(MultiThreadRandomReattachTestBase.ADDRESS);
-
- ClientMessage message = sess.createMessage(HornetQTextMessage.TYPE, false, 0,
System.currentTimeMillis(), (byte)1);
- producer.send(message);
-
- ClientMessage message2 =
consumer.receive(MultiThreadRandomReattachTestBase.RECEIVE_TIMEOUT);
-
- Assert.assertNotNull(message2);
-
- message2.acknowledge();
-
- sess.close();
-
- sessCreate.deleteQueue(new SimpleString(threadNum +
MultiThreadRandomReattachTestBase.ADDRESS.toString()));
-
- sessCreate.close();
- }
-
- protected void doTestK(final ClientSessionFactory sf, final int threadNum) throws
Exception
- {
- ClientSession s = sf.createSession(false, false, false);
-
- s.createQueue(MultiThreadRandomReattachTestBase.ADDRESS,
- new SimpleString(threadNum +
MultiThreadRandomReattachTestBase.ADDRESS.toString()),
- null,
- false);
-
- final int numConsumers = 100;
-
- for (int i = 0; i < numConsumers; i++)
- {
- ClientConsumer consumer = s.createConsumer(new SimpleString(threadNum +
MultiThreadRandomReattachTestBase.ADDRESS.toString()));
-
- consumer.close();
- }
-
- s.deleteQueue(new SimpleString(threadNum +
MultiThreadRandomReattachTestBase.ADDRESS.toString()));
-
- s.close();
- }
-
- /*
- * This test tests failure during create connection
- */
- protected void doTestL(final ClientSessionFactory sf) throws Exception
- {
- final int numSessions = 100;
-
- for (int i = 0; i < numSessions; i++)
- {
- ClientSession session = sf.createSession(false, false, false);
-
- session.close();
- }
- }
-
- protected void doTestN(final ClientSessionFactory sf, final int threadNum) throws
Exception
- {
- ClientSession sessCreate = sf.createSession(false, true, true);
-
- sessCreate.createQueue(MultiThreadRandomReattachTestBase.ADDRESS,
- new SimpleString(threadNum +
MultiThreadRandomReattachTestBase.ADDRESS.toString()),
- null,
- false);
-
- ClientSession sess = sf.createSession(false, true, true);
-
- sess.stop();
-
- sess.start();
-
- sess.stop();
-
- ClientConsumer consumer = sess.createConsumer(new SimpleString(threadNum +
MultiThreadRandomReattachTestBase.ADDRESS.toString()));
-
- ClientProducer producer =
sess.createProducer(MultiThreadRandomReattachTestBase.ADDRESS);
-
- ClientMessage message = sess.createMessage(HornetQTextMessage.TYPE, false, 0,
System.currentTimeMillis(), (byte)1);
- producer.send(message);
-
- sess.start();
-
- ClientMessage message2 =
consumer.receive(MultiThreadRandomReattachTestBase.RECEIVE_TIMEOUT);
-
- Assert.assertNotNull(message2);
-
- message2.acknowledge();
-
- sess.stop();
-
- sess.start();
-
- sess.close();
-
- sessCreate.deleteQueue(new SimpleString(threadNum +
MultiThreadRandomReattachTestBase.ADDRESS.toString()));
-
- sessCreate.close();
- }
-
- protected void doTestO(final ClientSessionFactory sf, final int threadNum) throws
Exception
- {
- ClientSession sessCreate = sf.createSession(false, true, true);
-
- sessCreate.createQueue(MultiThreadRandomReattachTestBase.ADDRESS,
- new SimpleString(threadNum +
MultiThreadRandomReattachTestBase.ADDRESS.toString()),
- null,
- false);
-
- ClientSession sess = sf.createSession(false, true, true);
-
- sess.start();
-
- ClientConsumer consumer = sess.createConsumer(new SimpleString(threadNum +
MultiThreadRandomReattachTestBase.ADDRESS.toString()));
-
- for (int i = 0; i < 100; i++)
- {
- Assert.assertNull(consumer.receiveImmediate());
- }
-
- sess.close();
-
- sessCreate.deleteQueue(new SimpleString(threadNum +
MultiThreadRandomReattachTestBase.ADDRESS.toString()));
-
- sessCreate.close();
- }
-
- protected int getLatchWait()
- {
- return 60000;
- }
-
- protected int getNumIterations()
- {
- return 2;
- }
-
- protected int getNumThreads()
- {
- return 10;
- }
-
- @Override
- protected void setUp() throws Exception
- {
- super.setUp();
-
- log.info("************ Starting test " + getName());
- }
-
- @Override
- protected void tearDown() throws Exception
- {
- if (liveServer != null && liveServer.isStarted())
- {
- liveServer.stop();
- }
-
- liveServer = null;
-
- super.tearDown();
- }
-
- // Private -------------------------------------------------------
-
- private void runTestMultipleThreads(final RunnableT runnable,
- final int numThreads,
- final boolean failOnCreateConnection) throws
Exception
- {
- runTestMultipleThreads(runnable, numThreads, failOnCreateConnection, 1000);
- }
-
- private void runTestMultipleThreads(final RunnableT runnable,
- final int numThreads,
- final boolean failOnCreateConnection,
- final long failDelay) throws Exception
- {
-
- runMultipleThreadsFailoverTest(runnable, numThreads, getNumIterations(),
failOnCreateConnection, failDelay);
- }
-
- /**
- * @return
- */
- @Override
- protected ServerLocator createLocator() throws Exception
- {
- ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(new
TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory"));
- locator.setReconnectAttempts(-1);
- locator.setConfirmationWindowSize(1024 * 1024);
- return locator;
- }
-
- @Override
- protected void stop() throws Exception
- {
- liveServer.stop();
-
- System.gc();
-
- Assert.assertEquals(0, InVMRegistry.instance.size());
- }
-
- private void sendMessages(final ClientSession sessSend,
- final ClientProducer producer,
- final int numMessages,
- final int threadNum) throws Exception
- {
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = sessSend.createMessage(HornetQBytesMessage.TYPE,
- false,
- 0,
- System.currentTimeMillis(),
- (byte)1);
- message.putIntProperty(new SimpleString("threadnum"), threadNum);
- message.putIntProperty(new SimpleString("count"), i);
- setBody(message);
- producer.send(message);
- }
- }
-
- private void consumeMessages(final Set<ClientConsumer> consumers, final int
numMessages, final int threadNum) throws Exception
- {
- // We make sure the messages arrive in the order they were sent from a particular
producer
- Map<ClientConsumer, Map<Integer, Integer>> counts = new
HashMap<ClientConsumer, Map<Integer, Integer>>();
-
- for (int i = 0; i < numMessages; i++)
- {
- for (ClientConsumer consumer : consumers)
- {
- Map<Integer, Integer> consumerCounts = counts.get(consumer);
-
- if (consumerCounts == null)
- {
- consumerCounts = new HashMap<Integer, Integer>();
- counts.put(consumer, consumerCounts);
- }
-
- ClientMessage msg =
consumer.receive(MultiThreadRandomReattachTestBase.RECEIVE_TIMEOUT);
-
- Assert.assertNotNull(msg);
-
- int tn = (Integer)msg.getObjectProperty(new
SimpleString("threadnum"));
- int cnt = (Integer)msg.getObjectProperty(new
SimpleString("count"));
-
- Integer c = consumerCounts.get(tn);
- if (c == null)
- {
- c = new Integer(cnt);
- }
-
- if (tn == threadNum && cnt != c.intValue())
- {
- throw new Exception("Invalid count, expected " + tn + ":
" + c + " got " + cnt);
- }
-
- c++;
-
- // Wrap
- if (c == numMessages)
- {
- c = 0;
- }
-
- consumerCounts.put(tn, c);
-
- msg.acknowledge();
- }
- }
- }
-
- // Inner classes -------------------------------------------------
-
- private class MyHandler implements MessageHandler
- {
- CountDownLatch latch = new CountDownLatch(1);
-
- private final Map<Integer, Integer> counts = new HashMap<Integer,
Integer>();
-
- volatile String failure;
-
- final int tn;
-
- final int numMessages;
-
- volatile boolean done;
-
- synchronized void reset()
- {
- counts.clear();
-
- done = false;
-
- failure = null;
-
- latch = new CountDownLatch(1);
- }
-
- MyHandler(final int threadNum, final int numMessages)
- {
- tn = threadNum;
-
- this.numMessages = numMessages;
- }
-
- public synchronized void onMessage(final ClientMessage message)
- {
- try
- {
- message.acknowledge();
- }
- catch (HornetQException me)
- {
- log.error("Failed to process", me);
- }
-
- if (done)
- {
- return;
- }
-
- int threadNum = (Integer)message.getObjectProperty(new
SimpleString("threadnum"));
- int cnt = (Integer)message.getObjectProperty(new
SimpleString("count"));
-
- Integer c = counts.get(threadNum);
- if (c == null)
- {
- c = new Integer(cnt);
- }
-
- if (tn == threadNum && cnt != c.intValue())
- {
- failure = "Invalid count, expected " + threadNum + ":" +
c + " got " + cnt;
- log.error(failure);
-
- latch.countDown();
- }
-
- if (!checkSize(message))
- {
- failure = "Invalid size on message";
- log.error(failure);
- latch.countDown();
- }
-
- if (tn == threadNum && c == numMessages - 1)
- {
- done = true;
- latch.countDown();
- }
-
- c++;
- // Wrap around at numMessages
- if (c == numMessages)
- {
- c = 0;
- }
-
- counts.put(threadNum, c);
-
- }
- }
-}
\ No newline at end of file
Deleted:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/reattach/MultiThreadReattachSupport.java
===================================================================
---
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/reattach/MultiThreadReattachSupport.java 2011-04-12
09:47:15 UTC (rev 10479)
+++
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/reattach/MultiThreadReattachSupport.java 2011-04-12
10:12:15 UTC (rev 10480)
@@ -1,281 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.tests.integration.cluster.reattach;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Timer;
-import java.util.TimerTask;
-
-import junit.framework.Assert;
-
-import org.hornetq.api.core.HornetQException;
-import org.hornetq.api.core.client.ClientSession;
-import org.hornetq.api.core.client.ClientSessionFactory;
-import org.hornetq.api.core.client.ServerLocator;
-import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
-import org.hornetq.core.client.impl.ClientSessionInternal;
-import org.hornetq.core.logging.Logger;
-import org.hornetq.core.protocol.core.impl.RemotingConnectionImpl;
-import org.hornetq.core.remoting.impl.invm.InVMConnector;
-import org.hornetq.tests.util.ServiceTestBase;
-import org.hornetq.tests.util.UnitTestCase;
-
-/**
- * A MultiThreadFailoverSupport
- *
- * @author <a href="mailto:time.fox@jboss.org">Tim Fox</a>
- * @author <a href="mailto:clebert.suconic@jboss.org">Clebert
Suconic</a>
- *
- * Created Mar 17, 2009 11:15:02 AM
- *
- *
- */
-public abstract class MultiThreadReattachSupport extends ServiceTestBase
-{
-
- // Constants -----------------------------------------------------
-
- private final Logger log = Logger.getLogger(this.getClass());
-
- // Attributes ----------------------------------------------------
-
- private Timer timer;
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- // Public --------------------------------------------------------
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- protected abstract void start() throws Exception;
-
- protected abstract void stop() throws Exception;
-
- protected abstract ServerLocator createLocator() throws Exception;
-
- @Override
- protected void setUp() throws Exception
- {
- super.setUp();
- timer = new Timer();
- }
-
- @Override
- protected void tearDown() throws Exception
- {
- timer.cancel();
- timer = null;
- super.tearDown();
- }
-
- protected boolean shouldFail()
- {
- return true;
- }
-
- protected void runMultipleThreadsFailoverTest(final RunnableT runnable,
- final int numThreads,
- final int numIts,
- final boolean failOnCreateConnection,
- final long failDelay) throws Exception
- {
- for (int its = 0; its < numIts; its++)
- {
- log.info("Beginning iteration " + its);
-
- start();
-
- final ServerLocator locator = createLocator();
-
- final ClientSessionFactoryInternal sf = (ClientSessionFactoryInternal)
locator.createSessionFactory();
-
- final ClientSession session = sf.createSession(false, true, true);
-
- Failer failer = startFailer(failDelay, session, failOnCreateConnection);
-
- class Runner extends Thread
- {
- private volatile Throwable throwable;
-
- private final RunnableT test;
-
- private final int threadNum;
-
- Runner(final RunnableT test, final int threadNum)
- {
- this.test = test;
-
- this.threadNum = threadNum;
- }
-
- @Override
- public void run()
- {
- try
- {
- test.run(sf, threadNum);
- }
- catch (Throwable t)
- {
- throwable = t;
-
- log.error("Failed to run test", t);
-
- // Case a failure happened here, it should print the Thread dump
- // Sending it to System.out, as it would show on the Tests report
- System.out.println(UnitTestCase.threadDump(" - fired by
MultiThreadRandomReattachTestBase::runTestMultipleThreads (" +
t.getLocalizedMessage() +
- ")"));
- }
- }
- }
-
- do
- {
- List<Runner> threads = new ArrayList<Runner>();
-
- for (int i = 0; i < numThreads; i++)
- {
- Runner runner = new Runner(runnable, i);
-
- threads.add(runner);
-
- runner.start();
- }
-
- for (Runner thread : threads)
- {
- thread.join();
-
- if (thread.throwable != null)
- {
- throw new Exception("Exception on thread " + thread,
thread.throwable);
- }
- }
-
- runnable.checkFail();
-
- }
- while (!failer.isExecuted());
-
- InVMConnector.resetFailures();
-
- session.close();
-
- locator.close();
-
- Assert.assertEquals(0, sf.numSessions());
-
- Assert.assertEquals(0, sf.numConnections());
-
- sf.close();
-
- stop();
- }
- }
-
- // Private -------------------------------------------------------
-
- private Failer startFailer(final long time, final ClientSession session, final boolean
failOnCreateConnection)
- {
- Failer failer = new Failer(session, failOnCreateConnection);
-
- // This is useful for debugging.. just change shouldFail to return false, and
Failer will not be executed
- if (shouldFail())
- {
- timer.schedule(failer, (long)(time * Math.random()), 100);
- }
-
- return failer;
- }
-
- // Inner classes -------------------------------------------------
-
- protected abstract class RunnableT extends Thread
- {
- private volatile String failReason;
-
- private volatile Throwable throwable;
-
- public void setFailed(final String reason, final Throwable throwable)
- {
- failReason = reason;
- this.throwable = throwable;
- }
-
- public void checkFail()
- {
- if (throwable != null)
- {
- log.error("Test failed: " + failReason, throwable);
- }
- if (failReason != null)
- {
- Assert.fail(failReason);
- }
- }
-
- public abstract void run(final ClientSessionFactory sf, final int threadNum) throws
Exception;
- }
-
- private class Failer extends TimerTask
- {
- private final ClientSession session;
-
- private boolean executed;
-
- private final boolean failOnCreateConnection;
-
- public Failer(final ClientSession session, final boolean failOnCreateConnection)
- {
- this.session = session;
-
- this.failOnCreateConnection = failOnCreateConnection;
- }
-
- @Override
- public synchronized void run()
- {
- log.info("** Failing connection");
-
- RemotingConnectionImpl conn =
(RemotingConnectionImpl)((ClientSessionInternal)session).getConnection();
-
- if (failOnCreateConnection)
- {
- InVMConnector.numberOfFailures = 1;
- InVMConnector.failOnCreateConnection = true;
- }
- else
- {
- conn.fail(new HornetQException(HornetQException.NOT_CONNECTED,
"blah"));
- }
-
- log.info("** Fail complete");
-
- cancel();
-
- executed = true;
- }
-
- public synchronized boolean isExecuted()
- {
- return executed;
- }
- }
-
-}
Deleted:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/largemessage/LargeMessageTestBase.java
===================================================================
---
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/largemessage/LargeMessageTestBase.java 2011-04-12
09:47:15 UTC (rev 10479)
+++
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/largemessage/LargeMessageTestBase.java 2011-04-12
10:12:15 UTC (rev 10480)
@@ -1,702 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.tests.integration.largemessage;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
-import javax.transaction.xa.XAResource;
-import javax.transaction.xa.Xid;
-
-import junit.framework.Assert;
-
-import org.hornetq.api.core.HornetQBuffer;
-import org.hornetq.api.core.HornetQBuffers;
-import org.hornetq.api.core.HornetQException;
-import org.hornetq.api.core.Message;
-import org.hornetq.api.core.SimpleString;
-import org.hornetq.api.core.client.*;
-import org.hornetq.core.logging.Logger;
-import org.hornetq.core.server.HornetQServer;
-import org.hornetq.core.server.Queue;
-import org.hornetq.tests.util.ServiceTestBase;
-import org.hornetq.tests.util.UnitTestCase;
-import org.hornetq.utils.DataConstants;
-
-/**
- * A LargeMessageTestBase
- *
- * @author <a href="mailto:clebert.suconic@jboss.org">Clebert
Suconic</a>
- *
- * Created Oct 29, 2008 11:43:52 AM
- *
- *
- */
-public abstract class LargeMessageTestBase extends ServiceTestBase
-{
-
- // Constants -----------------------------------------------------
- private static final Logger log = Logger.getLogger(LargeMessageTestBase.class);
-
- protected final SimpleString ADDRESS = new SimpleString("SimpleAddress");
-
- protected HornetQServer server;
-
- // Attributes ----------------------------------------------------
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- // Public --------------------------------------------------------
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- @Override
- protected void tearDown() throws Exception
- {
- if (server != null && server.isStarted())
- {
- try
- {
- server.stop();
- }
- catch (Exception e)
- {
- LargeMessageTestBase.log.warn(e.getMessage(), e);
- }
- }
-
- server = null;
-
- super.tearDown();
- }
-
- protected void testChunks(final boolean isXA,
- final boolean restartOnXA,
- final boolean rollbackFirstSend,
- final boolean useStreamOnConsume,
- final boolean realFiles,
- final boolean preAck,
- final boolean sendingBlocking,
- final boolean testBrowser,
- final boolean useMessageConsumer,
- final int numberOfMessages,
- final long numberOfBytes,
- final int waitOnConsumer,
- final long delayDelivery) throws Exception
- {
- testChunks(isXA,
- restartOnXA,
- rollbackFirstSend,
- useStreamOnConsume,
- realFiles,
- preAck,
- sendingBlocking,
- testBrowser,
- useMessageConsumer,
- numberOfMessages,
- numberOfBytes,
- waitOnConsumer,
- delayDelivery,
- -1,
- 10 * 1024);
- }
-
- protected void testChunks(final boolean isXA,
- final boolean restartOnXA,
- final boolean rollbackFirstSend,
- final boolean useStreamOnConsume,
- final boolean realFiles,
- final boolean preAck,
- final boolean sendingBlocking,
- final boolean testBrowser,
- final boolean useMessageConsumer,
- final int numberOfMessages,
- final long numberOfBytes,
- final int waitOnConsumer,
- final long delayDelivery,
- final int producerWindow,
- final int minSize) throws Exception
- {
- clearData();
-
- server = createServer(realFiles);
- server.start();
-
- ServerLocator locator = createInVMNonHALocator();
- try
- {
-
- if (sendingBlocking)
- {
- locator.setBlockOnNonDurableSend(true);
- locator.setBlockOnDurableSend(true);
- locator.setBlockOnAcknowledge(true);
- }
-
- if (producerWindow > 0)
- {
- locator.setConfirmationWindowSize(producerWindow);
- }
-
- locator.setMinLargeMessageSize(minSize);
-
- ClientSessionFactory sf = locator.createSessionFactory();
-
- ClientSession session;
-
- Xid xid = null;
- session = sf.createSession(null, null, isXA, false, false, preAck, 0);
-
- if (isXA)
- {
- xid = newXID();
- session.start(xid, XAResource.TMNOFLAGS);
- }
-
- session.createQueue(ADDRESS, ADDRESS, null, true);
-
- ClientProducer producer = session.createProducer(ADDRESS);
-
- if (rollbackFirstSend)
- {
- sendMessages(numberOfMessages, numberOfBytes, delayDelivery, session,
producer);
-
- if (isXA)
- {
- session.end(xid, XAResource.TMSUCCESS);
- session.prepare(xid);
-
- session.close();
-
- if (realFiles && restartOnXA)
- {
- server.stop();
- server.start();
- sf = locator.createSessionFactory();
- }
-
- session = sf.createSession(null, null, isXA, false, false, preAck, 0);
-
- Xid[] xids = session.recover(XAResource.TMSTARTRSCAN);
- Assert.assertEquals(1, xids.length);
- Assert.assertEquals(xid, xids[0]);
-
- session.rollback(xid);
- producer = session.createProducer(ADDRESS);
- xid = newXID();
- session.start(xid, XAResource.TMNOFLAGS);
- }
- else
- {
- session.rollback();
- }
-
- validateNoFilesOnLargeDir();
- }
-
- sendMessages(numberOfMessages, numberOfBytes, delayDelivery, session,
producer);
-
- if (isXA)
- {
- session.end(xid, XAResource.TMSUCCESS);
- session.prepare(xid);
-
- session.close();
-
- if (realFiles && restartOnXA)
- {
- server.stop();
- server.start();
- //we need to recreate sf's
- sf = locator.createSessionFactory();
- }
-
- session = sf.createSession(null, null, isXA, false, false, preAck, 0);
-
- Xid[] xids = session.recover(XAResource.TMSTARTRSCAN);
- Assert.assertEquals(1, xids.length);
- Assert.assertEquals(xid, xids[0]);
-
- producer = session.createProducer(ADDRESS);
-
- session.commit(xid, false);
- xid = newXID();
- session.start(xid, XAResource.TMNOFLAGS);
- }
- else
- {
- session.commit();
- }
-
- session.close();
-
- if (realFiles)
- {
- server.stop();
-
- server = createServer(realFiles);
- server.start();
-
- sf = locator.createSessionFactory();
- }
-
- session = sf.createSession(null, null, isXA, false, false, preAck, 0);
-
- if (isXA)
- {
- xid = newXID();
- session.start(xid, XAResource.TMNOFLAGS);
- }
-
- ClientConsumer consumer = null;
-
- for (int iteration = testBrowser ? 0 : 1; iteration < 2; iteration++)
- {
- session.stop();
-
- // first time with a browser
- consumer = session.createConsumer(ADDRESS, null, iteration == 0);
-
- if (useMessageConsumer)
- {
- final CountDownLatch latchDone = new CountDownLatch(numberOfMessages);
- final AtomicInteger errors = new AtomicInteger(0);
-
- MessageHandler handler = new MessageHandler()
- {
- int msgCounter;
-
- public void onMessage(final ClientMessage message)
- {
- try
- {
- if (delayDelivery > 0)
- {
- long originalTime = (Long)message.getObjectProperty(new
SimpleString("original-time"));
- Assert.assertTrue(System.currentTimeMillis() - originalTime +
"<" + delayDelivery,
- System.currentTimeMillis() - originalTime
>= delayDelivery);
- }
-
- if (!preAck)
- {
- message.acknowledge();
- }
-
- Assert.assertNotNull(message);
-
- if (delayDelivery <= 0)
- {
- // right now there is no guarantee of ordered delivered on
multiple scheduledMessages with
- // the same
- // scheduled delivery time
- Assert.assertEquals(msgCounter,
- ((Integer)message.getObjectProperty(new
SimpleString("counter-message"))).intValue());
- }
-
- if (useStreamOnConsume)
- {
- final AtomicLong bytesRead = new AtomicLong(0);
- message.saveToOutputStream(new OutputStream()
- {
-
- @Override
- public void write(final byte b[]) throws IOException
- {
- if (b[0] ==
UnitTestCase.getSamplebyte(bytesRead.get()))
- {
- bytesRead.addAndGet(b.length);
- LargeMessageTestBase.log.debug("Read position
" + bytesRead.get() + " on consumer");
- }
- else
- {
- LargeMessageTestBase.log.warn("Received invalid
packet at position " + bytesRead.get());
- }
- }
-
- @Override
- public void write(final int b) throws IOException
- {
- if (b == UnitTestCase.getSamplebyte(bytesRead.get()))
- {
- bytesRead.incrementAndGet();
- }
- else
- {
- LargeMessageTestBase.log.warn("byte not as
expected!");
- }
- }
- });
-
- Assert.assertEquals(numberOfBytes, bytesRead.get());
- }
- else
- {
-
- HornetQBuffer buffer = message.getBodyBuffer();
- buffer.resetReaderIndex();
- for (long b = 0; b < numberOfBytes; b++)
- {
- if (b % (1024l * 1024l) == 0)
- {
- LargeMessageTestBase.log.debug("Read " + b +
" bytes");
- }
-
- Assert.assertEquals(UnitTestCase.getSamplebyte(b),
buffer.readByte());
- }
-
- try
- {
- buffer.readByte();
- Assert.fail("Supposed to throw an exception");
- }
- catch (Exception e)
- {
- }
- }
- }
- catch (Throwable e)
- {
- e.printStackTrace();
- LargeMessageTestBase.log.warn("Got an error", e);
- errors.incrementAndGet();
- }
- finally
- {
- latchDone.countDown();
- msgCounter++;
- }
- }
- };
-
- session.start();
-
- consumer.setMessageHandler(handler);
-
- Assert.assertTrue(latchDone.await(waitOnConsumer, TimeUnit.SECONDS));
- Assert.assertEquals(0, errors.get());
- }
- else
- {
-
- session.start();
-
- for (int i = 0; i < numberOfMessages; i++)
- {
- System.currentTimeMillis();
-
- ClientMessage message = consumer.receive(waitOnConsumer +
delayDelivery);
-
- Assert.assertNotNull(message);
-
- System.currentTimeMillis();
-
- if (delayDelivery > 0)
- {
- long originalTime = (Long)message.getObjectProperty(new
SimpleString("original-time"));
- Assert.assertTrue(System.currentTimeMillis() - originalTime +
"<" + delayDelivery,
- System.currentTimeMillis() - originalTime >=
delayDelivery);
- }
-
- if (!preAck)
- {
- message.acknowledge();
- }
-
- Assert.assertNotNull(message);
-
- if (delayDelivery <= 0)
- {
- // right now there is no guarantee of ordered delivered on multiple
scheduledMessages with the same
- // scheduled delivery time
- Assert.assertEquals(i,
- ((Integer)message.getObjectProperty(new
SimpleString("counter-message"))).intValue());
- }
-
- if (useStreamOnConsume)
- {
- final AtomicLong bytesRead = new AtomicLong(0);
- message.saveToOutputStream(new OutputStream()
- {
-
- @Override
- public void write(final byte b[]) throws IOException
- {
- if (b[0] == UnitTestCase.getSamplebyte(bytesRead.get()))
- {
- bytesRead.addAndGet(b.length);
- }
- else
- {
- LargeMessageTestBase.log.warn("Received invalid packet
at position " + bytesRead.get());
- }
-
- }
-
- @Override
- public void write(final int b) throws IOException
- {
- if (bytesRead.get() % (1024l * 1024l) == 0)
- {
- LargeMessageTestBase.log.debug("Read " +
bytesRead.get() + " bytes");
- }
- if (b == (byte)'a')
- {
- bytesRead.incrementAndGet();
- }
- else
- {
- LargeMessageTestBase.log.warn("byte not as
expected!");
- }
- }
- });
-
- Assert.assertEquals(numberOfBytes, bytesRead.get());
- }
- else
- {
- HornetQBuffer buffer = message.getBodyBuffer();
- buffer.resetReaderIndex();
-
- for (long b = 0; b < numberOfBytes; b++)
- {
- if (b % (1024l * 1024l) == 0l)
- {
- LargeMessageTestBase.log.debug("Read " + b + "
bytes");
- }
- Assert.assertEquals(UnitTestCase.getSamplebyte(b),
buffer.readByte());
- }
- }
-
- }
-
- }
- consumer.close();
-
- if (iteration == 0)
- {
- if (isXA)
- {
- session.end(xid, XAResource.TMSUCCESS);
- session.rollback(xid);
- xid = newXID();
- session.start(xid, XAResource.TMNOFLAGS);
- }
- else
- {
- session.rollback();
- }
- }
- else
- {
- if (isXA)
- {
- session.end(xid, XAResource.TMSUCCESS);
- session.commit(xid, true);
- xid = newXID();
- session.start(xid, XAResource.TMNOFLAGS);
- }
- else
- {
- session.commit();
- }
- }
- }
-
- session.close();
-
- Assert.assertEquals(0,
((Queue)server.getPostOffice().getBinding(ADDRESS).getBindable()).getDeliveringCount());
- Assert.assertEquals(0,
((Queue)server.getPostOffice().getBinding(ADDRESS).getBindable()).getMessageCount());
-
- validateNoFilesOnLargeDir();
-
- }
- finally
- {
- locator.close();
- try
- {
- server.stop();
- }
- catch (Throwable ignored)
- {
- }
- }
- }
-
- /**
- * @param useFile
- * @param numberOfMessages
- * @param numberOfIntegers
- * @param delayDelivery
- * @param testTime
- * @param session
- * @param producer
- * @throws FileNotFoundException
- * @throws IOException
- * @throws HornetQException
- */
- private void sendMessages(final int numberOfMessages,
- final long numberOfBytes,
- final long delayDelivery,
- final ClientSession session,
- final ClientProducer producer) throws Exception
- {
- LargeMessageTestBase.log.debug("NumberOfBytes = " + numberOfBytes);
- for (int i = 0; i < numberOfMessages; i++)
- {
- ClientMessage message = session.createMessage(true);
-
- // If the test is using more than 1M, we will only use the Streaming, as it
require too much memory from the
- // test
- if (numberOfBytes > 1024 * 1024 || i % 2 == 0)
- {
- LargeMessageTestBase.log.debug("Sending message (stream)" + i);
-
message.setBodyInputStream(UnitTestCase.createFakeLargeStream(numberOfBytes));
- }
- else
- {
- LargeMessageTestBase.log.debug("Sending message (array)" + i);
- byte[] bytes = new byte[(int)numberOfBytes];
- for (int j = 0; j < bytes.length; j++)
- {
- bytes[j] = UnitTestCase.getSamplebyte(j);
- }
- message.getBodyBuffer().writeBytes(bytes);
- }
- message.putIntProperty(new SimpleString("counter-message"), i);
- if (delayDelivery > 0)
- {
- long time = System.currentTimeMillis();
- message.putLongProperty(new SimpleString("original-time"), time);
- message.putLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME, time +
delayDelivery);
-
- producer.send(message);
- }
- else
- {
- producer.send(message);
- }
- }
- }
-
- protected HornetQBuffer createLargeBuffer(final int numberOfIntegers)
- {
- HornetQBuffer body = HornetQBuffers.fixedBuffer(DataConstants.SIZE_INT *
numberOfIntegers);
-
- for (int i = 0; i < numberOfIntegers; i++)
- {
- body.writeInt(i);
- }
-
- return body;
-
- }
-
- protected ClientMessage createLargeClientMessage(final ClientSession session, final
int numberOfBytes) throws Exception
- {
- return createLargeClientMessage(session, numberOfBytes, true);
- }
-
- protected ClientMessage createLargeClientMessage (final ClientSession session, final
byte[] buffer, final boolean durable) throws Exception
- {
- ClientMessage msgs = session.createMessage(durable);
- msgs.getBodyBuffer().writeBytes(buffer);
- return msgs;
- }
-
- protected ClientMessage createLargeClientMessage(final ClientSession session,
- final long numberOfBytes,
- final boolean persistent) throws
Exception
- {
-
- ClientMessage clientMessage = session.createMessage(persistent);
-
-
clientMessage.setBodyInputStream(UnitTestCase.createFakeLargeStream(numberOfBytes));
-
- return clientMessage;
- }
-
- /**
- * @param session
- * @param queueToRead
- * @param numberOfIntegers
- * @throws HornetQException
- * @throws FileNotFoundException
- * @throws IOException
- */
- protected void readMessage(final ClientSession session, final SimpleString
queueToRead, final int numberOfBytes) throws HornetQException,
-
FileNotFoundException,
-
IOException
- {
- session.start();
-
- ClientConsumer consumer = session.createConsumer(queueToRead);
-
- ClientMessage clientMessage = consumer.receive(5000);
-
- Assert.assertNotNull(clientMessage);
-
- clientMessage.acknowledge();
-
- session.commit();
-
- consumer.close();
- }
-
- protected OutputStream createFakeOutputStream() throws Exception
- {
-
- return new OutputStream()
- {
- private boolean closed = false;
-
- private int count;
-
- @Override
- public void close() throws IOException
- {
- super.close();
- closed = true;
- }
-
- @Override
- public void write(final int b) throws IOException
- {
- if (count++ % 1024 * 1024 == 0)
- {
- LargeMessageTestBase.log.debug("OutputStream received " + count
+ " bytes");
- }
- if (closed)
- {
- throw new IOException("Stream was closed");
- }
- }
-
- };
-
- }
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-}
Modified: trunk/tests/pom.xml
===================================================================
--- trunk/tests/pom.xml 2011-04-12 09:47:15 UTC (rev 10479)
+++ trunk/tests/pom.xml 2011-04-12 10:12:15 UTC (rev 10480)
@@ -29,5 +29,6 @@
<module>jms-tests</module>
<module>joram-tests</module>
<module>soak-tests</module>
+ <module>stress-tests</module>
</modules>
</project>
Copied: trunk/tests/stress-tests/pom.xml (from rev 10479, trunk/tests/soak-tests/pom.xml)
===================================================================
--- trunk/tests/stress-tests/pom.xml (rev 0)
+++ trunk/tests/stress-tests/pom.xml 2011-04-12 10:12:15 UTC (rev 10480)
@@ -0,0 +1,140 @@
+<!--
+ ~ Copyright 2009 Red Hat, Inc.
+ ~ Red Hat licenses this file to you under the Apache License, version
+ ~ 2.0 (the "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
http://www.apache.org/licenses/LICENSE-2.0
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ ~ implied. See the License for the specific language governing
+ ~ permissions and limitations under the License.
+ -->
+
+<project
xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.hornetq.tests</groupId>
+ <artifactId>hornetq-tests-pom</artifactId>
+ <version>2.2.3-SNAPSHOT</version>
+ </parent>
+
+ <groupId>org.hornetq.tests</groupId>
+ <artifactId>stress-tests</artifactId>
+ <packaging>jar</packaging>
+ <name>HornetQ stress Tests</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.hornetq.tests</groupId>
+ <artifactId>unit-tests</artifactId>
+ <version>${project.version}</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.hornetq.tests</groupId>
+ <artifactId>integration-tests</artifactId>
+ <version>${project.version}</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.hornetq.tests</groupId>
+ <artifactId>jms-tests</artifactId>
+ <version>${project.version}</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.hornetq</groupId>
+ <artifactId>hornetq-jms</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.hornetq</groupId>
+ <artifactId>hornetq-ra</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.hornetq</groupId>
+ <artifactId>hornetq-bootstrap</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.jboss.javaee</groupId>
+ <artifactId>jboss-jca-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.jboss.security</groupId>
+ <artifactId>jboss-security-spi</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.jboss.security</groupId>
+ <artifactId>jbosssx</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.jboss.naming</groupId>
+ <artifactId>jnpserver</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>jboss.jbossts</groupId>
+ <artifactId>jbossts-common</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>apache-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.jboss.javaee</groupId>
+ <artifactId>jboss-transaction-api</artifactId>
+ </dependency>
+ <!--this specifically for the JMS Bridge-->
+ <dependency>
+ <groupId>org.jboss.integration</groupId>
+ <artifactId>jboss-transaction-spi</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.jboss.javaee</groupId>
+ <artifactId>jboss-jaspi-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.jboss.javaee</groupId>
+ <artifactId>jboss-jms-api</artifactId>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <testResources>
+ <testResource>
+ <directory>config</directory>
+ </testResource>
+ </testResources>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <includes>
+ <include>**/*Test.java</include>
+ </includes>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <source>1.6</source>
+ <target>1.6</target>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
Modified:
trunk/tests/stress-tests/src/test/java/org/hornetq/tests/stress/chunk/LargeMessageStressTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/stress/chunk/LargeMessageStressTest.java 2011-04-11
12:47:21 UTC (rev 10475)
+++
trunk/tests/stress-tests/src/test/java/org/hornetq/tests/stress/chunk/LargeMessageStressTest.java 2011-04-12
10:12:15 UTC (rev 10480)
@@ -13,7 +13,7 @@
package org.hornetq.tests.stress.chunk;
-import org.hornetq.tests.integration.largemessage.LargeMessageTestBase;
+import org.hornetq.tests.largemessage.LargeMessageTestBase;
/**
* A MessageChunkSoakTest
Modified:
trunk/tests/stress-tests/src/test/java/org/hornetq/tests/stress/client/SendStressTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/stress/client/SendStressTest.java 2011-04-11
12:47:21 UTC (rev 10475)
+++
trunk/tests/stress-tests/src/test/java/org/hornetq/tests/stress/client/SendStressTest.java 2011-04-12
10:12:15 UTC (rev 10480)
@@ -17,7 +17,7 @@
import org.hornetq.api.core.client.*;
import org.hornetq.core.server.HornetQServer;
-import org.hornetq.tests.unit.util.ServiceTestBase;
+import org.hornetq.tests.util.ServiceTestBase;
/**
* A SendStressTest
Modified:
trunk/tests/stress-tests/src/test/java/org/hornetq/tests/stress/failover/MultiThreadRandomReattachStressTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/stress/failover/MultiThreadRandomReattachStressTest.java 2011-04-11
12:47:21 UTC (rev 10475)
+++
trunk/tests/stress-tests/src/test/java/org/hornetq/tests/stress/failover/MultiThreadRandomReattachStressTest.java 2011-04-12
10:12:15 UTC (rev 10480)
@@ -13,7 +13,12 @@
package org.hornetq.tests.stress.failover;
-import org.hornetq.tests.integration.cluster.reattach.MultiThreadRandomReattachTest;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.server.HornetQServers;
+import org.hornetq.tests.cluster.reattach.MultiThreadRandomReattachTestBase;
/**
* A MultiThreadRandomFailoverStressTest
@@ -21,9 +26,41 @@
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
* @author <a href="mailto:clebert.suconic@jboss.org">Clebert
Suconic</a>
*/
-public class MultiThreadRandomReattachStressTest extends MultiThreadRandomReattachTest
+public class MultiThreadRandomReattachStressTest extends
MultiThreadRandomReattachTestBase
{
+ private static final Logger log =
Logger.getLogger(MultiThreadRandomReattachStressTest.class);
+
@Override
+ protected void start() throws Exception
+ {
+ Configuration liveConf = createDefaultConfig();
+ liveConf.setSecurityEnabled(false);
+ liveConf.getAcceptorConfigurations()
+ .add(new
TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory"));
+ liveServer = HornetQServers.newHornetQServer(liveConf, false);
+ liveServer.start();
+ }
+
+ /* (non-Javadoc)
+ * @see
org.hornetq.tests.integration.cluster.failover.MultiThreadRandomReattachTestBase#setBody(org.hornetq.api.core.client.ClientMessage)
+ */
+ @Override
+ protected void setBody(final ClientMessage message) throws Exception
+ {
+ // Give each msg a body
+ message.getBodyBuffer().writeBytes(new byte[250]);
+ }
+
+ /* (non-Javadoc)
+ * @see
org.hornetq.tests.integration.cluster.failover.MultiThreadRandomReattachTestBase#checkSize(org.hornetq.api.core.client.ClientMessage)
+ */
+ @Override
+ protected boolean checkSize(final ClientMessage message)
+ {
+ return message.getBodyBuffer().readableBytes() == 250;
+ }
+
+ @Override
protected int getNumIterations()
{
return 100;
Modified:
trunk/tests/stress-tests/src/test/java/org/hornetq/tests/stress/failover/RandomReattachStressTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/stress/failover/RandomReattachStressTest.java 2011-04-11
12:47:21 UTC (rev 10475)
+++
trunk/tests/stress-tests/src/test/java/org/hornetq/tests/stress/failover/RandomReattachStressTest.java 2011-04-12
10:12:15 UTC (rev 10480)
@@ -13,8 +13,26 @@
package org.hornetq.tests.stress.failover;
-import org.hornetq.tests.integration.cluster.reattach.RandomReattachTest;
+import junit.framework.Assert;
+import junit.framework.AssertionFailedError;
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.*;
+import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
+import org.hornetq.core.client.impl.ClientSessionInternal;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.remoting.impl.invm.InVMRegistry;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.HornetQServers;
+import org.hornetq.jms.client.HornetQTextMessage;
+import org.hornetq.tests.util.ServiceTestBase;
+import java.util.*;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
/**
* A RandomFailoverStressTest
*
@@ -24,24 +42,1530 @@
*
*
*/
-public class RandomReattachStressTest extends RandomReattachTest
+public class RandomReattachStressTest extends ServiceTestBase
{
+ private static final Logger log = Logger.getLogger(RandomReattachStressTest.class);
+
// Constants -----------------------------------------------------
+ private static final int RECEIVE_TIMEOUT = 10000;
+
// Attributes ----------------------------------------------------
+ private static final SimpleString ADDRESS = new
SimpleString("FailoverTestAddress");
+
+ private HornetQServer liveService;
+
+ private Timer timer;
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
// Public --------------------------------------------------------
+ /** Drives {@link #doTestA(ClientSessionFactory)} through {@link #runTest(RunnableT)}. */
+ public void testA() throws Exception
+ {
+    final RunnableT scenario = new RunnableT()
+    {
+       @Override
+       public void run(final ClientSessionFactory sf) throws Exception
+       {
+          doTestA(sf);
+       }
+    };
+
+    runTest(scenario);
+ }
+
+ /** Drives {@link #doTestB(ClientSessionFactory)} through {@link #runTest(RunnableT)}. */
+ public void testB() throws Exception
+ {
+    final RunnableT scenario = new RunnableT()
+    {
+       @Override
+       public void run(final ClientSessionFactory sf) throws Exception
+       {
+          doTestB(sf);
+       }
+    };
+
+    runTest(scenario);
+ }
+
+ /** Drives {@link #doTestC(ClientSessionFactory)} through {@link #runTest(RunnableT)}. */
+ public void testC() throws Exception
+ {
+    final RunnableT scenario = new RunnableT()
+    {
+       @Override
+       public void run(final ClientSessionFactory sf) throws Exception
+       {
+          doTestC(sf);
+       }
+    };
+
+    runTest(scenario);
+ }
+
+ /** Drives {@link #doTestD(ClientSessionFactory)} through {@link #runTest(RunnableT)}. */
+ public void testD() throws Exception
+ {
+    final RunnableT scenario = new RunnableT()
+    {
+       @Override
+       public void run(final ClientSessionFactory sf) throws Exception
+       {
+          doTestD(sf);
+       }
+    };
+
+    runTest(scenario);
+ }
+
+ /** Drives {@link #doTestE(ClientSessionFactory)} through {@link #runTest(RunnableT)}. */
+ public void testE() throws Exception
+ {
+    final RunnableT scenario = new RunnableT()
+    {
+       @Override
+       public void run(final ClientSessionFactory sf) throws Exception
+       {
+          doTestE(sf);
+       }
+    };
+
+    runTest(scenario);
+ }
+
+ /** Drives {@link #doTestF(ClientSessionFactory)} through {@link #runTest(RunnableT)}. */
+ public void testF() throws Exception
+ {
+    final RunnableT scenario = new RunnableT()
+    {
+       @Override
+       public void run(final ClientSessionFactory sf) throws Exception
+       {
+          doTestF(sf);
+       }
+    };
+
+    runTest(scenario);
+ }
+
+ /** Drives {@link #doTestG(ClientSessionFactory)} through {@link #runTest(RunnableT)}. */
+ public void testG() throws Exception
+ {
+    final RunnableT scenario = new RunnableT()
+    {
+       @Override
+       public void run(final ClientSessionFactory sf) throws Exception
+       {
+          doTestG(sf);
+       }
+    };
+
+    runTest(scenario);
+ }
+
+ /** Drives {@link #doTestH(ClientSessionFactory)} through {@link #runTest(RunnableT)}. */
+ public void testH() throws Exception
+ {
+    final RunnableT scenario = new RunnableT()
+    {
+       @Override
+       public void run(final ClientSessionFactory sf) throws Exception
+       {
+          doTestH(sf);
+       }
+    };
+
+    runTest(scenario);
+ }
+
+ /** Drives {@link #doTestI(ClientSessionFactory)} through {@link #runTest(RunnableT)}. */
+ public void testI() throws Exception
+ {
+    final RunnableT scenario = new RunnableT()
+    {
+       @Override
+       public void run(final ClientSessionFactory sf) throws Exception
+       {
+          doTestI(sf);
+       }
+    };
+
+    runTest(scenario);
+ }
+
+ /** Drives {@link #doTestJ(ClientSessionFactory)} through {@link #runTest(RunnableT)}. */
+ public void testJ() throws Exception
+ {
+    final RunnableT scenario = new RunnableT()
+    {
+       @Override
+       public void run(final ClientSessionFactory sf) throws Exception
+       {
+          doTestJ(sf);
+       }
+    };
+
+    runTest(scenario);
+ }
+
+ /** Drives {@link #doTestK(ClientSessionFactory)} through {@link #runTest(RunnableT)}. */
+ public void testK() throws Exception
+ {
+    final RunnableT scenario = new RunnableT()
+    {
+       @Override
+       public void run(final ClientSessionFactory sf) throws Exception
+       {
+          doTestK(sf);
+       }
+    };
+
+    runTest(scenario);
+ }
+
+ /** Drives {@link #doTestL(ClientSessionFactory)} through {@link #runTest(RunnableT)}. */
+ public void testL() throws Exception
+ {
+    final RunnableT scenario = new RunnableT()
+    {
+       @Override
+       public void run(final ClientSessionFactory sf) throws Exception
+       {
+          doTestL(sf);
+       }
+    };
+
+    runTest(scenario);
+ }
+
+ /** Drives {@link #doTestN(ClientSessionFactory)} through {@link #runTest(RunnableT)}. */
+ public void testN() throws Exception
+ {
+    final RunnableT scenario = new RunnableT()
+    {
+       @Override
+       public void run(final ClientSessionFactory sf) throws Exception
+       {
+          doTestN(sf);
+       }
+    };
+
+    runTest(scenario);
+ }
+
+ /**
+  * Runs the given scenario {@code getNumIterations()} times.
+  * <p>
+  * Each iteration starts a fresh server, creates an in-VM session factory with
+  * reconnect attempts set to -1, opens one extra session and schedules a
+  * {@link Failer} on that session's connection (first run after a random delay
+  * below one second). The scenario is then re-run until the failer has executed.
+  * Afterwards the factory must report no sessions and no connections, and the
+  * server is stopped.
+  * <p>
+  * If the scenario throws, the failer is cancelled and the locator is closed
+  * before the exception propagates, so a failed iteration does not leave a
+  * pending connection failure on the shared timer or an open locator behind.
+  *
+  * @param runnable the scenario to execute against the session factory
+  * @throws Exception whatever the scenario or the HornetQ client/server calls throw
+  */
+ public void runTest(final RunnableT runnable) throws Exception
+ {
+    final int numIts = getNumIterations();
+
+    for (int its = 0; its < numIts; its++)
+    {
+       RandomReattachStressTest.log.info("####" + getName() + " iteration #" + its);
+       start();
+       ServerLocator locator =
+          HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(ServiceTestBase.INVM_CONNECTOR_FACTORY));
+
+       ClientSessionFactoryImpl sf;
+
+       try
+       {
+          locator.setReconnectAttempts(-1);
+          locator.setConfirmationWindowSize(1024 * 1024);
+
+          sf = (ClientSessionFactoryImpl)locator.createSessionFactory();
+
+          ClientSession session = sf.createSession(false, false, false);
+
+          Failer failer = startFailer(1000, session);
+
+          try
+          {
+             do
+             {
+                runnable.run(sf);
+             }
+             while (!failer.isExecuted());
+          }
+          finally
+          {
+             // Harmless once the failer has run (it cancels itself); essential when
+             // the scenario aborted before the scheduled failure fired.
+             failer.cancel();
+          }
+
+          session.close();
+       }
+       finally
+       {
+          // Same position as before on the success path; also reached on failure.
+          locator.close();
+       }
+
+       Assert.assertEquals(0, sf.numSessions());
+
+       Assert.assertEquals(0, sf.numConnections());
+
+       stop();
+    }
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
+ protected void doTestA(final ClientSessionFactory sf) throws Exception
+ {
+ long start = System.currentTimeMillis();
+
+ ClientSession s = sf.createSession(false, false, false);
+
+ final int numMessages = 100;
+
+ final int numSessions = 10;
+
+ Set<ClientConsumer> consumers = new HashSet<ClientConsumer>();
+ Set<ClientSession> sessions = new HashSet<ClientSession>();
+
+ for (int i = 0; i < numSessions; i++)
+ {
+ SimpleString subName = new SimpleString("sub" + i);
+
+ ClientSession sessConsume = sf.createSession(false, true, true);
+
+ sessConsume.start();
+
+ sessConsume.createQueue(RandomReattachStressTest.ADDRESS, subName, null,
false);
+
+ ClientConsumer consumer = sessConsume.createConsumer(subName);
+
+ consumers.add(consumer);
+
+ sessions.add(sessConsume);
+ }
+
+ ClientSession sessSend = sf.createSession(false, true, true);
+
+ ClientProducer producer =
sessSend.createProducer(RandomReattachStressTest.ADDRESS);
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = sessSend.createMessage(HornetQTextMessage.TYPE,
+ false,
+ 0,
+
System.currentTimeMillis(),
+ (byte)1);
+ message.putIntProperty(new SimpleString("count"), i);
+ producer.send(message);
+ }
+
+ class MyHandler extends AssertionCheckMessageHandler
+ {
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ volatile int count;
+
+ public void onMessageAssert(final ClientMessage message)
+ {
+ if (count == numMessages)
+ {
+ Assert.fail("Too many messages");
+ }
+
+ Assert.assertEquals(count, message.getObjectProperty(new
SimpleString("count")));
+
+ count++;
+
+ try
+ {
+ message.acknowledge();
+ }
+ catch (HornetQException me)
+ {
+ RandomReattachStressTest.log.error("Failed to process", me);
+ }
+
+ if (count == numMessages)
+ {
+ latch.countDown();
+ }
+ }
+ }
+
+ Set<MyHandler> handlers = new HashSet<MyHandler>();
+
+ for (ClientConsumer consumer : consumers)
+ {
+ MyHandler handler = new MyHandler();
+
+ consumer.setMessageHandler(handler);
+
+ handlers.add(handler);
+ }
+
+ for (MyHandler handler : handlers)
+ {
+ boolean ok = handler.latch.await(5000, TimeUnit.MILLISECONDS);
+
+ handler.checkAssertions();
+
+ Assert.assertTrue("Didn't receive all messages", ok);
+ }
+
+ sessSend.close();
+ for (ClientSession session : sessions)
+ {
+ session.close();
+ }
+
+ for (int i = 0; i < numSessions; i++)
+ {
+ SimpleString subName = new SimpleString("sub" + i);
+
+ s.deleteQueue(subName);
+ }
+
+ s.close();
+
+ long end = System.currentTimeMillis();
+
+ RandomReattachStressTest.log.info("duration " + (end - start));
+ }
+
+ /**
+  * Scenario B: asynchronous delivery to sessions that are started late.
+  * <p>
+  * Fifty consumer sessions (created with {@code createSession(false, true, true)})
+  * each get a queue {@code sub<i>} and a consumer but are not started until all
+  * 100 messages have been sent. Handlers then check the {@code count} sequence
+  * without acknowledging and release their latch on the 100th message. Every
+  * latch is awaited for up to 10 seconds. Finally all sessions are closed and
+  * the queues deleted.
+  *
+  * @param sf factory used to create every session of the scenario
+  */
+ protected void doTestB(final ClientSessionFactory sf) throws Exception
+ {
+    final long startedAt = System.currentTimeMillis();
+
+    final ClientSession adminSession = sf.createSession(false, false, false);
+
+    final int numMessages = 100;
+
+    final int numSessions = 50;
+
+    final Set<ClientConsumer> consumers = new HashSet<ClientConsumer>();
+    final Set<ClientSession> sessions = new HashSet<ClientSession>();
+
+    for (int i = 0; i < numSessions; i++)
+    {
+       final SimpleString queueName = new SimpleString("sub" + i);
+
+       final ClientSession consumerSession = sf.createSession(false, true, true);
+
+       consumerSession.createQueue(ADDRESS, queueName, null, false);
+
+       consumers.add(consumerSession.createConsumer(queueName));
+
+       sessions.add(consumerSession);
+    }
+
+    final ClientSession sendSession = sf.createSession(false, true, true);
+
+    final ClientProducer producer = sendSession.createProducer(ADDRESS);
+
+    for (int i = 0; i < numMessages; i++)
+    {
+       final ClientMessage outgoing =
+          sendSession.createMessage(HornetQTextMessage.TYPE, false, 0, System.currentTimeMillis(), (byte)1);
+       outgoing.putIntProperty(new SimpleString("count"), i);
+       producer.send(outgoing);
+    }
+
+    // Delivery starts only now, after everything has been sent.
+    for (ClientSession consumerSession : sessions)
+    {
+       consumerSession.start();
+    }
+
+    class MyHandler extends AssertionCheckMessageHandler
+    {
+       final CountDownLatch allReceived = new CountDownLatch(1);
+
+       volatile int received;
+
+       public void onMessageAssert(final ClientMessage message)
+       {
+          if (received == numMessages)
+          {
+             Assert.fail("Too many messages");
+          }
+
+          Assert.assertEquals(received, message.getObjectProperty(new SimpleString("count")));
+
+          received++;
+
+          if (received == numMessages)
+          {
+             allReceived.countDown();
+          }
+       }
+    }
+
+    final Set<MyHandler> handlers = new HashSet<MyHandler>();
+
+    for (ClientConsumer consumer : consumers)
+    {
+       final MyHandler handler = new MyHandler();
+
+       consumer.setMessageHandler(handler);
+
+       handlers.add(handler);
+    }
+
+    for (MyHandler handler : handlers)
+    {
+       final boolean ok = handler.allReceived.await(10000, TimeUnit.MILLISECONDS);
+
+       handler.checkAssertions();
+
+       Assert.assertTrue(ok);
+    }
+
+    sendSession.close();
+
+    for (ClientSession consumerSession : sessions)
+    {
+       consumerSession.close();
+    }
+
+    for (int i = 0; i < numSessions; i++)
+    {
+       adminSession.deleteQueue(new SimpleString("sub" + i));
+    }
+
+    adminSession.close();
+
+    final long finishedAt = System.currentTimeMillis();
+
+    log.info("duration " + (finishedAt - startedAt));
+ }
+
+ /**
+  * Scenario C: transacted send and transacted asynchronous receive on a session
+  * that is started up front.
+  * <p>
+  * One consumer session ({@code createSession(false, false, false)}) is started,
+  * then given a queue and a consumer. The sender
+  * ({@code createSession(false, false, true)}) sends 100 messages and rolls back,
+  * then sends 100 again and commits. A handler receives and acknowledges all 100;
+  * fresh handlers are installed, the consumer session is rolled back, and the
+  * same 100 messages are expected again before the final commit.
+  * <p>
+  * An acknowledge failure inside the handler is logged through the class logger
+  * (as {@code doTestA} does) and rethrown as a {@link RuntimeException}.
+  *
+  * @param sf factory used to create every session of the scenario
+  */
+ protected void doTestC(final ClientSessionFactory sf) throws Exception
+ {
+    long start = System.currentTimeMillis();
+
+    ClientSession s = sf.createSession(false, false, false);
+
+    final int numMessages = 100;
+
+    final int numSessions = 1;
+
+    Set<ClientConsumer> consumers = new HashSet<ClientConsumer>();
+    Set<ClientSession> sessions = new HashSet<ClientSession>();
+
+    for (int i = 0; i < numSessions; i++)
+    {
+       SimpleString subName = new SimpleString("sub" + i);
+
+       ClientSession sessConsume = sf.createSession(false, false, false);
+
+       sessConsume.start();
+
+       sessConsume.createQueue(RandomReattachStressTest.ADDRESS, subName, null, false);
+
+       ClientConsumer consumer = sessConsume.createConsumer(subName);
+
+       consumers.add(consumer);
+
+       sessions.add(sessConsume);
+    }
+
+    ClientSession sessSend = sf.createSession(false, false, true);
+
+    ClientProducer producer = sessSend.createProducer(RandomReattachStressTest.ADDRESS);
+
+    // First batch is discarded by the rollback below.
+    for (int i = 0; i < numMessages; i++)
+    {
+       ClientMessage message = sessSend.createMessage(HornetQTextMessage.TYPE,
+                                                      false,
+                                                      0,
+                                                      System.currentTimeMillis(),
+                                                      (byte)1);
+       message.putIntProperty(new SimpleString("count"), i);
+       producer.send(message);
+    }
+
+    sessSend.rollback();
+
+    // Second batch is the one that gets committed.
+    for (int i = 0; i < numMessages; i++)
+    {
+       ClientMessage message = sessSend.createMessage(HornetQTextMessage.TYPE,
+                                                      false,
+                                                      0,
+                                                      System.currentTimeMillis(),
+                                                      (byte)1);
+       message.putIntProperty(new SimpleString("count"), i);
+       producer.send(message);
+    }
+
+    sessSend.commit();
+
+    class MyHandler extends AssertionCheckMessageHandler
+    {
+       final CountDownLatch latch = new CountDownLatch(1);
+
+       volatile int count;
+
+       public void onMessageAssert(final ClientMessage message)
+       {
+          if (count == numMessages)
+          {
+             Assert.fail("Too many messages, expected " + count);
+          }
+
+          Assert.assertEquals(count, message.getObjectProperty(new SimpleString("count")));
+
+          count++;
+
+          try
+          {
+             message.acknowledge();
+          }
+          catch (HornetQException e)
+          {
+             // Use the class logger rather than printStackTrace(), matching doTestA.
+             RandomReattachStressTest.log.error("Failed to process", e);
+             throw new RuntimeException (e.getMessage(), e);
+          }
+
+          if (count == numMessages)
+          {
+             latch.countDown();
+          }
+       }
+    }
+
+    Set<MyHandler> handlers = new HashSet<MyHandler>();
+
+    for (ClientConsumer consumer : consumers)
+    {
+       MyHandler handler = new MyHandler();
+
+       consumer.setMessageHandler(handler);
+
+       handlers.add(handler);
+    }
+
+    for (MyHandler handler : handlers)
+    {
+       boolean ok = handler.latch.await(10000, TimeUnit.MILLISECONDS);
+
+       Assert.assertTrue(ok);
+
+       handler.checkAssertions();
+    }
+
+    handlers.clear();
+
+    // New handlers
+    for (ClientConsumer consumer : consumers)
+    {
+       MyHandler handler = new MyHandler();
+
+       consumer.setMessageHandler(handler);
+
+       handlers.add(handler);
+    }
+
+    // Rolling back the consumer sessions is expected to deliver the messages again.
+    for (ClientSession session : sessions)
+    {
+       session.rollback();
+    }
+
+    for (MyHandler handler : handlers)
+    {
+       boolean ok = handler.latch.await(10000, TimeUnit.MILLISECONDS);
+
+       Assert.assertTrue(ok);
+
+       handler.checkAssertions();
+    }
+
+    for (ClientSession session : sessions)
+    {
+       session.commit();
+    }
+
+    sessSend.close();
+    for (ClientSession session : sessions)
+    {
+       session.close();
+    }
+
+    for (int i = 0; i < numSessions; i++)
+    {
+       SimpleString subName = new SimpleString("sub" + i);
+
+       s.deleteQueue(subName);
+    }
+
+    s.close();
+
+    long end = System.currentTimeMillis();
+
+    RandomReattachStressTest.log.info("duration " + (end - start));
+ }
+
+ /**
+  * Scenario D: transacted send and transacted asynchronous receive on sessions
+  * that are started late.
+  * <p>
+  * Ten consumer sessions ({@code createSession(false, false, false)}) get a queue
+  * and a consumer each. The sender ({@code createSession(false, false, true)})
+  * sends 100 messages and rolls back, sends 100 again and commits; only then are
+  * the consumer sessions started. Handlers check the {@code count} sequence
+  * without acknowledging (latch awaited up to 20 seconds), fresh handlers are
+  * installed, the consumer sessions are rolled back and the 100 messages are
+  * expected once more (up to 10 seconds) before the final commit.
+  *
+  * @param sf factory used to create every session of the scenario
+  */
+ protected void doTestD(final ClientSessionFactory sf) throws Exception
+ {
+    final long startedAt = System.currentTimeMillis();
+
+    final ClientSession adminSession = sf.createSession(false, false, false);
+
+    final int numMessages = 100;
+
+    final int numSessions = 10;
+
+    final Set<ClientConsumer> consumers = new HashSet<ClientConsumer>();
+    final Set<ClientSession> sessions = new HashSet<ClientSession>();
+
+    for (int i = 0; i < numSessions; i++)
+    {
+       final SimpleString queueName = new SimpleString("sub" + i);
+
+       final ClientSession consumerSession = sf.createSession(false, false, false);
+
+       consumerSession.createQueue(ADDRESS, queueName, null, false);
+
+       consumers.add(consumerSession.createConsumer(queueName));
+
+       sessions.add(consumerSession);
+    }
+
+    final ClientSession sendSession = sf.createSession(false, false, true);
+
+    final ClientProducer producer = sendSession.createProducer(ADDRESS);
+
+    // First batch: rolled back.
+    for (int i = 0; i < numMessages; i++)
+    {
+       final ClientMessage outgoing =
+          sendSession.createMessage(HornetQTextMessage.TYPE, false, 0, System.currentTimeMillis(), (byte)1);
+       outgoing.putIntProperty(new SimpleString("count"), i);
+       producer.send(outgoing);
+    }
+
+    sendSession.rollback();
+
+    // Second batch: committed.
+    for (int i = 0; i < numMessages; i++)
+    {
+       final ClientMessage outgoing =
+          sendSession.createMessage(HornetQTextMessage.TYPE, false, 0, System.currentTimeMillis(), (byte)1);
+       outgoing.putIntProperty(new SimpleString("count"), i);
+       producer.send(outgoing);
+    }
+
+    sendSession.commit();
+
+    for (ClientSession consumerSession : sessions)
+    {
+       consumerSession.start();
+    }
+
+    class MyHandler extends AssertionCheckMessageHandler
+    {
+       final CountDownLatch allReceived = new CountDownLatch(1);
+
+       volatile int received;
+
+       public void onMessageAssert(final ClientMessage message)
+       {
+          if (received == numMessages)
+          {
+             Assert.fail("Too many messages, " + received);
+          }
+
+          Assert.assertEquals(received, message.getObjectProperty(new SimpleString("count")));
+
+          received++;
+
+          if (received == numMessages)
+          {
+             allReceived.countDown();
+          }
+       }
+    }
+
+    final Set<MyHandler> handlers = new HashSet<MyHandler>();
+
+    for (ClientConsumer consumer : consumers)
+    {
+       final MyHandler handler = new MyHandler();
+
+       consumer.setMessageHandler(handler);
+
+       handlers.add(handler);
+    }
+
+    for (MyHandler handler : handlers)
+    {
+       Assert.assertTrue(handler.allReceived.await(20000, TimeUnit.MILLISECONDS));
+
+       handler.checkAssertions();
+    }
+
+    handlers.clear();
+
+    // Second round: fresh handlers, then roll the consumer sessions back.
+    for (ClientConsumer consumer : consumers)
+    {
+       final MyHandler handler = new MyHandler();
+
+       consumer.setMessageHandler(handler);
+
+       handlers.add(handler);
+    }
+
+    for (ClientSession consumerSession : sessions)
+    {
+       consumerSession.rollback();
+    }
+
+    for (MyHandler handler : handlers)
+    {
+       Assert.assertTrue(handler.allReceived.await(10000, TimeUnit.MILLISECONDS));
+
+       handler.checkAssertions();
+    }
+
+    for (ClientSession consumerSession : sessions)
+    {
+       consumerSession.commit();
+    }
+
+    sendSession.close();
+    for (ClientSession consumerSession : sessions)
+    {
+       consumerSession.close();
+    }
+
+    for (int i = 0; i < numSessions; i++)
+    {
+       adminSession.deleteQueue(new SimpleString("sub" + i));
+    }
+
+    adminSession.close();
+
+    final long finishedAt = System.currentTimeMillis();
+
+    log.info("duration " + (finishedAt - startedAt));
+ }
+
+ // Now with synchronous receive()
+
+ /**
+  * Scenario E: synchronous receive on sessions that are started up front.
+  * <p>
+  * Ten consumer sessions ({@code createSession(false, true, true)}) are started,
+  * then given a queue and a consumer each. After 100 messages are sent, every
+  * consumer must receive them in {@code count} order (each receive waits up to
+  * {@code RECEIVE_TIMEOUT} ms) and acknowledges them; afterwards
+  * {@code receiveImmediate()} must return {@code null} on every consumer.
+  *
+  * @param sf factory used to create every session of the scenario
+  */
+ protected void doTestE(final ClientSessionFactory sf) throws Exception
+ {
+    final long startedAt = System.currentTimeMillis();
+
+    final ClientSession adminSession = sf.createSession(false, false, false);
+
+    final int numMessages = 100;
+
+    final int numSessions = 10;
+
+    final Set<ClientConsumer> consumers = new HashSet<ClientConsumer>();
+    final Set<ClientSession> sessions = new HashSet<ClientSession>();
+
+    for (int i = 0; i < numSessions; i++)
+    {
+       final SimpleString queueName = new SimpleString("sub" + i);
+
+       final ClientSession consumerSession = sf.createSession(false, true, true);
+
+       consumerSession.start();
+
+       consumerSession.createQueue(ADDRESS, queueName, null, false);
+
+       consumers.add(consumerSession.createConsumer(queueName));
+
+       sessions.add(consumerSession);
+    }
+
+    final ClientSession sendSession = sf.createSession(false, true, true);
+
+    final ClientProducer producer = sendSession.createProducer(ADDRESS);
+
+    for (int i = 0; i < numMessages; i++)
+    {
+       final ClientMessage outgoing =
+          sendSession.createMessage(HornetQTextMessage.TYPE, false, 0, System.currentTimeMillis(), (byte)1);
+       outgoing.putIntProperty(new SimpleString("count"), i);
+       producer.send(outgoing);
+    }
+
+    for (int i = 0; i < numMessages; i++)
+    {
+       for (ClientConsumer consumer : consumers)
+       {
+          final ClientMessage incoming = consumer.receive(RECEIVE_TIMEOUT);
+
+          Assert.assertNotNull(incoming);
+
+          Assert.assertEquals(i, incoming.getObjectProperty(new SimpleString("count")));
+
+          incoming.acknowledge();
+       }
+    }
+
+    // Nothing further may be pending on any consumer.
+    for (int i = 0; i < numMessages; i++)
+    {
+       for (ClientConsumer consumer : consumers)
+       {
+          Assert.assertNull(consumer.receiveImmediate());
+       }
+    }
+
+    sendSession.close();
+    for (ClientSession consumerSession : sessions)
+    {
+       consumerSession.close();
+    }
+
+    for (int i = 0; i < numSessions; i++)
+    {
+       adminSession.deleteQueue(new SimpleString("sub" + i));
+    }
+
+    adminSession.close();
+
+    final long finishedAt = System.currentTimeMillis();
+
+    log.info("duration " + (finishedAt - startedAt));
+ }
+
+ /**
+  * Scenario F: synchronous receive on sessions that are started late.
+  * <p>
+  * Ten consumer sessions ({@code createSession(false, true, true)}) get a queue
+  * and a consumer each and are started only after 100 messages have been sent.
+  * Every consumer must then receive the messages in {@code count} order and
+  * acknowledges them; afterwards {@code receiveImmediate()} must return
+  * {@code null} on every consumer. Once all scenario sessions are closed the
+  * factory must report exactly one remaining session (presumably the session
+  * opened by {@code runTest} for the failer - confirm against the caller).
+  *
+  * @param sf factory used to create every session of the scenario; must be a
+  *           {@link ClientSessionFactoryImpl}
+  * @throws IllegalStateException if a message is not received within
+  *                               {@code RECEIVE_TIMEOUT} ms
+  */
+ protected void doTestF(final ClientSessionFactory sf) throws Exception
+ {
+    long start = System.currentTimeMillis();
+
+    ClientSession s = sf.createSession(false, false, false);
+
+    final int numMessages = 100;
+
+    final int numSessions = 10;
+
+    Set<ClientConsumer> consumers = new HashSet<ClientConsumer>();
+    Set<ClientSession> sessions = new HashSet<ClientSession>();
+
+    for (int i = 0; i < numSessions; i++)
+    {
+       SimpleString subName = new SimpleString("sub" + i);
+
+       ClientSession sessConsume = sf.createSession(false, true, true);
+
+       sessConsume.createQueue(RandomReattachStressTest.ADDRESS, subName, null, false);
+
+       ClientConsumer consumer = sessConsume.createConsumer(subName);
+
+       consumers.add(consumer);
+
+       sessions.add(sessConsume);
+    }
+
+    ClientSession sessSend = sf.createSession(false, true, true);
+
+    ClientProducer producer = sessSend.createProducer(RandomReattachStressTest.ADDRESS);
+
+    for (int i = 0; i < numMessages; i++)
+    {
+       ClientMessage message = sessSend.createMessage(HornetQTextMessage.TYPE,
+                                                      false,
+                                                      0,
+                                                      System.currentTimeMillis(),
+                                                      (byte)1);
+       message.putIntProperty(new SimpleString("count"), i);
+       producer.send(message);
+    }
+
+    // Delivery starts only now, after everything has been sent.
+    for (ClientSession session : sessions)
+    {
+       session.start();
+    }
+
+    for (int i = 0; i < numMessages; i++)
+    {
+       for (ClientConsumer consumer : consumers)
+       {
+          ClientMessage msg = consumer.receive(RandomReattachStressTest.RECEIVE_TIMEOUT);
+
+          // The explicit check names the missing message index; a following
+          // assertNotNull could never fail and has been dropped.
+          if (msg == null)
+          {
+             throw new IllegalStateException("Failed to receive message " + i);
+          }
+
+          Assert.assertEquals(i, msg.getObjectProperty(new SimpleString("count")));
+
+          msg.acknowledge();
+       }
+    }
+
+    for (int i = 0; i < numMessages; i++)
+    {
+       for (ClientConsumer consumer : consumers)
+       {
+          ClientMessage msg = consumer.receiveImmediate();
+
+          Assert.assertNull(msg);
+       }
+    }
+
+    sessSend.close();
+    for (ClientSession session : sessions)
+    {
+       session.close();
+    }
+
+    for (int i = 0; i < numSessions; i++)
+    {
+       SimpleString subName = new SimpleString("sub" + i);
+
+       s.deleteQueue(subName);
+    }
+
+    s.close();
+
+    Assert.assertEquals(1, ((ClientSessionFactoryImpl)sf).numSessions());
+
+    long end = System.currentTimeMillis();
+
+    RandomReattachStressTest.log.info("duration " + (end - start));
+ }
+
+ /**
+  * Scenario G: transacted send and transacted synchronous receive on sessions
+  * that are started up front.
+  * <p>
+  * Ten consumer sessions ({@code createSession(false, false, false)}) are
+  * started, then given a queue and a consumer each. The sender (also
+  * {@code createSession(false, false, false)}) sends 100 messages and rolls
+  * back, sends 100 again and commits. Every consumer receives and acknowledges
+  * all 100 in {@code count} order and must then have nothing pending; the
+  * consumer sessions are rolled back, the same 100 messages are received and
+  * acknowledged again, nothing more may be pending, and the sessions commit.
+  *
+  * @param sf factory used to create every session of the scenario
+  */
+ protected void doTestG(final ClientSessionFactory sf) throws Exception
+ {
+    final long startedAt = System.currentTimeMillis();
+
+    final ClientSession adminSession = sf.createSession(false, false, false);
+
+    final int numMessages = 100;
+
+    final int numSessions = 10;
+
+    final Set<ClientConsumer> consumers = new HashSet<ClientConsumer>();
+    final Set<ClientSession> sessions = new HashSet<ClientSession>();
+
+    for (int i = 0; i < numSessions; i++)
+    {
+       final SimpleString queueName = new SimpleString("sub" + i);
+
+       final ClientSession consumerSession = sf.createSession(false, false, false);
+
+       consumerSession.start();
+
+       consumerSession.createQueue(ADDRESS, queueName, null, false);
+
+       consumers.add(consumerSession.createConsumer(queueName));
+
+       sessions.add(consumerSession);
+    }
+
+    final ClientSession sendSession = sf.createSession(false, false, false);
+
+    final ClientProducer producer = sendSession.createProducer(ADDRESS);
+
+    // First batch: rolled back.
+    for (int i = 0; i < numMessages; i++)
+    {
+       final ClientMessage outgoing =
+          sendSession.createMessage(HornetQTextMessage.TYPE, false, 0, System.currentTimeMillis(), (byte)1);
+       outgoing.putIntProperty(new SimpleString("count"), i);
+       producer.send(outgoing);
+    }
+
+    sendSession.rollback();
+
+    // Second batch: committed.
+    for (int i = 0; i < numMessages; i++)
+    {
+       final ClientMessage outgoing =
+          sendSession.createMessage(HornetQTextMessage.TYPE, false, 0, System.currentTimeMillis(), (byte)1);
+       outgoing.putIntProperty(new SimpleString("count"), i);
+       producer.send(outgoing);
+    }
+
+    sendSession.commit();
+
+    for (int i = 0; i < numMessages; i++)
+    {
+       for (ClientConsumer consumer : consumers)
+       {
+          final ClientMessage incoming = consumer.receive(RECEIVE_TIMEOUT);
+
+          Assert.assertNotNull(incoming);
+
+          Assert.assertEquals(i, incoming.getObjectProperty(new SimpleString("count")));
+
+          incoming.acknowledge();
+       }
+    }
+
+    // Single pass here: one immediate receive per consumer.
+    for (ClientConsumer consumer : consumers)
+    {
+       Assert.assertNull(consumer.receiveImmediate());
+    }
+
+    for (ClientSession consumerSession : sessions)
+    {
+       consumerSession.rollback();
+    }
+
+    // After the rollback the same messages are expected again.
+    for (int i = 0; i < numMessages; i++)
+    {
+       for (ClientConsumer consumer : consumers)
+       {
+          final ClientMessage incoming = consumer.receive(RECEIVE_TIMEOUT);
+
+          Assert.assertNotNull(incoming);
+
+          Assert.assertEquals(i, incoming.getObjectProperty(new SimpleString("count")));
+
+          incoming.acknowledge();
+       }
+    }
+
+    for (int i = 0; i < numMessages; i++)
+    {
+       for (ClientConsumer consumer : consumers)
+       {
+          Assert.assertNull(consumer.receiveImmediate());
+       }
+    }
+
+    for (ClientSession consumerSession : sessions)
+    {
+       consumerSession.commit();
+    }
+
+    sendSession.close();
+    for (ClientSession consumerSession : sessions)
+    {
+       consumerSession.close();
+    }
+
+    for (int i = 0; i < numSessions; i++)
+    {
+       adminSession.deleteQueue(new SimpleString("sub" + i));
+    }
+
+    adminSession.close();
+
+    final long finishedAt = System.currentTimeMillis();
+
+    log.info("duration " + (finishedAt - startedAt));
+ }
+
+ /**
+  * Scenario H: transacted send and transacted synchronous receive on sessions
+  * that are started late.
+  * <p>
+  * Ten consumer sessions ({@code createSession(false, false, false)}) get a queue
+  * and a consumer each. The sender (also {@code createSession(false, false, false)})
+  * sends 100 messages and rolls back, sends 100 again and commits; only then are
+  * the consumer sessions started. Every consumer receives and acknowledges all
+  * 100 in {@code count} order and must then have nothing pending; the consumer
+  * sessions are rolled back, the same 100 messages are received and acknowledged
+  * again, nothing more may be pending, and the sessions commit.
+  *
+  * @param sf factory used to create every session of the scenario
+  */
+ protected void doTestH(final ClientSessionFactory sf) throws Exception
+ {
+    final long startedAt = System.currentTimeMillis();
+
+    final ClientSession adminSession = sf.createSession(false, false, false);
+
+    final int numMessages = 100;
+
+    final int numSessions = 10;
+
+    final Set<ClientConsumer> consumers = new HashSet<ClientConsumer>();
+    final Set<ClientSession> sessions = new HashSet<ClientSession>();
+
+    for (int i = 0; i < numSessions; i++)
+    {
+       final SimpleString queueName = new SimpleString("sub" + i);
+
+       final ClientSession consumerSession = sf.createSession(false, false, false);
+
+       consumerSession.createQueue(ADDRESS, queueName, null, false);
+
+       consumers.add(consumerSession.createConsumer(queueName));
+
+       sessions.add(consumerSession);
+    }
+
+    final ClientSession sendSession = sf.createSession(false, false, false);
+
+    final ClientProducer producer = sendSession.createProducer(ADDRESS);
+
+    // First batch: rolled back.
+    for (int i = 0; i < numMessages; i++)
+    {
+       final ClientMessage outgoing =
+          sendSession.createMessage(HornetQTextMessage.TYPE, false, 0, System.currentTimeMillis(), (byte)1);
+       outgoing.putIntProperty(new SimpleString("count"), i);
+       producer.send(outgoing);
+    }
+
+    sendSession.rollback();
+
+    // Second batch: committed.
+    for (int i = 0; i < numMessages; i++)
+    {
+       final ClientMessage outgoing =
+          sendSession.createMessage(HornetQTextMessage.TYPE, false, 0, System.currentTimeMillis(), (byte)1);
+       outgoing.putIntProperty(new SimpleString("count"), i);
+       producer.send(outgoing);
+    }
+
+    sendSession.commit();
+
+    for (ClientSession consumerSession : sessions)
+    {
+       consumerSession.start();
+    }
+
+    for (int i = 0; i < numMessages; i++)
+    {
+       for (ClientConsumer consumer : consumers)
+       {
+          final ClientMessage incoming = consumer.receive(RECEIVE_TIMEOUT);
+
+          Assert.assertNotNull(incoming);
+
+          Assert.assertEquals(i, incoming.getObjectProperty(new SimpleString("count")));
+
+          incoming.acknowledge();
+       }
+    }
+
+    for (int i = 0; i < numMessages; i++)
+    {
+       for (ClientConsumer consumer : consumers)
+       {
+          Assert.assertNull(consumer.receiveImmediate());
+       }
+    }
+
+    for (ClientSession consumerSession : sessions)
+    {
+       consumerSession.rollback();
+    }
+
+    // After the rollback the same messages are expected again.
+    for (int i = 0; i < numMessages; i++)
+    {
+       for (ClientConsumer consumer : consumers)
+       {
+          final ClientMessage incoming = consumer.receive(RECEIVE_TIMEOUT);
+
+          Assert.assertNotNull(incoming);
+
+          Assert.assertEquals(i, incoming.getObjectProperty(new SimpleString("count")));
+
+          incoming.acknowledge();
+       }
+    }
+
+    for (int i = 0; i < numMessages; i++)
+    {
+       for (ClientConsumer consumer : consumers)
+       {
+          Assert.assertNull(consumer.receiveImmediate());
+       }
+    }
+
+    for (ClientSession consumerSession : sessions)
+    {
+       consumerSession.commit();
+    }
+
+    sendSession.close();
+    for (ClientSession consumerSession : sessions)
+    {
+       consumerSession.close();
+    }
+
+    for (int i = 0; i < numSessions; i++)
+    {
+       adminSession.deleteQueue(new SimpleString("sub" + i));
+    }
+
+    adminSession.close();
+
+    final long finishedAt = System.currentTimeMillis();
+
+    log.info("duration " + (finishedAt - startedAt));
+ }
+
+ protected void doTestI(final ClientSessionFactory sf) throws Exception
+ {
+ ClientSession sessCreate = sf.createSession(false, true, true);
+
+ sessCreate.createQueue(RandomReattachStressTest.ADDRESS,
RandomReattachStressTest.ADDRESS, null, false);
+
+ ClientSession sess = sf.createSession(false, true, true);
+
+ sess.start();
+
+ ClientConsumer consumer = sess.createConsumer(RandomReattachStressTest.ADDRESS);
+
+ ClientProducer producer = sess.createProducer(RandomReattachStressTest.ADDRESS);
+
+ ClientMessage message = sess.createMessage(HornetQTextMessage.TYPE,
+ false,
+ 0,
+ System.currentTimeMillis(),
+ (byte)1);
+ producer.send(message);
+
+ ClientMessage message2 =
consumer.receive(RandomReattachStressTest.RECEIVE_TIMEOUT);
+
+ Assert.assertNotNull(message2);
+
+ message2.acknowledge();
+
+ sess.close();
+
+ sessCreate.deleteQueue(RandomReattachStressTest.ADDRESS);
+
+ sessCreate.close();
+ }
+
+ protected void doTestJ(final ClientSessionFactory sf) throws Exception
+ {
+ ClientSession sessCreate = sf.createSession(false, true, true);
+
+ sessCreate.createQueue(RandomReattachStressTest.ADDRESS,
RandomReattachStressTest.ADDRESS, null, false);
+
+ ClientSession sess = sf.createSession(false, true, true);
+
+ sess.start();
+
+ ClientConsumer consumer = sess.createConsumer(RandomReattachStressTest.ADDRESS);
+
+ ClientProducer producer = sess.createProducer(RandomReattachStressTest.ADDRESS);
+
+ ClientMessage message = sess.createMessage(HornetQTextMessage.TYPE,
+ false,
+ 0,
+ System.currentTimeMillis(),
+ (byte)1);
+ producer.send(message);
+
+ ClientMessage message2 =
consumer.receive(RandomReattachStressTest.RECEIVE_TIMEOUT);
+
+ Assert.assertNotNull(message2);
+
+ message2.acknowledge();
+
+ sess.close();
+
+ sessCreate.deleteQueue(RandomReattachStressTest.ADDRESS);
+
+ sessCreate.close();
+ }
+
+ /**
+  * Scenario K: consumer churn. Creates one queue, then creates and immediately
+  * closes 100 consumers on it, and finally deletes the queue.
+  *
+  * @param sf factory used to create the session
+  */
+ protected void doTestK(final ClientSessionFactory sf) throws Exception
+ {
+    final ClientSession session = sf.createSession(false, false, false);
+
+    session.createQueue(ADDRESS, ADDRESS, null, false);
+
+    final int numConsumers = 100;
+
+    for (int created = 0; created < numConsumers; created++)
+    {
+       session.createConsumer(ADDRESS).close();
+    }
+
+    session.deleteQueue(ADDRESS);
+
+    session.close();
+ }
+
+ /**
+  * Scenario L: session churn. Creates and immediately closes ten sessions.
+  *
+  * @param sf factory used to create the sessions
+  */
+ protected void doTestL(final ClientSessionFactory sf) throws Exception
+ {
+    final int numSessions = 10;
+
+    for (int created = 0; created < numSessions; created++)
+    {
+       sf.createSession(false, false, false).close();
+    }
+ }
+
+ protected void doTestN(final ClientSessionFactory sf) throws Exception
+ {
+ ClientSession sessCreate = sf.createSession(false, true, true);
+
+ sessCreate.createQueue(RandomReattachStressTest.ADDRESS,
+ new
SimpleString(RandomReattachStressTest.ADDRESS.toString()),
+ null,
+ false);
+
+ ClientSession sess = sf.createSession(false, true, true);
+
+ sess.stop();
+
+ sess.start();
+
+ sess.stop();
+
+ ClientConsumer consumer = sess.createConsumer(new
SimpleString(RandomReattachStressTest.ADDRESS.toString()));
+
+ ClientProducer producer = sess.createProducer(RandomReattachStressTest.ADDRESS);
+
+ ClientMessage message = sess.createMessage(HornetQTextMessage.TYPE,
+ false,
+ 0,
+ System.currentTimeMillis(),
+ (byte)1);
+ producer.send(message);
+
+ sess.start();
+
+ ClientMessage message2 =
consumer.receive(RandomReattachStressTest.RECEIVE_TIMEOUT);
+
+ Assert.assertNotNull(message2);
+
+ message2.acknowledge();
+
+ sess.stop();
+
+ sess.start();
+
+ sess.close();
+
+ sessCreate.deleteQueue(new
SimpleString(RandomReattachStressTest.ADDRESS.toString()));
+
+ sessCreate.close();
+ }
+
@Override // Per-test setup: runs the superclass setup, then creates the timer that startFailer() schedules Failer tasks on.
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ timer = new Timer(true); // assuming java.util.Timer (imports not visible here), 'true' makes the timer thread a daemon
+ }
+
+ @Override // Per-test cleanup: cancels the timer, clears the in-VM registry, then runs the superclass teardown.
+ protected void tearDown() throws Exception
+ {
+ timer.cancel(); // discards any Failer tasks still scheduled on the timer
+
+ InVMRegistry.instance.clear(); // presumably drops leftover in-VM acceptor registrations - confirm against InVMRegistry
+
+ super.tearDown();
+ }
+
+ // Private -------------------------------------------------------
+
+ private Failer startFailer(final long time, final ClientSession session) // Schedules a Failer for the given session on the shared timer and returns it; 'time' is the upper bound of the random initial delay.
+ {
+ Failer failer = new Failer((ClientSessionInternal)session); // the session is cast to ClientSessionInternal, the type Failer's constructor takes
+
+ timer.schedule(failer, (long)(time * Math.random()), 100); // first run after a random delay in [0, time); the repeat period of 100 is cut short because Failer.run() calls cancel()
+
+ return failer;
+ }
+
+ private void start() throws Exception // Builds a configuration with security disabled and an in-VM acceptor, then creates and starts the server held in liveService.
+ {
+ Configuration liveConf = createDefaultConfig();
+ liveConf.setSecurityEnabled(false);
+ liveConf.getAcceptorConfigurations()
+ .add(new
TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory"));
+ liveService = HornetQServers.newHornetQServer(liveConf, false); // NOTE(review): second argument presumably disables persistence - confirm against HornetQServers
+ liveService.start();
+ }
+
+ private void stop() throws Exception // Stops the server held in liveService and checks that the in-VM registry is empty afterwards.
+ {
+ liveService.stop();
+
+ Assert.assertEquals(0, InVMRegistry.instance.size()); // fails the test if the registry still has entries after the server has stopped
+
+ liveService = null; // not reached when the assertion above fails
+ }
+
+ // Inner classes -------------------------------------------------
+
+ class Failer extends TimerTask // Timer task that fails the connection of the session it was given and records that it has run.
+ {
+ private final ClientSessionInternal session;
+
+ private boolean executed; // set at the end of run(); accessed only from the synchronized methods below
+
+ public Failer(final ClientSessionInternal session)
+ {
+ this.session = session;
+ }
+
+ @Override
+ public synchronized void run()
+ {
+ RandomReattachStressTest.log.info("** Failing connection");
+
+ session.getConnection().fail(new
HornetQException(HornetQException.NOT_CONNECTED, "oops")); // passes a NOT_CONNECTED exception to fail() on the session's connection
+
+ RandomReattachStressTest.log.info("** Fail complete");
+
+ cancel(); // the task is scheduled with a repeat period in startFailer(); cancelling here stops further runs
+
+ executed = true;
+ }
+
+ public synchronized boolean isExecuted() // true once run() has completed
+ {
+ return executed;
+ }
+ }
+
+ public abstract class RunnableT // Callback type whose run method takes a ClientSessionFactory and may throw any Exception.
+ {
+ abstract void run(final ClientSessionFactory sf) throws Exception; // implemented by subclasses; package-private visibility
+ }
+
+ static abstract class AssertionCheckMessageHandler implements MessageHandler // MessageHandler that records assertion failures raised in onMessageAssert() so they can be rethrown later via checkAssertions().
+ {
+
+
+ public void checkAssertions() // rethrows the first recorded failure, or returns normally when none were recorded
+ {
+ for (AssertionFailedError e: errors)
+ {
+ // it will throw the first error
+ throw e;
+ }
+ }
+
+ private ArrayList<AssertionFailedError> errors = new
ArrayList<AssertionFailedError>(); // failures caught in onMessage(), in the order they occurred
+
+ /* (non-Javadoc)
+ * @see
org.hornetq.api.core.client.MessageHandler#onMessage(org.hornetq.api.core.client.ClientMessage)
+ */
+ public void onMessage(ClientMessage message)
+ {
+ try
+ {
+ onMessageAssert(message);
+ }
+ catch (AssertionFailedError e)
+ {
+ e.printStackTrace(); // printed to the standard error stream (Throwable.printStackTrace default), then kept for checkAssertions()
+ errors.add(e);
+ }
+ }
+
+ public abstract void onMessageAssert(ClientMessage message); // subclasses put their per-message assertions here
+
+ }
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
protected int getNumIterations()
{
return 100;
Modified:
trunk/tests/stress-tests/src/test/java/org/hornetq/tests/stress/journal/CompactingStressTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/stress/journal/CompactingStressTest.java 2011-04-11 12:47:21 UTC (rev 10475)
+++ trunk/tests/stress-tests/src/test/java/org/hornetq/tests/stress/journal/CompactingStressTest.java 2011-04-12 10:12:15 UTC (rev 10480)
@@ -26,7 +26,7 @@
import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.JournalType;
-import org.hornetq.tests.unit.util.ServiceTestBase;
+import org.hornetq.tests.util.ServiceTestBase;
/**
* A CompactingTest
Modified:
trunk/tests/stress-tests/src/test/java/org/hornetq/tests/stress/journal/JournalCleanupCompactStressTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/stress/journal/JournalCleanupCompactStressTest.java 2011-04-11 12:47:21 UTC (rev 10475)
+++ trunk/tests/stress-tests/src/test/java/org/hornetq/tests/stress/journal/JournalCleanupCompactStressTest.java 2011-04-12 10:12:15 UTC (rev 10480)
@@ -40,7 +40,7 @@
import org.hornetq.core.journal.impl.JournalImpl;
import org.hornetq.core.journal.impl.NIOSequentialFileFactory;
import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
-import org.hornetq.tests.unit.util.ServiceTestBase;
+import org.hornetq.tests.util.ServiceTestBase;
import org.hornetq.tests.util.RandomUtil;
import org.hornetq.utils.HornetQThreadFactory;
import org.hornetq.utils.OrderedExecutorFactory;
Modified:
trunk/tests/stress-tests/src/test/java/org/hornetq/tests/stress/journal/JournalRestartStressTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/stress/journal/JournalRestartStressTest.java 2011-04-11 12:47:21 UTC (rev 10475)
+++ trunk/tests/stress-tests/src/test/java/org/hornetq/tests/stress/journal/JournalRestartStressTest.java 2011-04-12 10:12:15 UTC (rev 10480)
@@ -25,7 +25,7 @@
import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.tests.util.RandomUtil;
-import org.hornetq.tests.unit.util.ServiceTestBase;
+import org.hornetq.tests.util.ServiceTestBase;
/**
* Simulates the journal being updated, compacted cleared up,
Modified:
trunk/tests/stress-tests/src/test/java/org/hornetq/tests/stress/journal/LargeJournalStressTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/stress/journal/LargeJournalStressTest.java 2011-04-11 12:47:21 UTC (rev 10475)
+++ trunk/tests/stress-tests/src/test/java/org/hornetq/tests/stress/journal/LargeJournalStressTest.java 2011-04-12 10:12:15 UTC (rev 10480)
@@ -24,7 +24,7 @@
import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.JournalType;
-import org.hornetq.tests.unit.util.ServiceTestBase;
+import org.hornetq.tests.util.ServiceTestBase;
/**
* A LargeJournalStressTest
Modified:
trunk/tests/stress-tests/src/test/java/org/hornetq/tests/stress/journal/MultiThreadConsumerStressTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/stress/journal/MultiThreadConsumerStressTest.java 2011-04-11 12:47:21 UTC (rev 10475)
+++ trunk/tests/stress-tests/src/test/java/org/hornetq/tests/stress/journal/MultiThreadConsumerStressTest.java 2011-04-12 10:12:15 UTC (rev 10480)
@@ -25,7 +25,7 @@
import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.JournalType;
-import org.hornetq.tests.unit.util.ServiceTestBase;
+import org.hornetq.tests.util.ServiceTestBase;
/**
* A MultiThreadConsumerStressTest
Modified:
trunk/tests/stress-tests/src/test/java/org/hornetq/tests/stress/journal/NIOMultiThreadCompactorStressTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/stress/journal/NIOMultiThreadCompactorStressTest.java 2011-04-11 12:47:21 UTC (rev 10475)
+++ trunk/tests/stress-tests/src/test/java/org/hornetq/tests/stress/journal/NIOMultiThreadCompactorStressTest.java 2011-04-12 10:12:15 UTC (rev 10480)
@@ -35,7 +35,7 @@
import org.hornetq.core.journal.impl.NIOSequentialFileFactory;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.JournalType;
-import org.hornetq.tests.unit.util.ServiceTestBase;
+import org.hornetq.tests.util.ServiceTestBase;
/**
* A MultiThreadConsumerStressTest
Modified:
trunk/tests/stress-tests/src/test/java/org/hornetq/tests/stress/paging/PageCursorStressTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/stress/paging/PageCursorStressTest.java 2011-04-11 12:47:21 UTC (rev 10475)
+++ trunk/tests/stress-tests/src/test/java/org/hornetq/tests/stress/paging/PageCursorStressTest.java 2011-04-12 10:12:15 UTC (rev 10480)
@@ -50,7 +50,7 @@
import org.hornetq.core.transaction.impl.TransactionImpl;
import org.hornetq.tests.unit.core.postoffice.impl.FakeQueue;
import org.hornetq.tests.util.RandomUtil;
-import org.hornetq.tests.unit.util.ServiceTestBase;
+import org.hornetq.tests.util.ServiceTestBase;
import org.hornetq.utils.LinkedListIterator;
/**
Modified:
trunk/tests/stress-tests/src/test/java/org/hornetq/tests/stress/paging/PageStressTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/stress/paging/PageStressTest.java 2011-04-11 12:47:21 UTC (rev 10475)
+++ trunk/tests/stress-tests/src/test/java/org/hornetq/tests/stress/paging/PageStressTest.java 2011-04-12 10:12:15 UTC (rev 10480)
@@ -28,7 +28,7 @@
import org.hornetq.core.config.Configuration;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.settings.impl.AddressSettings;
-import org.hornetq.tests.unit.util.ServiceTestBase;
+import org.hornetq.tests.util.ServiceTestBase;
/**
* This is an integration-tests that will take some time to run.
Modified:
trunk/tests/stress-tests/src/test/java/org/hornetq/tests/stress/remote/PingStressTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/stress/remote/PingStressTest.java 2011-04-11 12:47:21 UTC (rev 10475)
+++ trunk/tests/stress-tests/src/test/java/org/hornetq/tests/stress/remote/PingStressTest.java 2011-04-12 10:12:15 UTC (rev 10480)
@@ -29,9 +29,8 @@
import org.hornetq.core.protocol.core.impl.PacketImpl;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.spi.core.protocol.RemotingConnection;
-import org.hornetq.tests.integration.remoting.PingTest;
import org.hornetq.tests.util.RandomUtil;
-import org.hornetq.tests.unit.util.ServiceTestBase;
+import org.hornetq.tests.util.ServiceTestBase;
/**
* @author <a href="mailto:clebert.suconic@jboss.com">Clebert Suconic</a>
@@ -42,7 +41,7 @@
{
// Constants -----------------------------------------------------
- private static final Logger log = Logger.getLogger(PingTest.class);
+ private static final Logger log = Logger.getLogger(PingStressTest.class);
private static final long PING_INTERVAL = 500;