[jboss-cvs] JBoss Messaging SVN: r6205 - in trunk: src/main/org/jboss/messaging/core/server/impl and 1 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri Mar 27 18:39:37 EDT 2009
Author: clebert.suconic at jboss.com
Date: 2009-03-27 18:39:36 -0400 (Fri, 27 Mar 2009)
New Revision: 6205
Modified:
trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientConsumerWindowSizeTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientSessionStopStartTest.java
Log:
Adding SlowConsumerTests (both windowSize=1 and windowSize=0) and fixing slowConsumers for largeMessages and regularMessages
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java 2009-03-27 18:57:23 UTC (rev 6204)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java 2009-03-27 22:39:36 UTC (rev 6205)
@@ -135,6 +135,11 @@
"Cannot call receive(...) - a MessageHandler is set");
}
+ if (clientWindowSize == 0 && buffer.isEmpty())
+ {
+ sendCredits(1);
+ }
+
receiverThread = Thread.currentThread();
if (timeout == 0)
@@ -152,7 +157,7 @@
while (true)
{
ClientMessageInternal m = null;
-
+
synchronized (this)
{
while ((stopped || (m = buffer.poll()) == null) &&
@@ -248,6 +253,11 @@
}
boolean noPreviousHandler = handler == null;
+
+ if (handler != theHandler && clientWindowSize == 0)
+ {
+ sendCredits(1);
+ }
handler = theHandler;
@@ -492,16 +502,16 @@
sessionExecutor.execute(runner);
}
- private void flowControl(final int messageBytes, final boolean useExecutor) throws MessagingException
+ private void flowControl(final int messageBytes, final boolean isLargeMessage) throws MessagingException
{
- if (clientWindowSize > 0)
+ if (clientWindowSize >= 0)
{
creditsToSend += messageBytes;
if (creditsToSend >= clientWindowSize)
{
- if (useExecutor)
+ if (isLargeMessage)
{
// Flowcontrol on largeMessages continuations needs to be done in a separate thread or failover would block
final int credits = creditsToSend;
@@ -509,23 +519,39 @@
creditsToSend = 0;
sessionExecutor.execute(new Runnable()
{
-
public void run()
{
- channel.send(new SessionConsumerFlowCreditMessage(id, credits));
+ sendCredits(credits);
}
+
});
}
else
{
- channel.send(new SessionConsumerFlowCreditMessage(id, creditsToSend));
+ if (clientWindowSize == 0)
+ {
+ // sending the credits - 1 initially send to fire the slow consumer, or the slow consumer would be aways buffering one after received the first message
+ sendCredits(creditsToSend - 1);
+ }
+ else
+ {
+ sendCredits(creditsToSend);
+ }
creditsToSend = 0;
}
}
}
}
+ /**
+ * @param credits
+ */
+ private void sendCredits(final int credits)
+ {
+ channel.send(new SessionConsumerFlowCreditMessage(id, credits));
+ }
+
private void waitForOnMessageToComplete()
{
if (handler == null)
@@ -598,6 +624,11 @@
{
session.expire(id, message.getMessageID());
}
+
+ if (clientWindowSize == 0)
+ {
+ sendCredits(1);
+ }
}
}
}
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java 2009-03-27 18:57:23 UTC (rev 6204)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java 2009-03-27 22:39:36 UTC (rev 6205)
@@ -1218,9 +1218,14 @@
// caution for very fast consumers
clientWindowSize = -1;
}
+ else if (windowSize == 0)
+ {
+ // Slow consumer - no buffering
+ clientWindowSize = 0;
+ }
else if (windowSize == 1)
{
- // Slow consumer - no buffering
+ // Slow consumer = buffer 1
clientWindowSize = 1;
}
else if (windowSize > 1)
@@ -1249,7 +1254,10 @@
// We even send it if windowSize == -1, since we need to start the
// consumer
- channel.send(new SessionConsumerFlowCreditMessage(consumerID, windowSize));
+ if (windowSize != 0)
+ {
+ channel.send(new SessionConsumerFlowCreditMessage(consumerID, windowSize));
+ }
return consumer;
}
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java 2009-03-27 18:57:23 UTC (rev 6204)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java 2009-03-27 22:39:36 UTC (rev 6205)
@@ -801,7 +801,7 @@
while (positionPendingLargeMessage < sizePendingLargeMessage)
{
- if (precalculateAvailableCredits <= 0)
+ if (precalculateAvailableCredits <= 0 && availableCredits != null)
{
if (trace)
{
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientConsumerWindowSizeTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientConsumerWindowSizeTest.java 2009-03-27 18:57:23 UTC (rev 6204)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientConsumerWindowSizeTest.java 2009-03-27 22:39:36 UTC (rev 6205)
@@ -21,29 +21,32 @@
*/
package org.jboss.messaging.tests.integration.client;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
import org.jboss.messaging.core.client.ClientConsumer;
import org.jboss.messaging.core.client.ClientMessage;
import org.jboss.messaging.core.client.ClientProducer;
import org.jboss.messaging.core.client.ClientSession;
import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.MessageHandler;
+import org.jboss.messaging.core.client.impl.ClientConsumerInternal;
import org.jboss.messaging.core.server.MessagingService;
-import org.jboss.messaging.core.server.Queue;
import org.jboss.messaging.tests.util.ServiceTestBase;
import org.jboss.messaging.utils.SimpleString;
/**
* @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
*/
public class ClientConsumerWindowSizeTest extends ServiceTestBase
{
- public final SimpleString addressA = new SimpleString("addressA");
+ private final SimpleString addressA = new SimpleString("addressA");
- public final SimpleString queueA = new SimpleString("queueA");
+ private final SimpleString queueA = new SimpleString("queueA");
- public final SimpleString queueB = new SimpleString("queueB");
+ private final int TIMEOUT = 5;
- public final SimpleString queueC = new SimpleString("queueC");
-
/*
* tests send window size. we do this by having 2 receivers on the q. since we roundrobin the consumer for delivery we
* know if consumer 1 has received n messages then consumer 2 must have also have received n messages or at least up
@@ -56,7 +59,7 @@
try
{
messagingService.start();
- cf.setBlockOnNonPersistentSend(true);
+ cf.setBlockOnNonPersistentSend(false);
ClientSession sendSession = cf.createSession(false, true, true);
ClientSession receiveSession = cf.createSession(false, true, true);
sendSession.createQueue(addressA, queueA, false);
@@ -83,11 +86,21 @@
m.acknowledge();
}
receiveSession.close();
- Queue q = (Queue)messagingService.getServer().getPostOffice().getBinding(queueA).getBindable();
- assertEquals(numMessage, q.getDeliveringCount());
+
+ for (int i = 0; i < numMessage * 2; i++)
+ {
+ ClientMessage m = cc.receive(5000);
+ assertNotNull(m);
+ m.acknowledge();
+ }
+
session.close();
sendSession.close();
+
+
+ assertEquals(0, getMessageCount(messagingService, queueA.toString()));
+
}
finally
{
@@ -98,11 +111,11 @@
}
}
- public void testSlowConsumer() throws Exception
+ public void testSlowConsumerBufferingOne() throws Exception
{
MessagingService service = createService(false);
- ClientSession sessionNotUsed = null;
+ ClientSession sessionB = null;
ClientSession session = null;
try
@@ -116,16 +129,16 @@
session = sf.createSession(false, true, true);
- SimpleString ADDRESS = new SimpleString("some-queue");
+ SimpleString ADDRESS = addressA;
session.createQueue(ADDRESS, ADDRESS, true);
- sessionNotUsed = sf.createSession(false, true, true);
- sessionNotUsed.start();
+ sessionB = sf.createSession(false, true, true);
+ sessionB.start();
session.start();
- ClientConsumer consNeverUsed = sessionNotUsed.createConsumer(ADDRESS);
+ ClientConsumer consNeverUsed = sessionB.createConsumer(ADDRESS);
ClientConsumer cons1 = session.createConsumer(ADDRESS);
@@ -146,13 +159,13 @@
ClientMessage msg = consNeverUsed.receive(500);
assertNotNull(msg);
msg.acknowledge();
-
+
session.close();
session = null;
-
- sessionNotUsed.close();
- sessionNotUsed = null;
+ sessionB.close();
+ sessionB = null;
+
assertEquals(0, getMessageCount(service, ADDRESS.toString()));
}
@@ -162,8 +175,8 @@
{
if (session != null)
session.close();
- if (sessionNotUsed != null)
- sessionNotUsed.close();
+ if (sessionB != null)
+ sessionB.close();
}
catch (Exception ignored)
{
@@ -176,136 +189,668 @@
}
}
- // A better slow consumer test
+ public void testSlowConsumerNoBuffer() throws Exception
+ {
+ internalTestSlowConsumerNoBuffer(false);
+ }
+
+ public void testSlowConsumerNoBufferLargeMessages() throws Exception
+ {
+ internalTestSlowConsumerNoBuffer(true);
+ }
+
+ private void internalTestSlowConsumerNoBuffer(boolean largeMessages) throws Exception
+ {
+ MessagingService service = createService(false);
+
+ ClientSession sessionB = null;
+ ClientSession session = null;
+
+ try
+ {
+ final int numberOfMessages = 100;
+
+ service.start();
+
+ ClientSessionFactory sf = createInVMFactory();
+ sf.setConsumerWindowSize(0);
+
+ if (largeMessages)
+ {
+ sf.setMinLargeMessageSize(100);
+ }
+
+ session = sf.createSession(false, true, true);
+
+ SimpleString ADDRESS = addressA;
+
+ session.createQueue(ADDRESS, ADDRESS, true);
+
+ sessionB = sf.createSession(false, true, true);
+ sessionB.start();
+
+ session.start();
+
+ ClientConsumerInternal consNeverUsed = (ClientConsumerInternal)sessionB.createConsumer(ADDRESS);
+
+ ClientConsumer cons1 = session.createConsumer(ADDRESS);
+
+ ClientProducer prod = session.createProducer(ADDRESS);
+
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ ClientMessage msg = createTextMessage(session, "Msg" + i);
+
+ if (largeMessages)
+ {
+ msg.getBody().writeBytes(new byte[600]);
+ }
+
+ prod.send(msg);
+ }
+
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ ClientMessage msg = cons1.receive(1000);
+ assertNotNull("expected message at i = " + i, msg);
+ assertEquals("Msg" + i, getTextMessage(msg));
+ msg.acknowledge();
+ }
+
+ assertEquals(0, consNeverUsed.getBufferSize());
+
+ session.close();
+ session = null;
+
+ sessionB.close();
+ sessionB = null;
+
+ assertEquals(0, getMessageCount(service, ADDRESS.toString()));
+
+ }
+ finally
+ {
+ try
+ {
+ if (session != null)
+ session.close();
+ if (sessionB != null)
+ sessionB.close();
+ }
+ catch (Exception ignored)
+ {
+ }
+
+ if (service.isStarted())
+ {
+ service.stop();
+ }
+ }
+ }
+
+ public void testSlowConsumerNoBuffer2() throws Exception
+ {
+ internalTestSlowConsumerNoBuffer2(false);
+ }
+
+ public void testSlowConsumerNoBuffer2LargeMessages() throws Exception
+ {
+ internalTestSlowConsumerNoBuffer2(true);
+ }
+
+ private void internalTestSlowConsumerNoBuffer2(final boolean largeMessages) throws Exception
+ {
+ MessagingService service = createService(false);
+
+ ClientSession session1 = null;
+ ClientSession session2 = null;
+
+ try
+ {
+ final int numberOfMessages = 100;
+
+ service.start();
+
+ ClientSessionFactory sf = createInVMFactory();
+
+ sf.setConsumerWindowSize(0);
+
+ if (largeMessages)
+ {
+ sf.setMinLargeMessageSize(100);
+ }
+
+ session1 = sf.createSession(false, true, true);
+
+ session2 = sf.createSession(false, true, true);
+
+ session1.start();
+
+ session2.start();
+
+ SimpleString ADDRESS = new SimpleString("some-queue");
+
+ session1.createQueue(ADDRESS, ADDRESS, true);
+
+ ClientConsumerInternal cons1 = (ClientConsumerInternal)session1.createConsumer(ADDRESS);
+
+ // Note we make sure we send the messages *before* cons2 is created
+
+ ClientProducer prod = session1.createProducer(ADDRESS);
+
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ ClientMessage msg = createTextMessage(session1, "Msg" + i);
+ if (largeMessages)
+ {
+ msg.getBody().writeBytes(new byte[600]);
+ }
+ prod.send(msg);
+ }
+
+ ClientConsumerInternal cons2 = (ClientConsumerInternal)session2.createConsumer(ADDRESS);
+
+ for (int i = 0; i < numberOfMessages / 2; i++)
+ {
+ ClientMessage msg = cons1.receive(1000);
+ assertNotNull("expected message at i = " + i, msg);
+
+ String str = getTextMessage(msg);
+ assertEquals("Msg" + i, str);
+
+ msg.acknowledge();
+
+ assertEquals("A slow consumer shouldn't buffer anything on the client side!", 0, cons1.getBufferSize());
+ }
+
+ for (int i = numberOfMessages / 2; i < numberOfMessages; i++)
+ {
+ ClientMessage msg = cons2.receive(1000);
+
+ assertNotNull("expected message at i = " + i, msg);
+
+ assertEquals("Msg" + i, msg.getBody().readString());
+
+ msg.acknowledge();
+
+ assertEquals("A slow consumer shouldn't buffer anything on the client side!", 0, cons2.getBufferSize());
+ }
+
+ session1.commit(); // just to make sure everything is flushed and no pending packets on the sending buffer, or
+ // the getMessageCount would fail
+ session2.commit();
+
+ assertEquals(0, getMessageCount(service, ADDRESS.toString()));
+
+ // This should also work the other way around
+
+ cons1.close();
+
+ cons2.close();
+
+ cons1 = (ClientConsumerInternal)session1.createConsumer(ADDRESS);
+
+ // Note we make sure we send the messages *before* cons2 is created
+
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ ClientMessage msg = createTextMessage(session1, "Msg" + i);
+ if (largeMessages)
+ {
+ msg.getBody().writeBytes(new byte[600]);
+ }
+ prod.send(msg);
+ }
+
+ cons2 = (ClientConsumerInternal)session2.createConsumer(ADDRESS);
+
+ // Now we receive on cons2 first
+
+ for (int i = 0; i < numberOfMessages / 2; i++)
+ {
+ ClientMessage msg = cons2.receive(1000);
+ assertNotNull("expected message at i = " + i, msg);
+
+ assertEquals("Msg" + i, msg.getBody().readString());
+
+ msg.acknowledge();
+
+ assertEquals("A slow consumer shouldn't buffer anything on the client side!", 0, cons2.getBufferSize());
+
+ }
+
+ for (int i = numberOfMessages / 2; i < numberOfMessages; i++)
+ {
+ ClientMessage msg = cons1.receive(1000);
+
+ assertNotNull("expected message at i = " + i, msg);
+
+ assertEquals("Msg" + i, msg.getBody().readString());
+
+ msg.acknowledge();
+
+ assertEquals("A slow consumer shouldn't buffer anything on the client side!", 0, cons1.getBufferSize());
+ }
+
+ session1.commit(); // just to make sure everything is flushed and no pending packets on the sending buffer, or
+ // the getMessageCount would fail
+ session2.commit();
+
+ assertEquals(0, getMessageCount(service, ADDRESS.toString()));
+
+ }
+ finally
+ {
+ try
+ {
+ if (session1 != null)
+ session1.close();
+ if (session2 != null)
+ session2.close();
+ }
+ catch (Exception ignored)
+ {
+ }
+
+ if (service.isStarted())
+ {
+ service.stop();
+ }
+ }
+ }
+
+ public void testSlowConsumerOnMessageHandlerNoBuffers() throws Exception
+ {
+ MessagingService service = createService(false);
+
+ ClientSession sessionB = null;
+ ClientSession session = null;
+
+ try
+ {
+ final int numberOfMessages = 100;
+
+ service.start();
+
+ ClientSessionFactory sf = createInVMFactory();
+ sf.setConsumerWindowSize(0);
+
+ session = sf.createSession(false, true, true);
+
+ SimpleString ADDRESS = new SimpleString("some-queue");
+
+ session.createQueue(ADDRESS, ADDRESS, true);
+
+ sessionB = sf.createSession(false, true, true);
+ sessionB.start();
+
+ session.start();
+
+ ClientConsumerInternal consReceiveOneAndHold = (ClientConsumerInternal)sessionB.createConsumer(ADDRESS);
+
+ final CountDownLatch latchReceived = new CountDownLatch(2);
+
+ final CountDownLatch latchDone = new CountDownLatch(1);
+
+ // It should receive two messages and then give up
+ class LocalHandler implements MessageHandler
+ {
+ boolean failed = false;
+
+ int count = 0;
+
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.client.MessageHandler#onMessage(org.jboss.messaging.core.client.ClientMessage)
+ */
+ public synchronized void onMessage(ClientMessage message)
+ {
+ try
+ {
+ String str = getTextMessage(message);
+
+ failed = failed || !str.equals("Msg" + count);
+
+ message.acknowledge();
+ latchReceived.countDown();
+
+ if (count++ == 1)
+ {
+ // it will hold here for a while
+ if (!latchDone.await(TIMEOUT, TimeUnit.SECONDS)) // a timed wait, so if the test fails, one less
+ // thread around
+ {
+ new Exception("ClientConsuemrWindowSizeTest Handler couldn't receive signal in less than 5 seconds").printStackTrace(); // hudson
+ // or
+ // junit
+ // report
+ failed = true;
+ }
+ }
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace(); // Hudson / JUnit report
+ failed = true;
+ }
+ }
+ }
+
+ LocalHandler handler = new LocalHandler();
+
+ ClientConsumer cons1 = session.createConsumer(ADDRESS);
+
+ ClientProducer prod = session.createProducer(ADDRESS);
+
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ prod.send(createTextMessage(session, "Msg" + i));
+ }
+
+ consReceiveOneAndHold.setMessageHandler(handler);
+
+ assertTrue(latchReceived.await(TIMEOUT, TimeUnit.SECONDS));
+
+ assertEquals(0, consReceiveOneAndHold.getBufferSize());
+
+ for (int i = 2; i < numberOfMessages; i++)
+ {
+ ClientMessage msg = cons1.receive(1000);
+ assertNotNull("expected message at i = " + i, msg);
+ assertEquals("Msg" + i, getTextMessage(msg));
+ msg.acknowledge();
+ }
+
+ assertEquals(0, consReceiveOneAndHold.getBufferSize());
+
+ latchDone.countDown();
+
+ session.close();
+ session = null;
+
+ sessionB.close();
+ sessionB = null;
+
+ assertEquals(0, getMessageCount(service, ADDRESS.toString()));
+
+ assertFalse("MessageHandler received a failure", handler.failed);
+
+ }
+ finally
+ {
+ try
+ {
+ if (session != null)
+ session.close();
+ if (sessionB != null)
+ sessionB.close();
+ }
+ catch (Exception ignored)
+ {
+ }
+
+ if (service.isStarted())
+ {
+ service.stop();
+ }
+ }
+ }
+
+ public void testSlowConsumerOnMessageHandlerBufferOne() throws Exception
+ {
+ internalTestSlowConsumerOnMessageHandlerBufferOne(false);
+ }
+
+ public void testSlowConsumerOnMessageHandlerBufferOneLargeMessages() throws Exception
+ {
+ internalTestSlowConsumerOnMessageHandlerBufferOne(true);
+ }
- //Commented out until behaviour is fixed
-// public void testSlowConsumer2() throws Exception
-// {
-// MessagingService service = createService(false);
-//
-// ClientSession session1 = null;
-// ClientSession session2 = null;
-//
-// try
-// {
-// final int numberOfMessages = 100;
-//
-// service.start();
-//
-// ClientSessionFactory sf = createInVMFactory();
-//
-// sf.setConsumerWindowSize(1);
-//
-// session1 = sf.createSession(false, true, true);
-//
-// session2 = sf.createSession(false, true, true);
-//
-// session1.start();
-//
-// session2.start();
-//
-// SimpleString ADDRESS = new SimpleString("some-queue");
-//
-// session1.createQueue(ADDRESS, ADDRESS, true);
-//
-// ClientConsumer cons1 = session1.createConsumer(ADDRESS);
-//
-// //Note we make sure we send the messages *before* cons2 is created
-//
-// ClientProducer prod = session1.createProducer(ADDRESS);
-//
-// for (int i = 0; i < numberOfMessages; i++)
-// {
-// prod.send(createTextMessage(session1, "Msg" + i));
-// }
-//
-// ClientConsumer cons2 = session2.createConsumer(ADDRESS);
-//
-// for (int i = 0; i < numberOfMessages; i += 2)
-// {
-// ClientMessage msg = cons1.receive(1000);
-// assertNotNull("expected message at i = " + i, msg);
-//
-// //assertEquals("Msg" + i, msg.getBody().readString());
-//
-// msg.acknowledge();
-// }
-//
-// for (int i = 1; i < numberOfMessages; i += 2)
-// {
-// ClientMessage msg = cons2.receive(1000);
-//
-// assertNotNull("expected message at i = " + i, msg);
-//
-// assertEquals("Msg" + i, msg.getBody().readString());
-//
-// msg.acknowledge();
-// }
-//
-// assertEquals(0, getMessageCount(service, ADDRESS.toString()));
-//
-// //This should also work the other way around
-//
-// cons1.close();
-//
-// cons2.close();
-//
-// cons1 = session1.createConsumer(ADDRESS);
-//
-// //Note we make sure we send the messages *before* cons2 is created
-//
-// for (int i = 0; i < numberOfMessages; i++)
-// {
-// prod.send(createTextMessage(session1, "Msg" + i));
-// }
-//
-// cons2 = session2.createConsumer(ADDRESS);
-//
-// //Now we receive on cons2 first
-//
-// for (int i = 0; i < numberOfMessages; i += 2)
-// {
-// ClientMessage msg = cons2.receive(1000);
-// assertNotNull("expected message at i = " + i, msg);
-//
-// assertEquals("Msg" + i, msg.getBody().readString());
-//
-// msg.acknowledge();
-// }
-//
-// for (int i = 1; i < numberOfMessages; i += 2)
-// {
-// ClientMessage msg = cons1.receive(1000);
-//
-// assertNotNull("expected message at i = " + i, msg);
-//
-// assertEquals("Msg" + i, msg.getBody().readString());
-//
-// msg.acknowledge();
-// }
-//
-// assertEquals(0, getMessageCount(service, ADDRESS.toString()));
-//
-//
-// }
-// finally
-// {
-// try
-// {
-// if (session1 != null)
-// session1.close();
-// if (session2 != null)
-// session2.close();
-// }
-// catch (Exception ignored)
-// {
-// }
-//
-// if (service.isStarted())
-// {
-// service.stop();
-// }
-// }
-// }
+
+ private void internalTestSlowConsumerOnMessageHandlerBufferOne(boolean largeMessage) throws Exception
+ {
+ MessagingService service = createService(false);
+ ClientSession sessionB = null;
+ ClientSession session = null;
+
+ try
+ {
+ final int numberOfMessages = 100;
+
+ service.start();
+
+ ClientSessionFactory sf = createInVMFactory();
+ sf.setConsumerWindowSize(1);
+
+ if (largeMessage)
+ {
+ sf.setMinLargeMessageSize(100);
+ }
+
+ session = sf.createSession(false, true, true);
+
+ SimpleString ADDRESS = new SimpleString("some-queue");
+
+ session.createQueue(ADDRESS, ADDRESS, true);
+
+ sessionB = sf.createSession(false, true, true);
+ sessionB.start();
+
+ session.start();
+
+ ClientConsumerInternal consReceiveOneAndHold = (ClientConsumerInternal)sessionB.createConsumer(ADDRESS);
+
+ final CountDownLatch latchReceived = new CountDownLatch(2);
+ final CountDownLatch latchReceivedBuffered = new CountDownLatch(3);
+
+ final CountDownLatch latchDone = new CountDownLatch(1);
+
+ // It should receive two messages and then give up
+ class LocalHandler implements MessageHandler
+ {
+ boolean failed = false;
+
+ int count = 0;
+
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.client.MessageHandler#onMessage(org.jboss.messaging.core.client.ClientMessage)
+ */
+ public synchronized void onMessage(ClientMessage message)
+ {
+ try
+ {
+ String str = getTextMessage(message);
+
+ System.out.println("Received " + str);
+
+ failed = failed || !str.equals("Msg" + count);
+
+ message.acknowledge();
+ latchReceived.countDown();
+ latchReceivedBuffered.countDown();
+
+ if (count++ == 1)
+ {
+ // it will hold here for a while
+ if (!latchDone.await(TIMEOUT, TimeUnit.SECONDS))
+ {
+ new Exception("ClientConsuemrWindowSizeTest Handler couldn't receive signal in less than 5 seconds").printStackTrace(); // hudson
+ // or
+ // junit
+ // report
+ failed = true;
+ }
+ }
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace(); // Hudson / JUnit report
+ failed = true;
+ }
+ }
+ }
+
+ LocalHandler handler = new LocalHandler();
+
+ ClientProducer prod = session.createProducer(ADDRESS);
+
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ ClientMessage msg = createTextMessage(session, "Msg" + i);
+ if (largeMessage)
+ {
+ msg.getBody().writeBytes(new byte[600]);
+ }
+ prod.send(msg);
+ }
+
+ consReceiveOneAndHold.setMessageHandler(handler);
+
+ assertTrue(latchReceived.await(TIMEOUT, TimeUnit.SECONDS));
+
+ ClientConsumer cons1 = session.createConsumer(ADDRESS);
+
+ for (int i = 3; i < numberOfMessages; i++)
+ {
+ ClientMessage msg = cons1.receive(1000);
+ assertNotNull("expected message at i = " + i, msg);
+ assertEquals("Msg" + i, getTextMessage(msg));
+ msg.acknowledge();
+ }
+
+ latchDone.countDown();
+
+ assertTrue(latchReceivedBuffered.await(TIMEOUT, TimeUnit.SECONDS));
+
+ session.close();
+ session = null;
+
+ sessionB.close();
+ sessionB = null;
+
+ assertEquals(0, getMessageCount(service, ADDRESS.toString()));
+
+ assertFalse("MessageHandler received a failure", handler.failed);
+
+ }
+ finally
+ {
+ try
+ {
+ if (session != null)
+ session.close();
+ if (sessionB != null)
+ sessionB.close();
+ }
+ catch (Exception ignored)
+ {
+ }
+
+ if (service.isStarted())
+ {
+ service.stop();
+ }
+ }
+ }
+
+ public void testNoWindowRoundRobin() throws Exception
+ {
+ testNoWindowRoundRobin(false);
+ }
+
+
+ public void testNoWindowRoundRobinLargeMessage() throws Exception
+ {
+ testNoWindowRoundRobin(true);
+ }
+
+ private void testNoWindowRoundRobin(final boolean largeMessages) throws Exception
+ {
+
+ MessagingService service = createService(false);
+
+ ClientSession sessionA = null;
+ ClientSession sessionB = null;
+
+ try
+ {
+ final int numberOfMessages = 100;
+
+ service.start();
+
+ ClientSessionFactory sf = createInVMFactory();
+ sf.setConsumerWindowSize(-1);
+
+ if (largeMessages)
+ {
+ sf.setMinLargeMessageSize(100);
+ }
+
+ sessionA = sf.createSession(false, true, true);
+
+ SimpleString ADDRESS = new SimpleString("some-queue");
+
+ sessionA.createQueue(ADDRESS, ADDRESS, true);
+
+ sessionB = sf.createSession(false, true, true);
+
+ sessionA.start();
+ sessionB.start();
+
+ ClientConsumerInternal consA = (ClientConsumerInternal)sessionA.createConsumer(ADDRESS);
+
+ ClientConsumerInternal consB = (ClientConsumerInternal)sessionB.createConsumer(ADDRESS);
+
+ ClientProducer prod = sessionA.createProducer(ADDRESS);
+
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ ClientMessage msg = createTextMessage(sessionA, "Msg" + i);
+ if (largeMessages)
+ {
+ msg.getBody().writeBytes(new byte[600]);
+ }
+ prod.send(msg);
+ }
+
+
+ long timeout = System.currentTimeMillis() + TIMEOUT * 1000;
+
+ boolean foundA = false;
+ boolean foundB = false;
+
+ do
+ {
+ foundA = consA.getBufferSize() == numberOfMessages / 2;
+ foundB = consB.getBufferSize() == numberOfMessages / 2;
+
+ Thread.sleep(10);
+ } while ((!foundA || !foundB) && System.currentTimeMillis() < timeout);
+
+
+ assertTrue("ConsumerA didn't receive the expected number of messages on buffer (consA=" + consA.getBufferSize() + ", consB=" + consB.getBufferSize() + ") foundA = " + foundA + " foundB = " + foundB, foundA);
+ assertTrue("ConsumerB didn't receive the expected number of messages on buffer (consA=" + consA.getBufferSize() + ", consB=" + consB.getBufferSize() + ") foundA = " + foundA + " foundB = " + foundB, foundB);
+
+
+ }
+ finally
+ {
+ try
+ {
+ if (sessionA != null)
+ sessionA.close();
+ if (sessionB != null)
+ sessionB.close();
+ }
+ catch (Exception ignored)
+ {
+ }
+
+ if (service.isStarted())
+ {
+ service.stop();
+ }
+ }
+ }
+
}
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientSessionStopStartTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientSessionStopStartTest.java 2009-03-27 18:57:23 UTC (rev 6204)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientSessionStopStartTest.java 2009-03-27 22:39:36 UTC (rev 6205)
@@ -314,7 +314,15 @@
latch.await();
- session.stop();
+ try
+ {
+ session.stop();
+ }
+ catch (Exception e)
+ {
+ log.warn(e.getMessage(), e);
+ throw e;
+ }
assertFalse(handler.failed);
More information about the jboss-cvs-commits
mailing list