[rhmessaging-commits] rhmessaging commits: r4050 - in store/trunk/java/bdbstore/src: main/java/org/apache/qpid/server/store/berkeleydb/records and 1 other directories.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Fri Jun 25 05:44:26 EDT 2010


Author: rgemmell
Date: 2010-06-25 05:44:25 -0400 (Fri, 25 Jun 2010)
New Revision: 4050

Modified:
   store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
   store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/records/QueueRecord.java
   store/trunk/java/bdbstore/src/tools/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgrade.java
Log:
Initial work on v2->v3 BDBMessageStore upgrade tool


Modified: store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
===================================================================
--- store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java	2010-06-24 20:48:55 UTC (rev 4049)
+++ store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java	2010-06-25 09:44:25 UTC (rev 4050)
@@ -1043,14 +1043,28 @@
             _log.debug("public void createQueue(AMQQueue queue(" + queue.getName() + ") = " + queue + "): called");
         }
         
+        QueueRecord queueRecord= new QueueRecord(queue.getNameShortString(), 
+                                                queue.getOwner(), queue.isExclusive(), arguments);
+        
+        createQueue(queueRecord);
+    }
+    
+    /**
+     * Makes the specified queue persistent. 
+     * 
+     * Only intended for direct use during store upgrades.
+     *
+     * @param queueRecord     Details of the queue to store.
+     *
+     * @throws AMQException If the operation fails for any reason.
+     */
+    protected void createQueue(QueueRecord queueRecord) throws AMQException
+    {
         if (_state != State.RECOVERING)
         {
-            QueueRecord queueRecord= new QueueRecord(queue.getNameShortString(), 
-                                                     queue.getOwner(), queue.isExclusive(), arguments);
-            
             DatabaseEntry key = new DatabaseEntry();
             EntryBinding keyBinding = new AMQShortStringTB();
-            keyBinding.objectToEntry(queue.getNameShortString(), key);
+            keyBinding.objectToEntry(queueRecord.getNameShortString(), key);
 
             DatabaseEntry value = new DatabaseEntry();
             TupleBinding queueBinding = _queueTupleBindingFactory.getInstance();
@@ -1062,7 +1076,8 @@
             }
             catch (DatabaseException e)
             {
-                throw new AMQException("Error writing AMQQueue with name " + queue.getName() + " to database: " + e, e);
+                throw new AMQException("Error writing AMQQueue with name " + 
+                        queueRecord.getNameShortString().toString() + " to database: " + e, e);
             }
         }
     }
@@ -1477,14 +1492,9 @@
      * @throws AMQException If the operation fails for any reason, or if the specified message does not exist.
      */
     public int getContent(Long messageId, int offset, ByteBuffer dst) throws AMQException
