[rhmessaging-commits] rhmessaging commits: r3125 - in store/branches/java/broker-queue-refactor/java/bdbstore: lib and 4 other directories.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Fri Feb 20 08:34:27 EST 2009


Author: ritchiem
Date: 2009-02-20 08:34:27 -0500 (Fri, 20 Feb 2009)
New Revision: 3125

Added:
   store/branches/java/broker-queue-refactor/java/bdbstore/etc/persistent_config.xml
   store/branches/java/broker-queue-refactor/java/bdbstore/etc/virtualhosts.xml
   store/branches/java/broker-queue-refactor/java/bdbstore/lib/commons-logging-1.0.4.jar
   store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/InspectableBDBMessageStore.java
   store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessagePersistenceTest.java
   store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessagePersistenceTransactionalTest.java
   store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessagePersistentNoAckTest.java
Modified:
   store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
   store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreTest.java
   store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java
   store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessageReSendTest.java
   store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/QueueDeleteWhilstRoutingTest.java
   store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/utils/JNDIHelper.java
   store/branches/java/broker-queue-refactor/java/bdbstore/src/tools/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgrade.java
Log:
QPID-1632 : Update to perform internal reference counting. Additional testing via MessagePersistent*Tests

Added: store/branches/java/broker-queue-refactor/java/bdbstore/etc/persistent_config.xml
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/etc/persistent_config.xml	                        (rev 0)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/etc/persistent_config.xml	2009-02-20 13:34:27 UTC (rev 3125)
@@ -0,0 +1,122 @@
+<?xml version="1.0" encoding="ISO-8859-1"?>
+<!--
+ -
+ - Licensed to the Apache Software Foundation (ASF) under one
+ - or more contributor license agreements.  See the NOTICE file
+ - distributed with this work for additional information
+ - regarding copyright ownership.  The ASF licenses this file
+ - to you under the Apache License, Version 2.0 (the
+ - "License"); you may not use this file except in compliance
+ - with the License.  You may obtain a copy of the License at
+ - 
+ -   http://www.apache.org/licenses/LICENSE-2.0
+ - 
+ - Unless required by applicable law or agreed to in writing,
+ - software distributed under the License is distributed on an
+ - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ - KIND, either express or implied.  See the License for the
+ - specific language governing permissions and limitations
+ - under the License.
+ -
+
+ This is an example config using the BDBMessageStore available from
+ the Red Hat Messaging project at etp.108.redhat.com and distributed under GPL.
+ -->
+        
+<broker>
+    <prefix>${QPID_HOME}</prefix>
+    <work>${QPID_WORK}</work>
+    <conf>${prefix}/etc</conf>
+    <connector>
+        <transport>nio</transport>
+        <port>5672</port>
+        <sslport>8672</sslport>
+        <socketReceiveBuffer>32768</socketReceiveBuffer>
+        <socketSendBuffer>32768</socketSendBuffer>
+    </connector>
+    <management>
+        <enabled>true</enabled>
+        <jmxport>8999</jmxport>
+        <security-enabled>false</security-enabled>
+        <ssl>
+            <enabled>true</enabled>
+            <!-- Update below path to your keystore location, eg ${conf}/qpid.keystore  -->
+            <keyStorePath>${prefix}/../test_resources/ssl/keystore.jks</keyStorePath>
+            <keyStorePassword>password</keyStorePassword>
+        </ssl>
+    </management>
+    <advanced>
+        <filterchain enableExecutorPool="true"/>
+        <enablePooledAllocator>false</enablePooledAllocator>
+        <enableDirectBuffers>false</enableDirectBuffers>
+        <framesize>65535</framesize>
+        <compressBufferOnQueue>false</compressBufferOnQueue>
+    </advanced>
+
+    <security>
+        <principal-databases>
+            <principal-database>
+                <name>passwordfile</name>
+                <class>org.apache.qpid.server.security.auth.database.PlainPasswordFilePrincipalDatabase</class>
+                <attributes>
+                    <attribute>
+                        <name>passwordFile</name>
+                        <value>${conf}/passwd</value>
+                    </attribute>
+                </attributes>
+            </principal-database>
+        </principal-databases>
+
+        <access>
+            <class>org.apache.qpid.server.security.access.plugins.AllowAll</class>
+        </access>
+        <jmx>
+            <access>${conf}/jmxremote.access</access>
+            <principal-database>passwordfile</principal-database>
+        </jmx>
+    </security>
+
+    <virtualhosts>
+        <virtualhost>
+            <name>localhost</name>
+            <localhost>
+                <store>
+                    <class>org.apache.qpid.server.store.berkeleydb.BDBMessageStore</class>
+                    <environment-path>${work}/bdbstore/localhost-store</environment-path>
+                </store>
+            </localhost>
+        </virtualhost>
+
+        <virtualhost>
+            <name>development</name>
+            <development>
+                <store>
+                    <class>org.apache.qpid.server.store.berkeleydb.BDBMessageStore</class>
+                    <environment-path>${work}/bdbstore/development-store</environment-path>
+                </store>
+            </development>
+        </virtualhost>
+
+        <virtualhost>
+            <name>test</name>
+            <test>
+                <store>
+                    <class>org.apache.qpid.server.store.berkeleydb.BDBMessageStore</class>
+                    <environment-path>${work}/bdbstore/test-store</environment-path>
+                </store>
+            </test>
+        </virtualhost>
+
+    </virtualhosts>
+    <heartbeat>
+        <delay>0</delay>
+        <timeoutFactor>2.0</timeoutFactor>
+    </heartbeat>
+    <queue>
+        <auto_register>true</auto_register>
+    </queue>
+
+    <virtualhosts>${conf}/virtualhosts.xml</virtualhosts>
+</broker>
+
+

Added: store/branches/java/broker-queue-refactor/java/bdbstore/etc/virtualhosts.xml
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/etc/virtualhosts.xml	                        (rev 0)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/etc/virtualhosts.xml	2009-02-20 13:34:27 UTC (rev 3125)
@@ -0,0 +1,123 @@
+<?xml version="1.0" encoding="ISO-8859-1"?>
+<!--
+ -
+ - Licensed to the Apache Software Foundation (ASF) under one
+ - or more contributor license agreements.  See the NOTICE file
+ - distributed with this work for additional information
+ - regarding copyright ownership.  The ASF licenses this file
+ - to you under the Apache License, Version 2.0 (the
+ - "License"); you may not use this file except in compliance
+ - with the License.  You may obtain a copy of the License at
+ - 
+ -   http://www.apache.org/licenses/LICENSE-2.0
+ - 
+ - Unless required by applicable law or agreed to in writing,
+ - software distributed under the License is distributed on an
+ - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ - KIND, either express or implied.  See the License for the
+ - specific language governing permissions and limitations
+ - under the License.
+ -
+ -->
+<virtualhosts>
+    <default>test</default>
+    <virtualhost>
+        <name>localhost</name>
+        <localhost>            
+            <exchanges>
+                <exchange>
+                    <type>direct</type>
+                    <name>test.direct</name>
+                    <durable>true</durable>
+                </exchange>
+                <exchange>
+                    <type>topic</type>
+                    <name>test.topic</name>
+                </exchange>
+            </exchanges>
+            <queues>
+                <exchange>amq.direct</exchange>
+                <maximumQueueDepth>4235264</maximumQueueDepth>  <!-- 4Mb -->
+                <maximumMessageSize>2117632</maximumMessageSize> <!-- 2Mb -->
+                <maximumMessageAge>600000</maximumMessageAge>  <!-- 10 mins -->
+
+                <queue>
+                    <name>queue</name>
+                </queue>
+                <queue>
+                    <name>ping</name>
+                </queue>
+                <queue>
+                    <name>test-queue</name>
+                    <test-queue>
+                        <exchange>test.direct</exchange>
+                        <durable>true</durable>
+                    </test-queue>
+                </queue>
+                <queue>
+                    <name>test-ping</name>
+                    <test-ping>
+                        <exchange>test.direct</exchange>
+                    </test-ping>
+                </queue>
+
+            </queues>
+        </localhost>
+    </virtualhost>
+
+
+    <virtualhost>
+        <name>development</name>
+        <development>
+            <queues>
+                <minimumAlertRepeatGap>30000</minimumAlertRepeatGap>
+                <maximumMessageCount>5000</maximumMessageCount>
+                <queue>
+                    <name>queue</name>
+                    <queue>
+                        <exchange>amq.direct</exchange>
+                        <maximumQueueDepth>4235264</maximumQueueDepth>  <!-- 4Mb -->
+                        <maximumMessageSize>2117632</maximumMessageSize> <!-- 2Mb -->
+                        <maximumMessageAge>600000</maximumMessageAge>  <!-- 10 mins -->
+                    </queue>
+                </queue>
+                <queue>
+                    <name>ping</name>
+                    <ping>
+                        <exchange>amq.direct</exchange>
+                        <maximumQueueDepth>4235264</maximumQueueDepth>  <!-- 4Mb -->
+                        <maximumMessageSize>2117632</maximumMessageSize> <!-- 2Mb -->
+                        <maximumMessageAge>600000</maximumMessageAge>  <!-- 10 mins -->
+                    </ping>
+                </queue>
+            </queues>
+        </development>
+    </virtualhost>
+        <virtualhost>
+            <name>test</name>
+            <test>                
+                <queues>
+                    <minimumAlertRepeatGap>30000</minimumAlertRepeatGap>
+                    <maximumMessageCount>5000</maximumMessageCount>
+                    <queue>
+                        <name>queue</name>
+                        <queue>
+                            <exchange>amq.direct</exchange>
+                            <maximumQueueDepth>4235264</maximumQueueDepth>  <!-- 4Mb -->
+                            <maximumMessageSize>2117632</maximumMessageSize> <!-- 2Mb -->
+                            <maximumMessageAge>600000</maximumMessageAge>  <!-- 10 mins -->
+                        </queue>
+                    </queue>
+                    <queue>
+                        <name>ping</name>
+                        <ping>
+                            <exchange>amq.direct</exchange>
+                            <maximumQueueDepth>4235264</maximumQueueDepth>  <!-- 4Mb -->
+                            <maximumMessageSize>2117632</maximumMessageSize> <!-- 2Mb -->
+                            <maximumMessageAge>600000</maximumMessageAge>  <!-- 10 mins -->
+                        </ping>
+                    </queue>
+                </queues>
+            </test>
+    </virtualhost>
+</virtualhosts>

