[rhmessaging-commits] rhmessaging commits: r3971 - in store/trunk/java/bdbstore/src: main/java/org/apache/qpid/server/store/berkeleydb/keys and 4 other directories.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Fri May 14 12:24:01 EDT 2010


Author: rgemmell
Date: 2010-05-14 12:23:59 -0400 (Fri, 14 May 2010)
New Revision: 3971

Added:
   store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/keys/
   store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/keys/MessageContentKey_1.java
   store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/keys/MessageContentKey_3.java
   store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/records/
   store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/records/ExchangeRecord.java
   store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/records/QueueRecord.java
   store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageContentKeyTB_1.java
   store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageContentKeyTB_3.java
   store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageContentKeyTupleBindingFactory.java
   store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTB_1.java
   store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTB_3.java
   store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTupleBindingFactory.java
   store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueEntryTB.java
Removed:
   store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/MessageMetaDataTB.java
Modified:
   store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
   store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ContentTB.java
   store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ExchangeTB.java
   store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/MessageContentKey.java
   store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/QueueEntryKey.java
   store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple.java
   store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTupleBindingFactory.java
   store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple_1.java
   store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple_2.java
   store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple.java
   store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTupleBindingFactory.java
   store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_1.java
   store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_2.java
   store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/TupleBindingFactory.java
   store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java
   store/trunk/java/bdbstore/src/tools/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgrade.java
Log:
Initial update to implement the new RecoveryHandler store behaviour, plus other updates required since 0-10 support was added to the trunk broker.


Modified: store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
===================================================================
--- store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java	2010-05-13 18:28:33 UTC (rev 3970)
+++ store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java	2010-05-14 16:23:59 UTC (rev 3971)
@@ -20,53 +20,66 @@
  */
 package org.apache.qpid.server.store.berkeleydb;
 
-import com.sleepycat.bind.EntryBinding;
-import com.sleepycat.bind.tuple.ByteBinding;
-import com.sleepycat.bind.tuple.TupleBinding;
+import java.io.File;
+import java.lang.ref.SoftReference;
+import java.nio.ByteBuffer;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 
-import com.sleepycat.je.*;
-
 import org.apache.commons.configuration.Configuration;
-
 import org.apache.log4j.Logger;
-
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.abstraction.ContentChunk;
 import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.messages.ConfigStoreMessages;
+import org.apache.qpid.server.logging.messages.MessageStoreMessages;
+import org.apache.qpid.server.logging.messages.TransactionLogMessages;
 import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.MessageHandleFactory;
-import org.apache.qpid.server.queue.MessageMetaData;
-import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.queue.AMQQueueFactory;
+import org.apache.qpid.server.store.ConfigurationRecoveryHandler;
 import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.MessageStoreRecoveryHandler;
+import org.apache.qpid.server.store.StorableMessageMetaData;
 import org.apache.qpid.server.store.StoreContext;
-import org.apache.qpid.server.store.AbstractMessageStore;
+import org.apache.qpid.server.store.StoredMemoryMessage;
+import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.store.TransactionLogRecoveryHandler;
+import org.apache.qpid.server.store.TransactionLogResource;
+import org.apache.qpid.server.store.ConfigurationRecoveryHandler.BindingRecoveryHandler;
+import org.apache.qpid.server.store.ConfigurationRecoveryHandler.ExchangeRecoveryHandler;
+import org.apache.qpid.server.store.ConfigurationRecoveryHandler.QueueRecoveryHandler;
+import org.apache.qpid.server.store.MessageStoreRecoveryHandler.StoredMessageRecoveryHandler;
+import org.apache.qpid.server.store.TransactionLogRecoveryHandler.QueueEntryRecoveryHandler;
+import org.apache.qpid.server.store.berkeleydb.keys.MessageContentKey_3;
+import org.apache.qpid.server.store.berkeleydb.records.ExchangeRecord;
+import org.apache.qpid.server.store.berkeleydb.records.QueueRecord;
+import org.apache.qpid.server.store.berkeleydb.tuples.BindingTupleBindingFactory;
+import org.apache.qpid.server.store.berkeleydb.tuples.MessageContentKeyTB_3;
+import org.apache.qpid.server.store.berkeleydb.tuples.MessageMetaDataTupleBindingFactory;
+import org.apache.qpid.server.store.berkeleydb.tuples.QueueEntryTB;
 import org.apache.qpid.server.store.berkeleydb.tuples.QueueTupleBindingFactory;
-import org.apache.qpid.server.store.berkeleydb.tuples.BindingTupleBindingFactory;
-import org.apache.qpid.server.store.berkeleydb.tuples.QueueTuple;
-import org.apache.qpid.server.txn.NonTransactionalContext;
-import org.apache.qpid.server.txn.TransactionalContext;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.configuration.VirtualHostConfiguration;
-import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.server.logging.messages.MessageStoreMessages;
 
-import java.io.File;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
-import java.util.TreeMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
+import com.sleepycat.bind.EntryBinding;
+import com.sleepycat.bind.tuple.ByteBinding;
+import com.sleepycat.bind.tuple.TupleBinding;
+import com.sleepycat.je.CheckpointConfig;
+import com.sleepycat.je.Cursor;
+import com.sleepycat.je.Database;
+import com.sleepycat.je.DatabaseConfig;
+import com.sleepycat.je.DatabaseEntry;
+import com.sleepycat.je.DatabaseException;
+import com.sleepycat.je.Environment;
+import com.sleepycat.je.EnvironmentConfig;
+import com.sleepycat.je.LockMode;
+import com.sleepycat.je.OperationStatus;
+import com.sleepycat.je.TransactionConfig;
 
 /**
  * BDBMessageStore implements a persistent {@link MessageStore} using the BDB high performance log.
@@ -76,62 +89,40 @@
  * exchanges. <tr><td> Store and remove messages. <tr><td> Bind and unbind queues to exchanges. <tr><td> Enqueue and
  * dequeue messages to queues. <tr><td> Generate message identifiers. </table>
  */
-public class BDBMessageStore extends AbstractMessageStore
+public class BDBMessageStore implements MessageStore
 {
     private static final Logger _log = Logger.getLogger(BDBMessageStore.class);
 
-    private static final int DATABASE_FORMAT_VERSION = 2;
+    private static final int DATABASE_FORMAT_VERSION = 3;
     private static final String DATABASE_FORMAT_VERSION_PROPERTY = "version";
-
     public static final String ENVIRONMENT_PATH_PROPERTY = "environment-path";
 
     private Environment _environment;
 
     private String MESSAGEMETADATADB_NAME = "messageMetaDataDb";
-
-    /**
-     * Maps from messageId to an AMQMessage (note we don't use serialisation but this is what it roughly corresponds
-     * to)
-     */
+    private String MESSAGECONTENTDB_NAME = "messageContentDb";
+    private String QUEUEBINDINGSDB_NAME = "queueBindingsDb";
+    private String DELIVERYDB_NAME = "deliveryDb";
+    private String EXCHANGEDB_NAME = "exchangeDb";
+    private String QUEUEDB_NAME = "queueDb";
     private Database _messageMetaDataDb;
-
-    private String MESSAGECONTENTDB_NAME = "messageContentDb";
-
     private Database _messageContentDb;
-
-    private String QUEUEDB_NAME = "queueDb";
-
-    /** Maps from name (which uniquely identifies a queue) to an AMQQueue */
-    private Database _queueDb;
-
-    private String DELIVERYDB_NAME = "deliveryDb";
-
-    /** Maps from a queue name to a message id. This is what stores the pending deliveries for a given queue */
+    private Database _queueBindingsDb;
     private Database _deliveryDb;
-
-    private String EXCHANGEDB_NAME = "exchangeDb";
     private Database _exchangeDb;
+    private Database _queueDb;
 
-    private String QUEUEBINDINGSDB_NAME = "queueBindingsDb";
-    private Database _queueBindingsDb;
+    private LogSubject _logSubject;
 
-    private VirtualHost _virtualHost;
+    private final AtomicLong _messageId = new AtomicLong(0);
 
-    private final AtomicLong _messageId = new AtomicLong(1);
-
-    private final AtomicLong _queueId = new AtomicLong(1);
-
     private final CommitThread _commitThread = new CommitThread("Commit-Thread");
 
-    private Map<AMQShortString, Long> _queueNameToIdMap = new ConcurrentHashMap<AMQShortString, Long>();
-
-    // Factory Classes to create the TupleBinding objects that relfect the version instance of this BDBStore
-
+    // Factory Classes to create the TupleBinding objects that reflect the version instance of this BDBStore
+    private MessageMetaDataTupleBindingFactory _metaDataTupleBindingFactory;
     private QueueTupleBindingFactory _queueTupleBindingFactory;
     private BindingTupleBindingFactory _bindingTupleBindingFactory;
 
-    Map<AMQShortString, Integer> _queueRecoveries = new TreeMap<AMQShortString, Integer>();
-
     /** The data version this store should run with */
     private int _version;
     private enum State
@@ -149,6 +140,9 @@
 
     private TransactionConfig _transactionConfig = new TransactionConfig();
 
+    private boolean _configured;
+
+    
     public BDBMessageStore()
     {
         this(DATABASE_FORMAT_VERSION);
@@ -176,23 +170,78 @@
             QUEUEBINDINGSDB_NAME += "_v" + version;
         }
     }
+ 
+    public void configureConfigStore(String name, 
+                                     ConfigurationRecoveryHandler recoveryHandler, 
+                                     Configuration storeConfiguration,
+                                     LogSubject logSubject) throws Exception
+    {
+        _logSubject = logSubject;
+        CurrentActor.get().message(_logSubject, ConfigStoreMessages.CFG_1001(this.getClass().getName()));
 
+        if(_configured)
+        {
+            throw new Exception("ConfigStore already configured");
+        }
+
+        configure(name,storeConfiguration);
+        
+        _configured = true;
+        stateTransition(State.CONFIGURING, State.CONFIGURED);
+        
+        recover(recoveryHandler);
+        stateTransition(State.RECOVERING, State.STARTED);
+    }
+
+    public void configureMessageStore(String name,
+                                      MessageStoreRecoveryHandler recoveryHandler,
+                                      Configuration storeConfiguration,
+                                      LogSubject logSubject) throws Exception
+    {
+        CurrentActor.get().message(_logSubject, MessageStoreMessages.MST_CREATED(this.getClass().getName()));
+
+        if(!_configured)
+        {
+            throw new Exception("ConfigStore not configured");
+        }
+
+        recoverMessages(recoveryHandler);
+    }
+
+    public void configureTransactionLog(String name, TransactionLogRecoveryHandler recoveryHandler,
+            Configuration storeConfiguration, LogSubject logSubject) throws Exception
+    {
+        CurrentActor.get().message(_logSubject, TransactionLogMessages.TXN_1001(this.getClass().getName()));
+
+        if(!_configured)
+        {
+            throw new Exception("ConfigStore not configured");
+        }
+
+        recoverQueueEntries(recoveryHandler);
+
+        
+    }
+
+    public org.apache.qpid.server.store.TransactionLog.Transaction newTransaction()
+    {
+        return new BDBTransaction();
+    }
+
+    
     /**
-     * Called after instantiation in order to configure the message store. A particular implementation can define
-     * whatever parameters it wants.
+     * Called after instantiation in order to configure the message store.
      *
-     * @param virtualHost The virtual host using by this store
-     * @param base        Not used
+     * @param name The name of the virtual host using this store
      * @param vHostConfig The configuration for this virtualhost
+     * @return whether a new store environment was created or not (to indicate whether recovery is necessary)
      *
      * @throws Exception If any error occurs that means the store is unable to configure itself.
      */