-    {
-        final int limit = offset + dst.remaining();
-        
+    {    
         DatabaseEntry contentKeyEntry = new DatabaseEntry();
         
-        //TODO: if requested offset is non zero, use partial record (key-only) search to 
-        //locate the first record key to prevent reading in data we dont need
-        
         //Start from 0 offset and search for the starting chunk. 
         MessageContentKey_3 mck = new MessageContentKey_3(messageId, 0);
         TupleBinding contentKeyTupleBinding = new MessageContentKeyTB_3();
@@ -1505,7 +1515,7 @@
         {
             cursor = _messageContentDb.openCursor(null, null);
             
-            OperationStatus status = cursor.getSearchKeyRange(contentKeyEntry, value, LockMode.RMW);
+            OperationStatus status = cursor.getSearchKeyRange(contentKeyEntry, value, LockMode.READ_UNCOMMITTED);
             while (status == OperationStatus.SUCCESS)
             {
                 mck = (MessageContentKey_3) contentKeyTupleBinding.entryToObject(contentKeyEntry);
@@ -1524,7 +1534,6 @@
                 
                 seenSoFar += size;
                 
-                //TODO: can change this guard if we start recording the last byte in the chunk record
                 if(seenSoFar >= offset)
                 {
                     byte[] dataAsBytes = buf.array();
@@ -1600,6 +1609,11 @@
     {
         return _bindingTupleBindingFactory;
     }
+    
+    protected MessageMetaDataTupleBindingFactory getMetaDataTupleBindingFactory()
+    {
+        return _metaDataTupleBindingFactory;
+    }
 
     //Package getters for the various databases used by the Store
 

Modified: store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/records/QueueRecord.java
===================================================================
--- store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/records/QueueRecord.java	2010-06-24 20:48:55 UTC (rev 4049)
+++ store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/records/QueueRecord.java	2010-06-25 09:44:25 UTC (rev 4050)
@@ -52,6 +52,11 @@
     {
         return _exclusive;
     }
+    
+    public void setExclusive(boolean exclusive)
+    {
+        _exclusive = exclusive;
+    }
 
     public FieldTable getArguments()
     {

Modified: store/trunk/java/bdbstore/src/tools/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgrade.java
===================================================================
--- store/trunk/java/bdbstore/src/tools/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgrade.java	2010-06-24 20:48:55 UTC (rev 4049)
+++ store/trunk/java/bdbstore/src/tools/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgrade.java	2010-06-25 09:44:25 UTC (rev 4050)
@@ -22,15 +22,18 @@
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.qpid.server.store.MemoryMessageStore;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.virtualhost.VirtualHostImpl;
-import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.configuration.VirtualHostConfiguration;
+import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.store.berkeleydb.keys.MessageContentKey_1;
+import org.apache.qpid.server.store.berkeleydb.keys.MessageContentKey_3;
+import org.apache.qpid.server.store.berkeleydb.records.ExchangeRecord;
+import org.apache.qpid.server.store.berkeleydb.records.QueueRecord;
+import org.apache.qpid.server.store.berkeleydb.tuples.MessageContentKeyTB_1;
+import org.apache.qpid.server.store.berkeleydb.tuples.MessageContentKeyTB_3;
 import org.apache.qpid.server.logging.actors.CurrentActor;
 import org.apache.qpid.server.logging.actors.BrokerActor;
 import org.apache.qpid.server.logging.NullRootMessageLogger;
+import org.apache.qpid.server.message.MessageMetaData;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.util.FileUtils;
 import org.apache.commons.cli.PosixParser;
@@ -39,13 +42,14 @@
 import org.apache.commons.cli.ParseException;
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.OptionBuilder;
-import org.apache.commons.configuration.PropertiesConfiguration;
 
 import java.io.File;
 import java.io.BufferedReader;
 import java.io.InputStreamReader;
 import java.io.IOException;
 import java.io.FileNotFoundException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
 
@@ -53,21 +57,15 @@
 import com.sleepycat.je.DatabaseException;
 import com.sleepycat.je.Database;
 import com.sleepycat.bind.tuple.TupleBinding;
-import com.sleepycat.bind.tuple.ByteBinding;
 
 /**
- * This is a simple BerkeleyDB Store upgrade tool that will upgrade a V1 Store to a V2 Store.
+ * This is a simple BerkeleyDB Store upgrade tool that will upgrade a V2 Store to a V3 Store.
  *
- * NOTE: No checks are in place to validate that the input is V1.
+ * Currently upgrade is fixed from v2 -> v3
  *
- * Currently upgrade is fixed from v1 -> v2
- * Only the Queue and Binding databases are migrated all other databases are copied as DB entries.
- *
  * Improvments:
  * - Add List BDBMessageStore.getDatabases(); This can the be iterated to guard against new DBs being added.
- * - Add a version value into the store so that a quick check can be performed to perform the upgrades.
  * - A version in the store would allow automated upgrade or later with more available versions interactive upgrade.
- * - Currently only the Queue and Binding DB are processed for upgrade all the other db data is copied between stores.
  * - Add process logging and disable all Store and Qpid logging.
  */
 public class BDBStoreUpgrade
@@ -84,10 +82,6 @@
     BDBMessageStore _oldMessageStore;
     /** The New Store */
     BDBMessageStore _newMessageStore;
-    /** A VHost used in the migration of the queues from the old VHost */
-    VirtualHost _newVirtualHost;
-    /** A VHost used in the creation of the queues from the old store */
-    VirtualHost _oldVirtualHost;
     /** The file ending that is used by BDB Store Files */
     private static final String BDB_FILE_ENDING = ".jdb";
 
@@ -96,7 +90,7 @@
     private boolean _interactive;
     private boolean _force;
 
-    private static final String VERSION = "1.0";
+    private static final String VERSION = "2.0";
     private static final String OPTION_INPUT_SHORT = "i";
     private static final String OPTION_INPUT = "input";
     private static final String OPTION_OUTPUT_SHORT = "o";
@@ -312,7 +306,7 @@
                     if (!userInteract("Are you sure wish to proceed with DB migration without backup? " +
                                       "(For more details of the consequences check the Qpid/BDB Message Store Wiki)."))
                     {
-                        throw new IllegalArgumentException("Upgrade stopped as user request as no DB Backup performed.");
+                        throw new IllegalArgumentException("Upgrade stopped at user request as no DB Backup performed.");
                     }
                 }
             }
@@ -341,13 +335,9 @@
 
         CurrentActor.set(new BrokerActor(new NullRootMessageLogger()));
 
-        // Note the name of the Vhosts is not important, the store doesnot record the name of the vhost.
-        _newVirtualHost = new VirtualHostImpl(new VirtualHostConfiguration("Upgraded", new PropertiesConfiguration()), new MemoryMessageStore());
-        _oldVirtualHost = new VirtualHostImpl(new VirtualHostConfiguration("Old", new PropertiesConfiguration()), new MemoryMessageStore());
-
         //Create a new messageStore
         _newMessageStore = new BDBMessageStore();
-        //TODO _newMessageStore.configure(_newVirtualHost, toDir, false);
+        _newMessageStore.configure(toDir, false);
 
         try
         {
@@ -355,25 +345,19 @@
             switch (version)
             {
                 default:
-                case 1:
-                    _oldMessageStore = new BDBMessageStore(1);
-                    //TODO _oldMessageStore.configure(_oldVirtualHost, fromDir, true);
+                case 2:
+                    _oldMessageStore = new BDBMessageStore(2);
+                    _oldMessageStore.configure(fromDir, true);
                     _oldMessageStore.start();
-                    upgradeFromVersion_1();
+                    upgradeFromVersion_2();
                     break;
             }
         }
         finally
         {
-            _newVirtualHost.close();
-            _oldVirtualHost.close();
-
             _newMessageStore.close();
             _oldMessageStore.close();
 
-            //Shutdown the AR that the Vhosts will have created.
-            ApplicationRegistry.remove(1);
-
             // if we are running inplace then swap fromDir and toDir
             if (inplace)
             {
@@ -395,84 +379,170 @@
         }
     }
 
-    private void upgradeFromVersion_1() throws AMQException, DatabaseException
+    private void upgradeFromVersion_2() throws AMQException, DatabaseException
     {
+        _logger.info("Starting store upgrade from version 2");
+        
+        //Migrate _exchangeDb;
+        _logger.info("Exchanges");
 
-        _logger.info("Starting store upgrade from version 1");
+        moveContents(_oldMessageStore.getExchangesDb(), _newMessageStore.getExchangesDb(), "Exchange");
 
-        _logger.info("Message Metadata");
-        //Migrate _messageMetaDataDb;
-        moveContents(_oldMessageStore.getMetaDataDb(), _newMessageStore.getMetaDataDb(), "Message MetaData");
+        final List<AMQShortString> topicExchanges = new ArrayList<AMQShortString>();
+        final TupleBinding exchangeTB = new ExchangeTB();
+        
+        DatabaseVisitor exchangeListVisitor = new DatabaseVisitor()
+        {           
+            public void visit(DatabaseEntry key, DatabaseEntry value) throws DatabaseException
+            {
+                ExchangeRecord exchangeRec = (ExchangeRecord) exchangeTB.entryToObject(value);
+                AMQShortString type = exchangeRec.getType();
 
-        _logger.info("Message Contents");
-        //Migrate _messageContentDb;
-        moveContents(_oldMessageStore.getContentDb(), _newMessageStore.getContentDb(), "Message Content");
+                if (ExchangeDefaults.TOPIC_EXCHANGE_CLASS.equals(type))
+                {
+                    topicExchanges.add(exchangeRec.getNameShortString());
+                }
+            }
+        };
+        _oldMessageStore.visitExchanges(exchangeListVisitor);
 
+
+        //Migrate _queueBindingsDb;
+        _logger.info("QueueBindings");
+        moveContents(_oldMessageStore.getBindingsDb(), _newMessageStore.getBindingsDb(), "Queue Binding");
+
+        //Inspect the bindings to gather a list of queues which are probably durable subscriptions, i.e. those 
+        //which have a colon in their name and are bound to the Topic exchanges above
+        final List<AMQShortString> durableSubQueues = new ArrayList<AMQShortString>();
+        final TupleBinding bindingTB = _oldMessageStore.getBindingTupleBindingFactory().getInstance();
+        
+        DatabaseVisitor durSubQueueListVisitor = new DatabaseVisitor()
+        {           
+            public void visit(DatabaseEntry key, DatabaseEntry value) throws DatabaseException
+            {
+                BindingKey bindingRec = (BindingKey) bindingTB.entryToObject(key);
+                AMQShortString queueName = bindingRec.getQueueName();
+                AMQShortString exchangeName = bindingRec.getExchangeName();
+                
+                if (topicExchanges.contains(exchangeName) && queueName.asString().contains(":"))
+                {
+                    durableSubQueues.add(queueName);
+                }
+            }
+        };
+        _oldMessageStore.visitBindings(durSubQueueListVisitor);
+
+
+        //Migrate _queueDb;
         _logger.info("Queues");
-        //Migrate _queueDb;
-        //Get the oldMessageStore Tuple Binding which does the parsing
+
         final TupleBinding queueTupleBinding = _oldMessageStore.getQueueTupleBindingFactory().getInstance();
 
-        //Create a visitor that will take the queues in the oldMessageStore and add them to the newMessageStore
         DatabaseVisitor queueVisitor = new DatabaseVisitor()
         {
             public void visit(DatabaseEntry key, DatabaseEntry value) throws AMQException
             {
-                AMQQueue queue = (AMQQueue) queueTupleBinding.entryToObject(value);
+                QueueRecord queueRec = (QueueRecord) queueTupleBinding.entryToObject(value);
+                AMQShortString queueName = queueRec.getNameShortString();
 
-                //The simple call to createQueue with the AMQQueue object is sufficient for a v1 upgrade as all the
-                // extra properties in v2 will be defaulted.
-                _newMessageStore.createQueue(queue);
+                //if the queue name is in the gathered list then set its exclusivity true
+                if (durableSubQueues.contains(queueName))
+                {
+                    _logger.info("Marking as possible DurableSubscription backing queue: " + queueName);
+                    queueRec.setExclusive(true);
+                }
+                
+                //The simple call to createQueue with the QueueRecord object is sufficient for a v2->v3 upgrade as
+                //the extra 'exclusive' property in v3 will be defaulted to false in the record creation.
+                _newMessageStore.createQueue(queueRec);
 
-                // We need to call queue stop here as all the queues were already registerd when the _oldMessageStore
-                // state was recovered. Now we are creating a second Queue it will aquire the Executor Service again!
-                // But the queueRegistry is a set so only one release will be performed.
-                //
-                // queue.stop();
-                //
-                // An alternative approach was taken here: If we don't recover the store
                 _count++;
             }
         };
-        //Perform the visit
         _oldMessageStore.visitQueues(queueVisitor);
 
         logCount(queueVisitor.getVisitedCount(), "queue");
 
-        _logger.info("Delivery Records");
-        //Migrate _deliveryDb;
-        moveContents(_oldMessageStore.getDeliveryDb(), _newMessageStore.getDeliveryDb(), "Delivery Record");
 
-        _logger.info("Exchanges");
-        //Migrate _exchangeDb;
-        moveContents(_oldMessageStore.getExchangesDb(), _newMessageStore.getExchangesDb(), "Exchange");
+        //Migrate _messageMetaDataDb;
+        _logger.info("Message MetaData");
+        
+        final Database newMetaDataDB = _newMessageStore.getMetaDataDb();
+        final TupleBinding oldMetaDataTupleBinding = _oldMessageStore.getMetaDataTupleBindingFactory().getInstance();
+        final TupleBinding newMetaDataTupleBinding = _newMessageStore.getMetaDataTupleBindingFactory().getInstance();
+        
+        DatabaseVisitor metaDataVisitor = new DatabaseVisitor()
+        {
+            public void visit(DatabaseEntry key, DatabaseEntry value) throws DatabaseException
+            {
+                MessageMetaData metaData = (MessageMetaData) oldMetaDataTupleBinding.entryToObject(value);
 
-        _logger.info("QueueBindings");
-        //Migrate _queueBindingsDb;
-        final TupleBinding bindingTupleBinding = _oldMessageStore.getBindingTupleBindingFactory().getInstance();
+                DatabaseEntry newValue = new DatabaseEntry();
+                newMetaDataTupleBinding.objectToEntry(metaData, newValue);
+                
+                newMetaDataDB.put(null, key, newValue);
 
-        //Create a visitor that to read the old format queue bindings
-        DatabaseVisitor queueBindings = new DatabaseVisitor()
+                _count++;
+            }
+        };
+        _oldMessageStore.visitMetaDataDb(metaDataVisitor);
+
+        logCount(metaDataVisitor.getVisitedCount(), "Message MetaData");
+
+
+        //Migrate _messageContentDb;
+        _logger.info("Message Contents");
+        final Database newContentDB = _newMessageStore.getContentDb();
+        
+        final TupleBinding oldContentKeyTupleBinding = new MessageContentKeyTB_1();
+        final TupleBinding newContentKeyTupleBinding = new MessageContentKeyTB_3();
+        final TupleBinding contentTB = new ContentTB();
+        
+        DatabaseVisitor contentVisitor = new DatabaseVisitor()
         {
+            long _prevMsgId = -1; //Initialise to invalid value
+            int _bytesSeenSoFar = 0;
+            
             public void visit(DatabaseEntry key, DatabaseEntry value) throws DatabaseException
             {
-                BindingKey queueBinding = (BindingKey) bindingTupleBinding.entryToObject(key);
+                //determine the msgId of the current entry
+                MessageContentKey_1 contentKey = (MessageContentKey_1) oldContentKeyTupleBinding.entryToObject(key);
+                long msgId = contentKey.getMessageId();
 
-                //Create a new Format TupleBinding
-                TupleBinding newBindingTupleBinding = _newMessageStore.getBindingTupleBindingFactory().getInstance();
+                //if this is a new message, restart the byte offset count.
+                if(_prevMsgId != msgId)
+                {
+                    _bytesSeenSoFar = 0;
+                }
 
-                DatabaseEntry newKey = new DatabaseEntry();
-                newBindingTupleBinding.objectToEntry(queueBinding, newKey);
+                //determine the content size
+                ByteBuffer content = (ByteBuffer) contentTB.entryToObject(value);
+                int contentSize = content.limit();
 
-                ByteBinding.byteToEntry((byte) 0, value);
-                _newMessageStore.getBindingsDb().put(null, newKey, value);
+                //create the new key: id + previously seen data count
+                MessageContentKey_3 newKey = new MessageContentKey_3(msgId, _bytesSeenSoFar);
+                DatabaseEntry newKeyEntry = new DatabaseEntry();
+                newContentKeyTupleBinding.objectToEntry(newKey, newKeyEntry);
 
+                DatabaseEntry newValueEntry = new DatabaseEntry();
+                contentTB.objectToEntry(content, newValueEntry);
+
+                newContentDB.put(null, newKeyEntry, newValueEntry);
+
+                _prevMsgId = msgId;
+                _bytesSeenSoFar += contentSize;
+
                 _count++;
             }
         };
+        _oldMessageStore.visitContentDb(contentVisitor);
 
-        _oldMessageStore.visitBindings(queueBindings);
-        logCount(queueBindings.getVisitedCount(), "queue binding");
+        logCount(metaDataVisitor.getVisitedCount(), "Message ContentChunk");
+
+
+        //Migrate _deliveryDb;
+        _logger.info("Delivery Records");
+        moveContents(_oldMessageStore.getDeliveryDb(), _newMessageStore.getDeliveryDb(), "Delivery Record");
     }
 
     /**
@@ -713,7 +783,7 @@
         }
         catch (RuntimeException re)
         {
-            if (!re.getMessage().equals("User aborted process"))
+            if (!("User aborted process").equals(re.getMessage()))
             {
                 re.printStackTrace();
                 _logger.error("Upgrade Failed: " + re.getMessage());
@@ -762,7 +832,7 @@
         _logger.info("Running BDB Message Store upgrade tool: v" + VERSION);
         try
         {
-            new BDBStoreUpgrade(fromDir.toString(), toDir, backupDir, interactive, force).upgradeFromVersion(1);
+            new BDBStoreUpgrade(fromDir.toString(), toDir, backupDir, interactive, force).upgradeFromVersion(2);
 
             _logger.info("Upgrade complete.");
         }
@@ -772,9 +842,9 @@
         }
         catch (DatabaseException de)
         {
-            if (de.getMessage().endsWith("Error: Unable to load BDBStore as version 1. Store on disk contains version 2 data."))
+            if (de.getMessage().endsWith("Error: Unable to load BDBStore as version 2. Store on disk contains version 3 data."))
             {
-                System.out.println("Store '" + fromDir + "' has already been upgraded to version 2.");
+                System.out.println("Store '" + fromDir + "' has already been upgraded to version 3.");
             }
             else
             {
@@ -784,7 +854,7 @@
         }
         catch (RuntimeException re)
         {
-            if (!re.getMessage().equals("User aborted process"))
+            if (!("User aborted process").equals(re.getMessage()))
             {
                 re.printStackTrace();
                 _logger.error("Upgrade Failed: " + re.getMessage());



More information about the rhmessaging-commits mailing list