Added: store/branches/java/broker-queue-refactor/java/bdbstore/lib/commons-logging-1.0.4.jar
===================================================================
(Binary files differ)


Property changes on: store/branches/java/broker-queue-refactor/java/bdbstore/lib/commons-logging-1.0.4.jar
___________________________________________________________________
Name: svn:mime-type
   + application/octet-stream

Modified: store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java	2009-02-19 19:02:07 UTC (rev 3124)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java	2009-02-20 13:34:27 UTC (rev 3125)
@@ -49,15 +49,16 @@
 import org.apache.qpid.server.queue.MessageMetaData;
 import org.apache.qpid.server.queue.PersistentAMQMessage;
 import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.routing.RoutingTable;
 import org.apache.qpid.server.store.StoreContext;
 import org.apache.qpid.server.store.berkeleydb.tuples.BindingTupleBindingFactory;
 import org.apache.qpid.server.store.berkeleydb.tuples.QueueTuple;
 import org.apache.qpid.server.store.berkeleydb.tuples.QueueTupleBindingFactory;
+import org.apache.qpid.server.transactionlog.TransactionLog;
 import org.apache.qpid.server.txn.NonTransactionalContext;
 import org.apache.qpid.server.txn.TransactionalContext;
 import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.transactionlog.TransactionLog;
-import org.apache.qpid.server.routing.RoutingTable;
+import org.apache.qpid.server.configuration.VirtualHostConfiguration;
 
 import java.io.File;
 import java.util.ArrayList;
@@ -130,8 +131,11 @@
 
     private Map<AMQShortString, Long> _queueNameToIdMap = new ConcurrentHashMap<AMQShortString, Long>();
 
+    protected Map<Long, List<AMQQueue>> _messageOnQueueMap = new ConcurrentHashMap<Long, List<AMQQueue>>();
+
+    private final Map<Transaction, Map<Long, List<AMQQueue>>> _dequeueTxMap = new HashMap<Transaction, Map<Long, List<AMQQueue>>>();
+
     // Factory Classes to create the TupleBinding objects that relfect the version instance of this BDBStore
-
     private QueueTupleBindingFactory _queueTupleBindingFactory;
     private BindingTupleBindingFactory _bindingTupleBindingFactory;
 
@@ -188,16 +192,15 @@
      * whatever parameters it wants.
      *
      * @param virtualHost The virtual host using by this store
-     * @param base        The base element identifier from which all configuration items are relative. For example, if
-     *                    the base element is "store", the all elements used by concrete classes will be "store.foo"
-     *                    etc.
-     * @param config      The apache commons configuration object.
+     * @param base        Not used
+     * @param vHostConfig The configuration for this virtualhost
      *
      * @throws Exception If any error occurs that means the store is unable to configure itself.
      */
-    public void configure(VirtualHost virtualHost, String base, Configuration config) throws Exception
+    public void configure(VirtualHost virtualHost, String base, VirtualHostConfiguration vHostConfig) throws Exception
     {
-        File environmentPath = new File(config.getString(base + "." + ENVIRONMENT_PATH_PROPERTY, "bdbEnv"));
+        Configuration config = vHostConfig.getStoreConfiguration();
+        File environmentPath = new File(config.getString(ENVIRONMENT_PATH_PROPERTY, "bdbEnv"));
         if (!environmentPath.exists())
         {
             if (!environmentPath.mkdirs())
@@ -207,7 +210,7 @@
             }
         }
 
-        _version = config.getInt(base + "." + DATABASE_FORMAT_VERSION_PROPERTY, DATABASE_FORMAT_VERSION);
+        _version = config.getInt(DATABASE_FORMAT_VERSION_PROPERTY, DATABASE_FORMAT_VERSION);
 
         configure(virtualHost, environmentPath, false);
     }
@@ -477,13 +480,14 @@
 
     /**
      * Removes the specified message from the store in the given transactional store context.
-     *
+     * Internal method that is package scoped to allow testing.
+     *  
      * @param context   The transactional context to remove the message in.
      * @param messageId Identifies the message to remove.
      *
      * @throws AMQException If the operation fails for any reason.
      */
-    public void removeMessage(StoreContext context, Long messageId) throws AMQException
+    void removeMessage(StoreContext context, Long messageId) throws AMQException
     {
         // _log.debug("public void removeMessage(StoreContext context = " + context + ", Long messageId = " + messageId
         // + "): called");
@@ -559,7 +563,7 @@
 
             if (localTx)
             {
-                commit(tx);
+                tx.commit();
                 context.setPayload(null);
             }
         }
@@ -943,6 +947,8 @@
         DatabaseEntry value = new DatabaseEntry();
         ByteBinding.byteToEntry((byte) 0, value);
 
+        recordEnqueue(messageId, queue);
+
         try
         {
             _deliveryDb.put(tx, key, value);
@@ -968,6 +974,7 @@
             try
             {
                 tx = _environment.beginTransaction(null, null);
+                _log.info("Creating local transaction:" + tx);
                 context.setPayload(tx);
 
                 return true;
@@ -1026,11 +1033,32 @@
 
             }
 
+            //Record the delete for processing AFTER the commit has taken place.
+            synchronized (_dequeueTxMap)
+            {
+                Map<Long, List<AMQQueue>> transactionMap = _dequeueTxMap.get(tx);
+                if (transactionMap == null)
+                {
+                    transactionMap = new HashMap<Long, List<AMQQueue>>();
+                    _dequeueTxMap.put(tx, transactionMap);
+                }
+
+                List<AMQQueue> queueList = transactionMap.get(messageId);
+
+                if (queueList == null)
+                {
+                    queueList = new LinkedList<AMQQueue>();
+                    transactionMap.put(messageId, queueList);
+                }
+
+                queueList.add(queue);
+            }
+
             if (isLocal)
             {
                 commit(tx);
                 context.setPayload(null);
-            }
+            }                                            
         }
         catch (DatabaseException e)
         {
@@ -1048,17 +1076,19 @@
                 {
                     throw new AMQException("Error rolling back transaction: " + e1, e1);
                 }
+                finally
+                {
+                    synchronized (_dequeueTxMap)
+                    {
+                        _dequeueTxMap.remove(tx);
+                    }
+                }
             }
 
             throw new AMQException("Error accessing database while dequeuing message: " + e, e);
         }
     }
 
