[rhmessaging-commits] rhmessaging commits: r4480 - in store/trunk/java/bdbstore/src: test/java/org/apache/qpid/server/store/berkeleydb and 1 other directory.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Mon Sep 12 12:34:56 EDT 2011


Author: rgemmell
Date: 2011-09-12 12:34:56 -0400 (Mon, 12 Sep 2011)
New Revision: 4480

Modified:
   store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgrade.java
   store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java
Log:
Improve the BDB upgrade tool to handle persisted messages previously sent to non durable queues (which are no longer stored, and discarded upon startup)

Applied patch by Oleksandr Rudyy and myself.

Modified: store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgrade.java
===================================================================
--- store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgrade.java	2011-09-10 22:07:16 UTC (rev 4479)
+++ store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgrade.java	2011-09-12 16:34:56 UTC (rev 4480)
@@ -35,6 +35,7 @@
 import org.apache.qpid.server.store.berkeleydb.records.QueueRecord;
 import org.apache.qpid.server.store.berkeleydb.tuples.MessageContentKeyTB_4;
 import org.apache.qpid.server.store.berkeleydb.tuples.MessageContentKeyTB_5;
+import org.apache.qpid.server.store.berkeleydb.tuples.QueueEntryTB;
 import org.apache.qpid.server.logging.actors.CurrentActor;
 import org.apache.qpid.server.logging.actors.BrokerActor;
 import org.apache.qpid.server.logging.NullRootMessageLogger;
@@ -57,8 +58,11 @@
 import java.nio.ByteBuffer;
 import java.text.MessageFormat;
 import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map.Entry;
 
 import com.sleepycat.je.DatabaseEntry;
 import com.sleepycat.je.DatabaseException;
@@ -439,7 +443,7 @@
         //Inspect the bindings to gather a list of queues which are probably durable subscriptions, i.e. those 
         //which have a colon in their name and are bound to the Topic exchanges above
         final List<AMQShortString> durableSubQueues = new ArrayList<AMQShortString>();
-        final TupleBinding bindingTB = _oldMessageStore.getBindingTupleBindingFactory().getInstance();
+        final TupleBinding<BindingKey> bindingTB = _oldMessageStore.getBindingTupleBindingFactory().getInstance();
         
         DatabaseVisitor durSubQueueListVisitor = new DatabaseVisitor()
         {           
@@ -461,8 +465,11 @@
         //Migrate _queueDb;
         _logger.info("Queues");
 
-        final TupleBinding queueTupleBinding = _oldMessageStore.getQueueTupleBindingFactory().getInstance();
+        // hold the list of existing queue names
+        final List<AMQShortString> existingQueues = new ArrayList<AMQShortString>();
 
+        final TupleBinding<QueueRecord> queueTupleBinding = _oldMessageStore.getQueueTupleBindingFactory().getInstance();
+
         DatabaseVisitor queueVisitor = new DatabaseVisitor()
         {
             public void visit(DatabaseEntry key, DatabaseEntry value) throws AMQStoreException
@@ -482,6 +489,7 @@
                 _newMessageStore.createQueue(queueRec);
 
                 _count++;
+                existingQueues.add(queueName);
             }
         };
         _oldMessageStore.visitQueues(queueVisitor);
@@ -489,25 +497,112 @@
         logCount(queueVisitor.getVisitedCount(), "Queue");
 
 