-    public void configure(VirtualHost virtualHost, String base, VirtualHostConfiguration vHostConfig) throws Exception
+    public boolean configure(String name, Configuration storeConfig) throws Exception
     {
-        super.configure(virtualHost, base, vHostConfig);
-
-        Configuration config = vHostConfig.getStoreConfiguration();
-        File environmentPath = new File(config.getString(ENVIRONMENT_PATH_PROPERTY, System.getProperty("QPID_WORK") + "/bdbstore/" + virtualHost.getName()));        
+        File environmentPath = new File(storeConfig.getString(ENVIRONMENT_PATH_PROPERTY, 
+                                System.getProperty("QPID_WORK") + "/bdbstore/" + name));        
         if (!environmentPath.exists())
         {
             if (!environmentPath.mkdirs())
@@ -204,53 +253,39 @@
 
         CurrentActor.get().message(_logSubject, MessageStoreMessages.MST_STORE_LOCATION(environmentPath.getAbsolutePath()));
 
-        _version = config.getInt(DATABASE_FORMAT_VERSION_PROPERTY, DATABASE_FORMAT_VERSION);
+        _version = storeConfig.getInt(DATABASE_FORMAT_VERSION_PROPERTY, DATABASE_FORMAT_VERSION);
 
-        configure(virtualHost, environmentPath, false);
+        return configure(environmentPath, false);
     }
 
-    public void configure(File environmentPath) throws AMQException, DatabaseException
+    
+    /**
+     * 
+     * @param environmentPath location for the store to be created in/recovered from
+     * @param readonly if true then don't allow modifications to an existing store, and don't create a new store if none exists 
+     * @return whether or not a new store environment was created
+     * @throws AMQException
+     * @throws DatabaseException
+     */
+    protected boolean configure(File environmentPath, boolean readonly) throws AMQException, DatabaseException
     {
-        configure(null, environmentPath, false);
-    }
-
-    public void configure(VirtualHost virtualHost, File environmentPath, boolean readonly) throws AMQException, DatabaseException
-    {
         stateTransition(State.INITIAL, State.CONFIGURING);
 
         _log.info("Configuring BDB message store");
 
+        createTupleBindingFactories(_version);
+        
         setDatabaseNames(_version);
 
-        if (virtualHost != null)
-        {
-            setVirtualHost(virtualHost);
-        }
-
-        boolean newEnvironment = setupStore(environmentPath, readonly);
-
-        // Performing recovery when we only want read access will cause all the broker objects to be recreated
-        // This will/may include thread pool creations that may be duplicated when manually inspecting the state of
-        // the store. Simplest solution is to prevent the initial creation of the state by blocking recovery.
-        if (!readonly)
-        {
-            stateTransition(State.CONFIGURING, State.CONFIGURED);
-
-            //If we have loaded an environment and have virtualHost configured then recover environment
-            if (!newEnvironment && virtualHost != null)
-            {
-                // this recovers durable queues and persistent messages
-                recover(virtualHost);
-            }
-        }
-
-        //if we have a new environment there we can jump to started as there is no recovery requried..
-        if (newEnvironment) // && !readonly is implied as you cant get a newEnviroment in readonly mode.
-        {
-            stateTransition(State.CONFIGURED, State.STARTED);
-        }
-
+        return setupStore(environmentPath, readonly);
     }
+    
+    private void createTupleBindingFactories(int version)
+    {
+            _bindingTupleBindingFactory = new BindingTupleBindingFactory(version);
+            _queueTupleBindingFactory = new QueueTupleBindingFactory(version);
+            _metaDataTupleBindingFactory = new MessageMetaDataTupleBindingFactory(version);
+    }
 
     /**
      * Move the store state from CONFIGURING to STARTED.
@@ -288,7 +323,8 @@
         {
             int versionIndex = s.indexOf("_v");
 
-            // DB is v1 if _version is not v1 then error
+            // lack of _v index suggests DB is v1
+            // so if _version is not v1 then error
             if (versionIndex == -1)
             {
                 if (_version != 1)
@@ -315,12 +351,6 @@
         }
     }
 
-    private void createTupleBindingFactories(int version)
-    {
-        _queueTupleBindingFactory = new QueueTupleBindingFactory(version, _virtualHost);
-        _bindingTupleBindingFactory = new BindingTupleBindingFactory(version, _virtualHost);
-    }
-
     private synchronized void stateTransition(State requiredState, State newState) throws AMQException
     {
         if (_state != requiredState)
@@ -344,16 +374,15 @@
     {
         _log.info("BDB message store using environment path " + environmentPath.getAbsolutePath());
         EnvironmentConfig envConfig = new EnvironmentConfig();
-        // This is what allos the creation of the store if it does not already exist.
+        // This is what allows the creation of the store if it does not already exist.
         envConfig.setAllowCreate(false);
         envConfig.setTransactional(true);
         envConfig.setConfigParam("je.lock.nLockTables", "7");
 
-	// Restore 500,000 default timeout.	
+        // Restore 500,000 default timeout.	
         //envConfig.setLockTimeout(15000);
 
         // Added to help diagnosis of Deadlock issue
-        //
         // http://www.oracle.com/technology/products/berkeley-db/faq/je_faq.html#23
         if (Boolean.getBoolean("qpid.bdb.lock.debug"))
         {
@@ -467,8 +496,8 @@
         closeEnvironment();
 
         _state = State.CLOSED;
-
-        super.close();
+        
+        CurrentActor.get().message(_logSubject,MessageStoreMessages.MST_CLOSED());
     }
 
     private void closeEnvironment() throws DatabaseException
@@ -483,43 +512,275 @@
         }
     }
 
+    
+    public void recover(ConfigurationRecoveryHandler recoveryHandler) throws AMQException
+    {
+        stateTransition(State.CONFIGURED, State.RECOVERING);
+
+        CurrentActor.get().message(_logSubject,MessageStoreMessages.MST_RECOVERY_START());
+
+        try
+        {
+            QueueRecoveryHandler qrh = recoveryHandler.begin(this);
+            loadQueues(qrh);
+
+            ExchangeRecoveryHandler erh = qrh.completeQueueRecovery();
+            loadExchanges(erh);
+            
+            BindingRecoveryHandler brh = erh.completeExchangeRecovery();
+            recoverBindings(brh);
+            
+            brh.completeBindingRecovery();
+        }
+        catch (DatabaseException e)
+        {
+            throw new AMQException("Error recovering persistent state: " + e, e);
+        }
+
+    }
+
+    private void loadQueues(QueueRecoveryHandler qrh) throws DatabaseException
+    {
+        Cursor cursor = null;
+
+        try
+        {
+            cursor = _queueDb.openCursor(null, null);
+            DatabaseEntry key = new DatabaseEntry();
+            DatabaseEntry value = new DatabaseEntry();
+            TupleBinding binding = _queueTupleBindingFactory.getInstance();
+            while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
+            {
+                QueueRecord queueRecord = (QueueRecord) binding.entryToObject(value);
+                
+                String queueName = queueRecord.getNameShortString() == null ? null : 
+                                        queueRecord.getNameShortString().asString();
+                String owner = queueRecord.getOwner() == null ? null : 
+                                        queueRecord.getOwner().asString();
+                FieldTable arguments = queueRecord.getArguments();
+
+                qrh.queue(queueName, owner, arguments);
+            }
+
+        }
+        finally
+        {
+            if (cursor != null)
+            {
+                cursor.close();
+            }
+        }
+    }
+    
+    
+    private void loadExchanges(ExchangeRecoveryHandler erh) throws AMQException, DatabaseException
+    {
+        Cursor cursor = null;
+
+        try
+        {
+            cursor = _exchangeDb.openCursor(null, null);
+            DatabaseEntry key = new DatabaseEntry();
+            DatabaseEntry value = new DatabaseEntry();
+            TupleBinding binding = new ExchangeTB();
+            
+            while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
+            {
+                ExchangeRecord exchangeRec = (ExchangeRecord) binding.entryToObject(value);
+
+                String exchangeName = exchangeRec.getNameShortString() == null ? null : 
+                                      exchangeRec.getNameShortString().asString();
+                String type = exchangeRec.getType() == null ? null : 
+                              exchangeRec.getType().asString();
+                boolean autoDelete = exchangeRec.isAutoDelete();
+                
+                erh.exchange(exchangeName, type, autoDelete);
+            }
+        }
+        finally
+        {
+            if (cursor != null)
+            {
+                cursor.close();
+            }
+        }
+
+    }
+    
+    private void recoverBindings(BindingRecoveryHandler brh) throws DatabaseException
+    {
+        Cursor cursor = null;
+        try
+        {
+            cursor = _queueBindingsDb.openCursor(null, null);
+            DatabaseEntry key = new DatabaseEntry();
+            DatabaseEntry value = new DatabaseEntry();
+            TupleBinding binding = _bindingTupleBindingFactory.getInstance();
+            
+            while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
+            {
+                //yes, this is retrieving all the useful information from the key only.
+                //For table compatibility it shall currently be left as is
+                BindingKey bindingRecord = (BindingKey) binding.entryToObject(key);
+                
+                String exchangeName = bindingRecord.getExchangeName() == null ? null :
+                                      bindingRecord.getExchangeName().asString();
+                String queueName = bindingRecord.getQueueName() == null ? null :
+                                   bindingRecord.getQueueName().asString();
+                String routingKey = bindingRecord.getRoutingKey() == null ? null :
+                                    bindingRecord.getRoutingKey().asString();
+                ByteBuffer argumentsBB = (bindingRecord.getArguments() == null ? null : 
+                    java.nio.ByteBuffer.wrap(bindingRecord.getArguments().getDataAsBytes()));
+                
+                brh.binding(exchangeName, queueName, routingKey, argumentsBB);
+            }
+        }
+        finally
+        {
+            if (cursor != null)
+            {
+                cursor.close();
+            }
+        }
+
+    }
+
+    private void recoverMessages(MessageStoreRecoveryHandler msrh) throws DatabaseException
+    {
+        StoredMessageRecoveryHandler mrh = msrh.begin();
+
+        Cursor cursor = null;
+        try
+        {
+            cursor = _messageMetaDataDb.openCursor(null, null);
+            DatabaseEntry key = new DatabaseEntry();
+            EntryBinding keyBinding = TupleBinding.getPrimitiveBinding(Long.class);;
+
+            DatabaseEntry value = new DatabaseEntry();
+            EntryBinding valueBinding = _metaDataTupleBindingFactory.getInstance();
+
+            long maxId = 0;
+
+            while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
+            {
+                long messageId = (Long) keyBinding.entryToObject(key);
+                StorableMessageMetaData metaData = (StorableMessageMetaData) valueBinding.entryToObject(value);
+
+                StoredBDBMessage message = new StoredBDBMessage(messageId, metaData, false);
+                mrh.message(message);
+                
+                maxId = Math.max(maxId, messageId);
+            }
+
+            _messageId.set(maxId);
+        }
+        catch (DatabaseException e)
+        {
+            _log.error("Database Error: " + e, e);
+            throw e;
+        }
+        finally
+        {
+            if (cursor != null)
+            {
+                cursor.close();
+            }
+        }
+    }
+    
+    private void recoverQueueEntries(TransactionLogRecoveryHandler recoveryHandler) 
+    throws DatabaseException, AMQException
+    {
+        QueueEntryRecoveryHandler qerh = recoveryHandler.begin(this);
+
+        Cursor cursor = null;
+        try
+        {
+            cursor = _deliveryDb.openCursor(null, null);
+            DatabaseEntry key = new DatabaseEntry();
+            EntryBinding keyBinding = new QueueEntryTB();
+
+            DatabaseEntry value = new DatabaseEntry();
+            EntryBinding valueBinding = TupleBinding.getPrimitiveBinding(Long.class);
+
+            while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
+            {
+
+                QueueEntryKey dd = (QueueEntryKey) keyBinding.entryToObject(key);
+
+                AMQShortString queueName = dd.getQueueName();
+                long messageId = dd.getMessageId();
+                
+                qerh.queueEntry(queueName.asString(),messageId);
+
+//                if (_log.isDebugEnabled())
+//                {
+//                    _log.debug("On recovery, delivering Message ID:" + message.getMessageId() + " to " + queue.getName());
+//                }
+
+            }
+        }
+        catch (DatabaseException e)
+        {
+            _log.error("Database Error: " + e, e);
+            throw e;
+        }
+        finally
+        {
+            if (cursor != null)
+            {
+                cursor.close();
+            }
+        }
+
+//        if (_log.isInfoEnabled())
+//        {
+//            _log.info("Recovered message counts: " + _queueRecoveries);
+//        }
+//
+//        for(Map.Entry<AMQShortString,Integer> entry : _queueRecoveries.entrySet())
+//        {
+//            CurrentActor.get().message(_logSubject, MessageStoreMessages.MST_RECOVERED(entry.getValue(), String.valueOf(entry.getKey())));
+//
+//            CurrentActor.get().message(_logSubject, MessageStoreMessages.MST_RECOVERY_COMPLETE(String.valueOf(entry.getKey()), true));
+//        }
+    }
+
     /**
-     * Removes the specified message from the store in the given transactional store context.
+     * Removes the specified message from the store.
      *
-     * @param context   The transactional context to remove the message in.
      * @param messageId Identifies the message to remove.
      *
      * @throws AMQException If the operation fails for any reason.
+     * @throws DatabaseException 
      */
-    public void removeMessage(StoreContext context, Long messageId) throws AMQException
+    public void removeMessage(Long messageId) throws AMQException
     {
         // _log.debug("public void removeMessage(StoreContext context = " + context + ", Long messageId = " + messageId
         // + "): called");
 
-        boolean localTx = getOrCreateTransaction(context);
-        Transaction tx = (Transaction) context.getPayload();
+        com.sleepycat.je.Transaction tx = null;
+        
+        Cursor cursor = null;
+        try
+        {
+            tx = _environment.beginTransaction(null, null);
+            
+            //remove the message meta data from the store
+            DatabaseEntry key = new DatabaseEntry();
+            EntryBinding metaKeyBindingTuple = TupleBinding.getPrimitiveBinding(Long.class);
+            metaKeyBindingTuple.objectToEntry(messageId, key);
 
-        DatabaseEntry key = new DatabaseEntry();
-        EntryBinding keyBinding = TupleBinding.getPrimitiveBinding(Long.class);
-        keyBinding.objectToEntry(messageId, key);
+            if (_log.isDebugEnabled())
+            {
+                _log.debug("Message Id: " + messageId + " Removing");
+            }
 
-        if (_log.isDebugEnabled())
-        {
-            _log.debug("Message Id: " + messageId + " Removing");
-        }
-
-        // first we need to look up the header to get the chunk count
-        MessageMetaData mmd = getMessageMetaData(context, messageId);
-        try
-        {
+            
             OperationStatus status = _messageMetaDataDb.delete(tx, key);
             if (status == OperationStatus.NOTFOUND)
             {
-                if (localTx)
-                {
-                    tx.abort();
-                    context.setPayload(null);
-                }
+                tx.abort();
 
                 throw new AMQException("Message metadata not found for message id " + messageId);
             }
@@ -529,56 +790,73 @@
                 _log.debug("Deleted metadata for message " + messageId);
             }
 
-            DatabaseEntry contentKey = new DatabaseEntry();
-            TupleBinding contentKeyBinding = new MessageContentKey.TupleBinding();
-            for (int i = 0; i < mmd.getContentChunkCount(); i++)
+            //now remove the content data from the store if there is any.
+
+            DatabaseEntry contentKeyEntry = new DatabaseEntry();
+            MessageContentKey_3 mck = new MessageContentKey_3(messageId,0);
+
+            TupleBinding contentKeyTupleBinding = new MessageContentKeyTB_3();
+            contentKeyTupleBinding.objectToEntry(mck, contentKeyEntry);
+
+            //Use a partial record for the value to prevent retrieving the 
+            //data itself as we only need the key to identify what to remove.
+            DatabaseEntry value = new DatabaseEntry();
+            value.setPartial(0, 0, true);
+
+            cursor = _messageContentDb.openCursor(tx, null);
+
+            status = cursor.getSearchKeyRange(contentKeyEntry, value, LockMode.RMW);
+            while (status == OperationStatus.SUCCESS)
             {
-                MessageContentKey mck = new MessageContentKey(messageId, i);
-                contentKeyBinding.objectToEntry(mck, contentKey);
-                status = _messageContentDb.get(tx, contentKey, new DatabaseEntry(), LockMode.RMW);
-                if (status == OperationStatus.NOTFOUND)
+                mck = (MessageContentKey_3) contentKeyTupleBinding.entryToObject(contentKeyEntry);
+                
+                if(mck.getMessageId() != messageId)
                 {
-                    if (localTx)
-                    {
-                        tx.abort();
-                        context.setPayload(null);
-                    }
-
-                    throw new AMQException("Content chunk " + i + " not found for message " + messageId);
+                    //we have exhausted all chunks for this message id, break
+                    break;
                 }
-
-                status = _messageContentDb.delete(tx, contentKey);
-                if (status == OperationStatus.NOTFOUND)
+                else
                 {
-                    if (localTx)
+                    status = cursor.delete();
+                    
+                    if(status == OperationStatus.NOTFOUND)
                     {
+                        cursor.close();
+                        cursor = null;
+                        
                         tx.abort();
-                        context.setPayload(null);
+                        throw new AMQException("Content chunk offset" + mck.getOffset() + " not found for message " + messageId);
                     }
-
-                    throw new AMQException("Content chunk " + i + " not found for message " + messageId);
+                    
+                    if (_log.isDebugEnabled())
+                    {
+                        _log.debug("Deleted content chunk offset " + mck.getOffset() + " for message " + messageId);
+                    }
                 }
-
-                if (_log.isDebugEnabled())
-                {
-                    _log.debug("Deleted content chunk " + i + " for message " + messageId);
-                }
+                
+                status = cursor.getNext(contentKeyEntry, value, LockMode.RMW);
             }
 
-            if (localTx)
-            {
-                commit(tx);
-                context.setPayload(null);
-            }
+            cursor.close();
+            cursor = null;
+            
+            commit(tx);
         }
         catch (DatabaseException e)
         {
-            if ((tx != null) && localTx)
+            e.printStackTrace();
+            
+            if (tx != null)
             {
                 try
                 {
+                    if(cursor != null)
+                    {
+                        cursor.close();
+                        cursor = null;
+                    }
+                    
                     tx.abort();
-                    context.setPayload(null);
                 }
                 catch (DatabaseException e1)
                 {
@@ -586,8 +864,23 @@
                 }
             }
 
-            throw new AMQException("Error writing AMQMessage with id " + messageId + " to database: " + e, e);
+            throw new AMQException("Error removing message with id " + messageId + " from database: " + e, e);
         }
+        finally
+        {
+            if(cursor != null)
+            {
+                try
+                {
+                    cursor.close();
+                }
+                catch (DatabaseException e)
+                {
+                    //TODO
+                    throw new RuntimeException(e);
+                }
+            }
+        }
     }
 
     /**
@@ -601,12 +894,17 @@
     {
         if (_state != State.RECOVERING)
         {
+            ExchangeRecord exchangeRec = new ExchangeRecord(exchange.getNameShortString(), 
+                                             exchange.getTypeShortString(), exchange.isAutoDelete());
+
             DatabaseEntry key = new DatabaseEntry();
             EntryBinding keyBinding = new AMQShortStringTB();
-            keyBinding.objectToEntry(exchange.getName(), key);
+            keyBinding.objectToEntry(exchange.getNameShortString(), key);
+
             DatabaseEntry value = new DatabaseEntry();
-            TupleBinding exchangeBinding = new ExchangeTB(_virtualHost);
-            exchangeBinding.objectToEntry(exchange, value);
+            TupleBinding exchangeBinding = new ExchangeTB();
+            exchangeBinding.objectToEntry(exchangeRec, value);
+
             try
             {
                 _exchangeDb.put(null, key, value);
@@ -630,7 +928,7 @@
     {
         DatabaseEntry key = new DatabaseEntry();
         EntryBinding keyBinding = new AMQShortStringTB();
-        keyBinding.objectToEntry(exchange.getName(), key);
+        keyBinding.objectToEntry(exchange.getNameShortString(), key);
         try
         {
             OperationStatus status = _exchangeDb.delete(null, key);
@@ -645,117 +943,9 @@
         }
     }
 
-    private void recoverExchanges() throws AMQException, DatabaseException
-    {
-        for (Exchange exchange : loadExchanges())
-        {
-            recoverExchange(exchange);
-        }
-    }
 
-    private void recoverExchange(Exchange exchange) throws AMQException, DatabaseException
-    {
-        _log.info("Recovering durable exchange " + exchange.getName() + " of type " + exchange.getType() + "...");
 
-        QueueRegistry queueRegistry = _virtualHost.getQueueRegistry();
 
-        for (BindingKey binding : loadQueueBindings(exchange))
-        {
-            AMQQueue queue = queueRegistry.getQueue(binding.getQueueName());
-            if (queue == null)
-            {
-                _log.error("Unknown queue: " + binding.getQueueName() + " cannot be bound to exchange: "
-                           + exchange.getName());
-            }
-            else
-            {
-                _log.info("Restoring binding: (Exchange: " + binding.getExchangeName() + ", Queue: " + binding
-                        .getQueueName() + ", Routing Key: " + binding.getRoutingKey() + ", Arguments: " + binding.getArguments()
-                                        + ")");
-
-                queue.bind(exchange, binding.getRoutingKey(), binding.getArguments());
-            }
-        }
-    }
-
-    private List<BindingKey> loadQueueBindings(Exchange exchange) throws DatabaseException
-    {
-
-        Cursor cursor = null;
-        List<BindingKey> queueBindings = new ArrayList<BindingKey>();
-        try
-        {
-            cursor = _queueBindingsDb.openCursor(null, null);
-            DatabaseEntry key = new DatabaseEntry();
-            DatabaseEntry value = new DatabaseEntry();
-
-            BindingKey queueBinding =
-                    new BindingKey(exchange.getName(), null, null, null);
-
-            EntryBinding<BindingKey> keyBinding = _bindingTupleBindingFactory.getInstance();
-            keyBinding.objectToEntry(queueBinding, key);
-
-            OperationStatus opStatus = cursor.getSearchKeyRange(key, value, LockMode.RMW);
-
-            TupleBinding binding = _bindingTupleBindingFactory.getInstance();
-            while (opStatus == OperationStatus.SUCCESS)
-            {
-                queueBinding = (BindingKey) binding.entryToObject(key);
-                if (queueBinding.getExchangeName().equals(exchange.getName()))
-                {
-                    queueBindings.add(queueBinding);
-                    opStatus = cursor.getNext(key, value, LockMode.RMW);
-                }
-                else
-                {
-                    break;
-                }
-            }
-
-            return queueBindings;
-        }
-        finally
-        {
-            if (cursor != null)
-            {
-                cursor.close();
-            }
-        }
-
-    }
-
-    private List<Exchange> loadExchanges() throws AMQException, DatabaseException
-    {
-        Cursor cursor = null;
-        List<Exchange> exchanges = new ArrayList<Exchange>();
-        try
-        {
-            cursor = _exchangeDb.openCursor(null, null);
-            DatabaseEntry key = new DatabaseEntry();
-            DatabaseEntry value = new DatabaseEntry();
-            TupleBinding binding = new ExchangeTB(_virtualHost);
-            while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
-            {
-                Exchange exchange = (Exchange) binding.entryToObject(value);
-
-                _virtualHost.getExchangeRegistry().registerExchange(exchange);
-
-                exchanges.add(exchange);
-                _log.info("Registering exchange " + exchange.getName());
-            }
-
-            return exchanges;
-        }
-        finally
-        {
-            if (cursor != null)
-            {
-                cursor.close();
-            }
-        }
-
-    }
-
     /**
      * Binds the specified queue to an exchange with a routing key.
      *
@@ -773,13 +963,20 @@
 
         if (_state != State.RECOVERING)
         {
+            BindingKey bindingRecord = new BindingKey(exchange.getNameShortString(), 
+                                                queue.getNameShortString(), routingKey, args);
+
             DatabaseEntry key = new DatabaseEntry();
             EntryBinding keyBinding = _bindingTupleBindingFactory.getInstance();
-            keyBinding.objectToEntry(new BindingKey(exchange.getName(), queue.getName(), routingKey, args), key);
+            
+            keyBinding.objectToEntry(bindingRecord, key);
 
+            //yes, this is writing out 0 as a value and putting all the 
+            //useful info into the key, don't ask me why. For table
+            //compatibility it shall currently be left as is
             DatabaseEntry value = new DatabaseEntry();
             ByteBinding.byteToEntry((byte) 0, value);
-
+            
             try
             {
                 _queueBindingsDb.put(null, key, value);
@@ -807,7 +1004,7 @@
     {
         DatabaseEntry key = new DatabaseEntry();
         EntryBinding keyBinding = _bindingTupleBindingFactory.getInstance();
-        keyBinding.objectToEntry(new BindingKey(exchange.getName(), queue.getName(), routingKey, args), key);
+        keyBinding.objectToEntry(new BindingKey(exchange.getNameShortString(), queue.getNameShortString(), routingKey, args), key);
 
         try
         {
@@ -844,20 +1041,17 @@
 
         if (_state != State.RECOVERING)
         {
-            long queueId = _queueId.getAndIncrement();
-            _queueNameToIdMap.put(queue.getName(), queueId);
-
+            QueueRecord queueRecord= new QueueRecord(queue.getNameShortString(), 
+                                                     queue.getOwner(), arguments);
+            
             DatabaseEntry key = new DatabaseEntry();
-
             EntryBinding keyBinding = new AMQShortStringTB();
-            keyBinding.objectToEntry(queue.getName(), key);
+            keyBinding.objectToEntry(queue.getNameShortString(), key);
 
             DatabaseEntry value = new DatabaseEntry();
             TupleBinding queueBinding = _queueTupleBindingFactory.getInstance();
 
-            ((QueueTuple) queueBinding).setArguments(arguments);
-
-            queueBinding.objectToEntry(queue, value);
+            queueBinding.objectToEntry(queueRecord, value);
             try
             {
                 _queueDb.put(null, key, value);
@@ -878,12 +1072,10 @@
      */
     public void removeQueue(final AMQQueue queue) throws AMQException
     {
-        AMQShortString name = queue.getName();
+        AMQShortString name = queue.getNameShortString();
 
         _log.debug("public void removeQueue(AMQShortString name = " + name + "): called");
 
-        Long queueId = _queueNameToIdMap.remove(name);
-
         DatabaseEntry key = new DatabaseEntry();
         EntryBinding keyBinding = new AMQShortStringTB();
         keyBinding.objectToEntry(name, key);
@@ -902,34 +1094,6 @@
     }
 
     /**
-     * Really for testing purposes.
-     *
-     * @param name
-     *
-     * @return
-     *
-     * @throws AMQException
-     */
-    AMQQueue getQueue(AMQShortString name) throws AMQException
-    {
-        DatabaseEntry key = new DatabaseEntry();
-        EntryBinding keyBinding = new AMQShortStringTB();
-        keyBinding.objectToEntry(name, key);
-        DatabaseEntry value = new DatabaseEntry();
-        try
-        {
-            _queueDb.get(null, key, value, LockMode.RMW);
-            TupleBinding binding = _queueTupleBindingFactory.getInstance();
-
-            return (AMQQueue) binding.entryToObject(value);
-        }
-        catch (DatabaseException e)
-        {
-            throw new AMQException("Error getting queue with name " + name + " from database: " + e, e);
-        }
-    }
-
-    /**
      * Places a message onto a specified queue, in a given transactional context.
      *
      * @param context   The transactional context for the operation.
@@ -938,15 +1102,16 @@
      *
      * @throws AMQException If the operation fails for any reason.
      */
-    public void enqueueMessage(StoreContext context, AMQQueue queue, Long messageId) throws AMQException
+    public void enqueueMessage(StoreContext context, final TransactionLogResource queue, Long messageId) throws AMQException
     {
         // _log.debug("public void enqueueMessage(StoreContext context = " + context + ", AMQShortString name = " + name
         // + ", Long messageId): called");
 
-        AMQShortString name = queue.getName();
-        Transaction tx = (Transaction) context.getPayload();
+        AMQShortString name = new AMQShortString(queue.getResourceName());
+        com.sleepycat.je.Transaction tx = (com.sleepycat.je.Transaction) context.getPayload();
+        
         DatabaseEntry key = new DatabaseEntry();
-        EntryBinding keyBinding = new QueueEntryKey.TupleBinding();
+        EntryBinding keyBinding = new QueueEntryTB();
         QueueEntryKey dd = new QueueEntryKey(name, messageId);
         keyBinding.objectToEntry(dd, key);
         DatabaseEntry value = new DatabaseEntry();
@@ -968,28 +1133,6 @@
         }
     }
 