-    private boolean isLocalTransaction(StoreContext context)
-    {
-        return context.getPayload() == null;
-    }
-
     /**
      * Begins a transactional context.
      *
@@ -1260,8 +1290,9 @@
         catch (Throwable ioobe)
         {
             abortTran(context);
+
             throw new AMQException("Invalid database format. Please use upgrade tool for store in Virtualhost:'"
-                                   + _virtualHost.getName() + "'", ioobe);
+                                   + _virtualHost.getName() + "'", ioobe.getCause() != null ? ioobe.getCause() : ioobe);
         }
 
         stateTransition(State.RECOVERING, State.STARTED);
@@ -1599,31 +1630,11 @@
         }
     }
 
-    private static final class ProcessAction
-    {
-        private final AMQQueue _queue;
-        private final StoreContext _context;
-        private final AMQMessage _message;
 
-        public ProcessAction(AMQQueue queue, StoreContext context, AMQMessage message)
-        {
-            _queue = queue;
-            _context = context;
-            _message = message;
-        }
-
-        public void process() throws AMQException
-        {
-            _queue.enqueue(_context, _message);
-        }
-
-    }
-
     private void deliverMessages(final StoreContext context, Map<AMQShortString, AMQQueue> queues)
             throws DatabaseException, AMQException
     {
         Map<Long, PersistentAMQMessage> msgMap = new HashMap<Long, PersistentAMQMessage>();
-        List<ProcessAction> actions = new ArrayList<ProcessAction>();
 
         Map<AMQShortString, Integer> queueRecoveries = new TreeMap<AMQShortString, Integer>();
 
@@ -1660,13 +1671,8 @@
                 maxId = Math.max(maxId, messageId);
                 PersistentAMQMessage message = msgMap.get(messageId);
 
-                if (message != null)
+                if (message == null)
                 {
-                    message.incrementReference();
-                }
-                else
-                {
-
                     message = (PersistentAMQMessage) _messageFactory.createMessage(messageId, this);
                     msgMap.put(messageId, message);
 
@@ -1682,7 +1688,6 @@
                         ContentChunk cc = getContentBodyChunk(context, messageId, index);
                         message.recoverContentBodyFrame(cc, index == count);
                     }
-
                 }
 
                 if (_log.isDebugEnabled())
@@ -1702,15 +1707,11 @@
 
                 }
 
-                actions.add(new ProcessAction(queue, context, message));
+                recordEnqueue(messageId, queue);
+                queue.enqueue(context, message);
 
             }
 
-            for (ProcessAction action : actions)
-            {
-                action.process();
-            }
-
             _messageId.set(maxId + 1);
         }
         catch (DatabaseException e)
@@ -1753,9 +1754,21 @@
     {
         // _log.debug("void commit(Transaction tx = " + tx + "): called");
 
+        if (tx == null)
+        {
+            throw new DatabaseException("Fatal internal error: transactional context is empty at commitTran");
+        }
+
         tx.commitNoSync();
 
-        Commit commit = new Commit(_commitThread, tx);
+        Map<Long, List<AMQQueue>> dequeueMap = null;
+        synchronized (_dequeueTxMap)
+        {
+            dequeueMap = _dequeueTxMap.remove(tx);
+        }
+
+        Commit commit = new Commit(_commitThread, tx, dequeueMap, this);
+
         commit.commit();
 
     }
@@ -1773,14 +1786,19 @@
         private final Transaction _tx;
         private DatabaseException _databaseException;
         private boolean _complete;
+        private Map<Long, List<AMQQueue>> _messageDequeueMap;
+        private TransactionLog _transactionLog;
 
-        public Commit(CommitThread commitThread, Transaction tx)
+        public Commit(CommitThread commitThread, Transaction tx, Map<Long,
+                List<AMQQueue>> messageDequeueMap, TransactionLog transactionLog)
         {
             // _log.debug("public Commit(CommitThread commitThread = " + commitThread + ", Transaction tx = " + tx
             // + "): called");
 
             _commitThread = commitThread;
             _tx = tx;
+            _messageDequeueMap = messageDequeueMap;
+            _transactionLog = transactionLog;
         }
 
         public void prepare(boolean synch) throws DatabaseException
@@ -1813,7 +1831,24 @@
 
             _complete = true;
 
+            // If we have dequeuedMessages so update our internal state
+            if (_messageDequeueMap != null)
+            {
+                _log.info("Transaction(" + _tx + ") Complete : Dequeuing messages used.");
+                StoreContext dequeueMessageContext = new StoreContext();
+
+                for (Map.Entry<Long, List<AMQQueue>> entry : _messageDequeueMap.entrySet())
+                {
+                    Long id = entry.getKey();
+                    for (AMQQueue queue : entry.getValue())
+                    {
+                        ((BDBMessageStore) _transactionLog).recordDequeue(dequeueMessageContext, id, queue);
+                    }
+                }
+            }
+
             notify();
+
         }
 
         public synchronized void abort(DatabaseException databaseException)
@@ -1858,6 +1893,72 @@
     }
 
     /**
+     * Record that the give message is enqueued on the specified queue.
+     *
+     * @param messageId The message id to enqueue
+     * @param queue     The queue it is enqueued upon.
+     */
+    private void recordEnqueue(Long messageId, AMQQueue queue)
+    {
+        List<AMQQueue> queues = _messageOnQueueMap.get(messageId);
+
+        if (queues == null)
+        {
+            queues = new LinkedList<AMQQueue>();
+        }
+
+        queues.add(queue);
+
+        _messageOnQueueMap.put(messageId, queues);
+    }
+
+    /**
+     * Update our records that the given message is nolonger on the specified queue.
+     * If the message no longer has any queue references then we can discard the content.
+     *
+     * @param context
+     * @param messageId
+     * @param queue
+     */
+    private void recordDequeue(StoreContext context, Long messageId, AMQQueue queue)
+    {
+        _log.info("Dequeue Message(" + messageId + ") from queue(" + queue.getName() + ") context=" + context);
+        List<AMQQueue> queues = _messageOnQueueMap.get(messageId);
+
+        if (queues == null)
+        {
+            throw new RuntimeException("Error, Tried to dequeue a message that is not enqueued");
+        }
+
+        if (queues.remove(queue))
+        {
+            // if we now have no more references to this message we can dispose of it
+            if (queues.size() == 0)
+            {
+                try
+                {
+                    _messageOnQueueMap.remove(messageId);
+                    _log.info("Removing Message(" + messageId + ") from Tlog context=" + context);
+
+                    removeMessage(context, messageId);
+                }
+                catch (AMQException e)
+                {
+                    //todo As we are jus swallowing exception need to add clean up in recover().
+                    // This should purge any message content that doesn't have any delivery records.
+                    _log.debug("Error occured removing unreferenced message:" + e.getMessage());
+                }
+
+            }
+        }
+        else
+        {
+            throw new RuntimeException("Error, Tried to dequeue a message from a queue, upon which it is not enqueued");
+        }
+
+    }
+
+    /**
      * Implements a thread which batches and commits a queue of {@link Commit} operations. The commit operations
      * themselves are responsible for adding themselves to the queue and waiting for the commit to happen before
      * continuing, but it is the responsibility of this thread to tell the commit operations when they have been

Modified: store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreTest.java
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreTest.java	2009-02-19 19:02:07 UTC (rev 3124)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreTest.java	2009-02-20 13:34:27 UTC (rev 3125)
@@ -20,33 +20,52 @@
 import com.sleepycat.je.DatabaseException;
 import junit.framework.Assert;
 import junit.framework.TestSuite;
+import org.apache.commons.configuration.PropertiesConfiguration;
 import org.apache.log4j.Logger;
 import org.apache.mina.common.ByteBuffer;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.common.AMQPFilterTypes;
-import org.apache.qpid.framing.*;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.MethodRegistry;
+import org.apache.qpid.framing.ProtocolVersion;
+import org.apache.qpid.framing.abstraction.ContentChunk;
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.framing.abstraction.ContentChunk;
 import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
+import org.apache.qpid.framing.amqp_8_0.BasicConsumeBodyImpl;
+import org.apache.qpid.server.AMQChannel;
 import org.apache.qpid.server.RequiredDeliveryException;
-import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.configuration.VirtualHostConfiguration;
 import org.apache.qpid.server.exchange.DefaultExchangeFactory;
 import org.apache.qpid.server.exchange.DirectExchange;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.store.StoreContext;
-import org.apache.qpid.server.store.berkeleydb.utils.BDBVMTestCase;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.flow.LimitlessCreditManager;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.AMQPriorityQueue;
 import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.AMQQueueFactory;
+import org.apache.qpid.server.queue.IncomingMessage;
 import org.apache.qpid.server.queue.MessageMetaData;
-import org.apache.qpid.server.queue.AMQQueueFactory;
-import org.apache.qpid.server.queue.AMQPriorityQueue;
 import org.apache.qpid.server.queue.MockContentChunk;
+import org.apache.qpid.server.queue.MockPersistentAMQMessage;
+import org.apache.qpid.server.queue.MockProtocolSession;
+import org.apache.qpid.server.queue.MockQueueEntry;
+import org.apache.qpid.server.queue.SimpleAMQQueue;
 import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.store.berkeleydb.utils.BDBVMTestCase;
+import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
 import org.apache.qpid.server.txn.NonTransactionalContext;
 import org.apache.qpid.server.txn.TransactionalContext;
 import org.apache.qpid.server.util.NullApplicationRegistry;
-import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.qpid.server.virtualhost.VirtualHost;
 
 import java.io.File;
+import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
 
@@ -54,7 +73,7 @@
 {
     private static final Logger _log = Logger.getLogger(BDBStoreTest.class);
 
-    private BDBMessageStore _store;
+    private BDBMessageStore _transactionLog;
     private String STORE_LOCATION = System.getProperty("BDB_WORK") + "/bdbTestEnv";
 
     private StoreContext _storeContext = new StoreContext();
@@ -85,13 +104,13 @@
         deleteDirectory(bdbDir);
         BDB_DIR.mkdirs();
 
-        _store = new BDBMessageStore();
-        _store.configure(BDB_DIR);
+        _transactionLog = new InspectableBDBMessageStore();
+        _transactionLog.configure(BDB_DIR);
 
-        _virtualHost = new VirtualHost("test", _store);
-        _store.setVirtualHost(_virtualHost);
+        _virtualHost = new VirtualHost(new VirtualHostConfiguration("test", new PropertiesConfiguration()), _transactionLog);
+        _transactionLog.setVirtualHost(_virtualHost);
 
-        _txnContext = new NonTransactionalContext(_store, _storeContext, null, new LinkedList<RequiredDeliveryException>());
+        _txnContext = new NonTransactionalContext(_transactionLog, _storeContext, null, new LinkedList<RequiredDeliveryException>());
     }
 
     private void reload() throws Exception
@@ -101,10 +120,10 @@
         PropertiesConfiguration env = new PropertiesConfiguration();
 
         env.addProperty("store.environment-path", STORE_LOCATION);
-        env.addProperty("store.class", "org.apache.qpid.server.store.berkeleydb.BDBMessageStore");
+        env.addProperty("store.class", "org.apache.qpid.server.store.berkeleydb.InspectableBDBMessageStore");
 
-        _virtualHost = new VirtualHost("test", env);
-        _store = (BDBMessageStore) _virtualHost.getTransactionLog();
+        _virtualHost = new VirtualHost(new VirtualHostConfiguration("test", env));
+        _transactionLog = (BDBMessageStore) _virtualHost.getTransactionLog();
     }
 
     public void tearDown() throws Exception
@@ -151,13 +170,13 @@
 
         AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(QUEUE1, true, ME, false, _virtualHost, queueArguments);
 
-        _store.createQueue(queue, queueArguments);
+        _transactionLog.createQueue(queue, queueArguments);
 
         AMQShortString routingKey = new AMQShortString("Test-Key");
         FieldTable bindArguments = new FieldTable();
         bindArguments.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), "Test = 'MST'");
 
-        _store.bindQueue(_virtualHost.getExchangeRegistry().getDefaultExchange(), routingKey, queue, bindArguments);
+        _transactionLog.bindQueue(_virtualHost.getExchangeRegistry().getDefaultExchange(), routingKey, queue, bindArguments);
 
         reload();
 
@@ -177,7 +196,7 @@
 
     private MessagePublishInfo createPublishBody()
     {
-        return new MessagePublishInfoImpl(MYEXCHANGE,false,true,RK);
+        return new MessagePublishInfoImpl(MYEXCHANGE, false, true, RK);
     }
 
     private BasicContentHeaderProperties createContentHeaderProperties()
@@ -230,10 +249,10 @@
 
         ContentHeaderBody chb = createContentHeaderBody(props, body.getSize());
 
-        _store.storeMessageMetaData(_storeContext, 14L, new MessageMetaData(pubBody, chb, 1));
-        _store.storeContentBodyChunk(_storeContext, 14L, 0, body, true);
+        _transactionLog.storeMessageMetaData(_storeContext, 14L, new MessageMetaData(pubBody, chb, 1));
+        _transactionLog.storeContentBodyChunk(_storeContext, 14L, 0, body, true);
 
-        MessageMetaData mmd = _store.getMessageMetaData(_storeContext, 14L);
+        MessageMetaData mmd = _transactionLog.getMessageMetaData(_storeContext, 14L);
         MessagePublishInfo returnedPubBody = mmd.getMessagePublishInfo();
         Assert.assertEquals("Message exchange has changed", pubBody.getExchange(), returnedPubBody.getExchange());
         Assert.assertEquals("Immediate flag has changed", pubBody.isImmediate(), returnedPubBody.isImmediate());
@@ -248,7 +267,7 @@
         Assert.assertEquals("Property ContentType has changed", props.getContentTypeAsString(), returnedProperties.getContentTypeAsString());
         Assert.assertEquals("Property MessageID has changed", props.getMessageIdAsString(), returnedProperties.getMessageIdAsString());
         Assert.assertEquals("MessageMD ChunkCount has changed", mmd.getContentChunkCount(), 1);
-        ContentChunk returnedContentBody = _store.getContentBodyChunk(_storeContext, 14L, 0);
+        ContentChunk returnedContentBody = _transactionLog.getContentBodyChunk(_storeContext, 14L, 0);
         ByteBuffer returnedPayloadAsBytes = returnedContentBody.getData();
         byte[] returnedPayload = new byte[returnedPayloadAsBytes.remaining()];
         returnedPayloadAsBytes.get(returnedPayload);
@@ -264,15 +283,15 @@
         ContentChunk body = createContentChunk(bodyText);
 
         ContentHeaderBody chb = createContentHeaderBody(props, body.getSize());
-        _store.storeMessageMetaData(_storeContext, 15L, new MessageMetaData(pubBody, chb, 1));
-        _store.storeContentBodyChunk(_storeContext, 15L, 0, body, true);
-        _store.getContentBodyChunk(_storeContext, 15L, 0);
-        _store.removeMessage(_storeContext, 15L);
+        _transactionLog.storeMessageMetaData(_storeContext, 15L, new MessageMetaData(pubBody, chb, 1));
+        _transactionLog.storeContentBodyChunk(_storeContext, 15L, 0, body, true);
+        _transactionLog.getContentBodyChunk(_storeContext, 15L, 0);
+        _transactionLog.removeMessage(_storeContext, 15L);
 
         // the next line should throw since the message id should not be found
         try
         {
-            _store.getMessageMetaData(_storeContext, 15L);
+            _transactionLog.getMessageMetaData(_storeContext, 15L);
             Assert.fail("No exception thrown when message id not found getting metadata");
         }
         catch (AMQException e)
@@ -282,7 +301,7 @@
 
         try
         {
-            _store.getContentBodyChunk(_storeContext, 15L, 0);
+            _transactionLog.getContentBodyChunk(_storeContext, 15L, 0);
             Assert.fail("No exception thrown when message id not found getting content chunk");
         }
         catch (AMQException e)
@@ -300,19 +319,19 @@
         ContentChunk body = createContentChunk(bodyText);
 
         ContentHeaderBody chb = createContentHeaderBody(props, body.getSize());
-        _store.storeMessageMetaData(_storeContext, 20L, new MessageMetaData(pubBody, chb, 0));
-        _store.storeMessageMetaData(_storeContext, 21L, new MessageMetaData(pubBody, chb, 0));
-        _store.storeMessageMetaData(_storeContext, 22L, new MessageMetaData(pubBody, chb, 0));
+        _transactionLog.storeMessageMetaData(_storeContext, 20L, new MessageMetaData(pubBody, chb, 0));
+        _transactionLog.storeMessageMetaData(_storeContext, 21L, new MessageMetaData(pubBody, chb, 0));
+        _transactionLog.storeMessageMetaData(_storeContext, 22L, new MessageMetaData(pubBody, chb, 0));
 
         AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(QUEUE1, true, ME, false, _virtualHost, null);
-        _store.createQueue(queue);
+        _transactionLog.createQueue(queue);
 
-        _store.beginTran(_storeContext);
-        _store.enqueueMessage(_storeContext, queue, 20L);
-        _store.enqueueMessage(_storeContext, queue, 21L);
-        _store.commitTran(_storeContext);
+        _transactionLog.beginTran(_storeContext);
+        _transactionLog.enqueueMessage(_storeContext, queue, 20L);
+        _transactionLog.enqueueMessage(_storeContext, queue, 21L);
+        _transactionLog.commitTran(_storeContext);
 
-        List<Long> enqueuedIds = _store.getEnqueuedMessages(QUEUE1);
+        List<Long> enqueuedIds = _transactionLog.getEnqueuedMessages(QUEUE1);
         Assert.assertEquals("Enqueued messages have changed", 2, enqueuedIds.size());
         Long val = enqueuedIds.get(0);
         Assert.assertEquals("First Message is incorrect", 20L, val.longValue());
@@ -323,7 +342,7 @@
 
     public void testTranRollback1() throws Exception
     {
-        List<Long> enqueuedIds = _store.getEnqueuedMessages(QUEUE1);
+        List<Long> enqueuedIds = _transactionLog.getEnqueuedMessages(QUEUE1);
         assertTrue("Last Test Messages are still present", enqueuedIds.isEmpty());
 
         MessagePublishInfo pubBody = createPublishBody();
@@ -333,26 +352,26 @@
 
         ContentHeaderBody chb = createContentHeaderBody(props, body.getSize());
 
-        _store.storeMessageMetaData(_storeContext, 30L, new MessageMetaData(pubBody, chb, 0));
-        _store.storeMessageMetaData(_storeContext, 31L, new MessageMetaData(pubBody, chb, 0));
-        _store.storeMessageMetaData(_storeContext, 32L, new MessageMetaData(pubBody, chb, 0));
+        _transactionLog.storeMessageMetaData(_storeContext, 30L, new MessageMetaData(pubBody, chb, 0));
+        _transactionLog.storeMessageMetaData(_storeContext, 31L, new MessageMetaData(pubBody, chb, 0));
+        _transactionLog.storeMessageMetaData(_storeContext, 32L, new MessageMetaData(pubBody, chb, 0));
 
         AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(QUEUE1, true, ME, false, _virtualHost, null);
-        _store.createQueue(queue);
+        _transactionLog.createQueue(queue);
 
-        _store.beginTran(_storeContext);
-        _store.enqueueMessage(_storeContext, queue, 30L);
-        _store.enqueueMessage(_storeContext, queue, 31L);
-        _store.commitTran(_storeContext);
+        _transactionLog.beginTran(_storeContext);
+        _transactionLog.enqueueMessage(_storeContext, queue, 30L);
+        _transactionLog.enqueueMessage(_storeContext, queue, 31L);
+        _transactionLog.commitTran(_storeContext);
 
-        _store.beginTran(_storeContext);
-        _store.enqueueMessage(_storeContext, queue, 32L);
-        _store.abortTran(_storeContext);
+        _transactionLog.beginTran(_storeContext);
+        _transactionLog.enqueueMessage(_storeContext, queue, 32L);
+        _transactionLog.abortTran(_storeContext);
 
-        _store.beginTran(_storeContext);
-        _store.commitTran(_storeContext);
+        _transactionLog.beginTran(_storeContext);
+        _transactionLog.commitTran(_storeContext);
 
-        enqueuedIds = _store.getEnqueuedMessages(QUEUE1);
+        enqueuedIds = _transactionLog.getEnqueuedMessages(QUEUE1);
         assertTrue("Last Test Message is still present", !enqueuedIds.contains(20L));
         assertEquals("Incorrect Enqueued Message Count:", 2, enqueuedIds.size());
         Long val = enqueuedIds.get(0);
@@ -364,7 +383,7 @@
 
     public void testTranRollback2() throws Exception
     {
-        List<Long> enqueuedIds = _store.getEnqueuedMessages(QUEUE1);
+        List<Long> enqueuedIds = _transactionLog.getEnqueuedMessages(QUEUE1);
         assertTrue("Last Test Messages are still present", enqueuedIds.isEmpty());
 
         MessagePublishInfo pubBody = createPublishBody();
@@ -374,23 +393,23 @@
 
         ContentHeaderBody chb = createContentHeaderBody(props, body.getSize());
 
-        _store.storeMessageMetaData(_storeContext, 30L, new MessageMetaData(pubBody, chb, 0));
-        _store.storeMessageMetaData(_storeContext, 31L, new MessageMetaData(pubBody, chb, 0));
-        _store.storeMessageMetaData(_storeContext, 32L, new MessageMetaData(pubBody, chb, 0));
+        _transactionLog.storeMessageMetaData(_storeContext, 30L, new MessageMetaData(pubBody, chb, 0));
+        _transactionLog.storeMessageMetaData(_storeContext, 31L, new MessageMetaData(pubBody, chb, 0));
+        _transactionLog.storeMessageMetaData(_storeContext, 32L, new MessageMetaData(pubBody, chb, 0));
 
         AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(QUEUE1, true, ME, false, _virtualHost, null);
-        _store.createQueue(queue);
+        _transactionLog.createQueue(queue);
 
-        _store.beginTran(_storeContext);
-        _store.enqueueMessage(_storeContext, queue, 30L);
-        _store.abortTran(_storeContext);
+        _transactionLog.beginTran(_storeContext);
+        _transactionLog.enqueueMessage(_storeContext, queue, 30L);
+        _transactionLog.abortTran(_storeContext);
 
-        _store.beginTran(_storeContext);
-        _store.enqueueMessage(_storeContext, queue, 31L);
-        _store.enqueueMessage(_storeContext, queue, 32L);
-        _store.commitTran(_storeContext);
+        _transactionLog.beginTran(_storeContext);
+        _transactionLog.enqueueMessage(_storeContext, queue, 31L);
+        _transactionLog.enqueueMessage(_storeContext, queue, 32L);
+        _transactionLog.commitTran(_storeContext);
 
-        enqueuedIds = _store.getEnqueuedMessages(QUEUE1);
+        enqueuedIds = _transactionLog.getEnqueuedMessages(QUEUE1);
         Assert.assertEquals("Incorrect Enqueued Message Count", 2, enqueuedIds.size());
         Long val = enqueuedIds.get(0);
         Assert.assertEquals("First Message is incorrect", 31L, val.longValue());
@@ -400,7 +419,7 @@
 
     public void testRecovery() throws Exception
     {
-        List<Long> enqueuedIds = _store.getEnqueuedMessages(QUEUE1);
+        List<Long> enqueuedIds = _transactionLog.getEnqueuedMessages(QUEUE1);
         assertTrue("Last Test Messages are still present", enqueuedIds.isEmpty());
 
         MessagePublishInfo pubBody = createPublishBody();
@@ -410,25 +429,25 @@
 
         ContentHeaderBody chb = createContentHeaderBody(props, body.getSize());
 
-        _store.storeMessageMetaData(_storeContext, 40L, new MessageMetaData(pubBody, chb, 0));
-        _store.storeMessageMetaData(_storeContext, 41L, new MessageMetaData(pubBody, chb, 0));
-        _store.storeMessageMetaData(_storeContext, 42L, new MessageMetaData(pubBody, chb, 1));
+        _transactionLog.storeMessageMetaData(_storeContext, 40L, new MessageMetaData(pubBody, chb, 0));
+        _transactionLog.storeMessageMetaData(_storeContext, 41L, new MessageMetaData(pubBody, chb, 0));
+        _transactionLog.storeMessageMetaData(_storeContext, 42L, new MessageMetaData(pubBody, chb, 1));
 
-        _store.storeContentBodyChunk(_storeContext, 42L, 0, new MockContentChunk(), true);
+        _transactionLog.storeContentBodyChunk(_storeContext, 42L, 0, new MockContentChunk(), true);
 
         AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(QUEUE1, true, ME, false, _virtualHost, null);
         AMQQueue queue2 = AMQQueueFactory.createAMQQueueImpl(QUEUE2, true, HIM, false, _virtualHost, null);
 
-        _store.createQueue(queue);
-        _store.createQueue(queue2);
+        _transactionLog.createQueue(queue);
+        _transactionLog.createQueue(queue2);
 
-        _store.beginTran(_storeContext);
-        _store.enqueueMessage(_storeContext, queue, 40L);
-        _store.enqueueMessage(_storeContext, queue, 41L);
-        _store.enqueueMessage(_storeContext, queue2, 42L);
-        _store.commitTran(_storeContext);
+        _transactionLog.beginTran(_storeContext);
+        _transactionLog.enqueueMessage(_storeContext, queue, 40L);
+        _transactionLog.enqueueMessage(_storeContext, queue, 41L);
+        _transactionLog.enqueueMessage(_storeContext, queue2, 42L);
+        _transactionLog.commitTran(_storeContext);
 
-        _store.enqueueMessage(_storeContext, queue, 42L);
+        _transactionLog.enqueueMessage(_storeContext, queue, 42L);
 
         reload();
 
@@ -459,23 +478,23 @@
 
         ContentHeaderBody chb = createContentHeaderBody(props, body.getSize());
 
-        _store.storeMessageMetaData(_storeContext, 50L, new MessageMetaData(pubBody, chb, 0));
+        _transactionLog.storeMessageMetaData(_storeContext, 50L, new MessageMetaData(pubBody, chb, 0));
 
         AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(QUEUE1, true, ME, false, _virtualHost, null);
-        _store.createQueue(queue);
+        _transactionLog.createQueue(queue);
 
-        _store.enqueueMessage(_storeContext, queue, 50L);
-        _store.dequeueMessage(_storeContext, queue, 50L);
+        _transactionLog.enqueueMessage(_storeContext, queue, 50L);
+        _transactionLog.dequeueMessage(_storeContext, queue, 50L);
     }
 
     public void testQueueRemove() throws AMQException
     {
         AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(QUEUE1, true, ME, false, _virtualHost, null);
-        _store.createQueue(queue);
-        _store.removeQueue(queue);
+        _transactionLog.createQueue(queue);
+        _transactionLog.removeQueue(queue);
         try
         {
-            _store.removeQueue(queue);
+            _transactionLog.removeQueue(queue);
             Assert.fail("No exception thrown when deleting non-existant queue");
         }
         catch (AMQException e)

Modified: store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java	2009-02-19 19:02:07 UTC (rev 3124)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java	2009-02-20 13:34:27 UTC (rev 3125)
@@ -24,9 +24,13 @@
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.store.berkeleydb.utils.Publisher;
 import org.apache.qpid.server.store.berkeleydb.utils.DurableSubscriber;
+import org.apache.qpid.server.queue.MessageFactory;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.configuration.VirtualHostConfiguration;
 import org.apache.qpid.client.transport.TransportConnection;
 import org.apache.qpid.util.FileUtils;
 import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.PropertiesConfiguration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -42,9 +46,9 @@
 {
     protected static final Logger _logger = LoggerFactory.getLogger(BDBUpgradeTest.class);
 
-    final String BDBHome = System.getProperty("BDB_HOME");
-    final File _configFile = new File(BDBHome, "etc/config.xml");
-
+    protected final String QpidHome = System.getProperty("QPID_HOME");
+    protected final File _configFile = new File(QpidHome, "etc/persistent_config.xml");
+    
     private String VIRTUALHOST = "test";
 
     private static final String VERSION_1 = "1";
@@ -113,57 +117,69 @@
         }
     }
 
+    public void testDurabability() throws Exception
+    {
+        String broker = "vm://:1";
+
+        startBroker(1, VERSION_1);
+
+        sendAndCheckDurableSubscriber(broker, true, false, 10, null);
+
+        stopBroker(1);
+
+        startBroker(1, VERSION_1);
+
+        sendAndCheckDurableSubscriber(broker, false, true, 10, null);
+
+        stopBroker(1);
+    }
+
+    /**
+     * This test is currently broken due to QPID-1275
+     * @throws Exception
+     */
     public void testDurababilitySelectors() throws Exception
     {
         String broker = "vm://:1";
 
+        System.err.println("Start V1 Broker");
         startBroker(1, VERSION_1);
 
         new DurableSubscriber(broker, _topic, null).close();
 
+        System.err.println("Send 10 message to topic:"+_topic);
         sendMessages(broker, _topic, 10);
 
+        //don't actually send any messages just check we get 5 odd messages
         sendAndCheckDurableSubscriber(broker, false, true, 5, "odd=true");
 
         Thread.sleep(2000);
 
         stopBroker(1);
 
+        System.err.println("Upgrade Broker");
         upgradeBroker();
 
-        broker = "vm://:2";
+        System.err.println("Start V2 Broker");
+        startBroker(1, VERSION_2);
 
-        startBroker(2, VERSION_2);
-
+        System.err.println("Ensure we can receive all the messagse messages but don't ack them");
         //Ensure msg were transitioned to new broker
+        //don't actually send any messages just check we get 5 odd messages        
         sendAndCheckDurableSubscriber(broker, false, false, 5, null);
 
-        //Reset the Selector Pattern
+        System.err.println("Change selector so on recovery the messages are dropped");
+        //Reset the Selector Pattern this should recreate the queue 
         new DurableSubscriber(broker, _topic, "odd=true").close();
 
-        stopBroker(2);
-
-        startBroker(2, VERSION_2);
-
-        //Ensure that the selector was preseved on restart and caused all msgs to be removed.
-        sendAndCheckDurableSubscriber(broker, false, false, 0, null);
-        stopBroker(2);
-    }
-
-    public void testDurabability() throws Exception
-    {
-        String broker = "vm://:1";
-
-        startBroker(1, VERSION_1);
-
-        sendAndCheckDurableSubscriber(broker, true, false, 10, null);
-
         stopBroker(1);
 
-        startBroker(1, VERSION_1);
+        System.err.println("Start V2 Broker");
 
-        sendAndCheckDurableSubscriber(broker, false, true, 10, null);
+        startBroker(1, VERSION_2);
 
+        //Ensure that the selector was preseved on restart and caused all msgs to be removed.
+        sendAndCheckDurableSubscriber(broker, false, true, 0, null);
         stopBroker(1);
     }
 