+        // Look for persistent messages stored for non-durable queues, i.e. queues not recorded in existingQueues
+        _logger.info("Checking for messages previously sent to non-durable queues");
+
+        // IDs of all messages that have a delivery record for an existing queue
+        final HashSet<Long> queueMessages = new HashSet<Long>();
+
+        // names of non-existing queues mapped to the IDs of the messages enqueued on them
+        final HashMap<String, HashSet<Long>> phantomMessageQueues = new HashMap<String, HashSet<Long>>();
+
+        // delivery DB visitor which sorts each record by whether its queue exists; _count tallies only phantom records
+        final QueueEntryTB queueEntryTB = new QueueEntryTB();
+        DatabaseVisitor messageDeliveryCheckVisitor = new DatabaseVisitor()
+        {
+            public void visit(DatabaseEntry key, DatabaseEntry value) throws DatabaseException
+            {
+                QueueEntryKey entryKey = (QueueEntryKey) queueEntryTB.entryToObject(key);
+                Long messageId = entryKey.getMessageId();
+                AMQShortString queueName = entryKey.getQueueName();
+                if (!existingQueues.contains(queueName))
+                {
+                    String name = queueName.asString();
+                    HashSet<Long> messages = phantomMessageQueues.get(name);
+                    if (messages == null)
+                    {
+                        messages = new HashSet<Long>();
+                        phantomMessageQueues.put(name, messages);
+                    }
+                    messages.add(messageId);
+                    _count++;
+                }
+                else
+                {
+                    queueMessages.add(messageId);
+                }
+            }
+        };
+        _oldMessageStore.visitDelivery(messageDeliveryCheckVisitor);
+
+        if (phantomMessageQueues.isEmpty())
+        {
+            _logger.info("No such messages were found");
+        }
+        else
+        {
+            _logger.info("Found " + messageDeliveryCheckVisitor.getVisitedCount()+ " such messages in total");
+
+            for (Entry<String, HashSet<Long>> phantomQueue : phantomMessageQueues.entrySet())
+            {
+                String queueName = phantomQueue.getKey();
+                HashSet<Long> messages = phantomQueue.getValue();
+
+                _logger.info(MessageFormat.format("There are {0} messages which were previously delivered to non-durable queue ''{1}''",messages.size(), queueName));
+
+                boolean createQueue; // whether to create this queue in the new store so its messages are kept
+                if(!_interactive)
+                {
+                    createQueue = true;
+                    _logger.info("Running in batch-mode, marking queue as durable to ensure retention of the messages.");
+                }
+                else
+                {
+                    createQueue = userInteract("Do you want to make this queue durable?\n"
+                                             + "NOTE: Answering No will result in these messages being discarded!");
+                }
+
+                if (createQueue)
+                {
+                    for (Long messageId : messages)
+                    {
+                        queueMessages.add(messageId); // now counts as delivered to an existing queue
+                    }
+                    AMQShortString name = new AMQShortString(queueName);
+                    existingQueues.add(name);
+                    QueueRecord record = new QueueRecord(name, null, false, null);
+                    _newMessageStore.createQueue(record);
+                }
+            }
+        }
+
+
         //Migrate _messageMetaDataDb;
         _logger.info("Message MetaData");
         
         final Database newMetaDataDB = _newMessageStore.getMetaDataDb();
-        final TupleBinding oldMetaDataTupleBinding = _oldMessageStore.getMetaDataTupleBindingFactory().getInstance();
-        final TupleBinding newMetaDataTupleBinding = _newMessageStore.getMetaDataTupleBindingFactory().getInstance();
+        final TupleBinding<Object> oldMetaDataTupleBinding = _oldMessageStore.getMetaDataTupleBindingFactory().getInstance();
+        final TupleBinding<Object> newMetaDataTupleBinding = _newMessageStore.getMetaDataTupleBindingFactory().getInstance();
         
         DatabaseVisitor metaDataVisitor = new DatabaseVisitor()
         {
             public void visit(DatabaseEntry key, DatabaseEntry value) throws DatabaseException
             {
+                _count++;
                 MessageMetaData metaData = (MessageMetaData) oldMetaDataTupleBinding.entryToObject(value);
 
+                // get message id
+                Long messageId = TupleBinding.getPrimitiveBinding(Long.class).entryToObject(key);
+
+                // ONLY copy data if message is delivered to existing queue
+                if (!queueMessages.contains(messageId))
+                {
+                    return;
+                }
                 DatabaseEntry newValue = new DatabaseEntry();
                 newMetaDataTupleBinding.objectToEntry(metaData, newValue);
                 
                 newMetaDataDB.put(null, key, newValue);
-
-                _count++;
             }
         };
         _oldMessageStore.visitMetaDataDb(metaDataVisitor);
