[jboss-cvs] JBoss Messaging SVN: r6315 - in trunk: src/main/org/jboss/messaging/core/deployers/impl and 5 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Mon Apr 6 08:10:13 EDT 2009


Author: timfox
Date: 2009-04-06 08:10:13 -0400 (Mon, 06 Apr 2009)
New Revision: 6315

Added:
   trunk/src/main/org/jboss/messaging/core/server/impl/LastValueQueue.java
   trunk/tests/src/org/jboss/messaging/tests/integration/server/LVQRecoveryTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/server/LVQTest.java
Removed:
   trunk/src/main/org/jboss/messaging/core/server/impl/SoloQueueImpl.java
   trunk/tests/src/org/jboss/messaging/tests/integration/server/SoloQueueRecoveryTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/server/SoloQueueTest.java
Modified:
   trunk/src/config/jbm-jms.xml
   trunk/src/config/jbm-queues.xml
   trunk/src/main/org/jboss/messaging/core/deployers/impl/AddressSettingsDeployer.java
   trunk/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/QueueFactoryImpl.java
   trunk/src/main/org/jboss/messaging/core/settings/impl/AddressSettings.java
   trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientSessionCreateAndDeleteQueueTest.java
Log:
renamed solo queue to LVQ (last value queue)

Modified: trunk/src/config/jbm-jms.xml
===================================================================
--- trunk/src/config/jbm-jms.xml	2009-04-06 12:09:40 UTC (rev 6314)
+++ trunk/src/config/jbm-jms.xml	2009-04-06 12:10:13 UTC (rev 6315)
@@ -122,9 +122,6 @@
    <topic name="testDurableTopic">
       <entry name="/topic/testDurableTopic"/>
    </topic>
-   <topic name="testSoloTopic">
-      <entry name="/topic/testSoloTopic"/>
-   </topic>
    <queue name="testQueue">
       <entry name="/queue/testQueue"/>
    </queue>

Modified: trunk/src/config/jbm-queues.xml
===================================================================
--- trunk/src/config/jbm-queues.xml	2009-04-06 12:09:40 UTC (rev 6314)
+++ trunk/src/config/jbm-queues.xml	2009-04-06 12:10:13 UTC (rev 6315)
@@ -123,10 +123,6 @@
       <distribution-policy-class>org.jboss.messaging.core.server.impl.GroupingRoundRobinDistributor</distribution-policy-class>
    </address-settings>
 
-   <address-settings match="topicjms.testSoloTopic">
-      <solo-queue>true</solo-queue>
-   </address-settings>
-
    <!--default for catch all-->
    <address-settings match="#">
       <clustered>false</clustered>

Modified: trunk/src/main/org/jboss/messaging/core/deployers/impl/AddressSettingsDeployer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/deployers/impl/AddressSettingsDeployer.java	2009-04-06 12:09:40 UTC (rev 6314)
+++ trunk/src/main/org/jboss/messaging/core/deployers/impl/AddressSettingsDeployer.java	2009-04-06 12:10:13 UTC (rev 6315)
@@ -53,7 +53,7 @@
 
    private static final String MESSAGE_COUNTER_HISTORY_DAY_LIMIT_NODE_NAME = "message-counter-history-day-limit";
 
-   private static final String SOLO_MESSAGE_NODE_NAME = "solo-queue";
+   private static final String LVQ_NODE_NAME = "last-value-queue";
 
    private final HierarchicalRepository<AddressSettings> addressSettingsRepository;
 
@@ -137,9 +137,9 @@
          {
             addressSettings.setDropMessagesWhenFull(Boolean.valueOf(child.getTextContent().trim()));
          }
