Author: rgemmell
Date: 2010-06-25 05:44:25 -0400 (Fri, 25 Jun 2010)
New Revision: 4050
Modified:
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/records/QueueRecord.java
store/trunk/java/bdbstore/src/tools/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgrade.java
Log:
Initial work on v2->v3 BDBMessageStore upgrade tool
Modified:
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
===================================================================
---
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java 2010-06-24
20:48:55 UTC (rev 4049)
+++
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java 2010-06-25
09:44:25 UTC (rev 4050)
@@ -1043,14 +1043,28 @@
_log.debug("public void createQueue(AMQQueue queue(" +
queue.getName() + ") = " + queue + "): called");
}
+ QueueRecord queueRecord= new QueueRecord(queue.getNameShortString(),
+ queue.getOwner(), queue.isExclusive(),
arguments);
+
+ createQueue(queueRecord);
+ }
+
+ /**
+ * Makes the specified queue persistent.
+ *
+ * Only intended for direct use during store upgrades.
+ *
+ * @param queueRecord Details of the queue to store.
+ *
+ * @throws AMQException If the operation fails for any reason.
+ */
+ protected void createQueue(QueueRecord queueRecord) throws AMQException
+ {
if (_state != State.RECOVERING)
{
- QueueRecord queueRecord= new QueueRecord(queue.getNameShortString(),
- queue.getOwner(),
queue.isExclusive(), arguments);
-
DatabaseEntry key = new DatabaseEntry();
EntryBinding keyBinding = new AMQShortStringTB();
- keyBinding.objectToEntry(queue.getNameShortString(), key);
+ keyBinding.objectToEntry(queueRecord.getNameShortString(), key);
DatabaseEntry value = new DatabaseEntry();
TupleBinding queueBinding = _queueTupleBindingFactory.getInstance();
@@ -1062,7 +1076,8 @@
}
catch (DatabaseException e)
{
- throw new AMQException("Error writing AMQQueue with name " +
queue.getName() + " to database: " + e, e);
+ throw new AMQException("Error writing AMQQueue with name " +
+ queueRecord.getNameShortString().toString() + " to database:
" + e, e);
}
}
}
@@ -1477,14 +1492,9 @@
* @throws AMQException If the operation fails for any reason, or if the specified
message does not exist.
*/
public int getContent(Long messageId, int offset, ByteBuffer dst) throws
AMQException
- {
- final int limit = offset + dst.remaining();
-
+ {
DatabaseEntry contentKeyEntry = new DatabaseEntry();
- //TODO: if requested offset is non zero, use partial record (key-only) search to
- //locate the first record key to prevent reading in data we dont need
-
//Start from 0 offset and search for the starting chunk.
MessageContentKey_3 mck = new MessageContentKey_3(messageId, 0);
TupleBinding contentKeyTupleBinding = new MessageContentKeyTB_3();
@@ -1505,7 +1515,7 @@
{
cursor = _messageContentDb.openCursor(null, null);
- OperationStatus status = cursor.getSearchKeyRange(contentKeyEntry, value,
LockMode.RMW);
+ OperationStatus status = cursor.getSearchKeyRange(contentKeyEntry, value,
LockMode.READ_UNCOMMITTED);
while (status == OperationStatus.SUCCESS)
{
mck = (MessageContentKey_3)
contentKeyTupleBinding.entryToObject(contentKeyEntry);
@@ -1524,7 +1534,6 @@
seenSoFar += size;
- //TODO: can change this guard if we start recording the last byte in the
chunk record
if(seenSoFar >= offset)
{
byte[] dataAsBytes = buf.array();
@@ -1600,6 +1609,11 @@
{
return _bindingTupleBindingFactory;
}
+
+ protected MessageMetaDataTupleBindingFactory getMetaDataTupleBindingFactory()
+ {
+ return _metaDataTupleBindingFactory;
+ }
//Package getters for the various databases used by the Store
Modified:
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/records/QueueRecord.java
===================================================================
---
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/records/QueueRecord.java 2010-06-24
20:48:55 UTC (rev 4049)
+++
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/records/QueueRecord.java 2010-06-25
09:44:25 UTC (rev 4050)
@@ -52,6 +52,11 @@
{
return _exclusive;
}
+
+ public void setExclusive(boolean exclusive)
+ {
+ _exclusive = exclusive;
+ }
public FieldTable getArguments()
{
Modified:
store/trunk/java/bdbstore/src/tools/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgrade.java
===================================================================
---
store/trunk/java/bdbstore/src/tools/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgrade.java 2010-06-24
20:48:55 UTC (rev 4049)
+++
store/trunk/java/bdbstore/src/tools/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgrade.java 2010-06-25
09:44:25 UTC (rev 4050)
@@ -22,15 +22,18 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.qpid.server.store.MemoryMessageStore;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.virtualhost.VirtualHostImpl;
-import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.configuration.VirtualHostConfiguration;
+import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.store.berkeleydb.keys.MessageContentKey_1;
+import org.apache.qpid.server.store.berkeleydb.keys.MessageContentKey_3;
+import org.apache.qpid.server.store.berkeleydb.records.ExchangeRecord;
+import org.apache.qpid.server.store.berkeleydb.records.QueueRecord;
+import org.apache.qpid.server.store.berkeleydb.tuples.MessageContentKeyTB_1;
+import org.apache.qpid.server.store.berkeleydb.tuples.MessageContentKeyTB_3;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.actors.BrokerActor;
import org.apache.qpid.server.logging.NullRootMessageLogger;
+import org.apache.qpid.server.message.MessageMetaData;
import org.apache.qpid.AMQException;
import org.apache.qpid.util.FileUtils;
import org.apache.commons.cli.PosixParser;
@@ -39,13 +42,14 @@
import org.apache.commons.cli.ParseException;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
-import org.apache.commons.configuration.PropertiesConfiguration;
import java.io.File;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.IOException;
import java.io.FileNotFoundException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
@@ -53,21 +57,15 @@
import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.Database;
import com.sleepycat.bind.tuple.TupleBinding;
-import com.sleepycat.bind.tuple.ByteBinding;
/**
- * This is a simple BerkeleyDB Store upgrade tool that will upgrade a V1 Store to a V2
Store.
+ * This is a simple BerkeleyDB Store upgrade tool that will upgrade a V2 Store to a V3
Store.
*
- * NOTE: No checks are in place to validate that the input is V1.
+ * Currently upgrade is fixed from v2 -> v3
*
- * Currently upgrade is fixed from v1 -> v2
- * Only the Queue and Binding databases are migrated all other databases are copied as DB
entries.
- *
* Improvments:
* - Add List BDBMessageStore.getDatabases(); This can the be iterated to guard against
new DBs being added.
- * - Add a version value into the store so that a quick check can be performed to perform
the upgrades.
* - A version in the store would allow automated upgrade or later with more available
versions interactive upgrade.
- * - Currently only the Queue and Binding DB are processed for upgrade all the other db
data is copied between stores.
* - Add process logging and disable all Store and Qpid logging.
*/
public class BDBStoreUpgrade
@@ -84,10 +82,6 @@
BDBMessageStore _oldMessageStore;
/** The New Store */
BDBMessageStore _newMessageStore;
- /** A VHost used in the migration of the queues from the old VHost */
- VirtualHost _newVirtualHost;
- /** A VHost used in the creation of the queues from the old store */
- VirtualHost _oldVirtualHost;
/** The file ending that is used by BDB Store Files */
private static final String BDB_FILE_ENDING = ".jdb";
@@ -96,7 +90,7 @@
private boolean _interactive;
private boolean _force;
- private static final String VERSION = "1.0";
+ private static final String VERSION = "2.0";
private static final String OPTION_INPUT_SHORT = "i";
private static final String OPTION_INPUT = "input";
private static final String OPTION_OUTPUT_SHORT = "o";
@@ -312,7 +306,7 @@
if (!userInteract("Are you sure wish to proceed with DB
migration without backup? " +
"(For more details of the consequences check
the Qpid/BDB Message Store Wiki)."))
{
- throw new IllegalArgumentException("Upgrade stopped as user
request as no DB Backup performed.");
+ throw new IllegalArgumentException("Upgrade stopped at user
request as no DB Backup performed.");
}
}
}
@@ -341,13 +335,9 @@
CurrentActor.set(new BrokerActor(new NullRootMessageLogger()));
- // Note the name of the Vhosts is not important, the store doesnot record the
name of the vhost.
- _newVirtualHost = new VirtualHostImpl(new
VirtualHostConfiguration("Upgraded", new PropertiesConfiguration()), new
MemoryMessageStore());
- _oldVirtualHost = new VirtualHostImpl(new
VirtualHostConfiguration("Old", new PropertiesConfiguration()), new
MemoryMessageStore());
-
//Create a new messageStore
_newMessageStore = new BDBMessageStore();
- //TODO _newMessageStore.configure(_newVirtualHost, toDir, false);
+ _newMessageStore.configure(toDir, false);
try
{
@@ -355,25 +345,19 @@
switch (version)
{
default:
- case 1:
- _oldMessageStore = new BDBMessageStore(1);
- //TODO _oldMessageStore.configure(_oldVirtualHost, fromDir, true);
+ case 2:
+ _oldMessageStore = new BDBMessageStore(2);
+ _oldMessageStore.configure(fromDir, true);
_oldMessageStore.start();
- upgradeFromVersion_1();
+ upgradeFromVersion_2();
break;
}
}
finally
{
- _newVirtualHost.close();
- _oldVirtualHost.close();
-
_newMessageStore.close();
_oldMessageStore.close();
- //Shutdown the AR that the Vhosts will have created.
- ApplicationRegistry.remove(1);
-
// if we are running inplace then swap fromDir and toDir
if (inplace)
{
@@ -395,84 +379,170 @@
}
}
- private void upgradeFromVersion_1() throws AMQException, DatabaseException
+ private void upgradeFromVersion_2() throws AMQException, DatabaseException
{
+ _logger.info("Starting store upgrade from version 2");
+
+ //Migrate _exchangeDb;
+ _logger.info("Exchanges");
- _logger.info("Starting store upgrade from version 1");
+ moveContents(_oldMessageStore.getExchangesDb(),
_newMessageStore.getExchangesDb(), "Exchange");
- _logger.info("Message Metadata");
- //Migrate _messageMetaDataDb;
- moveContents(_oldMessageStore.getMetaDataDb(), _newMessageStore.getMetaDataDb(),
"Message MetaData");
+ final List<AMQShortString> topicExchanges = new
ArrayList<AMQShortString>();
+ final TupleBinding exchangeTB = new ExchangeTB();
+
+ DatabaseVisitor exchangeListVisitor = new DatabaseVisitor()
+ {
+ public void visit(DatabaseEntry key, DatabaseEntry value) throws
DatabaseException
+ {
+ ExchangeRecord exchangeRec = (ExchangeRecord)
exchangeTB.entryToObject(value);
+ AMQShortString type = exchangeRec.getType();
- _logger.info("Message Contents");
- //Migrate _messageContentDb;
- moveContents(_oldMessageStore.getContentDb(), _newMessageStore.getContentDb(),
"Message Content");
+ if (ExchangeDefaults.TOPIC_EXCHANGE_CLASS.equals(type))
+ {
+ topicExchanges.add(exchangeRec.getNameShortString());
+ }
+ }
+ };
+ _oldMessageStore.visitExchanges(exchangeListVisitor);
+
+ //Migrate _queueBindingsDb;
+ _logger.info("QueueBindings");
+ moveContents(_oldMessageStore.getBindingsDb(), _newMessageStore.getBindingsDb(),
"Queue Binding");
+
+ //Inspect the bindings to gather a list of queues which are probably durable
subscriptions, i.e. those
+ //which have a colon in their name and are bound to the Topic exchanges above
+ final List<AMQShortString> durableSubQueues = new
ArrayList<AMQShortString>();
+ final TupleBinding bindingTB =
_oldMessageStore.getBindingTupleBindingFactory().getInstance();
+
+ DatabaseVisitor durSubQueueListVisitor = new DatabaseVisitor()
+ {
+ public void visit(DatabaseEntry key, DatabaseEntry value) throws
DatabaseException
+ {
+ BindingKey bindingRec = (BindingKey) bindingTB.entryToObject(key);
+ AMQShortString queueName = bindingRec.getQueueName();
+ AMQShortString exchangeName = bindingRec.getExchangeName();
+
+ if (topicExchanges.contains(exchangeName) &&
queueName.asString().contains(":"))
+ {
+ durableSubQueues.add(queueName);
+ }
+ }
+ };
+ _oldMessageStore.visitBindings(durSubQueueListVisitor);
+
+
+ //Migrate _queueDb;
_logger.info("Queues");
- //Migrate _queueDb;
- //Get the oldMessageStore Tuple Binding which does the parsing
+
final TupleBinding queueTupleBinding =
_oldMessageStore.getQueueTupleBindingFactory().getInstance();
- //Create a visitor that will take the queues in the oldMessageStore and add them
to the newMessageStore
DatabaseVisitor queueVisitor = new DatabaseVisitor()
{
public void visit(DatabaseEntry key, DatabaseEntry value) throws
AMQException
{
- AMQQueue queue = (AMQQueue) queueTupleBinding.entryToObject(value);
+ QueueRecord queueRec = (QueueRecord)
queueTupleBinding.entryToObject(value);
+ AMQShortString queueName = queueRec.getNameShortString();
- //The simple call to createQueue with the AMQQueue object is sufficient
for a v1 upgrade as all the
- // extra properties in v2 will be defaulted.
- _newMessageStore.createQueue(queue);
+ //if the queue name is in the gathered list then set its exclusivity
true
+ if (durableSubQueues.contains(queueName))
+ {
+ _logger.info("Marking as possible DurableSubscription backing
queue: " + queueName);
+ queueRec.setExclusive(true);
+ }
+
+ //The simple call to createQueue with the QueueRecord object is
sufficient for a v2->v3 upgrade as
+ //the extra 'exclusive' property in v3 will be defaulted to false
in the record creation.
+ _newMessageStore.createQueue(queueRec);
- // We need to call queue stop here as all the queues were already
registerd when the _oldMessageStore
- // state was recovered. Now we are creating a second Queue it will aquire
the Executor Service again!
- // But the queueRegistry is a set so only one release will be performed.
- //
- // queue.stop();
- //
- // An alternative approach was taken here: If we don't recover the
store
_count++;
}
};
- //Perform the visit
_oldMessageStore.visitQueues(queueVisitor);
logCount(queueVisitor.getVisitedCount(), "queue");
- _logger.info("Delivery Records");
- //Migrate _deliveryDb;
- moveContents(_oldMessageStore.getDeliveryDb(), _newMessageStore.getDeliveryDb(),
"Delivery Record");
- _logger.info("Exchanges");
- //Migrate _exchangeDb;
- moveContents(_oldMessageStore.getExchangesDb(),
_newMessageStore.getExchangesDb(), "Exchange");
+ //Migrate _messageMetaDataDb;
+ _logger.info("Message MetaData");
+
+ final Database newMetaDataDB = _newMessageStore.getMetaDataDb();
+ final TupleBinding oldMetaDataTupleBinding =
_oldMessageStore.getMetaDataTupleBindingFactory().getInstance();
+ final TupleBinding newMetaDataTupleBinding =
_newMessageStore.getMetaDataTupleBindingFactory().getInstance();
+
+ DatabaseVisitor metaDataVisitor = new DatabaseVisitor()
+ {
+ public void visit(DatabaseEntry key, DatabaseEntry value) throws
DatabaseException
+ {
+ MessageMetaData metaData = (MessageMetaData)
oldMetaDataTupleBinding.entryToObject(value);
- _logger.info("QueueBindings");
- //Migrate _queueBindingsDb;
- final TupleBinding bindingTupleBinding =
_oldMessageStore.getBindingTupleBindingFactory().getInstance();
+ DatabaseEntry newValue = new DatabaseEntry();
+ newMetaDataTupleBinding.objectToEntry(metaData, newValue);
+
+ newMetaDataDB.put(null, key, newValue);
- //Create a visitor that to read the old format queue bindings
- DatabaseVisitor queueBindings = new DatabaseVisitor()
+ _count++;
+ }
+ };
+ _oldMessageStore.visitMetaDataDb(metaDataVisitor);
+
+ logCount(metaDataVisitor.getVisitedCount(), "Message MetaData");
+
+
+ //Migrate _messageContentDb;
+ _logger.info("Message Contents");
+ final Database newContentDB = _newMessageStore.getContentDb();
+
+ final TupleBinding oldContentKeyTupleBinding = new MessageContentKeyTB_1();
+ final TupleBinding newContentKeyTupleBinding = new MessageContentKeyTB_3();
+ final TupleBinding contentTB = new ContentTB();
+
+ DatabaseVisitor contentVisitor = new DatabaseVisitor()
{
+ long _prevMsgId = -1; //Initialise to invalid value
+ int _bytesSeenSoFar = 0;
+
public void visit(DatabaseEntry key, DatabaseEntry value) throws
DatabaseException
{
- BindingKey queueBinding = (BindingKey)
bindingTupleBinding.entryToObject(key);
+ //determine the msgId of the current entry
+ MessageContentKey_1 contentKey = (MessageContentKey_1)
oldContentKeyTupleBinding.entryToObject(key);
+ long msgId = contentKey.getMessageId();
- //Create a new Format TupleBinding
- TupleBinding newBindingTupleBinding =
_newMessageStore.getBindingTupleBindingFactory().getInstance();
+ //if this is a new message, restart the byte offset count.
+ if(_prevMsgId != msgId)
+ {
+ _bytesSeenSoFar = 0;
+ }
- DatabaseEntry newKey = new DatabaseEntry();
- newBindingTupleBinding.objectToEntry(queueBinding, newKey);
+ //determine the content size
+ ByteBuffer content = (ByteBuffer) contentTB.entryToObject(value);
+ int contentSize = content.limit();
- ByteBinding.byteToEntry((byte) 0, value);
- _newMessageStore.getBindingsDb().put(null, newKey, value);
+ //create the new key: id + previously seen data count
+ MessageContentKey_3 newKey = new MessageContentKey_3(msgId,
_bytesSeenSoFar);
+ DatabaseEntry newKeyEntry = new DatabaseEntry();
+ newContentKeyTupleBinding.objectToEntry(newKey, newKeyEntry);
+ DatabaseEntry newValueEntry = new DatabaseEntry();
+ contentTB.objectToEntry(content, newValueEntry);
+
+ newContentDB.put(null, newKeyEntry, newValueEntry);
+
+ _prevMsgId = msgId;
+ _bytesSeenSoFar += contentSize;
+
_count++;
}
};
+ _oldMessageStore.visitContentDb(contentVisitor);
- _oldMessageStore.visitBindings(queueBindings);
- logCount(queueBindings.getVisitedCount(), "queue binding");
+ logCount(metaDataVisitor.getVisitedCount(), "Message ContentChunk");
+
+
+ //Migrate _deliveryDb;
+ _logger.info("Delivery Records");
+ moveContents(_oldMessageStore.getDeliveryDb(), _newMessageStore.getDeliveryDb(),
"Delivery Record");
}
/**
@@ -713,7 +783,7 @@
}
catch (RuntimeException re)
{
- if (!re.getMessage().equals("User aborted process"))
+ if (!("User aborted process").equals(re.getMessage()))
{
re.printStackTrace();
_logger.error("Upgrade Failed: " + re.getMessage());
@@ -762,7 +832,7 @@
_logger.info("Running BDB Message Store upgrade tool: v" + VERSION);
try
{
- new BDBStoreUpgrade(fromDir.toString(), toDir, backupDir, interactive,
force).upgradeFromVersion(1);
+ new BDBStoreUpgrade(fromDir.toString(), toDir, backupDir, interactive,
force).upgradeFromVersion(2);
_logger.info("Upgrade complete.");
}
@@ -772,9 +842,9 @@
}
catch (DatabaseException de)
{
- if (de.getMessage().endsWith("Error: Unable to load BDBStore as version
1. Store on disk contains version 2 data."))
+ if (de.getMessage().endsWith("Error: Unable to load BDBStore as version
2. Store on disk contains version 3 data."))
{
- System.out.println("Store '" + fromDir + "' has
already been upgraded to version 2.");
+ System.out.println("Store '" + fromDir + "' has
already been upgraded to version 3.");
}
else
{
@@ -784,7 +854,7 @@
}
catch (RuntimeException re)
{
- if (!re.getMessage().equals("User aborted process"))
+ if (!("User aborted process").equals(re.getMessage()))
{
re.printStackTrace();
_logger.error("Upgrade Failed: " + re.getMessage());