[jboss-cvs] JBoss Messaging SVN: r4642 - in trunk: tests/src/org/jboss/messaging/tests/timing/core/client/impl and 2 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Mon Jul 7 08:19:16 EDT 2008
Author: timfox
Date: 2008-07-07 08:19:15 -0400 (Mon, 07 Jul 2008)
New Revision: 4642
Modified:
trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
trunk/tests/src/org/jboss/messaging/tests/timing/core/client/impl/ClientConsumerImplTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/client/impl/ClientConsumerImplTest.java
trunk/tests/src/org/jboss/messaging/tests/util/UnitTestCase.java
Log:
More tests
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java 2008-07-07 11:43:18 UTC (rev 4641)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java 2008-07-07 12:19:15 UTC (rev 4642)
@@ -57,7 +57,7 @@
private static final boolean trace = log.isTraceEnabled();
- private static final long CLOSE_TIMEOUT_MILLISECONDS = 10000;
+ public static final long CLOSE_TIMEOUT_MILLISECONDS = 10000;
// Attributes
// -----------------------------------------------------------------------------------
@@ -414,7 +414,7 @@
if (clientWindowSize > 0)
{
creditsToSend += messageBytes;
-
+
if (creditsToSend >= clientWindowSize)
{
remotingConnection.sendOneWay(targetID, sessionTargetID, new ConsumerFlowCreditMessage(creditsToSend));
@@ -510,7 +510,7 @@
}
}
- public void doCleanUp(boolean sendCloseMessage) throws MessagingException
+ private void doCleanUp(final boolean sendCloseMessage) throws MessagingException
{
if (closed)
{
@@ -537,7 +537,7 @@
receiverThread = null;
- if(sendCloseMessage)
+ if (sendCloseMessage)
{
remotingConnection.sendBlocking(targetID, sessionTargetID, new PacketImpl(PacketImpl.CLOSE));
}
Modified: trunk/tests/src/org/jboss/messaging/tests/timing/core/client/impl/ClientConsumerImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/timing/core/client/impl/ClientConsumerImplTest.java 2008-07-07 11:43:18 UTC (rev 4641)
+++ trunk/tests/src/org/jboss/messaging/tests/timing/core/client/impl/ClientConsumerImplTest.java 2008-07-07 12:19:15 UTC (rev 4642)
@@ -21,11 +21,15 @@
*/
package org.jboss.messaging.tests.timing.core.client.impl;
+import java.util.ArrayList;
+import java.util.List;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import org.easymock.EasyMock;
import org.jboss.messaging.core.client.ClientMessage;
import org.jboss.messaging.core.client.MessageHandler;
+import org.jboss.messaging.core.client.impl.ClientConnectionInternal;
import org.jboss.messaging.core.client.impl.ClientConsumerImpl;
import org.jboss.messaging.core.client.impl.ClientConsumerInternal;
import org.jboss.messaging.core.client.impl.ClientSessionInternal;
@@ -33,6 +37,7 @@
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.remoting.PacketDispatcher;
import org.jboss.messaging.core.remoting.RemotingConnection;
+import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
import org.jboss.messaging.tests.util.UnitTestCase;
/**
@@ -69,7 +74,7 @@
{
try
{
- consumer.receive(1000);
+ consumer.receive(2000);
}
catch (Exception e)
{
@@ -79,7 +84,7 @@
t.start();
- Thread.sleep(100);
+ Thread.sleep(1000);
try
{
@@ -96,7 +101,485 @@
t.interrupt();
}
}
+
+   public void testCloseWhileReceiving() throws Exception // close() from another thread must release a receive() that has no timeout
+   {
+      ClientSessionInternal session = EasyMock.createStrictMock(ClientSessionInternal.class);
+      RemotingConnection rc = EasyMock.createStrictMock(RemotingConnection.class);
+      ExecutorService executor = EasyMock.createStrictMock(ExecutorService.class);
+      PacketDispatcher pd = EasyMock.createStrictMock(PacketDispatcher.class);
+
+      final long clientTargetID = 283748;
+      final long targetID = 12934;
+      final long sessionTargetID = 23847327;
+
+      final ClientConsumerInternal consumer =
+         new ClientConsumerImpl(session, targetID, clientTargetID, 787, false, rc, pd, executor, sessionTargetID);
+
+      pd.unregister(clientTargetID); // recorded expectations: the calls checked by EasyMock.verify at the end
+      session.removeConsumer(consumer);
+      EasyMock.expect(rc.sendBlocking(targetID, sessionTargetID, new PacketImpl(PacketImpl.CLOSE))).andReturn(null);
+      EasyMock.replay(session, rc, executor, pd);
+
+      class ReceiverThread extends Thread // runs receive() off the test thread and records the outcome in volatile flags
+      {
+         volatile boolean returned;
+         volatile ClientMessage msg;
+         volatile boolean failed;
+         public void run()
+         {
+            try
+            {
+               msg = consumer.receive(); // no timeout argument
+               returned = true;
+            }
+            catch (Exception e)
+            {
+               failed = true;
+            }
+         }
+      };
+
+      ReceiverThread t = new ReceiverThread();
+
+      t.start();
+
+      Thread.sleep(2000); // presumably gives the receiver thread time to enter receive() before close() - timing-based
+
+      consumer.close();
+
+      Thread.sleep(2000); // gives the receiver thread time to return before its flags are inspected
+
+      assertTrue(t.returned); // receive() came back normally...
+      assertNull(t.msg); // ...with no message...
+      assertFalse(t.failed); // ...and without throwing
+
+      t.join();
+
+      EasyMock.verify(session, rc, executor, pd);
+   }
+
+   // The shared helper treats a timeout of 0 as "call receive() with no argument" and any
+   // other value as "call receive(timeout)", so the no-timeout variant must pass 0.
+   public void testReceiveHandleMessagesAfterReceiveNoTimeout() throws Exception
+   {
+      testReceiveHandleMessagesAfterReceive(0);
+   }
+
+   // Timed variant: 4000 ms is longer than the helper's 2000 ms pause before messages are
+   // handed to the consumer, so receive(timeout) should still see every message.
+   public void testReceiveHandleMessagesAfterReceiveTimeout() throws Exception
+   {
+      testReceiveHandleMessagesAfterReceive(4000);
+   }
+
+   private void testReceiveHandleMessagesAfterReceive(final int timeout) throws Exception // timeout == 0 selects receive(); any other value is passed to receive(timeout)
+   {
+      ClientSessionInternal session = EasyMock.createStrictMock(ClientSessionInternal.class);
+      ClientConnectionInternal connection = EasyMock.createStrictMock(ClientConnectionInternal.class);
+      RemotingConnection rc = EasyMock.createStrictMock(RemotingConnection.class);
+      ExecutorService executor = EasyMock.createStrictMock(ExecutorService.class);
+      PacketDispatcher pd = EasyMock.createStrictMock(PacketDispatcher.class);
+
+      final int numMessages = 10;
+
+      final List<ClientMessage> msgs = new ArrayList<ClientMessage>();
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage msg = EasyMock.createStrictMock(ClientMessage.class);
+
+         msgs.add(msg);
+
+         EasyMock.expect(msg.getPriority()).andStubReturn((byte)4); //default priority
+
+         EasyMock.expect(msg.isExpired()).andStubReturn(false);
+
+         EasyMock.expect(msg.getDeliveryID()).andStubReturn((long)i);
+
+         session.delivered((long)i, false); // recorded expectation: delivery i reported with expired == false
+
+         EasyMock.expect(msg.getEncodeSize()).andReturn(1);
+      }
+
+      EasyMock.replay(session, connection, rc, executor, pd);
+      EasyMock.replay(msgs.toArray());
+
+      final ClientConsumerInternal consumer =
+         new ClientConsumerImpl(session, 675765, 67565, 787, false, rc, pd, executor, 878787);
+
+      final long pause = 2000; // AdderThread sleeps this long before handing messages to the consumer
+
+      class AdderThread extends Thread
+      {
+         volatile boolean failed;
+
+         public void run()
+         {
+            try
+            {
+               Thread.sleep(pause);
+
+               for (ClientMessage msg: msgs)
+               {
+                  consumer.handleMessage(msg);
+               }
+            }
+            catch (Exception e)
+            {
+               log.error("Failed to add messages", e);
+               failed = true;
+            }
+         }
+      };
+
+      AdderThread t = new AdderThread();
+
+      t.start();
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage msg;
+
+         if (timeout == 0)
+         {
+            msg = consumer.receive();
+         }
+         else
+         {
+            msg = consumer.receive(timeout);
+         }
+
+         assertTrue(msg == msgs.get(i)); // identity check: messages come back in the order they were handled
+      }
+
+      assertNull(consumer.receiveImmediate()); // nothing left after numMessages receives
+
+      t.join();
+
+      assertFalse(t.failed);
+
+      EasyMock.verify(session, connection, rc, executor, pd);
+      EasyMock.verify(msgs.toArray());
+
+      assertEquals(0, consumer.getBufferSize());
+   }
+
+   public void testReceiveHandleMessagesAfterReceiveWithTimeout() throws Exception // receive(timeout) expires before any message is handled
+   {
+      ClientSessionInternal session = EasyMock.createStrictMock(ClientSessionInternal.class);
+      ClientConnectionInternal connection = EasyMock.createStrictMock(ClientConnectionInternal.class);
+      RemotingConnection rc = EasyMock.createStrictMock(RemotingConnection.class);
+      ExecutorService executor = EasyMock.createStrictMock(ExecutorService.class);
+      PacketDispatcher pd = EasyMock.createStrictMock(PacketDispatcher.class);
+
+      final int numMessages = 10;
+
+      final List<ClientMessage> msgs = new ArrayList<ClientMessage>();
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage msg = EasyMock.createStrictMock(ClientMessage.class);
+
+         msgs.add(msg);
+
+         EasyMock.expect(msg.getPriority()).andStubReturn((byte)4); //default priority
+      }
+
+      EasyMock.replay(session, connection, rc, executor, pd); // no expectations recorded on these mocks: any call on them fails the test
+      EasyMock.replay(msgs.toArray());
+
+      final ClientConsumerInternal consumer =
+         new ClientConsumerImpl(session, 675765, 67565, 787, false, rc, pd, executor, 878787);
+
+      final long pause = 2000; // AdderThread sleeps this long before handing messages to the consumer
+
+      final long timeout = 1000; // shorter than pause, so the receive below gives up first
+
+      class AdderThread extends Thread
+      {
+         volatile boolean failed;
+
+         public void run()
+         {
+            try
+            {
+               Thread.sleep(pause);
+
+               for (ClientMessage msg: msgs)
+               {
+                  consumer.handleMessage(msg);
+               }
+            }
+            catch (Exception e)
+            {
+               log.error("Failed to add messages", e);
+               failed = true;
+            }
+         }
+      };
+
+      AdderThread t = new AdderThread();
+
+      t.start();
+
+      ClientMessage msg = consumer.receive(timeout);
+
+      assertNull(msg); // timed out with nothing delivered
+
+      t.join();
+
+      assertFalse(t.failed);
+
+      EasyMock.verify(session, connection, rc, executor, pd);
+      EasyMock.verify(msgs.toArray());
+
+      assertEquals(numMessages, consumer.getBufferSize()); // messages handled after the timed-out receive remain buffered
+   }
+
+   public void testReceiveExpiredWithTimeout() throws Exception // expired messages are never returned by receive(timeout)
+   {
+      ClientSessionInternal session = EasyMock.createStrictMock(ClientSessionInternal.class);
+      ClientConnectionInternal connection = EasyMock.createStrictMock(ClientConnectionInternal.class);
+      RemotingConnection rc = EasyMock.createStrictMock(RemotingConnection.class);
+      ExecutorService executor = EasyMock.createStrictMock(ExecutorService.class);
+      PacketDispatcher pd = EasyMock.createStrictMock(PacketDispatcher.class);
+
+      final int numMessages = 10;
+
+      List<ClientMessage> msgs = new ArrayList<ClientMessage>();
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage msg = EasyMock.createStrictMock(ClientMessage.class);
+
+         msgs.add(msg);
+
+         EasyMock.expect(msg.getPriority()).andStubReturn((byte)4); //default priority
+
+         EasyMock.expect(msg.isExpired()).andStubReturn(true); // every message reports itself as expired
+
+         EasyMock.expect(msg.getDeliveryID()).andStubReturn((long)i);
+
+         session.delivered((long)i, true); // recorded expectation: delivery i reported with expired == true
+
+         EasyMock.expect(msg.getEncodeSize()).andReturn(1);
+      }
+
+      EasyMock.replay(session, connection, rc, executor, pd);
+      EasyMock.replay(msgs.toArray());
+
+      ClientConsumerInternal consumer =
+         new ClientConsumerImpl(session, 675765, 67565, 787, false, rc, pd, executor, 878787);
+
+      for (ClientMessage msg: msgs)
+      {
+         consumer.handleMessage(msg);
+      }
+
+      assertEquals(numMessages, consumer.getBufferSize()); // all messages are buffered before any receive
+
+      for (ClientMessage msg: msgs)
+      {
+         assertNull(consumer.receive(100)); // one timed receive per buffered message; each must yield null
+      }
+
+      EasyMock.verify(session, connection, rc, executor, pd);
+      EasyMock.verify(msgs.toArray());
+   }
+
+   // Checks that close() does not return until a handler that takes 1 second has finished:
+   // handler.complete must already be true immediately after close().
+   public void testWaitForOnMessageToCompleteOnClose() throws Exception
+   {
+      ClientSessionInternal session = EasyMock.createStrictMock(ClientSessionInternal.class);
+      RemotingConnection rc = EasyMock.createStrictMock(RemotingConnection.class);
+      // A real executor rather than a mock; it owns a worker thread that must be stopped below
+      ExecutorService executor = Executors.newSingleThreadExecutor();
+      PacketDispatcher pd = EasyMock.createStrictMock(PacketDispatcher.class);
+
+      final long clientTargetID = 283748;
+      final long targetID = 12934;
+      final long sessionTargetID = 23847327;
+
+      final ClientConsumerInternal consumer =
+         new ClientConsumerImpl(session, targetID, clientTargetID, 787, false, rc, pd, executor, sessionTargetID);
+
+      class MyHandler implements MessageHandler
+      {
+         volatile boolean failed;
+         volatile boolean complete;
+         public void onMessage(ClientMessage msg)
+         {
+            try
+            {
+               Thread.sleep(1000);
+               complete = true;
+            }
+            catch (Exception e)
+            {
+               failed = true;
+            }
+         }
+      };
+
+      MyHandler handler = new MyHandler();
+
+      consumer.setMessageHandler(handler);
+
+      ClientMessage msg = EasyMock.createStrictMock(ClientMessage.class);
+
+      EasyMock.expect(msg.getPriority()).andStubReturn((byte)4); //default priority
+
+      EasyMock.expect(msg.isExpired()).andStubReturn(false);
+
+      EasyMock.expect(msg.getDeliveryID()).andStubReturn(0L);
+
+      session.delivered(0L, false);
+
+      EasyMock.expect(msg.getEncodeSize()).andReturn(1);
+
+      pd.unregister(clientTargetID);
+      session.removeConsumer(consumer);
+      EasyMock.expect(rc.sendBlocking(targetID, sessionTargetID, new PacketImpl(PacketImpl.CLOSE))).andReturn(null);
+      EasyMock.replay(session, rc, pd, msg);
+
+      try
+      {
+         consumer.handleMessage(msg);
+
+         consumer.close();
+
+         assertTrue(handler.complete);
+
+         assertFalse(handler.failed);
+
+         EasyMock.verify(session, rc, pd, msg);
+      }
+      finally
+      {
+         // Stop the executor's worker thread so it does not outlive the test,
+         // whether the assertions above passed or not
+         executor.shutdownNow();
+      }
+   }
+
+   // Checks that close() gives up waiting for a handler that runs longer than
+   // CLOSE_TIMEOUT_MILLISECONDS: close() must take at least that long and return
+   // while the handler is still unfinished.
+   public void testWaitForOnMessageToCompleteOnCloseTimeout() throws Exception
+   {
+      ClientSessionInternal session = EasyMock.createStrictMock(ClientSessionInternal.class);
+      RemotingConnection rc = EasyMock.createStrictMock(RemotingConnection.class);
+      // A real executor rather than a mock; it owns a worker thread that must be stopped below
+      ExecutorService executor = Executors.newSingleThreadExecutor();
+      PacketDispatcher pd = EasyMock.createStrictMock(PacketDispatcher.class);
+
+      final long clientTargetID = 283748;
+      final long targetID = 12934;
+      final long sessionTargetID = 23847327;
+
+      final ClientConsumerInternal consumer =
+         new ClientConsumerImpl(session, targetID, clientTargetID, 787, false, rc, pd, executor, sessionTargetID);
+
+      class MyHandler implements MessageHandler
+      {
+         volatile boolean failed;
+         volatile boolean complete;
+         public void onMessage(ClientMessage msg)
+         {
+            try
+            {
+               // Deliberately outlast the close timeout by 2 seconds
+               Thread.sleep(ClientConsumerImpl.CLOSE_TIMEOUT_MILLISECONDS + 2000);
+               complete = true;
+            }
+            catch (Exception e)
+            {
+               failed = true;
+            }
+         }
+      };
+
+      MyHandler handler = new MyHandler();
+
+      consumer.setMessageHandler(handler);
+
+      ClientMessage msg = EasyMock.createStrictMock(ClientMessage.class);
+
+      EasyMock.expect(msg.getPriority()).andStubReturn((byte)4); //default priority
+
+      EasyMock.expect(msg.isExpired()).andStubReturn(false);
+
+      EasyMock.expect(msg.getDeliveryID()).andStubReturn(0L);
+
+      session.delivered(0L, false);
+
+      EasyMock.expect(msg.getEncodeSize()).andReturn(1);
+
+      pd.unregister(clientTargetID);
+      session.removeConsumer(consumer);
+      EasyMock.expect(rc.sendBlocking(targetID, sessionTargetID, new PacketImpl(PacketImpl.CLOSE))).andReturn(null);
+      EasyMock.replay(session, rc, pd, msg);
+
+      try
+      {
+         consumer.handleMessage(msg);
+
+         long start = System.currentTimeMillis();
+         consumer.close();
+         long end = System.currentTimeMillis();
+         assertTrue((end - start) >= ClientConsumerImpl.CLOSE_TIMEOUT_MILLISECONDS);
+
+         assertFalse(handler.complete);
+
+         assertFalse(handler.failed);
+
+         EasyMock.verify(session, rc, pd, msg);
+      }
+      finally
+      {
+         // The handler is normally still sleeping here. shutdownNow() interrupts it so the
+         // worker thread does not outlive the test; this may set handler.failed, but only
+         // after the assertions above have already run.
+         executor.shutdownNow();
+      }
+   }
+
+   public void testWaitForOnMessageToCompleteOnCloseSameThread() throws Exception // same check as the single-thread-executor variant, but with an executor that runs tasks inline
+   {
+      ClientSessionInternal session = EasyMock.createStrictMock(ClientSessionInternal.class);
+      RemotingConnection rc = EasyMock.createStrictMock(RemotingConnection.class);
+      ExecutorService executor = new DirectExecutorService(); // execute() runs the task on the calling thread
+      PacketDispatcher pd = EasyMock.createStrictMock(PacketDispatcher.class);
+
+      final long clientTargetID = 283748;
+      final long targetID = 12934;
+      final long sessionTargetID = 23847327;
+
+      final ClientConsumerInternal consumer =
+         new ClientConsumerImpl(session, targetID, clientTargetID, 787, false, rc, pd, executor, sessionTargetID);
+
+      class MyHandler implements MessageHandler
+      {
+         volatile boolean failed;
+         volatile boolean complete;
+         public void onMessage(ClientMessage msg)
+         {
+            try
+            {
+               Thread.sleep(1000);
+               complete = true;
+            }
+            catch (Exception e)
+            {
+               failed = true;
+            }
+         }
+      };
+
+      MyHandler handler = new MyHandler();
+
+      consumer.setMessageHandler(handler);
+
+      ClientMessage msg = EasyMock.createStrictMock(ClientMessage.class);
+
+      EasyMock.expect(msg.getPriority()).andStubReturn((byte)4); //default priority
+
+      EasyMock.expect(msg.isExpired()).andStubReturn(false);
+
+      EasyMock.expect(msg.getDeliveryID()).andStubReturn(0L);
+
+      session.delivered(0L, false);
+
+      EasyMock.expect(msg.getEncodeSize()).andReturn(1);
+
+      pd.unregister(clientTargetID); // recorded expectations for close(), checked by EasyMock.verify at the end
+      session.removeConsumer(consumer);
+      EasyMock.expect(rc.sendBlocking(targetID, sessionTargetID, new PacketImpl(PacketImpl.CLOSE))).andReturn(null);
+      EasyMock.replay(session, rc, pd, msg);
+
+      consumer.handleMessage(msg);
+
+      consumer.close();
+
+      assertTrue(handler.complete); // handler must have finished by the time close() returns
+
+      assertFalse(handler.failed);
+
+      EasyMock.verify(session, rc, pd, msg);
+   }
+
// Private -----------------------------------------------------------------------------------------------------------
}
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/client/impl/ClientConsumerImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/client/impl/ClientConsumerImplTest.java 2008-07-07 11:43:18 UTC (rev 4641)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/client/impl/ClientConsumerImplTest.java 2008-07-07 12:19:15 UTC (rev 4642)
@@ -24,9 +24,7 @@
import java.util.ArrayList;
import java.util.List;
-import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
import org.easymock.EasyMock;
import org.jboss.messaging.core.client.ClientMessage;
@@ -39,6 +37,7 @@
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.remoting.PacketDispatcher;
import org.jboss.messaging.core.remoting.RemotingConnection;
+import org.jboss.messaging.core.remoting.impl.wireformat.ConsumerFlowCreditMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
import org.jboss.messaging.tests.util.UnitTestCase;
@@ -46,12 +45,6 @@
*
* A ClientConsumerImplTest
*
- * TODO - still need to test:
- * priority
- * flow control
- * closing
- * waiting for message listener to complete etc
- *
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
*
*/
@@ -104,40 +97,7 @@
assertEquals(numMessages, consumer.getBufferSize());
}
-
- private class DirectExecutorService extends AbstractExecutorService
- {
- public boolean awaitTermination(long timeout, TimeUnit unit)
- throws InterruptedException
- {
- return false;
- }
-
- public boolean isShutdown()
- {
- return false;
- }
-
- public void shutdown()
- {
- }
-
- public boolean isTerminated()
- {
- return false;
- }
-
- public List<Runnable> shutdownNow()
- {
- return null;
- }
-
- public void execute(Runnable command)
- {
- command.run();
- }
- }
-
+
public void testHandleMessageWithNonDirectHandler() throws Exception
{
ClientSessionInternal session = EasyMock.createStrictMock(ClientSessionInternal.class);
@@ -459,7 +419,7 @@
EasyMock.verify(msgs.toArray());
}
- public void testReceiveExpired() throws Exception
+ public void testReceiveExpiredImmediate() throws Exception
{
ClientSessionInternal session = EasyMock.createStrictMock(ClientSessionInternal.class);
ClientConnectionInternal connection = EasyMock.createStrictMock(ClientConnectionInternal.class);
@@ -510,6 +470,8 @@
EasyMock.verify(msgs.toArray());
}
+
+
public void testReceiveWithHandler() throws Exception
{
ClientSessionInternal session = EasyMock.createStrictMock(ClientSessionInternal.class);
@@ -712,6 +674,186 @@
assertEquals(0, consumer.getBufferSize());
}
+   public void testFlowControlExact() throws Exception // message sizes add up to exactly the window size
+   {
+      ClientSessionInternal session = EasyMock.createStrictMock(ClientSessionInternal.class);
+      ClientConnectionInternal connection = EasyMock.createStrictMock(ClientConnectionInternal.class);
+      RemotingConnection rc = EasyMock.createStrictMock(RemotingConnection.class);
+      ExecutorService executor = EasyMock.createStrictMock(ExecutorService.class);
+      PacketDispatcher pd = EasyMock.createStrictMock(PacketDispatcher.class);
+
+      final int clientWindowSize = 500;
+
+      final int numMessages = 10;
+
+      final int msgSize = 100; // 5 messages total exactly clientWindowSize
+
+      List<ClientMessage> msgs = new ArrayList<ClientMessage>();
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage msg = EasyMock.createStrictMock(ClientMessage.class);
+
+         msgs.add(msg);
+
+         EasyMock.expect(msg.getPriority()).andStubReturn((byte)4); //default priority
+
+         EasyMock.expect(msg.isExpired()).andStubReturn(true);
+
+         EasyMock.expect(msg.getDeliveryID()).andStubReturn((long)i);
+
+         session.delivered((long)i, true);
+
+         EasyMock.expect(msg.getEncodeSize()).andReturn(msgSize);
+      }
+
+      final long targetID = 120912;
+      final long sessionTargetID = 12348;
+
+      rc.sendOneWay(targetID, sessionTargetID, new ConsumerFlowCreditMessage(clientWindowSize)); // expected credit message carries exactly one window
+      EasyMock.expectLastCall().times(2); // 10 messages of 100 bytes fill the 500-byte window twice
+
+      EasyMock.replay(session, connection, rc, executor, pd);
+      EasyMock.replay(msgs.toArray());
+
+      ClientConsumerInternal consumer =
+         new ClientConsumerImpl(session, targetID, 67565, clientWindowSize, false, rc, pd, executor, sessionTargetID);
+
+      for (ClientMessage msg: msgs)
+      {
+         consumer.handleMessage(msg);
+      }
+
+      assertEquals(numMessages, consumer.getBufferSize());
+
+      for (ClientMessage msg: msgs)
+      {
+         assertNull(consumer.receiveImmediate()); // messages are stubbed as expired, so nothing is returned
+      }
+
+      EasyMock.verify(session, connection, rc, executor, pd);
+      EasyMock.verify(msgs.toArray());
+   }
+
+   public void testFlowControlInExact() throws Exception // message sizes overshoot the window size instead of matching it
+   {
+      ClientSessionInternal session = EasyMock.createStrictMock(ClientSessionInternal.class);
+      ClientConnectionInternal connection = EasyMock.createStrictMock(ClientConnectionInternal.class);
+      RemotingConnection rc = EasyMock.createStrictMock(RemotingConnection.class);
+      ExecutorService executor = EasyMock.createStrictMock(ExecutorService.class);
+      PacketDispatcher pd = EasyMock.createStrictMock(PacketDispatcher.class);
+
+      final int clientWindowSize = 500;
+
+      final int numMessages = 10;
+
+      final int msgSize = 101; // 5 messages total 505, just over clientWindowSize
+
+      List<ClientMessage> msgs = new ArrayList<ClientMessage>();
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage msg = EasyMock.createStrictMock(ClientMessage.class);
+
+         msgs.add(msg);
+
+         EasyMock.expect(msg.getPriority()).andStubReturn((byte)4); //default priority
+
+         EasyMock.expect(msg.isExpired()).andStubReturn(true);
+
+         EasyMock.expect(msg.getDeliveryID()).andStubReturn((long)i);
+
+         session.delivered((long)i, true);
+
+         EasyMock.expect(msg.getEncodeSize()).andReturn(msgSize);
+      }
+
+      final long targetID = 120912;
+      final long sessionTargetID = 12348;
+
+      rc.sendOneWay(targetID, sessionTargetID, new ConsumerFlowCreditMessage(505)); // 505 == 5 * msgSize: the accumulated total is sent, not the window size
+      EasyMock.expectLastCall().times(2); // 10 messages cross the window threshold twice
+
+      EasyMock.replay(session, connection, rc, executor, pd);
+      EasyMock.replay(msgs.toArray());
+
+      ClientConsumerInternal consumer =
+         new ClientConsumerImpl(session, targetID, 67565, clientWindowSize, false, rc, pd, executor, sessionTargetID);
+
+      for (ClientMessage msg: msgs)
+      {
+         consumer.handleMessage(msg);
+      }
+
+      assertEquals(numMessages, consumer.getBufferSize());
+
+      for (ClientMessage msg: msgs)
+      {
+         assertNull(consumer.receiveImmediate()); // messages are stubbed as expired, so nothing is returned
+      }
+
+      EasyMock.verify(session, connection, rc, executor, pd);
+      EasyMock.verify(msgs.toArray());
+   }
+
+   public void testFlowControlDisabled() throws Exception // with a non-positive window size no credit message may be sent
+   {
+      ClientSessionInternal session = EasyMock.createStrictMock(ClientSessionInternal.class);
+      ClientConnectionInternal connection = EasyMock.createStrictMock(ClientConnectionInternal.class);
+      RemotingConnection rc = EasyMock.createStrictMock(RemotingConnection.class);
+      ExecutorService executor = EasyMock.createStrictMock(ExecutorService.class);
+      PacketDispatcher pd = EasyMock.createStrictMock(PacketDispatcher.class);
+
+      final int clientWindowSize = -1; // not greater than zero, which the test name treats as flow control disabled
+
+      final int numMessages = 10;
+
+      final int msgSize = 100;
+
+      List<ClientMessage> msgs = new ArrayList<ClientMessage>();
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage msg = EasyMock.createStrictMock(ClientMessage.class);
+
+         msgs.add(msg);
+
+         EasyMock.expect(msg.getPriority()).andStubReturn((byte)4); //default priority
+
+         EasyMock.expect(msg.isExpired()).andStubReturn(true);
+
+         EasyMock.expect(msg.getDeliveryID()).andStubReturn((long)i);
+
+         session.delivered((long)i, true);
+
+         EasyMock.expect(msg.getEncodeSize()).andReturn(msgSize);
+      }
+
+      final long targetID = 120912;
+      final long sessionTargetID = 12348;
+
+      EasyMock.replay(session, connection, rc, executor, pd); // no sendOneWay expectation is recorded on rc, so any credit message fails the test
+      EasyMock.replay(msgs.toArray());
+
+      ClientConsumerInternal consumer =
+         new ClientConsumerImpl(session, targetID, 67565, clientWindowSize, false, rc, pd, executor, sessionTargetID);
+
+      for (ClientMessage msg: msgs)
+      {
+         consumer.handleMessage(msg);
+      }
+
+      assertEquals(numMessages, consumer.getBufferSize());
+
+      for (ClientMessage msg: msgs)
+      {
+         assertNull(consumer.receiveImmediate()); // messages are stubbed as expired, so nothing is returned
+      }
+
+      EasyMock.verify(session, connection, rc, executor, pd);
+      EasyMock.verify(msgs.toArray());
+   }
+
// Private -----------------------------------------------------------------------------------------------------------
private void testConstructor(final long targetID, final long clientTargetID,
Modified: trunk/tests/src/org/jboss/messaging/tests/util/UnitTestCase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/util/UnitTestCase.java 2008-07-07 11:43:18 UTC (rev 4641)
+++ trunk/tests/src/org/jboss/messaging/tests/util/UnitTestCase.java 2008-07-07 12:19:15 UTC (rev 4642)
@@ -33,6 +33,8 @@
import java.util.Iterator;
import java.util.List;
import java.util.UUID;
+import java.util.concurrent.AbstractExecutorService;
+import java.util.concurrent.TimeUnit;
import javax.transaction.xa.Xid;
@@ -258,6 +260,39 @@
return null;
}
+
+   public static class DirectExecutorService extends AbstractExecutorService // test executor: execute() runs the task synchronously on the caller's thread
+   {
+      public boolean awaitTermination(long timeout, TimeUnit unit)
+            throws InterruptedException
+      {
+         return false; // never reports termination; returns immediately without waiting
+      }
+      public boolean isShutdown()
+      {
+         return false; // lifecycle is not tracked
+      }
+
+      public void shutdown()
+      {
+      } // intentionally a no-op
+
+      public boolean isTerminated()
+      {
+         return false; // lifecycle is not tracked
+      }
+
+      public List<Runnable> shutdownNow()
+      {
+         return null; // NOTE(review): ExecutorService.shutdownNow() is specified to return a list; callers that iterate the result would fail on null - confirm none do
+      }
+
+      public void execute(Runnable command)
+      {
+         command.run(); // runs inline; an exception thrown by the task propagates to the caller
+      }
+   }
+
}
More information about the jboss-cvs-commits
mailing list