-         else if (SOLO_MESSAGE_NODE_NAME.equalsIgnoreCase(child.getNodeName()))
+         else if (LVQ_NODE_NAME.equalsIgnoreCase(child.getNodeName()))
          {
-            addressSettings.setSoloQueue(Boolean.valueOf(child.getTextContent().trim()));
+            addressSettings.setLastValueQueue(Boolean.valueOf(child.getTextContent().trim()));
          }
          else if (MAX_DELIVERY_ATTEMPTS.equalsIgnoreCase(child.getNodeName()))
          {

Modified: trunk/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java	2009-04-06 12:09:40 UTC (rev 6314)
+++ trunk/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java	2009-04-06 12:10:13 UTC (rev 6315)
@@ -72,7 +72,7 @@
    
    public static final SimpleString HDR_FROM_CLUSTER = new SimpleString("_JBM_FROM_CLUSTER");
 
-   public static final SimpleString HDR_SOLE_MESSAGE = new SimpleString("_JBM_SOLO_MESSAGE");
+   public static final SimpleString HDR_LAST_VALUE_NAME = new SimpleString("_JBM_LVQ_NAME");
 
    public static final byte OBJECT_TYPE = 2;
       

Copied: trunk/src/main/org/jboss/messaging/core/server/impl/LastValueQueue.java (from rev 6312, trunk/src/main/org/jboss/messaging/core/server/impl/SoloQueueImpl.java)
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/LastValueQueue.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/LastValueQueue.java	2009-04-06 12:10:13 UTC (rev 6315)
@@ -0,0 +1,293 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.server.impl;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.jboss.messaging.core.filter.Filter;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.message.impl.MessageImpl;
+import org.jboss.messaging.core.paging.PagingManager;
+import org.jboss.messaging.core.paging.PagingStore;
+import org.jboss.messaging.core.persistence.StorageManager;
+import org.jboss.messaging.core.postoffice.PostOffice;
+import org.jboss.messaging.core.server.MessageReference;
+import org.jboss.messaging.core.server.ServerMessage;
+import org.jboss.messaging.core.settings.HierarchicalRepository;
+import org.jboss.messaging.core.settings.impl.AddressSettings;
+import org.jboss.messaging.core.transaction.Transaction;
+import org.jboss.messaging.utils.SimpleString;
+
+/**
+ * A queue that discards a message when a newer message with the same MessageImpl.HDR_LAST_VALUE_NAME property value arrives.
+ * In other words, it only retains the last value.
+ * This is useful, for example, for stock prices, where you're only interested in the latest value
+ * for a particular stock.
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public class LastValueQueue extends QueueImpl
+{
+   private static final Logger log = Logger.getLogger(LastValueQueue.class);
+
+   private final Map<SimpleString, ServerMessage> map = new HashMap<SimpleString, ServerMessage>();
+
+   private final PagingManager pagingManager;
+
+   private final StorageManager storageManager;
+
+   public LastValueQueue(final long persistenceID,
+                        final SimpleString address,
+                        final SimpleString name,
+                        final Filter filter,
+                        final boolean durable,
+                        final boolean temporary,
+                        final ScheduledExecutorService scheduledExecutor,
+                        final PostOffice postOffice,
+                        final StorageManager storageManager,
+                        final HierarchicalRepository<AddressSettings> addressSettingsRepository)
+   {
+      super(persistenceID,
+            address,
+            name,
+            filter,
+            durable,
+            temporary,
+            scheduledExecutor,
+            postOffice,
+            storageManager,
+            addressSettingsRepository);
+      this.pagingManager = postOffice.getPagingManager();
+      this.storageManager = storageManager;
+   }
+
+   public void route(final ServerMessage message, final Transaction tx) throws Exception
+   {
+      SimpleString prop = (SimpleString)message.getProperty(MessageImpl.HDR_LAST_VALUE_NAME);
+      if (prop != null)
+      {
+         synchronized (map)
+         {
+            ServerMessage msg = map.put(prop, message);
+            // if an older message existed then we discard it
+            if (msg != null)
+            {
+               MessageReference ref;
+               if (tx != null)
+               {
+                  discardMessage(msg.getMessageID(), tx);
+               }
+               else
+               {
+                  ref = removeReferenceWithID(msg.getMessageID());
+                  if (ref != null)
+                  {
+                     discardMessage(ref, tx);
+                  }
+               }
+
+            }
+         }
+      }
+      super.route(message, tx);
+   }
+
+   public MessageReference reroute(final ServerMessage message, final Transaction tx) throws Exception
+   {
+      SimpleString prop = (SimpleString)message.getProperty(MessageImpl.HDR_LAST_VALUE_NAME);
+      if (prop != null)
+      {
+         synchronized (map)
+         {
+            ServerMessage msg = map.put(prop, message);
+            if (msg != null)
+            {
+               if (tx != null)
+               {
+                  rediscardMessage(msg.getMessageID(), tx);
+               }
+               else
+               {
+                  MessageReference ref = removeReferenceWithID(msg.getMessageID());
+                  rediscardMessage(ref);
+               }
+            }
+         }
+      }
+      return super.reroute(message, tx);
+   }
+
+   public void acknowledge(final MessageReference ref) throws Exception
+   {
+      super.acknowledge(ref);
+      SimpleString prop = (SimpleString)ref.getMessage().getProperty(MessageImpl.HDR_LAST_VALUE_NAME);
+      if (prop != null)
+      {
+         synchronized (map)
+         {
+            ServerMessage serverMessage = map.get(prop);
+            if (serverMessage != null && ref.getMessage().getMessageID() == serverMessage.getMessageID())
+            {
+               map.remove(prop);
+            }
+         }
+      }
+   }
+
+   public void cancel(final Transaction tx, final MessageReference ref) throws Exception
+   {
+      SimpleString prop = (SimpleString)ref.getMessage().getProperty(MessageImpl.HDR_LAST_VALUE_NAME);
+      if (prop != null)
+      {
+         synchronized (map)
+         {
+            ServerMessage msg = map.get(prop);
+            if (msg.getMessageID() == ref.getMessage().getMessageID())
+            {
+               super.cancel(tx, ref);
+            }
+            else
+            {
+               discardMessage(ref, tx);
+            }
+         }
+      }
+      else
+      {
+         super.cancel(tx, ref);
+      }
+   }
+
+   void postRollback(final LinkedList<MessageReference> refs) throws Exception
+   {
+      List<MessageReference> refsToDiscard = new ArrayList<MessageReference>();
+      List<SimpleString> refsToClear = new ArrayList<SimpleString>();
+      synchronized (map)
+      {
+         for (MessageReference ref : refs)
+         {
+            SimpleString prop = (SimpleString)ref.getMessage().getProperty(MessageImpl.HDR_LAST_VALUE_NAME);
+            if (prop != null)
+            {
+               ServerMessage msg = map.get(prop);
+               if (msg != null)
+               {
+                  if (msg.getMessageID() != ref.getMessage().getMessageID())
+                  {
+                     refsToDiscard.add(ref);
+                  }
+                  else
+                  {
+                     refsToClear.add(prop);
+                  }
+               }
+            }
+         }
+         for (SimpleString simpleString : refsToClear)
+         {
+            map.remove(simpleString);
+         }
+      }
+      for (MessageReference ref : refsToDiscard)
+      {
+         refs.remove(ref);
+         discardMessage(ref, null);
+      }
+      super.postRollback(refs);
+   }
+
+   final void discardMessage(MessageReference ref, Transaction tx) throws Exception
+   {
+      deliveringCount.decrementAndGet();
+      PagingStore store = pagingManager.getPageStore(ref.getMessage().getDestination());
+      store.addSize(-ref.getMemoryEstimate());
+      QueueImpl queue = (QueueImpl)ref.getQueue();
+      ServerMessage msg = ref.getMessage();
+      boolean durableRef = msg.isDurable() && queue.isDurable();
+
+      if (durableRef)
+      {
+         int count = msg.decrementDurableRefCount();
+
+         if (count == 0)
+         {
+            if (tx == null)
+            {
+               storageManager.deleteMessage(msg.getMessageID());
+            }
+            else
+            {
+               storageManager.deleteMessageTransactional(tx.getID(), getPersistenceID(), msg.getMessageID());
+            }
+         }
+      }
+   }
+
+   final void discardMessage(Long id, Transaction tx) throws Exception
+   {
+      RefsOperation oper = getRefsOperation(tx);
+      Iterator<MessageReference> iterator = oper.refsToAdd.iterator();
+
+      while (iterator.hasNext())
+      {
+         MessageReference ref = iterator.next();
+
+         if (ref.getMessage().getMessageID() == id)
+         {
+            iterator.remove();
+            discardMessage(ref, tx);
+            break;
+         }
+      }
+
+   }
+
+   final void rediscardMessage(long id, Transaction tx) throws Exception
+   {
+      RefsOperation oper = getRefsOperation(tx);
+      Iterator<MessageReference> iterator = oper.refsToAdd.iterator();
+
+      while (iterator.hasNext())
+      {
+         MessageReference ref = iterator.next();
+
+         if (ref.getMessage().getMessageID() == id)
+         {
+            iterator.remove();
+            rediscardMessage(ref);
+            break;
+         }
+      }
+   }
+
+   final void rediscardMessage(MessageReference ref) throws Exception
+   {
+      deliveringCount.decrementAndGet();
+      PagingStore store = pagingManager.getPageStore(ref.getMessage().getDestination());
+      store.addSize(-ref.getMemoryEstimate());
+   }
+}

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/QueueFactoryImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/QueueFactoryImpl.java	2009-04-06 12:09:40 UTC (rev 6314)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/QueueFactoryImpl.java	2009-04-06 12:10:13 UTC (rev 6315)
@@ -78,9 +78,9 @@
       AddressSettings addressSettings = addressSettingsRepository.getMatch(address.toString());
 
       Queue queue;
-      if (addressSettings.isSoloQueue())
+      if (addressSettings.isLastValueQueue())
       {
-         queue = new SoloQueueImpl(persistenceID,
+         queue = new LastValueQueue(persistenceID,
                                    address,
                                    name,
                                    filter,

Deleted: trunk/src/main/org/jboss/messaging/core/server/impl/SoloQueueImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/SoloQueueImpl.java	2009-04-06 12:09:40 UTC (rev 6314)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/SoloQueueImpl.java	2009-04-06 12:10:13 UTC (rev 6315)
@@ -1,290 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.messaging.core.server.impl;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ScheduledExecutorService;
-
-import org.jboss.messaging.core.filter.Filter;
-import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.message.impl.MessageImpl;
-import org.jboss.messaging.core.paging.PagingManager;
-import org.jboss.messaging.core.paging.PagingStore;
-import org.jboss.messaging.core.persistence.StorageManager;
-import org.jboss.messaging.core.postoffice.PostOffice;
-import org.jboss.messaging.core.server.MessageReference;
-import org.jboss.messaging.core.server.ServerMessage;
-import org.jboss.messaging.core.settings.HierarchicalRepository;
-import org.jboss.messaging.core.settings.impl.AddressSettings;
-import org.jboss.messaging.core.transaction.Transaction;
-import org.jboss.messaging.utils.SimpleString;
-
-/**
- * A queue that will discard messages if a newer message with the same MessageImpl.HDR_SOLE_MESSAGE property value.
- * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
- */
-public class SoloQueueImpl extends QueueImpl
-{
-   private static final Logger log = Logger.getLogger(SoloQueueImpl.class);
-
-   private final Map<SimpleString, ServerMessage> map = new HashMap<SimpleString, ServerMessage>();
-
-   private final PagingManager pagingManager;
-
-   private final StorageManager storageManager;
-
-   public SoloQueueImpl(final long persistenceID,
-                        final SimpleString address,
-                        final SimpleString name,
-                        final Filter filter,
-                        final boolean durable,
-                        final boolean temporary,
-                        final ScheduledExecutorService scheduledExecutor,
-                        final PostOffice postOffice,
-                        final StorageManager storageManager,
-                        final HierarchicalRepository<AddressSettings> addressSettingsRepository)
-   {
-      super(persistenceID,
-            address,
-            name,
-            filter,
-            durable,
-            temporary,
-            scheduledExecutor,
-            postOffice,
-            storageManager,
-            addressSettingsRepository);
-      this.pagingManager = postOffice.getPagingManager();
-      this.storageManager = storageManager;
-   }
-
-   public void route(final ServerMessage message, final Transaction tx) throws Exception
-   {
-      SimpleString prop = (SimpleString)message.getProperty(MessageImpl.HDR_SOLE_MESSAGE);
-      if (prop != null)
-      {
-         synchronized (map)
-         {
-            ServerMessage msg = map.put(prop, message);
-            // if an older message existed then we discard it
-            if (msg != null)
-            {
-               MessageReference ref;
-               if (tx != null)
-               {
-                  discardMessage(msg.getMessageID(), tx);
-               }
-               else
-               {
-                  ref = removeReferenceWithID(msg.getMessageID());
-                  if (ref != null)
-                  {
-                     discardMessage(ref, tx);
-                  }
-               }
-
-            }
-         }
-      }
-      super.route(message, tx);
-   }
-
-   public MessageReference reroute(final ServerMessage message, final Transaction tx) throws Exception
-   {
-      SimpleString prop = (SimpleString)message.getProperty(MessageImpl.HDR_SOLE_MESSAGE);
-      if (prop != null)
-      {
-         synchronized (map)
-         {
-            ServerMessage msg = map.put(prop, message);
-            if (msg != null)
-            {
-               if (tx != null)
-               {
-                  rediscardMessage(msg.getMessageID(), tx);
-               }
-               else
-               {
-                  MessageReference ref = removeReferenceWithID(msg.getMessageID());
-                  rediscardMessage(ref);
-               }
-            }
-         }
-      }
-      return super.reroute(message, tx);
-   }
-
-   public void acknowledge(final MessageReference ref) throws Exception
-   {
-      super.acknowledge(ref);
-      SimpleString prop = (SimpleString)ref.getMessage().getProperty(MessageImpl.HDR_SOLE_MESSAGE);
-      if (prop != null)
-      {
-         synchronized (map)
-         {
-            ServerMessage serverMessage = map.get(prop);
-            if (serverMessage != null && ref.getMessage().getMessageID() == serverMessage.getMessageID())
-            {
-               map.remove(prop);
-            }
-         }
-      }
-   }
-
-   public void cancel(final Transaction tx, final MessageReference ref) throws Exception
-   {
-      SimpleString prop = (SimpleString)ref.getMessage().getProperty(MessageImpl.HDR_SOLE_MESSAGE);
-      if (prop != null)
-      {
-         synchronized (map)
-         {
-            ServerMessage msg = map.get(prop);
-            if (msg.getMessageID() == ref.getMessage().getMessageID())
-            {
-               super.cancel(tx, ref);
-            }
-            else
-            {
-               discardMessage(ref, tx);
-            }
-         }
-      }
-      else
-      {
-         super.cancel(tx, ref);
-      }
-   }
-
-   void postRollback(final LinkedList<MessageReference> refs) throws Exception
-   {
-      List<MessageReference> refsToDiscard = new ArrayList<MessageReference>();
-      List<SimpleString> refsToClear = new ArrayList<SimpleString>();
-      synchronized (map)
-      {
-         for (MessageReference ref : refs)
-         {
-            SimpleString prop = (SimpleString)ref.getMessage().getProperty(MessageImpl.HDR_SOLE_MESSAGE);
-            if (prop != null)
-            {
-               ServerMessage msg = map.get(prop);
-               if (msg != null)
-               {
-                  if (msg.getMessageID() != ref.getMessage().getMessageID())
-                  {
-                     refsToDiscard.add(ref);
-                  }
-                  else
-                  {
-                     refsToClear.add(prop);
-                  }
-               }
-            }
-         }
-         for (SimpleString simpleString : refsToClear)
-         {
-            map.remove(simpleString);
-         }
-      }
-      for (MessageReference ref : refsToDiscard)
-      {
-         refs.remove(ref);
-         discardMessage(ref, null);
-      }
-      super.postRollback(refs);
-   }
-
-   final void discardMessage(MessageReference ref, Transaction tx) throws Exception
-   {
-      deliveringCount.decrementAndGet();
-      PagingStore store = pagingManager.getPageStore(ref.getMessage().getDestination());
-      store.addSize(-ref.getMemoryEstimate());
-      QueueImpl queue = (QueueImpl)ref.getQueue();
-      ServerMessage msg = ref.getMessage();
-      boolean durableRef = msg.isDurable() && queue.isDurable();
-
-      if (durableRef)
-      {
-         int count = msg.decrementDurableRefCount();
-
-         if (count == 0)
-         {
-            if (tx == null)
-            {
-               storageManager.deleteMessage(msg.getMessageID());
-            }
-            else
-            {
-               storageManager.deleteMessageTransactional(tx.getID(), getPersistenceID(), msg.getMessageID());
-            }
-         }
-      }
-   }
-
-   final void discardMessage(Long id, Transaction tx) throws Exception
-   {
-      RefsOperation oper = getRefsOperation(tx);
-      Iterator<MessageReference> iterator = oper.refsToAdd.iterator();
-
-      while (iterator.hasNext())
-      {
-         MessageReference ref = iterator.next();
-
-         if (ref.getMessage().getMessageID() == id)
-         {
-            iterator.remove();
-            discardMessage(ref, tx);
-            break;
-         }
-      }
-
-   }
-
-   final void rediscardMessage(long id, Transaction tx) throws Exception
-   {
-      RefsOperation oper = getRefsOperation(tx);
-      Iterator<MessageReference> iterator = oper.refsToAdd.iterator();
-
-      while (iterator.hasNext())
-      {
-         MessageReference ref = iterator.next();
-
-         if (ref.getMessage().getMessageID() == id)
-         {
-            iterator.remove();
-            rediscardMessage(ref);
-            break;
-         }
-      }
-   }
-
-   final void rediscardMessage(MessageReference ref) throws Exception
-   {
-      deliveringCount.decrementAndGet();
-      PagingStore store = pagingManager.getPageStore(ref.getMessage().getDestination());
-      store.addSize(-ref.getMemoryEstimate());
-   }
-}

Modified: trunk/src/main/org/jboss/messaging/core/settings/impl/AddressSettings.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/settings/impl/AddressSettings.java	2009-04-06 12:09:40 UTC (rev 6314)
+++ trunk/src/main/org/jboss/messaging/core/settings/impl/AddressSettings.java	2009-04-06 12:10:13 UTC (rev 6315)
@@ -55,7 +55,7 @@
 
    public static final long DEFAULT_REDELIVER_DELAY = 0L;
 
-   public static final boolean DEFAULT_SOLO_QUEUE = false;
+   public static final boolean DEFAULT_LAST_VALUE_QUEUE = false;
 
    public static final long DEFAULT_REDISTRIBUTION_DELAY = -1;
 
@@ -77,18 +77,18 @@
 
    private SimpleString expiryAddress = null;
 
-   private Boolean soloQueue = null;
+   private Boolean lastValueQueue = null;
 
    private Long redistributionDelay = null;
 
-   public boolean isSoloQueue()
+   public boolean isLastValueQueue()
    {
-      return soloQueue != null ? soloQueue : DEFAULT_SOLO_QUEUE;
+      return lastValueQueue != null ? lastValueQueue : DEFAULT_LAST_VALUE_QUEUE;
    }
 
-   public void setSoloQueue(final boolean soloQueue)
+   public void setLastValueQueue(final boolean lastValueQueue)
    {
-      this.soloQueue = soloQueue;
+      this.lastValueQueue = lastValueQueue;
    }
 
    public int getPageSizeBytes()

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientSessionCreateAndDeleteQueueTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientSessionCreateAndDeleteQueueTest.java	2009-04-06 12:09:40 UTC (rev 6314)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientSessionCreateAndDeleteQueueTest.java	2009-04-06 12:10:13 UTC (rev 6315)
@@ -26,7 +26,7 @@
 import org.jboss.messaging.core.postoffice.Binding;
 import org.jboss.messaging.core.server.MessagingServer;
 import org.jboss.messaging.core.server.Queue;
-import org.jboss.messaging.core.server.impl.SoloQueueImpl;
+import org.jboss.messaging.core.server.impl.LastValueQueue;
 import org.jboss.messaging.core.settings.impl.AddressSettings;
 import org.jboss.messaging.tests.util.ServiceTestBase;
 import org.jboss.messaging.utils.SimpleString;
@@ -102,13 +102,13 @@
     public void testAddressSettingUSed() throws Exception
    {
       AddressSettings addressSettings = new AddressSettings();
-      addressSettings.setSoloQueue(true);
+      addressSettings.setLastValueQueue(true);
       server.getAddressSettingsRepository().addMatch(address.toString(), addressSettings);
       ClientSession session = createInVMFactory().createSession(false, true, true);
       SimpleString filterString = new SimpleString("x=y");
       session.createQueue(address, queueName, filterString, false);
       Binding binding = server.getPostOffice().getBinding(queueName);
-      assertTrue(binding.getBindable() instanceof SoloQueueImpl);
+      assertTrue(binding.getBindable() instanceof LastValueQueue);
 
       session.close();
    }

Copied: trunk/tests/src/org/jboss/messaging/tests/integration/server/LVQRecoveryTest.java (from rev 6312, trunk/tests/src/org/jboss/messaging/tests/integration/server/SoloQueueRecoveryTest.java)
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/server/LVQRecoveryTest.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/server/LVQRecoveryTest.java	2009-04-06 12:10:13 UTC (rev 6315)
@@ -0,0 +1,236 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.tests.integration.server;
+
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
+import org.jboss.messaging.core.config.Configuration;
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.message.impl.MessageImpl;
+import org.jboss.messaging.core.server.MessagingServer;
+import org.jboss.messaging.core.settings.impl.AddressSettings;
+import org.jboss.messaging.core.transaction.impl.XidImpl;
+import org.jboss.messaging.tests.util.ServiceTestBase;
+import org.jboss.messaging.utils.SimpleString;
+
+/**
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public class LVQRecoveryTest extends ServiceTestBase
+{
+   private MessagingServer server;
+
+   private ClientSession clientSession;
+
+   private SimpleString address = new SimpleString("LVQTestAddress");
+
+   private SimpleString qName1 = new SimpleString("LVQTestQ1");
+
+   private ClientSession clientSessionXa;
+
+   private Configuration configuration;
+
+   private AddressSettings qs;
+
+   public void testMultipleMessagesAfterRecovery() throws Exception // two last-value keys, sent inside a prepared XA tx
+   {
+      Xid xid = new XidImpl("bq1".getBytes(), 4, "gtid1".getBytes());
+      ClientProducer producer = clientSessionXa.createProducer(address, -1, true, true);
+      SimpleString messageId1 = new SimpleString("SMID1");
+      SimpleString messageId2 = new SimpleString("SMID2");
+      clientSessionXa.start(xid, XAResource.TMNOFLAGS);
+      ClientMessage m1 = createTextMessage("m1", clientSession);
+      m1.putStringProperty(MessageImpl.HDR_LAST_VALUE_NAME, messageId1);
+      ClientMessage m2 = createTextMessage("m2", clientSession);
+      m2.putStringProperty(MessageImpl.HDR_LAST_VALUE_NAME, messageId2);
+      ClientMessage m3 = createTextMessage("m3", clientSession); // same key as m1
+      m3.putStringProperty(MessageImpl.HDR_LAST_VALUE_NAME, messageId1);
+      ClientMessage m4 = createTextMessage("m4", clientSession); // same key as m2
+      m4.putStringProperty(MessageImpl.HDR_LAST_VALUE_NAME, messageId2);
+      producer.send(m1);
+      producer.send(m2);
+      producer.send(m3);
+      producer.send(m4);
+      clientSessionXa.end(xid, XAResource.TMSUCCESS);
+      clientSessionXa.prepare(xid);
+      restartServer(); // the transaction is prepared but not yet committed at this point
+      clientSessionXa.commit(xid, true); // NOTE(review): onePhase=true after prepare() - confirm this is intended
+      ClientConsumer consumer = clientSession.createConsumer(qName1);
+      clientSession.start();
+      ClientMessage m = consumer.receive(1000);
+      assertNotNull(m);
+      m.acknowledge();
+      assertEquals("m3", m.getBody().readString()); // expected value first, per JUnit's (expected, actual)
+      m = consumer.receive(1000);
+      assertNotNull(m);
+      m.acknowledge();
+      assertEquals("m4", m.getBody().readString());
+   }
+
+   public void testManyMessagesReceivedWithRollback() throws Exception
+   {
+      Xid xid = new XidImpl("bq1".getBytes(), 4, "gtid1".getBytes());
+      ClientProducer producer = clientSession.createProducer(address, -1, true, true);
+      ClientConsumer consumer = clientSessionXa.createConsumer(qName1);
+      // All six messages are durable and carry the same last-value key.
+      SimpleString rh = new SimpleString("SMID1");
+      ClientMessage m1 = createTextMessage("m1", clientSession);
+      m1.putStringProperty(MessageImpl.HDR_LAST_VALUE_NAME, rh);
+      m1.setDurable(true);
+      ClientMessage m2 = createTextMessage("m2", clientSession);
+      m2.putStringProperty(MessageImpl.HDR_LAST_VALUE_NAME, rh);
+      m2.setDurable(true);
+      ClientMessage m3 = createTextMessage("m3", clientSession);
+      m3.putStringProperty(MessageImpl.HDR_LAST_VALUE_NAME, rh);
+      m3.setDurable(true);
+      ClientMessage m4 = createTextMessage("m4", clientSession);
+      m4.putStringProperty(MessageImpl.HDR_LAST_VALUE_NAME, rh);
+      m4.setDurable(true);
+      ClientMessage m5 = createTextMessage("m5", clientSession);
+      m5.putStringProperty(MessageImpl.HDR_LAST_VALUE_NAME, rh);
+      m5.setDurable(true);
+      ClientMessage m6 = createTextMessage("m6", clientSession);
+      m6.putStringProperty(MessageImpl.HDR_LAST_VALUE_NAME, rh);
+      m6.setDurable(true);
+      clientSessionXa.start(xid, XAResource.TMNOFLAGS);
+      clientSessionXa.start();
+      producer.send(m1);
+      ClientMessage m = consumer.receive(1000);
+      assertNotNull(m);
+      assertEquals("m1", m.getBody().readString()); // expected value first, per JUnit's (expected, actual)
+      producer.send(m2);
+      m = consumer.receive(1000);
+      assertNotNull(m);
+      assertEquals("m2", m.getBody().readString());
+      producer.send(m3);
+      m = consumer.receive(1000);
+      assertNotNull(m);
+      assertEquals("m3", m.getBody().readString());
+      producer.send(m4);
+      m = consumer.receive(1000);
+      assertNotNull(m);
+      assertEquals("m4", m.getBody().readString());
+      producer.send(m5);
+      m = consumer.receive(1000);
+      assertNotNull(m);
+      assertEquals("m5", m.getBody().readString());
+      producer.send(m6);
+      m = consumer.receive(1000);
+      assertNotNull(m);
+      assertEquals("m6", m.getBody().readString());
+      clientSessionXa.end(xid, XAResource.TMSUCCESS);
+      clientSessionXa.prepare(xid);
+      // Restart with the XA transaction still prepared, then roll it back.
+      restartServer();
+      clientSessionXa.rollback(xid);
+      consumer = clientSession.createConsumer(qName1);
+      clientSession.start();
+      m = consumer.receive(1000);
+      assertNotNull(m);
+      m.acknowledge();
+      assertEquals("m6", m.getBody().readString()); // only the newest value is expected back after rollback
+      m = consumer.receive(1000);
+      assertNull(m);
+   }
+   protected void tearDown() throws Exception
+   {
+      if (clientSession != null)
+      {
+         try
+         {
+            clientSession.close();
+         }
+         catch (MessagingException e1)
+         {
+            // deliberately ignored: cleanup is best-effort and must not fail the test
+         }
+      }
+      if (server != null && server.isStarted())
+      {
+         try
+         {
+            server.stop();
+         }
+         catch (Exception e1)
+         {
+            // deliberately ignored: cleanup is best-effort and must not fail the test
+         }
+      }
+      server = null;
+      clientSession = null;
+      // NOTE(review): clientSessionXa is neither closed nor cleared here - confirm server.stop() is sufficient
+      super.tearDown();
+   }
+
+   protected void setUp() throws Exception
+   {
+      super.setUp();
+      // clearData() is inherited; presumably it wipes data persisted by earlier tests - verify in ServiceTestBase
+      clearData();
+      configuration = createConfigForJournal();
+      configuration.setSecurityEnabled(false);
+      TransportConfiguration transportConfig = new TransportConfiguration(INVM_ACCEPTOR_FACTORY);
+      configuration.getAcceptorConfigurations().add(transportConfig);
+      server = createServer(true, configuration);
+      // start the server
+      server.start();
+      // Mark the test address as last-value before the queue is created at the end of this method.
+      qs = new AddressSettings();
+      qs.setLastValueQueue(true);
+      server.getAddressSettingsRepository().addMatch(address.toString(), qs);
+      // Client side: a plain session and the session the tests drive through XA calls (flag meanings inferred - TODO confirm)
+      ClientSessionFactory sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(INVM_CONNECTOR_FACTORY));
+      sessionFactory.setBlockOnAcknowledge(true);
+      sessionFactory.setAckBatchSize(0);
+      clientSession = sessionFactory.createSession(false, true, true);
+      clientSessionXa = sessionFactory.createSession(true, false, false);
+      clientSession.createQueue(address, qName1, null, true);
+   }
+
+   private void restartServer() throws Exception
+   {
+      // Stop the running server and build a new one from the same configuration.
+      server.stop();
+      server = createServer(true, configuration);
+      // Registered before start() - presumably so recovered queues come back as last-value queues; TODO confirm.
+      server.getAddressSettingsRepository().addMatch(address.toString(), qs);
+      server.start();
+
+      // Registered again after start(), as before, but using the shared field: the original
+      // declared an identical local named qs here, which shadowed that field.
+      server.getAddressSettingsRepository().addMatch(address.toString(), qs);
+      // Replace both session fields with sessions opened against the restarted server.
+      ClientSessionFactory sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(INVM_CONNECTOR_FACTORY));
+      sessionFactory.setBlockOnAcknowledge(true);
+      sessionFactory.setAckBatchSize(0);
+      clientSession = sessionFactory.createSession(false, true, true);
+      clientSessionXa = sessionFactory.createSession(true, false, false);
+   }
+}

Copied: trunk/tests/src/org/jboss/messaging/tests/integration/server/LVQTest.java (from rev 6312, trunk/tests/src/org/jboss/messaging/tests/integration/server/SoloQueueTest.java)
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/server/LVQTest.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/server/LVQTest.java	2009-04-06 12:10:13 UTC (rev 6315)
@@ -0,0 +1,584 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.tests.integration.server;
+
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.config.impl.ConfigurationImpl;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.message.impl.MessageImpl;
+import org.jboss.messaging.core.server.Messaging;
+import org.jboss.messaging.core.server.MessagingServer;
+import org.jboss.messaging.core.settings.impl.AddressSettings;
+import org.jboss.messaging.tests.util.UnitTestCase;
+import org.jboss.messaging.utils.SimpleString;
+
+/**
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public class LVQTest extends UnitTestCase
+{
+   private MessagingServer server;
+
+   private ClientSession clientSession;
+
+   private ClientSession clientSessionTxReceives;
+
+   private ClientSession clientSessionTxSends;
+
+   private SimpleString address = new SimpleString("LVQTestAddress");
+
+   private SimpleString qName1 = new SimpleString("LVQTestQ1");
+
+   private FakeStorageManager storageManager;
+
+
+   public void testSimple() throws Exception // two sends with one last-value key, then a single receive
+   {
+      ClientProducer producer = clientSession.createProducer(address, -1, true, true);
+      ClientConsumer consumer = clientSession.createConsumer(qName1);
+      ClientMessage m1 = createTextMessage("m1", clientSession);
+      SimpleString rh = new SimpleString("SMID1");
+      m1.putStringProperty(MessageImpl.HDR_LAST_VALUE_NAME, rh);
+      ClientMessage m2 = createTextMessage("m2", clientSession);
+      m2.putStringProperty(MessageImpl.HDR_LAST_VALUE_NAME, rh);
+      producer.send(m1);
+      producer.send(m2); // same key as m1, sent before the session is started
+      clientSession.start();
+      ClientMessage m = consumer.receive(1000);
+      assertNotNull(m);
+      m.acknowledge();
+      assertEquals("m2", m.getBody().readString()); // expected value first, per JUnit's (expected, actual)
+   }
+
+   public void testMultipleMessages() throws Exception // two last-value keys, two sends per key
+   {
+      ClientProducer producer = clientSession.createProducer(address, -1, true, true);
+      ClientConsumer consumer = clientSession.createConsumer(qName1);
+      SimpleString messageId1 = new SimpleString("SMID1");
+      SimpleString messageId2 = new SimpleString("SMID2");
+      ClientMessage m1 = createTextMessage("m1", clientSession);
+      m1.putStringProperty(MessageImpl.HDR_LAST_VALUE_NAME, messageId1);
+      ClientMessage m2 = createTextMessage("m2", clientSession);
+      m2.putStringProperty(MessageImpl.HDR_LAST_VALUE_NAME, messageId2);
+      ClientMessage m3 = createTextMessage("m3", clientSession); // same key as m1
+      m3.putStringProperty(MessageImpl.HDR_LAST_VALUE_NAME, messageId1);
+      ClientMessage m4 = createTextMessage("m4", clientSession); // same key as m2
+      m4.putStringProperty(MessageImpl.HDR_LAST_VALUE_NAME, messageId2);
+      producer.send(m1);
+      producer.send(m2);
+      producer.send(m3);
+      producer.send(m4);
+      clientSession.start();
+      ClientMessage m = consumer.receive(1000);
+      assertNotNull(m);
+      m.acknowledge();
+      assertEquals("m3", m.getBody().readString()); // expected value first, per JUnit's (expected, actual)
+      m = consumer.receive(1000);
+      assertNotNull(m);
+      m.acknowledge();
+      assertEquals("m4", m.getBody().readString());
+   }
+
+   public void testFirstMessageReceivedButAckedAfter() throws Exception
+   {
+      ClientProducer producer = clientSession.createProducer(address, -1, true, true);
+      ClientConsumer consumer = clientSession.createConsumer(qName1);
+      ClientMessage m1 = createTextMessage("m1", clientSession);
+      SimpleString rh = new SimpleString("SMID1");
+      m1.putStringProperty(MessageImpl.HDR_LAST_VALUE_NAME, rh);
+      ClientMessage m2 = createTextMessage("m2", clientSession);
+      m2.putStringProperty(MessageImpl.HDR_LAST_VALUE_NAME, rh);
+      producer.send(m1);
+      clientSession.start();
+      ClientMessage m = consumer.receive(1000);
+      assertNotNull(m);
+      producer.send(m2); // sent after m1 was received but before m1 is acknowledged
+      m.acknowledge();
+      assertEquals("m1", m.getBody().readString()); // expected value first, per JUnit's (expected, actual)
+      m = consumer.receive(1000);
+      assertNotNull(m);
+      m.acknowledge();
+      assertEquals("m2", m.getBody().readString());
+   }
+
+   public void testFirstMessageReceivedAndCancelled() throws Exception
+   {
+      ClientProducer producer = clientSession.createProducer(address, -1, true, true);
+      ClientConsumer consumer = clientSession.createConsumer(qName1);
+      ClientMessage m1 = createTextMessage("m1", clientSession);
+      SimpleString rh = new SimpleString("SMID1");
+      m1.putStringProperty(MessageImpl.HDR_LAST_VALUE_NAME, rh);
+      ClientMessage m2 = createTextMessage("m2", clientSession);
+      m2.putStringProperty(MessageImpl.HDR_LAST_VALUE_NAME, rh);
+      producer.send(m1);
+      clientSession.start();
+      ClientMessage m = consumer.receive(1000);
+      assertNotNull(m);
+      producer.send(m2);
+      consumer.close(); // m1 was received but never acknowledged before the consumer is closed
+      consumer = clientSession.createConsumer(qName1);
+      m = consumer.receive(1000);
+      assertNotNull(m);
+      m.acknowledge();
+      assertEquals("m2", m.getBody().readString()); // expected value first, per JUnit's (expected, actual)
+      m = consumer.receive(1000);
+      assertNull(m);
+   }
+
+   public void testManyMessagesReceivedAndCancelled() throws Exception
+   {
+      ClientProducer producer = clientSession.createProducer(address, -1, true, true);
+      ClientConsumer consumer = clientSession.createConsumer(qName1);
+      // All six messages carry the same last-value key.
+      SimpleString rh = new SimpleString("SMID1");
+      ClientMessage m1 = createTextMessage("m1", clientSession);
+      m1.putStringProperty(MessageImpl.HDR_LAST_VALUE_NAME, rh);
+      ClientMessage m2 = createTextMessage("m2", clientSession);
+      m2.putStringProperty(MessageImpl.HDR_LAST_VALUE_NAME, rh);
+      ClientMessage m3 = createTextMessage("m3", clientSession);
+      m3.putStringProperty(MessageImpl.HDR_LAST_VALUE_NAME, rh);
+      ClientMessage m4 = createTextMessage("m4", clientSession);
+      m4.putStringProperty(MessageImpl.HDR_LAST_VALUE_NAME, rh);
+      ClientMessage m5 = createTextMessage("m5", clientSession);
+      m5.putStringProperty(MessageImpl.HDR_LAST_VALUE_NAME, rh);
+      ClientMessage m6 = createTextMessage("m6", clientSession);
+      m6.putStringProperty(MessageImpl.HDR_LAST_VALUE_NAME, rh);
+      clientSession.start();
+      producer.send(m1);
+      ClientMessage m = consumer.receive(1000);
+      assertNotNull(m);
+      assertEquals("m1", m.getBody().readString()); // expected value first, per JUnit's (expected, actual)
+      producer.send(m2);
+      m = consumer.receive(1000);
+      assertNotNull(m);
+      assertEquals("m2", m.getBody().readString());
+      producer.send(m3);
+      m = consumer.receive(1000);
+      assertNotNull(m);
+      assertEquals("m3", m.getBody().readString());
+      producer.send(m4);
+      m = consumer.receive(1000);
+      assertNotNull(m);
+      assertEquals("m4", m.getBody().readString());
+      producer.send(m5);
+      m = consumer.receive(1000);
+      assertNotNull(m);
+      assertEquals("m5", m.getBody().readString());
+      producer.send(m6);
+      m = consumer.receive(1000);
+      assertNotNull(m);
+      assertEquals("m6", m.getBody().readString());
+      consumer.close(); // none of the six received messages was acknowledged
+      consumer = clientSession.createConsumer(qName1);
+      m = consumer.receive(1000);
+      assertNotNull(m);
+      m.acknowledge();
+      assertEquals("m6", m.getBody().readString()); // only the newest value is expected on the new consumer
+      m = consumer.receive(1000);
+      assertNull(m);
+   }
+
+   public void testSimpleInTx() throws Exception
+   {
+      // Same scenario as testSimple, but producer and consumer live on clientSessionTxReceives.
+      ClientProducer producer = clientSessionTxReceives.createProducer(address, -1, true, true);
+      ClientConsumer consumer = clientSessionTxReceives.createConsumer(qName1);
+      ClientMessage m1 = createTextMessage("m1", clientSession);
+      SimpleString rh = new SimpleString("SMID1");
+      m1.putStringProperty(MessageImpl.HDR_LAST_VALUE_NAME, rh);
+      ClientMessage m2 = createTextMessage("m2", clientSession);
+      m2.putStringProperty(MessageImpl.HDR_LAST_VALUE_NAME, rh);
+      producer.send(m1);
+      producer.send(m2);
+      clientSessionTxReceives.start();
+      ClientMessage m = consumer.receive(1000);
+      assertNotNull(m);
+      m.acknowledge();
+      assertEquals("m2", m.getBody().readString()); // expected value first, per JUnit's (expected, actual)
+   }
+
+   public void testMultipleMessagesInTx() throws Exception // two keys, acks committed on clientSessionTxReceives
+   {
+      ClientProducer producer = clientSessionTxReceives.createProducer(address, -1, true, true);
+      ClientConsumer consumer = clientSessionTxReceives.createConsumer(qName1);
+      SimpleString messageId1 = new SimpleString("SMID1");
+      SimpleString messageId2 = new SimpleString("SMID2");
+      ClientMessage m1 = createTextMessage("m1", clientSession);
+      m1.putStringProperty(MessageImpl.HDR_LAST_VALUE_NAME, messageId1);
+      ClientMessage m2 = createTextMessage("m2", clientSession);
+      m2.putStringProperty(MessageImpl.HDR_LAST_VALUE_NAME, messageId2);
+      ClientMessage m3 = createTextMessage("m3", clientSession); // same key as m1
+      m3.putStringProperty(MessageImpl.HDR_LAST_VALUE_NAME, messageId1);
+      ClientMessage m4 = createTextMessage("m4", clientSession); // same key as m2
+      m4.putStringProperty(MessageImpl.HDR_LAST_VALUE_NAME, messageId2);
+      producer.send(m1);
+      producer.send(m2);
+      producer.send(m3);
+      producer.send(m4);
+      clientSessionTxReceives.start();
+      ClientMessage m = consumer.receive(1000);
+      assertNotNull(m);
+      m.acknowledge();
+      assertEquals("m3", m.getBody().readString()); // expected value first, per JUnit's (expected, actual)
+      m = consumer.receive(1000);
+      assertNotNull(m);
+      m.acknowledge();
+      assertEquals("m4", m.getBody().readString());
+      clientSessionTxReceives.commit();
+      m = consumer.receive(1000);
+      assertNull(m); // nothing further is expected once the acks are committed
+   }
+
+   public void testMultipleMessagesInTxRollback() throws Exception // four acks rolled back on clientSessionTxReceives
+   {
+      ClientProducer producer = clientSessionTxReceives.createProducer(address, -1, true, true);
+      ClientConsumer consumer = clientSessionTxReceives.createConsumer(qName1);
+      SimpleString messageId1 = new SimpleString("SMID1");
+      SimpleString messageId2 = new SimpleString("SMID2");
+      ClientMessage m1 = createTextMessage("m1", clientSession);
+      m1.putStringProperty(MessageImpl.HDR_LAST_VALUE_NAME, messageId1);
+      ClientMessage m2 = createTextMessage("m2", clientSession);
+      m2.putStringProperty(MessageImpl.HDR_LAST_VALUE_NAME, messageId2);
+      ClientMessage m3 = createTextMessage("m3", clientSession); // same key as m1
+      m3.putStringProperty(MessageImpl.HDR_LAST_VALUE_NAME, messageId1);
+      ClientMessage m4 = createTextMessage("m4", clientSession); // same key as m2
+      m4.putStringProperty(MessageImpl.HDR_LAST_VALUE_NAME, messageId2);
+      producer.send(m1);
+      producer.send(m2);
+      clientSessionTxReceives.start();
+      ClientMessage m = consumer.receive(1000);
+      assertNotNull(m);
+      m.acknowledge();
+      assertEquals("m1", m.getBody().readString()); // expected value first, per JUnit's (expected, actual)
+      m = consumer.receive(1000);
+      assertNotNull(m);
+      m.acknowledge();
+      assertEquals("m2", m.getBody().readString());
+      producer.send(m3);
+      producer.send(m4);
+      m = consumer.receive(1000);
+      assertNotNull(m);
+      m.acknowledge();
+      assertEquals("m3", m.getBody().readString());
+      m = consumer.receive(1000);
+      assertNotNull(m);
+      m.acknowledge();
+      assertEquals("m4", m.getBody().readString());
+      clientSessionTxReceives.rollback();
+      m = consumer.receive(1000);
+      assertNotNull(m);
+      m.acknowledge();
+      assertEquals("m3", m.getBody().readString()); // after rollback only the latest value per key is expected
+      m = consumer.receive(1000);
+      assertNotNull(m);
+      m.acknowledge();
+      assertEquals("m4", m.getBody().readString());
+   }
+
+   public void testMultipleMessagesInTxSend() throws Exception // six sends with one key, committed together
+   {
+      ClientProducer producer = clientSessionTxSends.createProducer(address, -1, true, true);
+      ClientConsumer consumer = clientSessionTxSends.createConsumer(qName1);
+      SimpleString rh = new SimpleString("SMID1");
+      ClientMessage m1 = createTextMessage("m1", clientSession);
+      m1.putStringProperty(MessageImpl.HDR_LAST_VALUE_NAME, rh);
+      ClientMessage m2 = createTextMessage("m2", clientSession);
+      m2.putStringProperty(MessageImpl.HDR_LAST_VALUE_NAME, rh);
+      ClientMessage m3 = createTextMessage("m3", clientSession);
+      m3.putStringProperty(MessageImpl.HDR_LAST_VALUE_NAME, rh);
+      ClientMessage m4 = createTextMessage("m4", clientSession);
+      m4.putStringProperty(MessageImpl.HDR_LAST_VALUE_NAME, rh);
+      ClientMessage m5 = createTextMessage("m5", clientSession);
+      m5.putStringProperty(MessageImpl.HDR_LAST_VALUE_NAME, rh);
+      ClientMessage m6 = createTextMessage("m6", clientSession);
+      m6.putStringProperty(MessageImpl.HDR_LAST_VALUE_NAME, rh);
+      producer.send(m1);
+      producer.send(m2);
+      producer.send(m3);
+      producer.send(m4);
+      producer.send(m5);
+      producer.send(m6);
+      clientSessionTxSends.commit();
+      clientSessionTxSends.start();
+      ClientMessage m = consumer.receive(1000);
+      assertNotNull(m);
+      m.acknowledge();
+      assertEquals("m6", m.getBody().readString()); // expected value first, per JUnit's (expected, actual)
+   }
+
+   public void testMultipleMessagesPersistedCorrectly() throws Exception // checks ids tracked by the fake storage manager
+   {
+      ClientProducer producer = clientSession.createProducer(address, -1, true, true);
+      ClientConsumer consumer = clientSession.createConsumer(qName1);
+      SimpleString rh = new SimpleString("SMID1");
+      ClientMessage m1 = createTextMessage("m1", clientSession);
+      m1.putStringProperty(MessageImpl.HDR_LAST_VALUE_NAME, rh);
+      m1.setDurable(true);
+      ClientMessage m2 = createTextMessage("m2", clientSession);
+      m2.putStringProperty(MessageImpl.HDR_LAST_VALUE_NAME, rh);
+      m2.setDurable(true);
+      ClientMessage m3 = createTextMessage("m3", clientSession);
+      m3.putStringProperty(MessageImpl.HDR_LAST_VALUE_NAME, rh);
+      m3.setDurable(true);
+      ClientMessage m4 = createTextMessage("m4", clientSession);
+      m4.putStringProperty(MessageImpl.HDR_LAST_VALUE_NAME, rh);
+      m4.setDurable(true);
+      ClientMessage m5 = createTextMessage("m5", clientSession);
+      m5.putStringProperty(MessageImpl.HDR_LAST_VALUE_NAME, rh);
+      m5.setDurable(true);
+      ClientMessage m6 = createTextMessage("m6", clientSession);
+      m6.putStringProperty(MessageImpl.HDR_LAST_VALUE_NAME, rh);
+      m6.setDurable(true);
+      producer.send(m1);
+      producer.send(m2);
+      producer.send(m3);
+      producer.send(m4);
+      producer.send(m5);
+      producer.send(m6);
+      assertEquals(1, storageManager.messageIds.size()); // six durable sends, one key: a single stored id expected
+      clientSession.start();
+      ClientMessage m = consumer.receive(1000);
+      assertNotNull(m);
+      m.acknowledge();
+      assertEquals("m6", m.getBody().readString()); // expected value first, per JUnit's (expected, actual)
+      assertEquals(0, storageManager.messageIds.size()); // no stored id expected once m6 is acknowledged
+   }
+
+   public void testMultipleMessagesPersistedCorrectlyInTx() throws Exception // as above, sends committed on clientSessionTxSends
+   {
+      ClientProducer producer = clientSessionTxSends.createProducer(address, -1, true, true);
+      ClientConsumer consumer = clientSessionTxSends.createConsumer(qName1);
+      SimpleString rh = new SimpleString("SMID1");
+      ClientMessage m1 = createTextMessage("m1", clientSession);
+      m1.putStringProperty(MessageImpl.HDR_LAST_VALUE_NAME, rh);
+      m1.setDurable(true);
+      ClientMessage m2 = createTextMessage("m2", clientSession);
+      m2.putStringProperty(MessageImpl.HDR_LAST_VALUE_NAME, rh);
+      m2.setDurable(true);
+      ClientMessage m3 = createTextMessage("m3", clientSession);
+      m3.putStringProperty(MessageImpl.HDR_LAST_VALUE_NAME, rh);
+      m3.setDurable(true);
+      ClientMessage m4 = createTextMessage("m4", clientSession);
+      m4.putStringProperty(MessageImpl.HDR_LAST_VALUE_NAME, rh);
+      m4.setDurable(true);
+      ClientMessage m5 = createTextMessage("m5", clientSession);
+      m5.putStringProperty(MessageImpl.HDR_LAST_VALUE_NAME, rh);
+      m5.setDurable(true);
+      ClientMessage m6 = createTextMessage("m6", clientSession);
+      m6.putStringProperty(MessageImpl.HDR_LAST_VALUE_NAME, rh);
+      m6.setDurable(true);
+      producer.send(m1);
+      producer.send(m2);
+      producer.send(m3);
+      producer.send(m4);
+      producer.send(m5);
+      producer.send(m6);
+      clientSessionTxSends.commit();
+      assertEquals(1, storageManager.messageIds.size()); // six durable sends, one key: a single stored id expected
+      clientSessionTxSends.start();
+      ClientMessage m = consumer.receive(1000);
+      assertNotNull(m);
+      m.acknowledge();
+      assertEquals("m6", m.getBody().readString()); // expected value first, per JUnit's (expected, actual)
+      assertEquals(0, storageManager.messageIds.size()); // no stored id expected once m6 is acknowledged
+   }
+
+   public void testMultipleAcksPersistedCorrectly() throws Exception // each message is received and acked before the next send
+   {
+      ClientProducer producer = clientSession.createProducer(address, -1, true, true);
+      ClientConsumer consumer = clientSession.createConsumer(qName1);
+      SimpleString rh = new SimpleString("SMID1");
+      ClientMessage m1 = createTextMessage("m1", clientSession);
+      m1.putStringProperty(MessageImpl.HDR_LAST_VALUE_NAME, rh);
+      m1.setDurable(true);
+      ClientMessage m2 = createTextMessage("m2", clientSession);
+      m2.putStringProperty(MessageImpl.HDR_LAST_VALUE_NAME, rh);
+      m2.setDurable(true);
+      ClientMessage m3 = createTextMessage("m3", clientSession);
+      m3.putStringProperty(MessageImpl.HDR_LAST_VALUE_NAME, rh);
+      m3.setDurable(true);
+      ClientMessage m4 = createTextMessage("m4", clientSession);
+      m4.putStringProperty(MessageImpl.HDR_LAST_VALUE_NAME, rh);
+      m4.setDurable(true);
+      ClientMessage m5 = createTextMessage("m5", clientSession);
+      m5.putStringProperty(MessageImpl.HDR_LAST_VALUE_NAME, rh);
+      m5.setDurable(true);
+      ClientMessage m6 = createTextMessage("m6", clientSession);
+      m6.putStringProperty(MessageImpl.HDR_LAST_VALUE_NAME, rh);
+      m6.setDurable(true);
+      clientSession.start();
+      producer.send(m1);
+      ClientMessage m = consumer.receive(1000);
+      assertNotNull(m);
+      m.acknowledge();
+      assertEquals("m1", m.getBody().readString()); // expected value first, per JUnit's (expected, actual)
+      producer.send(m2);
+      m = consumer.receive(1000);
+      assertNotNull(m);
+      m.acknowledge();
+      assertEquals("m2", m.getBody().readString());
+      producer.send(m3);
+      m = consumer.receive(1000);
+      assertNotNull(m);
+      m.acknowledge();
+      assertEquals("m3", m.getBody().readString());
+      producer.send(m4);
+      m = consumer.receive(1000);
+      assertNotNull(m);
+      m.acknowledge();
+      assertEquals("m4", m.getBody().readString());
+      producer.send(m5);
+      m = consumer.receive(1000);
+      assertNotNull(m);
+      m.acknowledge();
+      assertEquals("m5", m.getBody().readString());
+      producer.send(m6);
+      m = consumer.receive(1000);
+      assertNotNull(m);
+      m.acknowledge();
+      assertEquals("m6", m.getBody().readString());
+      assertEquals(6, storageManager.ackIds.size()); // one recorded ack id expected per acknowledged message
+   }
+
+   public void testMultipleAcksPersistedCorrectlyInTx() throws Exception // as above, acks committed on clientSessionTxReceives
+   {
+      ClientProducer producer = clientSessionTxReceives.createProducer(address, -1, true, true);
+      ClientConsumer consumer = clientSessionTxReceives.createConsumer(qName1);
+      SimpleString rh = new SimpleString("SMID1");
+      ClientMessage m1 = createTextMessage("m1", clientSession);
+      m1.putStringProperty(MessageImpl.HDR_LAST_VALUE_NAME, rh);
+      m1.setDurable(true);
+      ClientMessage m2 = createTextMessage("m2", clientSession);
+      m2.putStringProperty(MessageImpl.HDR_LAST_VALUE_NAME, rh);
+      m2.setDurable(true);
+      ClientMessage m3 = createTextMessage("m3", clientSession);
+      m3.putStringProperty(MessageImpl.HDR_LAST_VALUE_NAME, rh);
+      m3.setDurable(true);
+      ClientMessage m4 = createTextMessage("m4", clientSession);
+      m4.putStringProperty(MessageImpl.HDR_LAST_VALUE_NAME, rh);
+      m4.setDurable(true);
+      ClientMessage m5 = createTextMessage("m5", clientSession);
+      m5.putStringProperty(MessageImpl.HDR_LAST_VALUE_NAME, rh);
+      m5.setDurable(true);
+      ClientMessage m6 = createTextMessage("m6", clientSession);
+      m6.putStringProperty(MessageImpl.HDR_LAST_VALUE_NAME, rh);
+      m6.setDurable(true);
+      clientSessionTxReceives.start();
+      producer.send(m1);
+      ClientMessage m = consumer.receive(1000);
+      assertNotNull(m);
+      m.acknowledge();
+      assertEquals("m1", m.getBody().readString()); // expected value first, per JUnit's (expected, actual)
+      producer.send(m2);
+      m = consumer.receive(1000);
+      assertNotNull(m);
+      m.acknowledge();
+      assertEquals("m2", m.getBody().readString());
+      producer.send(m3);
+      m = consumer.receive(1000);
+      assertNotNull(m);
+      m.acknowledge();
+      assertEquals("m3", m.getBody().readString());
+      producer.send(m4);
+      m = consumer.receive(1000);
+      assertNotNull(m);
+      m.acknowledge();
+      assertEquals("m4", m.getBody().readString());
+      producer.send(m5);
+      m = consumer.receive(1000);
+      assertNotNull(m);
+      m.acknowledge();
+      assertEquals("m5", m.getBody().readString());
+      producer.send(m6);
+      m = consumer.receive(1000);
+      assertNotNull(m);
+      m.acknowledge();
+      assertEquals("m6", m.getBody().readString());
+      clientSessionTxReceives.commit();
+      assertEquals(6, storageManager.ackIds.size()); // one recorded ack id expected per acknowledged message
+   }
+
+   
+
+
+   protected void tearDown() throws Exception
+   {
+      if (clientSession != null)
+      {
+         try
+         {
+            clientSession.close();
+         }
+         catch (MessagingException e1)
+         {
+            // deliberately ignored: cleanup is best-effort and must not fail the test
+         }
+      }
+      if (server != null && server.isStarted())
+      {
+         try
+         {
+            server.stop();
+         }
+         catch (Exception e1)
+         {
+            // deliberately ignored: cleanup is best-effort and must not fail the test
+         }
+      }
+      server = null;
+      clientSession = null;
+      // NOTE(review): clientSessionTxReceives / clientSessionTxSends are not closed here - confirm server.stop() is sufficient
+      super.tearDown();
+   }
+
+   protected void setUp() throws Exception
+   {
+      super.setUp();
+      // The server runs on a FakeStorageManager so tests can inspect storageManager.messageIds and ackIds.
+      ConfigurationImpl configuration = new ConfigurationImpl();
+      configuration.setSecurityEnabled(false);
+      TransportConfiguration transportConfig = new TransportConfiguration(INVM_ACCEPTOR_FACTORY);
+      configuration.getAcceptorConfigurations().add(transportConfig);
+      storageManager = new FakeStorageManager();
+      server = Messaging.newMessagingServer(configuration, storageManager);
+      // start the server
+      server.start();
+      // Mark the test address as last-value before the queue is created at the end of this method.
+      AddressSettings qs = new AddressSettings();
+      qs.setLastValueQueue(true);
+      server.getAddressSettingsRepository().addMatch(address.toString(), qs);
+      // Three client sessions differing only in createSession flags (meanings inferred from the field names - TODO confirm)
+      ClientSessionFactory sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(INVM_CONNECTOR_FACTORY));
+      sessionFactory.setBlockOnAcknowledge(true);
+      sessionFactory.setAckBatchSize(0);
+      clientSession = sessionFactory.createSession(false, true, true);
+      clientSessionTxReceives = sessionFactory.createSession(false, true, false);
+      clientSessionTxSends = sessionFactory.createSession(false, false, true);
+      clientSession.createQueue(address, qName1, null, true);
+   }
+}

Deleted: trunk/tests/src/org/jboss/messaging/tests/integration/server/SoloQueueRecoveryTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/server/SoloQueueRecoveryTest.java	2009-04-06 12:09:40 UTC (rev 6314)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/server/SoloQueueRecoveryTest.java	2009-04-06 12:10:13 UTC (rev 6315)
@@ -1,236 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.messaging.tests.integration.server;
-
-import javax.transaction.xa.XAResource;
-import javax.transaction.xa.Xid;
-
-import org.jboss.messaging.core.client.ClientConsumer;
-import org.jboss.messaging.core.client.ClientMessage;
-import org.jboss.messaging.core.client.ClientProducer;
-import org.jboss.messaging.core.client.ClientSession;
-import org.jboss.messaging.core.client.ClientSessionFactory;
-import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
-import org.jboss.messaging.core.config.Configuration;
-import org.jboss.messaging.core.config.TransportConfiguration;
-import org.jboss.messaging.core.exception.MessagingException;
-import org.jboss.messaging.core.message.impl.MessageImpl;
-import org.jboss.messaging.core.server.MessagingServer;
-import org.jboss.messaging.core.settings.impl.AddressSettings;
-import org.jboss.messaging.core.transaction.impl.XidImpl;
-import org.jboss.messaging.tests.util.ServiceTestBase;
-import org.jboss.messaging.utils.SimpleString;
-
-/**
- * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
- */
-public class SoloQueueRecoveryTest extends ServiceTestBase // exercises a solo-queue address across a server stop/recreate while an XA transaction is prepared
-{
-   private MessagingServer server;
-
-   private ClientSession clientSession;
-
-   private SimpleString address = new SimpleString("SoloQueueTestAddress");
-
-   private SimpleString qName1 = new SimpleString("SoloQueueTestQ1");
-
-   private ClientSession clientSessionXa;
-
-   private Configuration configuration; // kept as a field so restartServer() can recreate the server from the same configuration
-
-   private AddressSettings qs; // created in setUp() with setSoloQueue(true) and registered again in restartServer()
-
-   public void testMultipleMessagesAfterRecovery() throws Exception // sends four messages in an XA transaction, restarts after prepare, commits, then expects m3 and m4
-   {
-      Xid xid = new XidImpl("bq1".getBytes(), 4, "gtid1".getBytes());
-      ClientProducer producer = clientSessionXa.createProducer(address, -1, true, true);
-      SimpleString messageId1 = new SimpleString("SMID1");
-      SimpleString messageId2 = new SimpleString("SMID2");
-      clientSessionXa.start(xid, XAResource.TMNOFLAGS);
-      ClientMessage m1 = createTextMessage("m1", clientSession); // NOTE(review): no setDurable(true) in this test, unlike the rollback test - durability depends on createTextMessage; confirm
-      m1.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, messageId1);
-      ClientMessage m2 = createTextMessage("m2", clientSession);
-      m2.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, messageId2);
-      ClientMessage m3 = createTextMessage("m3", clientSession);
-      m3.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, messageId1); // same header value as m1
-      ClientMessage m4 = createTextMessage("m4", clientSession);
-      m4.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, messageId2); // same header value as m2
-      producer.send(m1);
-      producer.send(m2);
-      producer.send(m3);
-      producer.send(m4);
-      clientSessionXa.end(xid, XAResource.TMSUCCESS);
-      clientSessionXa.prepare(xid);
-      restartServer(); // the transaction is prepared but not yet committed at this point
-      clientSessionXa.commit(xid, true); // NOTE(review): presumably XAResource.commit(xid, onePhase); onePhase=true after prepare looks unusual - confirm
-      ClientConsumer consumer = clientSession.createConsumer(qName1);
-      clientSession.start();
-      ClientMessage m = consumer.receive(1000);
-      assertNotNull(m);
-      m.acknowledge();
-      assertEquals(m.getBody().readString(), "m3"); // the later of the two SMID1 messages; NOTE(review): arguments are in (actual, expected) order
-      m = consumer.receive(1000);
-      assertNotNull(m);
-      m.acknowledge();
-      assertEquals(m.getBody().readString(), "m4"); // the later of the two SMID2 messages
-   }
-
-   public void testManyMessagesReceivedWithRollback() throws Exception // receives six same-header messages in an XA transaction, restarts after prepare, rolls back, then expects only m6
-   {
-      Xid xid = new XidImpl("bq1".getBytes(), 4, "gtid1".getBytes());
-      ClientProducer producer = clientSession.createProducer(address, -1, true, true); // sends go through the non-XA session
-      ClientConsumer consumer = clientSessionXa.createConsumer(qName1); // receives go through the XA session
-
-      SimpleString rh = new SimpleString("SMID1"); // one header value shared by all six messages
-      ClientMessage m1 = createTextMessage("m1", clientSession);
-      m1.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
-      m1.setDurable(true);
-      ClientMessage m2 = createTextMessage("m2", clientSession);
-      m2.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
-      m2.setDurable(true);
-      ClientMessage m3 = createTextMessage("m3", clientSession);
-      m3.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
-      m3.setDurable(true);
-      ClientMessage m4 = createTextMessage("m4", clientSession);
-      m4.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
-      m4.setDurable(true);
-      ClientMessage m5 = createTextMessage("m5", clientSession);
-      m5.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
-      m5.setDurable(true);
-      ClientMessage m6 = createTextMessage("m6", clientSession);
-      m6.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
-      m6.setDurable(true);
-      clientSessionXa.start(xid, XAResource.TMNOFLAGS);
-      clientSessionXa.start();
-      producer.send(m1); // each send below is followed by a receive; none of the received messages is acknowledged
-      ClientMessage m = consumer.receive(1000);
-      assertNotNull(m);
-      assertEquals(m.getBody().readString(), "m1");
-      producer.send(m2);
-      m = consumer.receive(1000);
-      assertNotNull(m);
-      assertEquals(m.getBody().readString(), "m2");
-      producer.send(m3);
-      m = consumer.receive(1000);
-      assertNotNull(m);
-      assertEquals(m.getBody().readString(), "m3");
-      producer.send(m4);
-      m = consumer.receive(1000);
-      assertNotNull(m);
-      assertEquals(m.getBody().readString(), "m4");
-      producer.send(m5);
-      m = consumer.receive(1000);
-      assertNotNull(m);
-      assertEquals(m.getBody().readString(), "m5");
-      producer.send(m6);
-      m = consumer.receive(1000);
-      assertNotNull(m);
-      assertEquals(m.getBody().readString(), "m6");
-      clientSessionXa.end(xid, XAResource.TMSUCCESS);
-      clientSessionXa.prepare(xid);
-
-      restartServer();
-      clientSessionXa.rollback(xid); // roll back the prepared transaction on the recreated server
-      consumer = clientSession.createConsumer(qName1);
-      clientSession.start();
-      m = consumer.receive(1000);
-      assertNotNull(m);
-      m.acknowledge();
-      assertEquals(m.getBody().readString(), "m6"); // only the last message is expected back
-      m = consumer.receive(1000);
-      assertNull(m); // nothing else may remain in the queue
-   }
-   protected void tearDown() throws Exception // closes clientSession and stops the server, swallowing failures; NOTE(review): clientSessionXa is never closed here
-   {
-      if (clientSession != null)
-      {
-         try
-         {
-            clientSession.close();
-         }
-         catch (MessagingException e1)
-         {
-            // close failure is swallowed so the remaining cleanup still runs
-         }
-      }
-      if (server != null && server.isStarted())
-      {
-         try
-         {
-            server.stop();
-         }
-         catch (Exception e1)
-         {
-            // stop failure is swallowed so super.tearDown() is still reached
-         }
-      }
-      server = null;
-      clientSession = null;
-      
-      super.tearDown();
-   }
-
-   protected void setUp() throws Exception // creates and starts the server, registers the solo-queue settings, opens a plain and an XA session, creates the queue
-   {
-      super.setUp();
-      
-      clearData();
-      configuration = createConfigForJournal();
-      configuration.setSecurityEnabled(false);
-      TransportConfiguration transportConfig = new TransportConfiguration(INVM_ACCEPTOR_FACTORY);
-      configuration.getAcceptorConfigurations().add(transportConfig);
-      server = createServer(true, configuration); // first argument presumably enables real persistence - confirm in ServiceTestBase
-      // start the server
-      server.start();
-
-      qs = new AddressSettings();
-      qs.setSoloQueue(true);
-      server.getAddressSettingsRepository().addMatch(address.toString(), qs);
-      // then we create a client as normal
-      ClientSessionFactory sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(INVM_CONNECTOR_FACTORY));
-      sessionFactory.setBlockOnAcknowledge(true);
-      sessionFactory.setAckBatchSize(0);
-      clientSession = sessionFactory.createSession(false, true, true);
-      clientSessionXa = sessionFactory.createSession(true, false, false); // first flag presumably selects an XA session - confirm against ClientSessionFactory
-      clientSession.createQueue(address, qName1, null, true);
-   }
-
-   private void restartServer() throws Exception // stops the server, recreates it from the saved configuration and replaces both client sessions
-   {
-      server.stop();
-      server = null;
-      server = createServer(true, configuration);
-      server.getAddressSettingsRepository().addMatch(address.toString(), qs); // registers the field qs on the new server before it is started
-      // start the server
-      server.start();
-
-      AddressSettings qs = new AddressSettings(); // NOTE(review): this local shadows the field qs from here on
-      qs.setSoloQueue(true);
-      server.getAddressSettingsRepository().addMatch(address.toString(), qs); // NOTE(review): second registration under the same match key as the call before start()
-      // then we create a client as normal
-      ClientSessionFactory sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(INVM_CONNECTOR_FACTORY));
-      sessionFactory.setBlockOnAcknowledge(true);
-      sessionFactory.setAckBatchSize(0);
-      clientSession = sessionFactory.createSession(false, true, true); // replaces the session from setUp(); the old one is not closed in this method
-      clientSessionXa = sessionFactory.createSession(true, false, false);
-   }
-}

