[rhmessaging-commits] rhmessaging commits: r2416 - in store/branches/java/broker-queue-refactor/java/bdbstore/src: test/java/org/apache/qpid/server/store/berkeleydb and 1 other directory.
rhmessaging-commits at lists.jboss.org
rhmessaging-commits at lists.jboss.org
Fri Sep 5 10:03:23 EDT 2008
Author: ritchiem
Date: 2008-09-05 10:03:23 -0400 (Fri, 05 Sep 2008)
New Revision: 2416
Modified:
store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java
Log:
RHM-6 : Version all the tables and ensure that the upgrade script cannot be run on an upgraded store. Includes unit test.
Modified: store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java 2008-09-04 16:32:40 UTC (rev 2415)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java 2008-09-05 14:03:23 UTC (rev 2416)
@@ -80,12 +80,11 @@
private static final int DATABASE_FORMAT_VERSION = 2;
private static final String DATABASE_FORMAT_VERSION_PROPERTY = "version";
-
public static final String ENVIRONMENT_PATH_PROPERTY = "environment-path";
private Environment _environment;
- private static final String MESSAGEMETADATADB_NAME = "messageMetaDataDb";
+ private String MESSAGEMETADATADB_NAME = "messageMetaDataDb";
/**
* Maps from messageId to an AMQMessage (note we don't use serialisation but this is what it roughly corresponds
@@ -93,30 +92,24 @@
*/
private Database _messageMetaDataDb;
- private static final String MESSAGECONTENTDB_NAME = "messageContentDb";
+ private String MESSAGECONTENTDB_NAME = "messageContentDb";
private Database _messageContentDb;
- private static final String QUEUEDB_NAME = "queueDb";
+ private String QUEUEDB_NAME = "queueDb";
- private static final String NEW_QUEUE_DB_NAME = "QUEUE";
-
/** Maps from name (which uniquely identifies a queue) to an AMQQueue */
private Database _queueDb;
- private static final String DELIVERYDB_NAME = "deliveryDb";
+ private String DELIVERYDB_NAME = "deliveryDb";
- private static final String QUEUE_ENTRY_DB_NAME = "QUEUE_ENTRY";
-
/** Maps from a queue name to a message id. This is what stores the pending deliveries for a given queue */
private Database _deliveryDb;
- private static final String EXCHANGEDB_NAME = "exchangeDb";
+ private String EXCHANGEDB_NAME = "exchangeDb";
private Database _exchangeDb;
- private static final String NEW_EXCHANGE_DB_NAME = "EXCHANGE";
-
- private static final String QUEUEBINDINGSDB_NAME = "queueBindingsDb";
+ private String QUEUEBINDINGSDB_NAME = "queueBindingsDb";
private Database _queueBindingsDb;
private VirtualHost _virtualHost;
@@ -125,8 +118,6 @@
private final AtomicLong _queueId = new AtomicLong(1);
- private static final AMQShortString EMPTY_SHORT_STRING = new AMQShortString("");
-
private final CommitThread _commitThread = new CommitThread("Commit-Thread");
private Map<AMQShortString, Long> _queueNameToIdMap = new ConcurrentHashMap<AMQShortString, Long>();
@@ -162,6 +153,24 @@
_version = version;
}
+ private void setDatabaseNames(int version)
+ {
+ if (version > 1)
+ {
+ MESSAGEMETADATADB_NAME += "_v" + version;
+
+ MESSAGECONTENTDB_NAME += "_v" + version;
+
+ QUEUEDB_NAME += "_v" + version;
+
+ DELIVERYDB_NAME += "_v" + version;
+
+ EXCHANGEDB_NAME += "_v" + version;
+
+ QUEUEBINDINGSDB_NAME += "_v" + version;
+ }
+ }
+
/**
* Called after instantiation in order to configure the message store. A particular implementation can define
* whatever parameters it wants.
@@ -176,6 +185,7 @@
*/
public void configure(VirtualHost virtualHost, String base, Configuration config) throws Exception
{
+ boolean verifyVersion = false;
File environmentPath = new File(config.getString(base + "." + ENVIRONMENT_PATH_PROPERTY, "bdbEnv"));
if (!environmentPath.exists())
@@ -187,9 +197,8 @@
}
}
- _version = config.getInt(base + "." + DATABASE_FORMAT_VERSION_PROPERTY, 2);
+ _version = config.getInt(base + "." + DATABASE_FORMAT_VERSION_PROPERTY, DATABASE_FORMAT_VERSION);
-
configure(virtualHost, environmentPath);
}
@@ -204,6 +213,8 @@
_log.info("Configuring BDB message store");
+ setDatabaseNames(_version);
+
if (virtualHost != null)
{
setVirtualHost(virtualHost);
@@ -227,11 +238,43 @@
createEnvironment(storePath);
+ verifyVersionByTables();
+
openDatabases();
_commitThread.start();
}
+ private void verifyVersionByTables() throws DatabaseException
+ {
+ for (String s : _environment.getDatabaseNames())
+ {
+ int versionIndex = s.indexOf("_v");
+
+ // DB is v1 if _version is not v1 then error
+ if (versionIndex == -1)
+ {
+ if (_version != 1)
+ {
+ throw new DatabaseException("Error: Attempting to start BDBStore version " + _version
+ + " with a version 1 store. Please Upgrade");
+ }
+ else // DB is v1 and _version is v1
+ {
+ continue;
+ }
+ }
+ // Otherwise Check Versions
+
+ int version = Integer.parseInt(s.substring(versionIndex + 2));
+
+ if (version != _version)
+ {
+ throw new DatabaseException("Error: Attempting to start BDBStore version " + _version + " with a version " + version + " store. Please Upgrade");
+ }
+ }
+ }
+
protected void startStore() throws AMQException
{
stateTransition(State.CONFIGURED, State.STARTED);
Modified: store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java 2008-09-04 16:32:40 UTC (rev 2415)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java 2008-09-05 14:03:23 UTC (rev 2416)
@@ -52,10 +52,10 @@
private String _topic = "MyDurableSubscriptionTestTopic";
- String _fromDir = System.getProperty("QPID_WORK")+"/version1Store";
- String _toDir = System.getProperty("QPID_WORK")+"/version2Store";
+ String _fromDir = System.getProperty("QPID_WORK") + "/version1Store";
+ String _toDir = System.getProperty("QPID_WORK") + "/version2Store";
+ String _toDirTwice = System.getProperty("QPID_WORK") + "/version2StoreUpgradeTwice";
-
public void setUp() throws IOException
{
assertNotNull("QPID_WORK must be set", System.getProperty("QPID_WORK"));
@@ -65,8 +65,15 @@
if (directory.exists() && directory.isDirectory())
{
FileUtils.delete(directory, true);
- }
+ }
+ directory = new File(_toDirTwice);
+
+ if (directory.exists() && directory.isDirectory())
+ {
+ FileUtils.delete(directory, true);
+ }
+
directory = new File(_fromDir);
if (directory.exists() && directory.isDirectory())
@@ -74,9 +81,37 @@
FileUtils.delete(directory, true);
}
+
}
+ public void testMultipleUpgrades() throws Exception
+ {
+ String broker = "vm://:1";
+ startBroker(1, VERSION_1);
+
+ //Ensure msg were transitioned to new broker
+ sendAndCheckDurableSubscriber(broker, true, true, 5, null);
+
+ //Reset the Selector Pattern
+ new DurableSubscriber(broker, _topic, "odd=true").close();
+
+ stopBroker(1);
+
+ upgradeBroker();
+
+ try
+ {
+ new BDBStoreUpgrade(_toDir, _toDirTwice).upgradeFromVersion(1);
+ fail("Second Upgrade Succeeded");
+ }
+ catch (Exception e)
+ {
+ assertTrue("Incorrect Exception Thrown:" + e.getMessage(),
+ e.getMessage().contains("Attempting to start BDBStore version 1 with a version 2 store"));
+ }
+ }
+
public void testDurababilitySelectors() throws Exception
{
String broker = "vm://:1";
@@ -96,7 +131,7 @@
upgradeBroker();
broker = "vm://:2";
-
+
startBroker(2, VERSION_2);
//Ensure msg were transitioned to new broker
@@ -111,7 +146,7 @@
//Ensure that the selector was preseved on restart and caused all msgs to be removed.
sendAndCheckDurableSubscriber(broker, false, false, 0, null);
- stopBroker(2);
+ stopBroker(2);
}
public void testDurabability() throws Exception
More information about the rhmessaging-commits mailing list