[jboss-cvs] JBoss Messaging SVN: r6613 - in trunk: tests/src/org/jboss/messaging/tests/integration/client and 1 other directory.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue Apr 28 23:55:48 EDT 2009
Author: clebert.suconic at jboss.com
Date: 2009-04-28 23:55:48 -0400 (Tue, 28 Apr 2009)
New Revision: 6613
Modified:
trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
trunk/tests/src/org/jboss/messaging/tests/integration/client/ConsumerWindowSizeTest.java
Log:
Fixing ConsumerWindowSizeTest (LargeMessage & no-buffering (slow consumer))
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java 2009-04-29 01:16:13 UTC (rev 6612)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java 2009-04-29 03:55:48 UTC (rev 6613)
@@ -483,7 +483,12 @@
}
}
- public void flowControl(final int messageBytes, final boolean isLargeMessage) throws MessagingException
+ /**
+ * flow control is synchornized because of LargeMessage and streaming.
+ * LargeMessageBuffer will call flowcontrol here, while other handleMessage will also be calling flowControl.
+ * So, this operation needs to be atomic.
+ * */
+ public synchronized void flowControl(final int messageBytes, final boolean isLargeMessage) throws MessagingException
{
if (clientWindowSize >= 0)
{
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/client/ConsumerWindowSizeTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/client/ConsumerWindowSizeTest.java 2009-04-29 01:16:13 UTC (rev 6612)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/client/ConsumerWindowSizeTest.java 2009-04-29 03:55:48 UTC (rev 6613)
@@ -47,13 +47,11 @@
private final SimpleString queueA = new SimpleString("queueA");
private final int TIMEOUT = 5;
-
+
private static final Logger log = Logger.getLogger(ConsumerWindowSizeTest.class);
-
+
private static final boolean isTrace = log.isTraceEnabled();
-
-
/*
* tests send window size. we do this by having 2 receivers on the q. since we roundrobin the consumer for delivery we
* know if consumer 1 has received n messages then consumer 2 must have also have received n messages or at least up
@@ -306,7 +304,6 @@
internalTestSlowConsumerNoBuffer2(false);
}
-
public void testSlowConsumerNoBuffer2LargeMessages() throws Exception
{
internalTestSlowConsumerNoBuffer2(true);
@@ -391,17 +388,16 @@
}
session1.close(); // just to make sure everything is flushed and no pending packets on the sending buffer, or
- // the getMessageCount would fail
+ // the getMessageCount would fail
session2.close();
session1 = sf.createSession(false, true, true);
session1.start();
session2 = sf.createSession(false, true, true);
session2.start();
-
+
prod = session1.createProducer(ADDRESS);
-
assertEquals(0, getMessageCount(server, ADDRESS.toString()));
// This should also work the other way around
@@ -489,7 +485,7 @@
{
internalTestSlowConsumerOnMessageHandlerNoBuffers(false);
}
-
+
public void testSlowConsumerOnMessageHandlerNoBuffersLargeMessage() throws Exception
{
internalTestSlowConsumerOnMessageHandlerNoBuffers(true);
@@ -497,7 +493,7 @@
public void internalTestSlowConsumerOnMessageHandlerNoBuffers(boolean largeMessages) throws Exception
{
-
+
MessagingServer server = createServer(false);
ClientSession sessionB = null;
@@ -517,7 +513,6 @@
sf.setMinLargeMessageSize(100);
}
-
session = sf.createSession(false, true, true);
SimpleString ADDRESS = new SimpleString("some-queue");
@@ -752,14 +747,13 @@
consReceiveOneAndHold.setMessageHandler(handler);
assertTrue(latchReceived.await(TIMEOUT, TimeUnit.SECONDS));
-
-
+
long timeout = System.currentTimeMillis() + 1000 * TIMEOUT;
while (consReceiveOneAndHold.getBufferSize() == 0 && System.currentTimeMillis() < timeout)
{
Thread.sleep(10);
}
-
+
assertEquals(1, consReceiveOneAndHold.getBufferSize());
ClientConsumer cons1 = session.createConsumer(ADDRESS);
More information about the jboss-cvs-commits
mailing list