[rhmessaging-commits] rhmessaging commits: r2572 - in store/branches/java/broker-queue-refactor/java/bdbstore/src: test/java/org/apache/qpid/server/store/berkeleydb and 1 other directories.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Thu Oct 2 06:43:31 EDT 2008


Author: ritchiem
Date: 2008-10-02 06:43:31 -0400 (Thu, 02 Oct 2008)
New Revision: 2572

Added:
   store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/DatabaseVisitor.java
Modified:
   store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBBackup.java
   store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
   store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreTest.java
   store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java
   store/branches/java/broker-queue-refactor/java/bdbstore/src/tools/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgrade.java
Log:
Changes to BDBStoreUpgrade tool based on feedback.
* If the input directory is not a store directory but contains a number of stores, each of these stores is processed in turn.
* Augmented BDBMessageStore to provide a readonly mode of opening stores. This is required to prevent the log format of the store being upgraded and therefore rendering older stores unusable with their release version.
 NOTE: This change allows the tool to be run against a store that is part of an actively running broker. This will not have ANY detrimental effects on the active broker, as the store is opened readonly, which is the Oracle-described mechanism for multiple-process access to a BDB JE store. There are no guarantees about the contents of the upgraded result: when a store is opened readonly, only a snapshot at that moment is presented to the process, so any changes the running broker makes to the store will not be visible to the upgrade tool. The safest approach is to run the upgrade tool against an open copy of the store.
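
The first point above (one store directory versus a directory that contains several stores) can be sketched as follows. This is only an illustration, not the tool's code: the body of isDirectoryAStoreDir() is not shown in this revision, so the rule "a store directory directly contains at least one .jdb file" is an assumption, and the StoreLocator class and its method names are hypothetical. Only the ".jdb" file ending is taken from the commit.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: decides which directories the upgrade should process. */
final class StoreLocator
{
    /** File ending used by BDB store files (BDB_FILE_ENDING in the tool). */
    private static final String BDB_FILE_ENDING = ".jdb";

    /** Assumption: a directory is a store if it directly contains a .jdb file. */
    static boolean isStoreDir(File dir)
    {
        File[] entries = dir.listFiles(); // null if dir is not a readable directory
        if (entries == null)
        {
            return false;
        }
        for (File entry : entries)
        {
            if (entry.isFile() && entry.getName().endsWith(BDB_FILE_ENDING))
            {
                return true;
            }
        }
        return false;
    }

    /** Returns the input itself if it is a store, otherwise every direct child that is one. */
    static List<File> findStores(File input)
    {
        List<File> stores = new ArrayList<File>();
        if (isStoreDir(input))
        {
            stores.add(input);
            return stores;
        }
        File[] children = input.listFiles();
        if (children != null)
        {
            for (File child : children)
            {
                if (child.isDirectory() && isStoreDir(child))
                {
                    stores.add(child);
                }
            }
        }
        return stores;
    }

    public static void main(String[] args)
    {
        File input = new File(args.length > 0 ? args[0] : ".");
        for (File store : findStores(input))
        {
            System.out.println("Would upgrade store: " + store.getAbsolutePath());
        }
    }
}
```

Only direct children are inspected here; whether the real tool descends further is not stated in the log.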

* Used Commons CLI to perform command line parsing
* User Interaction can be quelled (use -q|--quiet to disable)
* Added a -f|--force to allow the suppression of all questions and to create a backup if that option is present
* Added a -b|--backup [<location>] option. If no location is specified, the store is replicated alongside its current directory, with '-Backup' appended to the directory name.
* Added -h|--help option
* Improved display of options
* Improved error detection and display
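
As an illustration of the -b|--backup default, the following sketch mirrors the three cases handled in the BDBStoreUpgrade constructor in this revision: no backup requested (null), option given without a location (empty string), and an explicit location. The BackupLocation class and its resolve() method are hypothetical names used only for this example.

```java
import java.io.File;

/** Hypothetical helper showing how the backup directory is chosen. */
final class BackupLocation
{
    /**
     * @param fromDir   the store directory being upgraded
     * @param backupDir null = no backup requested, "" = use the default location,
     *                  anything else = explicit backup location
     * @return the directory to back up to, or null if no backup was requested
     */
    static File resolve(File fromDir, String backupDir)
    {
        if (backupDir == null)
        {
            return null;
        }
        if (backupDir.equals(""))
        {
            // Default: a sibling of the store directory with '-Backup' appended to its name
            return new File(fromDir.getAbsolutePath() + "-Backup");
        }
        return new File(backupDir);
    }

    public static void main(String[] args)
    {
        File store = new File("bdbEnv");
        System.out.println("Default backup location: " + resolve(store, ""));
    }
}
```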

* Removed sole Trace level log statement from BDBBackup.
* Improved startup logging of BDBMessageStore. Recovery is only performed if an existing store was opened. If a new store was created then recovery is not needed. This lays the groundwork for us to be able to prevent new stores being created.
Once QPID-1305 has been completed I would suggest that we add a startup command line option '--createStore' without which a startup error will occur if a store is not present. This will prevent issues where the store is automatically created in a location that is not desired due to configuration error.

* The tests have been updated to reflect the new parameters on configure for readOnly stores.
* BDBStoreTest has also had a recover call removed as this was purely present to start the store. Calling recover() at this point read wrongly as the code had just made sure that the store would not exist so there would be no recovery necessary. With the change above to detect the creation of a new store it is now possible to automatically start the store in configure() without the user having to call recover() to start the store.
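
The startup behaviour described above can be modelled without any BDB code. The sketch below is a simplified, hypothetical model (the StoreLifecycle class is not part of the commit) of the state transitions in BDBMessageStore.configure() and start() as they appear in this revision. Two assumptions are made: a virtual host is always configured, and recover(), whose body is not shown in the diff, finishes by moving the store from CONFIGURED to STARTED.

```java
/** Hypothetical model of the store states; no real database is opened. */
final class StoreLifecycle
{
    enum State { INITIAL, CONFIGURING, CONFIGURED, STARTED }

    private State _state = State.INITIAL;
    private boolean _recovered;

    private void stateTransition(State requiredState, State newState)
    {
        if (_state != requiredState)
        {
            throw new IllegalStateException("Cannot move to " + newState + ": store is "
                                            + _state + ", expected " + requiredState);
        }
        _state = newState;
    }

    /** storeExists stands in for the result of opening the environment. */
    void configure(boolean storeExists, boolean readonly)
    {
        stateTransition(State.INITIAL, State.CONFIGURING);
        boolean newEnvironment = !storeExists;

        if (!readonly)
        {
            stateTransition(State.CONFIGURING, State.CONFIGURED);
            if (!newEnvironment)
            {
                recover(); // only an existing store has anything to recover
            }
        }

        if (newEnvironment)
        {
            // Nothing to recover, so go straight to STARTED.
            // In this model a readonly open of a missing store fails here.
            stateTransition(State.CONFIGURED, State.STARTED);
        }
    }

    /** Used after a readonly configure, where recovery is skipped. */
    void start()
    {
        stateTransition(State.CONFIGURING, State.STARTED);
    }

    private void recover()
    {
        _recovered = true;
        // Assumption: recovery ends by starting the store
        stateTransition(State.CONFIGURED, State.STARTED);
    }

    State getState()
    {
        return _state;
    }

    boolean wasRecovered()
    {
        return _recovered;
    }

    public static void main(String[] args)
    {
        StoreLifecycle readOnlyStore = new StoreLifecycle();
        readOnlyStore.configure(true, true);
        readOnlyStore.start();
        System.out.println("Read-only store state: " + readOnlyStore.getState());
    }
}
```

In this model a newly created store and a recovered store both end in STARTED after configure(), which is why the test no longer needs its own recover() call, while a readonly store stays in CONFIGURING until start() is called, as the upgrade tool does for the old store.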



Modified: store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBBackup.java
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBBackup.java	2008-10-02 10:24:57 UTC (rev 2571)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBBackup.java	2008-10-02 10:43:31 UTC (rev 2572)
@@ -189,9 +189,9 @@
      */
     public String[] takeBackupNoLock(String fromdir, String todir)
     {
-        if (log.isTraceEnabled())
+        if (log.isDebugEnabled())
         {
-            log.trace("public void takeBackupNoLock(String fromdir = " + fromdir + ", String todir = " + todir
+            log.debug("public void takeBackupNoLock(String fromdir = " + fromdir + ", String todir = " + todir
                 + "): called");
         }
 

Modified: store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java	2008-10-02 10:24:57 UTC (rev 2571)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java	2008-10-02 10:43:31 UTC (rev 2572)
@@ -49,7 +49,6 @@
 import org.apache.qpid.server.txn.NonTransactionalContext;
 import org.apache.qpid.server.txn.TransactionalContext;
 import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.mina.common.TrafficMask;
 
 import java.io.File;
 import java.util.ArrayList;
@@ -129,7 +128,6 @@
 
     /** The data version this store should run with */
     private int _version;
-
     private enum State
     {
         INITIAL,
@@ -185,8 +183,6 @@
      */
     public void configure(VirtualHost virtualHost, String base, Configuration config) throws Exception
     {
-        boolean verifyVersion = false;
-
         File environmentPath = new File(config.getString(base + "." + ENVIRONMENT_PATH_PROPERTY, "bdbEnv"));
         if (!environmentPath.exists())
         {
@@ -199,15 +195,15 @@
 
         _version = config.getInt(base + "." + DATABASE_FORMAT_VERSION_PROPERTY, DATABASE_FORMAT_VERSION);
 
-        configure(virtualHost, environmentPath);
+        configure(virtualHost, environmentPath, false);
     }
 
     public void configure(File environmentPath) throws AMQException, DatabaseException
     {
-        configure(null, environmentPath);
+        configure(null, environmentPath, false);
     }
 
-    public void configure(VirtualHost virtualHost, File environmentPath) throws AMQException, DatabaseException
+    public void configure(VirtualHost virtualHost, File environmentPath, boolean readonly) throws AMQException, DatabaseException
     {
         stateTransition(State.INITIAL, State.CONFIGURING);
 
@@ -218,31 +214,61 @@
         if (virtualHost != null)
         {
             setVirtualHost(virtualHost);
-            createTupleBindingFactories(_version);
         }
 
-        setupStore(environmentPath);
+        boolean newEnvironment = setupStore(environmentPath, readonly);
 
-        stateTransition(State.CONFIGURING, State.CONFIGURED);
+        // Performing recovery when we only want read access will cause all the broker objects to be recreated
+        // This will/may include thread pool creations that may be duplicated when manually inspecting the state of
+        // the store. Simplest solution is to prevent the initial creation of the state by blocking recovery.
+        if (!readonly)
+        {
+            stateTransition(State.CONFIGURING, State.CONFIGURED);
 
-        // this recovers durable queues and persistent messages
-        if (virtualHost != null)
+            //If we have loaded an environment and have virtualHost configured then recover environment
+            if (!newEnvironment && virtualHost != null)
+            {
+                // this recovers durable queues and persistent messages
+                recover();
+            }
+        }
+
+        // If we have a new environment then we can jump to started as there is no recovery required.
+        if (newEnvironment) // && !readonly is implied as you can't get a newEnvironment in readonly mode.
         {
-            recover();
+            stateTransition(State.CONFIGURED, State.STARTED);
         }
+
     }
 
-    private void setupStore(File storePath) throws DatabaseException, AMQException
+    /**
+     * Move the store state from CONFIGURING to STARTED.
+     *
+     * This is required if you do not want to perform recovery of the store data
+     *
+     * @throws AMQException if the store is not in the correct state
+     */
+    public void start() throws AMQException
     {
+        stateTransition(State.CONFIGURING, State.STARTED);
+    }
+
+    private boolean setupStore(File storePath, boolean readonly) throws DatabaseException, AMQException
+    {
         checkState(State.CONFIGURING);
 
-        createEnvironment(storePath);
+        boolean newEnvironment = createEnvironment(storePath, readonly);
 
         verifyVersionByTables();
 
-        openDatabases();
+        openDatabases(readonly);
 
-        _commitThread.start();
+        if (!readonly)
+        {
+            _commitThread.start();
+        }
+
+        return newEnvironment;
     }
 
     private void verifyVersionByTables() throws DatabaseException
@@ -278,11 +304,6 @@
         }
     }
 
-    protected void startStore() throws AMQException
-    {
-        stateTransition(State.CONFIGURED, State.STARTED);
-    }
-
     private void createTupleBindingFactories(int version)
     {
         _queueTupleBindingFactory = new QueueTupleBindingFactory(version, _virtualHost);
@@ -308,22 +329,57 @@
         }
     }
 
-    private void createEnvironment(File environmentPath) throws DatabaseException
+    private boolean createEnvironment(File environmentPath, boolean readonly) throws DatabaseException
     {
         _log.info("BDB message store using environment path " + environmentPath.getAbsolutePath());
         EnvironmentConfig envConfig = new EnvironmentConfig();
-        envConfig.setAllowCreate(true);
+        // This controls whether the store is created if it does not already exist; first try opening without creation.
+        envConfig.setAllowCreate(false);
         envConfig.setTransactional(true);
         envConfig.setConfigParam("je.lock.nLockTables", "7");
-        _environment = new Environment(environmentPath, envConfig);
+        //This prevents background threads running which will potentially update the store.
+        envConfig.setReadOnly(readonly);
+        try
+        {
+            _environment = new Environment(environmentPath, envConfig);
+            System.err.println("Created("+this+") Environment:"+_environment);
+            new Exception().printStackTrace();
+            return false;
+        }
+        catch (DatabaseException de)
+        {
+            if (de.getMessage().contains("Environment.setAllowCreate is false"))
+            {
+                //Allow the creation this time
+                envConfig.setAllowCreate(true);
+                if (_environment != null )
+                {
+                    _environment.cleanLog();
+                    _environment.close();
+                    System.err.println("Close("+this+") Environment:"+_environment);
+                }
+                _environment = new Environment(environmentPath, envConfig);
+                System.err.println("Created("+this+") Environment:"+_environment);
+                new Exception().printStackTrace();
+                
+                return true;
+            }
+            else
+            {
+                throw de;
+            }
+        }
     }
 
-    private void openDatabases() throws DatabaseException
+    private void openDatabases(boolean readonly) throws DatabaseException
     {
         DatabaseConfig dbConfig = new DatabaseConfig();
         dbConfig.setTransactional(true);
         dbConfig.setAllowCreate(true);
 
+        //This is required if we are wanting read only access.
+        dbConfig.setReadOnly(readonly);
+
         _messageMetaDataDb = _environment.openDatabase(null, MESSAGEMETADATADB_NAME, dbConfig);
         _queueDb = _environment.openDatabase(null, QUEUEDB_NAME, dbConfig);
         _exchangeDb = _environment.openDatabase(null, EXCHANGEDB_NAME, dbConfig);
@@ -400,6 +456,7 @@
             // get a chance to finish.
             _environment.cleanLog();
             _environment.close();
+            System.err.println("Close("+this+") Environment:"+_environment);
         }
     }
 
@@ -938,9 +995,9 @@
         try
         {
 
-        	Cursor cursor = _deliveryDb.openCursor(tx, null);
-        	DatabaseEntry data = new DatabaseEntry();
-        	OperationStatus status = cursor.getSearchKey(key, data, null);
+            Cursor cursor = _deliveryDb.openCursor(tx, null);
+            DatabaseEntry data = new DatabaseEntry();
+            OperationStatus status = cursor.getSearchKey(key, data, null);
             if (status == OperationStatus.NOTFOUND)
             {
                 throw new AMQException("Unable to find message with id " + messageId + " on queue " + name);
@@ -1184,6 +1241,11 @@
 
             throw new AMQException("Error recovering persistent state: " + e, e);
         }
+        catch (AMQException amqe)
+        {
+            abortTran(context);
+            throw new AMQException("Error recovering persistent state: " + amqe, amqe);
+        }
         catch (Throwable ioobe)
         {
             abortTran(context);
@@ -1455,12 +1517,6 @@
         return _queueBindingsDb;
     }
 
-    /** Visitor Interface so that each DatabaseEntry for a database can easily be processed. */
-    interface DatabaseVisitor
-    {
-        public void visit(DatabaseEntry entry, DatabaseEntry value) throws AMQException, DatabaseException;
-    }
-
     void visitMetaDataDb(DatabaseVisitor visitor) throws DatabaseException, AMQException
     {
         visitDatabase(_messageMetaDataDb, visitor);
@@ -1624,9 +1680,14 @@
         }
         catch (DatabaseException e)
         {
-            _log.error("Error: " + e, e);
+            _log.error("Database Error: " + e, e);
             throw e;
         }
+        catch (AMQException e)
+        {
+            _log.error("Store Error: " + e, e);
+            throw e;
+        }
         finally
         {
             if (cursor != null)

Added: store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/DatabaseVisitor.java
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/DatabaseVisitor.java	                        (rev 0)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/DatabaseVisitor.java	2008-10-02 10:43:31 UTC (rev 2572)
@@ -0,0 +1,43 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import com.sleepycat.je.DatabaseEntry;
+import com.sleepycat.je.DatabaseException;
+import org.apache.qpid.AMQException;
+
+/** Visitor Interface so that each DatabaseEntry for a database can easily be processed. */
+public abstract class DatabaseVisitor
+{
+    protected int _count;
+
+    abstract public void visit(DatabaseEntry entry, DatabaseEntry value) throws AMQException, DatabaseException;
+
+    public int getVisitedCount()
+    {
+        return _count;
+    }
+
+    public void resetVisitCount()
+    {
+        _count = 0;
+    }
+}

Modified: store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreTest.java
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreTest.java	2008-10-02 10:24:57 UTC (rev 2571)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreTest.java	2008-10-02 10:43:31 UTC (rev 2572)
@@ -90,8 +90,6 @@
         _virtualHost = new VirtualHost("test", _store);
         _store.setVirtualHost(_virtualHost);
 
-        _store.recover();
-
         _txnContext = new NonTransactionalContext(_store, _storeContext, null, new LinkedList<RequiredDeliveryException>());
     }
 

Modified: store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java	2008-10-02 10:24:57 UTC (rev 2571)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java	2008-10-02 10:43:31 UTC (rev 2572)
@@ -102,11 +102,12 @@
 
         try
         {
-            new BDBStoreUpgrade(_toDir, _toDirTwice).upgradeFromVersion(1);
+            new BDBStoreUpgrade(_toDir, _toDirTwice, null, false, true).upgradeFromVersion(1);
             fail("Second Upgrade Succeeded");
         }
         catch (Exception e)
         {
+            e.printStackTrace();
             assertTrue("Incorrect Exception Thrown:" + e.getMessage(),
                        e.getMessage().contains("Unable to load BDBStore as version 1. Store on disk contains version 2 data"));
         }
@@ -168,7 +169,7 @@
 
     private void upgradeBroker() throws Exception
     {
-        new BDBStoreUpgrade(_fromDir, _toDir).upgradeFromVersion(1);
+        new BDBStoreUpgrade(_fromDir, _toDir, null, false, true).upgradeFromVersion(1);
     }
 
     private void stopBroker(int port)

Modified: store/branches/java/broker-queue-refactor/java/bdbstore/src/tools/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgrade.java
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/src/tools/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgrade.java	2008-10-02 10:24:57 UTC (rev 2571)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/src/tools/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgrade.java	2008-10-02 10:43:31 UTC (rev 2572)
@@ -27,8 +27,21 @@
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.AMQException;
+import org.apache.qpid.util.FileUtils;
+import org.apache.commons.cli.PosixParser;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
 
 import java.io.File;
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.io.IOException;
+import java.io.FileNotFoundException;
+import java.util.LinkedList;
+import java.util.List;
 
 import com.sleepycat.je.DatabaseEntry;
 import com.sleepycat.je.DatabaseException;
@@ -53,14 +66,13 @@
  */
 public class BDBStoreUpgrade
 {
-    private static final String FROM_DIR = "--input";
-    private static final String TO_DIR = "--output";
-
     private static final Logger _logger = LoggerFactory.getLogger(BDBStoreUpgrade.class);
     /** The Store Directory that needs upgrading */
     File _fromDir;
     /** The Directory that will be made to contain the upgraded store */
     File _toDir;
+    /** The Directory that will be made to backup the original store if required */
+    File _backupDir;
 
     /** The Old Store */
     BDBMessageStore _oldMessageStore;
@@ -70,9 +82,31 @@
     VirtualHost _newVirtualHost;
     /** A VHost used in the creation of the queues from the old store */
     VirtualHost _oldVirtualHost;
+    /** The file ending that is used by BDB Store Files */
+    private static final String BDB_FILE_ENDING = ".jdb";
 
-    public BDBStoreUpgrade(String fromDir, String toDir)
+    static final Options _options = new Options();
+    static CommandLine _commandLine;
+    private boolean _interactive;
+    private boolean _force;
+
+    private static final String VERSION = "1.0";
+    private static final String OPTION_INPUT_SHORT = "i";
+    private static final String OPTION_INPUT = "input";
+    private static final String OPTION_OUTPUT_SHORT = "o";
+    private static final String OPTION_OUTPUT = "output";
+    private static final String OPTION_BACKUP_SHORT = "b";
+    private static final String OPTION_BACKUP = "backup";
+    private static final String OPTION_QUIET_SHORT = "q";
+    private static final String OPTION_QUIET = "quiet";
+    private static final String OPTION_FORCE_SHORT = "f";
+    private static final String OPTION_FORCE = "force";
+
+    public BDBStoreUpgrade(String fromDir, String toDir, String backupDir, boolean interactive, boolean force)
     {
+        _interactive = interactive;
+        _force = force;
+
         _fromDir = new File(fromDir);
         if (!_fromDir.exists())
         {
@@ -80,21 +114,130 @@
                                                + "Ensure the path is correct and that the permissions are correct.");
         }
 
+        if (!isDirectoryAStoreDir(_fromDir))
+        {
+            throw new IllegalArgumentException("Specified directory '" + fromDir + "' does not contain a valid BDBMessageStore.");
+        }
+
         _toDir = new File(toDir);
-        if (!_toDir.exists())
+        if (_toDir.exists())
         {
-            if (!_toDir.mkdirs())
+            if (interactive)
             {
-                throw new IllegalArgumentException("Environment path '" + toDir + "' could not be created. "
-                                                   + "Ensure the path is correct and that the permissions are correct.");
+                System.out.println("Upgrade destination: '" + toDir + "'");
+
+                if (userInteract("Upgrade destination exists do you wish to replace it?"))
+                {
+                    try
+                    {
+                        FileUtils.delete(_toDir, true);
+                    }
+                    catch (IOException e)
+                    {
+                        throw new IllegalArgumentException("Unable to remove upgrade destination '" + toDir + "':"
+                                                           + e.getMessage());
+                    }
+                }
+                else
+                {
+                    throw new IllegalArgumentException("Upgrade destination '" + toDir + "' already exists. ");
+                }
             }
+            else
+            {
+                if (force)
+                {
+                    try
+                    {
+                        FileUtils.delete(_toDir, true);
+                    }
+                    catch (IOException e)
+                    {
+                        throw new IllegalArgumentException("Unable to remove upgrade destination '" + toDir + "':"
+                                                           + e.getMessage());
+                    }
+                }
+                else
+                {
+                    throw new IllegalArgumentException("Upgrade destination '" + toDir + "' already exists. ");
+                }
+            }
         }
+
+        if (!_toDir.mkdirs())
+        {
+            throw new IllegalArgumentException("Upgrade destination '" + toDir + "' could not be created. "
+                                               + "Ensure the path is correct and that the permissions are correct.");
+        }
+
+        if (backupDir != null)
+        {
+            if (backupDir.equals(""))
+            {
+                _backupDir = new File(_fromDir.getAbsolutePath().toString() + "-Backup");
+            }
+            else
+            {
+                _backupDir = new File(backupDir);
+            }
+        }
         else
         {
-            throw new IllegalArgumentException("Environment path '" + toDir + "' already exists. ");
+            _backupDir = null;
+        }
+    }
 
+    private static String ANSWER_OPTIONS = " Yes/No/Abort? ";
+    private static String ANSWER_NO = "no";
+    private static String ANSWER_N = "n";
+    private static String ANSWER_YES = "yes";
+    private static String ANSWER_Y = "y";
+    private static String ANSWER_ABORT = "abort";
+    private static String ANSWER_A = "a";
+
+    /**
+     * Interact with the user via System.in and System.out. If the user wishes to Abort then a RuntimeException is thrown.
+     * Otherwise the method will return based on their response true=yes false=no.
+     *
+     * @param message Message to print out
+     *
+     * @return boolean response from user if they wish to proceed
+     */
+    private boolean userInteract(String message)
+    {
+        System.out.print(message + ANSWER_OPTIONS);
+        BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
+
+        String input = "";
+        try
+        {
+            input = br.readLine();
         }
+        catch (IOException e)
+        {
+            input = "";
+        }
 
+        if (input.equalsIgnoreCase(ANSWER_Y) || input.equalsIgnoreCase(ANSWER_YES))
+        {
+            return true;
+        }
+        else
+        {
+            if (input.equalsIgnoreCase(ANSWER_N) || input.equalsIgnoreCase(ANSWER_NO))
+            {
+                return false;
+            }
+            else
+            {
+                if (input.equalsIgnoreCase(ANSWER_A) || input.equalsIgnoreCase(ANSWER_ABORT))
+                {
+                    throw new RuntimeException("User aborted process");
+                }
+            }
+        }
+
+        return userInteract(message);
     }
 
     /**
@@ -106,7 +249,7 @@
      */
     public void upgradeFromVersion(int version) throws Exception
     {
-        upgradeFromVersion(version, _fromDir, _toDir);
+        upgradeFromVersion(version, _fromDir, _toDir, _backupDir, _force);
     }
 
     /**
@@ -120,43 +263,87 @@
      * @throws DatabaseException if there is a problem with the store formats
      * @throws AMQException      if there is an issue creating Qpid data structures
      */
-    public void upgradeFromVersion(int version, File fromDir, File toDir) throws Exception
+    public void upgradeFromVersion(int version, File fromDir, File toDir, File backupDir, boolean force) throws Exception
     {
+        _logger.info("Located store to upgrade at '" + fromDir);
 
+        if (_interactive)
+        {
+            if (!userInteract("Have you performed a DB backup of this store."))
+            {
+                File backup;
+                if (backupDir == null)
+                {
+                    backup = new File(fromDir.getAbsolutePath().toString() + "-Backup");
+                }
+                else
+                {
+                    backup = backupDir;
+                }
+
+                if (userInteract("Do you wish to perform a DB backup now? " +
+                                 "(Store will be backed up to '" + backup.getName() + "')"))
+                {
+                    performDBBackup(fromDir, backupDir, force);
+                }
+                else
+                {
+                    if (!userInteract("Are you sure wish to proceed with DB migration without backup? " +
+                                      "(For more details of the consequences check the Qpid/BDB Message Store Wiki)."))
+                    {
+                        throw new IllegalArgumentException("Upgrade stopped as user request as no DB Backup performed.");
+                    }
+                }
+            }
+            else
+            {
+                _logger.info("Upgrade will create a new store at '" + toDir + "'");
+                _logger.info("Using the contents in the Message Store '" + fromDir + "'");
+
+                if (!userInteract("Do you wish to proceed?"))
+                {
+                    throw new IllegalArgumentException("Upgrade stopped as did not wish to proceed");
+                }
+            }
+        }
+        else
+        {
+            if (backupDir != null)
+            {
+                performDBBackup(fromDir, backupDir, force);
+            }
+        }
+
         // Note the name of the Vhosts is not important, the store doesnot record the name of the vhost.
         _newVirtualHost = new VirtualHost("Upgraded", new MemoryMessageStore());
         _oldVirtualHost = new VirtualHost("Old", new MemoryMessageStore());
 
         //Create a new messageStore
         _newMessageStore = new BDBMessageStore();
-        _newMessageStore.configure(_newVirtualHost, toDir);
+        _newMessageStore.configure(_newVirtualHost, toDir, false);
 
-        AMQException openException=null;
-        try{
+        try
+        {
             //Load the old MessageStore
             switch (version)
             {
                 default:
                 case 1:
                     _oldMessageStore = new BDBMessageStore(1);
-                    _oldMessageStore.configure(_oldVirtualHost, fromDir);
+                    _oldMessageStore.configure(_oldVirtualHost, fromDir, true);
+                    _oldMessageStore.start();
                     upgradeFromVersion_1();
                     break;
             }
         }
-        catch(AMQException amq)
-        {
-            
-        }
         finally
         {
             _newVirtualHost.close();
             _oldVirtualHost.close();
 
-            _newMessageStore.close();           
+            _newMessageStore.close();
             _oldMessageStore.close();
 
-
             //Shutdown the AR that the Vhosts will have created.
             ApplicationRegistry.remove(1);
         }
@@ -164,51 +351,68 @@
 
     private void upgradeFromVersion_1() throws AMQException, DatabaseException
     {
-        _logger.info("Migrating Message Metadata");
+
+        _logger.info("Starting store upgrade from version 1");
+
+        _logger.info("Message Metadata");
         //Migrate _messageMetaDataDb;
-        moveContents(_oldMessageStore.getMetaDataDb(), _newMessageStore.getMetaDataDb());
+        moveContents(_oldMessageStore.getMetaDataDb(), _newMessageStore.getMetaDataDb(), "Message MetaData");
 
-        _logger.info("Migrating Message Contents");
+        _logger.info("Message Contents");
         //Migrate _messageContentDb;
-        moveContents(_oldMessageStore.getContentDb(), _newMessageStore.getContentDb());
+        moveContents(_oldMessageStore.getContentDb(), _newMessageStore.getContentDb(), "Message Content");
 
-        _logger.info("Migrating Queues");
+        _logger.info("Queues");
         //Migrate _queueDb;
         //Get the oldMessageStore Tuple Binding which does the parsing
         final TupleBinding queueTupleBinding = _oldMessageStore.getQueueTupleBindingFactory().getInstance();
-        _oldMessageStore.visitQueues(new BDBMessageStore.DatabaseVisitor()
+
+        //Create a visitor that will take the queues in the oldMessageStore and add them to the newMessageStore
+        DatabaseVisitor queueVisitor = new DatabaseVisitor()
         {
             public void visit(DatabaseEntry key, DatabaseEntry value) throws AMQException
             {
                 AMQQueue queue = (AMQQueue) queueTupleBinding.entryToObject(value);
 
+                //The simple call to createQueue with the AMQQueue object is sufficient for a v1 upgrade as all the
+                // extra properties in v2 will be defaulted.
                 _newMessageStore.createQueue(queue);
+
+                // We need to call queue stop here as all the queues were already registered when the _oldMessageStore
+                // state was recovered. Now we are creating a second Queue it will acquire the Executor Service again!
+                // But the queueRegistry is a set so only one release will be performed.
                 //
-                _newMessageStore.getQueueRegistry().registerQueue(queue);
-                //or
-                //queue.stop();
-                // As the queue will create a thread pool message delivery that needs to be shutdown
-                // for the broker to cleanly stop.
+                // queue.stop();
+                //
+                // An alternative approach was taken here: If we don't recover the store
+                _count++;
             }
-        });
+        };
+        //Perform the visit
+        _oldMessageStore.visitQueues(queueVisitor);
 
-        _logger.info("Migrating Delivery Records");
+        logCount(queueVisitor.getVisitedCount(), "queue");
+
+        _logger.info("Delivery Records");
         //Migrate _deliveryDb;
-        moveContents(_oldMessageStore.getDeliveryDb(), _newMessageStore.getDeliveryDb());
+        moveContents(_oldMessageStore.getDeliveryDb(), _newMessageStore.getDeliveryDb(), "Delivery Record");
 
-        _logger.info("Migrating Exchanges");
+        _logger.info("Exchanges");
         //Migrate _exchangeDb;
-        moveContents(_oldMessageStore.getExchangesDb(), _newMessageStore.getExchangesDb());
+        moveContents(_oldMessageStore.getExchangesDb(), _newMessageStore.getExchangesDb(), "Exchange");
 
-        _logger.info("Migrating QueueBindings");
+        _logger.info("QueueBindings");
         //Migrate _queueBindingsDb;
         final TupleBinding bindingTupleBinding = _oldMessageStore.getBindingTupleBindingFactory().getInstance();
-        _oldMessageStore.visitBindings(new BDBMessageStore.DatabaseVisitor()
+
+        //Create a visitor to read the old format queue bindings
+        DatabaseVisitor queueBindings = new DatabaseVisitor()
         {
-            public void visit(DatabaseEntry key, DatabaseEntry value) throws AMQException, DatabaseException
+            public void visit(DatabaseEntry key, DatabaseEntry value) throws DatabaseException
             {
                 BindingKey queueBinding = (BindingKey) bindingTupleBinding.entryToObject(key);
 
+                //Create a new Format TupleBinding
                 TupleBinding newBindingTupleBinding = _newMessageStore.getBindingTupleBindingFactory().getInstance();
 
                 DatabaseEntry newKey = new DatabaseEntry();
@@ -216,78 +420,306 @@
 
                 ByteBinding.byteToEntry((byte) 0, value);
                 _newMessageStore.getBindingsDb().put(null, newKey, value);
+
+                _count++;
             }
-        });
+        };
 
+        _oldMessageStore.visitBindings(queueBindings);
+        logCount(queueBindings.getVisitedCount(), "queue binding");
     }
 
-    private void moveContents(Database oldDatabase, final Database newDatabase) throws AMQException, DatabaseException
+    /**
+     * Log the specified count for item in a user friendly way.
+     *
+     * @param count of items to log
+     * @param item  description of what is being logged.
+     */
+    private void logCount(int count, String item)
     {
-        _oldMessageStore.visitDatabase(oldDatabase, new BDBMessageStore.DatabaseVisitor()
+        _logger.info(" " + count + " " + item + " " + (count == 1 ? "entry" : "entries"));
+    }
+
+    /**
+     * @param oldDatabase The old MessageStoreDB to perform the visit on
+     * @param newDatabase The new MessageStoreDB to copy the data to.
+     * @param contentName The string name of the content for display purposes.
+     *
+     * @throws AMQException      Due to createQueue throwing AMQException
+     * @throws DatabaseException If there is a problem with the loading of the data
+     */
+    private void moveContents(Database oldDatabase, final Database newDatabase, String contentName) throws AMQException, DatabaseException
+    {
+
+        DatabaseVisitor moveVisitor = new DatabaseVisitor()
         {
-            public void visit(DatabaseEntry key, DatabaseEntry value) throws AMQException, DatabaseException
+            public void visit(DatabaseEntry key, DatabaseEntry value) throws DatabaseException
             {
+                _count++;
                 newDatabase.put(null, key, value);
             }
-        });
+        };
 
+        _oldMessageStore.visitDatabase(oldDatabase, moveVisitor);
+
+        logCount(moveVisitor.getVisitedCount(), contentName);
     }
 
     private static void usage()
     {
-        System.out.println("usage: " + FROM_DIR + " <path-to-store-to-upgrade> " + TO_DIR + " <path-to-location-to-write-upgraded-db>");
+        System.out.println("usage: BDBStoreUpgrade:\n [-h|--help] [-q|--quiet] [-f|--force] [-b|--backup <Path to backup-db>] " +
+                           "-i|--input <Path to input-db> -o|--output <Path to upgraded-db>");
     }
 
-    private static String getArg(String[] args, String arg)
+    private static void help()
     {
-        if (args[0].equals(arg))
+        System.out.println("usage: BDBStoreUpgrade:");
+        System.out.println("Required:");
+        for (Object obj : _options.getOptions())
         {
-            return args[1];
+            Option option = (Option) obj;
+            if (option.isRequired())
+            {
+                System.out.println("-" + option.getOpt() + "|--" + option.getLongOpt() + "\t\t-\t" + option.getDescription());
+            }
         }
-        else if (args[2].equals(arg))
+
+        System.out.println("\nOptions:");
+        for (Object obj : _options.getOptions())
         {
-            return args[3];
+            Option option = (Option) obj;
+            if (!option.isRequired())
+            {
+                System.out.println("--" + option.getLongOpt() + "|-" + option.getOpt() + "\t\t-\t" + option.getDescription());
+            }
         }
-        else
+    }
+
+    static boolean isDirectoryAStoreDir(File directory)
+    {
+        if (directory.isFile())
         {
-            return null;
+            return false;
         }
+
+        for (File file : directory.listFiles())
+        {
+            if (file.isFile())
+            {
+                if (file.getName().endsWith(BDB_FILE_ENDING))
+                {
+                    return true;
+                }
+            }
+        }
+        return false;
     }
 
-    public static void main(String[] args)
+    static File[] discoverDBStores(File fromDir)
     {
-        String fromDir, toDir;
+        if (!fromDir.exists())
+        {
+            throw new IllegalArgumentException("'" + fromDir + "' does not exist unable to upgrade.");
+        }
 
-        if (args.length < 4 ||
-            (!args[0].equals(FROM_DIR) && !args[2].equals(TO_DIR)))
+        // Ensure we are given a directory
+        if (fromDir.isFile())
         {
-            usage();
-            System.exit(1);
+            throw new IllegalArgumentException("'" + fromDir + "' is not a directory unable to upgrade.");
         }
 
-        fromDir = getArg(args, FROM_DIR);
+        // Check to see if we have been given a single directory
+        if (isDirectoryAStoreDir(fromDir))
+        {
+            return new File[]{fromDir};
+        }
 
-        toDir = getArg(args, TO_DIR);
+        // Check to see if we have been given a directory containing stores.
+        List<File> stores = new LinkedList<File>();
 
-        if (fromDir == null || toDir == null)
+        for (File directory : fromDir.listFiles())
         {
-            usage();
-            System.exit(1);
+            if (directory.isDirectory())
+            {
+                if (isDirectoryAStoreDir(directory))
+                {
+                    stores.add(directory);
+                }
+            }
         }
 
+        return stores.toArray(new File[stores.size()]);
+    }
+
+    private static void performDBBackup(File source, File backup, boolean force) throws Exception
+    {
+        if (backup.exists())
+        {
+            if (force)
+            {
+                _logger.info("Backup location exists. Forced to remove.");
+                FileUtils.delete(backup, true);
+            }
+            else
+            {
+                throw new IllegalArgumentException("Unable to perform backup: a backup already exists.");
+            }
+        }
+
         try
         {
-            new BDBStoreUpgrade(fromDir, toDir).upgradeFromVersion(1);
+            _logger.info("Backing up '" + source + "' to '" + backup + "'");
+            FileUtils.copyRecursive(source, backup);
+        }
+        catch (FileNotFoundException e)
+        {
+            //Throwing IAE here as this will be reported as a Backup not started
+            throw new IllegalArgumentException("Unable to perform backup:" + e.getMessage());
+        }
+        catch (FileUtils.UnableToCopyException e)
+        {
+            //Throwing exception here as this will be reported as a Failed Backup
+            throw new Exception("Unable to perform backup due to:" + e.getMessage());
+        }
+    }
+
+    public static void main(String[] args) throws ParseException
+    {
+        setOptions(_options);
+
+        final Options helpOptions = new Options();
+        setHelpOptions(helpOptions);
+
+        //Display help
+        boolean displayHelp = false;
+        try
+        {
+            if (new PosixParser().parse(helpOptions, args).hasOption("h"))
+            {
+                showHelp();
+            }
+        }
+        catch (ParseException pe)
+        {
+            displayHelp = true;
+        }
+
+        //Parse commandline for required arguments
+        try
+        {
+            _commandLine = new PosixParser().parse(_options, args);
+        }
+        catch (ParseException mae)
+        {
+            if (displayHelp)
+            {
+                showHelp();
+            }
+            else
+            {
+                fatalError(mae.getMessage());
+            }
+        }
+
+        String fromDir = _commandLine.getOptionValue(OPTION_INPUT_SHORT);
+        String toDir = _commandLine.getOptionValue(OPTION_OUTPUT_SHORT);
+        String backupDir = _commandLine.getOptionValue(OPTION_BACKUP_SHORT);
+
+        if (backupDir == null && _commandLine.hasOption(OPTION_BACKUP_SHORT))
+        {
+            backupDir = "";
+        }
+
+        //Attempt to locate possible Store to upgrade on input path
+        File[] stores = new File[0];
+        try
+        {
+            stores = discoverDBStores(new File(fromDir));
+        }
+        catch (IllegalArgumentException iae)
+        {
+            fatalError(iae.getMessage());
+        }
+
+        boolean interactive = !_commandLine.hasOption(OPTION_QUIET_SHORT);
+        boolean force = _commandLine.hasOption(OPTION_FORCE_SHORT);
+
+        for (File store : stores)
+        {
+            // Check to see if we are upgrading a store specified in fromDir or if the directories are nested.
+            if (stores.length > 0 && stores[0].toString().length() == fromDir.length())
+            {
+                upgrade(store, toDir, backupDir, interactive, force);
+            }
+            else
+            {
+                // Add the extra part of path from store to the toDir
+                upgrade(store, toDir + File.separator + store.toString().substring(fromDir.length()), backupDir, interactive, force);
+            }
+        }
+
+    }
+
+    private static void setOptions(Options options)
+    {
+        Option input =
+                OptionBuilder.isRequired().hasArg().withDescription("Location (Path) of store to upgrade.").withLongOpt(OPTION_INPUT)
+                        .create(OPTION_INPUT_SHORT);
+
+        Option output =
+                OptionBuilder.isRequired().hasArg().withDescription("Location (Path) for the upgraded-db to be written.").withLongOpt(OPTION_OUTPUT)
+                        .create(OPTION_OUTPUT_SHORT);
+
+        Option quiet = new Option(OPTION_QUIET_SHORT, OPTION_QUIET, false, "Disable interactive options.");
+
+        Option force = new Option(OPTION_FORCE_SHORT, OPTION_FORCE, false, "Force upgrade removing any existing upgrade target.");
+        Option backup =
+                OptionBuilder.hasOptionalArg().withDescription("Location (Path) for the backup-db to be written.").withLongOpt(OPTION_BACKUP)
+                        .create(OPTION_BACKUP_SHORT);
+
+        options.addOption(input);
+        options.addOption(output);
+        options.addOption(quiet);
+        options.addOption(force);
+        options.addOption(backup);
+        setHelpOptions(options);
+    }
+
+    private static void setHelpOptions(Options options)
+    {
+        options.addOption(new Option("h", "help", false, "Show this help."));
+    }
+
+    static void upgrade(File fromDir, String toDir, String backupDir, boolean interactive, boolean force)
+    {
+
+        _logger.info("Running BDB Message Store upgrade tool: v" + VERSION);
+        try
+        {
+            new BDBStoreUpgrade(fromDir.toString(), toDir, backupDir, interactive, force).upgradeFromVersion(1);
+
             _logger.info("Upgrade complete.");
         }
         catch (IllegalArgumentException iae)
         {
-            _logger.error("Upgrade Not Started: " + iae.getMessage());
+            _logger.error("Upgrade not started due to: " + iae.getMessage());
         }
         catch (Exception e)
         {
-            _logger.error("Upgrade failed: " + e.getMessage());
+            _logger.error("Upgrade Failed: " + e.getMessage());
         }
+    }
 
+    private static void fatalError(String message)
+    {
+        System.out.println(message);
+        usage();
+        System.exit(1);
     }
+
+    private static void showHelp()
+    {
+        help();
+        System.exit(0);
+    }
 }
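
The store discovery rule described in the log message (a single store directory, or a directory whose immediate children are stores) can be sketched in isolation as below. This is a minimal illustration, not the code in this commit: the class name StoreDiscoverySketch and its method names are hypothetical, and the ".jdb" suffix is an assumption standing in for BDB_FILE_ENDING, whose value is not shown in this diff. Unlike the patch, the sketch also guards against File.listFiles() returning null, which it does for unreadable directories.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-alone sketch of the discovery rule; not part of the commit.
final class StoreDiscoverySketch
{
    // Assumption: BDB JE log files end in ".jdb" (BDB_FILE_ENDING is not shown in the diff).
    static final String BDB_FILE_ENDING = ".jdb";

    // A directory counts as a store if it directly contains at least one BDB log file.
    static boolean isStoreDir(File directory)
    {
        File[] files = directory.listFiles(); // null if not a directory or not readable
        if (files == null)
        {
            return false;
        }
        for (File file : files)
        {
            if (file.isFile() && file.getName().endsWith(BDB_FILE_ENDING))
            {
                return true;
            }
        }
        return false;
    }

    // Returns fromDir itself if it is a store, otherwise every immediate child that is a store.
    static List<File> discoverStores(File fromDir)
    {
        if (!fromDir.isDirectory())
        {
            throw new IllegalArgumentException("'" + fromDir + "' is not an existing directory.");
        }

        List<File> stores = new ArrayList<File>();
        if (isStoreDir(fromDir))
        {
            stores.add(fromDir);
            return stores;
        }

        File[] children = fromDir.listFiles();
        if (children != null)
        {
            for (File child : children)
            {
                if (isStoreDir(child))
                {
                    stores.add(child);
                }
            }
        }
        return stores;
    }
}
```

An empty result means the input path held no stores at either level, so a caller would have nothing to upgrade.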



