Author: clebert.suconic(a)jboss.com
Date: 2010-09-03 17:24:48 -0400 (Fri, 03 Sep 2010)
New Revision: 9643
Modified:
branches/Branch_2_1/.classpath
branches/Branch_2_1/examples/soak/tx-restarts/README
branches/Branch_2_1/examples/soak/tx-restarts/server0/hornetq-configuration.xml
branches/Branch_2_1/examples/soak/tx-restarts/src/org/hornetq/jms/example/Receiver.java
branches/Branch_2_1/examples/soak/tx-restarts/src/org/hornetq/jms/example/TXRestartSoak.java
branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalFilesRepository.java
branches/Branch_2_1/src/main/org/hornetq/utils/ReusableLatch.java
branches/Branch_2_1/tests/src/org/hornetq/tests/unit/util/ReusableLatchTest.java
Log:
Tweaks on soak-test
Modified: branches/Branch_2_1/.classpath
===================================================================
--- branches/Branch_2_1/.classpath 2010-09-03 10:20:17 UTC (rev 9642)
+++ branches/Branch_2_1/.classpath 2010-09-03 21:24:48 UTC (rev 9643)
@@ -1,6 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<classpath>
<classpathentry excluding="**/.svn/**/*" kind="src"
path="src/main"/>
+ <classpathentry kind="src"
path="examples/soak/tx-restarts/src"/>
<classpathentry kind="src" path="src/config/common"/>
<classpathentry kind="src" path="build/src"/>
<classpathentry kind="src" path="tests/jms-tests/config"/>
@@ -17,7 +18,7 @@
<classpathentry kind="src"
path="examples/core/microcontainer/src"/>
<classpathentry kind="src"
path="examples/core/embedded-remote/src"/>
<classpathentry kind="src" path="examples/core/perf/src"/>
- <classpathentry kind="src"
path="examples/core/twitter-connector/src"/>
+ <classpathentry kind="src"
path="examples/core/twitter-connector/src"/>
<classpathentry kind="src" path="examples/jms/applet/src"/>
<classpathentry kind="src"
path="examples/jms/application-layer-failover/src"/>
<classpathentry kind="src" path="examples/jms/bridge/src"/>
Modified: branches/Branch_2_1/examples/soak/tx-restarts/README
===================================================================
--- branches/Branch_2_1/examples/soak/tx-restarts/README 2010-09-03 10:20:17 UTC (rev
9642)
+++ branches/Branch_2_1/examples/soak/tx-restarts/README 2010-09-03 21:24:48 UTC (rev
9643)
@@ -17,4 +17,13 @@
You can start the server directly if you want, you can just start the server as:
+./run.sh PATH_TO_HORNETQ/examples/soak/tx-restarts/server0
+
+Then you can run the test as:
+
+./build.sh runRemote
+
+
+And you can now kill and restart the server manually as many times as you want.
+
Modified: branches/Branch_2_1/examples/soak/tx-restarts/server0/hornetq-configuration.xml
===================================================================
---
branches/Branch_2_1/examples/soak/tx-restarts/server0/hornetq-configuration.xml 2010-09-03
10:20:17 UTC (rev 9642)
+++
branches/Branch_2_1/examples/soak/tx-restarts/server0/hornetq-configuration.xml 2010-09-03
21:24:48 UTC (rev 9643)
@@ -2,6 +2,9 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:hornetq
/schema/hornetq-configuration.xsd">
+
+ <journal-file-size>102400</journal-file-size>
+
<!-- Connectors -->
<connectors>
<connector name="netty-connector">
Modified:
branches/Branch_2_1/examples/soak/tx-restarts/src/org/hornetq/jms/example/Receiver.java
===================================================================
---
branches/Branch_2_1/examples/soak/tx-restarts/src/org/hornetq/jms/example/Receiver.java 2010-09-03
10:20:17 UTC (rev 9642)
+++
branches/Branch_2_1/examples/soak/tx-restarts/src/org/hornetq/jms/example/Receiver.java 2010-09-03
21:24:48 UTC (rev 9643)
@@ -15,11 +15,14 @@
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
+import org.hornetq.utils.ReusableLatch;
+
/**
* A Receiver
*
@@ -36,16 +39,24 @@
private Queue queue;
- private final Semaphore sem = new Semaphore(0);
+ // Leave some messages in paging: this test should not consume all of them
+ private final Semaphore minConsume = new Semaphore(0);
- private final Semaphore max = new Semaphore(10000);
+ private final ReusableLatch latchMax = new ReusableLatch(0);
+ private static final int MAX_DIFF = 10000;
+
+ // The difference between the number of messages produced and the number consumed
+ private final AtomicInteger currentDiff = new AtomicInteger(0);
+
private final String queueJNDI;
- protected volatile long msgs = 0;
+ protected long msgs = 0;
- protected volatile long pendingMsgs = 0;
+ protected int pendingMsgs = 0;
+ protected int pendingSemaphores = 0;
+
protected MessageConsumer cons;
@@ -73,11 +84,6 @@
for (int i = 0 ; i < 1000; i++)
{
- if (!sem.tryAcquire(1, 5, TimeUnit.SECONDS))
- {
- break;
- }
- max.release();
Message msg = cons.receive(5000);
if (msg == null)
{
@@ -91,6 +97,10 @@
}
pendingMsgs++;
+ if (!minConsume.tryAcquire(1, 5, TimeUnit.SECONDS))
+ {
+ break;
+ }
}
@@ -126,8 +136,10 @@
protected void onCommit()
{
msgs += pendingMsgs;
+ this.currentDiff.addAndGet(-pendingMsgs);
+ latchMax.countDown(pendingMsgs);
pendingMsgs = 0;
- System.out.println("Commit on consumer " + queueJNDI + ",
msgs=" + msgs);
+ System.out.println("Commit on consumer " + queueJNDI + ",
msgs=" + msgs + " currentDiff = " + currentDiff);
}
/* (non-Javadoc)
@@ -137,6 +149,7 @@
protected void onRollback()
{
System.out.println("Rollback on consumer " + queueJNDI + ",
msgs=" + msgs);
+ minConsume.release(pendingMsgs);
pendingMsgs = 0;
}
@@ -148,17 +161,24 @@
/**
* @param pendingMsgs2
*/
- public void messageProduced(int pendingMsgs2)
+ public void messageProduced(int producedMessages)
{
- sem.release(pendingMsgs2);
- try
+ minConsume.release(producedMessages);
+ currentDiff.addAndGet(producedMessages);
+ System.out.println("Msg produced on " + this.queueJNDI + ",
currentDiff = " + currentDiff);
+ if (currentDiff.get() > MAX_DIFF)
{
- max.tryAcquire(pendingMsgs2, 5, TimeUnit.SECONDS);
+ System.out.println("Holding producer for 5 seconds");
+ latchMax.setCount(currentDiff.get() - MAX_DIFF);
+ try
+ {
+ latchMax.await(5, TimeUnit.SECONDS);
+ }
+ catch (InterruptedException e)
+ {
+ e.printStackTrace();
+ }
}
- catch (InterruptedException e)
- {
- e.printStackTrace();
- }
}
// Package protected ---------------------------------------------
Modified:
branches/Branch_2_1/examples/soak/tx-restarts/src/org/hornetq/jms/example/TXRestartSoak.java
===================================================================
---
branches/Branch_2_1/examples/soak/tx-restarts/src/org/hornetq/jms/example/TXRestartSoak.java 2010-09-03
10:20:17 UTC (rev 9642)
+++
branches/Branch_2_1/examples/soak/tx-restarts/src/org/hornetq/jms/example/TXRestartSoak.java 2010-09-03
21:24:48 UTC (rev 9643)
@@ -113,10 +113,10 @@
rec1.start();
rec2.start();
+ long timeEnd = System.currentTimeMillis() + TimeUnit.HOURS.toMillis(1);
if (runServer)
{
- long timeEnd = System.currentTimeMillis() + TimeUnit.HOURS.toMillis(1);
while (timeEnd > System.currentTimeMillis())
{
System.out.println("Letting the service run for 20 seconds");
@@ -148,10 +148,14 @@
}
else
{
- long timeEnd = System.currentTimeMillis() + TimeUnit.HOURS.toMillis(1);
while (timeEnd > System.currentTimeMillis())
{
-
+ if (send.getErrorsCount() != 0 || rec1.getErrorsCount() != 0 ||
rec2.getErrorsCount() != 0)
+ {
+ System.out.println("There are sequence errors in some of the
clients, please look at the logs");
+ break;
+ }
+ Thread.sleep(10000);
}
}
Modified:
branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalFilesRepository.java
===================================================================
---
branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalFilesRepository.java 2010-09-03
10:20:17 UTC (rev 9642)
+++
branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalFilesRepository.java 2010-09-03
21:24:48 UTC (rev 9643)
@@ -268,8 +268,7 @@
{
if (file.getFile().size() != fileSize)
{
- JournalFilesRepository.log.warn("Deleting " + file + ".. as it
doesn't have the configured size",
- new Exception("trace"));
+ JournalFilesRepository.log.warn("Deleting " + file + ".. as it
doesn't have the configured size");
file.getFile().delete();
}
else
Modified: branches/Branch_2_1/src/main/org/hornetq/utils/ReusableLatch.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/utils/ReusableLatch.java 2010-09-03 10:20:17
UTC (rev 9642)
+++ branches/Branch_2_1/src/main/org/hornetq/utils/ReusableLatch.java 2010-09-03 21:24:48
UTC (rev 9643)
@@ -87,6 +87,11 @@
}
int newState = actualState - numberOfReleases;
+
+ if (newState < 0)
+ {
+ newState = 0;
+ }
if (compareAndSetState(actualState, newState))
{
@@ -128,6 +133,12 @@
control.releaseShared(1);
}
+
+ public void countDown(final int count)
+ {
+ control.releaseShared(count);
+ }
+
public void await() throws InterruptedException
{
control.acquireSharedInterruptibly(1);
Modified:
branches/Branch_2_1/tests/src/org/hornetq/tests/unit/util/ReusableLatchTest.java
===================================================================
---
branches/Branch_2_1/tests/src/org/hornetq/tests/unit/util/ReusableLatchTest.java 2010-09-03
10:20:17 UTC (rev 9642)
+++
branches/Branch_2_1/tests/src/org/hornetq/tests/unit/util/ReusableLatchTest.java 2010-09-03
21:24:48 UTC (rev 9643)
@@ -30,6 +30,19 @@
{
private static final Logger log = Logger.getLogger(ReusableLatchTest.class);
+
+ public void testLatchWithParameterizedDown() throws Exception
+ {
+ ReusableLatch latch = new ReusableLatch(1000);
+
+ latch.countDown(5000);
+
+ assertTrue(latch.await(1000));
+
+
+ assertEquals(0, latch.getCount());
+ }
+
public void testLatchOnSingleThread() throws Exception
{
ReusableLatch latch = new ReusableLatch();