Author: clebert.suconic(a)jboss.com
Date: 2010-05-11 19:40:00 -0400 (Tue, 11 May 2010)
New Revision: 9228
Modified:
trunk/tests/src/org/hornetq/tests/integration/client/ConsumerWindowSizeTest.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-385 - Adding Bill Burke's test (in disabled
form ATM)
Modified:
trunk/tests/src/org/hornetq/tests/integration/client/ConsumerWindowSizeTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/ConsumerWindowSizeTest.java	2010-05-11 19:40:06 UTC (rev 9227)
+++ trunk/tests/src/org/hornetq/tests/integration/client/ConsumerWindowSizeTest.java	2010-05-11 23:40:00 UTC (rev 9228)
@@ -12,12 +12,14 @@
*/
package org.hornetq.tests.integration.client;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import junit.framework.Assert;
+import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.client.ClientConsumer;
import org.hornetq.api.core.client.ClientMessage;
@@ -69,6 +71,126 @@
return encodeSize;
}
+
+   // https://jira.jboss.org/jira/browse/HORNETQ-385 - five consumers (window size 0) each poll the empty queue with receiveImmediate(); one message is then sent and the third consumer must receive it. The "disabled_" prefix keeps the JUnit 3 runner from picking the method up.
+   public void disabled_testNoCacheWithReceiveImmediate() throws Exception
+   {
+      HornetQServer server = createServer(false, isNetty());
+      try
+      {
+         server.start();
+
+         ClientSessionFactory sf = createFactory(isNetty());
+         // Window size 0 is the configuration under test ("NoCache"); presumably it turns off client-side buffering - TODO confirm.
+         sf.setConsumerWindowSize(0);
+         // A short-lived session is used only to create the queue.
+         ClientSession session = sf.createSession(false, false, false);
+         session.createQueue("testWindow", "testWindow", true);
+         session.close();
+
+         int numConsumers = 5;
+         // Every consumer gets its own started session and polls once before anything has been sent.
+         ArrayList<ClientConsumer> consumers = new ArrayList<ClientConsumer>();
+         ArrayList<ClientSession> sessions = new ArrayList<ClientSession>();
+         for (int i = 0; i < numConsumers; i++)
+         {
+            System.out.println("created: " + i);
+            ClientSession session1 = sf.createSession();
+            ClientConsumer consumer = session1.createConsumer("testWindow");
+            consumers.add(consumer);
+            session1.start();
+            sessions.add(session1);
+            consumer.receiveImmediate(); // return value deliberately ignored; no message has been sent yet
+
+         }
+         // Send and commit a single message only after all consumers have polled.
+         ClientSession senderSession = sf.createSession(false, false);
+
+         ClientProducer producer = senderSession.createProducer("testWindow");
+
+         ClientMessage sent = senderSession.createMessage(true);
+         sent.putStringProperty("hello", "world");
+
+         producer.send(sent);
+
+         senderSession.commit();
+
+         senderSession.start(); // NOTE(review): this session has no consumer; start() looks unnecessary - confirm
+         // The assertion of the test: the third consumer (index 2) must obtain the message via receive(1000).
+         ClientConsumer consumer = consumers.get(2);
+         ClientMessage received = consumer.receive(1000);
+         assertNotNull(received);
+
+         for (ClientSession tmpSess : sessions)
+         {
+            tmpSess.close();
+         }
+
+         senderSession.close();
+
+      }
+      finally
+      {
+         server.stop(); // runs even when the assertion above fails
+      }
+
+   }
+
+   // https://jira.jboss.org/jira/browse/HORNETQ-385 - with window size 0, a message that is already committed on the queue must be returned by receiveImmediate(), and a second message by receive(1000). The "disabled_" prefix keeps the JUnit 3 runner from picking the method up.
+   public void disabled_testNoCacheWithReceiveImmediate2() throws Exception
+   {
+      HornetQServer server = createServer(true);
+
+      try
+      {
+         server.start();
+
+         ClientSessionFactory sf = createFactory(false);
+         sf.setConsumerWindowSize(0); // configuration under test; presumably turns off client-side buffering - TODO confirm
+         // A short-lived session is used only to create the queue.
+         ClientSession session = sf.createSession(false, false, false);
+         session.createQueue("testReceive", "testReceive", true);
+         session.close();
+
+         ClientSession sessionProd = sf.createSession(false, false);
+         ClientMessage msg = sessionProd.createMessage(true);
+         msg.putStringProperty("hello", "world");
+         ClientProducer prod = sessionProd.createProducer("testReceive");
+         // First message is sent and committed before the consumer exists.
+         prod.send(msg);
+         sessionProd.commit();
+
+         ClientSession session1 = sf.createSession();
+         ClientConsumer consumer = session1.createConsumer("testReceive");
+         session1.start();
+         // Wait before polling so the committed message has had a full second to become available.
+         Thread.sleep(1000);
+         ClientMessage message = null;
+         message = consumer.receiveImmediate();
+         //message = consumer.receive(1000); // the test will pass if used receive(1000) instead of receiveImmediate
+         assertNotNull(message);
+         System.out.println(message.getStringProperty("hello"));
+         message.acknowledge();
+         // Second round: the same message object is sent again and fetched with the blocking receive.
+         prod.send(msg);
+         sessionProd.commit();
+
+         message = consumer.receive(1000);
+         assertNotNull(message);
+         System.out.println(message.getStringProperty("hello"));
+         message.acknowledge();
+
+         session.close(); // NOTE(review): this session was already closed right after createQueue; the call looks redundant
+         session1.close();
+         sessionProd.close();
+      }
+      finally
+      {
+         server.stop(); // runs even when an assertion above fails
+      }
+   }
+
+
/*
* tests send window size. we do this by having 2 receivers on the q. since we
roundrobin the consumer for delivery we
* know if consumer 1 has received n messages then consumer 2 must have also have
received n messages or at least up