-    private boolean getOrCreateTransaction(StoreContext context) throws AMQException
-    {
-
-        Transaction tx = (Transaction) context.getPayload();
-        if (tx == null)
-        {
-            try
-            {
-                tx = _environment.beginTransaction(null, null);
-                context.setPayload(tx);
-
-                return true;
-            }
-            catch (DatabaseException e)
-            {
-                throw new AMQException("Error beginning transaction: " + e, e);
-            }
-        }
-
-        return false;
-    }
-
     /**
      * Extracts a message from a specified queue, in a given transactional context.
      *
@@ -999,14 +1142,14 @@
      *
      * @throws AMQException If the operation fails for any reason, or if the specified message does not exist.
      */
-    public void dequeueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException
+    public void dequeueMessage(StoreContext context, final TransactionLogResource queue, Long messageId) throws AMQException
     {
-        AMQShortString name = queue.getName();
-        boolean isLocal = getOrCreateTransaction(context);
-        Transaction tx = (Transaction) context.getPayload();
+        AMQShortString name = new AMQShortString(queue.getResourceName());
 
+        com.sleepycat.je.Transaction tx = (com.sleepycat.je.Transaction) context.getPayload();
+
         DatabaseEntry key = new DatabaseEntry();
-        EntryBinding keyBinding = new QueueEntryKey.TupleBinding();
+        EntryBinding keyBinding = new QueueEntryTB();
         QueueEntryKey dd = new QueueEntryKey(name, messageId);
 
         keyBinding.objectToEntry(dd, key);
@@ -1034,82 +1177,18 @@
                 _log.debug("Removed message " + messageId + ", " + name + " from delivery db");
 
             }
-
-            if (isLocal)
-            {
-                commit(tx);
-                context.setPayload(null);
-            }
         }
         catch (DatabaseException e)
         {
 
             _log.error("Failed to dequeue message " + messageId + ": " + e, e);
             _log.error(tx);
-            if (isLocal)
-            {
-                try
-                {
-                    tx.abort();
-                    context.setPayload(null);
-                }
-                catch (DatabaseException e1)
-                {
-                    throw new AMQException("Error rolling back transaction: " + e1, e1);
-                }
-            }
 
             throw new AMQException("Error accessing database while dequeuing message: " + e, e);
         }
     }
 
