[jboss-cvs] JBoss Messaging SVN: r6046 - in trunk/tests/src/org/jboss/messaging/tests/integration: client and 1 other directory.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Mon Mar 9 09:24:12 EDT 2009
Author: ataylor
Date: 2009-03-09 09:24:11 -0400 (Mon, 09 Mar 2009)
New Revision: 6046
Added:
trunk/tests/src/org/jboss/messaging/tests/integration/client/RedeliveryConsumerTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/client/TransactionDurabilityTest.java
Removed:
trunk/tests/src/org/jboss/messaging/tests/integration/consumer/
Log:
moved tests
Copied: trunk/tests/src/org/jboss/messaging/tests/integration/client/RedeliveryConsumerTest.java (from rev 6040, trunk/tests/src/org/jboss/messaging/tests/integration/consumer/RedeliveryConsumerTest.java)
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/client/RedeliveryConsumerTest.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/client/RedeliveryConsumerTest.java 2009-03-09 13:24:11 UTC (rev 6046)
@@ -0,0 +1,266 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.integration.client;
+
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.config.Configuration;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.tests.util.ServiceTestBase;
+import org.jboss.messaging.utils.SimpleString;
+
+/**
+ * A RedeliveryConsumerTest
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ * Created Feb 17, 2009 6:06:11 PM
+ *
+ *
+ */
+public class RedeliveryConsumerTest extends ServiceTestBase
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ MessagingService messagingService; // created and started in setUp(boolean); stopped in tearDown() when still started
+
+ final SimpleString ADDRESS = new SimpleString("address"); // passed as both arguments to createQueue(ADDRESS, ADDRESS, true) and used by every producer/consumer in this class
+
+ ClientSessionFactory factory; // assigned from createInVMFactory() in setUp(boolean)
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ /**
+  * Runs the redelivery scenario with {@code strictUpdate == true}, i.e. the server is
+  * configured with {@code persistDeliveryCountBeforeDelivery} switched on and is stopped
+  * without the consuming session being rolled back or closed.
+  */
+ public void testRedeliveryMessageStrict() throws Exception
+ {
+    final boolean strictUpdate = true;
+    testDedeliveryMessageOnPersistent(strictUpdate);
+ }
+
+ /**
+  * Runs the redelivery scenario with {@code strictUpdate == false}: the consuming session
+  * is rolled back and closed before the server is restarted.
+  */
+ public void testRedeliveryMessageSimpleCancel() throws Exception
+ {
+    final boolean strictUpdate = false;
+    testDedeliveryMessageOnPersistent(strictUpdate);
+ }
+
+ /**
+  * Runs the delivery-count scenario of {@link #testDelivery(boolean)} passing
+  * {@code false} as the persistent flag.
+  */
+ public void testDeliveryNonPersistent() throws Exception
+ {
+    final boolean persistent = false;
+    testDelivery(persistent);
+ }
+
+ /**
+  * Runs the delivery-count scenario of {@link #testDelivery(boolean)} passing
+  * {@code true} as the persistent flag.
+  */
+ public void testDeliveryPersistent() throws Exception
+ {
+    final boolean persistent = true;
+    testDelivery(persistent);
+ }
+
+ /**
+  * Shared scenario behind {@link #testDeliveryNonPersistent()} and {@link #testDeliveryPersistent()}.
+  * <p>
+  * Sends ten text messages to {@code ADDRESS} and then checks the delivery count reported to
+  * <ol>
+  * <li>a browsing consumer (expected to stay at 0 on every pass),</li>
+  * <li>a consumer whose session is rolled back without acknowledging (expected to stay at 1),</li>
+  * <li>a consumer that acknowledges each message and is then rolled back or, when
+  *     {@code persistent} is true, taken through a server restart (expected to equal the
+  *     pass number, 1..5).</li>
+  * </ol>
+  *
+  * @param persistent flag handed to {@code createTextMessage}; when true the server is stopped
+  *        and started between passes of the last phase instead of rolling the session back
+  * @throws Exception if any messaging operation fails
+  */
+ public void testDelivery(final boolean persistent) throws Exception
+ {
+    final int numMessages = 10;
+    final int numPasses = 5;
+
+    setUp(true);
+    ClientSession session = factory.createSession(false, false, false);
+    ClientProducer prod = session.createProducer(ADDRESS);
+
+    for (int i = 0; i < numMessages; i++)
+    {
+       prod.send(createTextMessage(session, Integer.toString(i), persistent));
+    }
+
+    session.commit();
+    session.close();
+
+    // Phase 1: browse the queue repeatedly.
+    session = factory.createSession(null, null, false, true, true, true, 0);
+
+    session.start();
+    for (int loopAck = 0; loopAck < numPasses; loopAck++)
+    {
+       ClientConsumer browser = session.createConsumer(ADDRESS, null, true);
+       for (int i = 0; i < numMessages; i++)
+       {
+          ClientMessage msg = browser.receive(1000);
+          assertNotNull("element i=" + i + " loopAck = " + loopAck + " was expected", msg);
+          msg.acknowledge();
+          assertEquals(Integer.toString(i), getTextMessage(msg));
+
+          // We don't change the deliveryCounter on Browser, so this should be always 0
+          assertEquals(0, msg.getDeliveryCount());
+       }
+
+       session.commit();
+       browser.close();
+    }
+
+    session.close();
+
+    // Phase 2: receive without acknowledging, then roll back.
+    session = factory.createSession(false, false, false);
+    session.start();
+
+    ClientConsumer consumer = session.createConsumer(ADDRESS);
+
+    for (int loopAck = 0; loopAck < numPasses; loopAck++)
+    {
+       for (int i = 0; i < numMessages; i++)
+       {
+          ClientMessage msg = consumer.receive(1000);
+          assertNotNull("message i=" + i + " was expected on un-acked pass " + loopAck, msg);
+          assertEquals(Integer.toString(i), getTextMessage(msg));
+
+          // No ACK done, so deliveryCount should be always = 1
+          assertEquals(1, msg.getDeliveryCount());
+       }
+       session.rollback();
+    }
+
+    if (persistent)
+    {
+       session = restartServerAndReopenSession(session);
+       consumer = session.createConsumer(ADDRESS);
+    }
+
+    // Phase 3: acknowledge every message; the delivery count must match the pass number.
+    for (int loopAck = 1; loopAck <= numPasses; loopAck++)
+    {
+       for (int i = 0; i < numMessages; i++)
+       {
+          ClientMessage msg = consumer.receive(1000);
+          assertNotNull("message i=" + i + " was expected on acked pass " + loopAck, msg);
+          msg.acknowledge();
+          assertEquals(Integer.toString(i), getTextMessage(msg));
+          assertEquals(loopAck, msg.getDeliveryCount());
+       }
+       if (loopAck < numPasses)
+       {
+          if (persistent)
+          {
+             session = restartServerAndReopenSession(session);
+             consumer = session.createConsumer(ADDRESS);
+          }
+          else
+          {
+             session.rollback();
+          }
+       }
+    }
+
+    session.close();
+ }
+
+ /**
+  * Closes the given session, stops and starts {@code messagingService}, and returns a new
+  * started session created with {@code factory.createSession(false, false, false)}.
+  *
+  * @param oldSession session to close before the server is stopped
+  * @return the newly created and started session
+  * @throws Exception if closing, restarting or session creation fails
+  */
+ private ClientSession restartServerAndReopenSession(final ClientSession oldSession) throws Exception
+ {
+    oldSession.close();
+    messagingService.stop();
+    messagingService.start();
+    ClientSession newSession = factory.createSession(false, false, false);
+    newSession.start();
+    return newSession;
+ }
+
+ /**
+  * Sends one message, receives it once (delivery count 1), restarts the server and checks
+  * that the message is received again with delivery count 2.
+  *
+  * @param strictUpdate forwarded to {@code setUp(boolean)}; when false the consuming session
+  *        is rolled back and closed before the restart, when true the server is stopped
+  *        with the session left as it is
+  * @throws Exception if any messaging operation fails
+  */
+ protected void testDedeliveryMessageOnPersistent(final boolean strictUpdate) throws Exception
+ {
+    setUp(strictUpdate);
+    ClientSession session = factory.createSession(false, false, false);
+    ClientProducer prod = session.createProducer(ADDRESS);
+    prod.send(createTextMessage(session, "Hello"));
+    session.commit();
+    session.close();
+
+    session = factory.createSession(false, false, false);
+    session.start();
+    ClientConsumer consumer = session.createConsumer(ADDRESS);
+
+    ClientMessage msg = consumer.receive(1000);
+    // Fail with an assertion rather than a NullPointerException when nothing arrives in time.
+    assertNotNull(msg);
+    assertEquals(1, msg.getDeliveryCount());
+    session.stop();
+
+    // if strictUpdate == true, this will simulate a crash, where the server is stopped without closing/rolling back
+    // the session
+    if (!strictUpdate)
+    {
+       // If non Strict, at least rollback/cancel should still update the delivery-counts
+       session.rollback(true);
+       session.close();
+    }
+
+    messagingService.stop();
+
+    messagingService.start();
+
+    session = factory.createSession(false, true, false);
+    session.start();
+    consumer = session.createConsumer(ADDRESS);
+    msg = consumer.receive(1000);
+    assertNotNull(msg);
+    assertEquals(2, msg.getDeliveryCount());
+    session.close();
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ /**
+ * @param persistDeliveryCountBeforeDelivery
+ * @throws Exception
+ * @throws MessagingException
+ */
+ private void setUp(final boolean persistDeliveryCountBeforeDelivery) throws Exception, MessagingException
+ {
+ Configuration config = createFileConfig();
+ config.setJournalFileSize(10 * 1024);
+ config.setJournalMinFiles(2);
+ config.setSecurityEnabled(false);
+ config.setPersistDeliveryCountBeforeDelivery(persistDeliveryCountBeforeDelivery);
+
+ messagingService = createService(true, config);
+ messagingService.start();
+
+ factory = createInVMFactory();
+
+ ClientSession session = factory.createSession(false, false, false);
+ session.createQueue(ADDRESS, ADDRESS, true);
+
+ session.close();
+ }
+
+ /**
+  * Stops the server if one was created and is still started, then runs the base-class
+  * tear-down.
+  *
+  * @throws Exception if stopping the server or the base-class tear-down fails
+  */
+ @Override
+ protected void tearDown() throws Exception
+ {
+    try
+    {
+       if (messagingService != null && messagingService.isStarted())
+       {
+          messagingService.stop();
+       }
+    }
+    finally
+    {
+       // Run the base-class cleanup even when stopping the server throws.
+       super.tearDown();
+    }
+ }
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Copied: trunk/tests/src/org/jboss/messaging/tests/integration/client/TransactionDurabilityTest.java (from rev 6040, trunk/tests/src/org/jboss/messaging/tests/integration/consumer/TransactionDurabilityTest.java)
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/client/TransactionDurabilityTest.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/client/TransactionDurabilityTest.java 2009-03-09 13:24:11 UTC (rev 6046)
@@ -0,0 +1,190 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.tests.integration.client;
+
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
+import org.jboss.messaging.core.config.Configuration;
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.tests.util.ServiceTestBase;
+import org.jboss.messaging.utils.SimpleString;
+/**
+ *
+ * A TransactionDurabilityTest
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ * Created 16 Jan 2009 11:00:33
+ *
+ *
+ */
+public class TransactionDurabilityTest extends ServiceTestBase
+{
+ private static final Logger log = Logger.getLogger(TransactionDurabilityTest.class); // class logger; not referenced by the test method below
+
+ /*
+ * This tests the following situation:
+ *
+ * (With the old implementation)
+ * Currently when a new persistent message is routed to persistent queues, the message is first stored, then the message is routed.
+ * Let's say it has been routed to two different queues A, B.
+ * Ref R1 gets consumed and acknowledged by transacted session S1, this decrements the ref count and causes an acknowledge record to be written to storage,
+ * transactionally, but it's not committed yet.
+ * Ref R2 then gets consumed and acknowledged by non transacted session S2, this causes a delete record to be written to storage.
+ * R1 then rolls back, and the server is restarted - unfortunately since the delete record was written R1 is not ready to be consumed again.
+ *
+ * It's therefore crucial the messages aren't deleted from storage until AFTER any ack records are committed to storage.
+ *
+ *
+ */
+ /**
+  * Routes one durable message to two queues on the same address, acknowledges it through an
+  * auto-commit session on queue1 and through a transacted session on queue2 whose
+  * acknowledgement is rolled back, and then verifies across two server restarts that the
+  * message is gone from queue1 but still deliverable from queue2 until it is acknowledged there.
+  *
+  * @throws Exception if any messaging operation fails
+  */
+ public void testRolledBackAcknowledgeWithSameMessageAckedByOtherSession() throws Exception
+ {
+    final String connectorFactoryClassName = "org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory";
+
+    Configuration conf = createDefaultConfig();
+
+    final SimpleString testAddress = new SimpleString("testAddress");
+
+    final SimpleString queue1 = new SimpleString("queue1");
+
+    final SimpleString queue2 = new SimpleString("queue2");
+
+    MessagingService messagingService = createService(true, conf);
+
+    messagingService.start();
+
+    try
+    {
+       ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration(connectorFactoryClassName));
+
+       ClientSession session1 = sf.createSession(false, true, true);
+
+       ClientSession session2 = sf.createSession(false, false, false);
+
+       session1.createQueue(testAddress, queue1, null, true, false);
+
+       session1.createQueue(testAddress, queue2, null, true, false);
+
+       ClientProducer producer = session1.createProducer(testAddress);
+
+       ClientMessage message = session1.createClientMessage(true);
+
+       producer.send(message);
+
+       session1.start();
+
+       session2.start();
+
+       ClientConsumer consumer1 = session1.createConsumer(queue1);
+
+       ClientConsumer consumer2 = session2.createConsumer(queue2);
+
+       ClientMessage m1 = consumer1.receive(1000);
+
+       assertNotNull(m1);
+
+       ClientMessage m2 = consumer2.receive(1000);
+
+       assertNotNull(m2);
+
+       m2.acknowledge();
+
+       //Don't commit session 2
+
+       m1.acknowledge();
+
+       session2.rollback();
+
+       session1.close();
+
+       session2.close();
+
+       messagingService.stop();
+
+       messagingService.start();
+
+       sf = new ClientSessionFactoryImpl(new TransportConfiguration(connectorFactoryClassName));
+
+       session1 = sf.createSession(false, true, true);
+
+       session2 = sf.createSession(false, true, true);
+
+       session1.start();
+
+       session2.start();
+
+       consumer1 = session1.createConsumer(queue1);
+
+       consumer2 = session2.createConsumer(queue2);
+
+       // After the first restart: nothing on queue1, the message is still on queue2.
+       m1 = consumer1.receive(100);
+
+       assertNull(m1);
+
+       m2 = consumer2.receive(1000);
+
+       assertNotNull(m2);
+
+       m2.acknowledge();
+
+       session1.close();
+
+       session2.close();
+
+       messagingService.stop();
+
+       messagingService.start();
+
+       sf = new ClientSessionFactoryImpl(new TransportConfiguration(connectorFactoryClassName));
+
+       session1 = sf.createSession(false, true, true);
+
+       session2 = sf.createSession(false, true, true);
+
+       session1.start();
+
+       session2.start();
+
+       consumer1 = session1.createConsumer(queue1);
+
+       consumer2 = session2.createConsumer(queue2);
+
+       // After the second restart: both queues are expected to be empty.
+       m1 = consumer1.receive(100);
+
+       assertNull(m1);
+
+       m2 = consumer2.receive(100);
+
+       assertNull(m2);
+
+       session1.close();
+
+       session2.close();
+    }
+    finally
+    {
+       // This class has no tearDown(), so make sure a failed assertion does not leave the server running.
+       if (messagingService.isStarted())
+       {
+          messagingService.stop();
+       }
+    }
+
+ }
+
+}
+
More information about the jboss-cvs-commits
mailing list