[rhmessaging-commits] rhmessaging commits: r3736 - in store/trunk/cpp: tests and 1 other directories.
rhmessaging-commits at lists.jboss.org
rhmessaging-commits at lists.jboss.org
Mon Dec 7 13:44:07 EST 2009
Author: kpvdr
Date: 2009-12-07 13:44:06 -0500 (Mon, 07 Dec 2009)
New Revision: 3736
Modified:
store/trunk/cpp/lib/DataTokenImpl.cpp
store/trunk/cpp/lib/DataTokenImpl.h
store/trunk/cpp/lib/MessageStoreImpl.cpp
store/trunk/cpp/tests/TransactionalTest.cpp
store/trunk/cpp/tests/jrnl/run-journal-tests
Log:
Fix for BZ 544426 - "Attempting to dequeue a transactionally locked record leaks memory". Also some code tidy-up and a fix to the long test run-jrnl-tests, which allows it to run stand-alone.
Modified: store/trunk/cpp/lib/DataTokenImpl.cpp
===================================================================
--- store/trunk/cpp/lib/DataTokenImpl.cpp 2009-12-03 21:01:30 UTC (rev 3735)
+++ store/trunk/cpp/lib/DataTokenImpl.cpp 2009-12-07 18:44:06 UTC (rev 3736)
@@ -25,8 +25,6 @@
using namespace mrg::msgstore;
-DataTokenImpl::DataTokenImpl():data_tok()
-{}
+DataTokenImpl::DataTokenImpl():data_tok() {}
-DataTokenImpl::~DataTokenImpl()
-{}
+DataTokenImpl::~DataTokenImpl() {}
Modified: store/trunk/cpp/lib/DataTokenImpl.h
===================================================================
--- store/trunk/cpp/lib/DataTokenImpl.h 2009-12-03 21:01:30 UTC (rev 3735)
+++ store/trunk/cpp/lib/DataTokenImpl.h 2009-12-07 18:44:06 UTC (rev 3736)
@@ -39,10 +39,8 @@
DataTokenImpl();
virtual ~DataTokenImpl();
- inline boost::intrusive_ptr<qpid::broker::PersistableMessage>& getSourceMessage()
- { return sourceMsg; }
- inline void setSourceMessage(const boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg)
- { sourceMsg = msg; }
+ inline boost::intrusive_ptr<qpid::broker::PersistableMessage>& getSourceMessage() { return sourceMsg; }
+ inline void setSourceMessage(const boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg) { sourceMsg = msg; }
};
} // namespace msgstore
Modified: store/trunk/cpp/lib/MessageStoreImpl.cpp
===================================================================
--- store/trunk/cpp/lib/MessageStoreImpl.cpp 2009-12-03 21:01:30 UTC (rev 3735)
+++ store/trunk/cpp/lib/MessageStoreImpl.cpp 2009-12-07 18:44:06 UTC (rev 3736)
@@ -1522,7 +1522,6 @@
const PersistableQueue& queue)
{
boost::intrusive_ptr<DataTokenImpl> ddtokp(new DataTokenImpl);
- ddtokp->addRef();
ddtokp->setSourceMessage(msg);
ddtokp->set_external_rid(true);
ddtokp->set_rid(messageIdSequence.next());
@@ -1533,6 +1532,8 @@
TxnCtxt* txn = check(ctxt);
tid = txn->getXid();
}
+ // Manually increase the ref count, as raw pointers are used beyond this point
+ ddtokp->addRef();
try {
JournalImpl* jc = static_cast<JournalImpl*>(queue.getExternalQueueStore());
if (tid.empty()) {
@@ -1541,6 +1542,7 @@
jc->dequeue_txn_data_record(ddtokp.get(), tid);
}
} catch (const journal::jexception& e) {
+ ddtokp->release();
THROW_STORE_EXCEPTION(std::string("Queue ") + queue.getName() + ": async_dequeue() failed: " + e.what());
}
}
Modified: store/trunk/cpp/tests/TransactionalTest.cpp
===================================================================
--- store/trunk/cpp/tests/TransactionalTest.cpp 2009-12-03 21:01:30 UTC (rev 3735)
+++ store/trunk/cpp/tests/TransactionalTest.cpp 2009-12-07 18:44:06 UTC (rev 3736)
@@ -26,6 +26,7 @@
#include "MessageStoreImpl.h"
#include <iostream>
#include "MessageUtils.h"
+#include "StoreException.h"
#include "qpid/broker/Queue.h"
#include "qpid/broker/RecoveryManagerImpl.h"
#include "qpid/framing/AMQHeaderBody.h"
@@ -238,6 +239,42 @@
checkMsg(queueB, 0);
}
+boost::intrusive_ptr<Message> nonTxEnq(Queue::shared_ptr q)
+{
+ boost::intrusive_ptr<Message> msg = createMessage("Message", "exchange", "routingKey");
+ q->deliver(msg);
+ return msg;
+}
+
+QueuedMessage getMsg(Queue::shared_ptr q)
+{
+ boost::intrusive_ptr<Message> msg = q->get().payload;
+ BOOST_REQUIRE(msg);
+ QueuedMessage qm;
+ qm.payload = msg;
+ return qm;
+}
+
+void txDeq(Queue::shared_ptr q, QueuedMessage& qm, TransactionContext* tp)
+{
+ q->dequeue(tp, qm);
+}
+
+void testLock(Queue::shared_ptr q, QueuedMessage qm)
+{
+ try {
+ q->dequeue(0, qm);
+ BOOST_ERROR("Did not throw JERR_MAP_LOCKED exception as expected.");
+ }
+ catch (const mrg::msgstore::StoreException& e) {
+ if (std::strstr(e.what(), "JERR_MAP_LOCKED") == 0)
+ BOOST_ERROR("Unexpected StoreException: " << e.what());
+ }
+ catch (const std::exception& e) {
+ BOOST_ERROR("Unexpected exception: " << e.what());
+ }
+}
+
// === Test suite ===
QPID_AUTO_TEST_CASE(Commit)
@@ -312,4 +349,20 @@
cout << "ok" << endl;
}
+QPID_AUTO_TEST_CASE(LockedRecordTest)
+{
+ cout << test_filename << ".LockedRecordTest: " << flush;
+
+ setup<MessageStoreImpl>();
+ nonTxEnq(queueA);
+ std::auto_ptr<TransactionContext> txn = store->begin();
+ QueuedMessage qm = getMsg(queueA);
+ txDeq(queueA, qm, txn.get());
+ testLock(queueA, qm);
+ store->commit(*txn);
+ checkMsg(queueA, 0);
+
+ cout << "ok" << endl;
+}
+
QPID_AUTO_TEST_SUITE_END()
Modified: store/trunk/cpp/tests/jrnl/run-journal-tests
===================================================================
--- store/trunk/cpp/tests/jrnl/run-journal-tests 2009-12-03 21:01:30 UTC (rev 3735)
+++ store/trunk/cpp/tests/jrnl/run-journal-tests 2009-12-07 18:44:06 UTC (rev 3736)
@@ -21,13 +21,16 @@
#
# The GNU Lesser General Public License is available in the file COPYING.
+if test x${TMP_DATA_DIR} == x; then
+ export TMP_DATA_DIR=/tmp
+fi
fail=0
num_jrnls=3
# Run jtt using default test set
echo
echo "===== Mode 1: New journal instance, no recover ====="
-jtt/jtt --analyzer ../../tools/store_chk ${TMP_DATA_DIR} --csv jtt/jtt.csv --format-chk --num-jrnls ${num_jrnls} || fail=1
+jtt/jtt --analyzer ../../tools/store_chk --jrnl-dir ${TMP_DATA_DIR} --csv jtt/jtt.csv --format-chk --num-jrnls ${num_jrnls} || fail=1
rm -rf ${TMP_DATA_DIR}/test_0*
echo
echo "===== Mode 2: Re-use journal instance, no recover ====="
More information about the rhmessaging-commits
mailing list