Author: clebert.suconic(a)jboss.com
Date: 2010-09-02 23:15:25 -0400 (Thu, 02 Sep 2010)
New Revision: 9636
Modified:
branches/Branch_2_1/examples/soak/tx-restarts/src/org/hornetq/jms/example/Receiver.java
branches/Branch_2_1/examples/soak/tx-restarts/src/org/hornetq/jms/example/Sender.java
branches/Branch_2_1/examples/soak/tx-restarts/src/org/hornetq/jms/example/TXRestartSoak.java
Log:
tweak on soak test
Modified:
branches/Branch_2_1/examples/soak/tx-restarts/src/org/hornetq/jms/example/Receiver.java
===================================================================
--- branches/Branch_2_1/examples/soak/tx-restarts/src/org/hornetq/jms/example/Receiver.java 2010-09-03 02:07:52 UTC (rev 9635)
+++ branches/Branch_2_1/examples/soak/tx-restarts/src/org/hornetq/jms/example/Receiver.java 2010-09-03 03:15:25 UTC (rev 9636)
@@ -13,6 +13,9 @@
package org.hornetq.jms.example;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
@@ -33,6 +36,8 @@
private Queue queue;
+ private final Semaphore sem = new Semaphore(0);
+
private final String queueJNDI;
protected volatile long msgs = 0;
@@ -66,6 +71,10 @@
for (int i = 0 ; i < 1000; i++)
{
+ if (!sem.tryAcquire(1, 5, TimeUnit.SECONDS))
+ {
+ break;
+ }
Message msg = cons.receive(5000);
if (msg == null)
{
@@ -133,6 +142,14 @@
return "Receiver::" + this.queueJNDI + ", msgs=" + msgs +
", pending=" + pendingMsgs;
}
+ /**
+ * @param pendingMsgs2
+ */
+ public void messageProduced(int pendingMsgs2)
+ {
+ sem.release(pendingMsgs2);
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified:
branches/Branch_2_1/examples/soak/tx-restarts/src/org/hornetq/jms/example/Sender.java
===================================================================
--- branches/Branch_2_1/examples/soak/tx-restarts/src/org/hornetq/jms/example/Sender.java 2010-09-03 02:07:52 UTC (rev 9635)
+++ branches/Branch_2_1/examples/soak/tx-restarts/src/org/hornetq/jms/example/Sender.java 2010-09-03 03:15:25 UTC (rev 9636)
@@ -37,17 +37,20 @@
protected Queue queue;
protected long msgs = TXRestartSoak.MIN_MESSAGES_ON_QUEUE;
- protected long pendingMsgs = 0;
+ protected int pendingMsgs = 0;
+ protected final Receiver[] receivers;
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
// Public --------------------------------------------------------
- public Sender()
+ public Sender(final Receiver[] receivers)
{
+ this.receivers = receivers;
}
@Override
@@ -90,6 +93,11 @@
protected void onCommit()
{
this.msgs += pendingMsgs;
+ for (Receiver rec : receivers)
+ {
+ rec.messageProduced(pendingMsgs);
+ }
+
pendingMsgs = 0;
System.out.println("commit on sender msgs = " + msgs );
}
Modified:
branches/Branch_2_1/examples/soak/tx-restarts/src/org/hornetq/jms/example/TXRestartSoak.java
===================================================================
--- branches/Branch_2_1/examples/soak/tx-restarts/src/org/hornetq/jms/example/TXRestartSoak.java 2010-09-03 02:07:52 UTC (rev 9635)
+++ branches/Branch_2_1/examples/soak/tx-restarts/src/org/hornetq/jms/example/TXRestartSoak.java 2010-09-03 03:15:25 UTC (rev 9636)
@@ -104,13 +104,13 @@
session.commit();
- Sender send = new Sender();
+ Receiver rec1 = new Receiver("/queue/diverted1");
+ Receiver rec2 = new Receiver("/queue/diverted2");
+ Sender send = new Sender(new Receiver[]{rec1, rec2});
+
send.start();
-
- Receiver rec1 = new Receiver("/queue/diverted1");
rec1.start();
- Receiver rec2 = new Receiver("/queue/diverted2");
rec2.start();