-    private boolean isLocalTransaction(StoreContext context)
-    {
-        return context.getPayload() == null;
-    }
-
     /**
-     * Begins a transactional context.
-     *
-     * @param context The transactional context to begin.
-     *
-     * @throws AMQException If the operation fails for any reason.
-     */
-    public void beginTran(StoreContext context) throws AMQException
-    {
-        // _log.debug("public void beginTran(StoreContext context = " + context + "): called");
-
-        if (context.getPayload() != null)
-        {
-            throw new AMQException("Fatal internal error: transactional context is not empty at beginTran: "
-                                   + context.getPayload());
-        }
-        else
-        {
-            try
-            {
-                context.setPayload(_environment.beginTransaction(null, _transactionConfig ));
-            }
-            catch (DatabaseException e)
-            {
-                throw new AMQException("Error starting transaction: " + e, e);
-            }
-        }
-    }
-
-    /**
-     * Tests a transactional context to see if it has been begun but not yet committed or aborted.
-     *
-     * @param context The transactional context to test.
-     *
-     * @return <tt>true</tt> if the transactional context is live, <tt>false</tt> otherwise.
-     */
-    public boolean inTran(StoreContext context)
-    {
-        return context.getPayload() != null;
-    }
-
-    /**
      * Commits all operations performed within a given transactional context.
      *
      * @param context The transactional context to commit all operations for.
@@ -1120,7 +1199,7 @@
     {
         // _log.debug("public void commitTran(StoreContext context = " + context + "): called");
 
-        Transaction tx = (Transaction) context.getPayload();
+        com.sleepycat.je.Transaction tx = (com.sleepycat.je.Transaction) context.getPayload();
 
         if (tx == null)
         {
@@ -1146,6 +1225,13 @@
         }
     }
 
+    public StoreFuture commitTranAsync(StoreContext context) throws AMQException
+    {
+        //TODO - Actually create an async commit implementation.
+        commitTran(context);
+        return IMMEDIATE_FUTURE;
+    }
+
     /**
      * Abandons all operations performed within a given transactional context.
      *
@@ -1155,7 +1241,7 @@
      */
     public void abortTran(StoreContext context) throws AMQException
     {
-        Transaction tx = (Transaction) context.getPayload();
+        com.sleepycat.je.Transaction tx = (com.sleepycat.je.Transaction) context.getPayload();
 
         if (_log.isDebugEnabled())
         {
@@ -1194,7 +1280,7 @@
 
             QueueEntryKey dd = new QueueEntryKey(queueName, 0);
 
-            EntryBinding keyBinding = new QueueEntryKey.TupleBinding();
+            EntryBinding keyBinding = new QueueEntryTB();
             keyBinding.objectToEntry(dd, key);
 
             DatabaseEntry value = new DatabaseEntry();
@@ -1204,10 +1290,10 @@
             OperationStatus status = cursor.getSearchKeyRange(key, value, LockMode.DEFAULT);
             dd = (QueueEntryKey) keyBinding.entryToObject(key);
 
-            while ((status == OperationStatus.SUCCESS) && dd.queueName.equals(queueName))
+            while ((status == OperationStatus.SUCCESS) && dd.getQueueName().equals(queueName))
             {
 
-                messageIds.add(dd.messageId);
+                messageIds.add(dd.getMessageId());
                 status = cursor.getNext(key, value, LockMode.DEFAULT);
                 if (status == OperationStatus.SUCCESS)
                 {
@@ -1237,59 +1323,6 @@
         }
     }
 
-    public void recover(VirtualHost virtualHost) throws AMQException
-    {
-        stateTransition(State.CONFIGURED, State.RECOVERING);
-
-        _log.info("Recovering persistent state...");
-        CurrentActor.get().message(_logSubject,MessageStoreMessages.MST_RECOVERY_START(null, false));
-
-        StoreContext context = new StoreContext();
-        try
-        {
-            beginTran(context);
-            Map<AMQShortString, AMQQueue> queues = loadQueues();
-
-            for (AMQQueue q : queues.values())
-            {
-                CurrentActor.get().message(_logSubject,MessageStoreMessages.MST_RECOVERY_START(String.valueOf(q.getName()), true));
-                //Record that we have a queue for recovery
-                _queueRecoveries.put(q.getName(), 0);
-
-                q.configure(virtualHost.getConfiguration().getQueueConfiguration(q.getName().asString()));
-
-            }
-
-            recoverExchanges();
-
-            deliverMessages(context, queues);
-            _log.info("Persistent state recovered successfully");
-            CurrentActor.get().message(_logSubject, MessageStoreMessages.MST_RECOVERY_COMPLETE(null, false));
-
-            commitTran(context);
-
-        }
-        catch (DatabaseException e)
-        {
-            abortTran(context);
-
-            throw new AMQException("Error recovering persistent state: " + e, e);
-        }
-        catch (AMQException amqe)
-        {
-            abortTran(context);
-            throw new AMQException("Error recovering persistent state: " + amqe, amqe);
-        }
-        catch (Throwable ioobe)
-        {
-            abortTran(context);
-            throw new AMQException("Invalid database format. Please use upgrade tool for store in Virtualhost:'"
-                                   + _virtualHost.getName() + "'", ioobe);
-        }
-
-        stateTransition(State.RECOVERING, State.STARTED);
-    }
-
     /**
      * Return a valid, currently unused message id.
      *
@@ -1297,7 +1330,7 @@
      */
     public Long getNewMessageId()
     {
-        return _messageId.getAndIncrement();
+        return _messageId.incrementAndGet();
     }
 
     /**
@@ -1305,20 +1338,21 @@
      *
      * @param context         The transactional context for the operation.
      * @param messageId       The message to store the data for.
-     * @param index           The index of the data chunk.
+     * @param offset          The offset of the data chunk in the message.
      * @param contentBody     The content of the data chunk.
      * @param lastContentBody Flag to indicate that this is the last such chunk for the message.
      *
      * @throws AMQException If the operation fails for any reason, or if the specified message does not exist.
      */
-    public void storeContentBodyChunk(StoreContext context, Long messageId, int index, ContentChunk contentBody,
-                                      boolean lastContentBody) throws AMQException
+    protected void addContent(StoreContext context, Long messageId, int offset, 
+                                      ByteBuffer contentBody) throws AMQException
     {
 
-        Transaction tx = (Transaction) context.getPayload();
+        com.sleepycat.je.Transaction tx = (com.sleepycat.je.Transaction) context.getPayload();
+        
         DatabaseEntry key = new DatabaseEntry();
-        TupleBinding keyBinding = new MessageContentKey.TupleBinding();
-        keyBinding.objectToEntry(new MessageContentKey(messageId, index), key);
+        TupleBinding keyBinding = new MessageContentKeyTB_3();
+        keyBinding.objectToEntry(new MessageContentKey_3(messageId, offset), key);
         DatabaseEntry value = new DatabaseEntry();
         TupleBinding messageBinding = new ContentTB();
         messageBinding.objectToEntry(contentBody, value);
@@ -1327,13 +1361,13 @@
             OperationStatus status = _messageContentDb.put(tx, key, value);
             if (status != OperationStatus.SUCCESS)
             {
-                throw new AMQException("Error adding content chunk " + index + " for message id " + messageId + ": "
+                throw new AMQException("Error adding content chunk offset" + offset + " for message id " + messageId + ": "
                                        + status);
             }
 
             if (_log.isDebugEnabled())
             {
-                _log.debug("Storing content chunk " + index + " for message " + messageId + "[Transaction" + tx + "]");
+                _log.debug("Storing content chunk offset" + offset + " for message " + messageId + "[Transaction" + tx + "]");
             }
         }
         catch (DatabaseException e)
@@ -1351,7 +1385,7 @@
      *
      * @throws AMQException If the operation fails for any reason, or if the specified message does not exist.
      */
-    public void storeMessageMetaData(StoreContext context, Long messageId, MessageMetaData messageMetaData)
+    private void storeMetaData(StoreContext context, Long messageId, StorableMessageMetaData messageMetaData)
             throws AMQException
     {
         if (_log.isDebugEnabled())
@@ -1359,14 +1393,15 @@
             _log.debug("public void storeMessageMetaData(StoreContext context = " + context + ", Long messageId = "
                        + messageId + ", MessageMetaData messageMetaData = " + messageMetaData + "): called");
         }
-        //This call breaking tests - not sure where the txn it creates should be committed ??
-        //getOrCreateTransaction(context);
-        Transaction tx = (Transaction) context.getPayload();
+
+        com.sleepycat.je.Transaction tx = (com.sleepycat.je.Transaction) context.getPayload();
+        
         DatabaseEntry key = new DatabaseEntry();
         EntryBinding keyBinding = TupleBinding.getPrimitiveBinding(Long.class);
         keyBinding.objectToEntry(messageId, key);
         DatabaseEntry value = new DatabaseEntry();
-        TupleBinding messageBinding = new MessageMetaDataTB();
+        
+        TupleBinding messageBinding = _metaDataTupleBindingFactory.getInstance();
         messageBinding.objectToEntry(messageMetaData, value);
         try
         {
@@ -1392,11 +1427,11 @@
      *
      * @throws AMQException If the operation fails for any reason, or if the specified message does not exist.
      */
-    public MessageMetaData getMessageMetaData(StoreContext context, Long messageId) throws AMQException
+    public StorableMessageMetaData getMessageMetaData(Long messageId) throws AMQException
     {
         if (_log.isDebugEnabled())
         {
-            _log.debug("public MessageMetaData getMessageMetaData(StoreContext context = " + context + ", Long messageId = "
+            _log.debug("public MessageMetaData getMessageMetaData(Long messageId = "
                        + messageId + "): called");
         }
 
@@ -1404,7 +1439,7 @@
         EntryBinding keyBinding = TupleBinding.getPrimitiveBinding(Long.class);
         keyBinding.objectToEntry(messageId, key);
         DatabaseEntry value = new DatabaseEntry();
-        TupleBinding messageBinding = new MessageMetaDataTB();
+        TupleBinding messageBinding = _metaDataTupleBindingFactory.getInstance();
 
         try
         {
@@ -1414,7 +1449,7 @@
                 throw new AMQException("Metadata not found for message with id " + messageId);
             }
 
-            MessageMetaData mdd = (MessageMetaData) messageBinding.entryToObject(value);
+            StorableMessageMetaData mdd = (StorableMessageMetaData) messageBinding.entryToObject(value);
 
             return mdd;
         }
@@ -1426,47 +1461,110 @@
     }
 
     /**
-     * Retrieves a chunk of message data.
+     * Fills the provided ByteBuffer with as much content for the specified message as possible, starting
+     * from the specified offset in the message.
      *
-     * @param context   The transactional context for the operation.
-     * @param messageId The message to get the data chunk for.
-     * @param index     The offset index of the data chunk within the message.
+     * @param messageId The message to get the data for.
+     * @param offset    The offset of the data within the message.
+     * @param dst       The destination of the content read back
      *
-     * @return A chunk of message data.
+     * @return The number of bytes inserted into the destination
      *
      * @throws AMQException If the operation fails for any reason, or if the specified message does not exist.
      */
-    public ContentChunk getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException
+    public int getContent(Long messageId, int offset, ByteBuffer dst) throws AMQException
     {
-
-        DatabaseEntry key = new DatabaseEntry();
-        TupleBinding keyBinding = new MessageContentKey.TupleBinding();
-        keyBinding.objectToEntry(new MessageContentKey(messageId, index), key);
+        final int limit = offset + dst.remaining();
+        
+        DatabaseEntry contentKeyEntry = new DatabaseEntry();
+        
+        //TODO: if requested offset is non zero, use partial record (key-only) search to 
+        //locate the first record key to prevent reading in data we dont need
+        
+        //Start from 0 offset and search for the starting chunk. 
+        MessageContentKey_3 mck = new MessageContentKey_3(messageId, 0);
+        TupleBinding contentKeyTupleBinding = new MessageContentKeyTB_3();
+        contentKeyTupleBinding.objectToEntry(mck, contentKeyEntry);
         DatabaseEntry value = new DatabaseEntry();
-        TupleBinding messageBinding = new ContentTB();
+        TupleBinding contentTupleBinding = new ContentTB();
+        
         if (_log.isDebugEnabled())
         {
-            _log.debug("Message Id: " + messageId + " Getting content body chunk: " + index);
+            _log.debug("Message Id: " + messageId + " Getting content body from offset: " + offset);
         }
 
+        int written = 0;
+        int seenSoFar = 0;
+        
+        Cursor cursor = null;
         try
         {
-            OperationStatus status = _messageContentDb.get(null, key, value, LockMode.READ_UNCOMMITTED);
-            if (status != OperationStatus.SUCCESS)
+            cursor = _messageContentDb.openCursor(null, null);
+            
+            OperationStatus status = cursor.getSearchKeyRange(contentKeyEntry, value, LockMode.RMW);
+            while (status == OperationStatus.SUCCESS)
             {
+                mck = (MessageContentKey_3) contentKeyTupleBinding.entryToObject(contentKeyEntry);
+                long id = mck.getMessageId();
+                
+                if(id != messageId)
+                {
+                    //we have exhausted all chunks for this message id, break
+                    break;
+                }
+                
+                int offsetInMessage = mck.getOffset();
+                ByteBuffer buf = (ByteBuffer) contentTupleBinding.entryToObject(value);
+                
+                final int size = (int) buf.limit();
+                
+                seenSoFar += size;
+                
+                //TODO: can change this guard if we start recording the last byte in the chunk record
+                if(seenSoFar >= offset)
+                {
+                    byte[] dataAsBytes = buf.array();
 
-                throw new AMQException("Content chunk " + index + " not found for message with id " + messageId);
+                    int posInArray = offset + written - offsetInMessage;
+                    int count = size - posInArray;
+                    if(count > dst.remaining())
+                    {
+                        count = dst.remaining();
+                    }
+                    dst.put(dataAsBytes,posInArray,count);
+                    written+=count;
+
+                    if(dst.remaining() == 0)
+                    {
+                        break;
+                    }
+                }
+                
+                status = cursor.getNext(contentKeyEntry, value, LockMode.RMW);
             }
 
-            ContentChunk cb = (ContentChunk) messageBinding.entryToObject(value);
-
-            return cb;
+            return written;
         }
         catch (DatabaseException e)
         {
 
             throw new AMQException("Error writing AMQMessage with id " + messageId + " to database: " + e, e);
         }
+        finally
+        {
+            if(cursor != null)
+            {
+                try
+                {
+                    cursor.close();
+                }
+                catch (DatabaseException e)
+                {
+                    // TODO
+                    throw new RuntimeException(e);
+                }
+            }
+        }
     }
 
     public boolean isPersistent()
@@ -1474,47 +1572,27 @@
         return true;
     }
 
-    Map<AMQShortString, AMQQueue> loadQueues() throws DatabaseException, AMQException
+    public <T extends StorableMessageMetaData> StoredMessage<T> addMessage(T metaData)
     {
-        Cursor cursor = null;
-        Map<AMQShortString, AMQQueue> queues = new HashMap<AMQShortString, AMQQueue>();
-        try
+        if(metaData.isPersistent())
         {
-            cursor = _queueDb.openCursor(null, null);
-            DatabaseEntry key = new DatabaseEntry();
-            DatabaseEntry value = new DatabaseEntry();
-            TupleBinding binding = _queueTupleBindingFactory.getInstance();
-            while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
-            {
-                AMQQueue queue = (AMQQueue) binding.entryToObject(value);
-                if (queue != null)
-                {
-                    _virtualHost.getQueueRegistry().registerQueue(queue);
-                    queues.put(queue.getName(), queue);
-                    _log.info("Recovering queue " + queue.getName() + " with owner:"
-                              + ((queue.getOwner() == null) ? " <<null>> " : (" \"" + queue.getOwner() + "\" ")));
-                }
-            }
-
-            return queues;
+            return new StoredBDBMessage(getNewMessageId(), metaData);
         }
-        finally
+        else
         {
-            if (cursor != null)
-            {
-                cursor.close();
-            }
+            return new StoredMemoryMessage(getNewMessageId(), metaData);
         }
     }
 
-    //public getters for the TupleBindingFactories
 
-    public QueueTupleBindingFactory getQueueTupleBindingFactory()
+    //protected getters for the TupleBindingFactories
+
+    protected QueueTupleBindingFactory getQueueTupleBindingFactory()
     {
         return _queueTupleBindingFactory;
     }
 
-    public BindingTupleBindingFactory getBindingTupleBindingFactory()
+    protected BindingTupleBindingFactory getBindingTupleBindingFactory()
     {
         return _bindingTupleBindingFactory;
     }
@@ -1612,162 +1690,8 @@
         }
     }
 
-    private static final class ProcessAction
+    private void commit(com.sleepycat.je.Transaction tx) throws DatabaseException
     {
-        private final AMQQueue _queue;
-        private final StoreContext _context;
-        private final AMQMessage _message;
-
-        public ProcessAction(AMQQueue queue, StoreContext context, AMQMessage message)
-        {
-            _queue = queue;
-            _context = context;
-            _message = message;
-        }
-
-        public void process() throws AMQException
-        {
-            _queue.enqueue(_context, _message);
-        }
-
-    }
-
-    private void deliverMessages(final StoreContext context, Map<AMQShortString, AMQQueue> queues)
-            throws DatabaseException, AMQException
-    {
-        Map<Long, AMQMessage> msgMap = new HashMap<Long, AMQMessage>();
-        List<ProcessAction> actions = new ArrayList<ProcessAction>();
-
-
-        Cursor cursor = null;
-        try
-        {
-            Transaction tx = (Transaction) context.getPayload();
-            cursor = _deliveryDb.openCursor(tx, null);
-            DatabaseEntry key = new DatabaseEntry();
-            EntryBinding keyBinding = new QueueEntryKey.TupleBinding();
-
-            DatabaseEntry value = new DatabaseEntry();
-            EntryBinding valueBinding = TupleBinding.getPrimitiveBinding(Long.class);
-            MessageHandleFactory messageHandleFactory = new MessageHandleFactory();
-            long maxId = 1;
-
-            TransactionalContext txnContext = new NonTransactionalContext(this, new StoreContext(), null, null);
-
-            while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
-            {
-
-                QueueEntryKey dd = (QueueEntryKey) keyBinding.entryToObject(key);
-
-                AMQShortString queueName = dd.queueName;
-
-                AMQQueue queue = queues.get(queueName);
-                
-                // If the matching queue was not already found in the store, check in case a queue
-                // with the same name exists in the virtualhost, otherwise we will create a duplicate
-                // queue and generate a JMX InstanceAlreadyExistsException, preventing startup.
-                if (queue == null)
-                {
-                    queue = _virtualHost.getQueueRegistry().getQueue(queueName);
-                    if (queue != null)
-                    {
-                        queues.put(queueName, queue);
-                    }
-                }
-                
-                if (queue == null)
-                {
-                    queue = AMQQueueFactory.createAMQQueueImpl(queueName, false, null, false, _virtualHost, null);
-                    _virtualHost.getQueueRegistry().registerQueue(queue);
-                    queues.put(queueName, queue);
-                }
-
-                long messageId = dd.messageId;
-                maxId = Math.max(maxId, messageId);
-                AMQMessage message = msgMap.get(messageId);
-
-                if (message != null)
-                {
-                    message.incrementReference();
-                }
-                else
-                {
-                    message = new AMQMessage(messageId, this, messageHandleFactory, txnContext);
-                    msgMap.put(messageId, message);
-                }
-
-                if (_log.isDebugEnabled())
-                {
-                    _log.debug("On recovery, delivering Message ID:" + message.getMessageId() + " to " + queue.getName());
-                }
-
-                Integer count = _queueRecoveries.get(queueName);
-                if (count == null)
-                {
-                    count = 0;
-                }
-
-                _queueRecoveries.put(queueName, ++count);
-
-                actions.add(new ProcessAction(queue, context, message));
-
-            }
-
-            for (ProcessAction action : actions)
-            {
-                action.process();
-            }
-
-            _messageId.set(maxId + 1);
-        }
-        catch (DatabaseException e)
-        {
-            _log.error("Database Error: " + e, e);
-            throw e;
-        }
-        catch (AMQException e)
-        {
-            _log.error("Store Error: " + e, e);
-            throw e;
-        }
-        finally
-        {
-            if (cursor != null)
-            {
-                cursor.close();
-            }
-        }
-
-        if (_log.isInfoEnabled())
-        {
-            _log.info("Recovered message counts: " + _queueRecoveries);
-        }
-
-        for(Map.Entry<AMQShortString,Integer> entry : _queueRecoveries.entrySet())
-        {
-            CurrentActor.get().message(_logSubject, MessageStoreMessages.MST_RECOVERED(entry.getValue(), String.valueOf(entry.getKey())));
-
-            CurrentActor.get().message(_logSubject, MessageStoreMessages.MST_RECOVERY_COMPLETE(String.valueOf(entry.getKey()), true));
-        }
-
-        //Free the memory
-        _queueRecoveries = null;
-    }
-
-    QueueRegistry getQueueRegistry()
-    {
-        return _virtualHost.getQueueRegistry();
-    }
-
-    void setVirtualHost(VirtualHost virtualHost)
-    {
-        _virtualHost = virtualHost;
-
-        createTupleBindingFactories(_version);
-    }
-
-    void commit(Transaction tx) throws DatabaseException
-    {
         // _log.debug("void commit(Transaction tx = " + tx + "): called");
 
         tx.commitNoSync();
@@ -1787,11 +1711,11 @@
         // private static final Logger _log = Logger.getLogger(Commit.class);
 
         private final CommitThread _commitThread;
-        private final Transaction _tx;
+        private final com.sleepycat.je.Transaction _tx;
         private DatabaseException _databaseException;
         private boolean _complete;
 
-        public Commit(CommitThread commitThread, Transaction tx)
+        public Commit(CommitThread commitThread, com.sleepycat.je.Transaction tx)
         {
             // _log.debug("public Commit(CommitThread commitThread = " + commitThread + ", Transaction tx = " + tx
             // + "): called");
@@ -1973,5 +1897,184 @@
             }
         }
     }
