[rhmessaging-commits] rhmessaging commits: r2416 - in store/branches/java/broker-queue-refactor/java/bdbstore/src: test/java/org/apache/qpid/server/store/berkeleydb and 1 other directory.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Fri Sep 5 10:03:23 EDT 2008


Author: ritchiem
Date: 2008-09-05 10:03:23 -0400 (Fri, 05 Sep 2008)
New Revision: 2416

Modified:
   store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
   store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java
Log:
RHM-6 : Version all the tables and ensure that the upgrade script cannont be run on an upgraded store. Includes Unit test

Modified: store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java	2008-09-04 16:32:40 UTC (rev 2415)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java	2008-09-05 14:03:23 UTC (rev 2416)
@@ -80,12 +80,11 @@
     private static final int DATABASE_FORMAT_VERSION = 2;
     private static final String DATABASE_FORMAT_VERSION_PROPERTY = "version";
 
-
     public static final String ENVIRONMENT_PATH_PROPERTY = "environment-path";
 
     private Environment _environment;
 
-    private static final String MESSAGEMETADATADB_NAME = "messageMetaDataDb";
+    private String MESSAGEMETADATADB_NAME = "messageMetaDataDb";
 
     /**
      * Maps from messageId to an AMQMessage (note we don't use serialisation but this is what it roughly corresponds
@@ -93,30 +92,24 @@
      */
     private Database _messageMetaDataDb;
 
-    private static final String MESSAGECONTENTDB_NAME = "messageContentDb";
+    private String MESSAGECONTENTDB_NAME = "messageContentDb";
 
     private Database _messageContentDb;
 
-    private static final String QUEUEDB_NAME = "queueDb";
+    private String QUEUEDB_NAME = "queueDb";
 
-    private static final String NEW_QUEUE_DB_NAME = "QUEUE";
-
     /** Maps from name (which uniquely identifies a queue) to an AMQQueue */
     private Database _queueDb;
 
-    private static final String DELIVERYDB_NAME = "deliveryDb";
+    private String DELIVERYDB_NAME = "deliveryDb";
 
-    private static final String QUEUE_ENTRY_DB_NAME = "QUEUE_ENTRY";
-
     /** Maps from a queue name to a message id. This is what stores the pending deliveries for a given queue */
     private Database _deliveryDb;
 
-    private static final String EXCHANGEDB_NAME = "exchangeDb";
+    private String EXCHANGEDB_NAME = "exchangeDb";
     private Database _exchangeDb;
 
-    private static final String NEW_EXCHANGE_DB_NAME = "EXCHANGE";
-
-    private static final String QUEUEBINDINGSDB_NAME = "queueBindingsDb";
+    private String QUEUEBINDINGSDB_NAME = "queueBindingsDb";
     private Database _queueBindingsDb;
 
     private VirtualHost _virtualHost;
@@ -125,8 +118,6 @@
 
     private final AtomicLong _queueId = new AtomicLong(1);
 
-    private static final AMQShortString EMPTY_SHORT_STRING = new AMQShortString("");
-
     private final CommitThread _commitThread = new CommitThread("Commit-Thread");
 
     private Map<AMQShortString, Long> _queueNameToIdMap = new ConcurrentHashMap<AMQShortString, Long>();
@@ -162,6 +153,24 @@
         _version = version;
     }
 
+    private void setDatabaseNames(int version)
+    {
+        if (version > 1)
+        {
+            MESSAGEMETADATADB_NAME += "_v" + version;
+
+            MESSAGECONTENTDB_NAME += "_v" + version;
+
+            QUEUEDB_NAME += "_v" + version;
+
+            DELIVERYDB_NAME += "_v" + version;
+
+            EXCHANGEDB_NAME += "_v" + version;
+
+            QUEUEBINDINGSDB_NAME += "_v" + version;
+        }
+    }
+
     /**
      * Called after instantiation in order to configure the message store. A particular implementation can define
      * whatever parameters it wants.
@@ -176,6 +185,7 @@
      */
     public void configure(VirtualHost virtualHost, String base, Configuration config) throws Exception
     {
+        boolean verifyVersion = false;
 
         File environmentPath = new File(config.getString(base + "." + ENVIRONMENT_PATH_PROPERTY, "bdbEnv"));
         if (!environmentPath.exists())
@@ -187,9 +197,8 @@
             }
         }
 