Deleted: trunk/tests/src/org/jboss/messaging/tests/integration/server/SoloQueueTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/server/SoloQueueTest.java	2009-04-06 12:09:40 UTC (rev 6314)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/server/SoloQueueTest.java	2009-04-06 12:10:13 UTC (rev 6315)
@@ -1,584 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.messaging.tests.integration.server;
-
-import org.jboss.messaging.core.client.ClientConsumer;
-import org.jboss.messaging.core.client.ClientMessage;
-import org.jboss.messaging.core.client.ClientProducer;
-import org.jboss.messaging.core.client.ClientSession;
-import org.jboss.messaging.core.client.ClientSessionFactory;
-import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
-import org.jboss.messaging.core.config.TransportConfiguration;
-import org.jboss.messaging.core.config.impl.ConfigurationImpl;
-import org.jboss.messaging.core.exception.MessagingException;
-import org.jboss.messaging.core.message.impl.MessageImpl;
-import org.jboss.messaging.core.server.Messaging;
-import org.jboss.messaging.core.server.MessagingServer;
-import org.jboss.messaging.core.settings.impl.AddressSettings;
-import org.jboss.messaging.tests.util.UnitTestCase;
-import org.jboss.messaging.utils.SimpleString;
-
-/**
- * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
- */
-public class SoloQueueTest extends UnitTestCase
-{
-   private MessagingServer server;
-
-   private ClientSession clientSession;
-
-   private ClientSession clientSessionTxReceives;
-
-   private ClientSession clientSessionTxSends;
-
-   private SimpleString address = new SimpleString("SoloQueueTestAddress");
-
-   private SimpleString qName1 = new SimpleString("SoloQueueTestQ1");
-
-   private FakeStorageManager storageManager;
-
-
-   public void testSimple() throws Exception // two messages with the same HDR_SOLE_MESSAGE value are sent before consuming; the first one received must be m2
-   {
-      ClientProducer producer = clientSession.createProducer(address, -1, true, true);
-      ClientConsumer consumer = clientSession.createConsumer(qName1);
-      ClientMessage m1 = createTextMessage("m1", clientSession);
-      SimpleString rh = new SimpleString("SMID1");
-      m1.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
-      ClientMessage m2 = createTextMessage("m2", clientSession);
-      m2.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh); // same header value as m1
-      producer.send(m1);
-      producer.send(m2);
-      clientSession.start(); // delivery starts only after both sends
-      ClientMessage m = consumer.receive(1000);
-      assertNotNull(m);
-      m.acknowledge();
-      assertEquals(m.getBody().readString(), "m2"); // NOTE(review): arguments are in (actual, expected) order
-   }
-
-   public void testMultipleMessages() throws Exception // two header values, two messages each; expects the later message of each value (m3 then m4)
-   {
-      ClientProducer producer = clientSession.createProducer(address, -1, true, true);
-      ClientConsumer consumer = clientSession.createConsumer(qName1);
-      SimpleString messageId1 = new SimpleString("SMID1");
-      SimpleString messageId2 = new SimpleString("SMID2");
-      ClientMessage m1 = createTextMessage("m1", clientSession);
-      m1.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, messageId1);
-      ClientMessage m2 = createTextMessage("m2", clientSession);
-      m2.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, messageId2);
-      ClientMessage m3 = createTextMessage("m3", clientSession);
-      m3.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, messageId1); // same header value as m1
-      ClientMessage m4 = createTextMessage("m4", clientSession);
-      m4.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, messageId2); // same header value as m2
-      producer.send(m1);
-      producer.send(m2);
-      producer.send(m3);
-      producer.send(m4);
-      clientSession.start(); // delivery starts only after all four sends
-      ClientMessage m = consumer.receive(1000);
-      assertNotNull(m);
-      m.acknowledge();
-      assertEquals(m.getBody().readString(), "m3");
-      m = consumer.receive(1000);
-      assertNotNull(m);
-      m.acknowledge();
-      assertEquals(m.getBody().readString(), "m4");
-   }
-
-   public void testFirstMessageReceivedButAckedAfter() throws Exception // m1 is received, m2 is sent before m1 is acknowledged; both must still be delivered in order
-   {
-      ClientProducer producer = clientSession.createProducer(address, -1, true, true);
-      ClientConsumer consumer = clientSession.createConsumer(qName1);
-      ClientMessage m1 = createTextMessage("m1", clientSession);
-      SimpleString rh = new SimpleString("SMID1");
-      m1.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
-      ClientMessage m2 = createTextMessage("m2", clientSession);
-      m2.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh); // same header value as m1
-      producer.send(m1);
-      clientSession.start();
-      ClientMessage m = consumer.receive(1000);
-      assertNotNull(m);
-      producer.send(m2); // sent while m1 has been received but not yet acknowledged
-      m.acknowledge();
-      assertEquals(m.getBody().readString(), "m1");
-      m = consumer.receive(1000);
-      assertNotNull(m);
-      m.acknowledge();
-      assertEquals(m.getBody().readString(), "m2");
-   }
-
-   public void testFirstMessageReceivedAndCancelled() throws Exception // m1 is received but never acknowledged, m2 is sent, the consumer is closed; a new consumer must see only m2
-   {
-      ClientProducer producer = clientSession.createProducer(address, -1, true, true);
-      ClientConsumer consumer = clientSession.createConsumer(qName1);
-      ClientMessage m1 = createTextMessage("m1", clientSession);
-      SimpleString rh = new SimpleString("SMID1");
-      m1.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
-      ClientMessage m2 = createTextMessage("m2", clientSession);
-      m2.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh); // same header value as m1
-      producer.send(m1);
-      clientSession.start();
-      ClientMessage m = consumer.receive(1000);
-      assertNotNull(m);
-      producer.send(m2);
-      consumer.close(); // m1 was never acknowledged when the consumer is closed
-      consumer = clientSession.createConsumer(qName1);
-      m = consumer.receive(1000);
-      assertNotNull(m);
-      m.acknowledge();
-      assertEquals(m.getBody().readString(), "m2");
-      m = consumer.receive(1000);
-      assertNull(m); // m1 must not come back
-   }
-
-   public void testManyMessagesReceivedAndCancelled() throws Exception // six same-header messages are received without acknowledgement, the consumer is closed; a new consumer must see only m6
-   {
-      ClientProducer producer = clientSession.createProducer(address, -1, true, true);
-      ClientConsumer consumer = clientSession.createConsumer(qName1);
-
-      SimpleString rh = new SimpleString("SMID1"); // one header value shared by all six messages
-      ClientMessage m1 = createTextMessage("m1", clientSession);
-      m1.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
-      ClientMessage m2 = createTextMessage("m2", clientSession);
-      m2.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
-      ClientMessage m3 = createTextMessage("m3", clientSession);
-      m3.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
-      ClientMessage m4 = createTextMessage("m4", clientSession);
-      m4.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
-      ClientMessage m5 = createTextMessage("m5", clientSession);
-      m5.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
-      ClientMessage m6 = createTextMessage("m6", clientSession);
-      m6.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
-      clientSession.start();
-      producer.send(m1); // each send below is followed by a receive; none of these messages is acknowledged
-      ClientMessage m = consumer.receive(1000);
-      assertNotNull(m);
-      assertEquals(m.getBody().readString(), "m1");
-      producer.send(m2);
-      m = consumer.receive(1000);
-      assertNotNull(m);
-      assertEquals(m.getBody().readString(), "m2");
-      producer.send(m3);
-      m = consumer.receive(1000);
-      assertNotNull(m);
-      assertEquals(m.getBody().readString(), "m3");
-      producer.send(m4);
-      m = consumer.receive(1000);
-      assertNotNull(m);
-      assertEquals(m.getBody().readString(), "m4");
-      producer.send(m5);
-      m = consumer.receive(1000);
-      assertNotNull(m);
-      assertEquals(m.getBody().readString(), "m5");
-      producer.send(m6);
-      m = consumer.receive(1000);
-      assertNotNull(m);
-      assertEquals(m.getBody().readString(), "m6");
-      consumer.close(); // closes the consumer with all six deliveries unacknowledged
-      consumer = clientSession.createConsumer(qName1);
-      m = consumer.receive(1000);
-      assertNotNull(m);
-      m.acknowledge();
-      assertEquals(m.getBody().readString(), "m6"); // only the last message is expected back
-      m = consumer.receive(1000);
-      assertNull(m);
-   }
-
-   public void testSimpleInTx() throws Exception // same scenario as testSimple but producer and consumer come from clientSessionTxReceives
-   {
-
-      ClientProducer producer = clientSessionTxReceives.createProducer(address, -1, true, true);
-      ClientConsumer consumer = clientSessionTxReceives.createConsumer(qName1);
-      ClientMessage m1 = createTextMessage("m1", clientSession); // messages are built from clientSession but sent through the other session's producer
-      SimpleString rh = new SimpleString("SMID1");
-      m1.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
-      ClientMessage m2 = createTextMessage("m2", clientSession);
-      m2.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh); // same header value as m1
-      producer.send(m1);
-      producer.send(m2);
-      clientSessionTxReceives.start();
-      ClientMessage m = consumer.receive(1000);
-      assertNotNull(m);
-      m.acknowledge(); // NOTE(review): no commit() follows in this test, so the acknowledgement is never committed here
-      assertEquals(m.getBody().readString(), "m2");
-   }
-
-   public void testMultipleMessagesInTx() throws Exception // same scenario as testMultipleMessages on clientSessionTxReceives, followed by a commit and an empty-queue check
-   {
-      ClientProducer producer = clientSessionTxReceives.createProducer(address, -1, true, true);
-      ClientConsumer consumer = clientSessionTxReceives.createConsumer(qName1);
-      SimpleString messageId1 = new SimpleString("SMID1");
-      SimpleString messageId2 = new SimpleString("SMID2");
-      ClientMessage m1 = createTextMessage("m1", clientSession);
-      m1.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, messageId1);
-      ClientMessage m2 = createTextMessage("m2", clientSession);
-      m2.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, messageId2);
-      ClientMessage m3 = createTextMessage("m3", clientSession);
-      m3.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, messageId1); // same header value as m1
-      ClientMessage m4 = createTextMessage("m4", clientSession);
-      m4.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, messageId2); // same header value as m2
-      producer.send(m1);
-      producer.send(m2);
-      producer.send(m3);
-      producer.send(m4);
-      clientSessionTxReceives.start();
-      ClientMessage m = consumer.receive(1000);
-      assertNotNull(m);
-      m.acknowledge();
-      assertEquals(m.getBody().readString(), "m3");
-      m = consumer.receive(1000);
-      assertNotNull(m);
-      m.acknowledge();
-      assertEquals(m.getBody().readString(), "m4");
-      clientSessionTxReceives.commit(); // commits the two acknowledgements above
-      m = consumer.receive(1000);
-      assertNull(m); // nothing may be left after the commit
-   }
-
-   public void testMultipleMessagesInTxRollback() throws Exception // acknowledges m1..m4 on clientSessionTxReceives, rolls back, then expects only m3 and m4 again
-   {
-      ClientProducer producer = clientSessionTxReceives.createProducer(address, -1, true, true);
-      ClientConsumer consumer = clientSessionTxReceives.createConsumer(qName1);
-      SimpleString messageId1 = new SimpleString("SMID1");
-      SimpleString messageId2 = new SimpleString("SMID2");
-      ClientMessage m1 = createTextMessage("m1", clientSession);
-      m1.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, messageId1);
-      ClientMessage m2 = createTextMessage("m2", clientSession);
-      m2.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, messageId2);
-      ClientMessage m3 = createTextMessage("m3", clientSession);
-      m3.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, messageId1); // same header value as m1
-      ClientMessage m4 = createTextMessage("m4", clientSession);
-      m4.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, messageId2); // same header value as m2
-      producer.send(m1);
-      producer.send(m2);
-      clientSessionTxReceives.start();
-      ClientMessage m = consumer.receive(1000);
-      assertNotNull(m);
-      m.acknowledge();
-      assertEquals(m.getBody().readString(), "m1");
-      m = consumer.receive(1000);
-      assertNotNull(m);
-      m.acknowledge();
-      assertEquals(m.getBody().readString(), "m2");
-      producer.send(m3); // m3 and m4 are sent after m1 and m2 were received and acknowledged in the open transaction
-      producer.send(m4);
-      m = consumer.receive(1000);
-      assertNotNull(m);
-      m.acknowledge();
-      assertEquals(m.getBody().readString(), "m3");
-      m = consumer.receive(1000);
-      assertNotNull(m);
-      m.acknowledge();
-      assertEquals(m.getBody().readString(), "m4");
-      clientSessionTxReceives.rollback(); // undoes all four acknowledgements
-      m = consumer.receive(1000);
-      assertNotNull(m);
-      m.acknowledge();
-      assertEquals(m.getBody().readString(), "m3"); // after rollback the first redelivery must be m3, not m1
-      m = consumer.receive(1000);
-      assertNotNull(m);
-      m.acknowledge();
-      assertEquals(m.getBody().readString(), "m4"); // and the second must be m4, not m2
-   }
-
-   public void testMultipleMessagesInTxSend() throws Exception // six same-header messages are sent on clientSessionTxSends and committed; the first one received must be m6
-   {
-      ClientProducer producer = clientSessionTxSends.createProducer(address, -1, true, true);
-      ClientConsumer consumer = clientSessionTxSends.createConsumer(qName1);
-      SimpleString rh = new SimpleString("SMID1"); // one header value shared by all six messages
-      ClientMessage m1 = createTextMessage("m1", clientSession);
-      m1.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
-      ClientMessage m2 = createTextMessage("m2", clientSession);
-      m2.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
-      ClientMessage m3 = createTextMessage("m3", clientSession);
-      m3.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
-      ClientMessage m4 = createTextMessage("m4", clientSession);
-      m4.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
-      ClientMessage m5 = createTextMessage("m5", clientSession);
-      m5.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
-      ClientMessage m6 = createTextMessage("m6", clientSession);
-      m6.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
-      producer.send(m1);
-      producer.send(m2);
-      producer.send(m3);
-      producer.send(m4);
-      producer.send(m5);
-      producer.send(m6);
-      clientSessionTxSends.commit(); // all six sends are committed together before delivery is started
-      clientSessionTxSends.start();
-      ClientMessage m = consumer.receive(1000);
-      assertNotNull(m);
-      m.acknowledge();
-      assertEquals(m.getBody().readString(), "m6"); // NOTE(review): the test does not check that a further receive returns null
-   }
-
-   public void testMultipleMessagesPersistedCorrectly() throws Exception // six durable same-header sends must leave one id in storageManager.messageIds, and none after the acknowledge
-   {
-      ClientProducer producer = clientSession.createProducer(address, -1, true, true);
-      ClientConsumer consumer = clientSession.createConsumer(qName1);
-      SimpleString rh = new SimpleString("SMID1"); // one header value shared by all six messages
-      ClientMessage m1 = createTextMessage("m1", clientSession);
-      m1.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
-      m1.setDurable(true);
-      ClientMessage m2 = createTextMessage("m2", clientSession);
-      m2.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
-      m2.setDurable(true);
-      ClientMessage m3 = createTextMessage("m3", clientSession);
-      m3.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
-      m3.setDurable(true);
-      ClientMessage m4 = createTextMessage("m4", clientSession);
-      m4.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
-      m4.setDurable(true);
-      ClientMessage m5 = createTextMessage("m5", clientSession);
-      m5.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
-      m5.setDurable(true);
-      ClientMessage m6 = createTextMessage("m6", clientSession);
-      m6.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
-      m6.setDurable(true);
-      producer.send(m1);
-      producer.send(m2);
-      producer.send(m3);
-      producer.send(m4);
-      producer.send(m5);
-      producer.send(m6);
-      assertEquals(1, storageManager.messageIds.size()); // checked before delivery starts: exactly one stored message id is expected
-      clientSession.start();
-      ClientMessage m = consumer.receive(1000);
-      assertNotNull(m);
-      m.acknowledge();
-      assertEquals(m.getBody().readString(), "m6");
-      assertEquals(0, storageManager.messageIds.size()); // after the acknowledge no stored message id may remain
-   }
-
-   public void testMultipleMessagesPersistedCorrectlyInTx() throws Exception // as testMultipleMessagesPersistedCorrectly, but the sends go through clientSessionTxSends and are committed first
-   {
-      ClientProducer producer = clientSessionTxSends.createProducer(address, -1, true, true);
-      ClientConsumer consumer = clientSessionTxSends.createConsumer(qName1);
-      SimpleString rh = new SimpleString("SMID1"); // one header value shared by all six messages
-      ClientMessage m1 = createTextMessage("m1", clientSession);
-      m1.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
-      m1.setDurable(true);
-      ClientMessage m2 = createTextMessage("m2", clientSession);
-      m2.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
-      m2.setDurable(true);
-      ClientMessage m3 = createTextMessage("m3", clientSession);
-      m3.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
-      m3.setDurable(true);
-      ClientMessage m4 = createTextMessage("m4", clientSession);
-      m4.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
-      m4.setDurable(true);
-      ClientMessage m5 = createTextMessage("m5", clientSession);
-      m5.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
-      m5.setDurable(true);
-      ClientMessage m6 = createTextMessage("m6", clientSession);
-      m6.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
-      m6.setDurable(true);
-      producer.send(m1);
-      producer.send(m2);
-      producer.send(m3);
-      producer.send(m4);
-      producer.send(m5);
-      producer.send(m6);
-      clientSessionTxSends.commit();
-      assertEquals(1, storageManager.messageIds.size()); // checked right after the commit: exactly one stored message id is expected
-      clientSessionTxSends.start();
-      ClientMessage m = consumer.receive(1000);
-      assertNotNull(m);
-      m.acknowledge();
-      assertEquals(m.getBody().readString(), "m6");
-      assertEquals(0, storageManager.messageIds.size()); // after the acknowledge no stored message id may remain
-   }
-
-   public void testMultipleAcksPersistedCorrectly() throws Exception // sends, receives and acknowledges six durable same-header messages one at a time; expects six entries in storageManager.ackIds
-   {
-      ClientProducer producer = clientSession.createProducer(address, -1, true, true);
-      ClientConsumer consumer = clientSession.createConsumer(qName1);
-      SimpleString rh = new SimpleString("SMID1"); // one header value shared by all six messages
-      ClientMessage m1 = createTextMessage("m1", clientSession);
-      m1.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
-      m1.setDurable(true);
-      ClientMessage m2 = createTextMessage("m2", clientSession);
-      m2.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
-      m2.setDurable(true);
-      ClientMessage m3 = createTextMessage("m3", clientSession);
-      m3.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
-      m3.setDurable(true);
-      ClientMessage m4 = createTextMessage("m4", clientSession);
-      m4.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
-      m4.setDurable(true);
-      ClientMessage m5 = createTextMessage("m5", clientSession);
-      m5.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
-      m5.setDurable(true);
-      ClientMessage m6 = createTextMessage("m6", clientSession);
-      m6.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
-      m6.setDurable(true);
-      clientSession.start();
-      producer.send(m1); // each message is acknowledged before the next one is sent
-      ClientMessage m = consumer.receive(1000);
-      assertNotNull(m);
-      m.acknowledge();
-      assertEquals(m.getBody().readString(), "m1");
-      producer.send(m2);
-      m = consumer.receive(1000);
-      assertNotNull(m);
-      m.acknowledge();
-      assertEquals(m.getBody().readString(), "m2");
-      producer.send(m3);
-      m = consumer.receive(1000);
-      assertNotNull(m);
-      m.acknowledge();
-      assertEquals(m.getBody().readString(), "m3");
-      producer.send(m4);
-      m = consumer.receive(1000);
-      assertNotNull(m);
-      m.acknowledge();
-      assertEquals(m.getBody().readString(), "m4");
-      producer.send(m5);
-      m = consumer.receive(1000);
-      assertNotNull(m);
-      m.acknowledge();
-      assertEquals(m.getBody().readString(), "m5");
-      producer.send(m6);
-      m = consumer.receive(1000);
-      assertNotNull(m);
-      m.acknowledge();
-      assertEquals(m.getBody().readString(), "m6");
-      assertEquals(6, storageManager.ackIds.size()); // one recorded acknowledgement per message
-   }
-
-   public void testMultipleAcksPersistedCorrectlyInTx() throws Exception // as testMultipleAcksPersistedCorrectly, but on clientSessionTxReceives with a single commit before the ackIds check
-   {
-      ClientProducer producer = clientSessionTxReceives.createProducer(address, -1, true, true);
-      ClientConsumer consumer = clientSessionTxReceives.createConsumer(qName1);
-      SimpleString rh = new SimpleString("SMID1"); // one header value shared by all six messages
-      ClientMessage m1 = createTextMessage("m1", clientSession);
-      m1.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
-      m1.setDurable(true);
-      ClientMessage m2 = createTextMessage("m2", clientSession);
-      m2.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
-      m2.setDurable(true);
-      ClientMessage m3 = createTextMessage("m3", clientSession);
-      m3.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
-      m3.setDurable(true);
-      ClientMessage m4 = createTextMessage("m4", clientSession);
-      m4.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
-      m4.setDurable(true);
-      ClientMessage m5 = createTextMessage("m5", clientSession);
-      m5.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
-      m5.setDurable(true);
-      ClientMessage m6 = createTextMessage("m6", clientSession);
-      m6.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
-      m6.setDurable(true);
-      clientSessionTxReceives.start();
-      producer.send(m1); // each message is received and acknowledged before the next one is sent
-      ClientMessage m = consumer.receive(1000);
-      assertNotNull(m);
-      m.acknowledge();
-      assertEquals(m.getBody().readString(), "m1");
-      producer.send(m2);
-      m = consumer.receive(1000);
-      assertNotNull(m);
-      m.acknowledge();
-      assertEquals(m.getBody().readString(), "m2");
-      producer.send(m3);
-      m = consumer.receive(1000);
-      assertNotNull(m);
-      m.acknowledge();
-      assertEquals(m.getBody().readString(), "m3");
-      producer.send(m4);
-      m = consumer.receive(1000);
-      assertNotNull(m);
-      m.acknowledge();
-      assertEquals(m.getBody().readString(), "m4");
-      producer.send(m5);
-      m = consumer.receive(1000);
-      assertNotNull(m);
-      m.acknowledge();
-      assertEquals(m.getBody().readString(), "m5");
-      producer.send(m6);
-      m = consumer.receive(1000);
-      assertNotNull(m);
-      m.acknowledge();
-      assertEquals(m.getBody().readString(), "m6");
-      clientSessionTxReceives.commit(); // the six acknowledgements are committed together
-      assertEquals(6, storageManager.ackIds.size()); // checked only after the commit
-   }
-
-   
-
-
-   protected void tearDown() throws Exception
-   {
-      if (clientSession != null)
-      {
-         try
-         {
-            clientSession.close();
-         }
-         catch (MessagingException e1)
-         {
-            //
-         }
-      }
-      if (server != null && server.isStarted())
-      {
-         try
-         {
-            server.stop();
-         }
-         catch (Exception e1)
-         {
-            //
-         }
-      }
-      server = null;
-      clientSession = null;
-      
-      super.tearDown();
-   }
-
-   protected void setUp() throws Exception
-   {
-      super.setUp();
-      
-      ConfigurationImpl configuration = new ConfigurationImpl();
-      configuration.setSecurityEnabled(false);
-      TransportConfiguration transportConfig = new TransportConfiguration(INVM_ACCEPTOR_FACTORY);
-      configuration.getAcceptorConfigurations().add(transportConfig);
-      storageManager = new FakeStorageManager();
-      server = Messaging.newMessagingServer(configuration, storageManager);
-      // start the server
-      server.start();
-
-      AddressSettings qs = new AddressSettings();
-      qs.setSoloQueue(true);
-      server.getAddressSettingsRepository().addMatch(address.toString(), qs);
-      // then we create a client as normal
-      ClientSessionFactory sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(INVM_CONNECTOR_FACTORY));
-      sessionFactory.setBlockOnAcknowledge(true);
-      sessionFactory.setAckBatchSize(0);
-      clientSession = sessionFactory.createSession(false, true, true);
-      clientSessionTxReceives = sessionFactory.createSession(false, true, false);
-      clientSessionTxSends = sessionFactory.createSession(false, false, true);
-      clientSession.createQueue(address, qName1, null, true);
-   }
-}




More information about the jboss-cvs-commits mailing list