+    
+    
+    private class StoredBDBMessage implements StoredMessage
+    {
 
+        private final long _messageId;
+        private volatile SoftReference<StorableMessageMetaData> _metaDataRef;
+        private StoreContext _ctx;
+        private com.sleepycat.je.Transaction _txn;
+
+        StoredBDBMessage(long messageId, StorableMessageMetaData metaData)
+        {
+            this(messageId, metaData, true);
+        }
+
+
+        StoredBDBMessage(long messageId,
+                           StorableMessageMetaData metaData, boolean persist)
+        {
+            try
+            {
+                _messageId = messageId;
+
+                _metaDataRef = new SoftReference<StorableMessageMetaData>(metaData);
+                if(persist)
+                {
+                    _ctx = new StoreContext();
+                    _txn = _environment.beginTransaction(null, null);
+                    _ctx.setPayload(_txn);
+                    storeMetaData(_ctx, messageId, metaData);
+                }
+            }
+            catch (DatabaseException e)
+            {
+                //TODO
+                throw new RuntimeException(e);
+            }
+            catch (AMQException e)
+            {
+                //TODO
+                throw new RuntimeException(e);
+            }
+
+        }
+
+        public StorableMessageMetaData getMetaData()
+        {
+            StorableMessageMetaData metaData = _metaDataRef.get();
+            if(metaData == null)
+            {
+                try
+                {
+                    metaData = BDBMessageStore.this.getMessageMetaData(_messageId);
+                }
+                catch (AMQException e)
+                {
+                    //TODO
+                    throw new RuntimeException(e);
+                }
+                _metaDataRef = new SoftReference<StorableMessageMetaData>(metaData);
+            }
+
+            return metaData;
+        }
+
+        public long getMessageNumber()
+        {
+            return _messageId;
+        }
+
+        public void addContent(int offsetInMessage, java.nio.ByteBuffer src)
+        {
+            try
+            {
+                BDBMessageStore.this.addContent(_ctx, _messageId, offsetInMessage, src);
+            }
+            catch (AMQException e)
+            {
+                //TODO
+                throw new RuntimeException(e);
+            }
+        }
+
+        public int getContent(int offsetInMessage, java.nio.ByteBuffer dst)
+        {
+            try
+            {
+                return BDBMessageStore.this.getContent(_messageId, offsetInMessage, dst);
+            }
+            catch (AMQException e)
+            {
+                // TODO
+                throw new RuntimeException(e);
+            }
+        }
+
+        public StoreFuture flushToStore()
+        {
+            try
+            {
+                if(_ctx != null)
+                {
+                    BDBMessageStore.this.commitTran(_ctx);
+                }
+            }
+            catch (AMQException e)
+            {
+                //TODO
+                throw new RuntimeException(e);
+            }
+            finally
+            {
+                _txn = null;
+                _ctx = null;
+            }
+            return IMMEDIATE_FUTURE;
+        }
+
+        public void remove()
+        {
+            flushToStore();
+            try
+            {
+                BDBMessageStore.this.removeMessage(_messageId);
+            }
+            catch (AMQException e)
+            {
+                // TODO
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    private class BDBTransaction implements Transaction
+    {
+        private com.sleepycat.je.Transaction _txn;
+        private StoreContext _ctx;
+
+        private BDBTransaction()
+        {
+            _ctx = new StoreContext();
+            try
+            {
+                _txn = _environment.beginTransaction(null, null);
+            }
+            catch (DatabaseException e)
+            {
+                //TODO
+                throw new RuntimeException(e);
+            }
+            _ctx.setPayload(_txn);
+        }
+
+        public void enqueueMessage(TransactionLogResource queue, Long messageId) throws AMQException
+        {
+            BDBMessageStore.this.enqueueMessage(_ctx, queue, messageId);
+        }
+
+        public void dequeueMessage(TransactionLogResource queue, Long messageId) throws AMQException
+        {
+            BDBMessageStore.this.dequeueMessage(_ctx, queue, messageId);
+
+        }
+
+        public void commitTran() throws AMQException
+        {
+            BDBMessageStore.this.commitTran(_ctx);
+        }
+
+        public StoreFuture commitTranAsync() throws AMQException
+        {
+            return BDBMessageStore.this.commitTranAsync(_ctx);
+        }
+
+        public void abortTran() throws AMQException
+        {
+            BDBMessageStore.this.abortTran(_ctx);
+        }
+    }
+
 }

Modified: store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ContentTB.java
===================================================================
--- store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ContentTB.java	2010-05-13 18:28:33 UTC (rev 3970)
+++ store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ContentTB.java	2010-05-14 16:23:59 UTC (rev 3971)
@@ -20,14 +20,12 @@
  */
 package org.apache.qpid.server.store.berkeleydb;
 
+import java.nio.ByteBuffer;
+
 import com.sleepycat.bind.tuple.TupleBinding;
 import com.sleepycat.bind.tuple.TupleInput;
 import com.sleepycat.bind.tuple.TupleOutput;
-import org.apache.qpid.framing.ContentBody;
-import org.apache.qpid.framing.abstraction.ContentChunk;
 
-import org.apache.mina.common.ByteBuffer;
-
 /**
  * @author Robert Greig (robert.j.greig at jpmorgan.com)
  */
@@ -39,39 +37,19 @@
         final int size = tupleInput.readInt();
         byte[] underlying = new byte[size];
         tupleInput.readFast(underlying);
-        final ByteBuffer data  = ByteBuffer.wrap(underlying);
-        ContentChunk cb = new ContentChunk()
-        {
-
-            public int getSize()
-            {
-                return size;
-            }
-
-            public ByteBuffer getData()
-            {
-                return data;
-            }
-
-            public void reduceToFit()
-            {
-
-            }
-        };
-        return cb;
+        return ByteBuffer.wrap(underlying);
     }
 
     public void objectToEntry(Object object, TupleOutput tupleOutput)
     {
-        ContentChunk cb = (ContentChunk) object;
-        final int size = cb.getSize();
-        byte[] underlying = new byte[size];
+        ByteBuffer src = (ByteBuffer) object;
+        
+        src = src.slice();
 
-        ByteBuffer buf = cb.getData();
+        byte[] chunkData = new byte[src.limit()];
+        src.duplicate().get(chunkData);
 
-        buf.duplicate().rewind().get(underlying);
-
-        tupleOutput.writeInt(size);
-        tupleOutput.writeFast(underlying);
+        tupleOutput.writeInt(chunkData.length);
+        tupleOutput.writeFast(chunkData);
     }
 }

Modified: store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ExchangeTB.java
===================================================================
--- store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ExchangeTB.java	2010-05-13 18:28:33 UTC (rev 3970)
+++ store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ExchangeTB.java	2010-05-14 16:23:59 UTC (rev 3971)
@@ -4,20 +4,15 @@
 import com.sleepycat.bind.tuple.TupleInput;
 import com.sleepycat.bind.tuple.TupleOutput;
 import org.apache.log4j.Logger;
-import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.store.berkeleydb.records.ExchangeRecord;
 
 public class ExchangeTB extends TupleBinding
 {
     private static final Logger _log = Logger.getLogger(ExchangeTB.class);
 
-    private final VirtualHost _virtualHost;
-
-    public ExchangeTB(VirtualHost virtualHost)
+    public ExchangeTB()
     {
-        _virtualHost = virtualHost;
     }
 
     public Object entryToObject(TupleInput tupleInput)
@@ -27,43 +22,17 @@
         AMQShortString typeName = AMQShortStringEncoding.readShortString(tupleInput);
 
         boolean autoDelete = tupleInput.readBoolean();
-
-        try
-        {
-            Exchange exchange;
-            Exchange existing = _virtualHost.getExchangeRegistry().getExchange(name);
-
-            if (existing != null)
-            {
-                _log.info("Exchange :" + existing + ": already exists in configured broker.");
-                exchange = existing;
-            }
-            else
-            {
-                exchange = _virtualHost.getExchangeFactory().createExchange(name, typeName, true, autoDelete, 0);
-
-                _virtualHost.getExchangeRegistry().registerExchange(exchange);
-
-                _log.info("Registering new durable exchange from BDB:" + exchange);
-            }
-
-            return exchange;
-        }
-        catch (AMQException e)
-        {
-            _log.error("Unable to create exchange: " + e, e);
-            return null;
-        }
+        
+        return new ExchangeRecord(name, typeName, autoDelete);
     }
 
     public void objectToEntry(Object object, TupleOutput tupleOutput)
     {
-        Exchange exchange = (Exchange) object;
+        ExchangeRecord exchange = (ExchangeRecord) object;
 
-        AMQShortStringEncoding.writeShortString(exchange.getName(), tupleOutput);
+        AMQShortStringEncoding.writeShortString(exchange.getNameShortString(), tupleOutput);
         AMQShortStringEncoding.writeShortString(exchange.getType(), tupleOutput);
 
         tupleOutput.writeBoolean(exchange.isAutoDelete());
-
     }
 }

Modified: store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/MessageContentKey.java
===================================================================
--- store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/MessageContentKey.java	2010-05-13 18:28:33 UTC (rev 3970)
+++ store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/MessageContentKey.java	2010-05-14 16:23:59 UTC (rev 3971)
@@ -1,223 +1,42 @@
 /*
  *
- * Copyright (c) 2006 The Apache Software Foundation
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ *   http://www.apache.org/licenses/LICENSE-2.0
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
  */
 package org.apache.qpid.server.store.berkeleydb;
 
-import com.sleepycat.bind.tuple.TupleInput;
-import com.sleepycat.bind.tuple.TupleOutput;
-
-import java.util.Comparator;
-import java.io.Serializable;
-
-/**
- * @author Apache Software Foundation
- */
 public class MessageContentKey
 {
-    public long messageId;
-    public int chunk;
-
-    public MessageContentKey()
+    private long _messageId;
+    
+    public MessageContentKey(long messageId)
     {
-    }
-
-    public MessageContentKey(byte[] payload)
+        _messageId = messageId;
+    }    
+    
+    
+    public long getMessageId()
     {
-        final TupleInput ti = new TupleInput(payload);
-        messageId = ti.readLong();
-        chunk = ti.readInt();
+        return _messageId;
     }
 
-    public static class TupleBinding extends com.sleepycat.bind.tuple.TupleBinding
+    public void setMessageId(long messageId)
     {
-        public Object entryToObject(TupleInput tupleInput)
-        {
-            final MessageContentKey mk = new MessageContentKey();
-            mk.messageId = tupleInput.readLong();
-            mk.chunk = tupleInput.readInt();
-            return mk;
-        }
-
-        public void objectToEntry(Object object, TupleOutput tupleOutput)
-        {
-            final MessageContentKey mk = (MessageContentKey) object;
-            tupleOutput.writeLong(mk.messageId);            
-            tupleOutput.writeInt(mk.chunk);
-        }
-
-
+        this._messageId = messageId;
     }