@@ -519,8 +614,8 @@
         _logger.info("Message Contents");
         final Database newContentDB = _newMessageStore.getContentDb();
         
-        final TupleBinding oldContentKeyTupleBinding = new MessageContentKeyTB_4();
-        final TupleBinding newContentKeyTupleBinding = new MessageContentKeyTB_5();
+        final TupleBinding<MessageContentKey> oldContentKeyTupleBinding = new MessageContentKeyTB_4();
+        final TupleBinding<MessageContentKey> newContentKeyTupleBinding = new MessageContentKeyTB_5();
         final TupleBinding contentTB = new ContentTB();
         
         DatabaseVisitor contentVisitor = new DatabaseVisitor()
@@ -530,10 +625,17 @@
             
             public void visit(DatabaseEntry key, DatabaseEntry value) throws DatabaseException
             {
+                _count++;
+
                 //determine the msgId of the current entry
                 MessageContentKey_4 contentKey = (MessageContentKey_4) oldContentKeyTupleBinding.entryToObject(key);
                 long msgId = contentKey.getMessageId();
 
+                // ONLY copy data if message is delivered to existing queue
+                if (!queueMessages.contains(msgId))
+                {
+                    return;
+                }
                 //if this is a new message, restart the byte offset count.
                 if(_prevMsgId != msgId)
                 {
@@ -556,8 +658,6 @@
 
                 _prevMsgId = msgId;
                 _bytesSeenSoFar += contentSize;
-
-                _count++;
             }
         };
         _oldMessageStore.visitContentDb(contentVisitor);
@@ -567,7 +667,27 @@
 
         //Migrate _deliveryDb;
         _logger.info("Delivery Records");
