[jboss-cvs] JBoss Messaging SVN: r4399 - in trunk/tests/jms-tests/src/org/jboss/test/messaging/jms: message and 2 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri Jun 6 06:08:35 EDT 2008
Author: jmesnil
Date: 2008-06-06 06:08:34 -0400 (Fri, 06 Jun 2008)
New Revision: 4399
Modified:
trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/AcknowledgementTest.java
trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/ConnectionConsumerTest.java
trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/DeliveryOrderTest.java
trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/JMSTest.java
trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java
trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/message/JMSExpirationHeaderTest.java
trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/selector/SelectorTest.java
trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/stress/SeveralClientsStressTest.java
Log:
JBMESSAGING-1050: Replace usage of oswego concurrent with java.util.concurrent
Modified: trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/AcknowledgementTest.java
===================================================================
--- trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/AcknowledgementTest.java 2008-06-06 08:54:26 UTC (rev 4398)
+++ trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/AcknowledgementTest.java 2008-06-06 10:08:34 UTC (rev 4399)
@@ -21,7 +21,10 @@
*/
package org.jboss.test.messaging.jms;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+
import java.util.ArrayList;
+import java.util.concurrent.CountDownLatch;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
@@ -37,10 +40,6 @@
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
-import org.jboss.messaging.jms.client.JBossConnectionFactory;
-
-import EDU.oswego.cs.dl.util.concurrent.Latch;
-
/**
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
* <p/>
@@ -1184,7 +1183,7 @@
private abstract class LatchListener implements MessageListener
{
- protected Latch latch = new Latch();
+ protected CountDownLatch latch = new CountDownLatch(1);
protected Session sess;
@@ -1199,8 +1198,7 @@
public void waitForMessages() throws InterruptedException
{
- assertTrue("failed to receive all messages", latch.attempt(
- 2000));
+ assertTrue("failed to receive all messages", latch.await(2000, MILLISECONDS));
}
public abstract void onMessage(Message m);
@@ -1234,7 +1232,7 @@
if (!"a".equals(tm.getText()))
{
failed = true;
- latch.release();
+ latch.countDown();
}
}
if (count == 2)
@@ -1244,7 +1242,7 @@
if (!"b".equals(tm.getText()))
{
failed = true;
- latch.release();
+ latch.countDown();
}
}
if (count == 3)
@@ -1254,7 +1252,7 @@
if (!"c".equals(tm.getText()))
{
failed = true;
- latch.release();
+ latch.countDown();
}
sess.recover();
}
@@ -1265,16 +1263,16 @@
if (!"c".equals(tm.getText()))
{
failed = true;
- latch.release();
+ latch.countDown();
}
- latch.release();
+ latch.countDown();
}
}
catch (Exception e)
{
failed = true;
- latch.release();
+ latch.countDown();
}
}
@@ -1307,7 +1305,7 @@
if (!"a".equals(tm.getText()))
{
failed = true;
- latch.release();
+ latch.countDown();
}
}
if (count == 2)
@@ -1317,7 +1315,7 @@
if (!"b".equals(tm.getText()))
{
failed = true;
- latch.release();
+ latch.countDown();
}
}
if (count == 3)
@@ -1327,7 +1325,7 @@
if (!"c".equals(tm.getText()))
{
failed = true;
- latch.release();
+ latch.countDown();
}
sess.recover();
}
@@ -1339,16 +1337,16 @@
if (!"c".equals(tm.getText()))
{
failed = true;
- latch.release();
+ latch.countDown();
}
- latch.release();
+ latch.countDown();
}
}
catch (Exception e)
{
failed = true;
- latch.release();
+ latch.countDown();
}
}
@@ -1379,7 +1377,7 @@
{
log.trace("Expected a but got " + tm.getText());
failed = true;
- latch.release();
+ latch.countDown();
}
}
if (count == 2)
@@ -1389,7 +1387,7 @@
{
log.trace("Expected b but got " + tm.getText());
failed = true;
- latch.release();
+ latch.countDown();
}
}
if (count == 3)
@@ -1399,7 +1397,7 @@
{
log.trace("Expected c but got " + tm.getText());
failed = true;
- latch.release();
+ latch.countDown();
}
sess.recover();
}
@@ -1410,7 +1408,7 @@
{
log.trace("Expected a but got " + tm.getText());
failed = true;
- latch.release();
+ latch.countDown();
}
tm.acknowledge();
assertRemainingMessages(2);
@@ -1423,7 +1421,7 @@
{
log.trace("Expected b but got " + tm.getText());
failed = true;
- latch.release();
+ latch.countDown();
}
sess.recover();
}
@@ -1434,7 +1432,7 @@
{
log.trace("Expected b but got " + tm.getText());
failed = true;
- latch.release();
+ latch.countDown();
}
}
if (count == 7)
@@ -1444,11 +1442,11 @@
{
log.trace("Expected c but got " + tm.getText());
failed = true;
- latch.release();
+ latch.countDown();
}
tm.acknowledge();
assertRemainingMessages(0);
- latch.release();
+ latch.countDown();
}
}
@@ -1456,7 +1454,7 @@
{
log.error("Caught exception", e);
failed = true;
- latch.release();
+ latch.countDown();
}
}
@@ -1485,7 +1483,7 @@
if (!"a".equals(tm.getText()))
{
failed = true;
- latch.release();
+ latch.countDown();
}
}
if (count == 2)
@@ -1494,7 +1492,7 @@
if (!"b".equals(tm.getText()))
{
failed = true;
- latch.release();
+ latch.countDown();
}
}
if (count == 3)
@@ -1503,7 +1501,7 @@
if (!"c".equals(tm.getText()))
{
failed = true;
- latch.release();
+ latch.countDown();
}
log.trace("Rollback");
sess.rollback();
@@ -1514,7 +1512,7 @@
if (!"a".equals(tm.getText()))
{
failed = true;
- latch.release();
+ latch.countDown();
}
}
if (count == 5)
@@ -1523,7 +1521,7 @@
if (!"b".equals(tm.getText()))
{
failed = true;
- latch.release();
+ latch.countDown();
}
log.trace("commit");
sess.commit();
@@ -1535,7 +1533,7 @@
if (!"c".equals(tm.getText()))
{
failed = true;
- latch.release();
+ latch.countDown();
}
log.trace("recover");
sess.rollback();
@@ -1546,19 +1544,19 @@
if (!"c".equals(tm.getText()))
{
failed = true;
- latch.release();
+ latch.countDown();
}
log.trace("Commit");
sess.commit();
assertRemainingMessages(0);
- latch.release();
+ latch.countDown();
}
}
catch (Exception e)
{
//log.error(e);
failed = true;
- latch.release();
+ latch.countDown();
}
}
Modified: trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/ConnectionConsumerTest.java
===================================================================
--- trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/ConnectionConsumerTest.java 2008-06-06 08:54:26 UTC (rev 4398)
+++ trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/ConnectionConsumerTest.java 2008-06-06 10:08:34 UTC (rev 4399)
@@ -21,6 +21,10 @@
*/
package org.jboss.test.messaging.jms;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+
+import java.util.concurrent.CountDownLatch;
+
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
@@ -34,9 +38,7 @@
import org.jboss.messaging.jms.client.JBossConnectionConsumer;
import org.jboss.test.messaging.tools.ServerManagement;
-import EDU.oswego.cs.dl.util.concurrent.Latch;
-
/**
* ConnectionConsumer tests
*
@@ -352,7 +354,7 @@
class SimpleMessageListener implements MessageListener
{
- Latch latch = new Latch();
+ CountDownLatch latch = new CountDownLatch(1);
boolean failed;
@@ -371,7 +373,7 @@
msgsReceived++;
if (msgsReceived == numExpectedMsgs)
{
- latch.release();
+ latch.countDown();
}
}
@@ -384,7 +386,7 @@
void waitForLatch(long timeout) throws Exception
{
- latch.attempt(timeout);
+ latch.await(timeout, MILLISECONDS);
//Thread.sleep(2000); //Enough time for postDeliver to complete
}
@@ -412,7 +414,7 @@
class RedelMessageListener implements MessageListener
{
- Latch latch = new Latch();
+ CountDownLatch latch = new CountDownLatch(1);
boolean failed;
@@ -427,7 +429,7 @@
void waitForLatch(long timeout) throws Exception
{
- latch.attempt(timeout);
+ latch.await(timeout, MILLISECONDS);
}
public synchronized void onMessage(Message message)
@@ -448,7 +450,7 @@
{
log.trace("Expected a but was " + tm.getText());
failed = true;
- latch.release();
+ latch.countDown();
}
}
if (count == 2)
@@ -459,7 +461,7 @@
{
log.trace("Expected b but was " + tm.getText());
failed = true;
- latch.release();
+ latch.countDown();
}
}
if (count == 3)
@@ -470,7 +472,7 @@
{
log.trace("Expected c but was " + tm.getText());
failed = true;
- latch.release();
+ latch.countDown();
}
else
{
@@ -489,13 +491,13 @@
{
log.trace("Expected a but was " + tm.getText());
failed = true;
- latch.release();
+ latch.countDown();
}
if (!tm.getJMSRedelivered())
{
failed = true;
- latch.release();
+ latch.countDown();
}
}
if (count == 5)
@@ -506,13 +508,13 @@
{
log.trace("Expected b but was " + tm.getText());
failed = true;
- latch.release();
+ latch.countDown();
}
if (!tm.getJMSRedelivered())
{
log.trace("Redelivered flag not set");
failed = true;
- latch.release();
+ latch.countDown();
}
}
if (count == 6)
@@ -523,13 +525,13 @@
{
log.trace("Expected c but was " + tm.getText());
failed = true;
- latch.release();
+ latch.countDown();
}
if (!tm.getJMSRedelivered())
{
log.trace("Redelivered flag not set");
failed = true;
- latch.release();
+ latch.countDown();
}
else
{
@@ -538,7 +540,7 @@
log.trace("Committing");
sess.commit();
}
- latch.release();
+ latch.countDown();
}
}
Modified: trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/DeliveryOrderTest.java
===================================================================
--- trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/DeliveryOrderTest.java 2008-06-06 08:54:26 UTC (rev 4398)
+++ trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/DeliveryOrderTest.java 2008-06-06 10:08:34 UTC (rev 4399)
@@ -21,6 +21,10 @@
*/
package org.jboss.test.messaging.jms;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+
+import java.util.concurrent.CountDownLatch;
+
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
@@ -30,8 +34,6 @@
import javax.jms.Session;
import javax.jms.TextMessage;
-import EDU.oswego.cs.dl.util.concurrent.Latch;
-
/**
*
* A DeliveryOrderTest
@@ -65,7 +67,7 @@
MessageConsumer cons = sess2.createConsumer(queue1);
- Latch latch = new Latch();
+ CountDownLatch latch = new CountDownLatch(1);
final int NUM_MESSAGES = 1000;
@@ -90,7 +92,7 @@
// need extra commit for cases in which the last message index is not a multiple of 10
sess.commit();
- latch.acquire();
+ latch.await(20000, MILLISECONDS);
if (listener.failed)
{
@@ -111,11 +113,11 @@
{
private int c;
private int num;
- private Latch latch;
+ private CountDownLatch latch;
private volatile boolean failed;
private String error;
- MyListener(Latch latch, int num)
+ MyListener(CountDownLatch latch, int num)
{
this.latch = latch;
this.num = num;
@@ -139,14 +141,14 @@
failed = true;
setError("Listener was supposed to get " + ("message" + c) +
" but got " + tm.getText());
- latch.release();
+ latch.countDown();
}
c++;
if (c == num)
{
- latch.release();
+ latch.countDown();
}
}
catch (JMSException e)
@@ -156,7 +158,7 @@
// Failed
failed = true;
setError("Listener got exception " + e.toString());
- latch.release();
+ latch.countDown();
}
}
Modified: trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/JMSTest.java
===================================================================
--- trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/JMSTest.java 2008-06-06 08:54:26 UTC (rev 4398)
+++ trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/JMSTest.java 2008-06-06 10:08:34 UTC (rev 4399)
@@ -21,6 +21,12 @@
*/
package org.jboss.test.messaging.jms;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Message;
@@ -29,10 +35,7 @@
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
-import javax.naming.InitialContext;
-import EDU.oswego.cs.dl.util.concurrent.Slot;
-
/**
* The most comprehensive, yet simple, unit test.
*
@@ -328,9 +331,10 @@
conn.start();
- final Slot slot = new Slot();
+ final AtomicReference<Message> message = new AtomicReference<Message>();
+ final CountDownLatch latch = new CountDownLatch(1);
- new Thread(new Runnable()
+ new Thread(new Runnable()
{
public void run()
{
@@ -339,7 +343,8 @@
Message m = cons.receive(5000);
if (m != null)
{
- slot.put(m);
+ message.set(m);
+ latch.countDown();
}
}
catch(Exception e)
@@ -357,7 +362,9 @@
prod.send(m);
- TextMessage rm = (TextMessage)slot.poll(5000);
+ boolean gotMessage = latch.await(5000, TimeUnit.MILLISECONDS);
+ assertTrue(gotMessage);
+ TextMessage rm = (TextMessage) message.get();
assertEquals("message one", rm.getText());
}
@@ -382,20 +389,15 @@
MessageConsumer cons = session.createConsumer(queue1);
- final Slot slot = new Slot();
-
+ final AtomicReference<Message> message = new AtomicReference<Message>();
+ final CountDownLatch latch = new CountDownLatch(1);
+
cons.setMessageListener(new MessageListener()
{
public void onMessage(Message m)
{
- try
- {
- slot.put(m);
- }
- catch(InterruptedException e)
- {
- log.warn("got InterruptedException", e);
- }
+ message.set(m);
+ latch.countDown();
}
});
@@ -406,7 +408,9 @@
TextMessage m = session.createTextMessage("one");
prod.send(m);
- TextMessage rm = (TextMessage)slot.poll(5000);
+ boolean gotMessage = latch.await(5000, MILLISECONDS);
+ assertTrue(gotMessage);
+ TextMessage rm = (TextMessage) message.get();
assertEquals("one", rm.getText());
}
Modified: trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java
===================================================================
--- trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java 2008-06-06 08:54:26 UTC (rev 4398)
+++ trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java 2008-06-06 10:08:34 UTC (rev 4399)
@@ -21,11 +21,14 @@
*/
package org.jboss.test.messaging.jms;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import javax.jms.BytesMessage;
import javax.jms.Connection;
@@ -49,8 +52,6 @@
import org.jboss.messaging.jms.JBossQueue;
import org.jboss.messaging.jms.JBossTopic;
-import EDU.oswego.cs.dl.util.concurrent.Latch;
-
/**
* @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodorov</a>
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
@@ -2093,7 +2094,7 @@
final MessageConsumer topicConsumer = consumerSession.createConsumer(topic1);
consumerConnection.start();
- final Latch latch = new Latch();
+ final CountDownLatch latch = new CountDownLatch(1);
Thread closerThread = new Thread(new Runnable()
{
public void run()
@@ -2110,7 +2111,7 @@
}
finally
{
- latch.release();
+ latch.countDown();
}
}
}, "closing thread");
@@ -2119,7 +2120,9 @@
assertNull(topicConsumer.receive(1500));
// wait for the closing thread to finish
- latch.acquire();
+ boolean closed = latch.await(5000, TimeUnit.MILLISECONDS);
+ assertTrue(closed);
+
}
finally
{
@@ -3946,7 +3949,7 @@
private class ExceptionRedelMessageListenerImpl implements MessageListener
{
- private Latch latch = new Latch();
+ private CountDownLatch latch = new CountDownLatch(1);
private int count;
@@ -3965,7 +3968,7 @@
public void waitForMessages() throws InterruptedException
{
- latch.acquire();
+ latch.await();
}
public ExceptionRedelMessageListenerImpl(Session sess)
@@ -3985,7 +3988,7 @@
if (!("a".equals(tm.getText())))
{
failed("Should be a but was " + tm.getText());
- latch.release();
+ latch.countDown();
}
throw new RuntimeException("Aardvark");
}
@@ -3997,12 +4000,12 @@
if (!("a".equals(tm.getText())))
{
failed("Should be a but was " + tm.getText());
- latch.release();
+ latch.countDown();
}
if (!tm.getJMSRedelivered())
{
failed("Message was supposed to be a redelivery");
- latch.release();
+ latch.countDown();
}
}
else
@@ -4011,7 +4014,7 @@
if (!("b".equals(tm.getText())))
{
failed("Should be b but was " + tm.getText());
- latch.release();
+ latch.countDown();
}
}
}
@@ -4022,7 +4025,7 @@
if (!("b".equals(tm.getText())))
{
failed("Should be b but was " + tm.getText());
- latch.release();
+ latch.countDown();
}
}
else
@@ -4030,9 +4033,9 @@
if (!("c".equals(tm.getText())))
{
failed("Should be c but was " + tm.getText());
- latch.release();
+ latch.countDown();
}
- latch.release();
+ latch.countDown();
}
}
@@ -4043,15 +4046,15 @@
if (!("c".equals(tm.getText())))
{
failed("Should be c but was " + tm.getText());
- latch.release();
+ latch.countDown();
}
- latch.release();
+ latch.countDown();
}
else
{
//Shouldn't get a 4th messge
failed("Shouldn't get a 4th message");
- latch.release();
+ latch.countDown();
}
}
}
@@ -4059,7 +4062,7 @@
{
log.error(e.getMessage(), e);
failed("Got a JMSException " + e.toString());
- latch.release();
+ latch.countDown();
}
}
}
@@ -4176,17 +4179,17 @@
private class MessageListenerImpl implements MessageListener
{
private List messages = Collections.synchronizedList(new ArrayList());
- private Latch latch = new Latch();
+ private CountDownLatch latch = new CountDownLatch(1);
/** Blocks the calling thread until at least a message is received */
public void waitForMessages() throws InterruptedException
{
- latch.acquire();
+ latch.await();
}
public void waitForMessages(long timeout) throws InterruptedException
{
- boolean acquired = latch.attempt(timeout);
+ boolean acquired = latch.await(timeout, MILLISECONDS);
if (!acquired)
{
log.trace("unsucessful latch aquire attemnpt");
@@ -4199,7 +4202,7 @@
messages.add(m);
log.trace("Added message " + m + " to my list");
- latch.release();
+ latch.countDown();
};
public Message getNextMessage()
Modified: trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/message/JMSExpirationHeaderTest.java
===================================================================
--- trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/message/JMSExpirationHeaderTest.java 2008-06-06 08:54:26 UTC (rev 4398)
+++ trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/message/JMSExpirationHeaderTest.java 2008-06-06 10:08:34 UTC (rev 4399)
@@ -21,13 +21,13 @@
*/
package org.jboss.test.messaging.jms.message;
+import java.util.concurrent.CountDownLatch;
+
import javax.jms.DeliveryMode;
import javax.jms.Message;
import org.jboss.messaging.jms.client.JBossMessage;
-import EDU.oswego.cs.dl.util.concurrent.Latch;
-
/**
* @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodorov</a>
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
@@ -121,7 +121,7 @@
Thread.sleep(2000);
// start the receiver thread
- final Latch latch = new Latch();
+ final CountDownLatch latch = new CountDownLatch(1);
Thread receiverThread = new Thread(new Runnable()
{
public void run()
@@ -136,13 +136,13 @@
}
finally
{
- latch.release();
+ latch.countDown();
}
}
}, "receiver thread");
receiverThread.start();
- latch.acquire();
+ latch.await();
assertNull(expectedMessage);
}
@@ -150,7 +150,7 @@
{
final long timeToWaitForReceive = 5000;
- final Latch receiverLatch = new Latch();
+ final CountDownLatch receiverLatch = new CountDownLatch(1);
// start the receiver thread
Thread receiverThread = new Thread(new Runnable()
@@ -169,13 +169,13 @@
}
finally
{
- receiverLatch.release();
+ receiverLatch.countDown();
}
}
}, "receiver thread");
receiverThread.start();
- final Latch senderLatch = new Latch();
+ final CountDownLatch senderLatch = new CountDownLatch(1);
// start the sender thread
Thread senderThread = new Thread(new Runnable()
@@ -207,15 +207,15 @@
}
finally
{
- senderLatch.release();
+ senderLatch.countDown();
}
}
}, "sender thread");
senderThread.start();
- senderLatch.acquire();
- receiverLatch.acquire();
+ senderLatch.await();
+ receiverLatch.await();
if (testFailed)
{
@@ -257,7 +257,7 @@
//When a consumer is closed while a receive() is in progress it will make the
//receive return with null
- final Latch latch = new Latch();
+ final CountDownLatch latch = new CountDownLatch(1);
// blocking read for a while to make sure I don't get anything, not even a null
Thread receiverThread = new Thread(new Runnable()
{
@@ -283,7 +283,7 @@
}
finally
{
- latch.release();
+ latch.countDown();
}
}
}, "receiver thread");
@@ -295,7 +295,7 @@
queueConsumer.close();
// wait for the reading thread to conclude
- latch.acquire();
+ latch.await();
log.trace("Expected message:" + expectedMessage);
Modified: trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/selector/SelectorTest.java
===================================================================
--- trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/selector/SelectorTest.java 2008-06-06 08:54:26 UTC (rev 4398)
+++ trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/selector/SelectorTest.java 2008-06-06 10:08:34 UTC (rev 4399)
@@ -24,6 +24,7 @@
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
+import java.util.concurrent.CountDownLatch;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
@@ -35,8 +36,6 @@
import org.jboss.test.messaging.JBMServerTestCase;
-import EDU.oswego.cs.dl.util.concurrent.Latch;
-
/**
* @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodorov</a>
* @version <tt>$Revision$</tt>
@@ -571,8 +570,8 @@
final List received = new ArrayList();
final List received2 = new ArrayList();
- final Latch latch = new Latch();
- final Latch latch2 = new Latch();
+ final CountDownLatch latch = new CountDownLatch(1);
+ final CountDownLatch latch2 = new CountDownLatch(1);
new Thread(new Runnable()
{
@@ -589,7 +588,7 @@
}
else
{
- latch.release();
+ latch.countDown();
return;
}
}
@@ -616,7 +615,7 @@
}
else
{
- latch2.release();
+ latch2.countDown();
return;
}
}
@@ -628,8 +627,8 @@
}
}, "consumer thread 2").start();
- latch.acquire();
- latch2.acquire();
+ latch.await();
+ latch2.await();
assertEquals(5, received.size());
for(Iterator i = received.iterator(); i.hasNext(); )
Modified: trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/stress/SeveralClientsStressTest.java
===================================================================
--- trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/stress/SeveralClientsStressTest.java 2008-06-06 08:54:26 UTC (rev 4398)
+++ trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/stress/SeveralClientsStressTest.java 2008-06-06 10:08:34 UTC (rev 4399)
@@ -25,6 +25,9 @@
import java.util.HashSet;
import java.util.Iterator;
import java.util.Random;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
@@ -40,9 +43,6 @@
import org.jboss.test.messaging.JBMServerTestCase;
import org.jboss.test.messaging.tools.ServerManagement;
-import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
-import EDU.oswego.cs.dl.util.concurrent.SynchronizedInt;
-
/**
* In order for this test to run, you will need to edit /etc/security/limits.conf and change your max sockets to something bigger than 1024
*
@@ -81,8 +81,8 @@
// a producer should have a long wait between each message sent?
protected static boolean LONG_WAIT_ON_PRODUCERS=false;
- protected static SynchronizedInt producedMessages = new SynchronizedInt(0);
- protected static SynchronizedInt readMessages = new SynchronizedInt(0);
+ protected static AtomicInteger producedMessages = new AtomicInteger(0);
+ protected static AtomicInteger readMessages = new AtomicInteger(0);
protected Context createContext() throws Exception
@@ -107,7 +107,7 @@
HashSet threads = new HashSet();
// A chhanel of communication between workers and the test method
- LinkedQueue testChannel = new LinkedQueue();
+ LinkedBlockingQueue testChannel = new LinkedBlockingQueue();
for (int i=0; i< NUMBER_OF_PRODUCERS; i++)
@@ -134,7 +134,7 @@
while (threads.size()>0)
{
- SeveralClientsStressTest.InternalMessage msg = (SeveralClientsStressTest.InternalMessage)testChannel.poll(2000);
+ SeveralClientsStressTest.InternalMessage msg = (SeveralClientsStressTest.InternalMessage)testChannel.poll(2000, TimeUnit.MILLISECONDS);
log.info("Produced:" + producedMessages.get() + " and Consumed:" + readMessages.get() + " messages");
@@ -205,7 +205,7 @@
while (consumer.receive(1000)!=null)
{
- readMessages.increment();
+ readMessages.incrementAndGet();
log.info("Received JMS message on clearMessages");
}
@@ -229,8 +229,8 @@
}
clearMessages();
- producedMessages = new SynchronizedInt(0);
- readMessages = new SynchronizedInt(0);
+ producedMessages = new AtomicInteger(0);
+ readMessages = new AtomicInteger(0);
}
// Private --------------------------------------------------------------------------------------
@@ -247,7 +247,7 @@
private int workerId;
private Exception ex;
- LinkedQueue messageQueue;
+ LinkedBlockingQueue messageQueue;
public int getWorkerId()
@@ -292,7 +292,7 @@
}
- public Worker(String name, int workerId, LinkedQueue messageQueue)
+ public Worker(String name, int workerId, LinkedBlockingQueue messageQueue)
{
super(name);
this.workerId = workerId;
@@ -308,7 +308,7 @@
class Producer extends SeveralClientsStressTest.Worker
{
- public Producer(int producerId, LinkedQueue messageQueue)
+ public Producer(int producerId, LinkedBlockingQueue messageQueue)
{
super("Producer:" + producerId, producerId, messageQueue);
}
@@ -349,7 +349,7 @@
while(System.currentTimeMillis() < timeToFinish)
{
prod.send(sess.createTextMessage("Message sent at " + System.currentTimeMillis()));
- producedMessages.increment();
+ producedMessages.incrementAndGet();
messageSent++;
if (messageSent%50==0)
{
@@ -385,7 +385,7 @@
class Consumer extends SeveralClientsStressTest.Worker
{
- public Consumer(int consumerId, LinkedQueue messageQueue)
+ public Consumer(int consumerId, LinkedBlockingQueue messageQueue)
{
super("ClientConsumer:" + consumerId, consumerId, messageQueue);
}
@@ -429,7 +429,7 @@
{
if (info) log.info("Commit transaction");
sess.commit();
- readMessages.add(msgs);
+ readMessages.addAndGet(msgs);
}
else
{
@@ -445,7 +445,7 @@
}
}
- readMessages.add(msgs);
+ readMessages.addAndGet(msgs);
sess.commit();
sendInternalMessage(new SeveralClientsStressTest.WorkedFinishedMessages(this));
More information about the jboss-cvs-commits
mailing list