-
-    public static void writeModifiedLong(TupleOutput t, long l)
-    {
-        int ln = (int) (0x0F & l);
-        int hn = (int) (0xF0 & l);
-        t.writeByte(((ln <<4) | (hn>>4)) ^ 0x55);
-        t.writeByte((int) (0xFF & (l >> 8)));
-        t.writeByte((int) (0xFF & (l >> 16)));
-        t.writeByte((int) (0xFF & (l >> 24)));
-        t.writeByte((int) (0xFF & (l >> 32)));
-        t.writeByte((int) (0xFF & (l >> 40)));
-        t.writeByte((int) (0xFF & (l >> 48)));
-        t.writeByte((int) (0xFF & (l >> 56)));
-
-    }
-
-    public static long readModifiedLong(TupleInput t)
-    {
-        byte b0 = (byte) (0xFF & (t.readByte() ^ 0x55));
-        long l = (0x0f & (b0 >> 4)) | ((0xf0 & (b0 << 4)));
-
-        l |=  ((long) (0xFF & t.readByte())) << 8;
-        l |=  ((long) (0xFF & t.readByte())) << 16;
-        l |=  ((long) (0xFF & t.readByte())) << 24;
-        l |=  ((long) (0xFF & t.readByte())) << 32;
-        l |=  ((long) (0xFF & t.readByte())) << 40;
-        l |=  ((long) (0xFF & t.readByte())) << 48;
-        l |=  ((long) (0xFF & t.readByte())) << 56;
-
-        return l;
-    }
-
-
-    public static class ContentKeyComparator implements Comparator, Serializable
-    {
-
-        public int compare(Object o1, Object o2)
-        {
-            byte[] b1 = (byte[]) o1;
-            byte[] b2 = (byte[]) o2;
-
-            MessageContentKey ck1 = new MessageContentKey(b1);
-            MessageContentKey ck2 = new MessageContentKey(b2);
-            if (ck1.messageId == ck2.messageId)
-            {
-                // reminder of Comparator return value:
-                // return a negative integer if the first item is "less" than the second, 0 if equal
-                return ck1.chunk - ck2.chunk;
-            }
-            else
-            {
-                return (int) (ck1.messageId - ck2.messageId);
-            }
-        }
-    }
-
-    public MessageContentKey(long messageId, int chunk)
-    {
-        this.chunk = chunk;
-        this.messageId = messageId;
-    }
-
-
-
-    private static final char[] HEX = { '0','1','2','3','4','5','6','7','8','9','A','B','C','D','E','F'};
-
-    private static StringBuilder appendHex(StringBuilder str, byte b)
-    {
-        str.append(HEX[0xF & (b >> 4)]);
-        str.append(HEX[0xF & b]);
-        return str;
-    }
-
-    private static StringBuilder appendHex(StringBuilder str, byte[] b)
-    {
-        for(int i = 0; i < b.length; i++)
-        {
-            appendHex(str,b[i]);
-        }
-        return str;
-    }
-
-    private static StringBuilder appendHex(StringBuilder str, int i)
-    {
-        appendHex(str,(byte)(0xFF & (i >> 24)));
-        appendHex(str,(byte)(0xFF & (i >> 16)));
-        appendHex(str,(byte)(0xFF & (i >> 8)));
-        appendHex(str,(byte)(0xFF & i));
-        return str;
-    }
-
-
-    private static StringBuilder appendHex(StringBuilder str, long l)
-    {
-        appendHex(str,(byte)(0xFF & (l >> 56)));
-        appendHex(str,(byte)(0xFF & (l >> 48)));
-        appendHex(str,(byte)(0xFF & (l >> 40)));
-        appendHex(str,(byte)(0xFF & (l >> 32)));
-        appendHex(str,(byte)(0xFF & (l >> 24)));
-        appendHex(str,(byte)(0xFF & (l >> 16)));
-        appendHex(str,(byte)(0xFF & (l >> 8)));
-        appendHex(str,(byte)(0xFF & l));
-        return str;
-    }
-
-
-
-
-    private static byte[] convertLong(long l)
-    {
-        byte[] b = new byte[8];
-        int ln = (int) (0x0F & l);
-        int hn = (int) (0xF0 & l);
-        b[0] = (byte)(((ln <<4) | (hn>>4)) ^ 0x55);
-        b[1] = ((byte)(0xFF & (l >> 8)));
-        b[2] = ((byte)(0xFF & (l >> 16)));
-        b[3] = ((byte)(0xFF & (l >> 24)));
-        b[4] = ((byte)(0xFF & (l >> 32)));
-        b[5] = ((byte)(0xFF & (l >> 40)));
-        b[6] = ((byte)(0xFF & (l >> 48)));
-        b[7] = ((byte)(0xFF & (l >> 56)));
-
-        return b;
-
-    }
-
-
-    private static long readModifiedLong(byte[] b)
-    {
-        byte b0 = (byte) (b[0] ^ 0x55);
-        long l = (0x0f & (b0 >> 4)) | ((0xf0 & (b0 << 4)));
-        l |=  ((long) b[1]) << 8;
-        l |=  ((long) b[2]) << 16;
-        l |=  ((long) b[3]) << 24;
-        l |=  ((long) b[4]) << 32;
-        l |=  ((long) b[5]) << 40;
-        l |=  ((long) b[6]) << 48;
-        l |=  ((long) b[7]) << 56;
-
-
-        return l;
-    }
-
-
-    public static void main(String[] args)
-    {
-        StringBuilder s = new StringBuilder();
-
-
-
-        for(long i = 1000; i < 1010; i++)
-        {
-            byte[] b = convertLong(i);
-            System.out.println(appendHex(new StringBuilder(),b));
-            System.out.println(readModifiedLong(b));
-
-        }
-
-    }
-}
+}
\ No newline at end of file

Deleted: store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/MessageMetaDataTB.java
===================================================================
--- store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/MessageMetaDataTB.java	2010-05-13 18:28:33 UTC (rev 3970)
+++ store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/MessageMetaDataTB.java	2010-05-14 16:23:59 UTC (rev 3971)
@@ -1,142 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.qpid.server.store.berkeleydb;
-
-import com.sleepycat.bind.tuple.TupleBinding;
-import com.sleepycat.bind.tuple.TupleInput;
-import com.sleepycat.bind.tuple.TupleOutput;
-import org.apache.log4j.Logger;
-import org.apache.mina.common.ByteBuffer;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.*;
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.server.queue.MessageMetaData;
-
-/**
- * Handles the mapping to and from message meta data
- */
-public class MessageMetaDataTB extends TupleBinding
-{
-    private static final Logger _log = Logger.getLogger(MessageMetaDataTB.class);
-
-    public Object entryToObject(TupleInput tupleInput)
-    {
-        try
-        {
-            final MessagePublishInfo publishBody = readMessagePublishInfo(tupleInput);
-            final ContentHeaderBody contentHeaderBody = readContentHeaderBody(tupleInput);
-            final int contentChunkCount = tupleInput.readInt();
-            return new MessageMetaData(publishBody, contentHeaderBody, contentChunkCount);
-        }
-        catch (Exception e)
-        {
-            _log.error("Error converting entry to object: " + e, e);
-            // annoyingly just have to return null since we cannot throw
-            return null;
-        }
-    }
-
-    public void objectToEntry(Object object, TupleOutput tupleOutput)
-    {
-        MessageMetaData message = (MessageMetaData) object;
-        try
-        {
-            writeMessagePublishInfo(message.getMessagePublishInfo(), tupleOutput);
-        }
-        catch (AMQException e)
-        {
-            // can't do anything else since the BDB interface precludes throwing any exceptions
-            // in practice we should never get an exception
-            throw new RuntimeException("Error converting object to entry: " + e, e);
-        }
-        writeContentHeader(message.getContentHeaderBody(), tupleOutput);
-        tupleOutput.writeInt(message.getContentChunkCount());
-    }
-
-    private MessagePublishInfo readMessagePublishInfo(TupleInput tupleInput)
-    {
-
-        final AMQShortString exchange = AMQShortStringEncoding.readShortString(tupleInput);
-        final AMQShortString routingKey = AMQShortStringEncoding.readShortString(tupleInput);
-        final boolean mandatory = tupleInput.readBoolean();
-        final boolean immediate = tupleInput.readBoolean();
-
-        return new MessagePublishInfo()
-        {
-
-            public AMQShortString getExchange()
-            {
-                return exchange;
-            }
-
-            public void setExchange(AMQShortString exchange)
-            {
-
-            }
-
-            public boolean isImmediate()
-            {
-                return immediate;
-            }
-
-            public boolean isMandatory()
-            {
-                return mandatory;
-            }
-
-            public AMQShortString getRoutingKey()
-            {
-                return routingKey;
-            }
-        }   ;
-
-    }
-
-
-    private ContentHeaderBody readContentHeaderBody(TupleInput tupleInput) throws AMQFrameDecodingException, AMQProtocolVersionException
-    {
-        int bodySize = tupleInput.readInt();
-        byte[] underlying = new byte[bodySize];
-        tupleInput.readFast(underlying);
-        ByteBuffer buf = ByteBuffer.wrap(underlying);
-
-        return ContentHeaderBody.createFromBuffer(buf, bodySize);
-    }
-
-    private void writeMessagePublishInfo(MessagePublishInfo publishBody, TupleOutput tupleOutput) throws AMQException
-    {
-
-
-        AMQShortStringEncoding.writeShortString(publishBody.getExchange(), tupleOutput);
-        AMQShortStringEncoding.writeShortString(publishBody.getRoutingKey(), tupleOutput);
-        tupleOutput.writeBoolean(publishBody.isMandatory());
-        tupleOutput.writeBoolean(publishBody.isImmediate());
-        
-    }
-
-    private void writeContentHeader(ContentHeaderBody headerBody, TupleOutput tupleOutput)
-    {
-        // write out the content header body
-        final int bodySize = headerBody.getSize();
-        byte[] underlying = new byte[bodySize];
-        ByteBuffer buf = ByteBuffer.wrap(underlying);
-        headerBody.writePayload(buf);
-        tupleOutput.writeInt(bodySize);
-        tupleOutput.writeFast(underlying);
-    }
-}

Modified: store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/QueueEntryKey.java
===================================================================
--- store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/QueueEntryKey.java	2010-05-13 18:28:33 UTC (rev 3970)
+++ store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/QueueEntryKey.java	2010-05-14 16:23:59 UTC (rev 3971)
@@ -1,8 +1,25 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
 package org.apache.qpid.server.store.berkeleydb;
 
-import com.sleepycat.bind.tuple.TupleInput;
-import com.sleepycat.bind.tuple.TupleOutput;
-
 import org.apache.qpid.framing.AMQShortString;
 
 /**
@@ -10,55 +27,26 @@
  */
 public class QueueEntryKey
 {
-    public AMQShortString queueName;
-    public long messageId;
+    private AMQShortString _queueName;
+    private long _messageId;
 
 
-    public QueueEntryKey()
+    public QueueEntryKey(AMQShortString queueName, long messageId)
     {
+        _queueName = queueName;
+        _messageId = messageId;
     }
 
 
-    public QueueEntryKey(byte[] payload)
+    public AMQShortString getQueueName()
     {
-        final TupleInput ti = new TupleInput(payload);
-
-        queueName = AMQShortStringEncoding.readShortString(ti);
-
-        messageId = ti.readLong();
-
+        return _queueName;
     }
 
-    public static class TupleBinding extends com.sleepycat.bind.tuple.TupleBinding
-    {
-        public Object entryToObject(TupleInput tupleInput)
-        {
-            final QueueEntryKey mk = new QueueEntryKey();
 
-
-            mk.queueName = AMQShortStringEncoding.readShortString(tupleInput);
-            mk.messageId = tupleInput.readLong();
-
-            return mk;
-        }
-
-        public void objectToEntry(Object object, TupleOutput tupleOutput)
-        {
-            final QueueEntryKey mk = (QueueEntryKey) object;
-
-            AMQShortStringEncoding.writeShortString(mk.queueName,tupleOutput);
-            tupleOutput.writeLong(mk.messageId);
-
-        }
-
-
-    }
-
-    public QueueEntryKey(AMQShortString queueName, long messageId)
+    public long getMessageId()
     {
-        this.queueName = queueName;
-        this.messageId = messageId;
+        return _messageId;
     }
 
-
 }

Copied: store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/keys/MessageContentKey_1.java (from rev 3969, store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple.java)
===================================================================
--- store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/keys/MessageContentKey_1.java	                        (rev 0)
+++ store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/keys/MessageContentKey_1.java	2010-05-14 16:23:59 UTC (rev 3971)
@@ -0,0 +1,44 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.keys;
+
+import org.apache.qpid.server.store.berkeleydb.MessageContentKey;
+
+public class MessageContentKey_1 extends MessageContentKey
+{
+    private int _chunkNum;
+    
+    public MessageContentKey_1(long messageId, int chunkNo)
+    {
+        super(messageId);
+        _chunkNum = chunkNo;
+    }    
+
+    public int getChunk()
+    {
+        return _chunkNum;
+    }
+
+    public void setChunk(int chunk)
+    {
+        this._chunkNum = chunk;
+    }
+}
\ No newline at end of file

Copied: store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/keys/MessageContentKey_3.java (from rev 3969, store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple.java)
===================================================================
--- store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/keys/MessageContentKey_3.java	                        (rev 0)
+++ store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/keys/MessageContentKey_3.java	2010-05-14 16:23:59 UTC (rev 3971)
@@ -0,0 +1,44 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.keys;
+
+import org.apache.qpid.server.store.berkeleydb.MessageContentKey;
+
+public class MessageContentKey_3 extends MessageContentKey
+{
+    private int _offset;
+    
+    public MessageContentKey_3(long messageId, int chunkNo)
+    {
+        super(messageId);
+        _offset = chunkNo;
+    }    
+
+    public int getOffset()
+    {
+        return _offset;
+    }
+
+    public void setOffset(int chunk)
+    {
+        this._offset = chunk;
+    }
+}
\ No newline at end of file

Copied: store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/records/ExchangeRecord.java (from rev 3969, store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTupleBindingFactory.java)
===================================================================
--- store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/records/ExchangeRecord.java	                        (rev 0)
+++ store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/records/ExchangeRecord.java	2010-05-14 16:23:59 UTC (rev 3971)
@@ -0,0 +1,53 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.records;
+
+import org.apache.qpid.framing.AMQShortString;
+
+public class ExchangeRecord extends Object
+{
+    private final AMQShortString _exchangeName;
+    private final AMQShortString _exchangeType;
+    private final boolean _autoDelete;
+
+    public ExchangeRecord(AMQShortString exchangeName, AMQShortString exchangeType, boolean autoDelete)
+    {
+        _exchangeName = exchangeName;
+        _exchangeType = exchangeType;
+        _autoDelete = autoDelete;
+    }
+
+    public AMQShortString getNameShortString()
+    {
+        return _exchangeName;
+    }
+
+    public AMQShortString getType()
+    {
+        return _exchangeType;
+    }
+
+    public boolean isAutoDelete()
+    {
+        return _autoDelete;
+    }
+
+}

