[rhmessaging-commits] rhmessaging commits: r3736 - in store/trunk/cpp: tests and 1 other directories.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Mon Dec 7 13:44:07 EST 2009


Author: kpvdr
Date: 2009-12-07 13:44:06 -0500 (Mon, 07 Dec 2009)
New Revision: 3736

Modified:
   store/trunk/cpp/lib/DataTokenImpl.cpp
   store/trunk/cpp/lib/DataTokenImpl.h
   store/trunk/cpp/lib/MessageStoreImpl.cpp
   store/trunk/cpp/tests/TransactionalTest.cpp
   store/trunk/cpp/tests/jrnl/run-journal-tests
Log:
Fix for BZ 544426 - "Attempting to dequeue a transactionally locked record leaks memory". Also some code tidy-up and a fix to the long test run-jrnl-tests, which allows it to run stand-alone.

Modified: store/trunk/cpp/lib/DataTokenImpl.cpp
===================================================================
--- store/trunk/cpp/lib/DataTokenImpl.cpp	2009-12-03 21:01:30 UTC (rev 3735)
+++ store/trunk/cpp/lib/DataTokenImpl.cpp	2009-12-07 18:44:06 UTC (rev 3736)
@@ -25,8 +25,6 @@
 
 using namespace mrg::msgstore;
 
-DataTokenImpl::DataTokenImpl():data_tok()
-{}
+DataTokenImpl::DataTokenImpl():data_tok() {}
 
-DataTokenImpl::~DataTokenImpl()
-{}
+DataTokenImpl::~DataTokenImpl() {}

Modified: store/trunk/cpp/lib/DataTokenImpl.h
===================================================================
--- store/trunk/cpp/lib/DataTokenImpl.h	2009-12-03 21:01:30 UTC (rev 3735)
+++ store/trunk/cpp/lib/DataTokenImpl.h	2009-12-07 18:44:06 UTC (rev 3736)
@@ -39,10 +39,8 @@
     DataTokenImpl();
     virtual ~DataTokenImpl();
 
-    inline boost::intrusive_ptr<qpid::broker::PersistableMessage>& getSourceMessage()
-    { return sourceMsg; }
-    inline void setSourceMessage(const boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg)
-    { sourceMsg = msg; }
+    inline boost::intrusive_ptr<qpid::broker::PersistableMessage>& getSourceMessage() { return sourceMsg; }
+    inline void setSourceMessage(const boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg) { sourceMsg = msg; }
 };
 
 } // namespace msgstore

Modified: store/trunk/cpp/lib/MessageStoreImpl.cpp
===================================================================
--- store/trunk/cpp/lib/MessageStoreImpl.cpp	2009-12-03 21:01:30 UTC (rev 3735)
+++ store/trunk/cpp/lib/MessageStoreImpl.cpp	2009-12-07 18:44:06 UTC (rev 3736)
@@ -1522,7 +1522,6 @@
                                     const PersistableQueue& queue)
 {
     boost::intrusive_ptr<DataTokenImpl> ddtokp(new DataTokenImpl);
-    ddtokp->addRef();
     ddtokp->setSourceMessage(msg);
     ddtokp->set_external_rid(true);
     ddtokp->set_rid(messageIdSequence.next());
@@ -1533,6 +1532,8 @@
         TxnCtxt* txn = check(ctxt);
         tid = txn->getXid();
     }
+    // Manually increase the ref count, as raw pointers are used beyond this point
+    ddtokp->addRef();
     try {
         JournalImpl* jc = static_cast<JournalImpl*>(queue.getExternalQueueStore());
         if (tid.empty()) {
@@ -1541,6 +1542,7 @@
             jc->dequeue_txn_data_record(ddtokp.get(), tid);
         }
     } catch (const journal::jexception& e) {
+        ddtokp->release();
         THROW_STORE_EXCEPTION(std::string("Queue ") + queue.getName() + ": async_dequeue() failed: " + e.what());
     }
 }

