[jboss-cvs] JBoss Messaging SVN: r6680 - in trunk: tests/src/org/jboss/messaging/tests/integration/client and 1 other directory.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue May 5 21:05:37 EDT 2009
Author: clebert.suconic at jboss.com
Date: 2009-05-05 21:05:37 -0400 (Tue, 05 May 2009)
New Revision: 6680
Modified:
trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
trunk/tests/src/org/jboss/messaging/tests/integration/client/ConsumerWindowSizeTest.java
Log:
Fixing ConsumerWindowSizeTest with no credits on flow control
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java 2009-05-05 23:58:08 UTC (rev 6679)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java 2009-05-06 01:05:37 UTC (rev 6680)
@@ -486,6 +486,12 @@
// Public ---------------------------------------------------------------------------------------
+
+ /** Only use this on tests */
+ public AtomicInteger getAvailableCredits()
+ {
+ return availableCredits;
+ }
// Private --------------------------------------------------------------------------------------
private void promptDelivery()
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/client/ConsumerWindowSizeTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/client/ConsumerWindowSizeTest.java 2009-05-05 23:58:08 UTC (rev 6679)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/client/ConsumerWindowSizeTest.java 2009-05-06 01:05:37 UTC (rev 6680)
@@ -21,6 +21,7 @@
*/
package org.jboss.messaging.tests.integration.client;
+import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -32,7 +33,12 @@
import org.jboss.messaging.core.client.MessageHandler;
import org.jboss.messaging.core.client.impl.ClientConsumerInternal;
import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.postoffice.Binding;
+import org.jboss.messaging.core.postoffice.Bindings;
+import org.jboss.messaging.core.postoffice.QueueBinding;
+import org.jboss.messaging.core.server.Consumer;
import org.jboss.messaging.core.server.MessagingServer;
+import org.jboss.messaging.core.server.impl.ServerConsumerImpl;
import org.jboss.messaging.tests.util.ServiceTestBase;
import org.jboss.messaging.utils.SimpleString;
@@ -62,9 +68,9 @@
int encodeSize = message.getEncodeSize();
session.close();
cf.close();
- return encodeSize;
+ return encodeSize;
}
-
+
/*
* tests send window size. we do this by having 2 receivers on the q. since we roundrobin the consumer for delivery we
* know if consumer 1 has received n messages then consumer 2 must have also have received n messages or at least up
@@ -252,37 +258,36 @@
ClientConsumerInternal consNeverUsed = (ClientConsumerInternal)sessionB.createConsumer(ADDRESS);
ClientProducer prod = session.createProducer(ADDRESS);
-
- // This will force a credit to be sent, but if the message wasn't received we need to take out that credit from the server
+
+ // This will force a credit to be sent, but if the message wasn't received we need to take out that credit from
+ // the server
// or the client will be buffering messages
assertNull(consNeverUsed.receive(1));
-
+
ClientMessage msg = createTextMessage(session, "This one will expire");
if (largeMessages)
{
msg.getBody().writeBytes(new byte[600]);
}
-
+
msg.setExpiration(System.currentTimeMillis() + 100);
prod.send(msg);
-
+
msg = createTextMessage(session, "First-on-non-buffered");
prod.send(msg);
-
+
Thread.sleep(110);
-
- // It will be able to receive another message, but it shouldn't send a credit again, as the credit was already sent
+
+ // It will be able to receive another message, but it shouldn't send a credit again, as the credit was already
+ // sent
msg = consNeverUsed.receive(TIMEOUT * 1000);
assertNotNull(msg);
assertEquals("First-on-non-buffered", getTextMessage(msg));
msg.acknowledge();
-
ClientConsumer cons1 = session.createConsumer(ADDRESS);
-
-
for (int i = 0; i < numberOfMessages; i++)
{
msg = createTextMessage(session, "Msg" + i);
@@ -599,7 +604,7 @@
if (!latchDone.await(TIMEOUT, TimeUnit.SECONDS)) // a timed wait, so if the test fails, one less
// thread around
{
- new Exception("ClientConsuemrWindowSizeTest Handler couldn't receive signal in less than 5 seconds").printStackTrace();
+ new Exception("ClientConsuemrWindowSizeTest Handler couldn't receive signal in less than 5 seconds").printStackTrace();
failed = true;
}
@@ -861,6 +866,11 @@
testNoWindowRoundRobin(false);
}
+ public void testNoWindowRoundRobinLargeMessage() throws Exception
+ {
+ testNoWindowRoundRobin(true);
+ }
+
private void testNoWindowRoundRobin(final boolean largeMessages) throws Exception
{
@@ -878,11 +888,6 @@
ClientSessionFactory sf = createInVMFactory();
sf.setConsumerWindowSize(-1);
- if (largeMessages)
- {
- sf.setMinLargeMessageSize(100);
- }
-
sessionA = sf.createSession(false, true, true);
SimpleString ADDRESS = new SimpleString("some-queue");
@@ -898,6 +903,31 @@
ClientConsumerInternal consB = (ClientConsumerInternal)sessionB.createConsumer(ADDRESS);
+ {
+ // We can only guarantee round robing with WindowSize = -1, after the ServerConsumer object received SessionConsumerFlowCreditMessage(-1)
+ // Since that is done asynchronously we verify that the information was received before we proceed on sending messages or else the distribution won't be
+ // even as expected by the test
+ Bindings bindings = server.getPostOffice().getBindingsForAddress(ADDRESS);
+
+ assertEquals(1, bindings.getBindings().size());
+
+ for (Binding binding : bindings.getBindings())
+ {
+ Set<Consumer> consumers = ((QueueBinding)binding).getQueue().getConsumers();
+
+ for (Consumer consumer : consumers)
+ {
+ ServerConsumerImpl consumerImpl = (ServerConsumerImpl)consumer;
+ long timeout = System.currentTimeMillis() + 5000;
+ while (timeout > System.currentTimeMillis() && consumerImpl.getAvailableCredits() != null)
+ {
+ new Exception("Trace").printStackTrace();
+ Thread.sleep(10);
+ }
+ }
+ }
+ }
+
ClientProducer prod = sessionA.createProducer(ADDRESS);
for (int i = 0; i < numberOfMessages; i++)
More information about the jboss-cvs-commits
mailing list