Copied: store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/records/QueueRecord.java (from rev 3969, store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTupleBindingFactory.java)
===================================================================
--- store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/records/QueueRecord.java	                        (rev 0)
+++ store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/records/QueueRecord.java	2010-05-14 16:23:59 UTC (rev 3971)
@@ -0,0 +1,54 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.records;
+
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+
+public class QueueRecord extends Object
+{
+    private final AMQShortString _queueName;
+    private final AMQShortString _owner;
+    private final FieldTable _arguments;
+
+    public QueueRecord(AMQShortString queueName, AMQShortString owner, FieldTable arguments)
+    {
+        _queueName = queueName;
+        _owner = owner;
+        _arguments = arguments;
+    }
+
+    public AMQShortString getNameShortString()
+    {
+        return _queueName;
+    }
+
+    public AMQShortString getOwner()
+    {
+        return _owner;
+    }
+
+    public FieldTable getArguments()
+    {
+        return _arguments;
+    }
+
+}

Modified: store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple.java
===================================================================
--- store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple.java	2010-05-13 18:28:33 UTC (rev 3970)
+++ store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple.java	2010-05-14 16:23:59 UTC (rev 3971)
@@ -20,8 +20,6 @@
  */
 package org.apache.qpid.server.store.berkeleydb.tuples;
 
-import org.apache.qpid.framing.FieldTable;
-
 public interface BindingTuple
 {
 }

Modified: store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTupleBindingFactory.java
===================================================================
--- store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTupleBindingFactory.java	2010-05-13 18:28:33 UTC (rev 3970)
+++ store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTupleBindingFactory.java	2010-05-14 16:23:59 UTC (rev 3971)
@@ -21,13 +21,14 @@
 package org.apache.qpid.server.store.berkeleydb.tuples;
 
 import com.sleepycat.bind.tuple.TupleBinding;
-import org.apache.qpid.server.virtualhost.VirtualHost;
 
+import org.apache.qpid.server.store.ConfigurationRecoveryHandler.BindingRecoveryHandler;
+
 public class BindingTupleBindingFactory extends TupleBindingFactory
 {
-    public BindingTupleBindingFactory(int version, VirtualHost virtualhost)
+    public BindingTupleBindingFactory(int version)
     {
-        super(version, virtualhost);
+        super(version);
     }
 
     public TupleBinding getInstance()
@@ -35,10 +36,12 @@
         switch (_version)
         {
             default:
+            case 3:
+                //no change from v2
             case 2:
-                return new BindingTuple_2(_virtualhost);
+                return new BindingTuple_2();
             case 1:
-                return new BindingTuple_1(_virtualhost);
+                return new BindingTuple_1();
         }
     }
 }

Modified: store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple_1.java
===================================================================
--- store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple_1.java	2010-05-13 18:28:33 UTC (rev 3970)
+++ store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple_1.java	2010-05-14 16:23:59 UTC (rev 3971)
@@ -1,11 +1,8 @@
 package org.apache.qpid.server.store.berkeleydb.tuples;
 
-import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.server.store.berkeleydb.AMQShortStringEncoding;
-import org.apache.qpid.server.store.berkeleydb.FieldTableEncoding;
 import org.apache.qpid.server.store.berkeleydb.BindingKey;
 import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
 
 import com.sleepycat.bind.tuple.TupleBinding;
 import com.sleepycat.bind.tuple.TupleInput;
@@ -16,15 +13,8 @@
 {
     protected static final Logger _log = Logger.getLogger(BindingTuple.class);
 
-    protected VirtualHost _virtualhost;
-
-    public BindingTuple_1(VirtualHost virtualHost)
+    public BindingTuple_1()
     {
-        if (virtualHost == null)
-        {
-            throw new NullPointerException("Virtualhost cannot be null");
-        }
-        _virtualhost = virtualHost;
     }
 
     public Object entryToObject(TupleInput tupleInput)
@@ -33,7 +23,7 @@
         AMQShortString queueName = AMQShortStringEncoding.readShortString(tupleInput);
         AMQShortString routingKey = AMQShortStringEncoding.readShortString(tupleInput);
 
-        return createNewBindingKey(exchangeName, queueName, routingKey);
+        return new BindingKey(exchangeName, queueName, routingKey, null);
     }
 
     public void objectToEntry(Object object, TupleOutput tupleOutput)
@@ -45,16 +35,4 @@
         AMQShortStringEncoding.writeShortString(binding.getRoutingKey(), tupleOutput);
     }
 
-    private Object createNewBindingKey(AMQShortString exchangeName, AMQShortString queueName, AMQShortString routingKey)
-    {
-        return createNewBindingKey(exchangeName, queueName, routingKey, null);
-    }
-
-    // Addition for Version 2 of this table
-    protected Object createNewBindingKey(AMQShortString exchangeName, AMQShortString queueName,
-                                         AMQShortString routingKey, FieldTable arguments)
-    {
-        return new BindingKey(exchangeName, queueName, routingKey, arguments);
-    }
-
 }

Modified: store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple_2.java
===================================================================
--- store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple_2.java	2010-05-13 18:28:33 UTC (rev 3970)
+++ store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple_2.java	2010-05-14 16:23:59 UTC (rev 3971)
@@ -1,24 +1,21 @@
 package org.apache.qpid.server.store.berkeleydb.tuples;
 
-import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.server.store.berkeleydb.AMQShortStringEncoding;
-import org.apache.qpid.server.store.berkeleydb.FieldTableEncoding;
 import org.apache.qpid.server.store.berkeleydb.BindingKey;
+import org.apache.qpid.server.store.berkeleydb.FieldTableEncoding;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
 
-import com.sleepycat.bind.tuple.TupleBinding;
 import com.sleepycat.bind.tuple.TupleInput;
 import com.sleepycat.bind.tuple.TupleOutput;
 import com.sleepycat.je.DatabaseException;
-import org.apache.log4j.Logger;
 
 public class BindingTuple_2 extends BindingTuple_1 implements BindingTuple
 {
 
-    public BindingTuple_2(VirtualHost virtualHost)
+    public BindingTuple_2()
     {
-        super(virtualHost);
+        super();
     }
 
     public Object entryToObject(TupleInput tupleInput)
@@ -28,7 +25,7 @@
         AMQShortString routingKey = AMQShortStringEncoding.readShortString(tupleInput);
 
         FieldTable arguments;
-
+        
         // Addition for Version 2 of this table
         try
         {
@@ -40,7 +37,7 @@
             return null;
         }
 
-        return createNewBindingKey(exchangeName, queueName, routingKey, arguments);
+        return new BindingKey(exchangeName, queueName, routingKey, arguments);
     }
 
     public void objectToEntry(Object object, TupleOutput tupleOutput)

Added: store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageContentKeyTB_1.java
===================================================================
--- store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageContentKeyTB_1.java	                        (rev 0)
+++ store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageContentKeyTB_1.java	2010-05-14 16:23:59 UTC (rev 3971)
@@ -0,0 +1,43 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.tuples;
+
+import org.apache.qpid.server.store.berkeleydb.keys.MessageContentKey_1;
+
+import com.sleepycat.bind.tuple.TupleInput;
+import com.sleepycat.bind.tuple.TupleOutput;
+
+public class MessageContentKeyTB_1 extends com.sleepycat.bind.tuple.TupleBinding
+{
+    
+    public Object entryToObject(TupleInput tupleInput)
+    {
+        long messageId = tupleInput.readLong();
+        int chunk = tupleInput.readInt();
+        
+        return new MessageContentKey_1(messageId, chunk);
+    }
+
+    public void objectToEntry(Object object, TupleOutput tupleOutput)
+    {
+        final MessageContentKey_1 mk = (MessageContentKey_1) object;
+        tupleOutput.writeLong(mk.getMessageId());            
+        tupleOutput.writeInt(mk.getChunk());
+    }
+
+}
\ No newline at end of file

Added: store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageContentKeyTB_3.java
===================================================================
--- store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageContentKeyTB_3.java	                        (rev 0)
+++ store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageContentKeyTB_3.java	2010-05-14 16:23:59 UTC (rev 3971)
@@ -0,0 +1,43 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.tuples;
+
+import org.apache.qpid.server.store.berkeleydb.keys.MessageContentKey_3;
+
+import com.sleepycat.bind.tuple.TupleInput;
+import com.sleepycat.bind.tuple.TupleOutput;
+
+public class MessageContentKeyTB_3 extends com.sleepycat.bind.tuple.TupleBinding
+{
+    
+    public Object entryToObject(TupleInput tupleInput)
+    {
+        long messageId = tupleInput.readLong();
+        int offset = tupleInput.readInt();
+        
+        return new MessageContentKey_3(messageId, offset);
+    }
+
+    public void objectToEntry(Object object, TupleOutput tupleOutput)
+    {
+        final MessageContentKey_3 mk = (MessageContentKey_3) object;
+        tupleOutput.writeLong(mk.getMessageId());            
+        tupleOutput.writeInt(mk.getOffset());
+    }
+
+}
\ No newline at end of file

Copied: store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageContentKeyTupleBindingFactory.java (from rev 3969, store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTupleBindingFactory.java)
===================================================================
--- store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageContentKeyTupleBindingFactory.java	                        (rev 0)
+++ store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageContentKeyTupleBindingFactory.java	2010-05-14 16:23:59 UTC (rev 3971)
@@ -0,0 +1,45 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.tuples;
+
+import com.sleepycat.bind.tuple.TupleBinding;
+
+public class MessageContentKeyTupleBindingFactory extends TupleBindingFactory
+{
+    public MessageContentKeyTupleBindingFactory(int version)
+    {
+        super(version);
+    }
+
+    public TupleBinding getInstance()
+    {
+        switch (_version)
+        {
+            default:
+            case 3:
+                return new MessageContentKeyTB_3();
+            case 2:
+                //no change from v1
+            case 1:
+                return new MessageContentKeyTB_1();
+        }
+    }
+}

Copied: store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTB_1.java (from rev 3969, store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/MessageMetaDataTB.java)
===================================================================
--- store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTB_1.java	                        (rev 0)
+++ store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTB_1.java	2010-05-14 16:23:59 UTC (rev 3971)
@@ -0,0 +1,152 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.tuples;
+
+import com.sleepycat.bind.tuple.TupleBinding;
+import com.sleepycat.bind.tuple.TupleInput;
+import com.sleepycat.bind.tuple.TupleOutput;
+import org.apache.log4j.Logger;
+import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.*;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.server.message.MessageMetaData;
+import org.apache.qpid.server.store.MessageStoreRecoveryHandler.StoredMessageRecoveryHandler;
+import org.apache.qpid.server.store.berkeleydb.AMQShortStringEncoding;
+
+/**
+ * Handles the mapping to and from 0-8/0-9 message meta data
+ */
+public class MessageMetaDataTB_1 extends TupleBinding
+{
+    private static final Logger _log = Logger.getLogger(MessageMetaDataTB_1.class);
+    
+    public MessageMetaDataTB_1()
+    {
+    }
+
+    public Object entryToObject(TupleInput tupleInput)
+    {
+        try
+        {
+            final MessagePublishInfo publishBody = readMessagePublishInfo(tupleInput);
+            final ContentHeaderBody contentHeaderBody = readContentHeaderBody(tupleInput);
+            final int contentChunkCount = tupleInput.readInt();
+            
+            return new MessageMetaData(publishBody, contentHeaderBody, contentChunkCount);
+        }
+        catch (Exception e)
+        {
+            _log.error("Error converting entry to object: " + e, e);
+            // annoyingly just have to return null since we cannot throw
+            return null;
+        }
+    }
+
+    public void objectToEntry(Object object, TupleOutput tupleOutput)
+    {
+        MessageMetaData message = (MessageMetaData) object;
+        try
+        {
+            writeMessagePublishInfo(message.getMessagePublishInfo(), tupleOutput);
+        }
+        catch (AMQException e)
+        {
+            // can't do anything else since the BDB interface precludes throwing any exceptions
+            // in practice we should never get an exception
+            throw new RuntimeException("Error converting object to entry: " + e, e);
+        }
+        writeContentHeader(message.getContentHeaderBody(), tupleOutput);
+        tupleOutput.writeInt(message.getContentChunkCount());
+    }
+
+    private MessagePublishInfo readMessagePublishInfo(TupleInput tupleInput)
+    {
+
+        final AMQShortString exchange = AMQShortStringEncoding.readShortString(tupleInput);
+        final AMQShortString routingKey = AMQShortStringEncoding.readShortString(tupleInput);
+        final boolean mandatory = tupleInput.readBoolean();
+        final boolean immediate = tupleInput.readBoolean();
+
+        return new MessagePublishInfo()
+        {
+
+            public AMQShortString getExchange()
+            {
+                return exchange;
+            }
+
+            public void setExchange(AMQShortString exchange)
+            {
+
+            }
+
+            public boolean isImmediate()
+            {
+                return immediate;
+            }
+
+            public boolean isMandatory()
+            {
+                return mandatory;
+            }
+
+            public AMQShortString getRoutingKey()
+            {
+                return routingKey;
+            }
+        }   ;
+
+    }
+
+
+    private ContentHeaderBody readContentHeaderBody(TupleInput tupleInput) throws AMQFrameDecodingException, AMQProtocolVersionException
+    {
+        int bodySize = tupleInput.readInt();
+        byte[] underlying = new byte[bodySize];
+        tupleInput.readFast(underlying);
+        ByteBuffer buf = ByteBuffer.wrap(underlying);
+
+        return ContentHeaderBody.createFromBuffer(buf, bodySize);
+    }
+
+    private void writeMessagePublishInfo(MessagePublishInfo publishBody, TupleOutput tupleOutput) throws AMQException
+    {
+
+
+        AMQShortStringEncoding.writeShortString(publishBody.getExchange(), tupleOutput);
+        AMQShortStringEncoding.writeShortString(publishBody.getRoutingKey(), tupleOutput);
+        tupleOutput.writeBoolean(publishBody.isMandatory());
+        tupleOutput.writeBoolean(publishBody.isImmediate());
+        
+    }
+
+    private void writeContentHeader(ContentHeaderBody headerBody, TupleOutput tupleOutput)
+    {
+        // write out the content header body
+        final int bodySize = headerBody.getSize();
+        byte[] underlying = new byte[bodySize];
+        ByteBuffer buf = ByteBuffer.wrap(underlying);
+        headerBody.writePayload(buf);
+        tupleOutput.writeInt(bodySize);
+        tupleOutput.writeFast(underlying);
+    }
+}

