[jboss-cvs] JBoss Messaging SVN: r6315 - in trunk: src/main/org/jboss/messaging/core/deployers/impl and 5 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Mon Apr 6 08:10:13 EDT 2009
Author: timfox
Date: 2009-04-06 08:10:13 -0400 (Mon, 06 Apr 2009)
New Revision: 6315
Added:
trunk/src/main/org/jboss/messaging/core/server/impl/LastValueQueue.java
trunk/tests/src/org/jboss/messaging/tests/integration/server/LVQRecoveryTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/server/LVQTest.java
Removed:
trunk/src/main/org/jboss/messaging/core/server/impl/SoloQueueImpl.java
trunk/tests/src/org/jboss/messaging/tests/integration/server/SoloQueueRecoveryTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/server/SoloQueueTest.java
Modified:
trunk/src/config/jbm-jms.xml
trunk/src/config/jbm-queues.xml
trunk/src/main/org/jboss/messaging/core/deployers/impl/AddressSettingsDeployer.java
trunk/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/QueueFactoryImpl.java
trunk/src/main/org/jboss/messaging/core/settings/impl/AddressSettings.java
trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientSessionCreateAndDeleteQueueTest.java
Log:
renamed solo queue to LVQ (last value queue)
Modified: trunk/src/config/jbm-jms.xml
===================================================================
--- trunk/src/config/jbm-jms.xml 2009-04-06 12:09:40 UTC (rev 6314)
+++ trunk/src/config/jbm-jms.xml 2009-04-06 12:10:13 UTC (rev 6315)
@@ -122,9 +122,6 @@
<topic name="testDurableTopic">
<entry name="/topic/testDurableTopic"/>
</topic>
- <topic name="testSoloTopic">
- <entry name="/topic/testSoloTopic"/>
- </topic>
<queue name="testQueue">
<entry name="/queue/testQueue"/>
</queue>
Modified: trunk/src/config/jbm-queues.xml
===================================================================
--- trunk/src/config/jbm-queues.xml 2009-04-06 12:09:40 UTC (rev 6314)
+++ trunk/src/config/jbm-queues.xml 2009-04-06 12:10:13 UTC (rev 6315)
@@ -123,10 +123,6 @@
<distribution-policy-class>org.jboss.messaging.core.server.impl.GroupingRoundRobinDistributor</distribution-policy-class>
</address-settings>
- <address-settings match="topicjms.testSoloTopic">
- <solo-queue>true</solo-queue>
- </address-settings>
-
<!--default for catch all-->
<address-settings match="#">
<clustered>false</clustered>
Modified: trunk/src/main/org/jboss/messaging/core/deployers/impl/AddressSettingsDeployer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/deployers/impl/AddressSettingsDeployer.java 2009-04-06 12:09:40 UTC (rev 6314)
+++ trunk/src/main/org/jboss/messaging/core/deployers/impl/AddressSettingsDeployer.java 2009-04-06 12:10:13 UTC (rev 6315)
@@ -53,7 +53,7 @@
private static final String MESSAGE_COUNTER_HISTORY_DAY_LIMIT_NODE_NAME = "message-counter-history-day-limit";
- private static final String SOLO_MESSAGE_NODE_NAME = "solo-queue";
+ private static final String LVQ_NODE_NAME = "last-value-queue";
private final HierarchicalRepository<AddressSettings> addressSettingsRepository;
@@ -137,9 +137,9 @@
{
addressSettings.setDropMessagesWhenFull(Boolean.valueOf(child.getTextContent().trim()));
}
- else if (SOLO_MESSAGE_NODE_NAME.equalsIgnoreCase(child.getNodeName()))
+ else if (LVQ_NODE_NAME.equalsIgnoreCase(child.getNodeName()))
{
- addressSettings.setSoloQueue(Boolean.valueOf(child.getTextContent().trim()));
+ addressSettings.setLastValueQueue(Boolean.valueOf(child.getTextContent().trim()));
}
else if (MAX_DELIVERY_ATTEMPTS.equalsIgnoreCase(child.getNodeName()))
{
Modified: trunk/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java 2009-04-06 12:09:40 UTC (rev 6314)
+++ trunk/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java 2009-04-06 12:10:13 UTC (rev 6315)
@@ -72,7 +72,7 @@
public static final SimpleString HDR_FROM_CLUSTER = new SimpleString("_JBM_FROM_CLUSTER");
- public static final SimpleString HDR_SOLE_MESSAGE = new SimpleString("_JBM_SOLO_MESSAGE");
+ public static final SimpleString HDR_LAST_VALUE_NAME = new SimpleString("_JBM_LVQ_NAME");
public static final byte OBJECT_TYPE = 2;
Copied: trunk/src/main/org/jboss/messaging/core/server/impl/LastValueQueue.java (from rev 6312, trunk/src/main/org/jboss/messaging/core/server/impl/SoloQueueImpl.java)
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/LastValueQueue.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/LastValueQueue.java 2009-04-06 12:10:13 UTC (rev 6315)
@@ -0,0 +1,293 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.server.impl;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.jboss.messaging.core.filter.Filter;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.message.impl.MessageImpl;
+import org.jboss.messaging.core.paging.PagingManager;
+import org.jboss.messaging.core.paging.PagingStore;
+import org.jboss.messaging.core.persistence.StorageManager;
+import org.jboss.messaging.core.postoffice.PostOffice;
+import org.jboss.messaging.core.server.MessageReference;
+import org.jboss.messaging.core.server.ServerMessage;
+import org.jboss.messaging.core.settings.HierarchicalRepository;
+import org.jboss.messaging.core.settings.impl.AddressSettings;
+import org.jboss.messaging.core.transaction.Transaction;
+import org.jboss.messaging.utils.SimpleString;
+
+/**
+ * A queue that discards a message when a newer message with the same MessageImpl.HDR_LAST_VALUE_NAME property value arrives.
+ * In other words, it only retains the last value for each property value.
+ * This is useful, for example, for stock prices, where you're only interested in the latest value
+ * for a particular stock.
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public class LastValueQueue extends QueueImpl
+{
+ private static final Logger log = Logger.getLogger(LastValueQueue.class);
+
+ private final Map<SimpleString, ServerMessage> map = new HashMap<SimpleString, ServerMessage>();
+
+ private final PagingManager pagingManager;
+
+ private final StorageManager storageManager;
+
+   /**
+    * Creates a last-value queue.
+    * <p>
+    * Every argument is handed unchanged to the QueueImpl constructor. The storage manager and the paging
+    * manager obtained from the post office are additionally kept in fields of this class.
+    */
+   public LastValueQueue(final long persistenceID,
+                         final SimpleString address,
+                         final SimpleString name,
+                         final Filter filter,
+                         final boolean durable,
+                         final boolean temporary,
+                         final ScheduledExecutorService scheduledExecutor,
+                         final PostOffice postOffice,
+                         final StorageManager storageManager,
+                         final HierarchicalRepository<AddressSettings> addressSettingsRepository)
+   {
+      super(persistenceID, address, name, filter, durable, temporary,
+            scheduledExecutor, postOffice, storageManager, addressSettingsRepository);
+
+      this.storageManager = storageManager;
+      this.pagingManager = postOffice.getPagingManager();
+   }
+
+   /**
+    * Routes a message into this queue, keeping only the newest message per
+    * MessageImpl.HDR_LAST_VALUE_NAME value.
+    * <p>
+    * When the message carries that property it replaces any message previously registered in the map under
+    * the same value, and the replaced message is discarded. The call is then always delegated to
+    * QueueImpl.route.
+    *
+    * @param message the message being routed
+    * @param tx the transaction the routing belongs to, or null when there is none
+    */
+   public void route(final ServerMessage message, final Transaction tx) throws Exception
+   {
+      final SimpleString lastValueName = (SimpleString)message.getProperty(MessageImpl.HDR_LAST_VALUE_NAME);
+
+      if (lastValueName != null)
+      {
+         synchronized (map)
+         {
+            final ServerMessage replaced = map.put(lastValueName, message);
+
+            if (replaced != null && tx != null)
+            {
+               // Transactional: look the older reference up among the refs of this transaction
+               discardMessage(replaced.getMessageID(), tx);
+            }
+            else if (replaced != null)
+            {
+               // Non-transactional: take the older reference out of the queue, if it is still there
+               final MessageReference oldRef = removeReferenceWithID(replaced.getMessageID());
+               if (oldRef != null)
+               {
+                  discardMessage(oldRef, tx);
+               }
+            }
+         }
+      }
+
+      super.route(message, tx);
+   }
+
+   /**
+    * Re-routes a message into this queue, keeping only the newest message per
+    * MessageImpl.HDR_LAST_VALUE_NAME value.
+    * <p>
+    * When the message carries that property it replaces any message previously registered in the map under
+    * the same value, and the replaced message's reference is dropped through rediscardMessage. The call is
+    * then always delegated to QueueImpl.reroute.
+    *
+    * @param message the message being re-routed
+    * @param tx the transaction the re-routing belongs to, or null when there is none
+    * @return the value returned by QueueImpl.reroute for the message
+    */
+   public MessageReference reroute(final ServerMessage message, final Transaction tx) throws Exception
+   {
+      SimpleString prop = (SimpleString)message.getProperty(MessageImpl.HDR_LAST_VALUE_NAME);
+      if (prop != null)
+      {
+         synchronized (map)
+         {
+            ServerMessage msg = map.put(prop, message);
+            if (msg != null)
+            {
+               if (tx != null)
+               {
+                  rediscardMessage(msg.getMessageID(), tx);
+               }
+               else
+               {
+                  MessageReference ref = removeReferenceWithID(msg.getMessageID());
+                  // route() null-checks this same lookup; without the check rediscardMessage(ref)
+                  // would dereference a null reference.
+                  if (ref != null)
+                  {
+                     rediscardMessage(ref);
+                  }
+               }
+            }
+         }
+      }
+      return super.reroute(message, tx);
+   }
+
+   /**
+    * Acknowledges a reference through QueueImpl.acknowledge and then forgets the map entry for its
+    * MessageImpl.HDR_LAST_VALUE_NAME value, provided that entry still points at the acknowledged message.
+    *
+    * @param ref the reference being acknowledged
+    */
+   public void acknowledge(final MessageReference ref) throws Exception
+   {
+      super.acknowledge(ref);
+
+      final SimpleString lastValueName = (SimpleString)ref.getMessage().getProperty(MessageImpl.HDR_LAST_VALUE_NAME);
+      if (lastValueName == null)
+      {
+         return;
+      }
+
+      synchronized (map)
+      {
+         final ServerMessage current = map.get(lastValueName);
+         // Only drop the entry when it is this very message; a newer one must stay registered
+         final boolean sameMessage = current != null && ref.getMessage().getMessageID() == current.getMessageID();
+         if (sameMessage)
+         {
+            map.remove(lastValueName);
+         }
+      }
+   }
+
+   /**
+    * Cancels a reference, or discards it when a different message is now registered in the map under the
+    * same MessageImpl.HDR_LAST_VALUE_NAME value.
+    * <p>
+    * References whose message does not carry the property are cancelled through QueueImpl.cancel directly.
+    *
+    * @param tx the transaction the cancel belongs to; passed on unchanged
+    * @param ref the reference being cancelled
+    */
+   public void cancel(final Transaction tx, final MessageReference ref) throws Exception
+   {
+      SimpleString prop = (SimpleString)ref.getMessage().getProperty(MessageImpl.HDR_LAST_VALUE_NAME);
+      if (prop == null)
+      {
+         super.cancel(tx, ref);
+         return;
+      }
+
+      synchronized (map)
+      {
+         ServerMessage msg = map.get(prop);
+         // acknowledge() and postRollback() both remove map entries, so the lookup can come back empty.
+         // With no entry there is no newer message to prefer, hence a plain cancel.
+         if (msg == null || msg.getMessageID() == ref.getMessage().getMessageID())
+         {
+            super.cancel(tx, ref);
+         }
+         else
+         {
+            discardMessage(ref, tx);
+         }
+      }
+   }
+
+   /**
+    * Filters the rolled-back references before handing them to QueueImpl.postRollback.
+    * <p>
+    * For every reference whose message carries MessageImpl.HDR_LAST_VALUE_NAME and whose value has a map
+    * entry: if the entry is a different message the reference is removed from the list and discarded; if it
+    * is the same message the map entry is removed. Map removals are applied only after the whole list has
+    * been scanned, so every reference is compared against the same map contents.
+    *
+    * @param refs the rolled-back references; superseded ones are removed from this list in place
+    */
+   void postRollback(final LinkedList<MessageReference> refs) throws Exception
+   {
+      final List<MessageReference> superseded = new ArrayList<MessageReference>();
+      final List<SimpleString> namesToForget = new ArrayList<SimpleString>();
+
+      synchronized (map)
+      {
+         for (MessageReference candidate : refs)
+         {
+            final SimpleString name = (SimpleString)candidate.getMessage().getProperty(MessageImpl.HDR_LAST_VALUE_NAME);
+            if (name == null)
+            {
+               continue;
+            }
+
+            final ServerMessage current = map.get(name);
+            if (current == null)
+            {
+               continue;
+            }
+
+            if (current.getMessageID() == candidate.getMessage().getMessageID())
+            {
+               namesToForget.add(name);
+            }
+            else
+            {
+               superseded.add(candidate);
+            }
+         }
+
+         for (SimpleString name : namesToForget)
+         {
+            map.remove(name);
+         }
+      }
+
+      for (MessageReference stale : superseded)
+      {
+         refs.remove(stale);
+         discardMessage(stale, null);
+      }
+
+      super.postRollback(refs);
+   }
+
+   /**
+    * Discards a reference: decrements the delivering count, subtracts the reference's memory estimate from
+    * the page store of the message's destination and, when both the message and the reference's queue are
+    * durable, decrements the message's durable reference count. Once that count reaches zero the message is
+    * deleted through the storage manager, transactionally if a transaction is given.
+    *
+    * @param ref the reference to discard
+    * @param tx the transaction to delete under, or null for a non-transactional delete
+    */
+   final void discardMessage(MessageReference ref, Transaction tx) throws Exception
+   {
+      deliveringCount.decrementAndGet();
+      pagingManager.getPageStore(ref.getMessage().getDestination()).addSize(-ref.getMemoryEstimate());
+
+      QueueImpl owner = (QueueImpl)ref.getQueue();
+      ServerMessage message = ref.getMessage();
+
+      if (!message.isDurable() || !owner.isDurable())
+      {
+         return;
+      }
+
+      if (message.decrementDurableRefCount() != 0)
+      {
+         // Other durable references to the message remain
+         return;
+      }
+
+      if (tx != null)
+      {
+         storageManager.deleteMessageTransactional(tx.getID(), getPersistenceID(), message.getMessageID());
+      }
+      else
+      {
+         storageManager.deleteMessage(message.getMessageID());
+      }
+   }
+
+   /**
+    * Finds, among the references waiting to be added by the given transaction, the first one whose message
+    * has the given id; removes it from that list and discards it. Does nothing when no such reference exists.
+    *
+    * @param id the id of the message to discard
+    * @param tx the transaction whose pending references are searched
+    */
+   final void discardMessage(Long id, Transaction tx) throws Exception
+   {
+      RefsOperation pending = getRefsOperation(tx);
+
+      for (Iterator<MessageReference> it = pending.refsToAdd.iterator(); it.hasNext();)
+      {
+         MessageReference candidate = it.next();
+
+         if (candidate.getMessage().getMessageID() != id)
+         {
+            continue;
+         }
+
+         it.remove();
+         discardMessage(candidate, tx);
+         return;
+      }
+   }
+
+   /**
+    * Finds, among the references waiting to be added by the given transaction, the first one whose message
+    * has the given id; removes it from that list and hands it to rediscardMessage(MessageReference).
+    * Does nothing when no such reference exists.
+    *
+    * @param id the id of the message to drop
+    * @param tx the transaction whose pending references are searched
+    */
+   final void rediscardMessage(long id, Transaction tx) throws Exception
+   {
+      RefsOperation pending = getRefsOperation(tx);
+
+      for (Iterator<MessageReference> it = pending.refsToAdd.iterator(); it.hasNext();)
+      {
+         MessageReference candidate = it.next();
+
+         if (candidate.getMessage().getMessageID() != id)
+         {
+            continue;
+         }
+
+         it.remove();
+         rediscardMessage(candidate);
+         return;
+      }
+   }
+
+   /**
+    * Drops a reference without touching storage: decrements the delivering count and subtracts the
+    * reference's memory estimate from the page store of the message's destination.
+    *
+    * @param ref the reference to drop
+    */
+   final void rediscardMessage(MessageReference ref) throws Exception
+   {
+      deliveringCount.decrementAndGet();
+      pagingManager.getPageStore(ref.getMessage().getDestination()).addSize(-ref.getMemoryEstimate());
+   }
+}
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/QueueFactoryImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/QueueFactoryImpl.java 2009-04-06 12:09:40 UTC (rev 6314)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/QueueFactoryImpl.java 2009-04-06 12:10:13 UTC (rev 6315)
@@ -78,9 +78,9 @@
AddressSettings addressSettings = addressSettingsRepository.getMatch(address.toString());
Queue queue;
- if (addressSettings.isSoloQueue())
+ if (addressSettings.isLastValueQueue())
{
- queue = new SoloQueueImpl(persistenceID,
+ queue = new LastValueQueue(persistenceID,
address,
name,
filter,
Deleted: trunk/src/main/org/jboss/messaging/core/server/impl/SoloQueueImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/SoloQueueImpl.java 2009-04-06 12:09:40 UTC (rev 6314)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/SoloQueueImpl.java 2009-04-06 12:10:13 UTC (rev 6315)
@@ -1,290 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.messaging.core.server.impl;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ScheduledExecutorService;
-
-import org.jboss.messaging.core.filter.Filter;
-import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.message.impl.MessageImpl;
-import org.jboss.messaging.core.paging.PagingManager;
-import org.jboss.messaging.core.paging.PagingStore;
-import org.jboss.messaging.core.persistence.StorageManager;
-import org.jboss.messaging.core.postoffice.PostOffice;
-import org.jboss.messaging.core.server.MessageReference;
-import org.jboss.messaging.core.server.ServerMessage;
-import org.jboss.messaging.core.settings.HierarchicalRepository;
-import org.jboss.messaging.core.settings.impl.AddressSettings;
-import org.jboss.messaging.core.transaction.Transaction;
-import org.jboss.messaging.utils.SimpleString;
-
-/**
- * A queue that will discard messages if a newer message with the same MessageImpl.HDR_SOLE_MESSAGE property value.
- * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
- */
-public class SoloQueueImpl extends QueueImpl
-{
- private static final Logger log = Logger.getLogger(SoloQueueImpl.class);
-
- private final Map<SimpleString, ServerMessage> map = new HashMap<SimpleString, ServerMessage>();
-
- private final PagingManager pagingManager;
-
- private final StorageManager storageManager;
-
- public SoloQueueImpl(final long persistenceID,
- final SimpleString address,
- final SimpleString name,
- final Filter filter,
- final boolean durable,
- final boolean temporary,
- final ScheduledExecutorService scheduledExecutor,
- final PostOffice postOffice,
- final StorageManager storageManager,
- final HierarchicalRepository<AddressSettings> addressSettingsRepository)
- {
- super(persistenceID,
- address,
- name,
- filter,
- durable,
- temporary,
- scheduledExecutor,
- postOffice,
- storageManager,
- addressSettingsRepository);
- this.pagingManager = postOffice.getPagingManager();
- this.storageManager = storageManager;
- }
-
- public void route(final ServerMessage message, final Transaction tx) throws Exception
- {
- SimpleString prop = (SimpleString)message.getProperty(MessageImpl.HDR_SOLE_MESSAGE);
- if (prop != null)
- {
- synchronized (map)
- {
- ServerMessage msg = map.put(prop, message);
- // if an older message existed then we discard it
- if (msg != null)
- {
- MessageReference ref;
- if (tx != null)
- {
- discardMessage(msg.getMessageID(), tx);
- }
- else
- {
- ref = removeReferenceWithID(msg.getMessageID());
- if (ref != null)
- {
- discardMessage(ref, tx);
- }
- }
-
- }
- }
- }
- super.route(message, tx);
- }
-
- public MessageReference reroute(final ServerMessage message, final Transaction tx) throws Exception
- {
- SimpleString prop = (SimpleString)message.getProperty(MessageImpl.HDR_SOLE_MESSAGE);
- if (prop != null)
- {
- synchronized (map)
- {
- ServerMessage msg = map.put(prop, message);
- if (msg != null)
- {
- if (tx != null)
- {
- rediscardMessage(msg.getMessageID(), tx);
- }
- else
- {
- MessageReference ref = removeReferenceWithID(msg.getMessageID());
- rediscardMessage(ref);
- }
- }
- }
- }
- return super.reroute(message, tx);
- }
-
- public void acknowledge(final MessageReference ref) throws Exception
- {
- super.acknowledge(ref);
- SimpleString prop = (SimpleString)ref.getMessage().getProperty(MessageImpl.HDR_SOLE_MESSAGE);
- if (prop != null)
- {
- synchronized (map)
- {
- ServerMessage serverMessage = map.get(prop);
- if (serverMessage != null && ref.getMessage().getMessageID() == serverMessage.getMessageID())
- {
- map.remove(prop);
- }
- }
- }
- }
-
- public void cancel(final Transaction tx, final MessageReference ref) throws Exception
- {
- SimpleString prop = (SimpleString)ref.getMessage().getProperty(MessageImpl.HDR_SOLE_MESSAGE);
- if (prop != null)
- {
- synchronized (map)
- {
- ServerMessage msg = map.get(prop);
- if (msg.getMessageID() == ref.getMessage().getMessageID())
- {
- super.cancel(tx, ref);
- }
- else
- {
- discardMessage(ref, tx);
- }
- }
- }
- else
- {
- super.cancel(tx, ref);
- }
- }
-
- void postRollback(final LinkedList<MessageReference> refs) throws Exception
- {
- List<MessageReference> refsToDiscard = new ArrayList<MessageReference>();
- List<SimpleString> refsToClear = new ArrayList<SimpleString>();
- synchronized (map)
- {
- for (MessageReference ref : refs)
- {
- SimpleString prop = (SimpleString)ref.getMessage().getProperty(MessageImpl.HDR_SOLE_MESSAGE);
- if (prop != null)
- {
- ServerMessage msg = map.get(prop);
- if (msg != null)
- {
- if (msg.getMessageID() != ref.getMessage().getMessageID())
- {
- refsToDiscard.add(ref);
- }
- else
- {
- refsToClear.add(prop);
- }
- }
- }
- }
- for (SimpleString simpleString : refsToClear)
- {
- map.remove(simpleString);
- }
- }
- for (MessageReference ref : refsToDiscard)
- {
- refs.remove(ref);
- discardMessage(ref, null);
- }
- super.postRollback(refs);
- }
-
- final void discardMessage(MessageReference ref, Transaction tx) throws Exception
- {
- deliveringCount.decrementAndGet();
- PagingStore store = pagingManager.getPageStore(ref.getMessage().getDestination());
- store.addSize(-ref.getMemoryEstimate());
- QueueImpl queue = (QueueImpl)ref.getQueue();
- ServerMessage msg = ref.getMessage();
- boolean durableRef = msg.isDurable() && queue.isDurable();
-
- if (durableRef)
- {
- int count = msg.decrementDurableRefCount();
-
- if (count == 0)
- {
- if (tx == null)
- {
- storageManager.deleteMessage(msg.getMessageID());
- }
- else
- {
- storageManager.deleteMessageTransactional(tx.getID(), getPersistenceID(), msg.getMessageID());
- }
- }
- }
- }
-
- final void discardMessage(Long id, Transaction tx) throws Exception
- {
- RefsOperation oper = getRefsOperation(tx);
- Iterator<MessageReference> iterator = oper.refsToAdd.iterator();
-
- while (iterator.hasNext())
- {
- MessageReference ref = iterator.next();
-
- if (ref.getMessage().getMessageID() == id)
- {
- iterator.remove();
- discardMessage(ref, tx);
- break;
- }
- }
-
- }
-
- final void rediscardMessage(long id, Transaction tx) throws Exception
- {
- RefsOperation oper = getRefsOperation(tx);
- Iterator<MessageReference> iterator = oper.refsToAdd.iterator();
-
- while (iterator.hasNext())
- {
- MessageReference ref = iterator.next();
-
- if (ref.getMessage().getMessageID() == id)
- {
- iterator.remove();
- rediscardMessage(ref);
- break;
- }
- }
- }
-
- final void rediscardMessage(MessageReference ref) throws Exception
- {
- deliveringCount.decrementAndGet();
- PagingStore store = pagingManager.getPageStore(ref.getMessage().getDestination());
- store.addSize(-ref.getMemoryEstimate());
- }
-}
Modified: trunk/src/main/org/jboss/messaging/core/settings/impl/AddressSettings.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/settings/impl/AddressSettings.java 2009-04-06 12:09:40 UTC (rev 6314)
+++ trunk/src/main/org/jboss/messaging/core/settings/impl/AddressSettings.java 2009-04-06 12:10:13 UTC (rev 6315)
@@ -55,7 +55,7 @@
public static final long DEFAULT_REDELIVER_DELAY = 0L;
- public static final boolean DEFAULT_SOLO_QUEUE = false;
+ public static final boolean DEFAULT_LAST_VALUE_QUEUE = false;
public static final long DEFAULT_REDISTRIBUTION_DELAY = -1;
@@ -77,18 +77,18 @@
private SimpleString expiryAddress = null;
- private Boolean soloQueue = null;
+ private Boolean lastValueQueue = null;
private Long redistributionDelay = null;
- public boolean isSoloQueue()
+ public boolean isLastValueQueue()
{
- return soloQueue != null ? soloQueue : DEFAULT_SOLO_QUEUE;
+ return lastValueQueue != null ? lastValueQueue : DEFAULT_LAST_VALUE_QUEUE;
}
- public void setSoloQueue(final boolean soloQueue)
+ public void setLastValueQueue(final boolean lastValueQueue)
{
- this.soloQueue = soloQueue;
+ this.lastValueQueue = lastValueQueue;
}
public int getPageSizeBytes()
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientSessionCreateAndDeleteQueueTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientSessionCreateAndDeleteQueueTest.java 2009-04-06 12:09:40 UTC (rev 6314)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientSessionCreateAndDeleteQueueTest.java 2009-04-06 12:10:13 UTC (rev 6315)
@@ -26,7 +26,7 @@
import org.jboss.messaging.core.postoffice.Binding;
import org.jboss.messaging.core.server.MessagingServer;
import org.jboss.messaging.core.server.Queue;
-import org.jboss.messaging.core.server.impl.SoloQueueImpl;
+import org.jboss.messaging.core.server.impl.LastValueQueue;
import org.jboss.messaging.core.settings.impl.AddressSettings;
import org.jboss.messaging.tests.util.ServiceTestBase;
import org.jboss.messaging.utils.SimpleString;
@@ -102,13 +102,13 @@
public void testAddressSettingUSed() throws Exception
{
AddressSettings addressSettings = new AddressSettings();
- addressSettings.setSoloQueue(true);
+ addressSettings.setLastValueQueue(true);
server.getAddressSettingsRepository().addMatch(address.toString(), addressSettings);
ClientSession session = createInVMFactory().createSession(false, true, true);
SimpleString filterString = new SimpleString("x=y");
session.createQueue(address, queueName, filterString, false);
Binding binding = server.getPostOffice().getBinding(queueName);
- assertTrue(binding.getBindable() instanceof SoloQueueImpl);
+ assertTrue(binding.getBindable() instanceof LastValueQueue);
session.close();
}
Copied: trunk/tests/src/org/jboss/messaging/tests/integration/server/LVQRecoveryTest.java (from rev 6312, trunk/tests/src/org/jboss/messaging/tests/integration/server/SoloQueueRecoveryTest.java)
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/server/LVQRecoveryTest.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/server/LVQRecoveryTest.java 2009-04-06 12:10:13 UTC (rev 6315)
@@ -0,0 +1,236 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.tests.integration.server;
+
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
+import org.jboss.messaging.core.config.Configuration;
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.message.impl.MessageImpl;
+import org.jboss.messaging.core.server.MessagingServer;
+import org.jboss.messaging.core.settings.impl.AddressSettings;
+import org.jboss.messaging.core.transaction.impl.XidImpl;
+import org.jboss.messaging.tests.util.ServiceTestBase;
+import org.jboss.messaging.utils.SimpleString;
+
+/**
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public class LVQRecoveryTest extends ServiceTestBase
+{
+ private MessagingServer server;
+
+ private ClientSession clientSession;
+
+ private SimpleString address = new SimpleString("LVQTestAddress");
+
+ private SimpleString qName1 = new SimpleString("LVQTestQ1");
+
+ private ClientSession clientSessionXa;
+
+ private Configuration configuration;
+
+ private AddressSettings qs;
+
+   /**
+    * Sends four messages in a prepared XA transaction (m1/m3 share one last-value name, m2/m4 another),
+    * restarts the server, commits, and expects to receive m3 followed by m4.
+    */
+   public void testMultipleMessagesAfterRecovery() throws Exception
+   {
+      Xid xid = new XidImpl("bq1".getBytes(), 4, "gtid1".getBytes());
+      ClientProducer producer = clientSessionXa.createProducer(address, -1, true, true);
+      SimpleString messageId1 = new SimpleString("SMID1");
+      SimpleString messageId2 = new SimpleString("SMID2");
+      clientSessionXa.start(xid, XAResource.TMNOFLAGS);
+      ClientMessage m1 = createTextMessage("m1", clientSession);
+      m1.putStringProperty(MessageImpl.HDR_LAST_VALUE_NAME, messageId1);
+      ClientMessage m2 = createTextMessage("m2", clientSession);
+      m2.putStringProperty(MessageImpl.HDR_LAST_VALUE_NAME, messageId2);
+      ClientMessage m3 = createTextMessage("m3", clientSession);
+      m3.putStringProperty(MessageImpl.HDR_LAST_VALUE_NAME, messageId1);
+      ClientMessage m4 = createTextMessage("m4", clientSession);
+      m4.putStringProperty(MessageImpl.HDR_LAST_VALUE_NAME, messageId2);
+      producer.send(m1);
+      producer.send(m2);
+      producer.send(m3);
+      producer.send(m4);
+      clientSessionXa.end(xid, XAResource.TMSUCCESS);
+      clientSessionXa.prepare(xid);
+
+      // restartServer() replaces clientSession and clientSessionXa with sessions on the new server
+      restartServer();
+      clientSessionXa.commit(xid, true);
+
+      ClientConsumer consumer = clientSession.createConsumer(qName1);
+      clientSession.start();
+      ClientMessage m = consumer.receive(1000);
+      assertNotNull(m);
+      m.acknowledge();
+      // JUnit order is (expected, actual)
+      assertEquals("m3", m.getBody().readString());
+      m = consumer.receive(1000);
+      assertNotNull(m);
+      m.acknowledge();
+      assertEquals("m4", m.getBody().readString());
+   }
+
+   /**
+    * Sends six durable messages sharing one last-value name, receiving each on an XA session right after it
+    * is sent. The XA transaction is prepared, the server restarted and the transaction rolled back; the
+    * test then expects exactly one message, m6, to be available.
+    */
+   public void testManyMessagesReceivedWithRollback() throws Exception
+   {
+      Xid xid = new XidImpl("bq1".getBytes(), 4, "gtid1".getBytes());
+      ClientProducer producer = clientSession.createProducer(address, -1, true, true);
+      ClientConsumer consumer = clientSessionXa.createConsumer(qName1);
+
+      SimpleString rh = new SimpleString("SMID1");
+      String[] bodies = {"m1", "m2", "m3", "m4", "m5", "m6"};
+      ClientMessage[] messages = new ClientMessage[bodies.length];
+      for (int i = 0; i < bodies.length; i++)
+      {
+         messages[i] = createTextMessage(bodies[i], clientSession);
+         messages[i].putStringProperty(MessageImpl.HDR_LAST_VALUE_NAME, rh);
+         messages[i].setDurable(true);
+      }
+
+      clientSessionXa.start(xid, XAResource.TMNOFLAGS);
+      clientSessionXa.start();
+      for (int i = 0; i < bodies.length; i++)
+      {
+         producer.send(messages[i]);
+         ClientMessage received = consumer.receive(1000);
+         assertNotNull(received);
+         // JUnit order is (expected, actual)
+         assertEquals(bodies[i], received.getBody().readString());
+      }
+      clientSessionXa.end(xid, XAResource.TMSUCCESS);
+      clientSessionXa.prepare(xid);
+
+      // restartServer() replaces clientSession and clientSessionXa with sessions on the new server
+      restartServer();
+      clientSessionXa.rollback(xid);
+
+      consumer = clientSession.createConsumer(qName1);
+      clientSession.start();
+      ClientMessage m = consumer.receive(1000);
+      assertNotNull(m);
+      m.acknowledge();
+      assertEquals("m6", m.getBody().readString());
+      m = consumer.receive(1000);
+      assertNull(m);
+   }
+
+   /**
+    * Closes both client sessions and stops the server if it is started. Failures during clean-up are
+    * deliberately ignored so that the remaining steps and super.tearDown() still run.
+    */
+   protected void tearDown() throws Exception
+   {
+      if (clientSession != null)
+      {
+         try
+         {
+            clientSession.close();
+         }
+         catch (MessagingException ignored)
+         {
+            // best-effort clean-up
+         }
+      }
+      // The XA session is created alongside clientSession in setUp()/restartServer() and must be released too
+      if (clientSessionXa != null)
+      {
+         try
+         {
+            clientSessionXa.close();
+         }
+         catch (MessagingException ignored)
+         {
+            // best-effort clean-up
+         }
+      }
+      if (server != null && server.isStarted())
+      {
+         try
+         {
+            server.stop();
+         }
+         catch (Exception ignored)
+         {
+            // best-effort clean-up
+         }
+      }
+      server = null;
+      clientSession = null;
+      clientSessionXa = null;
+
+      super.tearDown();
+   }
+
+   /**
+    * Starts a journal-backed server with security disabled and an in-VM acceptor, registers address settings
+    * that enable last-value-queue behaviour for the test address, opens one plain and one XA client session,
+    * and creates the durable test queue.
+    */
+   protected void setUp() throws Exception
+   {
+      super.setUp();
+
+      clearData();
+
+      // Server side
+      configuration = createConfigForJournal();
+      configuration.setSecurityEnabled(false);
+      TransportConfiguration acceptor = new TransportConfiguration(INVM_ACCEPTOR_FACTORY);
+      configuration.getAcceptorConfigurations().add(acceptor);
+      server = createServer(true, configuration);
+      server.start();
+
+      // Make queues bound to the test address last-value queues
+      qs = new AddressSettings();
+      qs.setLastValueQueue(true);
+      server.getAddressSettingsRepository().addMatch(address.toString(), qs);
+
+      // Client side
+      TransportConfiguration connector = new TransportConfiguration(INVM_CONNECTOR_FACTORY);
+      ClientSessionFactory factory = new ClientSessionFactoryImpl(connector);
+      factory.setBlockOnAcknowledge(true);
+      factory.setAckBatchSize(0);
+      clientSession = factory.createSession(false, true, true);
+      clientSessionXa = factory.createSession(true, false, false);
+      clientSession.createQueue(address, qName1, null, true);
+   }
+
+   /**
+    * Stops the server, creates and starts a fresh one from the same configuration, and replaces
+    * clientSession and clientSessionXa with sessions connected to it. The last-value-queue address settings
+    * are registered both before and after start().
+    */
+   private void restartServer() throws Exception
+   {
+      server.stop();
+      server = null;
+      server = createServer(true, configuration);
+      server.getAddressSettingsRepository().addMatch(address.toString(), qs);
+      // start the server
+      server.start();
+
+      // Register the same settings again after start, reusing the field rather than a shadowing local copy
+      server.getAddressSettingsRepository().addMatch(address.toString(), qs);
+      // then we create a client as normal
+      ClientSessionFactory sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(INVM_CONNECTOR_FACTORY));
+      sessionFactory.setBlockOnAcknowledge(true);
+      sessionFactory.setAckBatchSize(0);
+      clientSession = sessionFactory.createSession(false, true, true);
+      clientSessionXa = sessionFactory.createSession(true, false, false);
+   }
+}
Copied: trunk/tests/src/org/jboss/messaging/tests/integration/server/LVQTest.java (from rev 6312, trunk/tests/src/org/jboss/messaging/tests/integration/server/SoloQueueTest.java)
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/server/LVQTest.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/server/LVQTest.java 2009-04-06 12:10:13 UTC (rev 6315)
@@ -0,0 +1,584 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.tests.integration.server;
+
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.config.impl.ConfigurationImpl;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.message.impl.MessageImpl;
+import org.jboss.messaging.core.server.Messaging;
+import org.jboss.messaging.core.server.MessagingServer;
+import org.jboss.messaging.core.settings.impl.AddressSettings;
+import org.jboss.messaging.tests.util.UnitTestCase;
+import org.jboss.messaging.utils.SimpleString;
+
+/**
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public class LVQTest extends UnitTestCase
+{
+ private MessagingServer server;
+
+ private ClientSession clientSession;
+
+ private ClientSession clientSessionTxReceives;
+
+ private ClientSession clientSessionTxSends;
+
+ private SimpleString address = new SimpleString("LVQTestAddress");
+
+ private SimpleString qName1 = new SimpleString("LVQTestQ1");
+
+ private FakeStorageManager storageManager;
+
+
+ /**
+  * Sends two messages carrying the same last-value name before the session is
+  * started and asserts that the first message the consumer receives is "m2".
+  *
+  * @throws Exception if any client operation fails
+  */
+ public void testSimple() throws Exception
+ {
+    ClientProducer producer = clientSession.createProducer(address, -1, true, true);
+    ClientConsumer consumer = clientSession.createConsumer(qName1);
+    ClientMessage m1 = createTextMessage("m1", clientSession);
+    SimpleString rh = new SimpleString("SMID1");
+    m1.putStringProperty(MessageImpl.HDR_LAST_VALUE_NAME, rh);
+    ClientMessage m2 = createTextMessage("m2", clientSession);
+    m2.putStringProperty(MessageImpl.HDR_LAST_VALUE_NAME, rh);
+    producer.send(m1);
+    producer.send(m2);
+    clientSession.start();
+    ClientMessage m = consumer.receive(1000);
+    assertNotNull(m);
+    m.acknowledge();
+    // expected value first, so a failure reports expected/actual the right way round
+    assertEquals("m2", m.getBody().readString());
+ }
+
+ /**
+  * Sends four messages alternating between two last-value names ("SMID1": m1, m3;
+  * "SMID2": m2, m4) before the session is started, then asserts that the consumer
+  * receives "m3" followed by "m4".
+  *
+  * @throws Exception if any client operation fails
+  */
+ public void testMultipleMessages() throws Exception
+ {
+    ClientProducer producer = clientSession.createProducer(address, -1, true, true);
+    ClientConsumer consumer = clientSession.createConsumer(qName1);
+    SimpleString messageId1 = new SimpleString("SMID1");
+    SimpleString messageId2 = new SimpleString("SMID2");
+    ClientMessage m1 = createTextMessage("m1", clientSession);
+    m1.putStringProperty(MessageImpl.HDR_LAST_VALUE_NAME, messageId1);
+    ClientMessage m2 = createTextMessage("m2", clientSession);
+    m2.putStringProperty(MessageImpl.HDR_LAST_VALUE_NAME, messageId2);
+    ClientMessage m3 = createTextMessage("m3", clientSession);
+    m3.putStringProperty(MessageImpl.HDR_LAST_VALUE_NAME, messageId1);
+    ClientMessage m4 = createTextMessage("m4", clientSession);
+    m4.putStringProperty(MessageImpl.HDR_LAST_VALUE_NAME, messageId2);
+    producer.send(m1);
+    producer.send(m2);
+    producer.send(m3);
+    producer.send(m4);
+    clientSession.start();
+
+    ClientMessage m = consumer.receive(1000);
+    assertNotNull(m);
+    m.acknowledge();
+    // expected value first, as assertEquals(expected, actual) requires
+    assertEquals("m3", m.getBody().readString());
+    m = consumer.receive(1000);
+    assertNotNull(m);
+    m.acknowledge();
+    assertEquals("m4", m.getBody().readString());
+ }
+
+ /**
+  * Receives "m1", then sends "m2" under the same last-value name before "m1" is
+  * acknowledged, and asserts that "m1" and then "m2" are both delivered.
+  *
+  * @throws Exception if any client operation fails
+  */
+ public void testFirstMessageReceivedButAckedAfter() throws Exception
+ {
+    ClientProducer producer = clientSession.createProducer(address, -1, true, true);
+    ClientConsumer consumer = clientSession.createConsumer(qName1);
+    ClientMessage m1 = createTextMessage("m1", clientSession);
+    SimpleString rh = new SimpleString("SMID1");
+    m1.putStringProperty(MessageImpl.HDR_LAST_VALUE_NAME, rh);
+    ClientMessage m2 = createTextMessage("m2", clientSession);
+    m2.putStringProperty(MessageImpl.HDR_LAST_VALUE_NAME, rh);
+    producer.send(m1);
+    clientSession.start();
+    ClientMessage m = consumer.receive(1000);
+    assertNotNull(m);
+    // m2 is sent while m1 has been delivered but not yet acknowledged
+    producer.send(m2);
+    m.acknowledge();
+    // expected value first, as assertEquals(expected, actual) requires
+    assertEquals("m1", m.getBody().readString());
+    m = consumer.receive(1000);
+    assertNotNull(m);
+    m.acknowledge();
+    assertEquals("m2", m.getBody().readString());
+ }
+
+ /**
+  * Receives "m1" without acknowledging it, sends "m2" under the same last-value
+  * name, closes the consumer and opens a new one. Asserts that the new consumer
+  * receives "m2" and then nothing more.
+  *
+  * @throws Exception if any client operation fails
+  */
+ public void testFirstMessageReceivedAndCancelled() throws Exception
+ {
+    ClientProducer producer = clientSession.createProducer(address, -1, true, true);
+    ClientConsumer consumer = clientSession.createConsumer(qName1);
+    ClientMessage m1 = createTextMessage("m1", clientSession);
+    SimpleString rh = new SimpleString("SMID1");
+    m1.putStringProperty(MessageImpl.HDR_LAST_VALUE_NAME, rh);
+    ClientMessage m2 = createTextMessage("m2", clientSession);
+    m2.putStringProperty(MessageImpl.HDR_LAST_VALUE_NAME, rh);
+    producer.send(m1);
+    clientSession.start();
+    ClientMessage m = consumer.receive(1000);
+    assertNotNull(m);
+    producer.send(m2);
+    // closing the consumer while m1 is still unacknowledged
+    consumer.close();
+    consumer = clientSession.createConsumer(qName1);
+    m = consumer.receive(1000);
+    assertNotNull(m);
+    m.acknowledge();
+    // expected value first, as assertEquals(expected, actual) requires
+    assertEquals("m2", m.getBody().readString());
+    m = consumer.receive(1000);
+    assertNull(m);
+ }
+
+ /**
+  * Sends "m1".."m6" one at a time under the same last-value name, receiving each
+  * without acknowledging it, then closes the consumer and opens a new one. Asserts
+  * that the new consumer receives "m6" and then nothing more.
+  *
+  * @throws Exception if any client operation fails
+  */
+ public void testManyMessagesReceivedAndCancelled() throws Exception
+ {
+    ClientProducer producer = clientSession.createProducer(address, -1, true, true);
+    ClientConsumer consumer = clientSession.createConsumer(qName1);
+    SimpleString rh = new SimpleString("SMID1");
+    clientSession.start();
+
+    // One iteration per message replaces six copy-pasted send/receive/assert stanzas.
+    for (int i = 1; i <= 6; i++)
+    {
+       String text = "m" + i;
+       ClientMessage sent = createTextMessage(text, clientSession);
+       sent.putStringProperty(MessageImpl.HDR_LAST_VALUE_NAME, rh);
+       producer.send(sent);
+       ClientMessage received = consumer.receive(1000);
+       assertNotNull(received);
+       // deliberately not acknowledged; expected value goes first in assertEquals
+       assertEquals(text, received.getBody().readString());
+    }
+
+    consumer.close();
+    consumer = clientSession.createConsumer(qName1);
+    ClientMessage m = consumer.receive(1000);
+    assertNotNull(m);
+    m.acknowledge();
+    assertEquals("m6", m.getBody().readString());
+    m = consumer.receive(1000);
+    assertNull(m);
+ }
+
+ /**
+  * Same scenario as a plain two-message send under one last-value name, but the
+  * producer and consumer are created on {@code clientSessionTxReceives}. Asserts
+  * that the first message received is "m2".
+  *
+  * @throws Exception if any client operation fails
+  */
+ public void testSimpleInTx() throws Exception
+ {
+    ClientProducer producer = clientSessionTxReceives.createProducer(address, -1, true, true);
+    ClientConsumer consumer = clientSessionTxReceives.createConsumer(qName1);
+    ClientMessage m1 = createTextMessage("m1", clientSession);
+    SimpleString rh = new SimpleString("SMID1");
+    m1.putStringProperty(MessageImpl.HDR_LAST_VALUE_NAME, rh);
+    ClientMessage m2 = createTextMessage("m2", clientSession);
+    m2.putStringProperty(MessageImpl.HDR_LAST_VALUE_NAME, rh);
+    producer.send(m1);
+    producer.send(m2);
+    clientSessionTxReceives.start();
+    ClientMessage m = consumer.receive(1000);
+    assertNotNull(m);
+    m.acknowledge();
+    // expected value first, as assertEquals(expected, actual) requires
+    assertEquals("m2", m.getBody().readString());
+ }
+
+ /**
+  * Sends four messages alternating between two last-value names through
+  * {@code clientSessionTxReceives}, receives and acknowledges "m3" and "m4",
+  * commits, and asserts that nothing further is delivered.
+  *
+  * @throws Exception if any client operation fails
+  */
+ public void testMultipleMessagesInTx() throws Exception
+ {
+    ClientProducer producer = clientSessionTxReceives.createProducer(address, -1, true, true);
+    ClientConsumer consumer = clientSessionTxReceives.createConsumer(qName1);
+    SimpleString messageId1 = new SimpleString("SMID1");
+    SimpleString messageId2 = new SimpleString("SMID2");
+    ClientMessage m1 = createTextMessage("m1", clientSession);
+    m1.putStringProperty(MessageImpl.HDR_LAST_VALUE_NAME, messageId1);
+    ClientMessage m2 = createTextMessage("m2", clientSession);
+    m2.putStringProperty(MessageImpl.HDR_LAST_VALUE_NAME, messageId2);
+    ClientMessage m3 = createTextMessage("m3", clientSession);
+    m3.putStringProperty(MessageImpl.HDR_LAST_VALUE_NAME, messageId1);
+    ClientMessage m4 = createTextMessage("m4", clientSession);
+    m4.putStringProperty(MessageImpl.HDR_LAST_VALUE_NAME, messageId2);
+    producer.send(m1);
+    producer.send(m2);
+    producer.send(m3);
+    producer.send(m4);
+    clientSessionTxReceives.start();
+
+    ClientMessage m = consumer.receive(1000);
+    assertNotNull(m);
+    m.acknowledge();
+    // expected value first, as assertEquals(expected, actual) requires
+    assertEquals("m3", m.getBody().readString());
+    m = consumer.receive(1000);
+    assertNotNull(m);
+    m.acknowledge();
+    assertEquals("m4", m.getBody().readString());
+    clientSessionTxReceives.commit();
+    m = consumer.receive(1000);
+    assertNull(m);
+ }
+
+ /**
+  * On {@code clientSessionTxReceives}: receives and acknowledges "m1" and "m2",
+  * then sends "m3" and "m4" under the same two last-value names and receives and
+  * acknowledges those too. After rolling the session back, asserts that "m3" and
+  * "m4" (not "m1"/"m2") are delivered again.
+  *
+  * @throws Exception if any client operation fails
+  */
+ public void testMultipleMessagesInTxRollback() throws Exception
+ {
+    ClientProducer producer = clientSessionTxReceives.createProducer(address, -1, true, true);
+    ClientConsumer consumer = clientSessionTxReceives.createConsumer(qName1);
+    SimpleString messageId1 = new SimpleString("SMID1");
+    SimpleString messageId2 = new SimpleString("SMID2");
+    ClientMessage m1 = createTextMessage("m1", clientSession);
+    m1.putStringProperty(MessageImpl.HDR_LAST_VALUE_NAME, messageId1);
+    ClientMessage m2 = createTextMessage("m2", clientSession);
+    m2.putStringProperty(MessageImpl.HDR_LAST_VALUE_NAME, messageId2);
+    ClientMessage m3 = createTextMessage("m3", clientSession);
+    m3.putStringProperty(MessageImpl.HDR_LAST_VALUE_NAME, messageId1);
+    ClientMessage m4 = createTextMessage("m4", clientSession);
+    m4.putStringProperty(MessageImpl.HDR_LAST_VALUE_NAME, messageId2);
+    producer.send(m1);
+    producer.send(m2);
+    clientSessionTxReceives.start();
+
+    // first pair; expected value goes first in every assertEquals below
+    ClientMessage m = consumer.receive(1000);
+    assertNotNull(m);
+    m.acknowledge();
+    assertEquals("m1", m.getBody().readString());
+    m = consumer.receive(1000);
+    assertNotNull(m);
+    m.acknowledge();
+    assertEquals("m2", m.getBody().readString());
+
+    // second pair, sent while the acknowledgements above are still uncommitted
+    producer.send(m3);
+    producer.send(m4);
+    m = consumer.receive(1000);
+    assertNotNull(m);
+    m.acknowledge();
+    assertEquals("m3", m.getBody().readString());
+    m = consumer.receive(1000);
+    assertNotNull(m);
+    m.acknowledge();
+    assertEquals("m4", m.getBody().readString());
+
+    clientSessionTxReceives.rollback();
+    m = consumer.receive(1000);
+    assertNotNull(m);
+    m.acknowledge();
+    assertEquals("m3", m.getBody().readString());
+    m = consumer.receive(1000);
+    assertNotNull(m);
+    m.acknowledge();
+    assertEquals("m4", m.getBody().readString());
+ }
+
+ /**
+  * Sends "m1".."m6" under one last-value name through {@code clientSessionTxSends},
+  * commits, and asserts that the first message received is "m6".
+  *
+  * @throws Exception if any client operation fails
+  */
+ public void testMultipleMessagesInTxSend() throws Exception
+ {
+    ClientProducer producer = clientSessionTxSends.createProducer(address, -1, true, true);
+    ClientConsumer consumer = clientSessionTxSends.createConsumer(qName1);
+    SimpleString rh = new SimpleString("SMID1");
+
+    // One iteration per message replaces six copy-pasted create/tag/send stanzas.
+    for (int i = 1; i <= 6; i++)
+    {
+       ClientMessage message = createTextMessage("m" + i, clientSession);
+       message.putStringProperty(MessageImpl.HDR_LAST_VALUE_NAME, rh);
+       producer.send(message);
+    }
+
+    clientSessionTxSends.commit();
+    clientSessionTxSends.start();
+    ClientMessage m = consumer.receive(1000);
+    assertNotNull(m);
+    m.acknowledge();
+    // expected value first, as assertEquals(expected, actual) requires
+    assertEquals("m6", m.getBody().readString());
+ }
+
+ /**
+  * Sends six durable messages under one last-value name and asserts that
+  * {@code storageManager.messageIds} holds exactly one entry afterwards, and none
+  * once "m6" has been received and acknowledged.
+  *
+  * @throws Exception if any client operation fails
+  */
+ public void testMultipleMessagesPersistedCorrectly() throws Exception
+ {
+    ClientProducer producer = clientSession.createProducer(address, -1, true, true);
+    ClientConsumer consumer = clientSession.createConsumer(qName1);
+    SimpleString rh = new SimpleString("SMID1");
+
+    // One iteration per message replaces six copy-pasted create/tag/send stanzas.
+    for (int i = 1; i <= 6; i++)
+    {
+       ClientMessage message = createTextMessage("m" + i, clientSession);
+       message.putStringProperty(MessageImpl.HDR_LAST_VALUE_NAME, rh);
+       message.setDurable(true);
+       producer.send(message);
+    }
+
+    assertEquals(1, storageManager.messageIds.size());
+    clientSession.start();
+    ClientMessage m = consumer.receive(1000);
+    assertNotNull(m);
+    m.acknowledge();
+    // expected value first, as assertEquals(expected, actual) requires
+    assertEquals("m6", m.getBody().readString());
+    assertEquals(0, storageManager.messageIds.size());
+ }
+
+ /**
+  * Sends six durable messages under one last-value name through
+  * {@code clientSessionTxSends} and commits. Asserts that
+  * {@code storageManager.messageIds} then holds exactly one entry, and none once
+  * "m6" has been received and acknowledged.
+  *
+  * @throws Exception if any client operation fails
+  */
+ public void testMultipleMessagesPersistedCorrectlyInTx() throws Exception
+ {
+    ClientProducer producer = clientSessionTxSends.createProducer(address, -1, true, true);
+    ClientConsumer consumer = clientSessionTxSends.createConsumer(qName1);
+    SimpleString rh = new SimpleString("SMID1");
+
+    // One iteration per message replaces six copy-pasted create/tag/send stanzas.
+    for (int i = 1; i <= 6; i++)
+    {
+       ClientMessage message = createTextMessage("m" + i, clientSession);
+       message.putStringProperty(MessageImpl.HDR_LAST_VALUE_NAME, rh);
+       message.setDurable(true);
+       producer.send(message);
+    }
+
+    clientSessionTxSends.commit();
+    assertEquals(1, storageManager.messageIds.size());
+    clientSessionTxSends.start();
+    ClientMessage m = consumer.receive(1000);
+    assertNotNull(m);
+    m.acknowledge();
+    // expected value first, as assertEquals(expected, actual) requires
+    assertEquals("m6", m.getBody().readString());
+    assertEquals(0, storageManager.messageIds.size());
+ }
+
+ /**
+  * Sends six durable messages under one last-value name, receiving and
+  * acknowledging each before the next is sent, and asserts that
+  * {@code storageManager.ackIds} holds six entries at the end.
+  *
+  * @throws Exception if any client operation fails
+  */
+ public void testMultipleAcksPersistedCorrectly() throws Exception
+ {
+    ClientProducer producer = clientSession.createProducer(address, -1, true, true);
+    ClientConsumer consumer = clientSession.createConsumer(qName1);
+    SimpleString rh = new SimpleString("SMID1");
+    clientSession.start();
+
+    // One iteration per message replaces six copy-pasted send/receive/ack stanzas.
+    for (int i = 1; i <= 6; i++)
+    {
+       String text = "m" + i;
+       ClientMessage sent = createTextMessage(text, clientSession);
+       sent.putStringProperty(MessageImpl.HDR_LAST_VALUE_NAME, rh);
+       sent.setDurable(true);
+       producer.send(sent);
+       ClientMessage received = consumer.receive(1000);
+       assertNotNull(received);
+       received.acknowledge();
+       // expected value first, as assertEquals(expected, actual) requires
+       assertEquals(text, received.getBody().readString());
+    }
+
+    assertEquals(6, storageManager.ackIds.size());
+ }
+
+ /**
+  * On {@code clientSessionTxReceives}: sends six durable messages under one
+  * last-value name, receiving and acknowledging each before the next is sent,
+  * commits, and asserts that {@code storageManager.ackIds} holds six entries.
+  *
+  * @throws Exception if any client operation fails
+  */
+ public void testMultipleAcksPersistedCorrectlyInTx() throws Exception
+ {
+    ClientProducer producer = clientSessionTxReceives.createProducer(address, -1, true, true);
+    ClientConsumer consumer = clientSessionTxReceives.createConsumer(qName1);
+    SimpleString rh = new SimpleString("SMID1");
+    clientSessionTxReceives.start();
+
+    // One iteration per message replaces six copy-pasted send/receive/ack stanzas.
+    for (int i = 1; i <= 6; i++)
+    {
+       String text = "m" + i;
+       ClientMessage sent = createTextMessage(text, clientSession);
+       sent.putStringProperty(MessageImpl.HDR_LAST_VALUE_NAME, rh);
+       sent.setDurable(true);
+       producer.send(sent);
+       ClientMessage received = consumer.receive(1000);
+       assertNotNull(received);
+       received.acknowledge();
+       // expected value first, as assertEquals(expected, actual) requires
+       assertEquals(text, received.getBody().readString());
+    }
+
+    clientSessionTxReceives.commit();
+    assertEquals(6, storageManager.ackIds.size());
+ }
+
+
+
+
+ /**
+  * Closes every client session opened by {@code setUp()}, stops the server if it
+  * is still started, clears the fields and delegates to {@code super.tearDown()}.
+  * Failures while closing or stopping are ignored so that cleanup always runs to
+  * completion.
+  *
+  * @throws Exception if {@code super.tearDown()} fails
+  */
+ protected void tearDown() throws Exception
+ {
+    // All three sessions are created in setUp(); previously only clientSession was closed.
+    ClientSession[] sessions = { clientSession, clientSessionTxReceives, clientSessionTxSends };
+    for (ClientSession session : sessions)
+    {
+       if (session != null)
+       {
+          try
+          {
+             session.close();
+          }
+          catch (MessagingException ignored)
+          {
+             // best-effort cleanup: a failed close must not mask the test outcome
+          }
+       }
+    }
+    if (server != null && server.isStarted())
+    {
+       try
+       {
+          server.stop();
+       }
+       catch (Exception ignored)
+       {
+          // best-effort cleanup: a failed stop must not mask the test outcome
+       }
+    }
+    server = null;
+    clientSession = null;
+    clientSessionTxReceives = null;
+    clientSessionTxSends = null;
+
+    super.tearDown();
+ }
+
+ /**
+  * Creates and starts a server backed by a {@code FakeStorageManager}, registers
+  * address settings with last-value-queue enabled for {@code address}, opens the
+  * three client sessions used by the tests and creates queue {@code qName1}.
+  *
+  * @throws Exception if the server or any session cannot be created
+  */
+ protected void setUp() throws Exception
+ {
+    super.setUp();
+
+    ConfigurationImpl config = new ConfigurationImpl();
+    config.setSecurityEnabled(false);
+    config.getAcceptorConfigurations().add(new TransportConfiguration(INVM_ACCEPTOR_FACTORY));
+
+    storageManager = new FakeStorageManager();
+    server = Messaging.newMessagingServer(config, storageManager);
+    server.start();
+
+    // Settings are registered once the server is running.
+    AddressSettings lastValueSettings = new AddressSettings();
+    lastValueSettings.setLastValueQueue(true);
+    server.getAddressSettingsRepository().addMatch(address.toString(), lastValueSettings);
+
+    TransportConfiguration connector = new TransportConfiguration(INVM_CONNECTOR_FACTORY);
+    ClientSessionFactory factory = new ClientSessionFactoryImpl(connector);
+    factory.setBlockOnAcknowledge(true);
+    factory.setAckBatchSize(0);
+
+    // NOTE(review): the flags look like (xa, autoCommitSends, autoCommitAcks),
+    // judging by the field names - confirm against ClientSessionFactory.
+    clientSession = factory.createSession(false, true, true);
+    clientSessionTxReceives = factory.createSession(false, true, false);
+    clientSessionTxSends = factory.createSession(false, false, true);
+    clientSession.createQueue(address, qName1, null, true);
+ }
+}
Deleted: trunk/tests/src/org/jboss/messaging/tests/integration/server/SoloQueueRecoveryTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/server/SoloQueueRecoveryTest.java 2009-04-06 12:09:40 UTC (rev 6314)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/server/SoloQueueRecoveryTest.java 2009-04-06 12:10:13 UTC (rev 6315)
@@ -1,236 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.messaging.tests.integration.server;
-
-import javax.transaction.xa.XAResource;
-import javax.transaction.xa.Xid;
-
-import org.jboss.messaging.core.client.ClientConsumer;
-import org.jboss.messaging.core.client.ClientMessage;
-import org.jboss.messaging.core.client.ClientProducer;
-import org.jboss.messaging.core.client.ClientSession;
-import org.jboss.messaging.core.client.ClientSessionFactory;
-import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
-import org.jboss.messaging.core.config.Configuration;
-import org.jboss.messaging.core.config.TransportConfiguration;
-import org.jboss.messaging.core.exception.MessagingException;
-import org.jboss.messaging.core.message.impl.MessageImpl;
-import org.jboss.messaging.core.server.MessagingServer;
-import org.jboss.messaging.core.settings.impl.AddressSettings;
-import org.jboss.messaging.core.transaction.impl.XidImpl;
-import org.jboss.messaging.tests.util.ServiceTestBase;
-import org.jboss.messaging.utils.SimpleString;
-
-/**
- * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
- */
-public class SoloQueueRecoveryTest extends ServiceTestBase
-{
- private MessagingServer server;
-
- private ClientSession clientSession;
-
- private SimpleString address = new SimpleString("SoloQueueTestAddress");
-
- private SimpleString qName1 = new SimpleString("SoloQueueTestQ1");
-
- private ClientSession clientSessionXa;
-
- private Configuration configuration;
-
- private AddressSettings qs;
-
- public void testMultipleMessagesAfterRecovery() throws Exception
- {
- Xid xid = new XidImpl("bq1".getBytes(), 4, "gtid1".getBytes());
- ClientProducer producer = clientSessionXa.createProducer(address, -1, true, true);
- SimpleString messageId1 = new SimpleString("SMID1");
- SimpleString messageId2 = new SimpleString("SMID2");
- clientSessionXa.start(xid, XAResource.TMNOFLAGS);
- ClientMessage m1 = createTextMessage("m1", clientSession);
- m1.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, messageId1);
- ClientMessage m2 = createTextMessage("m2", clientSession);
- m2.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, messageId2);
- ClientMessage m3 = createTextMessage("m3", clientSession);
- m3.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, messageId1);
- ClientMessage m4 = createTextMessage("m4", clientSession);
- m4.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, messageId2);
- producer.send(m1);
- producer.send(m2);
- producer.send(m3);
- producer.send(m4);
- clientSessionXa.end(xid, XAResource.TMSUCCESS);
- clientSessionXa.prepare(xid);
- restartServer();
- clientSessionXa.commit(xid, true);
- ClientConsumer consumer = clientSession.createConsumer(qName1);
- clientSession.start();
- ClientMessage m = consumer.receive(1000);
- assertNotNull(m);
- m.acknowledge();
- assertEquals(m.getBody().readString(), "m3");
- m = consumer.receive(1000);
- assertNotNull(m);
- m.acknowledge();
- assertEquals(m.getBody().readString(), "m4");
- }
-
- public void testManyMessagesReceivedWithRollback() throws Exception
- {
- Xid xid = new XidImpl("bq1".getBytes(), 4, "gtid1".getBytes());
- ClientProducer producer = clientSession.createProducer(address, -1, true, true);
- ClientConsumer consumer = clientSessionXa.createConsumer(qName1);
-
- SimpleString rh = new SimpleString("SMID1");
- ClientMessage m1 = createTextMessage("m1", clientSession);
- m1.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
- m1.setDurable(true);
- ClientMessage m2 = createTextMessage("m2", clientSession);
- m2.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
- m2.setDurable(true);
- ClientMessage m3 = createTextMessage("m3", clientSession);
- m3.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
- m3.setDurable(true);
- ClientMessage m4 = createTextMessage("m4", clientSession);
- m4.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
- m4.setDurable(true);
- ClientMessage m5 = createTextMessage("m5", clientSession);
- m5.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
- m5.setDurable(true);
- ClientMessage m6 = createTextMessage("m6", clientSession);
- m6.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
- m6.setDurable(true);
- clientSessionXa.start(xid, XAResource.TMNOFLAGS);
- clientSessionXa.start();
- producer.send(m1);
- ClientMessage m = consumer.receive(1000);
- assertNotNull(m);
- assertEquals(m.getBody().readString(), "m1");
- producer.send(m2);
- m = consumer.receive(1000);
- assertNotNull(m);
- assertEquals(m.getBody().readString(), "m2");
- producer.send(m3);
- m = consumer.receive(1000);
- assertNotNull(m);
- assertEquals(m.getBody().readString(), "m3");
- producer.send(m4);
- m = consumer.receive(1000);
- assertNotNull(m);
- assertEquals(m.getBody().readString(), "m4");
- producer.send(m5);
- m = consumer.receive(1000);
- assertNotNull(m);
- assertEquals(m.getBody().readString(), "m5");
- producer.send(m6);
- m = consumer.receive(1000);
- assertNotNull(m);
- assertEquals(m.getBody().readString(), "m6");
- clientSessionXa.end(xid, XAResource.TMSUCCESS);
- clientSessionXa.prepare(xid);
-
- restartServer();
- clientSessionXa.rollback(xid);
- consumer = clientSession.createConsumer(qName1);
- clientSession.start();
- m = consumer.receive(1000);
- assertNotNull(m);
- m.acknowledge();
- assertEquals(m.getBody().readString(), "m6");
- m = consumer.receive(1000);
- assertNull(m);
- }
- protected void tearDown() throws Exception
- {
- if (clientSession != null)
- {
- try
- {
- clientSession.close();
- }
- catch (MessagingException e1)
- {
- //
- }
- }
- if (server != null && server.isStarted())
- {
- try
- {
- server.stop();
- }
- catch (Exception e1)
- {
- //
- }
- }
- server = null;
- clientSession = null;
-
- super.tearDown();
- }
-
- protected void setUp() throws Exception
- {
- super.setUp();
-
- clearData();
- configuration = createConfigForJournal();
- configuration.setSecurityEnabled(false);
- TransportConfiguration transportConfig = new TransportConfiguration(INVM_ACCEPTOR_FACTORY);
- configuration.getAcceptorConfigurations().add(transportConfig);
- server = createServer(true, configuration);
- // start the server
- server.start();
-
- qs = new AddressSettings();
- qs.setSoloQueue(true);
- server.getAddressSettingsRepository().addMatch(address.toString(), qs);
- // then we create a client as normal
- ClientSessionFactory sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(INVM_CONNECTOR_FACTORY));
- sessionFactory.setBlockOnAcknowledge(true);
- sessionFactory.setAckBatchSize(0);
- clientSession = sessionFactory.createSession(false, true, true);
- clientSessionXa = sessionFactory.createSession(true, false, false);
- clientSession.createQueue(address, qName1, null, true);
- }
-
- private void restartServer() throws Exception
- {
- server.stop();
- server = null;
- server = createServer(true, configuration);
- server.getAddressSettingsRepository().addMatch(address.toString(), qs);
- // start the server
- server.start();
-
- AddressSettings qs = new AddressSettings();
- qs.setSoloQueue(true);
- server.getAddressSettingsRepository().addMatch(address.toString(), qs);
- // then we create a client as normal
- ClientSessionFactory sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(INVM_CONNECTOR_FACTORY));
- sessionFactory.setBlockOnAcknowledge(true);
- sessionFactory.setAckBatchSize(0);
- clientSession = sessionFactory.createSession(false, true, true);
- clientSessionXa = sessionFactory.createSession(true, false, false);
- }
-}
Deleted: trunk/tests/src/org/jboss/messaging/tests/integration/server/SoloQueueTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/server/SoloQueueTest.java 2009-04-06 12:09:40 UTC (rev 6314)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/server/SoloQueueTest.java 2009-04-06 12:10:13 UTC (rev 6315)
@@ -1,584 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.messaging.tests.integration.server;
-
-import org.jboss.messaging.core.client.ClientConsumer;
-import org.jboss.messaging.core.client.ClientMessage;
-import org.jboss.messaging.core.client.ClientProducer;
-import org.jboss.messaging.core.client.ClientSession;
-import org.jboss.messaging.core.client.ClientSessionFactory;
-import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
-import org.jboss.messaging.core.config.TransportConfiguration;
-import org.jboss.messaging.core.config.impl.ConfigurationImpl;
-import org.jboss.messaging.core.exception.MessagingException;
-import org.jboss.messaging.core.message.impl.MessageImpl;
-import org.jboss.messaging.core.server.Messaging;
-import org.jboss.messaging.core.server.MessagingServer;
-import org.jboss.messaging.core.settings.impl.AddressSettings;
-import org.jboss.messaging.tests.util.UnitTestCase;
-import org.jboss.messaging.utils.SimpleString;
-
-/**
- * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
- */
-public class SoloQueueTest extends UnitTestCase
-{
- private MessagingServer server;
-
- private ClientSession clientSession;
-
- private ClientSession clientSessionTxReceives;
-
- private ClientSession clientSessionTxSends;
-
- private SimpleString address = new SimpleString("SoloQueueTestAddress");
-
- private SimpleString qName1 = new SimpleString("SoloQueueTestQ1");
-
- private FakeStorageManager storageManager;
-
-
- public void testSimple() throws Exception
- {
- ClientProducer producer = clientSession.createProducer(address, -1, true, true);
- ClientConsumer consumer = clientSession.createConsumer(qName1);
- ClientMessage m1 = createTextMessage("m1", clientSession);
- SimpleString rh = new SimpleString("SMID1");
- m1.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
- ClientMessage m2 = createTextMessage("m2", clientSession);
- m2.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
- producer.send(m1);
- producer.send(m2);
- clientSession.start();
- ClientMessage m = consumer.receive(1000);
- assertNotNull(m);
- m.acknowledge();
- assertEquals(m.getBody().readString(), "m2");
- }
-
- public void testMultipleMessages() throws Exception
- {
- ClientProducer producer = clientSession.createProducer(address, -1, true, true);
- ClientConsumer consumer = clientSession.createConsumer(qName1);
- SimpleString messageId1 = new SimpleString("SMID1");
- SimpleString messageId2 = new SimpleString("SMID2");
- ClientMessage m1 = createTextMessage("m1", clientSession);
- m1.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, messageId1);
- ClientMessage m2 = createTextMessage("m2", clientSession);
- m2.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, messageId2);
- ClientMessage m3 = createTextMessage("m3", clientSession);
- m3.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, messageId1);
- ClientMessage m4 = createTextMessage("m4", clientSession);
- m4.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, messageId2);
- producer.send(m1);
- producer.send(m2);
- producer.send(m3);
- producer.send(m4);
- clientSession.start();
- ClientMessage m = consumer.receive(1000);
- assertNotNull(m);
- m.acknowledge();
- assertEquals(m.getBody().readString(), "m3");
- m = consumer.receive(1000);
- assertNotNull(m);
- m.acknowledge();
- assertEquals(m.getBody().readString(), "m4");
- }
-
- public void testFirstMessageReceivedButAckedAfter() throws Exception
- {
- ClientProducer producer = clientSession.createProducer(address, -1, true, true);
- ClientConsumer consumer = clientSession.createConsumer(qName1);
- ClientMessage m1 = createTextMessage("m1", clientSession);
- SimpleString rh = new SimpleString("SMID1");
- m1.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
- ClientMessage m2 = createTextMessage("m2", clientSession);
- m2.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
- producer.send(m1);
- clientSession.start();
- ClientMessage m = consumer.receive(1000);
- assertNotNull(m);
- producer.send(m2);
- m.acknowledge();
- assertEquals(m.getBody().readString(), "m1");
- m = consumer.receive(1000);
- assertNotNull(m);
- m.acknowledge();
- assertEquals(m.getBody().readString(), "m2");
- }
-
- public void testFirstMessageReceivedAndCancelled() throws Exception
- {
- ClientProducer producer = clientSession.createProducer(address, -1, true, true);
- ClientConsumer consumer = clientSession.createConsumer(qName1);
- ClientMessage m1 = createTextMessage("m1", clientSession);
- SimpleString rh = new SimpleString("SMID1");
- m1.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
- ClientMessage m2 = createTextMessage("m2", clientSession);
- m2.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
- producer.send(m1);
- clientSession.start();
- ClientMessage m = consumer.receive(1000);
- assertNotNull(m);
- producer.send(m2);
- consumer.close();
- consumer = clientSession.createConsumer(qName1);
- m = consumer.receive(1000);
- assertNotNull(m);
- m.acknowledge();
- assertEquals(m.getBody().readString(), "m2");
- m = consumer.receive(1000);
- assertNull(m);
- }
-
- public void testManyMessagesReceivedAndCancelled() throws Exception
- {
- ClientProducer producer = clientSession.createProducer(address, -1, true, true);
- ClientConsumer consumer = clientSession.createConsumer(qName1);
-
- SimpleString rh = new SimpleString("SMID1");
- ClientMessage m1 = createTextMessage("m1", clientSession);
- m1.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
- ClientMessage m2 = createTextMessage("m2", clientSession);
- m2.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
- ClientMessage m3 = createTextMessage("m3", clientSession);
- m3.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
- ClientMessage m4 = createTextMessage("m4", clientSession);
- m4.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
- ClientMessage m5 = createTextMessage("m5", clientSession);
- m5.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
- ClientMessage m6 = createTextMessage("m6", clientSession);
- m6.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
- clientSession.start();
- producer.send(m1);
- ClientMessage m = consumer.receive(1000);
- assertNotNull(m);
- assertEquals(m.getBody().readString(), "m1");
- producer.send(m2);
- m = consumer.receive(1000);
- assertNotNull(m);
- assertEquals(m.getBody().readString(), "m2");
- producer.send(m3);
- m = consumer.receive(1000);
- assertNotNull(m);
- assertEquals(m.getBody().readString(), "m3");
- producer.send(m4);
- m = consumer.receive(1000);
- assertNotNull(m);
- assertEquals(m.getBody().readString(), "m4");
- producer.send(m5);
- m = consumer.receive(1000);
- assertNotNull(m);
- assertEquals(m.getBody().readString(), "m5");
- producer.send(m6);
- m = consumer.receive(1000);
- assertNotNull(m);
- assertEquals(m.getBody().readString(), "m6");
- consumer.close();
- consumer = clientSession.createConsumer(qName1);
- m = consumer.receive(1000);
- assertNotNull(m);
- m.acknowledge();
- assertEquals(m.getBody().readString(), "m6");
- m = consumer.receive(1000);
- assertNull(m);
- }
-
- public void testSimpleInTx() throws Exception
- {
-
- ClientProducer producer = clientSessionTxReceives.createProducer(address, -1, true, true);
- ClientConsumer consumer = clientSessionTxReceives.createConsumer(qName1);
- ClientMessage m1 = createTextMessage("m1", clientSession);
- SimpleString rh = new SimpleString("SMID1");
- m1.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
- ClientMessage m2 = createTextMessage("m2", clientSession);
- m2.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
- producer.send(m1);
- producer.send(m2);
- clientSessionTxReceives.start();
- ClientMessage m = consumer.receive(1000);
- assertNotNull(m);
- m.acknowledge();
- assertEquals(m.getBody().readString(), "m2");
- }
-
- public void testMultipleMessagesInTx() throws Exception
- {
- ClientProducer producer = clientSessionTxReceives.createProducer(address, -1, true, true);
- ClientConsumer consumer = clientSessionTxReceives.createConsumer(qName1);
- SimpleString messageId1 = new SimpleString("SMID1");
- SimpleString messageId2 = new SimpleString("SMID2");
- ClientMessage m1 = createTextMessage("m1", clientSession);
- m1.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, messageId1);
- ClientMessage m2 = createTextMessage("m2", clientSession);
- m2.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, messageId2);
- ClientMessage m3 = createTextMessage("m3", clientSession);
- m3.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, messageId1);
- ClientMessage m4 = createTextMessage("m4", clientSession);
- m4.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, messageId2);
- producer.send(m1);
- producer.send(m2);
- producer.send(m3);
- producer.send(m4);
- clientSessionTxReceives.start();
- ClientMessage m = consumer.receive(1000);
- assertNotNull(m);
- m.acknowledge();
- assertEquals(m.getBody().readString(), "m3");
- m = consumer.receive(1000);
- assertNotNull(m);
- m.acknowledge();
- assertEquals(m.getBody().readString(), "m4");
- clientSessionTxReceives.commit();
- m = consumer.receive(1000);
- assertNull(m);
- }
-
- public void testMultipleMessagesInTxRollback() throws Exception
- {
- ClientProducer producer = clientSessionTxReceives.createProducer(address, -1, true, true);
- ClientConsumer consumer = clientSessionTxReceives.createConsumer(qName1);
- SimpleString messageId1 = new SimpleString("SMID1");
- SimpleString messageId2 = new SimpleString("SMID2");
- ClientMessage m1 = createTextMessage("m1", clientSession);
- m1.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, messageId1);
- ClientMessage m2 = createTextMessage("m2", clientSession);
- m2.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, messageId2);
- ClientMessage m3 = createTextMessage("m3", clientSession);
- m3.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, messageId1);
- ClientMessage m4 = createTextMessage("m4", clientSession);
- m4.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, messageId2);
- producer.send(m1);
- producer.send(m2);
- clientSessionTxReceives.start();
- ClientMessage m = consumer.receive(1000);
- assertNotNull(m);
- m.acknowledge();
- assertEquals(m.getBody().readString(), "m1");
- m = consumer.receive(1000);
- assertNotNull(m);
- m.acknowledge();
- assertEquals(m.getBody().readString(), "m2");
- producer.send(m3);
- producer.send(m4);
- m = consumer.receive(1000);
- assertNotNull(m);
- m.acknowledge();
- assertEquals(m.getBody().readString(), "m3");
- m = consumer.receive(1000);
- assertNotNull(m);
- m.acknowledge();
- assertEquals(m.getBody().readString(), "m4");
- clientSessionTxReceives.rollback();
- m = consumer.receive(1000);
- assertNotNull(m);
- m.acknowledge();
- assertEquals(m.getBody().readString(), "m3");
- m = consumer.receive(1000);
- assertNotNull(m);
- m.acknowledge();
- assertEquals(m.getBody().readString(), "m4");
- }
-
- public void testMultipleMessagesInTxSend() throws Exception
- {
- ClientProducer producer = clientSessionTxSends.createProducer(address, -1, true, true);
- ClientConsumer consumer = clientSessionTxSends.createConsumer(qName1);
- SimpleString rh = new SimpleString("SMID1");
- ClientMessage m1 = createTextMessage("m1", clientSession);
- m1.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
- ClientMessage m2 = createTextMessage("m2", clientSession);
- m2.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
- ClientMessage m3 = createTextMessage("m3", clientSession);
- m3.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
- ClientMessage m4 = createTextMessage("m4", clientSession);
- m4.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
- ClientMessage m5 = createTextMessage("m5", clientSession);
- m5.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
- ClientMessage m6 = createTextMessage("m6", clientSession);
- m6.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
- producer.send(m1);
- producer.send(m2);
- producer.send(m3);
- producer.send(m4);
- producer.send(m5);
- producer.send(m6);
- clientSessionTxSends.commit();
- clientSessionTxSends.start();
- ClientMessage m = consumer.receive(1000);
- assertNotNull(m);
- m.acknowledge();
- assertEquals(m.getBody().readString(), "m6");
- }
-
- public void testMultipleMessagesPersistedCorrectly() throws Exception
- {
- ClientProducer producer = clientSession.createProducer(address, -1, true, true);
- ClientConsumer consumer = clientSession.createConsumer(qName1);
- SimpleString rh = new SimpleString("SMID1");
- ClientMessage m1 = createTextMessage("m1", clientSession);
- m1.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
- m1.setDurable(true);
- ClientMessage m2 = createTextMessage("m2", clientSession);
- m2.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
- m2.setDurable(true);
- ClientMessage m3 = createTextMessage("m3", clientSession);
- m3.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
- m3.setDurable(true);
- ClientMessage m4 = createTextMessage("m4", clientSession);
- m4.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
- m4.setDurable(true);
- ClientMessage m5 = createTextMessage("m5", clientSession);
- m5.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
- m5.setDurable(true);
- ClientMessage m6 = createTextMessage("m6", clientSession);
- m6.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
- m6.setDurable(true);
- producer.send(m1);
- producer.send(m2);
- producer.send(m3);
- producer.send(m4);
- producer.send(m5);
- producer.send(m6);
- assertEquals(1, storageManager.messageIds.size());
- clientSession.start();
- ClientMessage m = consumer.receive(1000);
- assertNotNull(m);
- m.acknowledge();
- assertEquals(m.getBody().readString(), "m6");
- assertEquals(0, storageManager.messageIds.size());
- }
-
- public void testMultipleMessagesPersistedCorrectlyInTx() throws Exception
- {
- ClientProducer producer = clientSessionTxSends.createProducer(address, -1, true, true);
- ClientConsumer consumer = clientSessionTxSends.createConsumer(qName1);
- SimpleString rh = new SimpleString("SMID1");
- ClientMessage m1 = createTextMessage("m1", clientSession);
- m1.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
- m1.setDurable(true);
- ClientMessage m2 = createTextMessage("m2", clientSession);
- m2.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
- m2.setDurable(true);
- ClientMessage m3 = createTextMessage("m3", clientSession);
- m3.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
- m3.setDurable(true);
- ClientMessage m4 = createTextMessage("m4", clientSession);
- m4.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
- m4.setDurable(true);
- ClientMessage m5 = createTextMessage("m5", clientSession);
- m5.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
- m5.setDurable(true);
- ClientMessage m6 = createTextMessage("m6", clientSession);
- m6.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
- m6.setDurable(true);
- producer.send(m1);
- producer.send(m2);
- producer.send(m3);
- producer.send(m4);
- producer.send(m5);
- producer.send(m6);
- clientSessionTxSends.commit();
- assertEquals(1, storageManager.messageIds.size());
- clientSessionTxSends.start();
- ClientMessage m = consumer.receive(1000);
- assertNotNull(m);
- m.acknowledge();
- assertEquals(m.getBody().readString(), "m6");
- assertEquals(0, storageManager.messageIds.size());
- }
-
- public void testMultipleAcksPersistedCorrectly() throws Exception
- {
- ClientProducer producer = clientSession.createProducer(address, -1, true, true);
- ClientConsumer consumer = clientSession.createConsumer(qName1);
- SimpleString rh = new SimpleString("SMID1");
- ClientMessage m1 = createTextMessage("m1", clientSession);
- m1.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
- m1.setDurable(true);
- ClientMessage m2 = createTextMessage("m2", clientSession);
- m2.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
- m2.setDurable(true);
- ClientMessage m3 = createTextMessage("m3", clientSession);
- m3.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
- m3.setDurable(true);
- ClientMessage m4 = createTextMessage("m4", clientSession);
- m4.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
- m4.setDurable(true);
- ClientMessage m5 = createTextMessage("m5", clientSession);
- m5.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
- m5.setDurable(true);
- ClientMessage m6 = createTextMessage("m6", clientSession);
- m6.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
- m6.setDurable(true);
- clientSession.start();
- producer.send(m1);
- ClientMessage m = consumer.receive(1000);
- assertNotNull(m);
- m.acknowledge();
- assertEquals(m.getBody().readString(), "m1");
- producer.send(m2);
- m = consumer.receive(1000);
- assertNotNull(m);
- m.acknowledge();
- assertEquals(m.getBody().readString(), "m2");
- producer.send(m3);
- m = consumer.receive(1000);
- assertNotNull(m);
- m.acknowledge();
- assertEquals(m.getBody().readString(), "m3");
- producer.send(m4);
- m = consumer.receive(1000);
- assertNotNull(m);
- m.acknowledge();
- assertEquals(m.getBody().readString(), "m4");
- producer.send(m5);
- m = consumer.receive(1000);
- assertNotNull(m);
- m.acknowledge();
- assertEquals(m.getBody().readString(), "m5");
- producer.send(m6);
- m = consumer.receive(1000);
- assertNotNull(m);
- m.acknowledge();
- assertEquals(m.getBody().readString(), "m6");
- assertEquals(6, storageManager.ackIds.size());
- }
-
- public void testMultipleAcksPersistedCorrectlyInTx() throws Exception
- {
- ClientProducer producer = clientSessionTxReceives.createProducer(address, -1, true, true);
- ClientConsumer consumer = clientSessionTxReceives.createConsumer(qName1);
- SimpleString rh = new SimpleString("SMID1");
- ClientMessage m1 = createTextMessage("m1", clientSession);
- m1.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
- m1.setDurable(true);
- ClientMessage m2 = createTextMessage("m2", clientSession);
- m2.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
- m2.setDurable(true);
- ClientMessage m3 = createTextMessage("m3", clientSession);
- m3.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
- m3.setDurable(true);
- ClientMessage m4 = createTextMessage("m4", clientSession);
- m4.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
- m4.setDurable(true);
- ClientMessage m5 = createTextMessage("m5", clientSession);
- m5.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
- m5.setDurable(true);
- ClientMessage m6 = createTextMessage("m6", clientSession);
- m6.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
- m6.setDurable(true);
- clientSessionTxReceives.start();
- producer.send(m1);
- ClientMessage m = consumer.receive(1000);
- assertNotNull(m);
- m.acknowledge();
- assertEquals(m.getBody().readString(), "m1");
- producer.send(m2);
- m = consumer.receive(1000);
- assertNotNull(m);
- m.acknowledge();
- assertEquals(m.getBody().readString(), "m2");
- producer.send(m3);
- m = consumer.receive(1000);
- assertNotNull(m);
- m.acknowledge();
- assertEquals(m.getBody().readString(), "m3");
- producer.send(m4);
- m = consumer.receive(1000);
- assertNotNull(m);
- m.acknowledge();
- assertEquals(m.getBody().readString(), "m4");
- producer.send(m5);
- m = consumer.receive(1000);
- assertNotNull(m);
- m.acknowledge();
- assertEquals(m.getBody().readString(), "m5");
- producer.send(m6);
- m = consumer.receive(1000);
- assertNotNull(m);
- m.acknowledge();
- assertEquals(m.getBody().readString(), "m6");
- clientSessionTxReceives.commit();
- assertEquals(6, storageManager.ackIds.size());
- }
-
-
-
-
- protected void tearDown() throws Exception
- {
- if (clientSession != null)
- {
- try
- {
- clientSession.close();
- }
- catch (MessagingException e1)
- {
- //
- }
- }
- if (server != null && server.isStarted())
- {
- try
- {
- server.stop();
- }
- catch (Exception e1)
- {
- //
- }
- }
- server = null;
- clientSession = null;
-
- super.tearDown();
- }
-
- protected void setUp() throws Exception
- {
- super.setUp();
-
- ConfigurationImpl configuration = new ConfigurationImpl();
- configuration.setSecurityEnabled(false);
- TransportConfiguration transportConfig = new TransportConfiguration(INVM_ACCEPTOR_FACTORY);
- configuration.getAcceptorConfigurations().add(transportConfig);
- storageManager = new FakeStorageManager();
- server = Messaging.newMessagingServer(configuration, storageManager);
- // start the server
- server.start();
-
- AddressSettings qs = new AddressSettings();
- qs.setSoloQueue(true);
- server.getAddressSettingsRepository().addMatch(address.toString(), qs);
- // then we create a client as normal
- ClientSessionFactory sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(INVM_CONNECTOR_FACTORY));
- sessionFactory.setBlockOnAcknowledge(true);
- sessionFactory.setAckBatchSize(0);
- clientSession = sessionFactory.createSession(false, true, true);
- clientSessionTxReceives = sessionFactory.createSession(false, true, false);
- clientSessionTxSends = sessionFactory.createSession(false, false, true);
- clientSession.createQueue(address, qName1, null, true);
- }
-}
More information about the jboss-cvs-commits
mailing list