[jboss-cvs] JBoss Messaging SVN: r4574 - in trunk: tests/src/org/jboss/messaging/tests/unit/core/server/impl and 1 other directory.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Jun 25 08:23:54 EDT 2008
Author: ataylor
Date: 2008-06-25 08:23:54 -0400 (Wed, 25 Jun 2008)
New Revision: 4574
Added:
trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/ServerConnectionPacketHandlerTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/ServerConsumerImplTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/ServerConsumerPacketHandlerTest.java
Modified:
trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
Log:
new tests
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java 2008-06-25 06:36:16 UTC (rev 4573)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java 2008-06-25 12:23:54 UTC (rev 4574)
@@ -22,24 +22,19 @@
package org.jboss.messaging.core.server.impl;
-import java.util.concurrent.atomic.AtomicInteger;
-
import org.jboss.messaging.core.filter.Filter;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.persistence.StorageManager;
import org.jboss.messaging.core.postoffice.PostOffice;
import org.jboss.messaging.core.remoting.PacketDispatcher;
-import org.jboss.messaging.core.server.HandleStatus;
-import org.jboss.messaging.core.server.MessageReference;
-import org.jboss.messaging.core.server.Queue;
-import org.jboss.messaging.core.server.ServerConsumer;
-import org.jboss.messaging.core.server.ServerMessage;
-import org.jboss.messaging.core.server.ServerSession;
+import org.jboss.messaging.core.server.*;
import org.jboss.messaging.core.settings.HierarchicalRepository;
import org.jboss.messaging.core.settings.impl.QueueSettings;
import org.jboss.messaging.core.transaction.Transaction;
import org.jboss.messaging.core.transaction.impl.TransactionImpl;
+import java.util.concurrent.atomic.AtomicInteger;
+
/**
* Concrete implementation of a ClientConsumer.
*
@@ -90,7 +85,7 @@
// Constructors ---------------------------------------------------------------------------------
- ServerConsumerImpl(final ServerSession session, final long clientTargetID,
+ public ServerConsumerImpl(final ServerSession session, final long clientTargetID,
final Queue messageQueue, final boolean noLocal, final Filter filter,
final boolean autoDeleteQueue, final boolean enableFlowControl, final int maxRate,
final long connectionID,
@@ -112,7 +107,7 @@
this.connectionID = connectionID;
- this.session = session;
+ this.session = session;
this.started = started;
Added: trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/ServerConnectionPacketHandlerTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/ServerConnectionPacketHandlerTest.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/ServerConnectionPacketHandlerTest.java 2008-06-25 12:23:54 UTC (rev 4574)
@@ -0,0 +1,165 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.tests.unit.core.server.impl;
+
+import org.easymock.EasyMock;
+import static org.easymock.EasyMock.*;
+import org.easymock.IAnswer;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.remoting.Packet;
+import org.jboss.messaging.core.remoting.PacketReturner;
+import org.jboss.messaging.core.remoting.impl.wireformat.ConnectionCreateSessionMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.ConnectionCreateSessionResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
+import org.jboss.messaging.core.server.ServerConnection;
+import org.jboss.messaging.core.server.impl.ServerConnectionPacketHandler;
+import org.jboss.messaging.tests.util.UnitTestCase;
+
+/**
+ * @author <a href="ataylor at redhat.com">Andy Taylor</a>
+ */
+public class ServerConnectionPacketHandlerTest extends UnitTestCase
+{
+ public void testGetId()
+ {
+ ServerConnection connection = createStrictMock(ServerConnection.class);
+ ServerConnectionPacketHandler handler = new ServerConnectionPacketHandler(connection);
+ expect(connection.getID()).andReturn(12345l);
+ replay(connection);
+ assertEquals(handler.getID(), 12345l);
+ verify(connection);
+ }
+
+ public void testCreateSession() throws Exception
+ {
+ final PacketReturner returner = createStrictMock(PacketReturner.class);
+ ServerConnection connection = createStrictMock(ServerConnection.class);
+ ServerConnectionPacketHandler handler = new ServerConnectionPacketHandler(connection);
+ ConnectionCreateSessionMessage request = new ConnectionCreateSessionMessage(true, true, true);
+
+ expect(connection.createSession(true, true, true, returner)).andAnswer(new IAnswer<ConnectionCreateSessionResponseMessage>()
+ {
+ public ConnectionCreateSessionResponseMessage answer() throws Throwable
+ {
+ boolean isXa = (Boolean) getCurrentArguments()[0];
+ boolean isSends = (Boolean) getCurrentArguments()[1];
+ boolean isAcks = (Boolean) getCurrentArguments()[2];
+ PacketReturner packetReturner = (PacketReturner) getCurrentArguments()[3];
+ assertTrue(isXa);
+ assertTrue(isSends);
+ assertTrue(isAcks);
+ assertEquals(returner, packetReturner);
+ return new ConnectionCreateSessionResponseMessage(12345);
+ }
+ });
+ replay(connection, returner);
+ handler.doHandle(request, returner);
+ verify(connection, returner);
+ }
+
+ public void testCreateSession2() throws Exception
+ {
+ final PacketReturner returner = createStrictMock(PacketReturner.class);
+ ServerConnection connection = createStrictMock(ServerConnection.class);
+ ServerConnectionPacketHandler handler = new ServerConnectionPacketHandler(connection);
+ ConnectionCreateSessionMessage request = new ConnectionCreateSessionMessage(false, false, false);
+
+ expect(connection.createSession(false, false, false, returner)).andAnswer(new IAnswer<ConnectionCreateSessionResponseMessage>()
+ {
+ public ConnectionCreateSessionResponseMessage answer() throws Throwable
+ {
+ boolean isXa = (Boolean) getCurrentArguments()[0];
+ boolean isSends = (Boolean) getCurrentArguments()[1];
+ boolean isAcks = (Boolean) getCurrentArguments()[2];
+ PacketReturner packetReturner = (PacketReturner) getCurrentArguments()[3];
+ assertFalse(isXa);
+ assertFalse(isSends);
+ assertFalse(isAcks);
+ assertEquals(returner, packetReturner);
+ return new ConnectionCreateSessionResponseMessage(12345);
+ }
+ });
+ replay(connection, returner);
+ handler.doHandle(request, returner);
+ verify(connection, returner);
+ }
+
+ public void testConnectionStart() throws Exception
+ {
+ final PacketReturner returner = createStrictMock(PacketReturner.class);
+ ServerConnection connection = createStrictMock(ServerConnection.class);
+ ServerConnectionPacketHandler handler = new ServerConnectionPacketHandler(connection);
+ Packet packet = new PacketImpl(PacketImpl.CONN_START);
+ connection.start();
+ replay(connection, returner);
+ handler.doHandle(packet, returner);
+ verify(connection, returner);
+ }
+
+ public void testConnectionStop() throws Exception
+ {
+ final PacketReturner returner = createStrictMock(PacketReturner.class);
+ ServerConnection connection = createStrictMock(ServerConnection.class);
+ ServerConnectionPacketHandler handler = new ServerConnectionPacketHandler(connection);
+ Packet packet = new PacketImpl(PacketImpl.CONN_STOP);
+ connection.stop();
+ replay(connection, returner);
+ handler.doHandle(packet, returner);
+ verify(connection, returner);
+ }
+
+ public void testConnectionClose() throws Exception
+ {
+ final PacketReturner returner = createStrictMock(PacketReturner.class);
+ ServerConnection connection = createStrictMock(ServerConnection.class);
+ ServerConnectionPacketHandler handler = new ServerConnectionPacketHandler(connection);
+ Packet packet = new PacketImpl(PacketImpl.CLOSE);
+ packet.setResponseTargetID(123);
+ connection.close();
+ replay(connection, returner);
+ assertNotNull(handler.doHandle(packet, returner));
+ verify(connection, returner);
+ }
+
+ public void testUnsupportedPacket() throws Exception
+ {
+ final PacketReturner returner = createStrictMock(PacketReturner.class);
+ ServerConnection connection = createStrictMock(ServerConnection.class);
+ ServerConnectionPacketHandler handler = new ServerConnectionPacketHandler(connection);
+ Packet packet = EasyMock.createStrictMock(Packet.class);
+ expect(packet.getType()).andReturn(Byte.MAX_VALUE);
+ replay(connection, returner);
+
+ try
+ {
+ handler.doHandle(packet, returner);
+ fail("should throw exception");
+ }
+ catch (Exception e)
+ {
+ MessagingException messagingException = (MessagingException) e;
+ assertEquals(messagingException.getCode(), MessagingException.UNSUPPORTED_PACKET);
+ }
+
+ verify(connection, returner);
+ }
+}
Added: trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/ServerConsumerImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/ServerConsumerImplTest.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/ServerConsumerImplTest.java 2008-06-25 12:23:54 UTC (rev 4574)
@@ -0,0 +1,338 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.tests.unit.core.server.impl;
+
+import static org.easymock.EasyMock.*;
+import org.easymock.IAnswer;
+import org.jboss.messaging.core.filter.Filter;
+import org.jboss.messaging.core.persistence.StorageManager;
+import org.jboss.messaging.core.postoffice.PostOffice;
+import org.jboss.messaging.core.remoting.PacketDispatcher;
+import org.jboss.messaging.core.server.*;
+import org.jboss.messaging.core.server.impl.ServerConsumerImpl;
+import org.jboss.messaging.core.settings.HierarchicalRepository;
+import org.jboss.messaging.core.settings.impl.QueueSettings;
+import org.jboss.messaging.tests.util.UnitTestCase;
+import org.jboss.messaging.util.SimpleString;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @author <a href="ataylor at redhat.com">Andy Taylor</a>
+ */
+public class ServerConsumerImplTest extends UnitTestCase
+{
+ private ServerSession serverSession;
+ private Queue queue;
+ private Filter filter;
+ private StorageManager storageManager;
+ private HierarchicalRepository<QueueSettings> repository;
+ private PostOffice postOffice;
+ private PacketDispatcher dispatcher;
+
+ public void testStarted()
+ {
+ ServerConsumerImpl consumer = create(1, 999l, false, false, false);
+ serverSession.promptDelivery(queue);
+ replay(serverSession, queue, filter, storageManager, repository, postOffice, dispatcher);
+ consumer.setStarted(true);
+ verify(serverSession, queue, filter, storageManager, repository, postOffice, dispatcher);
+ assertEquals(999l, consumer.getID());
+ assertEquals(1, consumer.getClientTargetID());
+ assertEquals(queue, consumer.getQueue());
+ }
+
+ public void testClose() throws Exception
+ {
+ ServerConsumerImpl consumer = create(1, 999l, false, true, false);
+ expect(queue.removeConsumer(consumer)).andReturn(true);
+ serverSession.removeConsumer(consumer);
+ replay(serverSession, queue, filter, storageManager, repository, postOffice, dispatcher);
+ consumer.close();
+ verify(serverSession, queue, filter, storageManager, repository, postOffice, dispatcher);
+ assertEquals(999l, consumer.getID());
+ assertEquals(1, consumer.getClientTargetID());
+ assertEquals(queue, consumer.getQueue());
+ }
+
+ public void testCloseAutDeleteQueue() throws Exception
+ {
+ ServerConsumerImpl consumer = create(1, 999l, true, true, false);
+ expect(queue.removeConsumer(consumer)).andReturn(true);
+ expect(queue.getConsumerCount()).andReturn(0);
+ SimpleString qName = new SimpleString("testQ");
+ expect(queue.getName()).andStubReturn(qName);
+ expect(postOffice.removeBinding(qName)).andReturn(null);
+ expect(queue.isDurable()).andReturn(true);
+ queue.deleteAllReferences(storageManager);
+ serverSession.removeConsumer(consumer);
+ replay(serverSession, queue, filter, storageManager, repository, postOffice, dispatcher);
+ consumer.close();
+ verify(serverSession, queue, filter, storageManager, repository, postOffice, dispatcher);
+ assertEquals(999l, consumer.getID());
+ assertEquals(1, consumer.getClientTargetID());
+ assertEquals(queue, consumer.getQueue());
+ }
+
+ public void testHandleNoAvailableCredits() throws Exception
+ {
+ MessageReference messageReference = createStrictMock(MessageReference.class);
+ ServerConsumerImpl consumer = create(1, 999l, false, true, false);
+ replay(serverSession, queue, filter, storageManager, repository, postOffice, dispatcher);
+ assertEquals(HandleStatus.BUSY, consumer.handle(messageReference));
+ verify(serverSession, queue, filter, storageManager, repository, postOffice, dispatcher);
+ assertEquals(999l, consumer.getID());
+ assertEquals(1, consumer.getClientTargetID());
+ assertEquals(queue, consumer.getQueue());
+ }
+
+ public void testHandleExpiredMessage() throws Exception
+ {
+ MessageReference messageReference = createStrictMock(MessageReference.class);
+ ServerMessage message = createStrictMock(ServerMessage.class);
+ expect(messageReference.getMessage()).andStubReturn(message);
+ ServerConsumerImpl consumer = create(1, 999l, false, true, false);
+ serverSession.promptDelivery(queue);
+ expect(message.isExpired()).andReturn(true);
+ messageReference.expire(storageManager, postOffice, repository);
+ replay(serverSession, queue, filter, storageManager, repository, postOffice, dispatcher, messageReference, message);
+ consumer.receiveCredits(1);
+ assertEquals(HandleStatus.HANDLED, consumer.handle(messageReference));
+ verify(serverSession, queue, filter, storageManager, repository, postOffice, dispatcher, messageReference, message);
+ assertEquals(999l, consumer.getID());
+ assertEquals(1, consumer.getClientTargetID());
+ assertEquals(queue, consumer.getQueue());
+ }
+
+ public void testHandleOnInstartedConsumer() throws Exception
+ {
+ MessageReference messageReference = createStrictMock(MessageReference.class);
+ ServerMessage message = createStrictMock(ServerMessage.class);
+ expect(messageReference.getMessage()).andStubReturn(message);
+ ServerConsumerImpl consumer = create(1, 999l, false, false, false);
+ serverSession.promptDelivery(queue);
+ expect(message.isExpired()).andReturn(false);
+ replay(serverSession, queue, filter, storageManager, repository, postOffice, dispatcher, messageReference, message);
+ consumer.receiveCredits(1);
+ assertEquals(HandleStatus.BUSY, consumer.handle(messageReference));
+ verify(serverSession, queue, filter, storageManager, repository, postOffice, dispatcher, messageReference, message);
+ assertEquals(999l, consumer.getID());
+ assertEquals(1, consumer.getClientTargetID());
+ assertEquals(queue, consumer.getQueue());
+ }
+
+ public void testHandleOnNoMatch() throws Exception
+ {
+ MessageReference messageReference = createStrictMock(MessageReference.class);
+ ServerMessage message = createStrictMock(ServerMessage.class);
+ expect(messageReference.getMessage()).andStubReturn(message);
+ ServerConsumerImpl consumer = create(1, 999l, false, true, false);
+ serverSession.promptDelivery(queue);
+ expect(message.isExpired()).andReturn(false);
+ expect(filter.match(message)).andReturn(false);
+ replay(serverSession, queue, filter, storageManager, repository, postOffice, dispatcher, messageReference, message);
+ consumer.receiveCredits(1);
+ assertEquals(HandleStatus.NO_MATCH, consumer.handle(messageReference));
+ verify(serverSession, queue, filter, storageManager, repository, postOffice, dispatcher, messageReference, message);
+ assertEquals(999l, consumer.getID());
+ assertEquals(1, consumer.getClientTargetID());
+ assertEquals(queue, consumer.getQueue());
+ }
+
+ public void testHandleDelivery() throws Exception
+ {
+ MessageReference messageReference = createStrictMock(MessageReference.class);
+ ServerMessage message = createStrictMock(ServerMessage.class);
+ expect(messageReference.getMessage()).andStubReturn(message);
+ ServerConsumerImpl consumer = create(1, 999l, false, true, false);
+ serverSession.promptDelivery(queue);
+ expect(message.isExpired()).andReturn(false);
+ expect(filter.match(message)).andReturn(true);
+ expect(message.getEncodeSize()).andReturn(1);
+ serverSession.handleDelivery(messageReference, consumer);
+ replay(serverSession, queue, filter, storageManager, repository, postOffice, dispatcher, messageReference, message);
+ consumer.receiveCredits(1);
+ assertEquals(HandleStatus.HANDLED, consumer.handle(messageReference));
+ verify(serverSession, queue, filter, storageManager, repository, postOffice, dispatcher, messageReference, message);
+ assertEquals(999l, consumer.getID());
+ assertEquals(1, consumer.getClientTargetID());
+ assertEquals(queue, consumer.getQueue());
+ }
+
+ public void testHandleDeliveryNoLocalSameConnection() throws Exception
+ {
+ MessageReference messageReference = createStrictMock(MessageReference.class);
+ ServerMessage message = createStrictMock(ServerMessage.class);
+ expect(messageReference.getMessage()).andStubReturn(message);
+ ServerConsumerImpl consumer = create(1, 999l, false, true, true);
+ expect(messageReference.getQueue()).andStubReturn(queue);
+ serverSession.promptDelivery(queue);
+ expect(message.isExpired()).andReturn(false);
+ expect(filter.match(message)).andReturn(true);
+ expect(message.getConnectionID()).andReturn(2l);
+ expect(storageManager.generateTransactionID()).andReturn(555l);
+ expect(message.isDurable()).andReturn(false);
+ queue.referenceAcknowledged(messageReference);
+ replay(serverSession, queue, filter, storageManager, repository, postOffice, dispatcher, messageReference, message);
+ consumer.receiveCredits(1);
+ assertEquals(HandleStatus.HANDLED, consumer.handle(messageReference));
+ verify(serverSession, queue, filter, storageManager, repository, postOffice, dispatcher, messageReference, message);
+ assertEquals(999l, consumer.getID());
+ assertEquals(1, consumer.getClientTargetID());
+ assertEquals(queue, consumer.getQueue());
+ }
+
+ public void testHandleDeliveryNoLocalDiffConnection() throws Exception
+ {
+ MessageReference messageReference = createStrictMock(MessageReference.class);
+ ServerMessage message = createStrictMock(ServerMessage.class);
+ expect(messageReference.getMessage()).andStubReturn(message);
+ ServerConsumerImpl consumer = create(1, 999l, false, true, true);
+ serverSession.promptDelivery(queue);
+ expect(message.isExpired()).andReturn(false);
+ expect(filter.match(message)).andReturn(true);
+ expect(message.getConnectionID()).andReturn(3l);
+ expect(message.getEncodeSize()).andReturn(1);
+ serverSession.handleDelivery(messageReference, consumer);
+ replay(serverSession, queue, filter, storageManager, repository, postOffice, dispatcher, messageReference, message);
+ consumer.receiveCredits(1);
+ assertEquals(HandleStatus.HANDLED, consumer.handle(messageReference));
+ verify(serverSession, queue, filter, storageManager, repository, postOffice, dispatcher, messageReference, message);
+ assertEquals(999l, consumer.getID());
+ assertEquals(1, consumer.getClientTargetID());
+ assertEquals(queue, consumer.getQueue());
+ }
+
+ public void testHandlExceptionOneDeliveryRemovesConsumer() throws Exception
+ {
+ MessageReference messageReference = createStrictMock(MessageReference.class);
+ ServerMessage message = createStrictMock(ServerMessage.class);
+ expect(messageReference.getMessage()).andStubReturn(message);
+ ServerConsumerImpl consumer = create(1, 999l, false, true, false);
+ serverSession.promptDelivery(queue);
+ expect(message.isExpired()).andReturn(false);
+ expect(filter.match(message)).andReturn(true);
+ expect(message.getEncodeSize()).andReturn(1);
+ serverSession.handleDelivery(messageReference, consumer);
+ expectLastCall().andThrow(new RuntimeException());
+ replay(serverSession, queue, filter, storageManager, repository, postOffice, dispatcher, messageReference, message);
+ consumer.receiveCredits(1);
+ assertEquals(HandleStatus.HANDLED, consumer.handle(messageReference));
+ verify(serverSession, queue, filter, storageManager, repository, postOffice, dispatcher, messageReference, message);
+ assertEquals(999l, consumer.getID());
+ assertEquals(1, consumer.getClientTargetID());
+ assertEquals(queue, consumer.getQueue());
+ }
+
+ public void testHandle2DeliveriesFirstUsesTokens() throws Exception
+ {
+ MessageReference messageReference = createStrictMock(MessageReference.class);
+ ServerMessage message = createStrictMock(ServerMessage.class);
+ expect(messageReference.getMessage()).andStubReturn(message);
+ MessageReference messageReference2 = createStrictMock(MessageReference.class);
+ ServerMessage message2 = createStrictMock(ServerMessage.class);
+ expect(messageReference2.getMessage()).andStubReturn(message2);
+ ServerConsumerImpl consumer = create(1, 999l, false, true, false);
+ serverSession.promptDelivery(queue);
+ expect(message.isExpired()).andReturn(false);
+ expect(filter.match(message)).andReturn(true);
+ expect(message.getEncodeSize()).andReturn(1);
+ serverSession.handleDelivery(messageReference, consumer);
+ replay(serverSession, queue, filter, storageManager, repository, postOffice, dispatcher, messageReference, message);
+ consumer.receiveCredits(1);
+ assertEquals(HandleStatus.HANDLED, consumer.handle(messageReference));
+ assertEquals(HandleStatus.BUSY, consumer.handle(messageReference2));
+ verify(serverSession, queue, filter, storageManager, repository, postOffice, dispatcher, messageReference, message);
+ assertEquals(999l, consumer.getID());
+ assertEquals(1, consumer.getClientTargetID());
+ assertEquals(queue, consumer.getQueue());
+ }
+
+ public void testHandle2DeliveriesFirstUsesTokensAddTokenThenRedeliver() throws Exception
+ {
+ MessageReference messageReference = createStrictMock(MessageReference.class);
+ ServerMessage message = createStrictMock(ServerMessage.class);
+ expect(messageReference.getMessage()).andStubReturn(message);
+ MessageReference messageReference2 = createStrictMock(MessageReference.class);
+ ServerMessage message2 = createStrictMock(ServerMessage.class);
+ expect(messageReference2.getMessage()).andStubReturn(message2);
+ ServerConsumerImpl consumer = create(1, 999l, false, true, false);
+ serverSession.promptDelivery(queue);
+ expect(message.isExpired()).andReturn(false);
+ expect(filter.match(message)).andReturn(true);
+ expect(message.getEncodeSize()).andReturn(1);
+ serverSession.handleDelivery(messageReference, consumer);
+ serverSession.promptDelivery(queue);
+ expect(message2.isExpired()).andReturn(false);
+ expect(filter.match(message2)).andReturn(true);
+ expect(message2.getEncodeSize()).andReturn(1);
+ serverSession.handleDelivery(messageReference2, consumer);
+ replay(serverSession, queue, filter, storageManager, repository, postOffice, dispatcher, messageReference, message, messageReference2, message2);
+ consumer.receiveCredits(1);
+ assertEquals(HandleStatus.HANDLED, consumer.handle(messageReference));
+ assertEquals(HandleStatus.BUSY, consumer.handle(messageReference2));
+ consumer.receiveCredits(1);
+ assertEquals(HandleStatus.HANDLED, consumer.handle(messageReference2));
+ verify(serverSession, queue, filter, storageManager, repository, postOffice, dispatcher, messageReference, message, messageReference2, message2);
+ assertEquals(999l, consumer.getID());
+ assertEquals(1, consumer.getClientTargetID());
+ assertEquals(queue, consumer.getQueue());
+ }
+
+
+ private ServerConsumerImpl create(int clientId, long consumerId, boolean autoDeleteQueue, boolean started, boolean noLocal)
+ {
+ serverSession = createStrictMock(ServerSession.class);
+ queue = createStrictMock(Queue.class);
+ filter = createStrictMock(Filter.class);
+ storageManager = createStrictMock(StorageManager.class);
+ repository = createStrictMock(HierarchicalRepository.class);
+ postOffice = createStrictMock(PostOffice.class);
+ dispatcher = createStrictMock(PacketDispatcher.class);
+ expect(dispatcher.generateID()).andReturn(consumerId);
+ queue.addConsumer((Consumer) anyObject());
+ replay(dispatcher, queue);
+ ServerConsumerImpl consumer = new ServerConsumerImpl(serverSession, clientId, queue, noLocal, filter, autoDeleteQueue, true, 0, 2, started, storageManager,
+ repository, postOffice, dispatcher);
+ verify(dispatcher, queue);
+ reset(dispatcher, queue);
+ return consumer;
+ }
+
+ class promptDeliveryAnswer implements IAnswer
+ {
+ volatile boolean delivering;
+ private CountDownLatch countDownLatch;
+
+ public promptDeliveryAnswer(CountDownLatch countDownLatch)
+ {
+ this.countDownLatch = countDownLatch;
+ }
+
+ public Object answer() throws Throwable
+ {
+ countDownLatch.await(10000, TimeUnit.MILLISECONDS);
+ return null;
+ }
+ }
+}
Added: trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/ServerConsumerPacketHandlerTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/ServerConsumerPacketHandlerTest.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/ServerConsumerPacketHandlerTest.java 2008-06-25 12:23:54 UTC (rev 4574)
@@ -0,0 +1,97 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.tests.unit.core.server.impl;
+
+import org.easymock.EasyMock;
+import static org.easymock.EasyMock.*;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.remoting.Packet;
+import org.jboss.messaging.core.remoting.PacketReturner;
+import org.jboss.messaging.core.remoting.impl.wireformat.ConsumerFlowCreditMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
+import org.jboss.messaging.core.server.ServerConsumer;
+import org.jboss.messaging.core.server.impl.ServerConsumerPacketHandler;
+import org.jboss.messaging.tests.util.UnitTestCase;
+
+/**
+ * @author <a href="ataylor at redhat.com">Andy Taylor</a>
+ */
+public class ServerConsumerPacketHandlerTest extends UnitTestCase
+{
+ public void testGetId()
+ {
+ ServerConsumer consumer = createStrictMock(ServerConsumer.class);
+ ServerConsumerPacketHandler handler = new ServerConsumerPacketHandler(consumer);
+ expect(consumer.getID()).andReturn(9999l);
+ replay(consumer);
+ assertEquals(9999l, handler.getID());
+ verify(consumer);
+ }
+
+ public void testConsumerFlowHandle() throws Exception
+ {
+ ServerConsumer consumer = createStrictMock(ServerConsumer.class);
+ PacketReturner returner = createStrictMock(PacketReturner.class);
+ ServerConsumerPacketHandler handler = new ServerConsumerPacketHandler(consumer);
+ ConsumerFlowCreditMessage message = new ConsumerFlowCreditMessage(100);
+ consumer.receiveCredits(100);
+ replay(consumer, returner);
+ handler.doHandle(message, returner);
+ verify(consumer, returner);
+ }
+
+ public void testConsumerClose() throws Exception
+ {
+ ServerConsumer consumer = createStrictMock(ServerConsumer.class);
+ PacketReturner returner = createStrictMock(PacketReturner.class);
+ ServerConsumerPacketHandler handler = new ServerConsumerPacketHandler(consumer);
+ Packet message = new PacketImpl(PacketImpl.CLOSE);
+ message.setResponseTargetID(123);
+ consumer.close();
+ replay(consumer, returner);
+ assertNotNull(handler.doHandle(message, returner));
+ verify(consumer, returner);
+ }
+
+ public void testUnsupportedPacket() throws Exception
+ {
+ ServerConsumer consumer = createStrictMock(ServerConsumer.class);
+ PacketReturner returner = createStrictMock(PacketReturner.class);
+ ServerConsumerPacketHandler handler = new ServerConsumerPacketHandler(consumer);
+ Packet packet = EasyMock.createStrictMock(Packet.class);
+ expect(packet.getType()).andReturn(Byte.MAX_VALUE);
+ replay(consumer, returner);
+
+ try
+ {
+ handler.doHandle(packet, returner);
+ fail("should throw exception");
+ }
+ catch (Exception e)
+ {
+ MessagingException messagingException = (MessagingException) e;
+ assertEquals(messagingException.getCode(), MessagingException.UNSUPPORTED_PACKET);
+ }
+
+ verify(consumer, returner);
+ }
+}
More information about the jboss-cvs-commits
mailing list