Added: store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTB_3.java
===================================================================
--- store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTB_3.java	                        (rev 0)
+++ store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTB_3.java	2010-05-14 16:23:59 UTC (rev 3971)
@@ -0,0 +1,77 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.tuples;
+
+import com.sleepycat.bind.tuple.TupleInput;
+import com.sleepycat.bind.tuple.TupleOutput;
+import org.apache.log4j.Logger;
+import org.apache.qpid.server.store.MessageMetaDataType;
+import org.apache.qpid.server.store.StorableMessageMetaData;
+
+/**
+ * Handles the mapping to and from message meta data
+ */
+public class MessageMetaDataTB_3 extends MessageMetaDataTB_1
+{
+    private static final Logger _log = Logger.getLogger(MessageMetaDataTB_3.class);
+
+    @Override
+    public Object entryToObject(TupleInput tupleInput)
+    {
+        try
+        {
+            final int bodySize = tupleInput.readInt();
+            byte[] dataAsBytes = new byte[bodySize];
+            tupleInput.readFast(dataAsBytes);
+
+            java.nio.ByteBuffer buf = java.nio.ByteBuffer.wrap(dataAsBytes);
+            buf.position(1);
+            buf = buf.slice();
+            MessageMetaDataType type = MessageMetaDataType.values()[dataAsBytes[0]];
+            StorableMessageMetaData metaData = type.getFactory().createMetaData(buf);
+            
+            return metaData;
+        }
+        catch (Exception e)
+        {
+            _log.error("Error converting entry to object: " + e, e);
+            // annoyingly just have to return null since we cannot throw
+            return null;
+        }
+    }
+
+    @Override
+    public void objectToEntry(Object object, TupleOutput tupleOutput)
+    {
+        StorableMessageMetaData metaData = (StorableMessageMetaData) object;
+        
+        final int bodySize = 1 + metaData.getStorableSize();
+        byte[] underlying = new byte[bodySize];
+        underlying[0] = (byte) metaData.getType().ordinal();
+        java.nio.ByteBuffer buf = java.nio.ByteBuffer.wrap(underlying);
+        buf.position(1);
+        buf = buf.slice();
+
+        metaData.writeToBuffer(0, buf);
+        tupleOutput.writeInt(bodySize);
+        tupleOutput.writeFast(underlying);
+    }
+}

Copied: store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTupleBindingFactory.java (from rev 3969, store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTupleBindingFactory.java)
===================================================================
--- store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTupleBindingFactory.java	                        (rev 0)
+++ store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTupleBindingFactory.java	2010-05-14 16:23:59 UTC (rev 3971)
@@ -0,0 +1,45 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.tuples;
+
+import com.sleepycat.bind.tuple.TupleBinding;
+
+public class MessageMetaDataTupleBindingFactory extends TupleBindingFactory
+{
+    public MessageMetaDataTupleBindingFactory(int version)
+    {
+        super(version);
+    }
+
+    public TupleBinding getInstance()
+    {
+        switch (_version)
+        {
+            default:
+            case 3:
+                return new MessageMetaDataTB_3();
+            case 2:
+                //no change from v1
+            case 1:
+                return new MessageMetaDataTB_1();
+        }
+    }
+}

Copied: store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueEntryTB.java (from rev 3969, store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTupleBindingFactory.java)
===================================================================
--- store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueEntryTB.java	                        (rev 0)
+++ store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueEntryTB.java	2010-05-14 16:23:59 UTC (rev 3971)
@@ -0,0 +1,48 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.tuples;
+
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.store.berkeleydb.AMQShortStringEncoding;
+import org.apache.qpid.server.store.berkeleydb.QueueEntryKey;
+
+import com.sleepycat.bind.tuple.TupleBinding;
+import com.sleepycat.bind.tuple.TupleInput;
+import com.sleepycat.bind.tuple.TupleOutput;
+
+public class QueueEntryTB extends TupleBinding
+{
+    public Object entryToObject(TupleInput tupleInput)
+    {
+        AMQShortString queueName = AMQShortStringEncoding.readShortString(tupleInput);
+        Long messageId = tupleInput.readLong();
+
+        return new QueueEntryKey(queueName, messageId);
+    }
+
+    public void objectToEntry(Object object, TupleOutput tupleOutput)
+    {
+        final QueueEntryKey mk = (QueueEntryKey) object;
+
+        AMQShortStringEncoding.writeShortString(mk.getQueueName(),tupleOutput);
+        tupleOutput.writeLong(mk.getMessageId());
+    }
+}
\ No newline at end of file

Modified: store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple.java
===================================================================
--- store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple.java	2010-05-13 18:28:33 UTC (rev 3970)
+++ store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple.java	2010-05-14 16:23:59 UTC (rev 3971)
@@ -20,10 +20,6 @@
  */
 package org.apache.qpid.server.store.berkeleydb.tuples;
 
-import org.apache.qpid.framing.FieldTable;
-
 public interface QueueTuple
 {
-    // Addition for Version 2 
-    public void setArguments(FieldTable arguments);
 }

Modified: store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTupleBindingFactory.java
===================================================================
--- store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTupleBindingFactory.java	2010-05-13 18:28:33 UTC (rev 3970)
+++ store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTupleBindingFactory.java	2010-05-14 16:23:59 UTC (rev 3971)
@@ -20,14 +20,14 @@
  */
 package org.apache.qpid.server.store.berkeleydb.tuples;
 
-import org.apache.qpid.server.virtualhost.VirtualHost;
 import com.sleepycat.bind.tuple.TupleBinding;
 
 public class QueueTupleBindingFactory extends TupleBindingFactory
 {
-    public QueueTupleBindingFactory(int version, VirtualHost virtualHost)
+
+    public QueueTupleBindingFactory(int version)
     {
-        super(version,virtualHost);
+        super(version);
     }
 
     public TupleBinding getInstance()
@@ -35,10 +35,12 @@
         switch (_version)
         {
             default:
+            case 3:
+                //no change from v2
             case 2:
-                return new QueueTuple_2(_virtualhost);
+                return new QueueTuple_2();
             case 1:
-                return new QueueTuple_1(_virtualhost);
+                return new QueueTuple_1();
         }
     }
 }

Modified: store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_1.java
===================================================================
--- store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_1.java	2010-05-13 18:28:33 UTC (rev 3970)
+++ store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_1.java	2010-05-14 16:23:59 UTC (rev 3971)
@@ -20,28 +20,15 @@
 import com.sleepycat.bind.tuple.TupleBinding;
 import com.sleepycat.bind.tuple.TupleInput;
 import com.sleepycat.bind.tuple.TupleOutput;
-import org.apache.log4j.Logger;
-import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.AMQQueueFactory;
 import org.apache.qpid.server.store.berkeleydb.AMQShortStringEncoding;
-import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.store.berkeleydb.records.QueueRecord;
 
 public class QueueTuple_1 extends TupleBinding implements QueueTuple
 {
-    protected static final Logger _logger = Logger.getLogger(QueueTuple.class);
 
-    protected final VirtualHost _virtualHost;
-
-    public QueueTuple_1(VirtualHost virtualHost)
+    public QueueTuple_1()
     {
-        if (virtualHost == null)
-        {
-            throw new NullPointerException("Virtualhost cannot be null");
-        }
-        _virtualHost = virtualHost;
     }
 
     public Object entryToObject(TupleInput tupleInput)
@@ -49,58 +36,15 @@
         AMQShortString name = AMQShortStringEncoding.readShortString(tupleInput);
         AMQShortString owner = AMQShortStringEncoding.readShortString(tupleInput);
 
-        return createNewQueue(name, owner);
+        return new QueueRecord(name, owner, null);
     }
 
     public void objectToEntry(Object object, TupleOutput tupleOutput)
     {
-        AMQQueue queue = (AMQQueue) object;
+        QueueRecord queue = (QueueRecord) object;
 
-        AMQShortStringEncoding.writeShortString(queue.getName(), tupleOutput);
+        AMQShortStringEncoding.writeShortString(queue.getNameShortString(), tupleOutput);
         AMQShortStringEncoding.writeShortString(queue.getOwner(), tupleOutput);
+        //Version1 tuple can't store the queue arguments.
     }
-
-    // Addition for Version 2 of this table
-    public void setArguments(FieldTable arguments)
-    {
-        //no-op
-    }
-
-    protected Object createNewQueue(AMQShortString name, AMQShortString owner)
-    {
-        return createNewQueue(name, owner, null);
-    }
-
-    // Addition for Version 2 of this table
-    protected Object createNewQueue(AMQShortString name, AMQShortString owner, FieldTable arguments)
-    {
-        try
-        {
-
-            AMQQueue queue;
-            AMQQueue existing = _virtualHost.getQueueRegistry().getQueue(name);
-
-            if (existing != null)
-            {
-                _logger.info("Queue :" + existing + ": already exists in configured broker.");
-
-                queue = existing;
-            }
-            else
-            {
-                // Retrieve the existing Queue object
-                queue = AMQQueueFactory.createAMQQueueImpl(name, true, owner, false, _virtualHost, arguments);
-                _virtualHost.getQueueRegistry().registerQueue(queue);
-                _logger.info("Recovering queue " + queue.getName() + " with owner:"
-                             + ((queue.getOwner() == null) ? " <<null>> " : (" \"" + queue.getOwner() + "\" ")));
-            }
-
-            return queue;
-        }
-        catch (AMQException e)
-        {
-            _logger.error("Unable to create queue: " + e, e);
-            return null;
-        }
-    }
 }

Modified: store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_2.java
===================================================================
--- store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_2.java	2010-05-13 18:28:33 UTC (rev 3970)
+++ store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_2.java	2010-05-14 16:23:59 UTC (rev 3971)
@@ -20,22 +20,23 @@
 import com.sleepycat.bind.tuple.TupleInput;
 import com.sleepycat.bind.tuple.TupleOutput;
 import com.sleepycat.je.DatabaseException;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.AMQQueueFactory;
-import org.apache.qpid.server.virtualhost.VirtualHost;
+
+import org.apache.log4j.Logger;
 import org.apache.qpid.server.store.berkeleydb.AMQShortStringEncoding;
 import org.apache.qpid.server.store.berkeleydb.FieldTableEncoding;
-import org.apache.qpid.AMQException;
+import org.apache.qpid.server.store.berkeleydb.records.QueueRecord;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
 
 public class QueueTuple_2 extends QueueTuple_1
 {
+    protected static final Logger _logger = Logger.getLogger(QueueTuple_2.class);
+    
     protected FieldTable _arguments;
 
-    public QueueTuple_2(VirtualHost virtualHost)
+    public QueueTuple_2()
     {
-        super(virtualHost);
+        super();
     }
 
     public Object entryToObject(TupleInput tupleInput)
@@ -44,10 +45,10 @@
         {
             AMQShortString name = AMQShortStringEncoding.readShortString(tupleInput);
             AMQShortString owner = AMQShortStringEncoding.readShortString(tupleInput);
-            // Addition for Version 2 of this table
+            // Addition for Version 2 of this table, read the queue arguments
             FieldTable arguments = FieldTableEncoding.readFieldTable(tupleInput);
 
-            return createNewQueue(name, owner, arguments);
+            return new QueueRecord(name, owner, arguments);
         }
         catch (DatabaseException e)
         {
@@ -59,17 +60,11 @@
 
     public void objectToEntry(Object object, TupleOutput tupleOutput)
     {
-        AMQQueue queue = (AMQQueue) object;
+        QueueRecord queue = (QueueRecord) object;
 
-        AMQShortStringEncoding.writeShortString(queue.getName(), tupleOutput);
+        AMQShortStringEncoding.writeShortString(queue.getNameShortString(), tupleOutput);
         AMQShortStringEncoding.writeShortString(queue.getOwner(), tupleOutput);
-        // Addition for Version 2 of this table
-        FieldTableEncoding.writeFieldTable(_arguments, tupleOutput);
+        // Addition for Version 2 of this table, store the queue arguments
+        FieldTableEncoding.writeFieldTable(queue.getArguments(), tupleOutput);
     }
-
-    // Addition for Version 2 of this table
-    public void setArguments(FieldTable arguments)
-    {
-        _arguments = arguments;
-    }
 }

Modified: store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/TupleBindingFactory.java
===================================================================
--- store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/TupleBindingFactory.java	2010-05-13 18:28:33 UTC (rev 3970)
+++ store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/TupleBindingFactory.java	2010-05-14 16:23:59 UTC (rev 3971)
@@ -27,17 +27,9 @@
 {
     protected int _version;
 
-    protected VirtualHost _virtualhost;
-
-    public TupleBindingFactory(int version, VirtualHost virtualhost)
+    public TupleBindingFactory(int version)
     {
-        if (virtualhost == null)
-        {
-            throw new NullPointerException("Virtualhost cannot be null");
-        }
-        
         _version = version;
-        _virtualhost = virtualhost;        
     }
 
     public abstract TupleBinding getInstance();

Modified: store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java
===================================================================
--- store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java	2010-05-13 18:28:33 UTC (rev 3970)
+++ store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java	2010-05-14 16:23:59 UTC (rev 3971)
@@ -25,6 +25,7 @@
 import org.apache.qpid.server.store.berkeleydb.utils.Publisher;
 import org.apache.qpid.server.store.berkeleydb.utils.DurableSubscriber;
 import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.virtualhost.VirtualHostImpl;
 import org.apache.qpid.server.configuration.VirtualHostConfiguration;
 import org.apache.qpid.client.transport.TransportConnection;
 import org.apache.qpid.util.FileUtils;
@@ -200,7 +201,7 @@
         testVirtualhost.setProperty("store.version", version);
 
         ApplicationRegistry.getInstance(port).getVirtualHostRegistry().
-                registerVirtualHost(new VirtualHost(new VirtualHostConfiguration("bdbtest",testVirtualhost)));
+                registerVirtualHost(new VirtualHostImpl(new VirtualHostConfiguration("bdbtest",testVirtualhost), null /* TODO */));
 
         TransportConnection.createVMBroker(port);
 

Modified: store/trunk/java/bdbstore/src/tools/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgrade.java
===================================================================
--- store/trunk/java/bdbstore/src/tools/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgrade.java	2010-05-13 18:28:33 UTC (rev 3970)
+++ store/trunk/java/bdbstore/src/tools/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgrade.java	2010-05-14 16:23:59 UTC (rev 3971)
@@ -25,6 +25,7 @@
 import org.apache.qpid.server.store.MemoryMessageStore;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.virtualhost.VirtualHostImpl;
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.configuration.VirtualHostConfiguration;
 import org.apache.qpid.server.logging.actors.CurrentActor;
@@ -341,12 +342,12 @@
         CurrentActor.set(new BrokerActor(new NullRootMessageLogger()));
 
         // Note the name of the Vhosts is not important, the store doesnot record the name of the vhost.
-        _newVirtualHost = new VirtualHost(new VirtualHostConfiguration("Upgraded", new PropertiesConfiguration()), new MemoryMessageStore());
-        _oldVirtualHost = new VirtualHost(new VirtualHostConfiguration("Old", new PropertiesConfiguration()), new MemoryMessageStore());
+        _newVirtualHost = new VirtualHostImpl(new VirtualHostConfiguration("Upgraded", new PropertiesConfiguration()), new MemoryMessageStore());
+        _oldVirtualHost = new VirtualHostImpl(new VirtualHostConfiguration("Old", new PropertiesConfiguration()), new MemoryMessageStore());
 
         //Create a new messageStore
         _newMessageStore = new BDBMessageStore();
-        _newMessageStore.configure(_newVirtualHost, toDir, false);
+        //TODO _newMessageStore.configure(_newVirtualHost, toDir, false);
 
         try
         {
@@ -356,7 +357,7 @@
                 default:
                 case 1:
                     _oldMessageStore = new BDBMessageStore(1);
-                    _oldMessageStore.configure(_oldVirtualHost, fromDir, true);
+                    //TODO _oldMessageStore.configure(_oldVirtualHost, fromDir, true);
                     _oldMessageStore.start();
                     upgradeFromVersion_1();
                     break;



More information about the rhmessaging-commits mailing list