[jboss-cvs] JBoss Messaging SVN: r6077 - trunk/tests/src/org/jboss/messaging/tests/integration/client.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri Mar 13 10:21:50 EDT 2009
Author: ataylor
Date: 2009-03-13 10:21:50 -0400 (Fri, 13 Mar 2009)
New Revision: 6077
Modified:
trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientEndToEndTest.java
Log:
some end to end tests - more to follow
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientEndToEndTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientEndToEndTest.java 2009-03-13 13:19:02 UTC (rev 6076)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientEndToEndTest.java 2009-03-13 14:21:50 UTC (rev 6077)
@@ -36,6 +36,7 @@
import org.jboss.messaging.utils.SimpleString;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
/**
* @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
@@ -43,14 +44,21 @@
public class ClientEndToEndTest extends ServiceTestBase
{
public final SimpleString addressA = new SimpleString("addressA");
+
public final SimpleString queueA = new SimpleString("queueA");
- private final SimpleString groupTestQ = new SimpleString("testGroupQueue");;
+ public final SimpleString queueB = new SimpleString("queueB");
+
+ public final SimpleString queueC = new SimpleString("queueC");
+
+ private final SimpleString groupTestQ = new SimpleString("testGroupQueue");
+
/*ackbatchSize tests*/
/*
* tests that wed don't acknowledge until the correct ackBatchSize is reached
* */
+
public void testAckBatchSize() throws Exception
{
MessagingService messagingService = createService(false);
@@ -70,14 +78,14 @@
ClientSession session = cf.createSession(false, true, true);
session.createQueue(addressA, queueA, false);
ClientProducer cp = sendSession.createProducer(addressA);
- for(int i = 0 ; i < numMessages; i ++)
+ for (int i = 0; i < numMessages; i++)
{
cp.send(sendSession.createClientMessage(false));
}
ClientConsumer consumer = session.createConsumer(queueA);
session.start();
- for(int i = 0; i < numMessages - 1; i++)
+ for (int i = 0; i < numMessages - 1; i++)
{
ClientMessage m = consumer.receive(5000);
m.acknowledge();
@@ -93,7 +101,7 @@
}
finally
{
- if(messagingService.isStarted())
+ if (messagingService.isStarted())
{
messagingService.stop();
}
@@ -120,7 +128,7 @@
ClientSession session = cf.createSession(false, true, true);
session.createQueue(addressA, queueA, false);
ClientProducer cp = sendSession.createProducer(addressA);
- for(int i = 0 ; i < numMessages; i ++)
+ for (int i = 0; i < numMessages; i++)
{
cp.send(sendSession.createClientMessage(false));
}
@@ -129,12 +137,12 @@
session.start();
Queue q = (Queue) messagingService.getServer().getPostOffice().getBinding(queueA).getBindable();
ClientMessage[] messages = new ClientMessage[numMessages];
- for(int i = 0; i < numMessages; i++)
+ for (int i = 0; i < numMessages; i++)
{
messages[i] = consumer.receive(5000);
assertNotNull(messages[i]);
}
- for(int i = 0; i < numMessages; i++)
+ for (int i = 0; i < numMessages; i++)
{
messages[i].acknowledge();
assertEquals(numMessages - i - 1, q.getDeliveringCount());
@@ -144,7 +152,7 @@
}
finally
{
- if(messagingService.isStarted())
+ if (messagingService.isStarted())
{
messagingService.stop();
}
@@ -156,6 +164,7 @@
/*
* tests when the autogroupid is set only 1 consumer (out of 2) gets all the messages from a single producer
* */
+
public void testGroupIdAutomaticallySet() throws Exception
{
MessagingService messagingService = createService(false);
@@ -163,7 +172,7 @@
{
AddressSettings qs = new AddressSettings();
qs.setDistributionPolicyClass(GroupingRoundRobinDistributor.class.getName());
- messagingService.getServer().getAddressSettingsRepository().addMatch("testGroupQueue", qs);
+ messagingService.getServer().getAddressSettingsRepository().addMatch(groupTestQ.toString(), qs);
messagingService.start();
ClientSessionFactory sf = createInVMFactory();
@@ -201,7 +210,7 @@
}
finally
{
- if(messagingService.isStarted())
+ if (messagingService.isStarted())
{
messagingService.stop();
}
@@ -266,7 +275,7 @@
}
finally
{
- if(messagingService.isStarted())
+ if (messagingService.isStarted())
{
messagingService.stop();
}
@@ -285,7 +294,7 @@
AddressSettings qs = new AddressSettings();
qs.setDistributionPolicyClass(GroupingRoundRobinDistributor.class.getName());
- messagingService.getServer().getAddressSettingsRepository().addMatch("testGroupQueue", qs);
+ messagingService.getServer().getAddressSettingsRepository().addMatch(groupTestQ.toString(), qs);
messagingService.start();
ClientSessionFactory sf = createInVMFactory();
@@ -323,7 +332,7 @@
}
finally
{
- if(messagingService.isStarted())
+ if (messagingService.isStarted())
{
messagingService.stop();
}
@@ -331,6 +340,817 @@
}
+ /*
+ * tests send window size. we do this by having 2 receivers on the q. since we roundrobin the consumer for delivery we
+ * know if consumer 1 has received n messages then consumer 2 must have also have received n messages or at least up
+ * to its window size
+ * */
+ public void testSendWindowSize() throws Exception
+ {
+ MessagingService messagingService = createService(false);
+ ClientSessionFactory cf = createInVMFactory();
+ try
+ {
+ messagingService.start();
+ cf.setBlockOnNonPersistentSend(true);
+ ClientSession sendSession = cf.createSession(false, true, true);
+ ClientSession receiveSession = cf.createSession(false, true, true);
+ sendSession.createQueue(addressA, queueA, false);
+ ClientConsumer receivingConsumer = receiveSession.createConsumer(queueA);
+ ClientMessage cm = sendSession.createClientMessage(false);
+ cm.setDestination(addressA);
+ int encodeSize = cm.getEncodeSize();
+ int numMessage = 100;
+ cf.setConsumerWindowSize(numMessage * encodeSize);
+ ClientSession session = cf.createSession(false, true, true);
+ ClientProducer cp = sendSession.createProducer(addressA);
+ ClientConsumer cc = session.createConsumer(queueA);
+ session.start();
+ receiveSession.start();
+ for (int i = 0; i < numMessage * 4; i++)
+ {
+ cp.send(sendSession.createClientMessage(false));
+ }
+
+ for (int i = 0; i < numMessage * 2; i++)
+ {
+ ClientMessage m = receivingConsumer.receive(5000);
+ assertNotNull(m);
+ m.acknowledge();
+ }
+ receiveSession.close();
+ Queue q = (Queue) messagingService.getServer().getPostOffice().getBinding(queueA).getBindable();
+ assertEquals(numMessage, q.getDeliveringCount());
+
+ session.close();
+ sendSession.close();
+ }
+ finally
+ {
+ if (messagingService.isStarted())
+ {
+ messagingService.stop();
+ }
+ }
+ }
+
+ public void testRouteToMultipleQueues() throws Exception
+ {
+ MessagingService messagingService = createService(false);
+
+ try
+ {
+ messagingService.start();
+ ClientSessionFactory cf = createInVMFactory();
+ ClientSession sendSession = cf.createSession(false, true, true);
+ sendSession.createQueue(addressA, queueA, false);
+ sendSession.createQueue(addressA, queueB, false);
+ sendSession.createQueue(addressA, queueC, false);
+ int numMessages = 300;
+ ClientProducer p = sendSession.createProducer(addressA);
+ for (int i = 0; i < numMessages; i++)
+ {
+ p.send(sendSession.createClientMessage(false));
+ }
+ ClientSession session = cf.createSession(false, true, true);
+ ClientConsumer c1 = session.createConsumer(queueA);
+ ClientConsumer c2 = session.createConsumer(queueB);
+ ClientConsumer c3 = session.createConsumer(queueC);
+ session.start();
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage m = c1.receive(5000);
+ assertNotNull(m);
+ m.acknowledge();
+ c2.receive(5000);
+ assertNotNull(m);
+ m.acknowledge();
+ c3.receive(5000);
+ assertNotNull(m);
+ m.acknowledge();
+ }
+ assertNull(c1.receiveImmediate());
+ assertNull(c2.receiveImmediate());
+ assertNull(c3.receiveImmediate());
+ sendSession.close();
+ session.close();
+ }
+ finally
+ {
+ if (messagingService.isStarted())
+ {
+ messagingService.stop();
+ }
+ }
+ }
+
+ public void testRouteToSingleNonDurableQueue() throws Exception
+ {
+ MessagingService messagingService = createService(false);
+
+ try
+ {
+ messagingService.start();
+ ClientSessionFactory cf = createInVMFactory();
+ ClientSession sendSession = cf.createSession(false, true, true);
+ sendSession.createQueue(addressA, queueA, false);
+ int numMessages = 300;
+ ClientProducer p = sendSession.createProducer(addressA);
+ for (int i = 0; i < numMessages; i++)
+ {
+ p.send(sendSession.createClientMessage(false));
+ }
+ ClientSession session = cf.createSession(false, true, true);
+ ClientConsumer c1 = session.createConsumer(queueA);
+ session.start();
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage m = c1.receive(5000);
+ assertNotNull(m);
+ m.acknowledge();
+ }
+ assertNull(c1.receiveImmediate());
+ sendSession.close();
+ session.close();
+ }
+ finally
+ {
+ if (messagingService.isStarted())
+ {
+ messagingService.stop();
+ }
+ }
+ }
+
+ public void testRouteToSingleDurableQueue() throws Exception
+ {
+ MessagingService messagingService = createService(false);
+
+ try
+ {
+ messagingService.start();
+ ClientSessionFactory cf = createInVMFactory();
+ ClientSession sendSession = cf.createSession(false, true, true);
+ sendSession.createQueue(addressA, queueA, true);
+ int numMessages = 300;
+ ClientProducer p = sendSession.createProducer(addressA);
+ for (int i = 0; i < numMessages; i++)
+ {
+ p.send(sendSession.createClientMessage(false));
+ }
+ ClientSession session = cf.createSession(false, true, true);
+ ClientConsumer c1 = session.createConsumer(queueA);
+ session.start();
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage m = c1.receive(5000);
+ assertNotNull(m);
+ m.acknowledge();
+ }
+ assertNull(c1.receiveImmediate());
+ sendSession.close();
+ session.close();
+ }
+ finally
+ {
+ if (messagingService.isStarted())
+ {
+ messagingService.stop();
+ }
+ }
+ }
+
+ public void testRouteToSingleQueueWithFilter() throws Exception
+ {
+ MessagingService messagingService = createService(false);
+
+ try
+ {
+ messagingService.start();
+ ClientSessionFactory cf = createInVMFactory();
+ ClientSession sendSession = cf.createSession(false, true, true);
+ sendSession.createQueue(addressA, queueA, new SimpleString("foo = 'bar'"), false, false);
+ int numMessages = 300;
+ ClientProducer p = sendSession.createProducer(addressA);
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage clientMessage = sendSession.createClientMessage(false);
+ clientMessage.putStringProperty(new SimpleString("foo"), new SimpleString("bar"));
+ p.send(clientMessage);
+ }
+ ClientSession session = cf.createSession(false, true, true);
+ ClientConsumer c1 = session.createConsumer(queueA);
+ session.start();
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage m = c1.receive(5000);
+ assertNotNull(m);
+ m.acknowledge();
+ }
+ assertNull(c1.receiveImmediate());
+ sendSession.close();
+ session.close();
+ }
+ finally
+ {
+ if (messagingService.isStarted())
+ {
+ messagingService.stop();
+ }
+ }
+ }
+
+ public void testRouteToMultipleQueueWithFilters() throws Exception
+ {
+ MessagingService messagingService = createService(false);
+
+ try
+ {
+ messagingService.start();
+ ClientSessionFactory cf = createInVMFactory();
+ ClientSession sendSession = cf.createSession(false, true, true);
+ sendSession.createQueue(addressA, queueA, new SimpleString("foo = 'bar'"), false, false);
+ sendSession.createQueue(addressA, queueB, new SimpleString("x = 1"), false, false);
+ sendSession.createQueue(addressA, queueC, new SimpleString("b = false"), false, false);
+ int numMessages = 300;
+ ClientProducer p = sendSession.createProducer(addressA);
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage clientMessage = sendSession.createClientMessage(false);
+ if (i % 3 == 0)
+ {
+ clientMessage.putStringProperty(new SimpleString("foo"), new SimpleString("bar"));
+ }
+ else if (i % 3 == 1)
+ {
+ clientMessage.putIntProperty(new SimpleString("x"), 1);
+ }
+ else
+ {
+ clientMessage.putBooleanProperty(new SimpleString("b"), false);
+ }
+ p.send(clientMessage);
+ }
+ ClientSession session = cf.createSession(false, true, true);
+ ClientConsumer c1 = session.createConsumer(queueA);
+ ClientConsumer c2 = session.createConsumer(queueB);
+ ClientConsumer c3 = session.createConsumer(queueC);
+ session.start();
+ for (int i = 0; i < numMessages / 3; i++)
+ {
+ ClientMessage m = c1.receive(5000);
+ assertNotNull(m);
+ m.acknowledge();
+ m = c2.receive(5000);
+ assertNotNull(m);
+ m.acknowledge();
+ m = c3.receive(5000);
+ assertNotNull(m);
+ m.acknowledge();
+ }
+ assertNull(c1.receiveImmediate());
+ assertNull(c2.receiveImmediate());
+ assertNull(c3.receiveImmediate());
+ sendSession.close();
+ session.close();
+ }
+ finally
+ {
+ if (messagingService.isStarted())
+ {
+ messagingService.stop();
+ }
+ }
+ }
+
+ public void testRouteToSingleTemporaryQueue() throws Exception
+ {
+ MessagingService messagingService = createService(false);
+
+ try
+ {
+ messagingService.start();
+ ClientSessionFactory cf = createInVMFactory();
+ ClientSession sendSession = cf.createSession(false, true, true);
+ sendSession.createQueue(addressA, queueA, false, true);
+ int numMessages = 300;
+ ClientProducer p = sendSession.createProducer(addressA);
+ for (int i = 0; i < numMessages; i++)
+ {
+ p.send(sendSession.createClientMessage(false));
+ }
+ ClientSession session = cf.createSession(false, true, true);
+ ClientConsumer c1 = session.createConsumer(queueA);
+ session.start();
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage m = c1.receive(5000);
+ assertNotNull(m);
+ m.acknowledge();
+ }
+ assertNull(c1.receiveImmediate());
+ sendSession.close();
+ session.close();
+ }
+ finally
+ {
+ if (messagingService.isStarted())
+ {
+ messagingService.stop();
+ }
+ }
+ }
+
+ public void testSendWithCommit() throws Exception
+ {
+ MessagingService messagingService = createService(false);
+ try
+ {
+ messagingService.start();
+ ClientSessionFactory cf = createInVMFactory();
+ ClientSession session = cf.createSession(false, false, false);
+ session.createQueue(addressA, queueA, false);
+ ClientProducer cp = session.createProducer(addressA);
+ int numMessages = 100;
+ for (int i = 0; i < numMessages; i++)
+ {
+ cp.send(session.createClientMessage(false));
+ }
+ Queue q = (Queue) messagingService.getServer().getPostOffice().getBinding(queueA).getBindable();
+ assertEquals(q.getMessageCount(), 0);
+ session.commit();
+ assertEquals(q.getMessageCount(), numMessages);
+ //now send some more
+ for (int i = 0; i < numMessages; i++)
+ {
+ cp.send(session.createClientMessage(false));
+ }
+ assertEquals(q.getMessageCount(), numMessages);
+ session.commit();
+ assertEquals(q.getMessageCount(), numMessages * 2);
+ session.close();
+ }
+ finally
+ {
+ if (messagingService.isStarted())
+ {
+ messagingService.stop();
+ }
+ }
+ }
+
+ public void testSendWithRollback() throws Exception
+ {
+ MessagingService messagingService = createService(false);
+ try
+ {
+ messagingService.start();
+ ClientSessionFactory cf = createInVMFactory();
+ ClientSession session = cf.createSession(false, false, false);
+ session.createQueue(addressA, queueA, false);
+ ClientProducer cp = session.createProducer(addressA);
+ int numMessages = 100;
+ for (int i = 0; i < numMessages; i++)
+ {
+ cp.send(session.createClientMessage(false));
+ }
+ Queue q = (Queue) messagingService.getServer().getPostOffice().getBinding(queueA).getBindable();
+ assertEquals(q.getMessageCount(), 0);
+ session.rollback();
+ assertEquals(q.getMessageCount(), 0);
+ //now send some more
+ for (int i = 0; i < numMessages; i++)
+ {
+ cp.send(session.createClientMessage(false));
+ }
+ assertEquals(q.getMessageCount(), 0);
+ session.commit();
+ assertEquals(q.getMessageCount(), numMessages);
+ session.close();
+ }
+ finally
+ {
+ if (messagingService.isStarted())
+ {
+ messagingService.stop();
+ }
+ }
+ }
+
+ public void testReceiveWithCommit() throws Exception
+ {
+ MessagingService messagingService = createService(false);
+ try
+ {
+ messagingService.start();
+ ClientSessionFactory cf = createInVMFactory();
+ ClientSession sendSession = cf.createSession(false, true, true);
+ ClientSession session = cf.createSession(false, false, false);
+ sendSession.createQueue(addressA, queueA, false);
+ ClientProducer cp = sendSession.createProducer(addressA);
+ ClientConsumer cc = session.createConsumer(queueA);
+ int numMessages = 100;
+ for (int i = 0; i < numMessages; i++)
+ {
+ cp.send(sendSession.createClientMessage(false));
+ }
+ session.start();
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage cm = cc.receive(5000);
+ assertNotNull(cm);
+ cm.acknowledge();
+ }
+ Queue q = (Queue) messagingService.getServer().getPostOffice().getBinding(queueA).getBindable();
+ assertEquals(numMessages, q.getDeliveringCount());
+ session.commit();
+ assertEquals(0, q.getDeliveringCount());
+ session.close();
+ }
+ finally
+ {
+ if (messagingService.isStarted())
+ {
+ messagingService.stop();
+ }
+ }
+ }
+
+ public void testReceiveWithRollback() throws Exception
+ {
+ MessagingService messagingService = createService(false);
+ try
+ {
+ messagingService.start();
+ ClientSessionFactory cf = createInVMFactory();
+ ClientSession sendSession = cf.createSession(false, true, true);
+ ClientSession session = cf.createSession(false, false, false);
+ sendSession.createQueue(addressA, queueA, false);
+ ClientProducer cp = sendSession.createProducer(addressA);
+ ClientConsumer cc = session.createConsumer(queueA);
+ int numMessages = 100;
+ for (int i = 0; i < numMessages; i++)
+ {
+ cp.send(sendSession.createClientMessage(false));
+ }
+ session.start();
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage cm = cc.receive(5000);
+ assertNotNull(cm);
+ cm.acknowledge();
+ }
+ Queue q = (Queue) messagingService.getServer().getPostOffice().getBinding(queueA).getBindable();
+ assertEquals(numMessages, q.getDeliveringCount());
+ session.rollback();
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage cm = cc.receive(5000);
+ assertNotNull(cm);
+ cm.acknowledge();
+ }
+ assertEquals(numMessages, q.getDeliveringCount());
+ session.close();
+ sendSession.close();
+ }
+ finally
+ {
+ if (messagingService.isStarted())
+ {
+ messagingService.stop();
+ }
+ }
+ }
+
+ public void testReceiveAckLastMessageOnly() throws Exception
+ {
+ MessagingService messagingService = createService(false);
+ try
+ {
+ messagingService.start();
+ ClientSessionFactory cf = createInVMFactory();
+ cf.setAckBatchSize(0);
+ cf.setBlockOnAcknowledge(true);
+ ClientSession sendSession = cf.createSession(false, true, true);
+ ClientSession session = cf.createSession(false, true, true);
+ sendSession.createQueue(addressA, queueA, false);
+ ClientProducer cp = sendSession.createProducer(addressA);
+ ClientConsumer cc = session.createConsumer(queueA);
+ int numMessages = 100;
+ for (int i = 0; i < numMessages; i++)
+ {
+ cp.send(sendSession.createClientMessage(false));
+ }
+ session.start();
+ ClientMessage cm = null;
+ for (int i = 0; i < numMessages; i++)
+ {
+ cm = cc.receive(5000);
+ assertNotNull(cm);
+ }
+ cm.acknowledge();
+ Queue q = (Queue) messagingService.getServer().getPostOffice().getBinding(queueA).getBindable();
+
+ assertEquals(0, q.getDeliveringCount());
+ session.close();
+ sendSession.close();
+ }
+ finally
+ {
+ if (messagingService.isStarted())
+ {
+ messagingService.stop();
+ }
+ }
+ }
+
+ public void testAsyncConsumerNoAck() throws Exception
+ {
+ MessagingService messagingService = createService(false);
+ try
+ {
+ messagingService.start();
+ ClientSessionFactory cf = createInVMFactory();
+ ClientSession sendSession = cf.createSession(false, true, true);
+ ClientSession session = cf.createSession(false, true, true);
+ sendSession.createQueue(addressA, queueA, false);
+ ClientProducer cp = sendSession.createProducer(addressA);
+ ClientConsumer cc = session.createConsumer(queueA);
+ int numMessages = 100;
+ for (int i = 0; i < numMessages; i++)
+ {
+ cp.send(sendSession.createClientMessage(false));
+ }
+ final CountDownLatch latch = new CountDownLatch(numMessages);
+ session.start();
+ cc.setMessageHandler(new MessageHandler()
+ {
+ public void onMessage(ClientMessage message)
+ {
+ latch.countDown();
+ }
+ });
+ assertTrue(latch.await(5, TimeUnit.SECONDS));
+ Queue q = (Queue) messagingService.getServer().getPostOffice().getBinding(queueA).getBindable();
+ assertEquals(numMessages, q.getDeliveringCount());
+ sendSession.close();
+ session.close();
+ }
+ finally
+ {
+ if (messagingService.isStarted())
+ {
+ messagingService.stop();
+ }
+ }
+ }
+
+ public void testAsyncConsumerAck() throws Exception
+ {
+ MessagingService messagingService = createService(false);
+ try
+ {
+ messagingService.start();
+ ClientSessionFactory cf = createInVMFactory();
+ cf.setBlockOnAcknowledge(true);
+ cf.setAckBatchSize(0);
+ ClientSession sendSession = cf.createSession(false, true, true);
+ final ClientSession session = cf.createSession(false, true, true);
+ sendSession.createQueue(addressA, queueA, false);
+ ClientProducer cp = sendSession.createProducer(addressA);
+ ClientConsumer cc = session.createConsumer(queueA);
+ int numMessages = 100;
+ for (int i = 0; i < numMessages; i++)
+ {
+ cp.send(sendSession.createClientMessage(false));
+ }
+ final CountDownLatch latch = new CountDownLatch(numMessages);
+ session.start();
+ cc.setMessageHandler(new MessageHandler()
+ {
+ public void onMessage(ClientMessage message)
+ {
+ try
+ {
+ message.acknowledge();
+ }
+ catch (MessagingException e)
+ {
+ try
+ {
+ session.close();
+ }
+ catch (MessagingException e1)
+ {
+ e1.printStackTrace();
+ }
+ }
+ latch.countDown();
+ }
+ });
+ assertTrue(latch.await(5, TimeUnit.SECONDS));
+ Queue q = (Queue) messagingService.getServer().getPostOffice().getBinding(queueA).getBindable();
+ assertEquals(0, q.getDeliveringCount());
+ sendSession.close();
+ session.close();
+ }
+ finally
+ {
+ if (messagingService.isStarted())
+ {
+ messagingService.stop();
+ }
+ }
+ }
+
+ public void testAsyncConsumerAckLastMessageOnly() throws Exception
+ {
+ MessagingService messagingService = createService(false);
+ try
+ {
+ messagingService.start();
+ ClientSessionFactory cf = createInVMFactory();
+ cf.setBlockOnAcknowledge(true);
+ cf.setAckBatchSize(0);
+ ClientSession sendSession = cf.createSession(false, true, true);
+ final ClientSession session = cf.createSession(false, true, true);
+ sendSession.createQueue(addressA, queueA, false);
+ ClientProducer cp = sendSession.createProducer(addressA);
+ ClientConsumer cc = session.createConsumer(queueA);
+ int numMessages = 100;
+ for (int i = 0; i < numMessages; i++)
+ {
+ cp.send(sendSession.createClientMessage(false));
+ }
+ final CountDownLatch latch = new CountDownLatch(numMessages);
+ session.start();
+ cc.setMessageHandler(new MessageHandler()
+ {
+ public void onMessage(ClientMessage message)
+ {
+ if (latch.getCount() == 1)
+ {
+ try
+ {
+ message.acknowledge();
+ }
+ catch (MessagingException e)
+ {
+ try
+ {
+ session.close();
+ }
+ catch (MessagingException e1)
+ {
+ e1.printStackTrace();
+ }
+ }
+ }
+ latch.countDown();
+ }
+ });
+ assertTrue(latch.await(5, TimeUnit.SECONDS));
+ Queue q = (Queue) messagingService.getServer().getPostOffice().getBinding(queueA).getBindable();
+ assertEquals(0, q.getDeliveringCount());
+ sendSession.close();
+ session.close();
+ }
+ finally
+ {
+ if (messagingService.isStarted())
+ {
+ messagingService.stop();
+ }
+ }
+ }
+
+ public void testAsyncConsumerCommit() throws Exception
+ {
+ MessagingService messagingService = createService(false);
+ try
+ {
+ messagingService.start();
+ ClientSessionFactory cf = createInVMFactory();
+ cf.setBlockOnAcknowledge(true);
+ cf.setAckBatchSize(0);
+ ClientSession sendSession = cf.createSession(false, true, true);
+ final ClientSession session = cf.createSession(false, true, false);
+ sendSession.createQueue(addressA, queueA, false);
+ ClientProducer cp = sendSession.createProducer(addressA);
+ ClientConsumer cc = session.createConsumer(queueA);
+ int numMessages = 100;
+ for (int i = 0; i < numMessages; i++)
+ {
+ cp.send(sendSession.createClientMessage(false));
+ }
+ final CountDownLatch latch = new CountDownLatch(numMessages);
+ session.start();
+ cc.setMessageHandler(new MessageHandler()
+ {
+ public void onMessage(ClientMessage message)
+ {
+ try
+ {
+ message.acknowledge();
+ }
+ catch (MessagingException e)
+ {
+ try
+ {
+ session.close();
+ }
+ catch (MessagingException e1)
+ {
+ e1.printStackTrace();
+ }
+ }
+ latch.countDown();
+ }
+ });
+ assertTrue(latch.await(5, TimeUnit.SECONDS));
+ Queue q = (Queue) messagingService.getServer().getPostOffice().getBinding(queueA).getBindable();
+ assertEquals(numMessages, q.getDeliveringCount());
+ assertEquals(numMessages, q.getMessageCount());
+ session.commit();
+ assertEquals(0, q.getDeliveringCount());
+ assertEquals(0, q.getMessageCount());
+ sendSession.close();
+ session.close();
+ }
+ finally
+ {
+ if (messagingService.isStarted())
+ {
+ messagingService.stop();
+ }
+ }
+ }
+
+ public void testAsyncConsumerRollback() throws Exception
+ {
+ MessagingService messagingService = createService(false);
+ try
+ {
+ messagingService.start();
+ ClientSessionFactory cf = createInVMFactory();
+ cf.setBlockOnAcknowledge(true);
+ cf.setAckBatchSize(0);
+ ClientSession sendSession = cf.createSession(false, true, true);
+ final ClientSession session = cf.createSession(false, true, false);
+ sendSession.createQueue(addressA, queueA, false);
+ ClientProducer cp = sendSession.createProducer(addressA);
+ ClientConsumer cc = session.createConsumer(queueA);
+ int numMessages = 100;
+ for (int i = 0; i < numMessages; i++)
+ {
+ cp.send(sendSession.createClientMessage(false));
+ }
+ CountDownLatch latch = new CountDownLatch(numMessages);
+ session.start();
+ cc.setMessageHandler(new ackHandler(session, latch));
+ assertTrue(latch.await(5, TimeUnit.SECONDS));
+ Queue q = (Queue) messagingService.getServer().getPostOffice().getBinding(queueA).getBindable();
+ assertEquals(numMessages, q.getDeliveringCount());
+ assertEquals(numMessages, q.getMessageCount());
+ session.rollback();
+ assertEquals(0, q.getDeliveringCount());
+ assertEquals(numMessages, q.getMessageCount());
+ latch = new CountDownLatch(numMessages);
+ cc.setMessageHandler(new ackHandler(session, latch));
+ assertTrue(latch.await(5, TimeUnit.SECONDS));
+ sendSession.close();
+ session.close();
+ }
+ finally
+ {
+ if (messagingService.isStarted())
+ {
+ messagingService.stop();
+ }
+ }
+ }
+
+ /* public void testTempQueueGetsDeleted() throws Exception
+ {
+ MessagingService messagingService = createService(false);
+ try
+ {
+ messagingService.start();
+ ClientSessionFactory cf = createInVMFactory();
+ ClientSession session = cf.createSession(false, true, true);
+ session.createQueue(addressA, queueA, false, true);
+ session.close();
+ assertNull(messagingService.getServer().getPostOffice().getBinding(queueA));
+ }
+ finally
+ {
+ if(messagingService.isStarted())
+ {
+ messagingService.stop();
+ }
+ }
+ }*/
+
private static class MyMessageHandler implements MessageHandler
{
volatile int messagesReceived = 0;
@@ -356,4 +1176,36 @@
latch.countDown();
}
}
+ private static class ackHandler implements MessageHandler
+ {
+ private final ClientSession session;
+
+ private final CountDownLatch latch;
+
+ public ackHandler(ClientSession session, CountDownLatch latch)
+ {
+ this.session = session;
+ this.latch = latch;
+ }
+
+ public void onMessage(ClientMessage message)
+ {
+ try
+ {
+ message.acknowledge();
+ }
+ catch (MessagingException e)
+ {
+ try
+ {
+ session.close();
+ }
+ catch (MessagingException e1)
+ {
+ e1.printStackTrace();
+ }
+ }
+ latch.countDown();
+ }
+ }
}
More information about the jboss-cvs-commits
mailing list