@@ -184,16 +200,24 @@
         ConfigurationFileApplicationRegistry config = new ConfigurationFileApplicationRegistry(_configFile);
 
         //Disable management on broker.
-        config.getConfiguration().setProperty("management.enabled", "false");
+        config.getConfiguration().setManagementEnabled(false);
+                
+        //There can be only one MF so we need to reset it here or recovery will not work
+        MessageFactory.getInstance().reset();
+        //todo need to change MF so it is per AppRegistry not a singleton
 
-        Configuration testVirtualhost = config.getConfiguration().subset("virtualhosts.virtualhost." + VIRTUALHOST);
+        ApplicationRegistry.initialise(config, port);
+
+        Configuration testVirtualhost = new PropertiesConfiguration();
         testVirtualhost.setProperty("store.class", "org.apache.qpid.server.store.berkeleydb.BDBMessageStore");
-        testVirtualhost.setProperty("store." + BDBMessageStore.ENVIRONMENT_PATH_PROPERTY, "${work}/version" + port + "Store");
+        testVirtualhost.setProperty("store." + BDBMessageStore.ENVIRONMENT_PATH_PROPERTY,
+                                    System.getProperty("QPID_WORK")+ "/version" + version + "Store");
         testVirtualhost.setProperty("store.version", version);
 
