[hornetq-commits] JBoss hornetq SVN: r10136 - in branches/Branch_2_2_EAP: tests/src/org/hornetq/tests/integration/client and 1 other directory.
do-not-reply at jboss.org
do-not-reply at jboss.org
Mon Jan 24 15:40:56 EST 2011
Author: clebert.suconic at jboss.com
Date: 2011-01-24 15:40:55 -0500 (Mon, 24 Jan 2011)
New Revision: 10136
Added:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingOrderTest.java
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
Log:
JBPAPP-5816 / HORNETQ-630 - Paging Ordering after rollback and cancelations
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java 2011-01-23 01:38:06 UTC (rev 10135)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java 2011-01-24 20:40:55 UTC (rev 10136)
@@ -712,6 +712,8 @@
if (ref.isPaged())
{
pageSubscription.ackTx(tx, (PagedReference)ref);
+
+ getRefsOperation(tx).addAck(ref);
}
else
{
@@ -1423,6 +1425,12 @@
return true;
}
}
+
+ /** Used on testing only **/
+ public int getNumberOfReferences()
+ {
+ return messageReferences.size();
+ }
private void move(final SimpleString toAddress, final MessageReference ref) throws Exception
{
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2011-01-23 01:38:06 UTC (rev 10135)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2011-01-24 20:40:55 UTC (rev 10136)
@@ -563,11 +563,11 @@
if (autoCommitAcks || tx == null)
{
- ref.acknowledge();
+ ref.getQueue().acknowledge(ref);
}
else
{
- ref.acknowledge(tx);
+ ref.getQueue().acknowledge(tx, ref);
}
}
while (ref.getMessage().getMessageID() != messageID);
Added: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingOrderTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingOrderTest.java (rev 0)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingOrderTest.java 2011-01-24 20:40:55 UTC (rev 10136)
@@ -0,0 +1,539 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.client;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.ServerLocator;
+import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.ServerSession;
+import org.hornetq.core.server.impl.QueueImpl;
+import org.hornetq.core.settings.impl.AddressSettings;
+import org.hornetq.tests.util.ServiceTestBase;
+
+/**
+ * A PagingOrderTest
+ *
+ * @author clebertsuconic
+ *
+ *
+ */
+public class PagingOrderTest extends ServiceTestBase
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ private ServerLocator locator;
+
+ public PagingOrderTest(final String name)
+ {
+ super(name);
+ }
+
+ public PagingOrderTest()
+ {
+ super();
+ }
+
+ // Constants -----------------------------------------------------
+ private static final Logger log = Logger.getLogger(PagingTest.class);
+
+ private static final int RECEIVE_TIMEOUT = 30000;
+
+ private static final int PAGE_MAX = 100 * 1024;
+
+ private static final int PAGE_SIZE = 10 * 1024;
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ static final SimpleString ADDRESS = new SimpleString("SimpleAddress");
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ locator = createInVMNonHALocator();
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ locator.close();
+
+ super.tearDown();
+ }
+
+ public void testOrder1() throws Throwable
+ {
+ boolean persistentMessages = true;
+
+ Configuration config = createDefaultConfig();
+
+ config.setJournalSyncNonTransactional(false);
+
+ HornetQServer server = createServer(true, config, PAGE_SIZE, PAGE_MAX, new HashMap<String, AddressSettings>());
+
+ server.start();
+
+ final int messageSize = 1024;
+
+ final int numberOfMessages = 500;
+
+ try
+ {
+ ServerLocator locator = createInVMNonHALocator();
+
+ locator.setClientFailureCheckPeriod(1000);
+ locator.setConnectionTTL(2000);
+ locator.setReconnectAttempts(0);
+
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setBlockOnAcknowledge(true);
+ locator.setConsumerWindowSize(1024 * 1024);
+
+ ClientSessionFactory sf = locator.createSessionFactory();
+
+ ClientSession session = sf.createSession(false, false, false);
+
+ session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true);
+
+ ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
+
+ byte[] body = new byte[messageSize];
+
+ ByteBuffer bb = ByteBuffer.wrap(body);
+
+ for (int j = 1; j <= messageSize; j++)
+ {
+ bb.put(getSamplebyte(j));
+ }
+
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ ClientMessage message = session.createMessage(persistentMessages);
+
+ HornetQBuffer bodyLocal = message.getBodyBuffer();
+
+ bodyLocal.writeBytes(body);
+
+ message.putIntProperty(new SimpleString("id"), i);
+
+ producer.send(message);
+ if (i % 1000 == 0)
+ {
+ session.commit();
+ }
+ }
+
+ session.commit();
+
+ session.close();
+
+ session = sf.createSession(true, true, 0);
+
+ session.start();
+
+ ClientConsumer consumer = session.createConsumer(ADDRESS);
+
+ for (int i = 0; i < numberOfMessages / 2; i++)
+ {
+ ClientMessage message = consumer.receive(5000);
+ assertNotNull(message);
+ System.out.println("msg = " + message.getIntProperty("id"));
+ assertEquals(i, message.getIntProperty("id").intValue());
+
+ if (i < 100)
+ {
+ System.out.println("Acking " + i);
+ // Do not consume the last one so we could restart
+ message.acknowledge();
+ }
+ }
+
+ session.commit();
+
+ for (ServerSession sessionServer : server.getSessions())
+ {
+ sessionServer.close(true);
+ }
+
+ OperationContextImpl.getContext().waitCompletion();
+
+ ((ClientSessionFactoryImpl)sf).stopPingingAfterOne();
+
+ sf = locator.createSessionFactory();
+
+ session = sf.createSession(true, true, 0);
+
+ session.start();
+
+ consumer = session.createConsumer(ADDRESS);
+
+ for (int i = 100; i < numberOfMessages; i++)
+ {
+ ClientMessage message = consumer.receive(5000);
+ assertNotNull(message);
+ System.out.println("msg = " + message.getIntProperty("id"));
+ assertEquals(i, message.getIntProperty("id").intValue());
+ message.acknowledge();
+ }
+
+ }
+ catch (Throwable e)
+ {
+ e.printStackTrace();
+ throw e;
+ }
+ finally
+ {
+ try
+ {
+ server.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+
+ }
+
+ public void testOrderOverRollback() throws Throwable
+ {
+ boolean persistentMessages = true;
+
+ Configuration config = createDefaultConfig();
+
+ config.setJournalSyncNonTransactional(false);
+
+ HornetQServer server = createServer(true, config, PAGE_SIZE, PAGE_MAX, new HashMap<String, AddressSettings>());
+
+ server.start();
+
+ final int messageSize = 1024;
+
+ final int numberOfMessages = 3000;
+
+ try
+ {
+ ServerLocator locator = createInVMNonHALocator();
+
+ locator.setClientFailureCheckPeriod(1000);
+ locator.setConnectionTTL(2000);
+ locator.setReconnectAttempts(0);
+
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setBlockOnAcknowledge(true);
+ locator.setConsumerWindowSize(1024 * 1024);
+
+ ClientSessionFactory sf = locator.createSessionFactory();
+
+ ClientSession session = sf.createSession(false, false, false);
+
+ QueueImpl queue = (QueueImpl)server.createQueue(ADDRESS, ADDRESS, null, true, false);
+
+ ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
+
+ byte[] body = new byte[messageSize];
+
+ ByteBuffer bb = ByteBuffer.wrap(body);
+
+ for (int j = 1; j <= messageSize; j++)
+ {
+ bb.put(getSamplebyte(j));
+ }
+
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ ClientMessage message = session.createMessage(persistentMessages);
+
+ HornetQBuffer bodyLocal = message.getBodyBuffer();
+
+ bodyLocal.writeBytes(body);
+
+ message.putIntProperty(new SimpleString("id"), i);
+
+ producer.send(message);
+ if (i % 1000 == 0)
+ {
+ session.commit();
+ }
+ }
+
+ session.commit();
+
+ session.close();
+
+ System.out.println("number of refs " + queue.getNumberOfReferences());
+
+ session = sf.createSession(false, false, 0);
+
+ session.start();
+
+ ClientConsumer consumer = session.createConsumer(ADDRESS);
+
+ for (int i = 0; i < numberOfMessages / 2; i++)
+ {
+ ClientMessage message = consumer.receive(5000);
+ assertNotNull(message);
+ System.out.println("msg = " + message.getIntProperty("id"));
+ assertEquals(i, message.getIntProperty("id").intValue());
+ message.acknowledge();
+ }
+
+ session.rollback();
+
+ session.close();
+
+ session = sf.createSession(false, false, 0);
+
+ session.start();
+
+ consumer = session.createConsumer(ADDRESS);
+
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ ClientMessage message = consumer.receive(5000);
+ assertNotNull(message);
+ System.out.println("msg = " + message.getIntProperty("id"));
+ assertEquals(i, message.getIntProperty("id").intValue());
+ message.acknowledge();
+ }
+
+ session.commit();
+
+ }
+ catch (Throwable e)
+ {
+ e.printStackTrace();
+ throw e;
+ }
+ finally
+ {
+ try
+ {
+ server.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+
+ }
+
+ public void testOrderOverRollback2() throws Throwable
+ {
+ boolean persistentMessages = true;
+
+ Configuration config = createDefaultConfig();
+
+ config.setJournalSyncNonTransactional(false);
+
+ HornetQServer server = createServer(true, config, PAGE_SIZE, PAGE_MAX, new HashMap<String, AddressSettings>());
+
+ server.start();
+
+ final int messageSize = 1024;
+
+ final int numberOfMessages = 200;
+
+ try
+ {
+ ServerLocator locator = createInVMNonHALocator();
+
+ locator.setClientFailureCheckPeriod(1000);
+ locator.setConnectionTTL(2000);
+ locator.setReconnectAttempts(0);
+
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setBlockOnAcknowledge(true);
+ locator.setConsumerWindowSize(0);
+
+ ClientSessionFactory sf = locator.createSessionFactory();
+
+ ClientSession session = sf.createSession(false, false, false);
+
+ QueueImpl queue = (QueueImpl)server.createQueue(ADDRESS, ADDRESS, null, true, false);
+
+ ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
+
+ byte[] body = new byte[messageSize];
+
+ ByteBuffer bb = ByteBuffer.wrap(body);
+
+ for (int j = 1; j <= messageSize; j++)
+ {
+ bb.put(getSamplebyte(j));
+ }
+
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ ClientMessage message = session.createMessage(persistentMessages);
+
+ HornetQBuffer bodyLocal = message.getBodyBuffer();
+
+ bodyLocal.writeBytes(body);
+
+ message.putIntProperty(new SimpleString("id"), i);
+
+ producer.send(message);
+ if (i % 1000 == 0)
+ {
+ session.commit();
+ }
+ }
+
+ session.commit();
+
+ session.close();
+
+ session = sf.createSession(false, false, 0);
+
+ session.start();
+
+ ClientConsumer consumer = session.createConsumer(ADDRESS);
+
+ // number of references without paging
+ int numberOfRefs = queue.getNumberOfReferences();
+
+ // consume all non-paged references
+ for (int ref = 0; ref < numberOfRefs; ref++)
+ {
+ ClientMessage msg = consumer.receive(5000);
+ assertNotNull(msg);
+ msg.acknowledge();
+ }
+
+ session.commit();
+
+ session.close();
+
+ session = sf.createSession(false, false, 0);
+
+ session.start();
+
+ consumer = session.createConsumer(ADDRESS);
+
+ ClientMessage msg = consumer.receive(5000);
+ assertNotNull(msg);
+ int msgIDRolledBack = msg.getIntProperty("id").intValue();
+ msg.acknowledge();
+
+ session.rollback();
+
+ msg = consumer.receive(5000);
+
+ assertNotNull(msg);
+
+ assertEquals(msgIDRolledBack, msg.getIntProperty("id").intValue());
+
+ session.rollback();
+
+ session.close();
+
+ sf.close();
+ locator.close();
+
+ server.stop();
+
+ server.start();
+
+ locator = createInVMNonHALocator();
+
+ locator.setClientFailureCheckPeriod(1000);
+ locator.setConnectionTTL(2000);
+ locator.setReconnectAttempts(0);
+
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setBlockOnAcknowledge(true);
+ locator.setConsumerWindowSize(0);
+
+ sf = locator.createSessionFactory();
+
+ session = sf.createSession(false, false, 0);
+
+ session.start();
+
+ consumer = session.createConsumer(ADDRESS);
+
+ for (int i = msgIDRolledBack; i < numberOfMessages; i++)
+ {
+ ClientMessage message = consumer.receive(5000);
+ assertNotNull(message);
+ assertEquals(i, message.getIntProperty("id").intValue());
+ message.acknowledge();
+ }
+
+ session.commit();
+
+ session.close();
+
+ locator.close();
+
+ }
+ catch (Throwable e)
+ {
+ e.printStackTrace();
+ throw e;
+ }
+ finally
+ {
+ try
+ {
+ server.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+
+ }
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
More information about the hornetq-commits
mailing list