-        _version = config.getInt(base + "." + DATABASE_FORMAT_VERSION_PROPERTY, 2);
+        _version = config.getInt(base + "." + DATABASE_FORMAT_VERSION_PROPERTY, DATABASE_FORMAT_VERSION);
 
-
         configure(virtualHost, environmentPath);
     }
 
@@ -204,6 +213,8 @@
 
         _log.info("Configuring BDB message store");
 
+        setDatabaseNames(_version);
+
         if (virtualHost != null)
         {
             setVirtualHost(virtualHost);
@@ -227,11 +238,43 @@
 
         createEnvironment(storePath);
 
+        verifyVersionByTables();
+
         openDatabases();
 
         _commitThread.start();
     }
 
+    private void verifyVersionByTables() throws DatabaseException
+    {
+        for (String s : _environment.getDatabaseNames())
+        {
+            int versionIndex = s.indexOf("_v");
+
+            // DB is v1 if _version is not v1 then error
+            if (versionIndex == -1)
+            {
+                if (_version != 1)
+                {
+                    throw new DatabaseException("Error: Attempting to start BDBStore version " + _version
+                                                + " with a version 1 store. Please Upgrade");
+                }
+                else // DB is v1 and _version is v1
+                {
+                    continue;
+                }
+            }
+            // Otherwise Check Versions
+
+            int version = Integer.parseInt(s.substring(versionIndex + 2));
+
+            if (version != _version)
+            {
+                throw new DatabaseException("Error: Attempting to start BDBStore version " + _version + " with a version " + version + " store. Please Upgrade");
+            }
+        }
+    }
+
     protected void startStore() throws AMQException
     {
         stateTransition(State.CONFIGURED, State.STARTED);

Modified: store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java	2008-09-04 16:32:40 UTC (rev 2415)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java	2008-09-05 14:03:23 UTC (rev 2416)
@@ -52,10 +52,10 @@
 
     private String _topic = "MyDurableSubscriptionTestTopic";
 
-    String _fromDir = System.getProperty("QPID_WORK")+"/version1Store";
-    String _toDir = System.getProperty("QPID_WORK")+"/version2Store";
+    String _fromDir = System.getProperty("QPID_WORK") + "/version1Store";
+    String _toDir = System.getProperty("QPID_WORK") + "/version2Store";
+    String _toDirTwice = System.getProperty("QPID_WORK") + "/version2StoreUpgradeTwice";
 
-
     public void setUp() throws IOException
     {
         assertNotNull("QPID_WORK must be set", System.getProperty("QPID_WORK"));
@@ -65,8 +65,15 @@
         if (directory.exists() && directory.isDirectory())
         {
             FileUtils.delete(directory, true);
-        }        
+        }
 
+        directory = new File(_toDirTwice);
+
+        if (directory.exists() && directory.isDirectory())
+        {
+            FileUtils.delete(directory, true);
+        }
+
         directory = new File(_fromDir);
 
         if (directory.exists() && directory.isDirectory())
@@ -74,9 +81,37 @@
             FileUtils.delete(directory, true);
         }
 
+
     }
 
+    public void testMultipleUpgrades() throws Exception
+    {
+        String broker = "vm://:1";
 
+        startBroker(1, VERSION_1);
+
+        //Ensure msg were transitioned to new broker
+        sendAndCheckDurableSubscriber(broker, true, true, 5, null);
+
+        //Reset the Selector Pattern
+        new DurableSubscriber(broker, _topic, "odd=true").close();
+
+        stopBroker(1);
+
+        upgradeBroker();
+
+        try
+        {
+            new BDBStoreUpgrade(_toDir, _toDirTwice).upgradeFromVersion(1);
+            fail("Second Upgrade Succeeded");
+        }
+        catch (Exception e)
+        {
+            assertTrue("Incorrect Exception Thrown:" + e.getMessage(),
+                       e.getMessage().contains("Attempting to start BDBStore version 1 with a version 2 store"));
+        }
+    }
+
     public void testDurababilitySelectors() throws Exception
     {
         String broker = "vm://:1";
@@ -96,7 +131,7 @@
         upgradeBroker();
 
         broker = "vm://:2";
-        
+
         startBroker(2, VERSION_2);
 
         //Ensure msg were transitioned to new broker
@@ -111,7 +146,7 @@
 
         //Ensure that the selector was preseved on restart and caused all msgs to be removed.
         sendAndCheckDurableSubscriber(broker, false, false, 0, null);
-        stopBroker(2);           
+        stopBroker(2);
     }
 
     public void testDurabability() throws Exception




More information about the rhmessaging-commits mailing list