-        ApplicationRegistry.initialise(config, port);
+        ApplicationRegistry.getInstance(port).getVirtualHostRegistry().
+                registerVirtualHost(new VirtualHost(new VirtualHostConfiguration("bdbtest",testVirtualhost)));
+
         TransportConnection.createVMBroker(port);
-
     }
 
     public void sendAndCheckDurableSubscriber(String broker, boolean send, boolean commitRecieved, int count, String selector)

Added: store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/InspectableBDBMessageStore.java
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/InspectableBDBMessageStore.java	                        (rev 0)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/InspectableBDBMessageStore.java	2009-02-20 13:34:27 UTC (rev 3125)
@@ -0,0 +1,34 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.store.TestTransactionLog;
+
+import java.util.List;
+
+public class InspectableBDBMessageStore extends BDBMessageStore implements TestTransactionLog
+{
+    public List<AMQQueue> getMessageReferenceMap(Long messageId)
+    {
+        return _messageOnQueueMap.get(messageId);
+    }
+}

Added: store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessagePersistenceTest.java
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessagePersistenceTest.java	                        (rev 0)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessagePersistenceTest.java	2009-02-20 13:34:27 UTC (rev 3125)
@@ -0,0 +1,327 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
+import org.apache.qpid.framing.amqp_8_0.BasicConsumeBodyImpl;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.configuration.VirtualHostConfiguration;
+import org.apache.qpid.server.flow.LimitlessCreditManager;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.AMQQueueFactory;
+import org.apache.qpid.server.queue.IncomingMessage;
+import org.apache.qpid.server.queue.MockProtocolSession;
+import org.apache.qpid.server.queue.SimpleAMQQueue;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.store.berkeleydb.utils.BDBVMTestCase;
+import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
+import org.apache.qpid.server.txn.NonTransactionalContext;
+import org.apache.qpid.server.util.NullApplicationRegistry;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.commons.configuration.PropertiesConfiguration;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+
+public class MessagePersistenceTest extends BDBVMTestCase
+{
+
+    private static final Logger _log = Logger.getLogger(MessagePersistenceTest.class);
+
+    protected InspectableBDBMessageStore _transactionLog;
+    private String STORE_LOCATION = System.getProperty("BDB_WORK") + "/bdbTestEnv";
+    protected VirtualHost _virtualHost;
+
+    protected SimpleAMQQueue _queue1, _queue2;
+    protected AMQShortString _q1name = new AMQShortString("q1name");
+    protected AMQShortString _q2name = new AMQShortString("q2name");
+    protected AMQShortString _owner = new AMQShortString("owner");
+    protected AMQShortString _routingKey = new AMQShortString("routing key");
+    protected NonTransactionalContext _messageDeliveryContext;
+
+    protected static final long MESSAGE_SIZE = 0L;
+
+    File BDB_DIR = new File(STORE_LOCATION);
+
+    protected boolean _transactional;
+    protected boolean _ack;
+
+    public void setUp() throws Exception
+    {
+        if (BDB_DIR.exists())
+        {
+            deleteDirectory(BDB_DIR);
+        }
+
+        ApplicationRegistry.initialise(new NullApplicationRegistry());
+        ApplicationRegistry applicationRegistry = (ApplicationRegistry) ApplicationRegistry.getInstance(1);
+
+        File bdbDir = new File(STORE_LOCATION);
+        deleteDirectory(bdbDir);
+        BDB_DIR.mkdirs();
+
+        _transactionLog = new InspectableBDBMessageStore();
+        _transactionLog.configure(BDB_DIR);
+
+        _virtualHost = new VirtualHost(new VirtualHostConfiguration("bdbtest",new PropertiesConfiguration()), _transactionLog);
+        _transactionLog.setVirtualHost(_virtualHost);
+
+        applicationRegistry.getVirtualHostRegistry().registerVirtualHost(_virtualHost);
+
+        _queue1 = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(_q1name, false, _owner, false, _virtualHost, null);
+        _queue2 = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(_q2name, false, _owner, false, _virtualHost, null);
+        // Create IncomingMessage and nondurable queue
+        _messageDeliveryContext = new NonTransactionalContext(_transactionLog, new StoreContext(), null, null);
+
+        _transactional = false;
+        _ack = true;
+    }
+
+    protected IncomingMessage createMessage(MessagePublishInfo info) throws AMQException
+    {
+        IncomingMessage msg = new IncomingMessage(info, _messageDeliveryContext,
+                                                  new MockProtocolSession(_transactionLog), _transactionLog);
+
+        // equivalent to amqChannel.publishContenHeader
+        ContentHeaderBody contentHeaderBody = new ContentHeaderBody();
+        contentHeaderBody.classId = BasicConsumeBodyImpl.CLASS_ID;
+        // This message has no bodies
+        contentHeaderBody.bodySize = MESSAGE_SIZE;
+        contentHeaderBody.properties = new BasicContentHeaderProperties();
+        ((BasicContentHeaderProperties) contentHeaderBody.properties).setDeliveryMode((byte) 2);
+
+        msg.setContentHeaderBody(contentHeaderBody);
+        msg.setExpiration();
+
+        return msg;
+    }
+
+
+    /**
+     * Tests delivery of a single persistent message to a queue. Validate through reference counting that the
+     * Message is removed from the TransactionLog when reference count has been used have been removed.
+     *
+     * @throws Exception
+     */
+    public void testSingleConsume() throws Exception
+    {
+        MessagePublishInfo info = new MessagePublishInfoImpl();
+        IncomingMessage msg = createMessage(info);
+
+        // Send persistent message
+        ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>();
+        qs.add(_queue1);
+
+        // equivalent to amqChannel.routeMessage()
+        msg.enqueue(qs);
+
+        msg.routingComplete(_transactionLog);
+
+        // equivalent to amqChannel.deliverCurrentMessageIfComplete
+        msg.deliverToQueues();
+
+        // Check that data has been stored to disk
+        long messageId = msg.getMessageId();
+        checkMessageMetaDataExists(messageId);
+
+        // Check that it is enqueued
+        List<AMQQueue> queueList = _transactionLog.getMessageReferenceMap(messageId);
+        assertNotNull(queueList);
+        assertEquals("Message should be enqueued on both queues.", 1, queueList.size());
+        assertTrue("Queue1 not contained in list.", queueList.contains(_queue1));
+
+        // Create consumer to correctly consume message
+        AMQProtocolSession session1 = new MockProtocolSession(_transactionLog);
+        AMQChannel channel1 = new AMQChannel(session1, 1, _transactionLog);
+        session1.addChannel(channel1);
+
+        if (_transactional && _ack)
+        {
+            channel1.setLocalTransactional();
+        }
+
+        Subscription sub1 = SubscriptionFactoryImpl.INSTANCE.createSubscription(1, session1,
+                                                                                new AMQShortString("cTag1"), _ack,
+                                                                                null, false,
+                                                                                new LimitlessCreditManager());
+
+        _queue1.registerSubscription(sub1, true);
+
+        // Give the delivery thread time to deliver the message
+        Thread.sleep(200);
+        Thread.yield();
+
+        if (_ack)
+        {
+            // Acknowledge the message
+            channel1.acknowledgeMessage(1, false);
+
+            if (_transactional)
+            {
+                channel1.commit();
+            }
+        }
+        // Check that it is now dequeued
+        queueList = _transactionLog.getMessageReferenceMap(messageId);
+        assertNull("Queue List was not empty:" + (queueList != null ? queueList.size() : ""), queueList);
+
+        checkMessageMetaDataRemoved(messageId);
+    }
+
+    /**
+     * Tests delivery of a single persistent message to two queues so validate through reference counting that the
+     * Message is removed from the TransactionLog when all references have been removed.
+     *
+     * @throws Exception
+     */
+    public void testTwoQueues() throws Exception
+    {
+        MessagePublishInfo info = new MessagePublishInfoImpl();
+        IncomingMessage msg = createMessage(info);
+
+        // Send persistent message
+        ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>();
+        qs.add(_queue1);
+        qs.add(_queue2);
+
+        // equivalent to amqChannel.routeMessage()
+        msg.enqueue(qs);
+
+        msg.routingComplete(_transactionLog);
+
+        // equivalent to amqChannel.deliverCurrentMessageIfComplete
+        msg.deliverToQueues();
+
+        // Check that data has been stored to disk
+        long messageId = msg.getMessageId();
+        checkMessageMetaDataExists(messageId);
+
+        // Check that it is enqueued
+        List<AMQQueue> queueList = _transactionLog.getMessageReferenceMap(messageId);
+        assertNotNull(queueList);
+        assertEquals("Message should be enqueued on both queues.", 2, queueList.size());
+        assertTrue("Queue1 not contained in list.", queueList.contains(_queue1));
+        assertTrue("Queue2 not contained in list.", queueList.contains(_queue2));
+
+        // Create consumer to correctly consume message
+        AMQProtocolSession session1 = new MockProtocolSession(_transactionLog);
+        AMQChannel channel1 = new AMQChannel(session1, 1, _transactionLog);
+        session1.addChannel(channel1);
+
+        if (_transactional & _ack)
+        {
+            channel1.setLocalTransactional();
+        }
+
+        Subscription sub1 = SubscriptionFactoryImpl.INSTANCE.createSubscription(1, session1,
+                                                                                new AMQShortString("cTag1"), _ack,
+                                                                                null, false,
+                                                                                new LimitlessCreditManager());
+        _queue1.registerSubscription(sub1, true);
+
+        // Give the delivery thread time to deliver the message
+        Thread.sleep(200);
+        Thread.yield();
+
+        if (_ack)
+        {
+            // Acknowledge the message
+            channel1.acknowledgeMessage(1, false);
+
+            if (_transactional)
+            {
+                channel1.commit();
+            }
+        }
+        // Check that data still exists
+        checkMessageMetaDataExists(messageId);
+
+        // Check that message was consumed
+        queueList = _transactionLog.getMessageReferenceMap(messageId);
+        assertNotNull(queueList);
+        assertEquals("Message Should only be enqueued on one queue.", 1, queueList.size());
+        assertTrue("Queue2 not contained in list.", queueList.contains(_queue2));
+
+        Subscription sub2 = SubscriptionFactoryImpl.INSTANCE.createSubscription(1, session1,
+                                                                                new AMQShortString("cTag2"), _ack,
+                                                                                null, false,
+                                                                                new LimitlessCreditManager());
+
+        //Consume from second queue
+        _queue2.registerSubscription(sub2, true);
+
+        // Give the delivery thread time to deliver the message
+        Thread.sleep(200);
+        Thread.yield();
+
+        if (_ack)
+        {
+            // Acknowledge the message
+            channel1.acknowledgeMessage(2, false);
+
+            if (_transactional)
+            {
+                channel1.commit();
+            }
+        }
+        // Check that it is now dequeued
+        queueList = _transactionLog.getMessageReferenceMap(messageId);
+        assertNull("Queue List was not empty:" + (queueList != null ? queueList.size() : ""), queueList);
+
+        checkMessageMetaDataRemoved(messageId);
+    }
+
+    protected void checkMessageMetaDataExists(long messageId)
+    {
+        try
+        {
+            _transactionLog.getMessageMetaData(_messageDeliveryContext.getStoreContext(), messageId);
+        }
+        catch (AMQException amqe)
+        {
+            fail("Message MetaData does not exist for message:" + messageId);
+        }
+    }
+
+    protected void checkMessageMetaDataRemoved(long messageId)
+    {
+        try
+        {
+            _transactionLog.getMessageMetaData(_messageDeliveryContext.getStoreContext(), messageId);
+            fail("Message MetaData still exists for message:" + messageId);
+        }
+        catch (AMQException amqe)
+        {
+            assertEquals("Metadata not found for message with id " + messageId, amqe.getMessage());
+        }
+    }
+
+}

