[jboss-cvs] JBoss Messaging SVN: r4634 - in trunk: tests/src/org/jboss/messaging/tests/unit/core/client/impl and 1 other directory.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri Jul 4 11:51:41 EDT 2008
Author: timfox
Date: 2008-07-04 11:51:41 -0400 (Fri, 04 Jul 2008)
New Revision: 4634
Modified:
trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/client/impl/ClientConsumerImplTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/client/impl/ClientSessionImplTest.java
Log:
More client consumer tests
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java 2008-07-04 11:43:34 UTC (rev 4633)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java 2008-07-04 15:51:41 UTC (rev 4634)
@@ -305,7 +305,7 @@
if (ignoreDeliveryMark >= 0)
{
long delID = message.getDeliveryID();
-
+
if (delID > ignoreDeliveryMark)
{
// Ignore - the session is recovering and these are inflight
@@ -320,7 +320,7 @@
}
else
{
- throw new IllegalStateException("Invalid delivery serverTargetID " + delID);
+ throw new IllegalStateException("Invalid delivery id " + delID);
}
}
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/client/impl/ClientConsumerImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/client/impl/ClientConsumerImplTest.java 2008-07-04 11:43:34 UTC (rev 4633)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/client/impl/ClientConsumerImplTest.java 2008-07-04 15:51:41 UTC (rev 4634)
@@ -22,6 +22,12 @@
package org.jboss.messaging.tests.unit.core.client.impl;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.AbstractExecutorService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
import org.easymock.EasyMock;
import org.jboss.messaging.core.client.ClientMessage;
import org.jboss.messaging.core.client.MessageHandler;
@@ -29,16 +35,13 @@
import org.jboss.messaging.core.client.impl.ClientConsumerImpl;
import org.jboss.messaging.core.client.impl.ClientConsumerInternal;
import org.jboss.messaging.core.client.impl.ClientSessionInternal;
+import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.remoting.PacketDispatcher;
import org.jboss.messaging.core.remoting.RemotingConnection;
import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
import org.jboss.messaging.tests.util.UnitTestCase;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ExecutorService;
-
/**
*
* A ClientConsumerImplTest
@@ -102,13 +105,47 @@
assertEquals(numMessages, consumer.getBufferSize());
}
+ private class DirectExecutorService extends AbstractExecutorService
+ { // Test-only executor: execute() runs the task inline; the lifecycle methods are inert stubs.
+ public boolean awaitTermination(long timeout, TimeUnit unit)
+ throws InterruptedException
+ {
+ return false; // always false: termination is never reported
+ }
+ // Always reports "not shut down"; shutdown() below does nothing.
+ public boolean isShutdown()
+ {
+ return false;
+ }
+
+ public void shutdown()
+ {
+ }
+ // Always reports "not terminated".
+ public boolean isTerminated()
+ {
+ return false;
+ }
+ // NOTE(review): returns null rather than an empty list; callers must not dereference the result.
+ public List<Runnable> shutdownNow()
+ {
+ return null;
+ }
+ // Runs the command synchronously on the calling thread, so the work has completed when execute() returns.
+ public void execute(Runnable command)
+ {
+ command.run();
+ }
+ }
+
public void testHandleMessageWithNonDirectHandler() throws Exception
{
ClientSessionInternal session = EasyMock.createStrictMock(ClientSessionInternal.class);
ClientConnectionInternal connection = EasyMock.createStrictMock(ClientConnectionInternal.class);
RemotingConnection rc = EasyMock.createStrictMock(RemotingConnection.class);
- ExecutorService executor = EasyMock.createStrictMock(ExecutorService.class);
+ ExecutorService executor = new DirectExecutorService();
PacketDispatcher pd = EasyMock.createStrictMock(PacketDispatcher.class);
+ MessageHandler handler = EasyMock.createStrictMock(MessageHandler.class);
final int numMessages = 10;
@@ -122,31 +159,34 @@
EasyMock.expect(msg.getPriority()).andReturn((byte)4); //default priority
- executor.execute(EasyMock.isA(Runnable.class));
+ EasyMock.expect(msg.isExpired()).andReturn(false);
+
+ EasyMock.expect(msg.getDeliveryID()).andReturn((long)i);
+
+ session.delivered((long)i, false);
+
+ EasyMock.expect(msg.getEncodeSize()).andReturn(1);
+
+ handler.onMessage(msg);
}
- EasyMock.replay(session, connection, rc, executor, pd);
+ EasyMock.replay(session, connection, rc, pd, handler);
EasyMock.replay(msgs.toArray());
ClientConsumerInternal consumer =
new ClientConsumerImpl(session, 675765, 67565, 787, false, rc, pd, executor, 878787);
- consumer.setMessageHandler(new MessageHandler()
- {
- public void onMessage(final ClientMessage message)
- {
- }
- });
-
+ consumer.setMessageHandler(handler);
+
for (ClientMessage msg: msgs)
{
consumer.handleMessage(msg);
}
- EasyMock.verify(session, connection, rc, executor, pd);
+ EasyMock.verify(session, connection, rc, pd, handler);
EasyMock.verify(msgs.toArray());
- assertEquals(numMessages, consumer.getBufferSize());
+ assertEquals(0, consumer.getBufferSize());
}
public void testHandleMessageWithDirectHandler() throws Exception
@@ -469,6 +509,87 @@
EasyMock.verify(session, connection, rc, executor, pd);
EasyMock.verify(msgs.toArray());
}
+
+ public void testReceiveWithHandler() throws Exception
+ { // Checks that receive() is rejected with ILLEGAL_STATE once a MessageHandler has been set.
+ ClientSessionInternal session = EasyMock.createStrictMock(ClientSessionInternal.class);
+ RemotingConnection rc = EasyMock.createStrictMock(RemotingConnection.class);
+ ExecutorService executor = EasyMock.createStrictMock(ExecutorService.class);
+ PacketDispatcher pd = EasyMock.createStrictMock(PacketDispatcher.class);
+ // The mocks are only passed to the constructor; no expectations are recorded and replay/verify are not called here.
+ ClientConsumerInternal consumer =
+ new ClientConsumerImpl(session, 675765, 67565, 787, false, rc, pd, executor, 878787);
+ // No-op handler: only its presence matters for this test.
+ MessageHandler handler = new MessageHandler()
+ {
+ public void onMessage(ClientMessage msg)
+ {
+ }
+ };
+
+ consumer.setMessageHandler(handler);
+ // receive() must now throw; reaching fail() means no exception was raised.
+ try
+ {
+ consumer.receive();
+ fail("Should throw exception");
+ }
+ catch (MessagingException e)
+ {
+ assertEquals(MessagingException.ILLEGAL_STATE, e.getCode());
+ }
+ }
+
+ public void testSetHandlerWhileReceiving() throws Exception
+ { // Checks that setMessageHandler() fails with ILLEGAL_STATE while another thread is inside receive().
+ ClientSessionInternal session = EasyMock.createStrictMock(ClientSessionInternal.class);
+ RemotingConnection rc = EasyMock.createStrictMock(RemotingConnection.class);
+ ExecutorService executor = EasyMock.createStrictMock(ExecutorService.class);
+ PacketDispatcher pd = EasyMock.createStrictMock(PacketDispatcher.class);
+
+ final ClientConsumerInternal consumer =
+ new ClientConsumerImpl(session, 675765, 67565, 787, false, rc, pd, executor, 878787);
+ // No-op handler: only the attempt to install it is under test.
+ MessageHandler handler = new MessageHandler()
+ {
+ public void onMessage(ClientMessage msg)
+ {
+ }
+ };
+ // Background thread that calls receive(1000) on the consumer.
+ Thread t = new Thread()
+ {
+ public void run()
+ {
+ try
+ {
+ consumer.receive(1000);
+ }
+ catch (Exception e)
+ { // Exceptions from receive() are discarded; this thread's outcome is not asserted.
+ }
+ }
+ };
+
+ t.start();
+ // Presumably gives the thread time to enter receive() before the handler is set - timing-based, TODO confirm 100 ms is enough.
+ Thread.sleep(100);
+
+ try
+ {
+ consumer.setMessageHandler(handler);
+ // Reaching this point means no exception was thrown.
+ fail("Should throw exception");
+ }
+ catch (MessagingException e)
+ {
+ assertEquals(MessagingException.ILLEGAL_STATE, e.getCode());
+ }
+ finally
+ { // Interrupt the background thread whatever the outcome; the test does not join it.
+ t.interrupt();
+ }
+ }
public void testClose() throws Exception
{
@@ -489,6 +610,15 @@
consumer.close();
EasyMock.verify(session, connection, rc, executor, pd);
+
+ //Closing again should do nothing
+
+ EasyMock.reset(session, connection, rc, executor, pd);
+ EasyMock.replay(session, connection, rc, executor, pd);
+
+ consumer.close();
+
+ EasyMock.verify(session, connection, rc, executor, pd);
}
@@ -511,6 +641,127 @@
EasyMock.verify(session, connection, rc, executor, pd);
}
+ public void testHandleOnRecover() throws Exception
+ { // Drives handleMessage() before and after recover(0) and checks the buffer size at each stage.
+ ClientSessionInternal session = EasyMock.createStrictMock(ClientSessionInternal.class);
+ ClientConnectionInternal connection = EasyMock.createStrictMock(ClientConnectionInternal.class);
+ RemotingConnection rc = EasyMock.createStrictMock(RemotingConnection.class);
+ ExecutorService executor = EasyMock.createStrictMock(ExecutorService.class);
+ PacketDispatcher pd = EasyMock.createStrictMock(PacketDispatcher.class);
+
+ final int numMessages = 10;
+ // Ten ClientMessage mocks; message i reports delivery id i via the stubs set up below.
+ List<ClientMessage> msgs = new ArrayList<ClientMessage>();
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage msg = EasyMock.createStrictMock(ClientMessage.class);
+
+ msgs.add(msg);
+ }
+
+ //Handle the first five
+
+ for (int i = 0; i < numMessages / 2; i++)
+ {
+ ClientMessage msg = msgs.get(i);
+
+ EasyMock.expect(msg.getDeliveryID()).andStubReturn((long)i);
+
+ EasyMock.expect(msg.getPriority()).andStubReturn((byte)4); //default priority
+ }
+
+ //Then recover called so next 5 get rejected
+
+ for (int i = numMessages / 2; i < numMessages; i++)
+ {
+ ClientMessage msg = msgs.get(i);
+
+ EasyMock.expect(msg.getDeliveryID()).andStubReturn((long)i);
+ }
+ // The loop below stubs every message again, adding getPriority() for messages 5-9.
+ //Then all ten get redelivered
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage msg = msgs.get(i);
+
+ EasyMock.expect(msg.getDeliveryID()).andStubReturn((long)i);
+
+ EasyMock.expect(msg.getPriority()).andStubReturn((byte)4); //default priority
+ }
+ // No expectations were recorded on session, connection, rc, executor or pd.
+ EasyMock.replay(session, connection, rc, executor, pd);
+ EasyMock.replay(msgs.toArray());
+
+ ClientConsumerInternal consumer =
+ new ClientConsumerImpl(session, 675765, 67565, 787, false, rc, pd, executor, 878787);
+ // Stage 1: deliver messages 0-4; all five end up buffered.
+ for (int i = 0; i < numMessages / 2; i++)
+ {
+ ClientMessage msg = msgs.get(i);
+
+ consumer.handleMessage(msg);
+ }
+
+ assertEquals(numMessages / 2, consumer.getBufferSize());
+ // Stage 2: recover(0) leaves the buffer empty.
+ consumer.recover(0);
+
+ assertEquals(0, consumer.getBufferSize());
+ // Stage 3: messages 5-9 arrive after recover and are not buffered (presumably ignored as in-flight - confirm against ClientConsumerImpl.handleMessage).
+ for (int i = numMessages / 2; i < numMessages; i++)
+ {
+ ClientMessage msg = msgs.get(i);
+
+ consumer.handleMessage(msg);
+ }
+
+ assertEquals(0, consumer.getBufferSize());
+ // Stage 4: redelivery of messages 0-9; all ten are buffered.
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage msg = msgs.get(i);
+
+ consumer.handleMessage(msg);
+ }
+
+ assertEquals(numMessages, consumer.getBufferSize());
+
+ EasyMock.verify(session, connection, rc, executor, pd);
+ EasyMock.verify(msgs.toArray());
+ }
+
+ public void testHandleOnClose() throws Exception
+ { // Checks that messages passed to handleMessage() after close() are neither inspected nor buffered.
+ ClientSessionInternal session = EasyMock.createStrictMock(ClientSessionInternal.class);
+ ClientConnectionInternal connection = EasyMock.createStrictMock(ClientConnectionInternal.class);
+ RemotingConnection rc = EasyMock.createStrictMock(RemotingConnection.class);
+ ExecutorService executor = EasyMock.createStrictMock(ExecutorService.class);
+ PacketDispatcher pd = EasyMock.createStrictMock(PacketDispatcher.class);
+ // No expectations are recorded on msg1/msg2, so any call made on them after replay fails the test.
+ ClientMessage msg1 = EasyMock.createStrictMock(ClientMessage.class);
+ ClientMessage msg2 = EasyMock.createStrictMock(ClientMessage.class);
+
+ final long clientTargetID = 120912;
+ ClientConsumerInternal consumer =
+ new ClientConsumerImpl(session, 675765, clientTargetID, 787, false, rc, pd, executor, 878787);
+ // Calls expected from close(): unregister from the dispatcher, remove from the session, send a blocking CLOSE packet.
+ pd.unregister(clientTargetID);
+ session.removeConsumer(consumer);
+ EasyMock.expect(rc.sendBlocking(675765, 878787, new PacketImpl(PacketImpl.CLOSE))).andReturn(null);
+ EasyMock.replay(session, connection, rc, executor, pd, msg1, msg2);
+
+ consumer.close();
+ // Deliver to the already-closed consumer.
+ consumer.handleMessage(msg1);
+ consumer.handleMessage(msg2);
+
+ EasyMock.verify(session, connection, rc, executor, pd, msg1, msg2);
+
+ assertEquals(0, consumer.getBufferSize());
+ }
+
// Private -----------------------------------------------------------------------------------------------------------
private void testConstructor(final long targetID, final long clientTargetID,
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/client/impl/ClientSessionImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/client/impl/ClientSessionImplTest.java 2008-07-04 11:43:34 UTC (rev 4633)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/client/impl/ClientSessionImplTest.java 2008-07-04 15:51:41 UTC (rev 4634)
@@ -796,9 +796,7 @@
EasyMock.verify(conn, rc, cf, pd, prod1, prod2, cons1, cons2, browser1, browser2);
}
-
-
-
+
public void testClose() throws Exception
{
testClose(true);
More information about the jboss-cvs-commits
mailing list