Author: rgemmell
Date: 2011-09-12 12:34:56 -0400 (Mon, 12 Sep 2011)
New Revision: 4480
Modified:
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgrade.java
store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java
Log:
Improve the BDB upgrade tool to handle persisted messages previously sent to non durable
queues (which are no longer stored, and discarded upon startup)
Applied patch by Oleksandr Rudyy and myself.
Modified:
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgrade.java
===================================================================
---
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgrade.java 2011-09-10
22:07:16 UTC (rev 4479)
+++
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgrade.java 2011-09-12
16:34:56 UTC (rev 4480)
@@ -35,6 +35,7 @@
import org.apache.qpid.server.store.berkeleydb.records.QueueRecord;
import org.apache.qpid.server.store.berkeleydb.tuples.MessageContentKeyTB_4;
import org.apache.qpid.server.store.berkeleydb.tuples.MessageContentKeyTB_5;
+import org.apache.qpid.server.store.berkeleydb.tuples.QueueEntryTB;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.actors.BrokerActor;
import org.apache.qpid.server.logging.NullRootMessageLogger;
@@ -57,8 +58,11 @@
import java.nio.ByteBuffer;
import java.text.MessageFormat;
import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map.Entry;
import com.sleepycat.je.DatabaseEntry;
import com.sleepycat.je.DatabaseException;
@@ -439,7 +443,7 @@
//Inspect the bindings to gather a list of queues which are probably durable
subscriptions, i.e. those
//which have a colon in their name and are bound to the Topic exchanges above
final List<AMQShortString> durableSubQueues = new
ArrayList<AMQShortString>();
- final TupleBinding bindingTB =
_oldMessageStore.getBindingTupleBindingFactory().getInstance();
+ final TupleBinding<BindingKey> bindingTB =
_oldMessageStore.getBindingTupleBindingFactory().getInstance();
DatabaseVisitor durSubQueueListVisitor = new DatabaseVisitor()
{
@@ -461,8 +465,11 @@
//Migrate _queueDb;
_logger.info("Queues");
- final TupleBinding queueTupleBinding =
_oldMessageStore.getQueueTupleBindingFactory().getInstance();
+ // hold the list of existing queue names
+ final List<AMQShortString> existingQueues = new
ArrayList<AMQShortString>();
+ final TupleBinding<QueueRecord> queueTupleBinding =
_oldMessageStore.getQueueTupleBindingFactory().getInstance();
+
DatabaseVisitor queueVisitor = new DatabaseVisitor()
{
public void visit(DatabaseEntry key, DatabaseEntry value) throws
AMQStoreException
@@ -482,6 +489,7 @@
_newMessageStore.createQueue(queueRec);
_count++;
+ existingQueues.add(queueName);
}
};
_oldMessageStore.visitQueues(queueVisitor);
@@ -489,25 +497,112 @@
logCount(queueVisitor.getVisitedCount(), "Queue");
+ // Look for persistent messages stored for non-durable queues
+ _logger.info("Checking for messages previously sent to non-durable
queues");
+
+ // track all message delivery to existing queues
+ final HashSet<Long> queueMessages = new HashSet<Long>();
+
+ // hold all non existing queues and their messages IDs
+ final HashMap<String, HashSet<Long>> phantomMessageQueues = new
HashMap<String, HashSet<Long>>();
+
+ // delivery DB visitor to check message delivery and identify non existing
queues
+ final QueueEntryTB queueEntryTB = new QueueEntryTB();
+ DatabaseVisitor messageDeliveryCheckVisitor = new DatabaseVisitor()
+ {
+ public void visit(DatabaseEntry key, DatabaseEntry value) throws
DatabaseException
+ {
+ QueueEntryKey entryKey = (QueueEntryKey)
queueEntryTB.entryToObject(key);
+ Long messageId = entryKey.getMessageId();
+ AMQShortString queueName = entryKey.getQueueName();
+ if (!existingQueues.contains(queueName))
+ {
+ String name = queueName.asString();
+ HashSet<Long> messages = phantomMessageQueues.get(name);
+ if (messages == null)
+ {
+ messages = new HashSet<Long>();
+ phantomMessageQueues.put(name, messages);
+ }
+ messages.add(messageId);
+ _count++;
+ }
+ else
+ {
+ queueMessages.add(messageId);
+ }
+ }
+ };
+ _oldMessageStore.visitDelivery(messageDeliveryCheckVisitor);
+
+ if (phantomMessageQueues.isEmpty())
+ {
+ _logger.info("No such messages were found");
+ }
+ else
+ {
+ _logger.info("Found " +
messageDeliveryCheckVisitor.getVisitedCount()+ " such messages in total");
+
+ for (Entry<String, HashSet<Long>> phantomQueue :
phantomMessageQueues.entrySet())
+ {
+ String queueName = phantomQueue.getKey();
+ HashSet<Long> messages = phantomQueue.getValue();
+
+ _logger.info(MessageFormat.format("There are {0} messages which were
previously delivered to non-durable queue ''{1}''",messages.size(),
queueName));
+
+ boolean createQueue;
+ if(!_interactive)
+ {
+ createQueue = true;
+ _logger.info("Running in batch-mode, marking queue as durable to
ensure retention of the messages.");
+ }
+ else
+ {
+ createQueue = userInteract("Do you want to make this queue
durable?\n"
+ + "NOTE: Answering No will result in
these messages being discarded!");
+ }
+
+ if (createQueue)
+ {
+ for (Long messageId : messages)
+ {
+ queueMessages.add(messageId);
+ }
+ AMQShortString name = new AMQShortString(queueName);
+ existingQueues.add(name);
+ QueueRecord record = new QueueRecord(name, null, false, null);
+ _newMessageStore.createQueue(record);
+ }
+ }
+ }
+
+
//Migrate _messageMetaDataDb;
_logger.info("Message MetaData");
final Database newMetaDataDB = _newMessageStore.getMetaDataDb();
- final TupleBinding oldMetaDataTupleBinding =
_oldMessageStore.getMetaDataTupleBindingFactory().getInstance();
- final TupleBinding newMetaDataTupleBinding =
_newMessageStore.getMetaDataTupleBindingFactory().getInstance();
+ final TupleBinding<Object> oldMetaDataTupleBinding =
_oldMessageStore.getMetaDataTupleBindingFactory().getInstance();
+ final TupleBinding<Object> newMetaDataTupleBinding =
_newMessageStore.getMetaDataTupleBindingFactory().getInstance();
DatabaseVisitor metaDataVisitor = new DatabaseVisitor()
{
public void visit(DatabaseEntry key, DatabaseEntry value) throws
DatabaseException
{
+ _count++;
MessageMetaData metaData = (MessageMetaData)
oldMetaDataTupleBinding.entryToObject(value);
+ // get message id
+ Long messageId =
TupleBinding.getPrimitiveBinding(Long.class).entryToObject(key);
+
+ // ONLY copy data if message is delivered to existing queue
+ if (!queueMessages.contains(messageId))
+ {
+ return;
+ }
DatabaseEntry newValue = new DatabaseEntry();
newMetaDataTupleBinding.objectToEntry(metaData, newValue);
newMetaDataDB.put(null, key, newValue);
-
- _count++;
}
};
_oldMessageStore.visitMetaDataDb(metaDataVisitor);
@@ -519,8 +614,8 @@
_logger.info("Message Contents");
final Database newContentDB = _newMessageStore.getContentDb();
- final TupleBinding oldContentKeyTupleBinding = new MessageContentKeyTB_4();
- final TupleBinding newContentKeyTupleBinding = new MessageContentKeyTB_5();
+ final TupleBinding<MessageContentKey> oldContentKeyTupleBinding = new
MessageContentKeyTB_4();
+ final TupleBinding<MessageContentKey> newContentKeyTupleBinding = new
MessageContentKeyTB_5();
final TupleBinding contentTB = new ContentTB();
DatabaseVisitor contentVisitor = new DatabaseVisitor()
@@ -530,10 +625,17 @@
public void visit(DatabaseEntry key, DatabaseEntry value) throws
DatabaseException
{
+ _count++;
+
//determine the msgId of the current entry
MessageContentKey_4 contentKey = (MessageContentKey_4)
oldContentKeyTupleBinding.entryToObject(key);
long msgId = contentKey.getMessageId();
+ // ONLY copy data if message is delivered to existing queue
+ if (!queueMessages.contains(msgId))
+ {
+ return;
+ }
//if this is a new message, restart the byte offset count.
if(_prevMsgId != msgId)
{
@@ -556,8 +658,6 @@
_prevMsgId = msgId;
_bytesSeenSoFar += contentSize;
-
- _count++;
}
};
_oldMessageStore.visitContentDb(contentVisitor);
@@ -567,7 +667,27 @@
//Migrate _deliveryDb;
_logger.info("Delivery Records");
- moveContents(_oldMessageStore.getDeliveryDb(), _newMessageStore.getDeliveryDb(),
"Delivery Record");
+ final Database deliveryDb =_newMessageStore.getDeliveryDb();
+ DatabaseVisitor deliveryDbVisitor = new DatabaseVisitor()
+ {
+
+ public void visit(DatabaseEntry key, DatabaseEntry value) throws
DatabaseException
+ {
+ _count++;
+
+ // get message id from entry key
+ QueueEntryKey entryKey = (QueueEntryKey)
queueEntryTB.entryToObject(key);
+ AMQShortString queueName = entryKey.getQueueName();
+
+ // ONLY copy data if message queue exists
+ if (existingQueues.contains(queueName))
+ {
+ deliveryDb.put(null, key, value);
+ }
+ }
+ };
+ _oldMessageStore.visitDelivery(deliveryDbVisitor);
+ logCount(contentVisitor.getVisitedCount(), "Delivery Record");
}
/**
Modified:
store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java
===================================================================
---
store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java 2011-09-10
22:07:16 UTC (rev 4479)
+++
store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java 2011-09-12
16:34:56 UTC (rev 4480)
@@ -25,6 +25,9 @@
import static
org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.TOPIC_NAME;
import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
@@ -40,13 +43,29 @@
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.MethodRegistry;
+import org.apache.qpid.framing.ProtocolVersion;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
import org.apache.qpid.management.common.mbeans.ManagedQueue;
+import org.apache.qpid.server.message.MessageMetaData;
+import org.apache.qpid.server.store.TransactionLog;
+import org.apache.qpid.server.store.TransactionLogResource;
+import org.apache.qpid.server.store.berkeleydb.keys.MessageContentKey_4;
+import
org.apache.qpid.server.store.berkeleydb.tuples.MessageContentKeyTupleBindingFactory;
+import
org.apache.qpid.server.store.berkeleydb.tuples.MessageMetaDataTupleBindingFactory;
import org.apache.qpid.test.utils.JMXTestUtils;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
import org.apache.qpid.util.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.sleepycat.bind.tuple.TupleBinding;
+import com.sleepycat.je.DatabaseEntry;
+
/**
* Tests upgrading a BDB store and using it with the new broker
* after the required contents are entered into the store using
@@ -264,6 +283,188 @@
consumeQueueMessages(connection, false);
}
+ /**
+ * Tests store migration containing messages for non-existing queue.
+ *
+ * @throws Exception
+ */
+ public void testMigrationOfMessagesForNonExistingQueues() throws Exception
+ {
+ stopBroker();
+
+ // copy store data into a new location for adding of phantom message
+ File storeLocation = new File(_fromDir);
+ File target = new File(_toDirTwice);
+ if (!target.exists())
+ {
+ target.mkdirs();
+ }
+ FileUtils.copyRecursive(storeLocation, target);
+
+ // delete migrated data
+ File directory = new File(_toDir);
+ if (directory.exists() && directory.isDirectory())
+ {
+ FileUtils.delete(directory, true);
+ }
+
+ // test data
+ String nonExistingQueueName = getTestQueueName();
+ String messageText = "Test Phantom Message";
+
+ // add message
+ addMessageForNonExistingQueue(target, VERSION_4, nonExistingQueueName,
messageText);
+
+ String[] inputs = { "Yes", "Yes", "Yes" };
+ upgradeBrokerStoreInInterractiveMode(_toDirTwice, _toDir, inputs);
+
+ // start broker
+ startBroker();
+
+ // Create a connection and start it
+ Connection connection = getConnection();
+ connection.start();
+
+ // consume a message for non-existing store
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue(nonExistingQueueName);
+ MessageConsumer messageConsumer = session.createConsumer(queue);
+ Message message = messageConsumer.receive(1000);
+
+ // assert consumed message
+ assertNotNull("Message was not migrated!", message);
+ assertTrue("Unexpected message received!", message instanceof
TextMessage);
+ String text = ((TextMessage) message).getText();
+ assertEquals("Message migration failed!", messageText, text);
+ }
+
+ /**
+ * An utility method to upgrade broker with simulation user interactions
+ *
+ * @param fromDir
+ * location of the store to migrate
+ * @param toDir
+ * location of where migrated data will be stored
+ * @param inputs
+ * user answers on upgrade tool questions
+ * @throws Exception
+ */
+ private void upgradeBrokerStoreInInterractiveMode(String fromDir, String toDir, final
String[] inputs)
+ throws Exception
+ {
+ // save to restore system.in after data migration
+ InputStream stdin = System.in;
+
+ // set fake system in to simulate user interactions
+ // FIXME: it is a quite dirty simulator of system input but it does the job
+ System.setIn(new InputStream()
+ {
+
+ int counter = 0;
+
+ public synchronized int read(byte b[], int off, int len)
+ {
+ byte[] src = (inputs[counter] + "\n").getBytes();
+ System.arraycopy(src, 0, b, off, src.length);
+ counter++;
+ return src.length;
+ }
+
+ @Override
+ public int read() throws IOException
+ {
+ return -1;
+ }
+ });
+
+ try
+ {
+ // Upgrade the test store.
+ new BDBStoreUpgrade(fromDir, toDir, null, true,
true).upgradeFromVersion(VERSION_4);
+ }
+ finally
+ {
+ // restore system in
+ System.setIn(stdin);
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ private void addMessageForNonExistingQueue(File storeLocation, int storeVersion,
String nonExistingQueueName,
+ String messageText) throws Exception
+ {
+ final AMQShortString queueName = new AMQShortString(nonExistingQueueName);
+ BDBMessageStore store = new BDBMessageStore(storeVersion);
+ store.configure(storeLocation, false);
+ try
+ {
+ store.start();
+
+ // store message objects
+ ByteBuffer completeContentBody =
ByteBuffer.wrap(messageText.getBytes("UTF-8"));
+ long bodySize = completeContentBody.limit();
+ MessagePublishInfo pubInfoBody = new MessagePublishInfoImpl(new
AMQShortString("amq.direct"), false,
+ false, queueName);
+ BasicContentHeaderProperties props = new BasicContentHeaderProperties();
+
props.setDeliveryMode(Integer.valueOf(BasicContentHeaderProperties.PERSISTENT).byteValue());
+ props.setContentType("text/plain");
+ props.setType("text/plain");
+ props.setMessageId("whatever");
+ props.setEncoding("UTF-8");
+ props.getHeaders().setString("Test", "MST");
+ MethodRegistry methodRegistry =
MethodRegistry.getMethodRegistry(ProtocolVersion.v0_9);
+ int classForBasic = methodRegistry.createBasicQosOkBody().getClazz();
+ ContentHeaderBody contentHeaderBody = new ContentHeaderBody(classForBasic, 1,
props, bodySize);
+
+ // add content entry to database
+ long messageId = store.getNewMessageId();
+ TupleBinding<MessageContentKey> contentKeyTB = new
MessageContentKeyTupleBindingFactory(storeVersion).getInstance();
+ MessageContentKey contentKey = null;
+ if (storeVersion == VERSION_4)
+ {
+ contentKey = new MessageContentKey_4(messageId, 0);
+ }
+ else
+ {
+ throw new Exception(storeVersion + " is not supported");
+ }
+ DatabaseEntry key = new DatabaseEntry();
+ contentKeyTB.objectToEntry(contentKey, key);
+ DatabaseEntry data = new DatabaseEntry();
+ ContentTB contentTB = new ContentTB();
+ contentTB.objectToEntry(completeContentBody, data);
+ store.getContentDb().put(null, key, data);
+
+ // add meta data entry to database
+ TupleBinding<Long> longTB =
TupleBinding.getPrimitiveBinding(Long.class);
+ TupleBinding<Object> metaDataTB = new
MessageMetaDataTupleBindingFactory(storeVersion).getInstance();
+ key = new DatabaseEntry();
+ data = new DatabaseEntry();
+ longTB.objectToEntry(new Long(messageId), key);
+ MessageMetaData metaData = new MessageMetaData(pubInfoBody,
contentHeaderBody, 1);
+ metaDataTB.objectToEntry(metaData, data);
+ store.getMetaDataDb().put(null, key, data);
+
+ // add delivery entry to database
+ TransactionLogResource mockQueue = new TransactionLogResource()
+ {
+ public String getResourceName()
+ {
+ return queueName.asString();
+ }
+ };
+ TransactionLog log = (TransactionLog) store;
+ TransactionLog.Transaction txn = log.newTransaction();
+ txn.enqueueMessage(mockQueue, messageId);
+ txn.commitTran();
+ }
+ finally
+ {
+ // close store
+ store.close();
+ }
+ }
+
private void consumeDurableSubscriptionMessages(Connection connection) throws
Exception
{
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);