Added: store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessagePersistenceTransactionalTest.java
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessagePersistenceTransactionalTest.java	                        (rev 0)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessagePersistenceTransactionalTest.java	2009-02-20 13:34:27 UTC (rev 3125)
@@ -0,0 +1,239 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.flow.LimitlessCreditManager;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.IncomingMessage;
+import org.apache.qpid.server.queue.MockProtocolSession;
+import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class MessagePersistenceTransactionalTest extends MessagePersistenceTest
+{
+    public void setUp() throws Exception
+    {
+        super.setUp();
+        _transactional = true;
+    }
+
+    /**
+     * Tests delivery of a single persistent message to two queues so validate through reference counting that the
+     * Message is removed from the TransactionLog when all references have been removed.
+     *
+     * @throws Exception
+     */
+    public void testAckTwoQueuesInOneTx() throws Exception
+    {
+        MessagePublishInfo info = new MessagePublishInfoImpl();
+        IncomingMessage msg = createMessage(info);
+
+        // Send persistent message
+        ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>();
+        qs.add(_queue1);
+        qs.add(_queue2);
+
+        // equivalent to amqChannel.routeMessage()
+        msg.enqueue(qs);
+
+        msg.routingComplete(_transactionLog);
+
+        // equivalent to amqChannel.deliverCurrentMessageIfComplete
+        msg.deliverToQueues();
+
+        // Check that data has been stored to disk
+        long messageId = msg.getMessageId();
+        checkMessageMetaDataExists(messageId);
+
+        // Check that it is enqueued
+        List<AMQQueue> queueList = _transactionLog.getMessageReferenceMap(messageId);
+        assertNotNull(queueList);
+        assertEquals("Message should be enqueued on both queues.", 2, queueList.size());
+        assertTrue("Queue1 not contained in list.", queueList.contains(_queue1));
+        assertTrue("Queue2 not contained in list.", queueList.contains(_queue2));
+
+        // Create consumer to correctly consume message
+        AMQProtocolSession session1 = new MockProtocolSession(_transactionLog);
+        AMQChannel channel1 = new AMQChannel(session1, 1, _transactionLog);
+        session1.addChannel(channel1);
+
+        channel1.setLocalTransactional();
+
+        Subscription sub1 = SubscriptionFactoryImpl.INSTANCE.createSubscription(1, session1,
+                                                                                new AMQShortString("cTag1"), true,
+                                                                                null, false,
+                                                                                new LimitlessCreditManager());
+        _queue1.registerSubscription(sub1, true);
+
+        // Give the delivery thread time to deliver the message
+        Thread.sleep(250);
+        Thread.yield();
+
+        // Acknowledge the message
+        channel1.acknowledgeMessage(1, false);
+
+        // Check that data still exists
+        checkMessageMetaDataExists(messageId);
+
+        // Check that it is enqueued
+        queueList = _transactionLog.getMessageReferenceMap(messageId);
+        assertNotNull(queueList);
+        assertEquals("Message should be enqueued on both queues.", 2, queueList.size());
+        assertTrue("Queue1 not contained in list.", queueList.contains(_queue1));
+        assertTrue("Queue2 not contained in list.", queueList.contains(_queue2));
+
+        Subscription sub2 = SubscriptionFactoryImpl.INSTANCE.createSubscription(1, session1,
+                                                                                new AMQShortString("cTag2"), true,
+                                                                                null, false,
+                                                                                new LimitlessCreditManager());
+
+        //Consume from second queue
+        _queue2.registerSubscription(sub2, true);
+
+        // Give the delivery thread time to deliver the message
+        Thread.sleep(250);
+        Thread.yield();
+
+        // Acknowledge the message
+        channel1.acknowledgeMessage(2, false);
+
+        // Check that it is enqueued
+        queueList = _transactionLog.getMessageReferenceMap(messageId);
+        assertNotNull(queueList);
+        assertEquals("Message should be enqueued on both queues.", 2, queueList.size());
+        assertTrue("Queue1 not contained in list.", queueList.contains(_queue1));
+        assertTrue("Queue2 not contained in list.", queueList.contains(_queue2));
+
+        if (_transactional)
+        {
+            channel1.commit();
+        }
+
+        // Check that it is now dequeued
+        queueList = _transactionLog.getMessageReferenceMap(messageId);
+        assertNull("Queue List was not empty:" + (queueList != null ? queueList.size() : ""), queueList);
+
+        checkMessageMetaDataRemoved(messageId);
+    }
+
+    public void testEnqueueOnTwoQueuesAckWithRollback() throws Exception
+    {
+        MessagePublishInfo info = new MessagePublishInfoImpl();
+        IncomingMessage msg = createMessage(info);
+
+        // Send persistent message
+        ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>();
+        qs.add(_queue1);
+        qs.add(_queue2);
+
+        // equivalent to amqChannel.routeMessage()
+        msg.enqueue(qs);
+
+        msg.routingComplete(_transactionLog);
+
+        // equivalent to amqChannel.deliverCurrentMessageIfComplete
+        msg.deliverToQueues();
+
+        // Check that data has been stored to disk
+        long messageId = msg.getMessageId();
+        checkMessageMetaDataExists(messageId);
+
+        // Check that it is enqueued
+        List<AMQQueue> queueList = _transactionLog.getMessageReferenceMap(messageId);
+        assertNotNull(queueList);
+        assertEquals("Message should be enqueued on both queues.", 2, queueList.size());
+        assertTrue("Queue1 not contained in list.", queueList.contains(_queue1));
+        assertTrue("Queue2 not contained in list.", queueList.contains(_queue2));
+
+        // Create consumer to correctly consume message
+        AMQProtocolSession session1 = new MockProtocolSession(_transactionLog);
+        AMQChannel channel1 = new AMQChannel(session1, 1, _transactionLog);
+        session1.addChannel(channel1);
+
+        channel1.setLocalTransactional();
+
+        Subscription sub1 = SubscriptionFactoryImpl.INSTANCE.createSubscription(1, session1,
+                                                                                new AMQShortString("cTag1"), true,
+                                                                                null, false,
+                                                                                new LimitlessCreditManager());
+        _queue1.registerSubscription(sub1, true);
+
+        // Give the delivery thread time to deliver the message
+        Thread.sleep(250);
+        Thread.yield();
+
+        // Acknowledge the message
+        channel1.acknowledgeMessage(1, false);
+
+        // Check that data still exists
+        checkMessageMetaDataExists(messageId);
+
+        // Check that it is enqueued
+        queueList = _transactionLog.getMessageReferenceMap(messageId);
+        assertNotNull(queueList);
+        assertEquals("Message should be enqueued on both queues.", 2, queueList.size());
+        assertTrue("Queue1 not contained in list.", queueList.contains(_queue1));
+        assertTrue("Queue2 not contained in list.", queueList.contains(_queue2));
+
+        AMQProtocolSession session2 = new MockProtocolSession(_transactionLog);
+        AMQChannel channel2 = new AMQChannel(session2, 1, _transactionLog);
+        session2.addChannel(channel2);
+
+        Subscription sub2 = SubscriptionFactoryImpl.INSTANCE.createSubscription(1, session2,
+                                                                                new AMQShortString("cTag2"), true,
+                                                                                null, false,
+                                                                                new LimitlessCreditManager());
+
+        //Consume from second queue
+        _queue2.registerSubscription(sub2, true);
+
+        // Give the delivery thread time to deliver the message
+        Thread.sleep(250);
+        Thread.yield();
+
+        // Acknowledge the message
+        channel2.acknowledgeMessage(1, false);
+
+        // Check that it is still enqueued for queue1
+        queueList = _transactionLog.getMessageReferenceMap(messageId);
+        assertNotNull(queueList);
+        assertEquals("Message should be enqueued on both queues.", 1, queueList.size());
+        assertTrue("Queue1 not contained in list.", queueList.contains(_queue1));
+
+        channel1.rollback();
+
+        queueList = _transactionLog.getMessageReferenceMap(messageId);
+        assertNotNull(queueList);
+        assertEquals("Message should be enqueued on both queues.", 1, queueList.size());
+        assertTrue("Queue1 not contained in list.", queueList.contains(_queue1));
+
+        checkMessageMetaDataExists(messageId);
+    }
+
+}