-        moveContents(_oldMessageStore.getDeliveryDb(), _newMessageStore.getDeliveryDb(), "Delivery Record");
+        final Database deliveryDb =_newMessageStore.getDeliveryDb();
+        DatabaseVisitor deliveryDbVisitor = new DatabaseVisitor()
+        {
+            // Copies a delivery record into the new delivery DB only when its queue is in existingQueues.
+            public void visit(DatabaseEntry key, DatabaseEntry value) throws DatabaseException
+            {
+                _count++;
+
+                // decode the entry key to get the name of the queue the message was enqueued on
+                QueueEntryKey entryKey = (QueueEntryKey) queueEntryTB.entryToObject(key);
+                AMQShortString queueName = entryKey.getQueueName();
+
+                // ONLY copy data if message queue exists
+                if (existingQueues.contains(queueName))
+                {
+                    deliveryDb.put(null, key, value);
+                }
+            }
+        };
+        _oldMessageStore.visitDelivery(deliveryDbVisitor);
+        logCount(deliveryDbVisitor.getVisitedCount(), "Delivery Record");
     }
 
     /**

Modified: store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java
===================================================================
--- store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java	2011-09-10 22:07:16 UTC (rev 4479)
+++ store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java	2011-09-12 16:34:56 UTC (rev 4480)
@@ -25,6 +25,9 @@
 import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.TOPIC_NAME;
 
 import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
 
 import javax.jms.Connection;
 import javax.jms.DeliveryMode;
@@ -40,13 +43,29 @@
 import javax.jms.TopicSession;
 import javax.jms.TopicSubscriber;
 
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.MethodRegistry;
+import org.apache.qpid.framing.ProtocolVersion;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
 import org.apache.qpid.management.common.mbeans.ManagedQueue;
+import org.apache.qpid.server.message.MessageMetaData;
+import org.apache.qpid.server.store.TransactionLog;
+import org.apache.qpid.server.store.TransactionLogResource;
+import org.apache.qpid.server.store.berkeleydb.keys.MessageContentKey_4;
+import org.apache.qpid.server.store.berkeleydb.tuples.MessageContentKeyTupleBindingFactory;
+import org.apache.qpid.server.store.berkeleydb.tuples.MessageMetaDataTupleBindingFactory;
 import org.apache.qpid.test.utils.JMXTestUtils;
 import org.apache.qpid.test.utils.QpidBrokerTestCase;
 import org.apache.qpid.util.FileUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.sleepycat.bind.tuple.TupleBinding;
+import com.sleepycat.je.DatabaseEntry;
+
 /**
  * Tests upgrading a BDB store and using it with the new broker 
  * after the required contents are entered into the store using 
@@ -264,6 +283,188 @@
         consumeQueueMessages(connection, false);
     }
 
+    /**
+     * Tests migration of a store which contains a message for a non-existing queue.
+     * The upgrade is run interactively with "Yes" answers and the message must then be consumable.
+     * @throws Exception if copying the store, upgrading it or consuming the message fails
+     */
+    public void testMigrationOfMessagesForNonExistingQueues() throws Exception
+    {
+        stopBroker();
+
+        // copy the original store into a separate location so the phantom message can be added to the copy
+        File storeLocation = new File(_fromDir);
+        File target = new File(_toDirTwice);
+        if (!target.exists())
+        {
+            target.mkdirs();
+        }
+        FileUtils.copyRecursive(storeLocation, target);
+
+        // delete any previously migrated data from the upgrade target directory
+        File directory = new File(_toDir);
+        if (directory.exists() && directory.isDirectory())
+        {
+            FileUtils.delete(directory, true);
+        }
+
+        // test data
+        String nonExistingQueueName = getTestQueueName();
+        String messageText = "Test Phantom Message";
+
+        // add a message for the non-existing queue to the copied store
+        addMessageForNonExistingQueue(target, VERSION_4, nonExistingQueueName, messageText);
+
+        String[] inputs = { "Yes", "Yes", "Yes" }; // canned answers fed to the upgrade tool's prompts
+        upgradeBrokerStoreInInterractiveMode(_toDirTwice, _toDir, inputs);
+
+        // start broker
+        startBroker();
+
+        // Create a connection and start it
+        Connection connection = getConnection();
+        connection.start();
+
+        // consume the message from the queue which did not exist in the original store
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue queue = session.createQueue(nonExistingQueueName);
+        MessageConsumer messageConsumer = session.createConsumer(queue);
+        Message message = messageConsumer.receive(1000);
+
+        // assert the consumed message is the text message that was added before the upgrade
+        assertNotNull("Message was not migrated!", message);
+        assertTrue("Unexpected message received!", message instanceof TextMessage);
+        String text = ((TextMessage) message).getText();
+        assertEquals("Message migration failed!", messageText, text);
+    }
+
+    /**
+     * A utility method which upgrades the broker store in interactive mode, simulating the user's answers.
+     *
+     * @param fromDir
+     *            location of the store to migrate
+     * @param toDir
+     *            location of where migrated data will be stored
+     * @param inputs
+     *            the user's answers to the upgrade tool's questions, consumed in order
+     * @throws Exception if the upgrade fails
+     */
+    private void upgradeBrokerStoreInInterractiveMode(String fromDir, String toDir, final String[] inputs)
+            throws Exception
+    {
+        // remember the real System.in so that it can be restored after the data migration
+        InputStream stdin = System.in;
+
+        // install a fake System.in to simulate user interaction: each bulk read returns the next answer plus newline
+        // FIXME: crude simulator; it ignores 'len' and fails if more reads occur than answers were supplied
+        System.setIn(new InputStream()
+        {
+
+            int counter = 0; // index of the next answer to hand out
+
+            public synchronized int read(byte b[], int off, int len)
+            {
+                byte[] src = (inputs[counter] + "\n").getBytes();
+                System.arraycopy(src, 0, b, off, src.length);
+                counter++;
+                return src.length;
+            }
+
+            @Override
+            public int read() throws IOException
+            {
+                return -1; // single-byte reads always report end of stream
+            }
+        });
+
+        try
+        {
+            // Upgrade the test store.
+            new BDBStoreUpgrade(fromDir, toDir, null, true, true).upgradeFromVersion(VERSION_4);
+        }
+        finally
+        {
+            // restore the real System.in even if the upgrade throws
+            System.setIn(stdin);
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    private void addMessageForNonExistingQueue(File storeLocation, int storeVersion, String nonExistingQueueName,
+            String messageText) throws Exception
+    { // writes content, metadata and delivery entries for one text message directly into the store at storeLocation
+        final AMQShortString queueName = new AMQShortString(nonExistingQueueName);
+        BDBMessageStore store = new BDBMessageStore(storeVersion);
+        store.configure(storeLocation, false);
+        try
+        {
+            store.start();
+
+            // build the in-memory parts of a persistent text message routed to the given queue name
+            ByteBuffer completeContentBody = ByteBuffer.wrap(messageText.getBytes("UTF-8"));
+            long bodySize = completeContentBody.limit();
+            MessagePublishInfo pubInfoBody = new MessagePublishInfoImpl(new AMQShortString("amq.direct"), false,
+                    false, queueName);
+            BasicContentHeaderProperties props = new BasicContentHeaderProperties();
+            props.setDeliveryMode(Integer.valueOf(BasicContentHeaderProperties.PERSISTENT).byteValue());
+            props.setContentType("text/plain");
+            props.setType("text/plain");
+            props.setMessageId("whatever");
+            props.setEncoding("UTF-8");
+            props.getHeaders().setString("Test", "MST");
+            MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v0_9);
+            int classForBasic = methodRegistry.createBasicQosOkBody().getClazz();
+            ContentHeaderBody contentHeaderBody = new ContentHeaderBody(classForBasic, 1, props, bodySize);
+
+            // add the content entry to the content database, keyed by a new message id and chunk/offset 0
+            long messageId = store.getNewMessageId();
+            TupleBinding<MessageContentKey> contentKeyTB = new MessageContentKeyTupleBindingFactory(storeVersion).getInstance();
+            MessageContentKey contentKey = null;
+            if (storeVersion == VERSION_4)
+            {
+                contentKey = new MessageContentKey_4(messageId, 0);
+            }
+            else
+            {
+                throw new Exception(storeVersion + " is not supported");
+            }
+            DatabaseEntry key = new DatabaseEntry();
+            contentKeyTB.objectToEntry(contentKey, key);
+            DatabaseEntry data = new DatabaseEntry();
+            ContentTB contentTB = new ContentTB();
+            contentTB.objectToEntry(completeContentBody, data);
+            store.getContentDb().put(null, key, data);
+
+            // add the meta data entry to the meta data database, keyed by the message id
+            TupleBinding<Long> longTB = TupleBinding.getPrimitiveBinding(Long.class);
+            TupleBinding<Object> metaDataTB = new MessageMetaDataTupleBindingFactory(storeVersion).getInstance();
+            key = new DatabaseEntry();
+            data = new DatabaseEntry();
+            longTB.objectToEntry(new Long(messageId), key);
+            MessageMetaData metaData = new MessageMetaData(pubInfoBody, contentHeaderBody, 1);
+            metaDataTB.objectToEntry(metaData, data);
+            store.getMetaDataDb().put(null, key, data);
+
+            // add the delivery entry by enqueueing the message on a stand-in resource that only carries the queue name
+            TransactionLogResource mockQueue = new TransactionLogResource()
+            {
+                public String getResourceName()
+                {
+                    return queueName.asString();
+                }
+            };
+            TransactionLog log = (TransactionLog) store;
+            TransactionLog.Transaction txn = log.newTransaction();
+            txn.enqueueMessage(mockQueue, messageId);
+            txn.commitTran();
+        }
+        finally
+        {
+            // close the store whether or not the entries were written
+            store.close();
+        }
+    }
+
     private void consumeDurableSubscriptionMessages(Connection connection) throws Exception
     {
         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);



More information about the rhmessaging-commits mailing list