Modified: store/trunk/cpp/tests/TransactionalTest.cpp
===================================================================
--- store/trunk/cpp/tests/TransactionalTest.cpp	2009-12-03 21:01:30 UTC (rev 3735)
+++ store/trunk/cpp/tests/TransactionalTest.cpp	2009-12-07 18:44:06 UTC (rev 3736)
@@ -26,6 +26,7 @@
 #include "MessageStoreImpl.h"
 #include <iostream>
 #include "MessageUtils.h"
+#include "StoreException.h"
 #include "qpid/broker/Queue.h"
 #include "qpid/broker/RecoveryManagerImpl.h"
 #include "qpid/framing/AMQHeaderBody.h"
@@ -238,6 +239,42 @@
     checkMsg(queueB, 0);
 }
 
+boost::intrusive_ptr<Message> nonTxEnq(Queue::shared_ptr q)
+{
+    boost::intrusive_ptr<Message> msg = createMessage("Message", "exchange", "routingKey");
+    q->deliver(msg);
+    return msg;
+}
+
+QueuedMessage getMsg(Queue::shared_ptr q)
+{
+    boost::intrusive_ptr<Message> msg = q->get().payload;
+    BOOST_REQUIRE(msg);
+    QueuedMessage qm;
+    qm.payload = msg;
+    return qm;
+}
+
+void txDeq(Queue::shared_ptr q, QueuedMessage& qm, TransactionContext* tp)
+{
+    q->dequeue(tp, qm);
+}
+
+void testLock(Queue::shared_ptr q, QueuedMessage qm)
+{
+    try {
+        q->dequeue(0, qm);
+        BOOST_ERROR("Did not throw JERR_MAP_LOCKED exception as expected.");
+    }
+    catch (const mrg::msgstore::StoreException& e) {
+        if (std::strstr(e.what(), "JERR_MAP_LOCKED") == 0)
+            BOOST_ERROR("Unexpected StoreException: " << e.what());
+    }
+    catch (const std::exception& e) {
+        BOOST_ERROR("Unexpected exception: " << e.what());
+    }
+}
+
 // === Test suite ===
 
 QPID_AUTO_TEST_CASE(Commit)
@@ -312,4 +349,20 @@
     cout << "ok" << endl;
 }
 
+QPID_AUTO_TEST_CASE(LockedRecordTest)
+{
+    cout << test_filename << ".LockedRecordTest: " << flush;
+
+    setup<MessageStoreImpl>();
+    nonTxEnq(queueA);
+    std::auto_ptr<TransactionContext> txn = store->begin();
+    QueuedMessage qm = getMsg(queueA);
+    txDeq(queueA, qm, txn.get());
+    testLock(queueA, qm);
+    store->commit(*txn);
+    checkMsg(queueA, 0);
+
+    cout << "ok" << endl;
+}
+
 QPID_AUTO_TEST_SUITE_END()

Modified: store/trunk/cpp/tests/jrnl/run-journal-tests
===================================================================
--- store/trunk/cpp/tests/jrnl/run-journal-tests	2009-12-03 21:01:30 UTC (rev 3735)
+++ store/trunk/cpp/tests/jrnl/run-journal-tests	2009-12-07 18:44:06 UTC (rev 3736)
@@ -21,13 +21,16 @@
 #
 # The GNU Lesser General Public License is available in the file COPYING.
 
+if test x${TMP_DATA_DIR} == x; then
+	export TMP_DATA_DIR=/tmp
+fi
 fail=0
 num_jrnls=3
 
 # Run jtt using default test set
 echo
 echo "===== Mode 1: New journal instance, no recover ====="
-jtt/jtt --analyzer ../../tools/store_chk ${TMP_DATA_DIR} --csv jtt/jtt.csv --format-chk --num-jrnls ${num_jrnls} || fail=1
+jtt/jtt --analyzer ../../tools/store_chk  --jrnl-dir ${TMP_DATA_DIR} --csv jtt/jtt.csv --format-chk --num-jrnls ${num_jrnls} || fail=1
 rm -rf ${TMP_DATA_DIR}/test_0*
 echo
 echo "===== Mode 2: Re-use journal instance, no recover ====="



More information about the rhmessaging-commits mailing list