Added: store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessagePersistentNoAckTest.java
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessagePersistentNoAckTest.java	                        (rev 0)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessagePersistentNoAckTest.java	2009-02-20 13:34:27 UTC (rev 3125)
@@ -0,0 +1,30 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+public class MessagePersistentNoAckTest extends MessagePersistenceTest
+{
+    public void setUp() throws Exception
+    {
+        super.setUp();
+        _ack = false;
+    }
+}

Modified: store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessageReSendTest.java
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessageReSendTest.java	2009-02-19 19:02:07 UTC (rev 3124)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessageReSendTest.java	2009-02-20 13:34:27 UTC (rev 3125)
@@ -30,6 +30,9 @@
 
 import org.apache.qpid.server.store.berkeleydb.utils.BDBVMTestCase;
 
+/**
+ * Note this test uses QpidTestCase and running 'ant test' does not setup the required properties for this.
+ */
 public class MessageReSendTest extends BDBVMTestCase
 {
     protected static final String MESSAGE_ID_PROPERTY = "MessageIDProperty";

Modified: store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/QueueDeleteWhilstRoutingTest.java
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/QueueDeleteWhilstRoutingTest.java	2009-02-19 19:02:07 UTC (rev 3124)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/QueueDeleteWhilstRoutingTest.java	2009-02-20 13:34:27 UTC (rev 3125)
@@ -37,6 +37,10 @@
 import javax.naming.NamingException;
 import java.io.File;
 
+/**
+ * Note this test uses QpidTestCase and running 'ant test' does not setup the required properties for this.
+ */
+
 public class QueueDeleteWhilstRoutingTest extends BDBVMTestCase
 {
     private static final Logger _logger = Logger.getLogger(QueueDeleteWhilstRoutingTest.class);

Modified: store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/utils/JNDIHelper.java
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/utils/JNDIHelper.java	2009-02-19 19:02:07 UTC (rev 3124)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/utils/JNDIHelper.java	2009-02-20 13:34:27 UTC (rev 3125)
@@ -37,7 +37,7 @@
 
     public JNDIHelper(String broker) 
     {
-        CONNECTION_NAME = "amqp://guest:guest@clientid/test?brokerlist='" + broker + "'";
+        CONNECTION_NAME = "amqp://guest:guest@clientid/bdbtest?brokerlist='" + broker + "'";
         setupJNDI();
     }
 

Modified: store/branches/java/broker-queue-refactor/java/bdbstore/src/tools/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgrade.java
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/src/tools/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgrade.java	2009-02-19 19:02:07 UTC (rev 3124)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/src/tools/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgrade.java	2009-02-20 13:34:27 UTC (rev 3125)
@@ -26,6 +26,7 @@
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.configuration.VirtualHostConfiguration;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.util.FileUtils;
 import org.apache.commons.cli.PosixParser;
@@ -34,6 +35,7 @@
 import org.apache.commons.cli.ParseException;
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.configuration.PropertiesConfiguration;
 
 import java.io.File;
 import java.io.BufferedReader;
@@ -327,8 +329,8 @@
         }
 
         // Note the name of the Vhosts is not important, the store doesnot record the name of the vhost.
-        _newVirtualHost = new VirtualHost("Upgraded", new MemoryMessageStore());
-        _oldVirtualHost = new VirtualHost("Old", new MemoryMessageStore());
+        _newVirtualHost = new VirtualHost(new VirtualHostConfiguration("Upgraded", new PropertiesConfiguration()), new MemoryMessageStore());
+        _oldVirtualHost = new VirtualHost(new VirtualHostConfiguration("Old", new PropertiesConfiguration()), new MemoryMessageStore());
 
         //Create a new messageStore
         _newMessageStore = new BDBMessageStore();




More information about the rhmessaging-commits mailing list