rhmessaging commits: r2201 - in store/branches/mrg-1.0/cpp: lib and 1 other directory.
by rhmessaging-commits@lists.jboss.org
Author: kpvdr
Date: 2008-07-16 15:12:28 -0400 (Wed, 16 Jul 2008)
New Revision: 2201
Modified:
store/branches/mrg-1.0/cpp/Makefile.am
store/branches/mrg-1.0/cpp/configure.ac
store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp
Log:
Fixed thread sync problem on creation of prepared list journal; also added checks for rpmlint in makefile.am
Modified: store/branches/mrg-1.0/cpp/Makefile.am
===================================================================
--- store/branches/mrg-1.0/cpp/Makefile.am 2008-07-14 18:05:24 UTC (rev 2200)
+++ store/branches/mrg-1.0/cpp/Makefile.am 2008-07-16 19:12:28 UTC (rev 2201)
@@ -27,4 +27,8 @@
rpmbuild: $(SPEC) dist-gzip
mkdir -p $(RPMDIRS)
rpmbuild $(RPMMACROS) $(RPMOPTS) rhm.spec
+if HAS_RPMLINT
rpmlint `find rpm -name '*.rpm'`
+else
+ @echo "WARNING: rpmlint not found, could not validate RPMs."
+endif
Modified: store/branches/mrg-1.0/cpp/configure.ac
===================================================================
--- store/branches/mrg-1.0/cpp/configure.ac 2008-07-14 18:05:24 UTC (rev 2200)
+++ store/branches/mrg-1.0/cpp/configure.ac 2008-07-16 19:12:28 UTC (rev 2201)
@@ -175,6 +175,10 @@
# We use valgrind for the tests. See if it's available.
AC_CHECK_PROG([VALGRIND], [valgrind], [valgrind])
+# If rpmlint is available we'll run it when building RPMs.
+AC_CHECK_PROG([RPMLINT], [rpmlint], [rpmlint])
+AM_CONDITIONAL([HAS_RPMLINT], [test -n "$RPMLINT"])
+
# Also doxygen for documentation...
AC_CHECK_PROG([do_doxygen], [doxygen], [yes])
AM_CONDITIONAL([DOXYGEN], [test x$do_doxygen = xyes])
Modified: store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp 2008-07-14 18:05:24 UTC (rev 2200)
+++ store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp 2008-07-16 19:12:28 UTC (rev 2201)
@@ -171,6 +171,7 @@
void BdbMessageStore::chkInitPreparedXidStore()
{
+ qpid::sys::Mutex::ScopedLock sl(jrnlCreateLock);
if (!preparedXidStorePtr->is_ready()) {
u_int32_t defTotWCacheSize = JRNL_WMGR_DEF_PAGE_SIZE * JRNL_WMGR_DEF_PAGES; // in sblks. Currently 2014 sblks (1 MiB).
preparedXidStorePtr->initialize(defXidStoreNumJrnlFiles, defXidStoreJrnlFileSizePgs * JRNL_RMGR_PAGE_SIZE, defTotWCacheSize / wcache_pgsize_sblks, defXidStoreWCachePageSize);
16 years, 5 months
rhmessaging commits: r2200 - store/branches/mrg-1.0/cpp/lib.
by rhmessaging-commits@lists.jboss.org
Author: kpvdr
Date: 2008-07-14 14:05:24 -0400 (Mon, 14 Jul 2008)
New Revision: 2200
Modified:
store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp
store/branches/mrg-1.0/cpp/lib/BdbMessageStore.h
store/branches/mrg-1.0/cpp/lib/TxnCtxt.h
Log:
Whitespace changes only - misc tidyups and respacing which makes BdbMessageStore.cpp/.h and TxnCtxt.h similar to trunk.
Modified: store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp 2008-07-14 17:32:13 UTC (rev 2199)
+++ store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp 2008-07-14 18:05:24 UTC (rev 2200)
@@ -110,20 +110,20 @@
u_int32_t defTotWCacheSize = JRNL_WMGR_DEF_PAGE_SIZE * JRNL_WMGR_DEF_PAGES; // in sblks. Currently 2014 sblks (1 MiB).
switch (wCachePageSize)
{
- case 1:
- case 2:
- case 4:
- // 256 KiB total cache
- wcache_num_pages = defTotWCacheSize / wcache_pgsize_sblks / 4;
- break;
- case 8:
- case 16:
- // 512 KiB total cache
- wcache_num_pages = defTotWCacheSize / wcache_pgsize_sblks / 2;
- break;
- default: // 32, 64, 128
- // 1 MiB total cache
- wcache_num_pages = defTotWCacheSize / wcache_pgsize_sblks;
+ case 1:
+ case 2:
+ case 4:
+ // 256 KiB total cache
+ wcache_num_pages = defTotWCacheSize / wcache_pgsize_sblks / 4;
+ break;
+ case 8:
+ case 16:
+ // 512 KiB total cache
+ wcache_num_pages = defTotWCacheSize / wcache_pgsize_sblks / 2;
+ break;
+ default: // 32, 64, 128
+ // 1 MiB total cache
+ wcache_num_pages = defTotWCacheSize / wcache_pgsize_sblks;
}
if (dir.size()>0) storeDir = dir;
@@ -136,8 +136,8 @@
} catch (const DbException& e) {
if (e.get_errno() == DB_VERSION_MISMATCH)
THROW_STORE_EXCEPTION_2("Database environment mismatch: This version of bd4 does not match that which created the store database. "
- "(If recovery is not important, delete the contents of the store directory. Otherwise, try upgrading the database using "
- "db_upgrade or using db_recover - but the db4-utils package must also be installed to use these utilities.)", e);
+ "(If recovery is not important, delete the contents of the store directory. Otherwise, try upgrading the database using "
+ "db_upgrade or using db_recover - but the db4-utils package must also be installed to use these utilities.)", e);
THROW_STORE_EXCEPTION_2("Error opening environment", e);
}
@@ -204,37 +204,37 @@
u_int32_t jrnlWrCachePageSize = opts->wCachePageSize;
switch (jrnlWrCachePageSize)
{
- case 1:
- case 2:
- case 4:
- case 8:
- case 16:
- case 32:
- case 64:
- case 128:
- break;
- default:
- u_int32_t oldJrnlWrCachePageSize = jrnlWrCachePageSize;
- if (oldJrnlWrCachePageSize == 0) {
- // For zero value, use default
- jrnlWrCachePageSize = JRNL_WMGR_DEF_PAGE_SIZE * JRNL_DBLK_SIZE * JRNL_SBLK_SIZE / 1024;
- QPID_LOG(warning, "parameter wcache-page-size (" << oldJrnlWrCachePageSize << ") must be a power of 2 between 1 and 128; changing this parameter to default value (" << jrnlWrCachePageSize << ")");
- } else {
- // For any positive value, use closest value
- if (oldJrnlWrCachePageSize < 6)
- jrnlWrCachePageSize = 4;
- else if (oldJrnlWrCachePageSize < 12)
- jrnlWrCachePageSize = 8;
- else if (oldJrnlWrCachePageSize < 24)
- jrnlWrCachePageSize = 16;
- else if (oldJrnlWrCachePageSize < 48)
- jrnlWrCachePageSize = 32;
- else if (oldJrnlWrCachePageSize < 96)
- jrnlWrCachePageSize = 64;
- else if (oldJrnlWrCachePageSize > 128)
- jrnlWrCachePageSize = 128;
- QPID_LOG(warning, "parameter wcache-page-size (" << oldJrnlWrCachePageSize << ") must be a power of 2 between 1 and 128; changing this parameter to closest allowable value (" << jrnlWrCachePageSize << ")");
- }
+ case 1:
+ case 2:
+ case 4:
+ case 8:
+ case 16:
+ case 32:
+ case 64:
+ case 128:
+ break;
+ default:
+ u_int32_t oldJrnlWrCachePageSize = jrnlWrCachePageSize;
+ if (oldJrnlWrCachePageSize == 0) {
+ // For zero value, use default
+ jrnlWrCachePageSize = JRNL_WMGR_DEF_PAGE_SIZE * JRNL_DBLK_SIZE * JRNL_SBLK_SIZE / 1024;
+ QPID_LOG(warning, "parameter wcache-page-size (" << oldJrnlWrCachePageSize << ") must be a power of 2 between 1 and 128; changing this parameter to default value (" << jrnlWrCachePageSize << ")");
+ } else {
+ // For any positive value, use closest value
+ if (oldJrnlWrCachePageSize < 6)
+ jrnlWrCachePageSize = 4;
+ else if (oldJrnlWrCachePageSize < 12)
+ jrnlWrCachePageSize = 8;
+ else if (oldJrnlWrCachePageSize < 24)
+ jrnlWrCachePageSize = 16;
+ else if (oldJrnlWrCachePageSize < 48)
+ jrnlWrCachePageSize = 32;
+ else if (oldJrnlWrCachePageSize < 96)
+ jrnlWrCachePageSize = 64;
+ else if (oldJrnlWrCachePageSize > 128)
+ jrnlWrCachePageSize = 128;
+ QPID_LOG(warning, "parameter wcache-page-size (" << oldJrnlWrCachePageSize << ") must be a power of 2 between 1 and 128; changing this parameter to closest allowable value (" << jrnlWrCachePageSize << ")");
+ }
}
return init(opts->storeDir, numJrnlFiles, jrnlFsizePgs, jrnlWrCachePageSize);
@@ -725,15 +725,15 @@
break;
}
case rhm::journal::RHM_IORES_PAGE_AIOWAIT:
- if (++aio_sleep_cnt > MAX_AIO_SLEEPS)
- THROW_STORE_EXCEPTION("Timeout waiting for AIO in BdbMessageStore::recoverMessages()");
- ::usleep(AIO_SLEEP_TIME);
- break;
+ if (++aio_sleep_cnt > MAX_AIO_SLEEPS)
+ THROW_STORE_EXCEPTION("Timeout waiting for AIO in BdbMessageStore::recoverMessages()");
+ ::usleep(AIO_SLEEP_TIME);
+ break;
case rhm::journal::RHM_IORES_EMPTY:
- read = false;
- break; // done with all messages. (add call in jrnl to test that _emap is empty.)
+ read = false;
+ break; // done with all messages. (add call in jrnl to test that _emap is empty.)
default:
- assert( "Store Error: Unexpected msg state");
+ assert("Store Error: Unexpected msg state");
} // switch
} // while
} catch (const journal::jexception& e) {
@@ -1041,7 +1041,8 @@
}
}
-void BdbMessageStore::enqueue(TransactionContext* ctxt, intrusive_ptr<PersistableMessage>& msg,
+void BdbMessageStore::enqueue(TransactionContext* ctxt,
+ intrusive_ptr<PersistableMessage>& msg,
const PersistableQueue& queue)
{
checkInit();
@@ -1092,9 +1093,9 @@
if (queue) {
boost::intrusive_ptr<DataTokenImpl> dtokp(new DataTokenImpl);
dtokp->addRef();
- dtokp->setSourceMessage(message);
+ dtokp->setSourceMessage(message);
dtokp->set_external_rid(true);
- dtokp->set_rid(message->getPersistenceId()); // set the messageID into the Journal header (record-id)
+ dtokp->set_rid(message->getPersistenceId()); // set the messageID into the Journal header (record-id)
JournalImpl* jc = static_cast<JournalImpl*>(queue->getExternalQueueStore());
if (txn->getXid().empty()) {
@@ -1123,7 +1124,8 @@
}
}
-void BdbMessageStore::dequeue(TransactionContext* ctxt, intrusive_ptr<PersistableMessage>& msg,
+void BdbMessageStore::dequeue(TransactionContext* ctxt,
+ intrusive_ptr<PersistableMessage>& msg,
const PersistableQueue& queue)
{
checkInit();
Modified: store/branches/mrg-1.0/cpp/lib/BdbMessageStore.h
===================================================================
--- store/branches/mrg-1.0/cpp/lib/BdbMessageStore.h 2008-07-14 17:32:13 UTC (rev 2199)
+++ store/branches/mrg-1.0/cpp/lib/BdbMessageStore.h 2008-07-14 18:05:24 UTC (rev 2200)
@@ -1,24 +1,24 @@
/*
- Copyright (C) 2007 Red Hat Software
+ Copyright (C) 2007 Red Hat Software
- This file is part of Red Hat Messaging.
+ This file is part of Red Hat Messaging.
- Red Hat Messaging is free software; you can redistribute it and/or
- modify it under the terms of the GNU Lesser General Public
- License as published by the Free Software Foundation; either
- version 2.1 of the License, or (at your option) any later version.
+ Red Hat Messaging is free software; you can redistribute it and/or
+ modify it under the terms of the GNU Lesser General Public
+ License as published by the Free Software Foundation; either
+ version 2.1 of the License, or (at your option) any later version.
- This library is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- Lesser General Public License for more details.
+ This library is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ Lesser General Public License for more details.
- You should have received a copy of the GNU Lesser General Public
- License along with this library; if not, write to the Free Software
- Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
- USA
+ You should have received a copy of the GNU Lesser General Public
+ License along with this library; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
+ USA
- The GNU Lesser General Public License is available in the file COPYING.
+ The GNU Lesser General Public License is available in the file COPYING.
*/
#ifndef _BdbMessageStore_
@@ -46,286 +46,286 @@
#endif
namespace rhm {
- namespace bdbstore {
+namespace bdbstore {
- /**
- * An implementation of the MessageStore interface based on Berkeley DB
- */
- class BdbMessageStore : public qpid::broker::MessageStore, public qpid::management::Manageable
- {
- protected:
- typedef std::map<u_int64_t, qpid::broker::RecoverableQueue::shared_ptr> queue_index;
- typedef std::map<u_int64_t, qpid::broker::RecoverableExchange::shared_ptr> exchange_index;
- typedef std::map<u_int64_t, qpid::broker::RecoverableMessage::shared_ptr> message_index;
+/**
+ * An implementation of the MessageStore interface based on Berkeley DB
+ */
+class BdbMessageStore : public qpid::broker::MessageStore, public qpid::management::Manageable
+{
+ protected:
+ typedef std::map<u_int64_t, qpid::broker::RecoverableQueue::shared_ptr> queue_index;
+ typedef std::map<u_int64_t, qpid::broker::RecoverableExchange::shared_ptr> exchange_index;
+ typedef std::map<u_int64_t, qpid::broker::RecoverableMessage::shared_ptr> message_index;
- typedef LockedMappings::map txn_lock_map;
- typedef boost::ptr_list<PreparedTransaction> txn_list;
+ typedef LockedMappings::map txn_lock_map;
+ typedef boost::ptr_list<PreparedTransaction> txn_list;
- // Structs for preparedXidStore recover state
- struct PreparedRecoverStruct {
- u_int64_t rid;
- bool deq_flag;
- bool commit_flag;
- PreparedRecoverStruct(const u_int64_t _rid, const bool _deq_flag, const bool _commit_flag);
- };
- typedef PreparedRecoverStruct PreparedRecover;
- typedef std::map<std::string, PreparedRecover> PreparedRecoverMap;
- typedef PreparedRecoverMap::const_iterator PreparedRecoverMapCitr;
- typedef std::pair<std::string, PreparedRecoverStruct> PreparedRecoverMapPair;
+ // Structs for preparedXidStore recover state
+ struct PreparedRecoverStruct {
+ u_int64_t rid;
+ bool deq_flag;
+ bool commit_flag;
+ PreparedRecoverStruct(const u_int64_t _rid, const bool _deq_flag, const bool _commit_flag);
+ };
+ typedef PreparedRecoverStruct PreparedRecover;
+ typedef std::map<std::string, PreparedRecover> PreparedRecoverMap;
+ typedef PreparedRecoverMap::const_iterator PreparedRecoverMapCitr;
+ typedef std::pair<std::string, PreparedRecoverStruct> PreparedRecoverMapPair;
- // Default store settings
- static const u_int16_t defNumJrnlFiles = 8;
- static const u_int32_t defJrnlFileSizePgs = 24;
- static const u_int32_t defWCachePageSize = JRNL_WMGR_DEF_PAGE_SIZE * JRNL_DBLK_SIZE * JRNL_SBLK_SIZE / 1024;
- static const u_int16_t defXidStoreNumJrnlFiles = 8;
- static const u_int32_t defXidStoreJrnlFileSizePgs = 24;
- static const u_int32_t defXidStoreWCachePageSize = JRNL_WMGR_DEF_PAGE_SIZE * JRNL_DBLK_SIZE * JRNL_SBLK_SIZE / 1024;
+ // Default store settings
+ static const u_int16_t defNumJrnlFiles = 8;
+ static const u_int32_t defJrnlFileSizePgs = 24;
+ static const u_int32_t defWCachePageSize = JRNL_WMGR_DEF_PAGE_SIZE * JRNL_DBLK_SIZE * JRNL_SBLK_SIZE / 1024;
+ static const u_int16_t defXidStoreNumJrnlFiles = 8;
+ static const u_int32_t defXidStoreJrnlFileSizePgs = 24;
+ static const u_int32_t defXidStoreWCachePageSize = JRNL_WMGR_DEF_PAGE_SIZE * JRNL_DBLK_SIZE * JRNL_SBLK_SIZE / 1024;
- std::list<Db*> dbs;
- DbEnv env;
- Db queueDb;
- Db configDb;
- Db exchangeDb;
- Db messageDb;
- Db mappingDb;
- Db bindingDb;
- Db generalDb;
+ std::list<Db*> dbs;
+ DbEnv env;
+ Db queueDb;
+ Db configDb;
+ Db exchangeDb;
+ Db messageDb;
+ Db mappingDb;
+ Db bindingDb;
+ Db generalDb;
- // Pointer to prepared XID journal instance
- boost::shared_ptr<JournalImpl> preparedXidStorePtr;
- PreparedRecoverMap preparedXidStoreRecoverMap;
+ // Pointer to prepared XID journal instance
+ boost::shared_ptr<JournalImpl> preparedXidStorePtr;
+ PreparedRecoverMap preparedXidStoreRecoverMap;
- IdSequence queueIdSequence;
- IdSequence exchangeIdSequence;
- IdSequence generalIdSequence;
- IdSequence messageIdSequence;
- std::string storeDir;
- u_int16_t numJrnlFiles;
- u_int32_t jrnlFsizePgs;
- u_int32_t wcache_pgsize_sblks;
- u_int16_t wcache_num_pages;
- u_int64_t highestRid;
- bool isInit;
- const char* envPath;
- static qpid::sys::Duration defJournalGetEventsTimeout;
- static qpid::sys::Duration defJournalFlushTimeout;
- qpid::management::Store::shared_ptr mgmtObject;
- qpid::sys::Mutex jrnlCreateLock;
+ IdSequence queueIdSequence;
+ IdSequence exchangeIdSequence;
+ IdSequence generalIdSequence;
+ IdSequence messageIdSequence;
+ std::string storeDir;
+ u_int16_t numJrnlFiles;
+ u_int32_t jrnlFsizePgs;
+ u_int32_t wcache_pgsize_sblks;
+ u_int16_t wcache_num_pages;
+ u_int64_t highestRid;
+ bool isInit;
+ const char* envPath;
+ static qpid::sys::Duration defJournalGetEventsTimeout;
+ static qpid::sys::Duration defJournalFlushTimeout;
+ qpid::management::Store::shared_ptr mgmtObject;
+ qpid::sys::Mutex jrnlCreateLock;
- void recoverQueues(TxnCtxt& txn,
- qpid::broker::RecoveryManager& recovery,
- queue_index& index,
- txn_list& locked,
- message_index& messages);
- void recoverMessages(TxnCtxt& txn,
- qpid::broker::RecoveryManager& recovery,
- queue_index& index,
- txn_list& locked,
- message_index& prepared);
- void recoverMessages(TxnCtxt& txn,
- qpid::broker::RecoveryManager& recovery,
- qpid::broker::RecoverableQueue::shared_ptr& queue,
- txn_list& locked,
- message_index& prepared);
- qpid::broker::RecoverableMessage::shared_ptr getExternMessage(qpid::broker::RecoveryManager& recovery,
- uint64_t mId, unsigned& headerSize);
- void recoverExchanges(TxnCtxt& txn,
- qpid::broker::RecoveryManager& recovery,
- exchange_index& index);
- void recoverBindings(TxnCtxt& txn,
- exchange_index& exchanges,
- queue_index& queues);
- void recoverGeneral(TxnCtxt& txn,
- qpid::broker::RecoveryManager& recovery);
- int enqueueMessage(TxnCtxt& txn,
- IdDbt& msgId,
- qpid::broker::RecoverableMessage::shared_ptr& msg,
- queue_index& index,
- txn_list& locked,
- message_index& prepared);
- void recoverPreparedXidJournal();
- void getPreparedXidMap(PreparedRecoverMap& prepXidMap);
- void recoverXids(txn_list& txns);
- void readLockedMappings(Db& db,
- txn_lock_map& mappings);
- TxnCtxt* check(qpid::broker::TransactionContext* ctxt);
- void store(const qpid::broker::PersistableQueue* queue,
- TxnCtxt* txn,
- Dbt& messageId,
- boost::intrusive_ptr<qpid::broker::PersistableMessage>& message,
- bool newId);
- void async_dequeue(qpid::broker::TransactionContext* ctxt,
- boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg,
- const qpid::broker::PersistableQueue& queue);
- bool deleteIfUnused(Cursor& cursor,
- DbTxn* txn,
- Dbt& messageId);
- bool deleteIfUnused(DbTxn* txn,
- Dbt& messageId);
- bool isUnused(Cursor& cursor,
- Dbt& messageId);
- void destroy(Db& db,
- const qpid::broker::Persistable& p);
- bool create(Db& db,
- IdSequence& seq,
- const qpid::broker::Persistable& p);
- void completed(TxnCtxt& txn,
- bool commit);
- void record2pcOp(Db& db,
- TPCTxnCtxt& txn,
- u_int64_t messageId,
- u_int64_t queueId);
- void deleteBindingsForQueue(const qpid::broker::PersistableQueue& queue);
- void deleteBinding(const qpid::broker::PersistableExchange& exchange,
- const qpid::broker::PersistableQueue& queue,
- const std::string& key);
+ void recoverQueues(TxnCtxt& txn,
+ qpid::broker::RecoveryManager& recovery,
+ queue_index& index,
+ txn_list& locked,
+ message_index& messages);
+ void recoverMessages(TxnCtxt& txn,
+ qpid::broker::RecoveryManager& recovery,
+ queue_index& index,
+ txn_list& locked,
+ message_index& prepared);
+ void recoverMessages(TxnCtxt& txn,
+ qpid::broker::RecoveryManager& recovery,
+ qpid::broker::RecoverableQueue::shared_ptr& queue,
+ txn_list& locked,
+ message_index& prepared);
+ qpid::broker::RecoverableMessage::shared_ptr getExternMessage(qpid::broker::RecoveryManager& recovery,
+ uint64_t mId,
+ unsigned& headerSize);
+ void recoverExchanges(TxnCtxt& txn,
+ qpid::broker::RecoveryManager& recovery,
+ exchange_index& index);
+ void recoverBindings(TxnCtxt& txn,
+ exchange_index& exchanges,
+ queue_index& queues);
+ void recoverGeneral(TxnCtxt& txn,
+ qpid::broker::RecoveryManager& recovery);
+ int enqueueMessage(TxnCtxt& txn,
+ IdDbt& msgId,
+ qpid::broker::RecoverableMessage::shared_ptr& msg,
+ queue_index& index,
+ txn_list& locked,
+ message_index& prepared);
+ void recoverPreparedXidJournal();
+ void getPreparedXidMap(PreparedRecoverMap& prepXidMap);
+ void recoverXids(txn_list& txns);
+ void readLockedMappings(Db& db,
+ txn_lock_map& mappings);
+ TxnCtxt* check(qpid::broker::TransactionContext* ctxt);
+ void store(const qpid::broker::PersistableQueue* queue,
+ TxnCtxt* txn,
+ Dbt& messageId,
+ boost::intrusive_ptr<qpid::broker::PersistableMessage>& message,
+ bool newId);
+ void async_dequeue(qpid::broker::TransactionContext* ctxt,
+ boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg,
+ const qpid::broker::PersistableQueue& queue);
+ bool deleteIfUnused(Cursor& cursor,
+ DbTxn* txn,
+ Dbt& messageId);
+ bool deleteIfUnused(DbTxn* txn,
+ Dbt& messageId);
+ bool isUnused(Cursor& cursor,
+ Dbt& messageId);
+ void destroy(Db& db,
+ const qpid::broker::Persistable& p);
+ bool create(Db& db,
+ IdSequence& seq,
+ const qpid::broker::Persistable& p);
+ void completed(TxnCtxt& txn,
+ bool commit);
+ void record2pcOp(Db& db,
+ TPCTxnCtxt& txn,
+ u_int64_t messageId,
+ u_int64_t queueId);
+ void deleteBindingsForQueue(const qpid::broker::PersistableQueue& queue);
+ void deleteBinding(const qpid::broker::PersistableExchange& exchange,
+ const qpid::broker::PersistableQueue& queue,
+ const std::string& key);
- u_int64_t getRecordSize(Db& db,
- Dbt& key);
- u_int64_t getRecordSize(DbTxn* txn,
- Db& db,
- Dbt& key);
- void put(Db& db,
- DbTxn* txn,
- Dbt& key,
- Dbt& value);
- bool deleteKeyValuePair(Db& db,
- DbTxn* txn,
- Dbt& key,
- Dbt& value);
- void open(Db& db,
- DbTxn* txn,
- const char* file,
- bool dupKey);
+ u_int64_t getRecordSize(Db& db,
+ Dbt& key);
+ u_int64_t getRecordSize(DbTxn* txn,
+ Db& db,
+ Dbt& key);
+ void put(Db& db,
+ DbTxn* txn,
+ Dbt& key,
+ Dbt& value);
+ bool deleteKeyValuePair(Db& db,
+ DbTxn* txn,
+ Dbt& key,
+ Dbt& value);
+ void open(Db& db,
+ DbTxn* txn,
+ const char* file,
+ bool dupKey);
- // journal functions
- void createJrnlQueue(const qpid::broker::PersistableQueue& queue);
- std::string getJrnlDir(const qpid::broker::PersistableQueue& queue); //for exmaple /var/rhm/ + queueDir/
- std::string getJrnlDir(const char* queueName);
- std::string getJrnlBaseDir();
- std::string getBdbBaseDir();
- std::string getPxidBaseDir();
- inline void checkInit() {
- if (!isInit) init("/var", defNumJrnlFiles, defJrnlFileSizePgs, defWCachePageSize); isInit = true;
- }
- void chkInitPreparedXidStore();
-
- // debug aid for printing XIDs that may contain non-printable chars
- static std::string xid2str(const std::string xid) {
- std::ostringstream oss;
- oss << std::hex << std::setfill('0');
- for (unsigned i=0; i<xid.size(); i++) {
- if (isprint(xid[i]))
- oss << xid[i];
- else
- oss << "/" << std::setw(2) << (int)((char)xid[i]);
- }
- return oss.str();
- }
+ // journal functions
+ void createJrnlQueue(const qpid::broker::PersistableQueue& queue);
+ std::string getJrnlDir(const qpid::broker::PersistableQueue& queue); //for exmaple /var/rhm/ + queueDir/
+ std::string getJrnlDir(const char* queueName);
+ std::string getJrnlBaseDir();
+ std::string getBdbBaseDir();
+ std::string getPxidBaseDir();
+ inline void checkInit() {
+ if (!isInit) init("/var", defNumJrnlFiles, defJrnlFileSizePgs, defWCachePageSize); isInit = true;
+ }
+ void chkInitPreparedXidStore();
- public:
- struct Options : public qpid::Options {
- Options(const std::string& name="Store Options");
- std::string clusterName;
- std::string storeDir;
- bool storeAsync;
- bool storeForce;
- uint16_t numJrnlFiles;
- uint32_t jrnlFsizePgs;
- uint32_t wCachePageSize;
- };
+ // debug aid for printing XIDs that may contain non-printable chars
+ static std::string xid2str(const std::string xid) {
+ std::ostringstream oss;
+ oss << std::hex << std::setfill('0');
+ for (unsigned i=0; i<xid.size(); i++) {
+ if (isprint(xid[i]))
+ oss << xid[i];
+ else
+ oss << "/" << std::setw(2) << (int)((char)xid[i]);
+ }
+ return oss.str();
+ }
- typedef boost::shared_ptr<BdbMessageStore> shared_ptr;
+ public:
+ struct Options : public qpid::Options {
+ Options(const std::string& name="Store Options");
+ std::string clusterName;
+ std::string storeDir;
+ bool storeAsync;
+ bool storeForce;
+ uint16_t numJrnlFiles;
+ uint32_t jrnlFsizePgs;
+ uint32_t wCachePageSize;
+ };
- BdbMessageStore(const char* envpath = 0);
+ typedef boost::shared_ptr<BdbMessageStore> shared_ptr;
- virtual ~BdbMessageStore();
+ BdbMessageStore(const char* envpath = 0);
- bool init(const qpid::Options* options);
+ virtual ~BdbMessageStore();
- bool init(const std::string& dir,
- u_int16_t jfiles,
- u_int32_t jfileSizePgs,
- uint32_t wCachePageSize);
+ bool init(const qpid::Options* options);
- void initManagement (qpid::broker::Broker* broker);
+ bool init(const std::string& dir,
+ u_int16_t jfiles,
+ u_int32_t jfileSizePgs,
+ uint32_t wCachePageSize);
- void truncate();
+ void initManagement (qpid::broker::Broker* broker);
- void create(qpid::broker::PersistableQueue& queue,
- const qpid::framing::FieldTable& args);
+ void truncate();
- void destroy(qpid::broker::PersistableQueue& queue);
+ void create(qpid::broker::PersistableQueue& queue,
+ const qpid::framing::FieldTable& args);
- void create(const qpid::broker::PersistableExchange& queue,
- const qpid::framing::FieldTable& args);
+ void destroy(qpid::broker::PersistableQueue& queue);
- void destroy(const qpid::broker::PersistableExchange& queue);
+ void create(const qpid::broker::PersistableExchange& queue,
+ const qpid::framing::FieldTable& args);
- void bind(const qpid::broker::PersistableExchange& exchange,
- const qpid::broker::PersistableQueue& queue,
- const std::string& key,
- const qpid::framing::FieldTable& args);
+ void destroy(const qpid::broker::PersistableExchange& queue);
- void unbind(const qpid::broker::PersistableExchange& exchange,
- const qpid::broker::PersistableQueue& queue,
- const std::string& key,
- const qpid::framing::FieldTable& args);
+ void bind(const qpid::broker::PersistableExchange& exchange,
+ const qpid::broker::PersistableQueue& queue,
+ const std::string& key,
+ const qpid::framing::FieldTable& args);
- void create(const qpid::broker::PersistableConfig& config);
+ void unbind(const qpid::broker::PersistableExchange& exchange,
+ const qpid::broker::PersistableQueue& queue,
+ const std::string& key,
+ const qpid::framing::FieldTable& args);
- void destroy(const qpid::broker::PersistableConfig& config);
+ void create(const qpid::broker::PersistableConfig& config);
- void recover(qpid::broker::RecoveryManager& queues);
+ void destroy(const qpid::broker::PersistableConfig& config);
- void stage(boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg);
+ void recover(qpid::broker::RecoveryManager& queues);
- void destroy(qpid::broker::PersistableMessage& msg);
+ void stage(boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg);
- void appendContent(boost::intrusive_ptr<const qpid::broker::PersistableMessage>& msg,
- const std::string& data);
+ void destroy(qpid::broker::PersistableMessage& msg);
- void loadContent(const qpid::broker::PersistableQueue& queue,
- boost::intrusive_ptr<const qpid::broker::PersistableMessage>& msg,
- std::string& data,
- u_int64_t offset,
- u_int32_t length);
+ void appendContent(boost::intrusive_ptr<const qpid::broker::PersistableMessage>& msg,
+ const std::string& data);
- void enqueue(qpid::broker::TransactionContext* ctxt,
- boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg,
- const qpid::broker::PersistableQueue& queue);
+ void loadContent(const qpid::broker::PersistableQueue& queue,
+ boost::intrusive_ptr<const qpid::broker::PersistableMessage>& msg,
+ std::string& data,
+ u_int64_t offset,
+ u_int32_t length);
- void dequeue(qpid::broker::TransactionContext* ctxt,
- boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg,
- const qpid::broker::PersistableQueue& queue);
+ void enqueue(qpid::broker::TransactionContext* ctxt,
+ boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg,
+ const qpid::broker::PersistableQueue& queue);
- void flush(const qpid::broker::PersistableQueue& queue);
+ void dequeue(qpid::broker::TransactionContext* ctxt,
+ boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg,
+ const qpid::broker::PersistableQueue& queue);
- u_int32_t outstandingQueueAIO(const qpid::broker::PersistableQueue& queue);
+ void flush(const qpid::broker::PersistableQueue& queue);
- void collectPreparedXids(std::set<std::string>& xids);
+ u_int32_t outstandingQueueAIO(const qpid::broker::PersistableQueue& queue);
- std::auto_ptr<qpid::broker::TransactionContext> begin();
+ void collectPreparedXids(std::set<std::string>& xids);
- std::auto_ptr<qpid::broker::TPCTransactionContext> begin(const std::string& xid);
+ std::auto_ptr<qpid::broker::TransactionContext> begin();
- void prepare(qpid::broker::TPCTransactionContext& ctxt);
+ std::auto_ptr<qpid::broker::TPCTransactionContext> begin(const std::string& xid);
- void localPrepare(TxnCtxt* ctxt);
+ void prepare(qpid::broker::TPCTransactionContext& ctxt);
- void commit(qpid::broker::TransactionContext& ctxt);
+ void localPrepare(TxnCtxt* ctxt);
- void abort(qpid::broker::TransactionContext& ctxt);
+ void commit(qpid::broker::TransactionContext& ctxt);
- qpid::management::ManagementObject::shared_ptr GetManagementObject (void) const
- { return mgmtObject; }
+ void abort(qpid::broker::TransactionContext& ctxt);
- inline qpid::management::Manageable::status_t ManagementMethod (uint32_t, qpid::management::Args&)
- { return qpid::management::Manageable::STATUS_OK; }
- }; // class BdbMessageStore
-
- } // namespace bdbstore
+ qpid::management::ManagementObject::shared_ptr GetManagementObject (void) const
+ { return mgmtObject; }
+
+ inline qpid::management::Manageable::status_t ManagementMethod (uint32_t, qpid::management::Args&)
+ { return qpid::management::Manageable::STATUS_OK; }
+}; // class BdbMessageStore
+
+} // namespace bdbstore
} // namespace rhm
-
#endif
Modified: store/branches/mrg-1.0/cpp/lib/TxnCtxt.h
===================================================================
--- store/branches/mrg-1.0/cpp/lib/TxnCtxt.h 2008-07-14 17:32:13 UTC (rev 2199)
+++ store/branches/mrg-1.0/cpp/lib/TxnCtxt.h 2008-07-14 18:05:24 UTC (rev 2200)
@@ -1,24 +1,24 @@
/*
- Copyright (C) 2007 Red Hat Software
+ Copyright (C) 2007 Red Hat Software
- This file is part of Red Hat Messaging.
+ This file is part of Red Hat Messaging.
- Red Hat Messaging is free software; you can redistribute it and/or
- modify it under the terms of the GNU Lesser General Public
- License as published by the Free Software Foundation; either
- version 2.1 of the License, or (at your option) any later version.
+ Red Hat Messaging is free software; you can redistribute it and/or
+ modify it under the terms of the GNU Lesser General Public
+ License as published by the Free Software Foundation; either
+ version 2.1 of the License, or (at your option) any later version.
- This library is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- Lesser General Public License for more details.
+ This library is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ Lesser General Public License for more details.
- You should have received a copy of the GNU Lesser General Public
- License along with this library; if not, write to the Free Software
- Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
- USA
+ You should have received a copy of the GNU Lesser General Public
+ License along with this library; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
+ USA
- The GNU Lesser General Public License is available in the file COPYING.
+ The GNU Lesser General Public License is available in the file COPYING.
*/
#ifndef _TxnCtxt_
@@ -170,7 +170,7 @@
DbTxn* get() { return txn; }
virtual bool isTPC() { return false; }
virtual const std::string& getXid() { return tid; }
-
+
void addXidRecord(qpid::broker::ExternalQueueStore* queue) { impactedQueues.insert(queue); }
inline void prepare(JournalImpl* _preparedXidStorePtr) { preparedXidStorePtr = _preparedXidStorePtr; }
void complete(bool commit) { completeTxn(commit); }
16 years, 5 months
rhmessaging commits: r2199 - in store/branches/mrg-1.0/cpp: tests and 1 other directory.
by rhmessaging-commits@lists.jboss.org
Author: kpvdr
Date: 2008-07-14 13:32:13 -0400 (Mon, 14 Jul 2008)
New Revision: 2199
Modified:
store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp
store/branches/mrg-1.0/cpp/lib/BdbMessageStore.h
store/branches/mrg-1.0/cpp/lib/TxnCtxt.h
store/branches/mrg-1.0/cpp/tests/TransactionalTest.cpp
Log:
Backport of trunk r.2189: Fixed 1PC multi-queue atomic commits/aborts. Also fixed unnecessary prepared list writes that occur in local transactions that contain no durable messages.
Modified: store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp 2008-07-14 17:05:31 UTC (rev 2198)
+++ store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp 2008-07-14 17:32:13 UTC (rev 2199)
@@ -1218,7 +1218,7 @@
}
}
-void BdbMessageStore::completed(TPCTxnCtxt& txn,
+void BdbMessageStore::completed(TxnCtxt& txn,
bool commit)
{
try {
@@ -1256,21 +1256,25 @@
void BdbMessageStore::prepare(qpid::broker::TPCTransactionContext& ctxt)
{
checkInit();
- TPCTxnCtxt* txn = dynamic_cast<TPCTxnCtxt*>(&ctxt);
+ TxnCtxt* txn = dynamic_cast<TxnCtxt*>(&ctxt);
if(!txn) throw InvalidTransactionContextException();
+ localPrepare(txn);
+}
+void BdbMessageStore::localPrepare(TxnCtxt* ctxt)
+{
try {
chkInitPreparedXidStore();
- txn->incrDtokRef();
- DataTokenImpl* dtokp = txn->getDtok();
+ ctxt->incrDtokRef();
+ DataTokenImpl* dtokp = ctxt->getDtok();
dtokp->set_external_rid(true);
dtokp->set_rid(messageIdSequence.next());
- preparedXidStorePtr->enqueue_txn_data_record(0, 0, 0, dtokp, txn->getXid(), false);
- txn->prepare(preparedXidStorePtr.get());
+ preparedXidStorePtr->enqueue_txn_data_record(0, 0, 0, dtokp, ctxt->getXid(), false);
+ ctxt->prepare(preparedXidStorePtr.get());
// make sure all the data is written to disk before returning
- txn->sync();
+ ctxt->sync();
} catch (const std::exception& e) {
- QPID_LOG(error, "Error preparing xid " << txn->getXid() << ": " << e.what());
+ QPID_LOG(error, "Error preparing xid " << ctxt->getXid() << ": " << e.what());
throw;
}
}
@@ -1279,22 +1283,22 @@
{
checkInit();
TxnCtxt* txn(check(&ctxt));
- if (txn->isTPC()) {
- completed(*dynamic_cast<TPCTxnCtxt*>(txn), true);
- } else {
- txn->complete(true);
+ if (!txn->isTPC()) {
+ if (txn->impactedQueuesEmpty()) return;
+ localPrepare(dynamic_cast<TxnCtxt*>(txn));
}
+ completed(*dynamic_cast<TxnCtxt*>(txn), true);
}
void BdbMessageStore::abort(TransactionContext& ctxt)
{
checkInit();
TxnCtxt* txn(check(&ctxt));
- if (txn->isTPC()) {
- completed(*dynamic_cast<TPCTxnCtxt*>(txn), false);
- } else {
- txn->complete(false);
+ if (!txn->isTPC()) {
+ if (txn->impactedQueuesEmpty()) return;
+ localPrepare(dynamic_cast<TxnCtxt*>(txn));
}
+ completed(*dynamic_cast<TxnCtxt*>(txn), false);
}
TxnCtxt* BdbMessageStore::check(TransactionContext* ctxt)
Modified: store/branches/mrg-1.0/cpp/lib/BdbMessageStore.h
===================================================================
--- store/branches/mrg-1.0/cpp/lib/BdbMessageStore.h 2008-07-14 17:05:31 UTC (rev 2198)
+++ store/branches/mrg-1.0/cpp/lib/BdbMessageStore.h 2008-07-14 17:32:13 UTC (rev 2199)
@@ -169,7 +169,7 @@
bool create(Db& db,
IdSequence& seq,
const qpid::broker::Persistable& p);
- void completed(TPCTxnCtxt& txn,
+ void completed(TxnCtxt& txn,
bool commit);
void record2pcOp(Db& db,
TPCTxnCtxt& txn,
@@ -306,9 +306,15 @@
void collectPreparedXids(std::set<std::string>& xids);
std::auto_ptr<qpid::broker::TransactionContext> begin();
+
std::auto_ptr<qpid::broker::TPCTransactionContext> begin(const std::string& xid);
+
void prepare(qpid::broker::TPCTransactionContext& ctxt);
+
+ void localPrepare(TxnCtxt* ctxt);
+
void commit(qpid::broker::TransactionContext& ctxt);
+
void abort(qpid::broker::TransactionContext& ctxt);
qpid::management::ManagementObject::shared_ptr GetManagementObject (void) const
Modified: store/branches/mrg-1.0/cpp/lib/TxnCtxt.h
===================================================================
--- store/branches/mrg-1.0/cpp/lib/TxnCtxt.h 2008-07-14 17:05:31 UTC (rev 2198)
+++ store/branches/mrg-1.0/cpp/lib/TxnCtxt.h 2008-07-14 17:32:13 UTC (rev 2199)
@@ -59,6 +59,7 @@
IdSequence* loggedtx;
boost::intrusive_ptr<DataTokenImpl> dtokp;
AutoScopedLock globalHolder;
+ JournalImpl* preparedXidStorePtr;
/**
* local txn id, if non XA.
@@ -72,6 +73,8 @@
commitTxn(static_cast<JournalImpl*>(*i), commit);
}
impactedQueues.clear();
+ if (preparedXidStorePtr)
+ commitTxn(preparedXidStorePtr, commit);
}
void commitTxn(JournalImpl* jc, bool commit) {
@@ -95,7 +98,7 @@
public:
- TxnCtxt(IdSequence* _loggedtx=NULL) : loggedtx(_loggedtx), dtokp(new DataTokenImpl), txn(0) {
+ TxnCtxt(IdSequence* _loggedtx=NULL) : loggedtx(_loggedtx), dtokp(new DataTokenImpl), preparedXidStorePtr(0), txn(0) {
if (loggedtx) {
std::stringstream s;
s << "rhm-tid" << this;
@@ -125,6 +128,8 @@
for (ipqItr i = impactedQueues.begin(); i != impactedQueues.end(); i++) {
sync_jrnl(static_cast<JournalImpl*>(*i), firstloop, allWritten);
}
+ if (preparedXidStorePtr)
+ sync_jrnl(preparedXidStorePtr, firstloop, allWritten);
firstloop = false;
}
}
@@ -167,7 +172,9 @@
virtual const std::string& getXid() { return tid; }
void addXidRecord(qpid::broker::ExternalQueueStore* queue) { impactedQueues.insert(queue); }
+ inline void prepare(JournalImpl* _preparedXidStorePtr) { preparedXidStorePtr = _preparedXidStorePtr; }
void complete(bool commit) { completeTxn(commit); }
+ bool impactedQueuesEmpty() { return impactedQueues.empty(); }
DataTokenImpl* getDtok() { return dtokp.get(); }
void incrDtokRef() { dtokp->addRef(); }
void recoverDtok(const u_int64_t rid, const std::string xid) {
@@ -182,25 +189,9 @@
{
protected:
const std::string xid;
- JournalImpl* preparedXidStorePtr;
- virtual void completeTxn(bool commit) {
- TxnCtxt::completeTxn(commit);
- if (preparedXidStorePtr) commitTxn(preparedXidStorePtr, commit);
- }
public:
- TPCTxnCtxt(const std::string& _xid, IdSequence* _loggedtx) : TxnCtxt(_loggedtx), xid(_xid), preparedXidStorePtr(0) {}
- void sync() {
- TxnCtxt::sync();
- bool allWritten = false;
- if (preparedXidStorePtr) {
- while (!allWritten) {
- allWritten = true;
- sync_jrnl(preparedXidStorePtr, true, allWritten);
- }
- }
- }
- inline void prepare(JournalImpl* _preparedXidStorePtr) { preparedXidStorePtr = _preparedXidStorePtr; }
+ TPCTxnCtxt(const std::string& _xid, IdSequence* _loggedtx) : TxnCtxt(_loggedtx), xid(_xid) {}
inline virtual bool isTPC() { return true; }
inline virtual const std::string& getXid() { return xid; }
};
Modified: store/branches/mrg-1.0/cpp/tests/TransactionalTest.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/tests/TransactionalTest.cpp 2008-07-14 17:05:31 UTC (rev 2198)
+++ store/branches/mrg-1.0/cpp/tests/TransactionalTest.cpp 2008-07-14 17:32:13 UTC (rev 2199)
@@ -33,6 +33,7 @@
#include "qpid/broker/Queue.h"
#include "qpid/broker/RecoveryManagerImpl.h"
#include "qpid/framing/AMQHeaderBody.h"
+#include "qpid/log/Statement.h"
using namespace rhm::bdbstore;
using namespace qpid::broker;
@@ -55,6 +56,7 @@
// to end to simulate multi-queue txn complete failure.
while (impactedQueues.size() > num_queues_rem) impactedQueues.erase(impactedQueues.begin());
}
+ void resetPreparedXidStorePtr() { preparedXidStorePtr = 0; }
};
// Test store which has sepcial begin() which returns a TestTPCTxnCtxt, and a method to check for
@@ -68,6 +70,25 @@
// pass sequence number for c/a
return auto_ptr<TransactionContext>(new TestTxnCtxt(&messageIdSequence));
}
+ void commit(TransactionContext& ctxt, const bool complete_prepared_list) {
+ checkInit();
+ TxnCtxt* txn(check(&ctxt));
+ if (!txn->isTPC()) {
+ localPrepare(dynamic_cast<TxnCtxt*>(txn));
+ if (!complete_prepared_list) dynamic_cast<TestTxnCtxt*>(txn)->resetPreparedXidStorePtr();
+ }
+ completed(*dynamic_cast<TxnCtxt*>(txn), true);
+ }
+ void abort(TransactionContext& ctxt, const bool complete_prepared_list)
+ {
+ checkInit();
+ TxnCtxt* txn(check(&ctxt));
+ if (!txn->isTPC()) {
+ localPrepare(dynamic_cast<TxnCtxt*>(txn));
+ if (!complete_prepared_list) dynamic_cast<TestTxnCtxt*>(txn)->resetPreparedXidStorePtr();
+ }
+ completed(*dynamic_cast<TxnCtxt*>(txn), false);
+ }
};
// === Helper fns ===
@@ -178,10 +199,11 @@
checkMsg(y, 0);
}
-void testMultiQueueTxn(const unsigned num_queues_rem, const bool commit)
+void testMultiQueueTxn(const unsigned num_queues_rem, const bool complete_prepared_list, const bool commit)
{
setup<TestMessageStore>();
- std::auto_ptr<TransactionContext> txn(static_cast<TestMessageStore*>(store.get())->begin());
+ TestMessageStore* tmsp = static_cast<TestMessageStore*>(store.get());
+ std::auto_ptr<TransactionContext> txn(tmsp->begin());
//create two messages and enqueue them onto both queues:
boost::intrusive_ptr<Message> msgA = createMessage("MessageA", "exchange", "routing_key");
@@ -193,9 +215,9 @@
static_cast<TestTxnCtxt*>(txn.get())->setCompleteFailure(num_queues_rem);
if (commit)
- store->commit(*txn);
+ tmsp->commit(*txn, complete_prepared_list);
else
- store->abort(*txn);
+ tmsp->abort(*txn, complete_prepared_list);
restart<TestMessageStore>();
// Check outcome
@@ -225,47 +247,61 @@
swap(false);
cout << "ok" << endl;
}
-/*
+
+QPID_AUTO_TEST_CASE(MultiQueueCommit)
+{
+ cout << test_filename << ".MultiQueueCommit: " << flush;
+ testMultiQueueTxn(2, true, true);
+ cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(MultiQueueAbort)
+{
+ cout << test_filename << ".MultiQueueAbort: " << flush;
+ testMultiQueueTxn(2, true, true);
+ cout << "ok" << endl;
+}
+
QPID_AUTO_TEST_CASE(MultiQueueNoQueueCommitRecover)
{
cout << test_filename << ".MultiQueueNoQueueCommitRecover: " << flush;
- testMultiQueueTxn(0, true);
+ testMultiQueueTxn(0, false, true);
cout << "ok" << endl;
}
QPID_AUTO_TEST_CASE(MultiQueueNoQueueAbortRecover)
{
cout << test_filename << ".MultiQueueNoQueueAbortRecover: " << flush;
- testMultiQueueTxn(0, false);
+ testMultiQueueTxn(0, false, false);
cout << "ok" << endl;
}
QPID_AUTO_TEST_CASE(MultiQueueSomeQueueCommitRecover)
{
cout << test_filename << ".MultiQueueSomeQueueCommitRecover: " << flush;
- testMultiQueueTxn(1, true);
+ testMultiQueueTxn(1, false, true);
cout << "ok" << endl;
}
QPID_AUTO_TEST_CASE(MultiQueueSomeQueueAbortRecover)
{
cout << test_filename << ".MultiQueueSomeQueueAbortRecover: " << flush;
- testMultiQueueTxn(1, false);
+ testMultiQueueTxn(1, false, false);
cout << "ok" << endl;
}
QPID_AUTO_TEST_CASE(MultiQueueAllQueueCommitRecover)
{
cout << test_filename << ".MultiQueueAllQueueCommitRecover: " << flush;
- testMultiQueueTxn(2, true);
+ testMultiQueueTxn(2, false, true);
cout << "ok" << endl;
}
QPID_AUTO_TEST_CASE(MultiQueueAllQueueAbortRecover)
{
cout << test_filename << ".MultiQueueAllQueueAbortRecover: " << flush;
- testMultiQueueTxn(2, false);
+ testMultiQueueTxn(2, false, false);
cout << "ok" << endl;
}
-*/
+
QPID_AUTO_TEST_SUITE_END()
16 years, 5 months
rhmessaging commits: r2198 - in store/branches/mrg-1.0/cpp: lib/jrnl and 1 other directories.
by rhmessaging-commits@lists.jboss.org
Author: kpvdr
Date: 2008-07-14 13:05:31 -0400 (Mon, 14 Jul 2008)
New Revision: 2198
Modified:
store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp
store/branches/mrg-1.0/cpp/lib/BdbMessageStore.h
store/branches/mrg-1.0/cpp/lib/JournalImpl.cpp
store/branches/mrg-1.0/cpp/lib/JournalImpl.h
store/branches/mrg-1.0/cpp/lib/StorePlugin.cpp
store/branches/mrg-1.0/cpp/lib/TxnCtxt.h
store/branches/mrg-1.0/cpp/lib/jrnl/deq_hdr.hpp
store/branches/mrg-1.0/cpp/lib/jrnl/deq_rec.cpp
store/branches/mrg-1.0/cpp/lib/jrnl/deq_rec.hpp
store/branches/mrg-1.0/cpp/lib/jrnl/jcntl.cpp
store/branches/mrg-1.0/cpp/lib/jrnl/jcntl.hpp
store/branches/mrg-1.0/cpp/lib/jrnl/txn_map.cpp
store/branches/mrg-1.0/cpp/lib/jrnl/txn_map.hpp
store/branches/mrg-1.0/cpp/lib/jrnl/wmgr.cpp
store/branches/mrg-1.0/cpp/lib/jrnl/wmgr.hpp
store/branches/mrg-1.0/cpp/tests/TransactionalTest.cpp
store/branches/mrg-1.0/cpp/tests/TwoPhaseCommitTest.cpp
Log:
Backport of trunk r.2187-2188: Fixed 2PC multi-queue transaction atomicity problem. Any multi-queue 2PC commit/abort which is interrupted by failure will now be completed on recover for all queues which did not get processed. Local txns still have this problem, however. Some code tidy-up is also included. Removed old error messages from StorePlugin which are no longer possible owing to the removal of sync journal. 2188: orrected signed/unsigned comparison (which does not show up on F9)
Modified: store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp 2008-07-14 14:53:45 UTC (rev 2197)
+++ store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp 2008-07-14 17:05:31 UTC (rev 2198)
@@ -2,7 +2,7 @@
Copyright (C) 2007 Red Hat Software
This file is part of Red Hat Messaging.
-
+
Red Hat Messaging is free software; you can redistribute it and/or
modify it under the terms of the GNU Lesser General Public
License as published by the Free Software Foundation; either
@@ -26,6 +26,7 @@
#include "BindingDbt.h"
#include "BufferValue.h"
#include "IdPairDbt.h"
+#include "jrnl/txn_map.hpp"
#include "qpid/log/Statement.h"
#include "qpid/management/PackageMrgstore.h"
@@ -49,28 +50,35 @@
qpid::sys::Duration BdbMessageStore::defJournalFlushTimeout(500 * qpid::sys::TIME_MSEC); // 0.5s
qpid::sys::Mutex TxnCtxt::globalSerialiser;
-BdbMessageStore::BdbMessageStore(const char* envpath) : env(0),
- queueDb(&env, 0),
- configDb(&env, 0),
- exchangeDb(&env, 0),
- messageDb(&env, 0),
- mappingDb(&env, 0),
- bindingDb(&env, 0),
- generalDb(&env, 0),
- numJrnlFiles(defNumJrnlFiles),
- jrnlFsizePgs(defJrnlFileSizePgs),
- wcache_pgsize_sblks(JRNL_WMGR_DEF_PAGE_SIZE),
- wcache_num_pages(JRNL_WMGR_DEF_PAGES),
- highestRid(0),
- isInit(false),
- envPath(envpath)
+BdbMessageStore::PreparedRecoverStruct::PreparedRecoverStruct(const u_int64_t _rid,
+ const bool _deq_flag,
+ const bool _commit_flag) :
+ rid(_rid),
+ deq_flag(_deq_flag),
+ commit_flag(_commit_flag)
+{}
+BdbMessageStore::BdbMessageStore(const char* envpath) :
+ env(0),
+ queueDb(&env, 0),
+ configDb(&env, 0),
+ exchangeDb(&env, 0),
+ messageDb(&env, 0),
+ mappingDb(&env, 0),
+ bindingDb(&env, 0),
+ generalDb(&env, 0),
+ numJrnlFiles(defNumJrnlFiles),
+ jrnlFsizePgs(defJrnlFileSizePgs),
+ wcache_pgsize_sblks(JRNL_WMGR_DEF_PAGE_SIZE),
+ wcache_num_pages(JRNL_WMGR_DEF_PAGES),
+ highestRid(0),
+ isInit(false),
+ envPath(envpath)
{}
-
+
void BdbMessageStore::initManagement (Broker* broker)
{
- if (broker != 0)
- {
+ if (broker != 0) {
ManagementAgent::shared_ptr agent = ManagementAgent::getAgent ();
if (agent.get () != 0)
@@ -87,13 +95,16 @@
}
}
-bool BdbMessageStore::init(const std::string& dir, u_int16_t jfiles, u_int32_t jfileSizePgs, uint32_t wCachePageSize)
-{
+bool BdbMessageStore::init(const std::string& dir,
+ u_int16_t jfiles,
+ u_int32_t jfileSizePgs,
+ uint32_t wCachePageSize)
+{
if (isInit) return true;
numJrnlFiles = jfiles;
jrnlFsizePgs = jfileSizePgs;
-
+
// set wcache_pgsize_sblks and wcache_num_pages from wCachePageSize
wcache_pgsize_sblks = wCachePageSize * 1024 / JRNL_DBLK_SIZE / JRNL_SBLK_SIZE; // convert from KiB to number sblks
u_int32_t defTotWCacheSize = JRNL_WMGR_DEF_PAGE_SIZE * JRNL_WMGR_DEF_PAGES; // in sblks. Currently 2014 sblks (1 MiB).
@@ -114,7 +125,7 @@
// 1 MiB total cache
wcache_num_pages = defTotWCacheSize / wcache_pgsize_sblks;
}
-
+
if (dir.size()>0) storeDir = dir;
journal::jdir::create_dir(getBdbBaseDir());
@@ -132,12 +143,11 @@
TxnCtxt txn;
try {
-
txn.begin(env, false);
open(queueDb, txn.get(), "queues.db", false);
open(configDb, txn.get(), "config.db", false);
open(exchangeDb, txn.get(), "exchanges.db", false);
- open(messageDb, txn.get(), "messages.db", false);
+ open(messageDb, txn.get(), "messages.db", false);
open(mappingDb, txn.get(), "mappings.db", true);
open(bindingDb, txn.get(), "bindings.db", true);
open(generalDb, txn.get(), "general.db", false);
@@ -153,7 +163,7 @@
txn.abort();
throw;
}
-
+
isInit = true;
QPID_LOG(info, "BdbMessageStore module initialized: dir=" << dir << "; jfiles=" << jfiles << "; jfileSizePgs=" << jfileSizePgs << "; wCachePageSize=" << wCachePageSize);
return true;
@@ -161,25 +171,21 @@
void BdbMessageStore::chkInitPreparedXidStore()
{
- if (!preparedXidStorePtr->is_init())
- {
+ if (!preparedXidStorePtr->is_ready()) {
u_int32_t defTotWCacheSize = JRNL_WMGR_DEF_PAGE_SIZE * JRNL_WMGR_DEF_PAGES; // in sblks. Currently 2014 sblks (1 MiB).
preparedXidStorePtr->initialize(defXidStoreNumJrnlFiles, defXidStoreJrnlFileSizePgs * JRNL_RMGR_PAGE_SIZE, defTotWCacheSize / wcache_pgsize_sblks, defXidStoreWCachePageSize);
}
}
-bool BdbMessageStore::init(const qpid::Options* options)
+bool BdbMessageStore::init(const qpid::Options* options)
{
const Options* opts = static_cast<const Options*>(options);
-
+
u_int16_t numJrnlFiles = opts->numJrnlFiles;
- if (numJrnlFiles < JRNL_MIN_NUM_FILES)
- {
+ if (numJrnlFiles < JRNL_MIN_NUM_FILES) {
numJrnlFiles = JRNL_MIN_NUM_FILES;
QPID_LOG(warning, "parameter num-jfiles (" << opts->numJrnlFiles << ") below allowable minimum (" << numJrnlFiles << "); changing this parameter to minimum value.");
- }
- else if (numJrnlFiles > JRNL_MAX_NUM_FILES)
- {
+ } else if (numJrnlFiles > JRNL_MAX_NUM_FILES) {
numJrnlFiles = JRNL_MAX_NUM_FILES;
QPID_LOG(warning, "parameter num-jfiles (" << opts->numJrnlFiles << ") above allowable maximum (" << numJrnlFiles << "); changing this parameter to maximum value.");
}
@@ -187,17 +193,14 @@
u_int32_t jrnlFsizePgs = opts->jrnlFsizePgs;
u_int32_t jrnlMinFsizePgs = JRNL_MIN_FILE_SIZE / JRNL_RMGR_PAGE_SIZE;
u_int32_t jrnlMaxFsizePgs = JRNL_MAX_FILE_SIZE / JRNL_RMGR_PAGE_SIZE;
- if (jrnlFsizePgs < jrnlMinFsizePgs)
- {
+ if (jrnlFsizePgs < jrnlMinFsizePgs) {
jrnlFsizePgs = jrnlMinFsizePgs;
QPID_LOG(warning, "parameter jfile-size-pgs (" << opts->jrnlFsizePgs << ") below allowable minimum (" << jrnlFsizePgs << "); changing this parameter to minimum value.");
- }
- else if (jrnlFsizePgs > jrnlMaxFsizePgs)
- {
+ } else if (jrnlFsizePgs > jrnlMaxFsizePgs) {
jrnlFsizePgs = jrnlMaxFsizePgs;
QPID_LOG(warning, "parameter jfile-size-pgs (" << opts->jrnlFsizePgs << ") above allowable maximum (" << jrnlFsizePgs << "); changing this parameter to maximum value.");
}
-
+
u_int32_t jrnlWrCachePageSize = opts->wCachePageSize;
switch (jrnlWrCachePageSize)
{
@@ -212,14 +215,11 @@
break;
default:
u_int32_t oldJrnlWrCachePageSize = jrnlWrCachePageSize;
- if (oldJrnlWrCachePageSize == 0)
- {
+ if (oldJrnlWrCachePageSize == 0) {
// For zero value, use default
jrnlWrCachePageSize = JRNL_WMGR_DEF_PAGE_SIZE * JRNL_DBLK_SIZE * JRNL_SBLK_SIZE / 1024;
QPID_LOG(warning, "parameter wcache-page-size (" << oldJrnlWrCachePageSize << ") must be a power of 2 between 1 and 128; changing this parameter to default value (" << jrnlWrCachePageSize << ")");
- }
- else
- {
+ } else {
// For any positive value, use closest value
if (oldJrnlWrCachePageSize < 6)
jrnlWrCachePageSize = 4;
@@ -240,17 +240,19 @@
return init(opts->storeDir, numJrnlFiles, jrnlFsizePgs, jrnlWrCachePageSize);
}
-void BdbMessageStore::open(Db& db, DbTxn* txn, const char* file, bool dupKey)
+void BdbMessageStore::open(Db& db,
+ DbTxn* txn,
+ const char* file,
+ bool dupKey)
{
if(dupKey) db.set_flags(DB_DUPSORT);
db.open(txn, file, 0, DB_BTREE, DB_CREATE | DB_THREAD, 0);
dbs.push_back(&db);
}
-BdbMessageStore::~BdbMessageStore()
+BdbMessageStore::~BdbMessageStore()
{
try {
-
for (std::list<Db*>::iterator i = dbs.begin(); i != dbs.end(); i++) {
(*i)->close(0);
}
@@ -279,17 +281,18 @@
(*i)->truncate(txn, &count, 0);
}
- txn->commit(0);
- try{
+ txn->commit(0);
+ try {
journal::jdir::delete_dir(getJrnlBaseDir(),true);
journal::jdir::delete_dir(getPxidBaseDir(),true);
- }
+ }
catch (const journal::jexception& e) {
THROW_STORE_EXCEPTION(std::string("truncate() failed: ") + e.what() );
- }
+ }
}
-void BdbMessageStore::create(PersistableQueue& queue, const FieldTable& args)
+void BdbMessageStore::create(PersistableQueue& queue,
+ const FieldTable& args)
{
checkInit();
if (queue.getPersistenceId()) {
@@ -315,15 +318,15 @@
string("JournalData"), defJournalGetEventsTimeout,
defJournalFlushTimeout);
}
+
queue.setExternalQueueStore(dynamic_cast<ExternalQueueStore*>(jQueue));
- try {
+ try {
// init will create the deque's for the init...
jQueue->initialize(localFileCount, localFileSize * JRNL_RMGR_PAGE_SIZE, wcache_num_pages, wcache_pgsize_sblks);
} catch (const journal::jexception& e) {
THROW_STORE_EXCEPTION(std::string("Queue ") + queue.getName() +
": create() failed: " + e.what());
}
-
try {
if (!create(queueDb, queueIdSequence, queue)) {
THROW_STORE_EXCEPTION("Queue already exists: " + queue.getName());
@@ -339,21 +342,20 @@
destroy(queueDb, queue);
deleteBindingsForQueue(queue);
qpid::broker::ExternalQueueStore* eqs = queue.getExternalQueueStore();
- if (eqs)
- {
+ if (eqs) {
JournalImpl* jQueue = static_cast<JournalImpl*>(eqs);
- jQueue->delete_jrnl_files();
+ jQueue->delete_jrnl_files();
queue.setExternalQueueStore(0); // will delete the journal if exists
}
}
-void BdbMessageStore::create(const PersistableExchange& exchange, const FieldTable& /*args*/)
+void BdbMessageStore::create(const PersistableExchange& exchange,
+ const FieldTable& /*args*/)
{
checkInit();
if (exchange.getPersistenceId()) {
THROW_STORE_EXCEPTION("Exchange already created: " + exchange.getName());
}
-
try {
if (!create(exchangeDb, exchangeIdSequence, exchange)) {
THROW_STORE_EXCEPTION("Exchange already exists: " + exchange.getName());
@@ -378,7 +380,6 @@
if (general.getPersistenceId()) {
THROW_STORE_EXCEPTION("General configuration item already created");
}
-
try {
if (!create(generalDb, generalIdSequence, general)) {
THROW_STORE_EXCEPTION("General configuration already exists");
@@ -394,7 +395,9 @@
destroy(generalDb, general);
}
-bool BdbMessageStore::create(Db& db, IdSequence& seq, const Persistable& p)
+bool BdbMessageStore::create(Db& db,
+ IdSequence& seq,
+ const Persistable& p)
{
u_int64_t id (seq.next());
Dbt key(&id, sizeof(id));
@@ -416,11 +419,13 @@
}
-void BdbMessageStore::bind(const PersistableExchange& e, const PersistableQueue& q,
- const std::string& k, const FieldTable& a)
+void BdbMessageStore::bind(const PersistableExchange& e,
+ const PersistableQueue& q,
+ const std::string& k,
+ const FieldTable& a)
{
checkInit();
- IdDbt key(e.getPersistenceId());
+ IdDbt key(e.getPersistenceId());
BindingDbt value(e, q, k, a);
TxnCtxt txn;
txn.begin(env, true);
@@ -428,8 +433,10 @@
txn.commit();
}
-void BdbMessageStore::unbind(const PersistableExchange& e, const PersistableQueue& q,
- const std::string& k, const FieldTable&)
+void BdbMessageStore::unbind(const PersistableExchange& e,
+ const PersistableQueue& q,
+ const std::string& k,
+ const FieldTable&)
{
checkInit();
deleteBinding(e, q, k);
@@ -450,7 +457,7 @@
try {
//read all queues, calls recoversMessages
recoverQueues(txn, registry, queues, prepared, messages);
-
+
//recover exchange & bindings:
recoverExchanges(txn, registry, exchanges);
recoverBindings(txn, exchanges, queues);
@@ -466,35 +473,45 @@
}
//recover transactions:
- for (txn_list::iterator i = prepared.begin(); i != prepared.end(); i++) {
-
+ for (txn_list::iterator i = prepared.begin(); i != prepared.end(); i++) {
TPCTxnCtxt* tpcc = new TPCTxnCtxt(i->xid, &messageIdSequence);
+ std::auto_ptr<TPCTransactionContext> txn(tpcc);
+ tpcc->prepare(preparedXidStorePtr.get());
+
// Restore data token state in TxnCtxt
- xid_rid_map_citr citr = preparedMap.find(i->xid);
- if (citr == preparedMap.end()) THROW_STORE_EXCEPTION("XID not found in preparedMap");
- tpcc->recoverDtok(citr->second, i->xid);
- tpcc->addXidRecord(preparedXidStorePtr.get());
+ PreparedRecoverMapCitr citr = preparedXidStoreRecoverMap.find(i->xid);
+ if (citr == preparedXidStoreRecoverMap.end()) THROW_STORE_EXCEPTION("XID not found in preparedXidStoreRecoverMap");
+ tpcc->recoverDtok(citr->second.rid, i->xid);
- RecoverableTransaction::shared_ptr dtx = registry.recoverTransaction(i->xid, std::auto_ptr<TPCTransactionContext>(tpcc));
+ // If a record is found that is dequeued but not committed/aborted from preparedXidStore, then a complete() call
+ // was interrupted part way through committing/aborting the impacted queues. Complete this process.
+ bool incomplTxnFlag = citr->second.deq_flag;
+
+ RecoverableTransaction::shared_ptr dtx = registry.recoverTransaction(i->xid, txn);
if (i->enqueues.get()) {
for (LockedMappings::iterator j = i->enqueues->begin(); j != i->enqueues->end(); j++) {
tpcc->addXidRecord(queues[j->first]->getExternalQueueStore());
- dtx->enqueue(queues[j->first], messages[j->second]);
+ if (!incomplTxnFlag) dtx->enqueue(queues[j->first], messages[j->second]);
}
}
if (i->dequeues.get()) {
for (LockedMappings::iterator j = i->dequeues->begin(); j != i->dequeues->end(); j++) {
tpcc->addXidRecord(queues[j->first]->getExternalQueueStore());
- dtx->dequeue(queues[j->first], messages[j->second]);
+ if (!incomplTxnFlag) dtx->dequeue(queues[j->first], messages[j->second]);
}
}
+
+ if (incomplTxnFlag) tpcc->complete(citr->second.commit_flag);
}
registry.recoveryComplete();
}
-void BdbMessageStore::recoverQueues(TxnCtxt& txn, RecoveryManager& registry, queue_index& queue_index, txn_list&
- prepared, message_index& messages)
+void BdbMessageStore::recoverQueues(TxnCtxt& txn,
+ RecoveryManager& registry,
+ queue_index& queue_index,
+ txn_list& prepared,
+ message_index& messages)
{
Cursor queues;
queues.open(queueDb, txn.get());
@@ -510,7 +527,7 @@
RecoverableQueue::shared_ptr queue = registry.recoverQueue(buffer);
//set the persistenceId and update max as required
queue->setPersistenceId(key.id);
-
+
const char* queueName = queue->getName().c_str();
JournalImpl* jQueue = 0;
{
@@ -518,14 +535,14 @@
jQueue = new JournalImpl(queueName, getJrnlDir(queueName), string("JournalData"), defJournalGetEventsTimeout, defJournalFlushTimeout);
}
queue->setExternalQueueStore(dynamic_cast<ExternalQueueStore*>(jQueue));
-
+
try
{
u_int64_t thisHighestRid = 0;
jQueue->recover(numJrnlFiles, jrnlFsizePgs * JRNL_RMGR_PAGE_SIZE, wcache_num_pages, wcache_pgsize_sblks, &prepared, thisHighestRid, key.id); // start recovery
if (thisHighestRid > highestRid)
highestRid = thisHighestRid;
- recoverMessages(txn, registry, queue, prepared, messages);
+ recoverMessages(txn, registry, queue, prepared, messages);
jQueue->recover_complete(); // start journal.
} catch (const journal::jexception& e) {
THROW_STORE_EXCEPTION(std::string("Queue ") + queueName + ": recoverQueues() failed: " + e.what());
@@ -544,7 +561,9 @@
}
-void BdbMessageStore::recoverExchanges(TxnCtxt& txn, RecoveryManager& registry, exchange_index& index)
+void BdbMessageStore::recoverExchanges(TxnCtxt& txn,
+ RecoveryManager& registry,
+ exchange_index& index)
{
//TODO: this is a copy&paste from recoverQueues - refactor!
Cursor exchanges;
@@ -566,13 +585,15 @@
exchangeIdSequence.reset(maxExchangeId + 1);
}
-void BdbMessageStore::recoverBindings(TxnCtxt& txn, exchange_index& exchanges, queue_index& queues)
+void BdbMessageStore::recoverBindings(TxnCtxt& txn,
+ exchange_index& exchanges,
+ queue_index& queues)
{
Cursor bindings;
bindings.open(bindingDb, txn.get());
IdDbt key;
- Dbt value;
+ Dbt value;
while (bindings.next(key, value)) {
Buffer buffer(reinterpret_cast<char*>(value.get_data()), value.get_size());
if (buffer.available() < 8) {
@@ -582,7 +603,7 @@
uint64_t queueId = buffer.getLongLong();
string queueName;
string routingkey;
- FieldTable args;
+ FieldTable args;
buffer.getShortString(queueName);
buffer.getShortString(routingkey);
buffer.get(args);
@@ -599,7 +620,8 @@
}
}
-void BdbMessageStore::recoverGeneral(TxnCtxt& txn, RecoveryManager& registry)
+void BdbMessageStore::recoverGeneral(TxnCtxt& txn,
+ RecoveryManager& registry)
{
Cursor items;
items.open(generalDb, txn.get());
@@ -620,16 +642,18 @@
}
-void BdbMessageStore::recoverMessages(TxnCtxt& /*txn*/, qpid::broker::RecoveryManager& recovery,
- qpid::broker::RecoverableQueue::shared_ptr& queue, txn_list& locked, message_index& prepared)
+void BdbMessageStore::recoverMessages(TxnCtxt& /*txn*/,
+ qpid::broker::RecoveryManager& recovery,
+ qpid::broker::RecoverableQueue::shared_ptr& queue,
+ txn_list& prepared,
+ message_index& messages)
{
+ size_t preambleLength = sizeof(u_int32_t)/*header size*/;
- size_t preambleLength = sizeof(u_int32_t)/*header size*/;
-
JournalImpl* jc = static_cast<JournalImpl*>(queue->getExternalQueueStore());
DataTokenImpl dtokp;
size_t readSize = 0;
- unsigned msg_count=0;
+ unsigned msg_count = 0;
// TODO: This optimization to skip reading if there are no enqueued messages to read
// breaks the python system test in phase 6 with "Exception: Cannot write lock file"
@@ -642,58 +666,63 @@
bool transientFlag = false;
bool externalFlag = false;
+ dtokp.set_wstate(DataTokenImpl::ENQ);
- dtokp.set_wstate(DataTokenImpl::ENQ);
- // read the message from the Journal.
+ // Read the message from the Journal.
try {
-
- //std::cout << jc->dirname() <<"-queueName:" << queue->getName() << "-enq count:" << jc->get_enq_cnt() << std::endl;
-
unsigned aio_sleep_cnt = 0;
while (read) {
rhm::journal::iores res = jc->read_data_record(&dbuff, dbuffSize, &xidbuff, xidbuffSize, transientFlag, externalFlag, &dtokp);
readSize = dtokp.dsize();
-
+
switch (res)
{
- case rhm::journal::RHM_IORES_SUCCESS:{
- msg_count++;
- RecoverableMessage::shared_ptr msg;
- char* data = (char*)dbuff;
-
- unsigned headerSize;
- if (externalFlag){
- msg = getExternMessage(recovery, dtokp.rid(), headerSize); // large message external to jrnl
- } else {
- headerSize = Buffer(data, preambleLength).getLong();
- Buffer headerBuff(data+ preambleLength, headerSize); /// do we want read size or header size ????
- msg = recovery.recoverMessage(headerBuff);
- }
- msg->setPersistenceId(dtokp.rid());
-
- u_int32_t contentOffset = headerSize + preambleLength;
- u_int64_t contentSize = readSize - contentOffset;
- if (msg->loadContent(contentSize) && !externalFlag) {
- //now read the content
- Buffer contentBuff(data + contentOffset, contentSize);
- msg->decodeContent(contentBuff);
- }
+ case rhm::journal::RHM_IORES_SUCCESS: {
+ msg_count++;
+ RecoverableMessage::shared_ptr msg;
+ char* data = (char*)dbuff;
- if (PreparedTransaction::isLocked(locked, queue->getPersistenceId(), dtokp.rid()) ) {
- prepared[dtokp.rid()] = msg;
- } else {
- queue->recover(msg);
- }
-
- dtokp.reset();
- dtokp.set_wstate(DataTokenImpl::ENQ);
-
- if (xidbuff)
- ::free(xidbuff);
- else if (dbuff)
- ::free(dbuff);
- aio_sleep_cnt = 0;
- break;
+ unsigned headerSize;
+ if (externalFlag) {
+ msg = getExternMessage(recovery, dtokp.rid(), headerSize); // large message external to jrnl
+ } else {
+ headerSize = Buffer(data, preambleLength).getLong();
+ Buffer headerBuff(data+ preambleLength, headerSize); /// do we want read size or header size ????
+ msg = recovery.recoverMessage(headerBuff);
+ }
+ msg->setPersistenceId(dtokp.rid());
+
+ u_int32_t contentOffset = headerSize + preambleLength;
+ u_int64_t contentSize = readSize - contentOffset;
+ if (msg->loadContent(contentSize) && !externalFlag) {
+ //now read the content
+ Buffer contentBuff(data + contentOffset, contentSize);
+ msg->decodeContent(contentBuff);
+ }
+
+ if (PreparedTransaction::isLocked(prepared, queue->getPersistenceId(), dtokp.rid()) ) {
+ std::string xid((char*)xidbuff, xidbuffSize);
+ PreparedRecoverMapCitr citr = preparedXidStoreRecoverMap.find(xid);
+ if (xid.size() > 0 && citr == preparedXidStoreRecoverMap.end()) THROW_STORE_EXCEPTION("XID not found in preparedXidStoreRecoverMap");
+ if (xid.size() > 0 && citr->second.deq_flag) { // deq present in prepared list, this xid is part of incomplete txn commit/abort
+ if (citr->second.commit_flag)
+ queue->recover(msg);
+ } else {
+ messages[dtokp.rid()] = msg;
+ }
+ } else {
+ queue->recover(msg);
+ }
+
+ dtokp.reset();
+ dtokp.set_wstate(DataTokenImpl::ENQ);
+
+ if (xidbuff)
+ ::free(xidbuff);
+ else if (dbuff)
+ ::free(dbuff);
+ aio_sleep_cnt = 0;
+ break;
}
case rhm::journal::RHM_IORES_PAGE_AIOWAIT:
if (++aio_sleep_cnt > MAX_AIO_SLEEPS)
@@ -702,7 +731,7 @@
break;
case rhm::journal::RHM_IORES_EMPTY:
read = false;
- break; // done with all messages. ((add call in jrnl to test that _emap is empty.
+ break; // done with all messages. (add call in jrnl to test that _emap is empty.)
default:
assert( "Store Error: Unexpected msg state");
} // switch
@@ -713,33 +742,37 @@
}
}
-RecoverableMessage::shared_ptr BdbMessageStore::getExternMessage(qpid::broker::RecoveryManager& recovery,
- uint64_t messageId, unsigned& headerSize)
+RecoverableMessage::shared_ptr BdbMessageStore::getExternMessage(qpid::broker::RecoveryManager& recovery,
+ uint64_t messageId,
+ unsigned& headerSize)
{
Dbt key (&messageId, sizeof(messageId));
- size_t preamble_length = sizeof(u_int32_t)/*header size*/;
+ size_t preamble_length = sizeof(u_int32_t); /*header size*/
BufferValue value(preamble_length, 0);
value.buffer.record();
if (messageDb.get(0, &key, &value, 0) == DB_NOTFOUND) {
- THROW_STORE_EXCEPTION("Cannot load content. Message not known to store!");
+ THROW_STORE_EXCEPTION("Cannot load content. Message not known to store!");
}
//read header only to begin with
headerSize = value.buffer.getLong();
BufferValue header(headerSize, preamble_length);
if (messageDb.get(0, &key, &header, 0) == DB_NOTFOUND) {
- THROW_STORE_EXCEPTION("Cannot load content. Message not known to store!");
+ THROW_STORE_EXCEPTION("Cannot load content. Message not known to store!");
}
return recovery.recoverMessage(header.buffer);
-}
+}
-int BdbMessageStore::enqueueMessage(TxnCtxt& txn, IdDbt& msgId, RecoverableMessage::shared_ptr& msg,
- queue_index& index, txn_list& locked,
- message_index& prepared)
+int BdbMessageStore::enqueueMessage(TxnCtxt& txn,
+ IdDbt& msgId,
+ RecoverableMessage::shared_ptr& msg,
+ queue_index& index,
+ txn_list& prepared,
+ message_index& messages)
{
Cursor mappings;
mappings.open(mappingDb, txn.get());
@@ -751,10 +784,10 @@
if (index.find(value.id) == index.end()) {
QPID_LOG(warning, "Recovered message for queue that no longer exists");
mappings->del(0);
- } else {
+ } else {
RecoverableQueue::shared_ptr queue = index[value.id];
- if (PreparedTransaction::isLocked(locked, value.id, msgId.id)) {
- prepared[msgId.id] = msg;
+ if (PreparedTransaction::isLocked(prepared, value.id, msgId.id)) {
+ messages[msgId.id] = msg;
} else {
queue->recover(msg);
}
@@ -765,22 +798,69 @@
return count;
}
+void BdbMessageStore::recoverPreparedXidJournal()
+{
+ if (journal::jdir::exists(storeDir + "/rhm/pxid/prepared_xid.jinf")) {
+ u_int64_t thisHighestRid;
+ preparedXidStorePtr->recover(defXidStoreNumJrnlFiles, defXidStoreJrnlFileSizePgs * JRNL_RMGR_PAGE_SIZE,
+ JRNL_WMGR_DEF_PAGE_SIZE * JRNL_WMGR_DEF_PAGES / wcache_pgsize_sblks, defXidStoreWCachePageSize,
+ 0, thisHighestRid, 0);
+ if (thisHighestRid > highestRid)
+ highestRid = thisHighestRid;
+ preparedXidStorePtr->recover_complete(); // start journal.
+ }
+}
+
+void BdbMessageStore::getPreparedXidMap(PreparedRecoverMap& prepXidMap)
+{
+ if (preparedXidStorePtr.get()) {
+ if (!preparedXidStorePtr->is_ready())
+ recoverPreparedXidJournal();
+
+ // TODO: The journal will return a const txn_map and the txn_map will support
+ // const operations at some point. Using non-const txn_map this way is ugly...
+ journal::txn_map& tmap = preparedXidStorePtr->get_txn_map();
+ std::vector<std::string> xidList;
+ tmap.xid_list(xidList);
+ for (std::vector<std::string>::const_iterator i = xidList.begin(); i<xidList.end(); i++) {
+ journal::txn_data_list txnList = tmap.get_tdata_list(*i);
+ unsigned enqCnt = 0;
+ unsigned deqCnt = 0;
+ u_int64_t rid = 0;
+ bool commitFlag = false;
+ for (journal::tdl_itr j = txnList.begin(); j<txnList.end(); j++) {
+ if (j->_enq_flag) {
+ rid = j->_rid;
+ enqCnt++;
+ } else {
+ commitFlag = j->_commit_flag;
+ deqCnt++;
+ }
+ }
+ assert(enqCnt == 1);
+ assert(deqCnt <= 1);
+ prepXidMap.insert(PreparedRecoverMapPair(*i, PreparedRecoverStruct(rid, deqCnt == 1, commitFlag)));
+ }
+ }
+}
+
void BdbMessageStore::recoverXids(txn_list& txns)
{
- std::set<string> preparedXidSet;
- collectPreparedXids(preparedXidSet);
+ if (!preparedXidStorePtr->is_ready())
+ getPreparedXidMap(preparedXidStoreRecoverMap);
// Abort unprepaired xids and populate the locked maps
- for (std::set<string>::iterator i = preparedXidSet.begin(); i != preparedXidSet.end(); i++) {
+ for (PreparedRecoverMapCitr i = preparedXidStoreRecoverMap.begin(); i != preparedXidStoreRecoverMap.end(); i++) {
LockedMappings::shared_ptr enq_ptr;
enq_ptr.reset(new LockedMappings);
LockedMappings::shared_ptr deq_ptr;
deq_ptr.reset(new LockedMappings);
- txns.push_back(new PreparedTransaction(*i, enq_ptr, deq_ptr));
+ txns.push_back(new PreparedTransaction(i->first, enq_ptr, deq_ptr));
}
}
-void BdbMessageStore::readLockedMappings(Db& db, txn_lock_map& mappings)
+void BdbMessageStore::readLockedMappings(Db& db,
+ txn_lock_map& mappings)
{
Cursor c;
c.open(db, 0);
@@ -795,54 +875,10 @@
void BdbMessageStore::collectPreparedXids(std::set<string>& xids)
{
- if (journal::jdir::exists(storeDir + "/rhm/pxid/prepared_xid.jinf"))
- {
- u_int64_t thisHighestRid;
- preparedXidStorePtr->recover(defXidStoreNumJrnlFiles, defXidStoreJrnlFileSizePgs * JRNL_RMGR_PAGE_SIZE,
- JRNL_WMGR_DEF_PAGE_SIZE * JRNL_WMGR_DEF_PAGES / wcache_pgsize_sblks, defXidStoreWCachePageSize,
- 0, thisHighestRid, 0);
- if (thisHighestRid > highestRid)
- highestRid = thisHighestRid;
- try {
- void* dbuff = NULL; size_t dbuffSize = 0;
- void* xidbuff = NULL; size_t xidbuffSize = 0;
- bool transientFlag = false;
- bool externalFlag = false;
- DataTokenImpl dtokp;
- bool done = false;
- long aio_sleep_cnt = 0;
- while (!done) {
- dtokp.reset();
- dtokp.set_wstate(DataTokenImpl::ENQ);
- rhm::journal::iores res = preparedXidStorePtr->read_data_record(&dbuff, dbuffSize, &xidbuff, xidbuffSize, transientFlag, externalFlag, &dtokp);
- switch (res) {
- case rhm::journal::RHM_IORES_SUCCESS:
- if (xidbuffSize > 0) {
- xids.insert(std::string((const char*)xidbuff, xidbuffSize));
- preparedMap[std::string((const char*)xidbuff, xidbuffSize)] = dtokp.rid();
- ::free(xidbuff);
- } else {
- THROW_STORE_EXCEPTION("No XID found in BdbMessageStore::collectPreparedXids()");
- }
- aio_sleep_cnt = 0;
- break;
- case rhm::journal::RHM_IORES_PAGE_AIOWAIT:
- if (++aio_sleep_cnt > MAX_AIO_SLEEPS)
- THROW_STORE_EXCEPTION("Timeout waiting for AIO in BdbMessageStore::collectPreparedXids()");
- ::usleep(AIO_SLEEP_TIME);
- break;
- case rhm::journal::RHM_IORES_EMPTY:
- done = true;
- break;
- default:
- assert( "Store Error: Unexpected msg state");
- }
- }
- } catch (const journal::jexception& e) {
- THROW_STORE_EXCEPTION(std::string("Prepared XID journal: collectPreparedXids() failed: ") + e.what());
- }
-
- preparedXidStorePtr->recover_complete(); // start journal.
+ if (!preparedXidStorePtr->is_ready())
+ getPreparedXidMap(preparedXidStoreRecoverMap);
+ for (PreparedRecoverMapCitr i = preparedXidStoreRecoverMap.begin(); i != preparedXidStoreRecoverMap.end(); i++) {
+ xids.insert(i->first);
}
}
@@ -864,8 +900,9 @@
txn.abort();
throw;
}
- }
+ }
}
+
void BdbMessageStore::destroy(PersistableMessage& msg)
{
checkInit();
@@ -880,17 +917,19 @@
} catch (const DbException& e) {
txn.abort();
THROW_STORE_EXCEPTION_2("Error destroying message", e);
- }
+ }
}
}
-
-u_int64_t BdbMessageStore::getRecordSize(Db& db, Dbt& key)
+u_int64_t BdbMessageStore::getRecordSize(Db& db,
+ Dbt& key)
{
return getRecordSize(0, db, key);
}
-u_int64_t BdbMessageStore::getRecordSize(DbTxn* txn, Db& db, Dbt& key)
+u_int64_t BdbMessageStore::getRecordSize(DbTxn* txn,
+ Db& db,
+ Dbt& key)
{
Dbt peek;
peek.set_flags(DB_DBT_USERMEM);
@@ -906,7 +945,8 @@
return peek.get_size();
}
-void BdbMessageStore::appendContent(intrusive_ptr<const PersistableMessage>& msg, const std::string& data)
+void BdbMessageStore::appendContent(intrusive_ptr<const PersistableMessage>& msg,
+ const std::string& data)
{
checkInit();
u_int64_t messageId (msg->getPersistenceId());
@@ -932,28 +972,31 @@
}
} else {
THROW_STORE_EXCEPTION("Cannot append content. Message not known to store!");
- }
+ }
}
void BdbMessageStore::loadContent(const qpid::broker::PersistableQueue& queue,
- intrusive_ptr<const PersistableMessage>& msg, std::string& data, u_int64_t offset, u_int32_t length)
+ intrusive_ptr<const PersistableMessage>& msg,
+ std::string& data,
+ u_int64_t offset,
+ u_int32_t length)
{
checkInit();
- u_int64_t realOffset = offset + sizeof(u_int32_t)/*header length*/+ msg->encodedHeaderSize();
+ u_int64_t realOffset = offset + sizeof(u_int32_t)/*header length*/ + msg->encodedHeaderSize();
u_int64_t messageId (msg->getPersistenceId());
-
+
if (messageId != 0) {
try {
JournalImpl* jc = static_cast<JournalImpl*>(queue.getExternalQueueStore());
- if (jc && jc->is_enqueued(messageId) ){
- if (jc->loadMsgContent(messageId, data, realOffset, length)){
+ if (jc && jc->is_enqueued(messageId) ) {
+ if (jc->loadMsgContent(messageId, data, realOffset, length)) {
return;
}
}
} catch (const journal::jexception& e) {
THROW_STORE_EXCEPTION(std::string("Queue ") + queue.getName() +
": loadContent() failed: " + e.what());
- }
+ }
TxnCtxt txn;
txn.begin(env, true);
try {
@@ -964,11 +1007,11 @@
value.set_ulen(length);
value.set_doff(realOffset);
value.set_dlen(length);
- int status = messageDb.get(txn.get(), &key, &value, 0);
+ int status = messageDb.get(txn.get(), &key, &value, 0);
if (status == DB_NOTFOUND) {
txn.abort();
delete [] buffer;
- THROW_STORE_EXCEPTION("Cannot load content. Message not known to store!");
+ THROW_STORE_EXCEPTION("Cannot load content. Message not known to store!");
} else {
txn.commit();
data.assign(buffer, value.get_size());
@@ -989,11 +1032,11 @@
std::string qn = queue.getName();
try {
JournalImpl* jc = static_cast<JournalImpl*>(queue.getExternalQueueStore());
- if (jc){
+ if (jc) {
// TODO: check if this result should be used...
/*rhm::journal::iores res =*/ jc->flush();
}
- }catch (const journal::jexception& e) {
+ } catch (const journal::jexception& e) {
THROW_STORE_EXCEPTION(std::string("Queue ") + qn + ": flush() failed: " + e.what() );
}
}
@@ -1029,14 +1072,14 @@
if (ctxt) txn->addXidRecord(queue.getExternalQueueStore());
}
-void BdbMessageStore::store(const PersistableQueue* queue,
- TxnCtxt* txn, Dbt& messageId,
- intrusive_ptr<PersistableMessage>& message,
+void BdbMessageStore::store(const PersistableQueue* queue,
+ TxnCtxt* txn, Dbt& messageId,
+ intrusive_ptr<PersistableMessage>& message,
bool newId)
{
u_int32_t headerSize = message->encodedHeaderSize();
u_int64_t size = message->encodedSize() + sizeof(u_int32_t);
- char* buff= 0;
+ char* buff = 0;
if (!message->isContentReleased() )
{
buff = static_cast<char*>(::alloca(size)); // long + headers + content
@@ -1046,23 +1089,22 @@
}
try {
-
- if ( queue ) {
+ if (queue) {
boost::intrusive_ptr<DataTokenImpl> dtokp(new DataTokenImpl);
dtokp->addRef();
dtokp->setSourceMessage(message);
dtokp->set_external_rid(true);
dtokp->set_rid(message->getPersistenceId()); // set the messageID into the Journal header (record-id)
-
+
JournalImpl* jc = static_cast<JournalImpl*>(queue->getExternalQueueStore());
- if (txn->getXid().empty()){
- if (message->isContentReleased()){
+ if (txn->getXid().empty()) {
+ if (message->isContentReleased()) {
jc->enqueue_extern_data_record(size, dtokp.get(), false);
} else {
jc->enqueue_data_record(buff, size, size, dtokp.get(), false);
}
- }else {
- if (message->isContentReleased()){
+ } else {
+ if (message->isContentReleased()) {
jc->enqueue_extern_txn_data_record(size, dtokp.get(), txn->getXid(), false);
} else {
jc->enqueue_txn_data_record(buff, size, size, dtokp.get(), txn->getXid(), false);
@@ -1083,7 +1125,7 @@
void BdbMessageStore::dequeue(TransactionContext* ctxt, intrusive_ptr<PersistableMessage>& msg,
const PersistableQueue& queue)
-{
+{
checkInit();
u_int64_t queueId (queue.getPersistenceId());
u_int64_t messageId (msg->getPersistenceId());
@@ -1101,28 +1143,27 @@
} else {
txn = &implicit;
}
-
+
// add queue* to the txn map..
if (ctxt) txn->addXidRecord(queue.getExternalQueueStore());
- async_dequeue(ctxt, msg, queue);
-
+ async_dequeue(ctxt, msg, queue);
+
msg->dequeueComplete();
}
-void BdbMessageStore::async_dequeue(
- TransactionContext* ctxt,
- intrusive_ptr<PersistableMessage>& msg,
- const PersistableQueue& queue)
+void BdbMessageStore::async_dequeue(TransactionContext* ctxt,
+ intrusive_ptr<PersistableMessage>& msg,
+ const PersistableQueue& queue)
{
boost::intrusive_ptr<DataTokenImpl> ddtokp(new DataTokenImpl);
ddtokp->addRef();
ddtokp->setSourceMessage(msg);
ddtokp->set_external_rid(true);
- ddtokp->set_rid(messageIdSequence.next());
+ ddtokp->set_rid(messageIdSequence.next());
ddtokp->set_dequeue_rid(msg->getPersistenceId());
ddtokp->set_wstate(DataTokenImpl::ENQ);
string tid;
- if (ctxt){
+ if (ctxt) {
TxnCtxt* txn = check(ctxt);
tid = txn->getXid();
}
@@ -1133,7 +1174,7 @@
} else {
jc->dequeue_txn_data_record(ddtokp.get(), tid);
}
- } catch (const journal::jexception& e) {
+ } catch (const journal::jexception& e) {
THROW_STORE_EXCEPTION(std::string("Queue ") + queue.getName() + ": async_dequeue() failed: " + e.what());
}
}
@@ -1144,15 +1185,17 @@
return 0;
}
-bool BdbMessageStore::deleteIfUnused(DbTxn* txn, Dbt& messageId)
+bool BdbMessageStore::deleteIfUnused(DbTxn* txn,
+ Dbt& messageId)
{
Cursor cursor;
cursor.open(mappingDb, txn);
-
return deleteIfUnused(cursor, txn, messageId);
}
-bool BdbMessageStore::deleteIfUnused(Cursor& cursor, DbTxn* txn, Dbt& messageId)
+bool BdbMessageStore::deleteIfUnused(Cursor& cursor,
+ DbTxn* txn,
+ Dbt& messageId)
{
if (isUnused(cursor, messageId)) {
messageDb.del(txn, &messageId, 0);
@@ -1175,7 +1218,8 @@
}
}
-void BdbMessageStore::completed(TPCTxnCtxt& txn, bool commit)
+void BdbMessageStore::completed(TPCTxnCtxt& txn,
+ bool commit)
{
try {
// Nothing to do if not prepared
@@ -1185,7 +1229,7 @@
DataTokenImpl* dtokp = txn.getDtok();
dtokp->set_dequeue_rid(dtokp->rid());
dtokp->set_rid(messageIdSequence.next());
- preparedXidStorePtr->dequeue_txn_data_record(txn.getDtok(), txn.getXid());
+ preparedXidStorePtr->dequeue_txn_data_record(txn.getDtok(), txn.getXid(), commit);
}
txn.complete(commit);
} catch (const std::exception& e) {
@@ -1194,7 +1238,7 @@
}
}
-auto_ptr<TransactionContext> BdbMessageStore::begin()
+auto_ptr<TransactionContext> BdbMessageStore::begin()
{
checkInit();
// pass sequence number for c/a
@@ -1214,7 +1258,7 @@
checkInit();
TPCTxnCtxt* txn = dynamic_cast<TPCTxnCtxt*>(&ctxt);
if(!txn) throw InvalidTransactionContextException();
-
+
try {
chkInitPreparedXidStore();
txn->incrDtokRef();
@@ -1222,8 +1266,7 @@
dtokp->set_external_rid(true);
dtokp->set_rid(messageIdSequence.next());
preparedXidStorePtr->enqueue_txn_data_record(0, 0, 0, dtokp, txn->getXid(), false);
- txn->addXidRecord(preparedXidStorePtr.get());
-
+ txn->prepare(preparedXidStorePtr.get());
// make sure all the data is written to disk before returning
txn->sync();
} catch (const std::exception& e) {
@@ -1232,18 +1275,18 @@
}
}
-void BdbMessageStore::commit(TransactionContext& ctxt)
+void BdbMessageStore::commit(TransactionContext& ctxt)
{
checkInit();
TxnCtxt* txn(check(&ctxt));
if (txn->isTPC()) {
- completed(*dynamic_cast<TPCTxnCtxt*>(txn), true);
+ completed(*dynamic_cast<TPCTxnCtxt*>(txn), true);
} else {
txn->complete(true);
}
}
-void BdbMessageStore::abort(TransactionContext& ctxt)
+void BdbMessageStore::abort(TransactionContext& ctxt)
{
checkInit();
TxnCtxt* txn(check(&ctxt));
@@ -1261,7 +1304,10 @@
return txn;
}
-void BdbMessageStore::put(Db& db, DbTxn* txn, Dbt& key, Dbt& value)
+void BdbMessageStore::put(Db& db,
+ DbTxn* txn,
+ Dbt& key,
+ Dbt& value)
{
try {
int status = db.put(txn, &key, &value, DB_NODUPDATA);
@@ -1275,8 +1321,10 @@
}
}
-
-bool BdbMessageStore::deleteKeyValuePair(Db& db, DbTxn* txn, Dbt& key, Dbt& value)
+bool BdbMessageStore::deleteKeyValuePair(Db& db,
+ DbTxn* txn,
+ Dbt& key,
+ Dbt& value)
{
Cursor cursor;
cursor.open(db, txn);
@@ -1298,9 +1346,9 @@
try {
Cursor bindings;
bindings.open(bindingDb, txn.get());
-
+
IdDbt key;
- Dbt value;
+ Dbt value;
while (bindings.next(key, value)) {
Buffer buffer(reinterpret_cast<char*>(value.get_data()), value.get_size());
if (buffer.available() < 8) {
@@ -1319,16 +1367,18 @@
QPID_LOG(debug, "Deleted all bindings for " << queue.getName() << ":" << queue.getPersistenceId());
}
-void BdbMessageStore::deleteBinding(const PersistableExchange& exchange, const PersistableQueue& queue, const std::string& bkey)
+void BdbMessageStore::deleteBinding(const PersistableExchange& exchange,
+ const PersistableQueue& queue,
+ const std::string& bkey)
{
TxnCtxt txn;
txn.begin(env, true);
try {
Cursor bindings;
bindings.open(bindingDb, txn.get());
-
+
IdDbt key(exchange.getPersistenceId());
- Dbt value;
+ Dbt value;
for (int status = bindings->get(&key, &value, DB_SET); status == 0; status = bindings->get(&key, &value, DB_NEXT_DUP)) {
Buffer buffer(reinterpret_cast<char*>(value.get_data()), value.get_size());
@@ -1353,21 +1403,21 @@
txn.commit();
}
-string BdbMessageStore::getJrnlBaseDir()
+string BdbMessageStore::getJrnlBaseDir()
{
std::stringstream dir;
dir << storeDir << "/rhm/jrnl/" ;
return dir.str();
}
-string BdbMessageStore::getBdbBaseDir()
+string BdbMessageStore::getBdbBaseDir()
{
std::stringstream dir;
dir << storeDir << "/rhm/dat/" ;
return dir.str();
}
-string BdbMessageStore::getPxidBaseDir()
+string BdbMessageStore::getPxidBaseDir()
{
std::stringstream dir;
dir << storeDir << "/rhm/pxid/" ;
@@ -1386,19 +1436,19 @@
dir << std::setw(4);
dir << std::setfill('0');
u_int32_t count = 0;
- for (u_int32_t i=0; i < strlen(queueName); i++)
- count += queueName[i];
-
- dir << (count%20);
+ for (u_int32_t i = 0; i < strlen(queueName); i++) {
+ count += queueName[i];
+ }
+ dir << (count % 20);
dir << "/" << queueName << "/";
return dir.str();
}
BdbMessageStore::Options::Options(const std::string& name) :
- qpid::Options(name),
- numJrnlFiles(8),
- jrnlFsizePgs(24),
- wCachePageSize(JRNL_WMGR_DEF_PAGE_SIZE * JRNL_DBLK_SIZE * JRNL_SBLK_SIZE / 1024)
+ qpid::Options(name),
+ numJrnlFiles(8),
+ jrnlFsizePgs(24),
+ wCachePageSize(JRNL_WMGR_DEF_PAGE_SIZE * JRNL_DBLK_SIZE * JRNL_SBLK_SIZE / 1024)
{
addOptions()
("store-dir", qpid::optValue(storeDir, "DIR"),
Modified: store/branches/mrg-1.0/cpp/lib/BdbMessageStore.h
===================================================================
--- store/branches/mrg-1.0/cpp/lib/BdbMessageStore.h 2008-07-14 14:53:45 UTC (rev 2197)
+++ store/branches/mrg-1.0/cpp/lib/BdbMessageStore.h 2008-07-14 17:05:31 UTC (rev 2198)
@@ -2,7 +2,7 @@
Copyright (C) 2007 Red Hat Software
This file is part of Red Hat Messaging.
-
+
Red Hat Messaging is free software; you can redistribute it and/or
modify it under the terms of the GNU Lesser General Public
License as published by the Free Software Foundation; either
@@ -53,15 +53,25 @@
*/
class BdbMessageStore : public qpid::broker::MessageStore, public qpid::management::Manageable
{
+ protected:
typedef std::map<u_int64_t, qpid::broker::RecoverableQueue::shared_ptr> queue_index;
typedef std::map<u_int64_t, qpid::broker::RecoverableExchange::shared_ptr> exchange_index;
typedef std::map<u_int64_t, qpid::broker::RecoverableMessage::shared_ptr> message_index;
-
+
typedef LockedMappings::map txn_lock_map;
typedef boost::ptr_list<PreparedTransaction> txn_list;
- typedef std::map<std::string, u_int64_t> xid_rid_map;
- typedef xid_rid_map::const_iterator xid_rid_map_citr;
+ // Structs for preparedXidStore recover state
+ struct PreparedRecoverStruct {
+ u_int64_t rid;
+ bool deq_flag;
+ bool commit_flag;
+ PreparedRecoverStruct(const u_int64_t _rid, const bool _deq_flag, const bool _commit_flag);
+ };
+ typedef PreparedRecoverStruct PreparedRecover;
+ typedef std::map<std::string, PreparedRecover> PreparedRecoverMap;
+ typedef PreparedRecoverMap::const_iterator PreparedRecoverMapCitr;
+ typedef std::pair<std::string, PreparedRecoverStruct> PreparedRecoverMapPair;
// Default store settings
static const u_int16_t defNumJrnlFiles = 8;
@@ -80,7 +90,11 @@
Db mappingDb;
Db bindingDb;
Db generalDb;
+
+ // Pointer to prepared XID journal instance
boost::shared_ptr<JournalImpl> preparedXidStorePtr;
+ PreparedRecoverMap preparedXidStoreRecoverMap;
+
IdSequence queueIdSequence;
IdSequence exchangeIdSequence;
IdSequence generalIdSequence;
@@ -90,7 +104,6 @@
u_int32_t jrnlFsizePgs;
u_int32_t wcache_pgsize_sblks;
u_int16_t wcache_num_pages;
- xid_rid_map preparedMap;
u_int64_t highestRid;
bool isInit;
const char* envPath;
@@ -99,57 +112,99 @@
qpid::management::Store::shared_ptr mgmtObject;
qpid::sys::Mutex jrnlCreateLock;
- bool mode(const bool mode, const bool force);
- void recoverQueues(TxnCtxt& txn, qpid::broker::RecoveryManager& recovery, queue_index& index,
- txn_list& locked, message_index& messages);
- void recoverMessages(TxnCtxt& txn, qpid::broker::RecoveryManager& recovery, queue_index& index,
- txn_list& locked, message_index& prepared);
- void recoverMessages(TxnCtxt& txn, qpid::broker::RecoveryManager& recovery,
- qpid::broker::RecoverableQueue::shared_ptr& queue,
- txn_list& locked, message_index& prepared);
- qpid::broker::RecoverableMessage::shared_ptr getExternMessage(qpid::broker::RecoveryManager& recovery,
- uint64_t mId, unsigned& headerSize);
- void recoverExchanges(TxnCtxt& txn, qpid::broker::RecoveryManager& recovery, exchange_index& index);
- void recoverBindings(TxnCtxt& txn, exchange_index& exchanges, queue_index& queues);
- void recoverGeneral(TxnCtxt& txn, qpid::broker::RecoveryManager& recovery);
- int enqueueMessage(TxnCtxt& txn, IdDbt& msgId, qpid::broker::RecoverableMessage::shared_ptr& msg,
- queue_index& index, txn_list& locked, message_index& prepared);
+ void recoverQueues(TxnCtxt& txn,
+ qpid::broker::RecoveryManager& recovery,
+ queue_index& index,
+ txn_list& locked,
+ message_index& messages);
+ void recoverMessages(TxnCtxt& txn,
+ qpid::broker::RecoveryManager& recovery,
+ queue_index& index,
+ txn_list& locked,
+ message_index& prepared);
+ void recoverMessages(TxnCtxt& txn,
+ qpid::broker::RecoveryManager& recovery,
+ qpid::broker::RecoverableQueue::shared_ptr& queue,
+ txn_list& locked,
+ message_index& prepared);
+ qpid::broker::RecoverableMessage::shared_ptr getExternMessage(qpid::broker::RecoveryManager& recovery,
+ uint64_t mId, unsigned& headerSize);
+ void recoverExchanges(TxnCtxt& txn,
+ qpid::broker::RecoveryManager& recovery,
+ exchange_index& index);
+ void recoverBindings(TxnCtxt& txn,
+ exchange_index& exchanges,
+ queue_index& queues);
+ void recoverGeneral(TxnCtxt& txn,
+ qpid::broker::RecoveryManager& recovery);
+ int enqueueMessage(TxnCtxt& txn,
+ IdDbt& msgId,
+ qpid::broker::RecoverableMessage::shared_ptr& msg,
+ queue_index& index,
+ txn_list& locked,
+ message_index& prepared);
+ void recoverPreparedXidJournal();
+ void getPreparedXidMap(PreparedRecoverMap& prepXidMap);
void recoverXids(txn_list& txns);
- void readLockedMappings(Db& db, txn_lock_map& mappings);
+ void readLockedMappings(Db& db,
+ txn_lock_map& mappings);
TxnCtxt* check(qpid::broker::TransactionContext* ctxt);
- void store(const qpid::broker::PersistableQueue* queue, TxnCtxt* txn,
- Dbt& messageId,
- boost::intrusive_ptr<qpid::broker::PersistableMessage>& message,
- bool newId);
- void async_dequeue(qpid::broker::TransactionContext* ctxt,
- boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg,
- const qpid::broker::PersistableQueue& queue);
- bool deleteIfUnused(Cursor& cursor, DbTxn* txn, Dbt& messageId);
- bool deleteIfUnused(DbTxn* txn, Dbt& messageId);
- bool isUnused(Cursor& cursor, Dbt& messageId);
- void destroy(Db& db, const qpid::broker::Persistable& p);
- bool create(Db& db, IdSequence& seq, const qpid::broker::Persistable& p);
- void completed(TPCTxnCtxt& txn, bool commit);
- void record2pcOp(Db& db, TPCTxnCtxt& txn, u_int64_t messageId, u_int64_t queueId);
+ void store(const qpid::broker::PersistableQueue* queue,
+ TxnCtxt* txn,
+ Dbt& messageId,
+ boost::intrusive_ptr<qpid::broker::PersistableMessage>& message,
+ bool newId);
+ void async_dequeue(qpid::broker::TransactionContext* ctxt,
+ boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg,
+ const qpid::broker::PersistableQueue& queue);
+ bool deleteIfUnused(Cursor& cursor,
+ DbTxn* txn,
+ Dbt& messageId);
+ bool deleteIfUnused(DbTxn* txn,
+ Dbt& messageId);
+ bool isUnused(Cursor& cursor,
+ Dbt& messageId);
+ void destroy(Db& db,
+ const qpid::broker::Persistable& p);
+ bool create(Db& db,
+ IdSequence& seq,
+ const qpid::broker::Persistable& p);
+ void completed(TPCTxnCtxt& txn,
+ bool commit);
+ void record2pcOp(Db& db,
+ TPCTxnCtxt& txn,
+ u_int64_t messageId,
+ u_int64_t queueId);
void deleteBindingsForQueue(const qpid::broker::PersistableQueue& queue);
- void deleteBinding(const qpid::broker::PersistableExchange& exchange,
- const qpid::broker::PersistableQueue& queue,
+ void deleteBinding(const qpid::broker::PersistableExchange& exchange,
+ const qpid::broker::PersistableQueue& queue,
const std::string& key);
- u_int64_t getRecordSize(Db& db, Dbt& key);
- u_int64_t getRecordSize(DbTxn* txn, Db& db, Dbt& key);
- void put(Db& db, DbTxn* txn, Dbt& key, Dbt& value);
- bool deleteKeyValuePair(Db& db, DbTxn* txn, Dbt& key, Dbt& value);
- void open(Db& db, DbTxn* txn, const char* file, bool dupKey);
-
-
+ u_int64_t getRecordSize(Db& db,
+ Dbt& key);
+ u_int64_t getRecordSize(DbTxn* txn,
+ Db& db,
+ Dbt& key);
+ void put(Db& db,
+ DbTxn* txn,
+ Dbt& key,
+ Dbt& value);
+ bool deleteKeyValuePair(Db& db,
+ DbTxn* txn,
+ Dbt& key,
+ Dbt& value);
+ void open(Db& db,
+ DbTxn* txn,
+ const char* file,
+ bool dupKey);
+
// journal functions
void createJrnlQueue(const qpid::broker::PersistableQueue& queue);
std::string getJrnlDir(const qpid::broker::PersistableQueue& queue); //for exmaple /var/rhm/ + queueDir/
std::string getJrnlDir(const char* queueName);
- std::string getJrnlBaseDir();
- std::string getBdbBaseDir();
- std::string getPxidBaseDir();
+ std::string getJrnlBaseDir();
+ std::string getBdbBaseDir();
+ std::string getPxidBaseDir();
inline void checkInit() {
if (!isInit) init("/var", defNumJrnlFiles, defJrnlFileSizePgs, defWCachePageSize); isInit = true;
}
@@ -183,51 +238,71 @@
typedef boost::shared_ptr<BdbMessageStore> shared_ptr;
BdbMessageStore(const char* envpath = 0);
+
virtual ~BdbMessageStore();
+
bool init(const qpid::Options* options);
- bool init(const std::string& dir, u_int16_t jfiles, u_int32_t jfileSizePgs, uint32_t wCachePageSize);
+
+ bool init(const std::string& dir,
+ u_int16_t jfiles,
+ u_int32_t jfileSizePgs,
+ uint32_t wCachePageSize);
+
void initManagement (qpid::broker::Broker* broker);
void truncate();
void create(qpid::broker::PersistableQueue& queue,
const qpid::framing::FieldTable& args);
+
void destroy(qpid::broker::PersistableQueue& queue);
void create(const qpid::broker::PersistableExchange& queue,
const qpid::framing::FieldTable& args);
+
void destroy(const qpid::broker::PersistableExchange& queue);
- void bind(const qpid::broker::PersistableExchange& exchange,
- const qpid::broker::PersistableQueue& queue,
- const std::string& key, const qpid::framing::FieldTable& args);
- void unbind(const qpid::broker::PersistableExchange& exchange,
- const qpid::broker::PersistableQueue& queue,
- const std::string& key, const qpid::framing::FieldTable& args);
+ void bind(const qpid::broker::PersistableExchange& exchange,
+ const qpid::broker::PersistableQueue& queue,
+ const std::string& key,
+ const qpid::framing::FieldTable& args);
+ void unbind(const qpid::broker::PersistableExchange& exchange,
+ const qpid::broker::PersistableQueue& queue,
+ const std::string& key,
+ const qpid::framing::FieldTable& args);
+
void create(const qpid::broker::PersistableConfig& config);
+
void destroy(const qpid::broker::PersistableConfig& config);
void recover(qpid::broker::RecoveryManager& queues);
void stage(boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg);
+
void destroy(qpid::broker::PersistableMessage& msg);
- void appendContent(boost::intrusive_ptr<const qpid::broker::PersistableMessage>& msg, const std::string& data);
+
+ void appendContent(boost::intrusive_ptr<const qpid::broker::PersistableMessage>& msg,
+ const std::string& data);
+
void loadContent(const qpid::broker::PersistableQueue& queue,
- boost::intrusive_ptr<const qpid::broker::PersistableMessage>& msg,
- std::string& data, u_int64_t offset, u_int32_t length);
+ boost::intrusive_ptr<const qpid::broker::PersistableMessage>& msg,
+ std::string& data,
+ u_int64_t offset,
+ u_int32_t length);
- void enqueue(qpid::broker::TransactionContext* ctxt,
- boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg,
- const qpid::broker::PersistableQueue& queue);
- void dequeue(qpid::broker::TransactionContext* ctxt,
- boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg,
- const qpid::broker::PersistableQueue& queue);
+ void enqueue(qpid::broker::TransactionContext* ctxt,
+ boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg,
+ const qpid::broker::PersistableQueue& queue);
+
+ void dequeue(qpid::broker::TransactionContext* ctxt,
+ boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg,
+ const qpid::broker::PersistableQueue& queue);
+
void flush(const qpid::broker::PersistableQueue& queue);
u_int32_t outstandingQueueAIO(const qpid::broker::PersistableQueue& queue);
-
void collectPreparedXids(std::set<std::string>& xids);
std::auto_ptr<qpid::broker::TransactionContext> begin();
@@ -239,10 +314,12 @@
qpid::management::ManagementObject::shared_ptr GetManagementObject (void) const
{ return mgmtObject; }
- qpid::management::Manageable::status_t ManagementMethod (uint32_t, qpid::management::Args&)
+ inline qpid::management::Manageable::status_t ManagementMethod (uint32_t, qpid::management::Args&)
{ return qpid::management::Manageable::STATUS_OK; }
- };
- }
-}
+ }; // class BdbMessageStore
+
+ } // namespace bdbstore
+} // namespace rhm
+
#endif
Modified: store/branches/mrg-1.0/cpp/lib/JournalImpl.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/JournalImpl.cpp 2008-07-14 14:53:45 UTC (rev 2197)
+++ store/branches/mrg-1.0/cpp/lib/JournalImpl.cpp 2008-07-14 17:05:31 UTC (rev 2198)
@@ -166,8 +166,7 @@
if (prep_tx_list_ptr) {
// Create list of prepared xids
std::vector<std::string> prep_xid_list;
- for (bdbstore::PreparedTransaction::list::iterator i = prep_tx_list_ptr->begin();
- i != prep_tx_list_ptr->end(); i++) {
+ for (bdbstore::PreparedTransaction::list::iterator i = prep_tx_list_ptr->begin(); i != prep_tx_list_ptr->end(); i++) {
prep_xid_list.push_back(i->xid);
}
@@ -347,9 +346,9 @@
}
void
-JournalImpl::dequeue_data_record(data_tok* const dtokp)
+JournalImpl::dequeue_data_record(data_tok* const dtokp, const bool txn_coml_commit)
{
- handleIoResult(jcntl::dequeue_data_record(dtokp));
+ handleIoResult(jcntl::dequeue_data_record(dtokp, txn_coml_commit));
if (_mgmtObject.get() != 0)
{
@@ -359,9 +358,9 @@
}
void
-JournalImpl::dequeue_txn_data_record(data_tok* const dtokp, const std::string& xid)
+JournalImpl::dequeue_txn_data_record(data_tok* const dtokp, const std::string& xid, const bool txn_coml_commit)
{
- handleIoResult(jcntl::dequeue_txn_data_record(dtokp, xid));
+ handleIoResult(jcntl::dequeue_txn_data_record(dtokp, xid, txn_coml_commit));
if (_mgmtObject.get() != 0)
{
Modified: store/branches/mrg-1.0/cpp/lib/JournalImpl.h
===================================================================
--- store/branches/mrg-1.0/cpp/lib/JournalImpl.h 2008-07-14 14:53:45 UTC (rev 2197)
+++ store/branches/mrg-1.0/cpp/lib/JournalImpl.h 2008-07-14 17:05:31 UTC (rev 2198)
@@ -156,9 +156,9 @@
void enqueue_extern_txn_data_record(const size_t tot_data_len, journal::data_tok* dtokp,
const std::string& xid, const bool transient = false);
- void dequeue_data_record(journal::data_tok* const dtokp);
+ void dequeue_data_record(journal::data_tok* const dtokp, const bool txn_coml_commit = false);
- void dequeue_txn_data_record(journal::data_tok* const dtokp, const std::string& xid);
+ void dequeue_txn_data_record(journal::data_tok* const dtokp, const std::string& xid, const bool txn_coml_commit = false);
void txn_abort(journal::data_tok* const dtokp, const std::string& xid);
Modified: store/branches/mrg-1.0/cpp/lib/StorePlugin.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/StorePlugin.cpp 2008-07-14 14:53:45 UTC (rev 2197)
+++ store/branches/mrg-1.0/cpp/lib/StorePlugin.cpp 2008-07-14 17:05:31 UTC (rev 2198)
@@ -49,19 +49,12 @@
if (options.storeDir.empty ())
{
if (!dataDir.isEnabled ())
- throw Exception ("If --data-dir is blank or --no-data-dir is specified, "
- "--store-directory must be present.");
+ throw Exception ("If --data-dir is blank or --no-data-dir is specified, --store-dir must be present.");
options.storeDir = dataDir.getPath ();
}
- if (!store->init (&options))
- {
- throw Exception("Existing journal found in different bdb/async mode. "
- "Move or delete existing data files before changing modes, or use "
- "'--store-force yes' to discard existing data.");
- }
-
+ store->init(&options);
broker->setStore (store);
}
Modified: store/branches/mrg-1.0/cpp/lib/TxnCtxt.h
===================================================================
--- store/branches/mrg-1.0/cpp/lib/TxnCtxt.h 2008-07-14 14:53:45 UTC (rev 2197)
+++ store/branches/mrg-1.0/cpp/lib/TxnCtxt.h 2008-07-14 17:05:31 UTC (rev 2198)
@@ -47,11 +47,11 @@
class TxnCtxt : public qpid::broker::TransactionContext
{
-protected:
-
+ protected:
static qpid::sys::Mutex globalSerialiser;
typedef std::set<qpid::broker::ExternalQueueStore*> ipqdef;
+ typedef ipqdef::iterator ipqItr;
typedef std::auto_ptr<qpid::sys::Mutex::ScopedLock> AutoScopedLock;
ipqdef impactedQueues; // list of Queues used in the txn
@@ -66,31 +66,34 @@
std::string tid;
DbTxn* txn;
- void completeTXN(bool commit) {
+ virtual void completeTxn(bool commit) {
sync();
- for (TxnCtxt::ipqdef::iterator i = impactedQueues.begin(); i != impactedQueues.end(); i++) {
- JournalImpl* jc = static_cast<JournalImpl*>(*i);
- if (jc && loggedtx) { /* if using journal */
- boost::intrusive_ptr<DataTokenImpl> dtokp(new DataTokenImpl);
- dtokp->addRef();
- dtokp->set_external_rid(true);
- dtokp->set_rid(loggedtx->next());
- try {
- if (commit) {
- jc->txn_commit(dtokp.get(), getXid());
- jc->flush(true);
- } else {
- jc->txn_abort(dtokp.get(), getXid());
- }
- } catch (const journal::jexception& e) {
- THROW_STORE_EXCEPTION(std::string("Error commit") + e.what());
+ for (ipqItr i = impactedQueues.begin(); i != impactedQueues.end(); i++) {
+ commitTxn(static_cast<JournalImpl*>(*i), commit);
+ }
+ impactedQueues.clear();
+ }
+
+ void commitTxn(JournalImpl* jc, bool commit) {
+ if (jc && loggedtx) { /* if using journal */
+ boost::intrusive_ptr<DataTokenImpl> dtokp(new DataTokenImpl);
+ dtokp->addRef();
+ dtokp->set_external_rid(true);
+ dtokp->set_rid(loggedtx->next());
+ try {
+ if (commit) {
+ jc->txn_commit(dtokp.get(), getXid());
+ jc->flush(true);
+ } else {
+ jc->txn_abort(dtokp.get(), getXid());
}
+ } catch (const journal::jexception& e) {
+ THROW_STORE_EXCEPTION(std::string("Error commit") + e.what());
}
}
- deleteXidRecord();
}
-public:
+ public:
TxnCtxt(IdSequence* _loggedtx=NULL) : loggedtx(_loggedtx), dtokp(new DataTokenImpl), txn(0) {
if (loggedtx) {
@@ -119,22 +122,25 @@
if (sleep_cnt > MAX_SYNC_SLEEPS) THROW_STORE_EXCEPTION(std::string("Error: timeout waiting for TxnCtxt::sync()"));
if (!firstloop) { ::usleep(SYNC_SLEEP_TIME); sleep_cnt++; } // move this into the get events call aiolib..
allWritten = true;
- for (TxnCtxt::ipqdef::iterator i = impactedQueues.begin(); i != impactedQueues.end(); i++) {
- JournalImpl* jc = static_cast<JournalImpl*>(*i);
- try {
- if (jc && !(jc->is_txn_synced(getXid()))) {
- if (firstloop) jc->flush();
- allWritten = false;
- jc->get_wr_events();
- }
- } catch (const journal::jexception& e) {
- THROW_STORE_EXCEPTION(std::string("Error sync") + e.what());
- }
+ for (ipqItr i = impactedQueues.begin(); i != impactedQueues.end(); i++) {
+ sync_jrnl(static_cast<JournalImpl*>(*i), firstloop, allWritten);
}
firstloop = false;
}
}
+ void sync_jrnl(JournalImpl* jc, bool firstloop, bool& allWritten) {
+ try {
+ if (jc && !(jc->is_txn_synced(getXid()))) {
+ if (firstloop) jc->flush();
+ allWritten = false;
+ jc->get_wr_events();
+ }
+ } catch (const journal::jexception& e) {
+ THROW_STORE_EXCEPTION(std::string("Error sync") + e.what());
+ }
+ }
+
void begin(DbEnv& env, bool sync = false) {
env.txn_begin(0, &txn, 0);
if (sync) globalHolder = AutoScopedLock(new qpid::sys::Mutex::ScopedLock(globalSerialiser));
@@ -160,9 +166,8 @@
virtual bool isTPC() { return false; }
virtual const std::string& getXid() { return tid; }
- void deleteXidRecord() { impactedQueues.clear(); }
void addXidRecord(qpid::broker::ExternalQueueStore* queue) { impactedQueues.insert(queue); }
- void complete(bool commit) { completeTXN(commit); }
+ void complete(bool commit) { completeTxn(commit); }
DataTokenImpl* getDtok() { return dtokp.get(); }
void incrDtokRef() { dtokp->addRef(); }
void recoverDtok(const u_int64_t rid, const std::string xid) {
@@ -175,11 +180,29 @@
class TPCTxnCtxt : public TxnCtxt, public qpid::broker::TPCTransactionContext
{
+ protected:
const std::string xid;
-public:
- TPCTxnCtxt(const std::string& _xid, IdSequence* _loggedtx) : TxnCtxt(_loggedtx), xid(_xid) {}
- virtual bool isTPC() { return true; }
- virtual const std::string& getXid() { return xid; }
+ JournalImpl* preparedXidStorePtr;
+ virtual void completeTxn(bool commit) {
+ TxnCtxt::completeTxn(commit);
+ if (preparedXidStorePtr) commitTxn(preparedXidStorePtr, commit);
+ }
+
+ public:
+ TPCTxnCtxt(const std::string& _xid, IdSequence* _loggedtx) : TxnCtxt(_loggedtx), xid(_xid), preparedXidStorePtr(0) {}
+ void sync() {
+ TxnCtxt::sync();
+ bool allWritten = false;
+ if (preparedXidStorePtr) {
+ while (!allWritten) {
+ allWritten = true;
+ sync_jrnl(preparedXidStorePtr, true, allWritten);
+ }
+ }
+ }
+ inline void prepare(JournalImpl* _preparedXidStorePtr) { preparedXidStorePtr = _preparedXidStorePtr; }
+ inline virtual bool isTPC() { return true; }
+ inline virtual const std::string& getXid() { return xid; }
};
}}
Modified: store/branches/mrg-1.0/cpp/lib/jrnl/deq_hdr.hpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/jrnl/deq_hdr.hpp 2008-07-14 14:53:45 UTC (rev 2197)
+++ store/branches/mrg-1.0/cpp/lib/jrnl/deq_hdr.hpp 2008-07-14 17:05:31 UTC (rev 2198)
@@ -87,6 +87,7 @@
#if defined(JRNL_LITTLE_ENDIAN) && defined(JRNL_32_BIT)
u_int32_t _filler0; ///< Little-endian filler for 32-bit size_t
#endif
+ static const u_int16_t DEQ_HDR_TXNCMPLCOMMIT_MASK = 0x10;
/**
* \brief Default constructor, which sets all values to 0.
@@ -105,7 +106,8 @@
* \brief Convenience constructor which initializes values during construction.
*/
inline deq_hdr(const u_int32_t magic, const u_int8_t version, const u_int64_t rid,
- const u_int64_t deq_rid, const std::size_t xidsize, const bool owi):
+ const u_int64_t deq_rid, const std::size_t xidsize, const bool owi,
+ const bool txn_coml_commit = false):
rec_hdr(magic, version, rid, owi), _deq_rid(deq_rid),
#if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
_filler0(0),
@@ -114,8 +116,17 @@
#if defined(JRNL_LITTLE_ENDIAN) && defined(JRNL_32_BIT)
, _filler0(0)
#endif
- {}
+ { set_txn_coml_commit(txn_coml_commit); }
+
+ inline bool is_txn_coml_commit() const { return _uflag & DEQ_HDR_TXNCMPLCOMMIT_MASK; }
+
+ inline void set_txn_coml_commit(const bool commit)
+ {
+ _uflag = commit ? _uflag | DEQ_HDR_TXNCMPLCOMMIT_MASK :
+ _uflag & (~DEQ_HDR_TXNCMPLCOMMIT_MASK);
+ }
+
/**
* \brief Returns the size of the header in bytes.
*/
Modified: store/branches/mrg-1.0/cpp/lib/jrnl/deq_rec.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/jrnl/deq_rec.cpp 2008-07-14 14:53:45 UTC (rev 2197)
+++ store/branches/mrg-1.0/cpp/lib/jrnl/deq_rec.cpp 2008-07-14 17:05:31 UTC (rev 2198)
@@ -52,8 +52,8 @@
{}
deq_rec::deq_rec(const u_int64_t rid, const u_int64_t drid, const void* const xidp,
- const std::size_t xidlen, const bool owi):
- _deq_hdr(RHM_JDAT_DEQ_MAGIC, RHM_JDAT_VERSION, rid, drid, xidlen, owi),
+ const std::size_t xidlen, const bool owi, const bool txn_coml_commit):
+ _deq_hdr(RHM_JDAT_DEQ_MAGIC, RHM_JDAT_VERSION, rid, drid, xidlen, owi, txn_coml_commit),
_xidp(xidp),
_buff(0),
_deq_tail(_deq_hdr)
@@ -68,6 +68,8 @@
deq_rec::reset()
{
_deq_hdr._rid = 0;
+ _deq_hdr.set_owi(false);
+ _deq_hdr.set_txn_coml_commit(false);
_deq_hdr._deq_rid = 0;
_deq_hdr._xidsize = 0;
_deq_tail._rid = 0;
@@ -77,10 +79,11 @@
void
deq_rec::reset(const u_int64_t rid, const u_int64_t drid, const void* const xidp,
- const std::size_t xidlen, const bool owi)
+ const std::size_t xidlen, const bool owi, const bool txn_coml_commit)
{
_deq_hdr._rid = rid;
_deq_hdr.set_owi(owi);
+ _deq_hdr.set_txn_coml_commit(txn_coml_commit);
_deq_hdr._deq_rid = drid;
_deq_hdr._xidsize = xidlen;
_deq_tail._rid = rid;
Modified: store/branches/mrg-1.0/cpp/lib/jrnl/deq_rec.hpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/jrnl/deq_rec.hpp 2008-07-14 14:53:45 UTC (rev 2197)
+++ store/branches/mrg-1.0/cpp/lib/jrnl/deq_rec.hpp 2008-07-14 17:05:31 UTC (rev 2198)
@@ -67,20 +67,21 @@
deq_rec();
// constructor used for write operations, where xid already exists
deq_rec(const u_int64_t rid, const u_int64_t drid, const void* const xidp,
- const std::size_t xidlen, const bool owi);
+ const std::size_t xidlen, const bool owi, const bool txn_coml_commit);
virtual ~deq_rec();
// Prepare instance for use in reading data from journal
void reset();
// Prepare instance for use in writing data to journal
void reset(const u_int64_t rid, const u_int64_t drid, const void* const xidp,
- const std::size_t xidlen, const bool owi);
+ const std::size_t xidlen, const bool owi, const bool txn_coml_commit);
u_int32_t encode(void* wptr, u_int32_t rec_offs_dblks, u_int32_t max_size_dblks);
u_int32_t decode(rec_hdr& h, void* rptr, u_int32_t rec_offs_dblks,
u_int32_t max_size_dblks);
// Decode used for recover
bool rcv_decode(rec_hdr h, std::ifstream* ifsp, std::size_t& rec_offs);
+ inline bool is_txn_coml_commit() const { return _deq_hdr.is_txn_coml_commit(); }
inline u_int64_t rid() const { return _deq_hdr._rid; }
inline u_int64_t deq_rid() const { return _deq_hdr._deq_rid; }
std::size_t get_xid(void** const xidpp);
Modified: store/branches/mrg-1.0/cpp/lib/jrnl/jcntl.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/jrnl/jcntl.cpp 2008-07-14 14:53:45 UTC (rev 2197)
+++ store/branches/mrg-1.0/cpp/lib/jrnl/jcntl.cpp 2008-07-14 17:05:31 UTC (rev 2198)
@@ -338,25 +338,25 @@
}
iores
-jcntl::dequeue_data_record(data_tok* const dtokp)
+jcntl::dequeue_data_record(data_tok* const dtokp, const bool txn_coml_commit)
{
check_wstatus("dequeue_data");
{
slock s(&_wr_mutex);
iores r;
- while (handle_aio_wait(_wmgr.dequeue(dtokp, 0, 0), r, dtokp)) ;
+ while (handle_aio_wait(_wmgr.dequeue(dtokp, 0, 0, txn_coml_commit), r, dtokp)) ;
return r;
}
}
iores
-jcntl::dequeue_txn_data_record(data_tok* const dtokp, const std::string& xid)
+jcntl::dequeue_txn_data_record(data_tok* const dtokp, const std::string& xid, const bool txn_coml_commit)
{
check_wstatus("dequeue_data");
{
slock s(&_wr_mutex);
iores r;
- while (handle_aio_wait(_wmgr.dequeue(dtokp, xid.data(), xid.size()), r, dtokp)) ;
+ while (handle_aio_wait(_wmgr.dequeue(dtokp, xid.data(), xid.size(), txn_coml_commit), r, dtokp)) ;
return r;
}
}
@@ -711,7 +711,8 @@
dr.get_xid(&xidp);
assert(xidp != 0);
std::string xid((char*)xidp, dr.xid_size());
- _tmap.insert_txn_data(xid, txn_data(dr.rid(), dr.deq_rid(), fid, false));
+ _tmap.insert_txn_data(xid, txn_data(dr.rid(), dr.deq_rid(), fid, false,
+ dr.is_txn_coml_commit()));
_tmap.set_aio_compl(xid, dr.rid());
std::free(xidp);
}
Modified: store/branches/mrg-1.0/cpp/lib/jrnl/jcntl.hpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/jrnl/jcntl.hpp 2008-07-14 14:53:45 UTC (rev 2197)
+++ store/branches/mrg-1.0/cpp/lib/jrnl/jcntl.hpp 2008-07-14 17:05:31 UTC (rev 2198)
@@ -428,10 +428,13 @@
*
* \param dtokp Pointer to data_tok instance for this data, used to track state of data
* through journal.
+ * \param txn_coml_commit Only used for preparedXID journal. When used for dequeueing
+ * prepared XID list items, sets whether the complete() was called in commit or abort
+ * mode.
*
* \exception TODO
*/
- iores dequeue_data_record(data_tok* const dtokp);
+ iores dequeue_data_record(data_tok* const dtokp, const bool txn_coml_commit = false);
/**
* \brief Dequeues (marks as no longer needed) data record in journal.
@@ -446,10 +449,13 @@
* through journal.
* \param xid String containing xid. An empty string (i.e. length=0) will be considered
* non-transactional.
+ * \param txn_coml_commit Only used for preparedXID journal. When used for dequeueing
+ * prepared XID list items, sets whether the complete() was called in commit or abort
+ * mode.
*
* \exception TODO
*/
- iores dequeue_txn_data_record(data_tok* const dtokp, const std::string& xid);
+ iores dequeue_txn_data_record(data_tok* const dtokp, const std::string& xid, const bool txn_coml_commit = false);
/**
* \brief Abort the transaction for all records enqueued or dequeued with the matching xid.
@@ -574,8 +580,7 @@
* \return <b><i>true</i></b> if the jouranl is ready to read and write data;
* <b><i>false</i></b> otherwise.
*/
- inline bool is_ready() const { return _init_flag and not _stop_flag; }
- inline bool is_init() const { return _init_flag; }
+ inline bool is_ready() const { return _init_flag && !_stop_flag; }
inline bool is_read_only() const { return _readonly_flag; }
@@ -602,7 +607,8 @@
inline u_int32_t jfsize_sblks() const { return _jfsize_sblks; }
inline u_int32_t get_open_txn_cnt() const { return _tmap.size(); }
- void get_open_txn_list(std::vector<std::string>& xv) { _tmap.xid_list(xv); }
+ // TODO Make this a const, but txn_map must support const first.
+ inline txn_map& get_txn_map() { return _tmap; }
// Logging
virtual void log(log_level level, const std::string& log_stmt) const;
Modified: store/branches/mrg-1.0/cpp/lib/jrnl/txn_map.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/jrnl/txn_map.cpp 2008-07-14 14:53:45 UTC (rev 2197)
+++ store/branches/mrg-1.0/cpp/lib/jrnl/txn_map.cpp 2008-07-14 17:05:31 UTC (rev 2198)
@@ -42,11 +42,12 @@
{
txn_data_struct::txn_data_struct(const u_int64_t rid, const u_int64_t drid, const u_int16_t fid,
- const bool enq_flag):
+ const bool enq_flag, const bool commit_flag):
_rid(rid),
_drid(drid),
_fid(fid),
_enq_flag(enq_flag),
+ _commit_flag(commit_flag),
_aio_compl(false)
{}
Modified: store/branches/mrg-1.0/cpp/lib/jrnl/txn_map.hpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/jrnl/txn_map.hpp 2008-07-14 14:53:45 UTC (rev 2197)
+++ store/branches/mrg-1.0/cpp/lib/jrnl/txn_map.hpp 2008-07-14 17:05:31 UTC (rev 2198)
@@ -64,9 +64,10 @@
u_int64_t _drid; ///< Dequeue record id for this operation
u_int16_t _fid; ///< File id, to be used when transferring to emap on commit
bool _enq_flag; ///< If true, enq op, otherwise deq op
+ bool _commit_flag; ///< (2PC transactions) Records 2PC complete c/a mode
bool _aio_compl; ///< Initially false, set to true when record AIO returns
txn_data_struct(const u_int64_t rid, const u_int64_t drid, const u_int16_t fid,
- const bool enq_flag);
+ const bool enq_flag, const bool commit_flag = false);
};
typedef txn_data_struct txn_data;
typedef std::vector<txn_data> txn_data_list;
Modified: store/branches/mrg-1.0/cpp/lib/jrnl/wmgr.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/jrnl/wmgr.cpp 2008-07-14 14:53:45 UTC (rev 2197)
+++ store/branches/mrg-1.0/cpp/lib/jrnl/wmgr.cpp 2008-07-14 17:05:31 UTC (rev 2198)
@@ -256,7 +256,7 @@
}
iores
-wmgr::dequeue(data_tok* dtokp, const void* const xid_ptr, const std::size_t xid_len)
+wmgr::dequeue(data_tok* dtokp, const void* const xid_ptr, const std::size_t xid_len, const bool txn_coml_commit)
{
if (xid_len)
assert(xid_ptr != 0);
@@ -284,7 +284,7 @@
const bool ext_rid = dtokp->external_rid();
u_int64_t rid = (ext_rid | cont) ? dtokp->rid() : _wrfc.get_incr_rid();
u_int64_t dequeue_rid = (ext_rid | cont) ? dtokp->dequeue_rid() : dtokp->rid();
- _deq_rec.reset(rid, dequeue_rid, xid_ptr, xid_len, _wrfc.owi());
+ _deq_rec.reset(rid, dequeue_rid, xid_ptr, xid_len, _wrfc.owi(), txn_coml_commit);
if (!cont)
{
if (!ext_rid)
Modified: store/branches/mrg-1.0/cpp/lib/jrnl/wmgr.hpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/jrnl/wmgr.hpp 2008-07-14 14:53:45 UTC (rev 2197)
+++ store/branches/mrg-1.0/cpp/lib/jrnl/wmgr.hpp 2008-07-14 17:05:31 UTC (rev 2198)
@@ -110,7 +110,8 @@
iores enqueue(const void* const data_buff, const std::size_t tot_data_len,
const std::size_t this_data_len, data_tok* dtokp, const void* const xid_ptr,
const std::size_t xid_len, const bool transient, const bool external);
- iores dequeue(data_tok* dtokp, const void* const xid_ptr, const std::size_t xid_len);
+ iores dequeue(data_tok* dtokp, const void* const xid_ptr, const std::size_t xid_len,
+ const bool txn_coml_commit);
iores abort(data_tok* dtokp, const void* const xid_ptr, const std::size_t xid_len);
iores commit(data_tok* dtokp, const void* const xid_ptr, const std::size_t xid_len);
iores flush();
Modified: store/branches/mrg-1.0/cpp/tests/TransactionalTest.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/tests/TransactionalTest.cpp 2008-07-14 14:53:45 UTC (rev 2197)
+++ store/branches/mrg-1.0/cpp/tests/TransactionalTest.cpp 2008-07-14 17:05:31 UTC (rev 2198)
@@ -30,9 +30,9 @@
#include "BdbMessageStore.h"
#include <iostream>
#include "MessageUtils.h"
-#include <qpid/framing/AMQHeaderBody.h>
-#include <qpid/broker/Queue.h>
-#include <qpid/broker/RecoveryManagerImpl.h>
+#include "qpid/broker/Queue.h"
+#include "qpid/broker/RecoveryManagerImpl.h"
+#include "qpid/framing/AMQHeaderBody.h"
using namespace rhm::bdbstore;
using namespace qpid::broker;
@@ -45,99 +45,171 @@
const char* tdp = getenv("TMPDIR");
const string test_dir(tdp && strlen(tdp) > 0 ? tdp : "/tmp/TransactionalTest");
+// Test txn context which has special setCompleteFailure() method which prevents entire "txn complete" process from hapenning
+class TestTxnCtxt : public TxnCtxt
+{
+ public:
+ TestTxnCtxt(IdSequence* _loggedtx) : TxnCtxt(_loggedtx) {}
+ void setCompleteFailure(const unsigned num_queues_rem) {
+ // Remove queue members from back of impactedQueues until queues_rem reamin.
+ // to end to simulate multi-queue txn complete failure.
+ while (impactedQueues.size() > num_queues_rem) impactedQueues.erase(impactedQueues.begin());
+ }
+};
+
+// Test store which has sepcial begin() which returns a TestTPCTxnCtxt, and a method to check for
+// reamining open transactions
+class TestMessageStore: public BdbMessageStore
+{
+ public:
+ TestMessageStore(const char* envpath = 0) : BdbMessageStore(envpath) {}
+ std::auto_ptr<qpid::broker::TransactionContext> begin() {
+ checkInit();
+ // pass sequence number for c/a
+ return auto_ptr<TransactionContext>(new TestTxnCtxt(&messageIdSequence));
+ }
+};
+
// === Helper fns ===
const string nameA("queueA");
const string nameB("queueB");
-const Uuid messageId(true);
+//const Uuid messageId(true);
std::auto_ptr<BdbMessageStore> store;
-QueueRegistry queues;
+std::auto_ptr<QueueRegistry> queues;
Queue::shared_ptr queueA;
Queue::shared_ptr queueB;
+template <class T>
void setup()
{
- store = std::auto_ptr<BdbMessageStore>(new BdbMessageStore());
+ store = std::auto_ptr<T>(new T());
store->init(test_dir, 4, 1, 8);
store->truncate();
//create two queues:
+ FieldTable settings;
queueA = Queue::shared_ptr(new Queue(nameA, 0, store.get(), 0));
- FieldTable settings;
queueA->create(settings);
queueB = Queue::shared_ptr(new Queue(nameB, 0, store.get(), 0));
queueB->create(settings);
-
- //create message and enqueue it onto first queue:
- boost::intrusive_ptr<Message> msg = MessageUtils::createMessage("exchange", "routing_key", messageId, 0);
- msg->getProperties<DeliveryProperties>()->setDeliveryMode(PERSISTENT);
-
- queueA->deliver(msg);
}
+template <class T>
void restart()
{
queueA.reset();
queueB.reset();
+ queues.reset();
store.reset();
- store = std::auto_ptr<BdbMessageStore>(new BdbMessageStore());
+ store = std::auto_ptr<T>(new T());
store->init(test_dir, 4, 1, 8);
+ queues = std::auto_ptr<QueueRegistry>(new QueueRegistry);
ExchangeRegistry exchanges;
LinkRegistry links(0);
DtxManager mgr;
mgr.setStore (store.get());
- RecoveryManagerImpl recovery(queues, exchanges, links, mgr, 0);
+ RecoveryManagerImpl recovery(*queues, exchanges, links, mgr, 0);
store->recover(recovery);
- queueA = queues.find(nameA);
- queueB = queues.find(nameB);
+ queueA = queues->find(nameA);
+ queueB = queues->find(nameB);
}
-void check(bool swapped)
+boost::intrusive_ptr<Message> createMessage(const string& id, const string& exchange="exchange", const string& key="routing_key")
+{
+ boost::intrusive_ptr<Message> msg = MessageUtils::createMessage(exchange, key);
+ msg->getProperties<MessageProperties>()->setCorrelationId(id);
+ msg->getProperties<DeliveryProperties>()->setDeliveryMode(PERSISTENT);
+ return msg;
+}
+
+void checkMsg(Queue::shared_ptr& queue, u_int32_t size, const string& msgid = "<none>")
{
+ BOOST_REQUIRE(queue);
+ BOOST_CHECK_EQUAL(size, queue->getMessageCount());
+ if (size > 0) {
+ boost::intrusive_ptr<Message> msg = queue->dequeue().payload;
+ BOOST_REQUIRE(msg);
+ BOOST_CHECK_EQUAL(msgid, msg->getProperties<MessageProperties>()->getCorrelationId());
+ }
+}
+
+void swap(bool commit)
+{
+ setup<BdbMessageStore>();
+
+ //create message and enqueue it onto first queue:
+ boost::intrusive_ptr<Message> msgA = createMessage("Message", "exchange", "routing_key");
+ queueA->deliver(msgA);
+
+ boost::intrusive_ptr<Message> msgB = queueA->dequeue().payload;
+ BOOST_REQUIRE(msgB);
+ //move the message from one queue to the other as a transaction
+ std::auto_ptr<TransactionContext> txn = store->begin();
+ queueB->enqueue(txn.get(), msgB);//note: need to enqueue it first to avoid message being deleted
+ queueA->dequeue(txn.get(), msgB);
+ if (commit) {
+ store->commit(*txn);
+ } else {
+ store->abort(*txn);
+ }
+
+ restart<BdbMessageStore>();
+
+ // Check outcome
BOOST_REQUIRE(queueA);
BOOST_REQUIRE(queueB);
- Queue::shared_ptr x;//the other queue
+ Queue::shared_ptr x;//the queue from which the message was swapped
Queue::shared_ptr y;//the queue on which the message is expected to be
- if (swapped) {
+ if (commit) {
x = queueA;
y = queueB;
} else {
x = queueB;
y = queueA;
}
-
- BOOST_CHECK_EQUAL((u_int32_t) 0, x->getMessageCount());
- BOOST_CHECK_EQUAL((u_int32_t) 1, y->getMessageCount());
- boost::intrusive_ptr<Message> msg = y->dequeue().payload;
- BOOST_REQUIRE(msg);
- BOOST_CHECK_EQUAL(messageId, msg->getProperties<MessageProperties>()->getMessageId());
+
+ checkMsg(x, 0);
+ checkMsg(y, 1, "Message");
+ checkMsg(y, 0);
}
-void swap(bool commit)
+void testMultiQueueTxn(const unsigned num_queues_rem, const bool commit)
{
- setup();
+ setup<TestMessageStore>();
+ std::auto_ptr<TransactionContext> txn(static_cast<TestMessageStore*>(store.get())->begin());
- boost::intrusive_ptr<Message> msg = queueA->dequeue().payload;
- BOOST_REQUIRE(msg);
- //move the message from one queue to the other as a transaction
- std::auto_ptr<TransactionContext> txn = store->begin();
- queueB->enqueue(txn.get(), msg);//note: need to enqueue it first to avoid message being deleted
- queueA->dequeue(txn.get(), msg);
- if (commit) {
- store->commit(*txn);
- } else {
+ //create two messages and enqueue them onto both queues:
+ boost::intrusive_ptr<Message> msgA = createMessage("MessageA", "exchange", "routing_key");
+ queueA->enqueue(txn.get(), msgA);
+ queueB->enqueue(txn.get(), msgA);
+ boost::intrusive_ptr<Message> msgB = createMessage("MessageB", "exchange", "routing_key");
+ queueA->enqueue(txn.get(), msgB);
+ queueB->enqueue(txn.get(), msgB);
+
+ static_cast<TestTxnCtxt*>(txn.get())->setCompleteFailure(num_queues_rem);
+ if (commit)
+ store->commit(*txn);
+ else
store->abort(*txn);
+ restart<TestMessageStore>();
+
+ // Check outcome
+ if (commit)
+ {
+ checkMsg(queueA, 2, "MessageA");
+ checkMsg(queueB, 2, "MessageA");
+ checkMsg(queueA, 1, "MessageB");
+ checkMsg(queueB, 1, "MessageB");
}
-
- restart();
- check(commit);
+ checkMsg(queueA, 0);
+ checkMsg(queueB, 0);
}
-
// === Test suite ===
QPID_AUTO_TEST_CASE(Commit)
@@ -153,5 +225,47 @@
swap(false);
cout << "ok" << endl;
}
+/*
+QPID_AUTO_TEST_CASE(MultiQueueNoQueueCommitRecover)
+{
+ cout << test_filename << ".MultiQueueNoQueueCommitRecover: " << flush;
+ testMultiQueueTxn(0, true);
+ cout << "ok" << endl;
+}
+QPID_AUTO_TEST_CASE(MultiQueueNoQueueAbortRecover)
+{
+ cout << test_filename << ".MultiQueueNoQueueAbortRecover: " << flush;
+ testMultiQueueTxn(0, false);
+ cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(MultiQueueSomeQueueCommitRecover)
+{
+ cout << test_filename << ".MultiQueueSomeQueueCommitRecover: " << flush;
+ testMultiQueueTxn(1, true);
+ cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(MultiQueueSomeQueueAbortRecover)
+{
+ cout << test_filename << ".MultiQueueSomeQueueAbortRecover: " << flush;
+ testMultiQueueTxn(1, false);
+ cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(MultiQueueAllQueueCommitRecover)
+{
+ cout << test_filename << ".MultiQueueAllQueueCommitRecover: " << flush;
+ testMultiQueueTxn(2, true);
+ cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(MultiQueueAllQueueAbortRecover)
+{
+ cout << test_filename << ".MultiQueueAllQueueAbortRecover: " << flush;
+ testMultiQueueTxn(2, false);
+ cout << "ok" << endl;
+}
+*/
QPID_AUTO_TEST_SUITE_END()
Modified: store/branches/mrg-1.0/cpp/tests/TwoPhaseCommitTest.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/tests/TwoPhaseCommitTest.cpp 2008-07-14 14:53:45 UTC (rev 2197)
+++ store/branches/mrg-1.0/cpp/tests/TwoPhaseCommitTest.cpp 2008-07-14 17:05:31 UTC (rev 2198)
@@ -30,9 +30,11 @@
#include "BdbMessageStore.h"
#include <iostream>
#include "MessageUtils.h"
-#include <qpid/framing/AMQHeaderBody.h>
-#include <qpid/broker/Queue.h>
-#include <qpid/broker/RecoveryManagerImpl.h>
+#include "qpid/broker/Queue.h"
+#include "qpid/broker/RecoveryManagerImpl.h"
+#include "qpid/framing/AMQHeaderBody.h"
+#include "qpid/log/Statement.h"
+#include "TxnCtxt.h"
using namespace rhm::bdbstore;
using namespace qpid::broker;
@@ -67,8 +69,8 @@
public:
Swap(TwoPhaseCommitTest* const test_, const string& messageId_): test(test_), messageId(messageId_) {}
void init(){ msg = test->deliver(messageId, test->queueA); }
- void run(TPCTransactionContext* txn) { test->swap(txn); }
- void check(bool committed) { test->swapCheck(committed, messageId); }
+ void run(TPCTransactionContext* txn) { test->swap(txn, test->queueA, test->queueB); }
+ void check(bool committed) { test->swapCheck(committed, messageId, test->queueA, test->queueB); }
};
class Enqueue : public Strategy
@@ -81,17 +83,17 @@
Enqueue(TwoPhaseCommitTest* const test_): test(test_) {}
void init() {}
void run(TPCTransactionContext* txn) {
- msg1 = test->enqueue(txn, "Enqueue1");
- msg2 = test->enqueue(txn, "Enqueue2");
- msg3 = test->enqueue(txn, "Enqueue3");
+ msg1 = test->enqueue(txn, "Enqueue1", test->queueA);
+ msg2 = test->enqueue(txn, "Enqueue2", test->queueA);
+ msg3 = test->enqueue(txn, "Enqueue3", test->queueA);
}
void check(bool committed) {
if (committed) {
- test->checkA(3, "Enqueue1");
- test->checkA(2, "Enqueue2");
- test->checkA(1, "Enqueue3");
+ test->checkMsg(test->queueA, 3, "Enqueue1");
+ test->checkMsg(test->queueA, 2, "Enqueue2");
+ test->checkMsg(test->queueA, 1, "Enqueue3");
}
- test->checkA(0);
+ test->checkMsg(test->queueA, 0);
}
};
@@ -109,26 +111,94 @@
msg3 = test->deliver("Dequeue3", test->queueA);
}
void run(TPCTransactionContext* txn) {
- test->dequeue(txn);
- test->dequeue(txn);
- test->dequeue(txn);
+ test->dequeue(txn, test->queueA);
+ test->dequeue(txn, test->queueA);
+ test->dequeue(txn, test->queueA);
}
void check(bool committed) {
if (!committed) {
- test->checkA(3, "Dequeue1");
- test->checkA(2, "Dequeue2");
- test->checkA(1, "Dequeue3");
+ test->checkMsg(test->queueA, 3, "Dequeue1");
+ test->checkMsg(test->queueA, 2, "Dequeue2");
+ test->checkMsg(test->queueA, 1, "Dequeue3");
}
- test->checkA(0);
+ test->checkMsg(test->queueA, 0);
}
};
+ class MultiQueueTxn : public Strategy
+ {
+ TwoPhaseCommitTest* const test;
+ boost::intrusive_ptr<Message> msg1;
+ boost::intrusive_ptr<Message> msg2;
+ std::set<Queue::shared_ptr> queueset;
+ public:
+ MultiQueueTxn(TwoPhaseCommitTest* const test_): test(test_) {}
+ virtual void init() {}
+ virtual void run(TPCTransactionContext* txn) {
+ queueset.insert(test->queueA);
+ queueset.insert(test->queueB);
+ msg1 = test->enqueue(txn, "Message1", queueset);
+ msg2 = test->enqueue(txn, "Message2", queueset);
+ queueset.clear();
+ }
+ virtual void check(bool committed) {
+ TestMessageStore* sptr = static_cast<TestMessageStore*>(test->store.get());
+ if (committed)
+ {
+ test->checkMsg(test->queueA, 2, "Message1");
+ test->checkMsg(test->queueB, 2, "Message1");
+ test->checkMsg(test->queueA, 1, "Message2");
+ test->checkMsg(test->queueB, 1, "Message2");
+ }
+ test->checkMsg(test->queueA, 0);
+ test->checkMsg(test->queueB, 0);
+ // Check there are no remaining open txns in store
+ BOOST_CHECK_EQUAL(u_int32_t(0), sptr->getRemainingTxns(*(test->queueA)));
+ BOOST_CHECK_EQUAL(u_int32_t(0), sptr->getRemainingTxns(*(test->queueB)));
+ BOOST_CHECK_EQUAL(u_int32_t(0), sptr->getRemainingPreparedListTxns());
+ }
+ };
+
+ // Test txn context which has special setCompleteFailure() method which prevents entire "txn complete" process from hapenning
+ class TestTPCTxnCtxt : public TPCTxnCtxt
+ {
+ public:
+ TestTPCTxnCtxt(const std::string& _xid, IdSequence* _loggedtx) : TPCTxnCtxt(_xid, _loggedtx) {}
+ void setCompleteFailure(const unsigned num_queues_rem, const bool complete_prepared_list) {
+ // Remove queue members from back of impactedQueues until queues_rem reamin.
+ // to end to simulate multi-queue txn complete failure.
+ while (impactedQueues.size() > num_queues_rem) impactedQueues.erase(impactedQueues.begin());
+ // If prepared list is not to be committed, set pointer to 0
+ if (!complete_prepared_list) preparedXidStorePtr = 0;
+ }
+ };
+
+ // Test store which has sepcial begin() which returns a TestTPCTxnCtxt, and a method to check for
+ // reamining open transactions
+ class TestMessageStore: public BdbMessageStore
+ {
+ public:
+ TestMessageStore(const char* envpath = 0) : BdbMessageStore(envpath) {}
+ std::auto_ptr<qpid::broker::TPCTransactionContext> begin(const std::string& xid) {
+ checkInit();
+ IdSequence* jtx = &messageIdSequence;
+ // pass sequence number for c/a
+ return auto_ptr<TPCTransactionContext>(new TestTPCTxnCtxt(xid, jtx));
+ }
+ u_int32_t getRemainingTxns(const PersistableQueue& queue) {
+ return static_cast<JournalImpl*>(queue.getExternalQueueStore())->get_open_txn_cnt();
+ }
+ u_int32_t getRemainingPreparedListTxns() {
+ return preparedXidStorePtr->get_open_txn_cnt();
+ }
+ };
+
const string nameA;
const string nameB;
std::auto_ptr<BdbMessageStore> store;
std::auto_ptr<DtxManager> dtxmgr;
- QueueRegistry queues;
- LinkRegistry links;
+ std::auto_ptr<QueueRegistry> queues;
+ std::auto_ptr<LinkRegistry> links;
Queue::shared_ptr queueA;
Queue::shared_ptr queueB;
boost::intrusive_ptr<Message> msg1;
@@ -137,14 +207,14 @@
void recoverPrepared(bool commit)
{
- setup();
+ setup<BdbMessageStore>();
Swap swap(this, "RecoverPrepared");
swap.init();
std::auto_ptr<TPCTransactionContext> txn(store->begin("my-xid"));
swap.run(txn.get());
store->prepare(*txn);
- restart();
+ restart<BdbMessageStore>();
//check that the message is not available from either queue
BOOST_CHECK_EQUAL((u_int32_t) 0, queueA->getMessageCount());
@@ -158,58 +228,89 @@
}
swap.check(commit);
- restart();
+ restart<BdbMessageStore>();
swap.check(commit);
}
+
+ void testMultiQueueTxn(const unsigned num_queues_rem, const bool complete_prepared_list, const bool commit)
+ {
+ setup<TestMessageStore>();
+ MultiQueueTxn mqtTest(this);
+ mqtTest.init();
+ std::auto_ptr<TPCTransactionContext> txn(static_cast<TestMessageStore*>(store.get())->begin("my-xid"));
+ mqtTest.run(txn.get());
+ store->prepare(*txn);
+ // As the commits and aborts should happen through DtxManager, and it is too complex to
+ // pass all these test params through, we bypass DtxManager and use the store directly.
+ // This will prevent the queues from seeing committed txns, however. To test the success
+ // or failure of
+ static_cast<TestTPCTxnCtxt*>(txn.get())->setCompleteFailure(num_queues_rem, complete_prepared_list);
+ if (commit)
+ store->commit(*txn);
+ else
+ store->abort(*txn);
+ restart<TestMessageStore>();
+ mqtTest.check(commit);
+ }
+
void commit(Strategy& strategy)
{
- setup();
+ setup<BdbMessageStore>();
strategy.init();
std::auto_ptr<TPCTransactionContext> txn(store->begin("my-xid"));
strategy.run(txn.get());
store->prepare(*txn);
- store->commit(*txn);
- restart();
+ store->commit(*txn);
+ restart<BdbMessageStore>();
strategy.check(true);
}
void abort(Strategy& strategy, bool prepare)
{
- setup();
+ setup<BdbMessageStore>();
strategy.init();
std::auto_ptr<TPCTransactionContext> txn(store->begin("my-xid"));
strategy.run(txn.get());
if (prepare) store->prepare(*txn);
store->abort(*txn);
- restart();
+ restart<BdbMessageStore>();
strategy.check(false);
}
- void swap(TPCTransactionContext* txn)
+ void swap(TPCTransactionContext* txn, Queue::shared_ptr& from, Queue::shared_ptr& to)
{
- msg1 = queueA->dequeue().payload;//just dequeues in memory
+ msg1 = from->dequeue().payload;//just dequeues in memory
//move the message from one queue to the other as part of a
//distributed transaction
- queueB->enqueue(txn, msg1);//note: need to enqueue it first to avoid message being deleted
- queueA->dequeue(txn, msg1);
+ to->enqueue(txn, msg1);//note: need to enqueue it first to avoid message being deleted
+ from->dequeue(txn, msg1);
}
- void dequeue(TPCTransactionContext* txn)
+ void dequeue(TPCTransactionContext* txn, Queue::shared_ptr& queue)
{
- msg2 = queueA->dequeue().payload;//just dequeues in memory
- queueA->dequeue(txn, msg2);
+ msg2 = queue->dequeue().payload;//just dequeues in memory
+ queue->dequeue(txn, msg2);
}
- boost::intrusive_ptr<Message> enqueue(TPCTransactionContext* txn, const string& msgid)
+ boost::intrusive_ptr<Message> enqueue(TPCTransactionContext* txn, const string& msgid, Queue::shared_ptr& queue)
{
- boost::intrusive_ptr<Message> msg = createMessage(msgid);
- queueA->enqueue(txn, msg);
+ boost::intrusive_ptr<Message> msg = createMessage(msgid);
+ queue->enqueue(txn, msg);
return msg;
}
+ boost::intrusive_ptr<Message> enqueue(TPCTransactionContext* txn, const string& msgid, std::set<Queue::shared_ptr>& queueset)
+ {
+ boost::intrusive_ptr<Message> msg = createMessage(msgid);
+ for (std::set<Queue::shared_ptr>::iterator i = queueset.begin(); i != queueset.end(); i++) {
+ (*i)->enqueue(txn, msg);
+ }
+ return msg;
+ }
+
boost::intrusive_ptr<Message> deliver(const string& msgid, Queue::shared_ptr& queue)
{
msg4 = createMessage(msgid);
@@ -217,9 +318,10 @@
return msg4;
}
+ template <class T>
void setup()
{
- store = std::auto_ptr<BdbMessageStore>(new BdbMessageStore());
+ store = std::auto_ptr<T>(new T());
store->init(test_dir, 4, 1, 8);
store->truncate();
@@ -239,60 +341,63 @@
return msg;
}
+ template <class T>
void restart()
{
queueA.reset();
queueB.reset();
store.reset();
+ queues.reset();
+ links.reset();
- store = std::auto_ptr<BdbMessageStore>(new BdbMessageStore());
+ store = std::auto_ptr<T>(new T());
store->init(test_dir, 4, 1, 8);
ExchangeRegistry exchanges;
+ queues = std::auto_ptr<QueueRegistry>(new QueueRegistry);
+ links = std::auto_ptr<LinkRegistry>(new LinkRegistry(0));
dtxmgr = std::auto_ptr<DtxManager>(new DtxManager);
dtxmgr->setStore (store.get());
- RecoveryManagerImpl recovery(queues, exchanges, links, *dtxmgr, 0);
+ RecoveryManagerImpl recovery(*queues, exchanges, *links, *dtxmgr, 0);
store->recover(recovery);
- queueA = queues.find(nameA);
- queueB = queues.find(nameB);
+ queueA = queues->find(nameA);
+ queueB = queues->find(nameB);
}
- void swapCheck(bool swapped, const string& msgid)
+ void checkMsg(Queue::shared_ptr& queue, u_int32_t size, const string& msgid = "<none>")
{
- BOOST_REQUIRE(queueA);
- BOOST_REQUIRE(queueB);
+ BOOST_REQUIRE(queue);
+ BOOST_CHECK_EQUAL(size, queue->getMessageCount());
+ if (size > 0) {
+ boost::intrusive_ptr<Message> msg = queue->dequeue().payload;
+ BOOST_REQUIRE(msg);
+ BOOST_CHECK_EQUAL(msgid, msg->getProperties<MessageProperties>()->getCorrelationId());
+ }
+ }
+
+ void swapCheck(bool swapped, const string& msgid, Queue::shared_ptr& from, Queue::shared_ptr& to)
+ {
+ BOOST_REQUIRE(from);
+ BOOST_REQUIRE(to);
- Queue::shared_ptr x;//the other queue
- Queue::shared_ptr y;//the queue on which the message is expected to be
+ Queue::shared_ptr x; //the queue from which the message was swapped
+ Queue::shared_ptr y; //the queue on which the message is expected to be
if (swapped) {
- x = queueA;
- y = queueB;
+ x = from;
+ y = to;
} else {
- x = queueB;
- y = queueA;
+ x = to;
+ y = from;
}
- BOOST_CHECK_EQUAL((u_int32_t) 0, x->getMessageCount());
- BOOST_CHECK_EQUAL((u_int32_t) 1, y->getMessageCount());
- boost::intrusive_ptr<Message> msg = y->dequeue().payload;
- BOOST_REQUIRE(msg);
- BOOST_CHECK_EQUAL(msgid, msg->getProperties<MessageProperties>()->getCorrelationId());
+ checkMsg(x, 0);
+ checkMsg(y, 1, msgid);
+ checkMsg(y, 0);
}
- void checkA(u_int32_t size, const string& msgid = "<none>")
- {
- BOOST_REQUIRE(queueA);
- BOOST_CHECK_EQUAL(size, queueA->getMessageCount());
- if (size > 0) {
- boost::intrusive_ptr<Message> msg = queueA->dequeue().payload;
- BOOST_REQUIRE(msg);
- BOOST_CHECK_EQUAL(msgid, msg->getProperties<MessageProperties>()->getCorrelationId());
- }
- }
-
public:
- TwoPhaseCommitTest() : nameA("queueA"), nameB("queueB"), links(0) {}
+ TwoPhaseCommitTest() : nameA("queueA"), nameB("queueB") {}
void testCommitEnqueue()
{
@@ -357,6 +462,46 @@
{
recoverPrepared(false);
}
+
+ void testMultiQueueCommit()
+ {
+ testMultiQueueTxn(2, true, true);
+ }
+
+ void testMultiQueueAbort()
+ {
+ testMultiQueueTxn(2, true, false);
+ }
+
+ void testMultiQueueNoQueueCommitRecover()
+ {
+ testMultiQueueTxn(0, false, true);
+ }
+
+ void testMultiQueueNoQueueAbortRecover()
+ {
+ testMultiQueueTxn(0, false, false);
+ }
+
+ void testMultiQueueSomeQueueCommitRecover()
+ {
+ testMultiQueueTxn(1, false, true);
+ }
+
+ void testMultiQueueSomeQueueAbortRecover()
+ {
+ testMultiQueueTxn(1, false, false);
+ }
+
+ void testMultiQueueAllQueueCommitRecover()
+ {
+ testMultiQueueTxn(2, false, true);
+ }
+
+ void testMultiQueueAllQueueAbortRecover()
+ {
+ testMultiQueueTxn(2, false, false);
+ }
};
TwoPhaseCommitTest tpct;
@@ -440,4 +585,60 @@
cout << "ok" << endl;
}
+QPID_AUTO_TEST_CASE(MultiQueueCommit)
+{
+ cout << test_filename << ".MultiQueueCommit: " << flush;
+ tpct.testMultiQueueCommit();
+ cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(MultiQueueAbort)
+{
+ cout << test_filename << ".MultiQueueAbort: " << flush;
+ tpct.testMultiQueueAbort();
+ cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(MultiQueueNoQueueCommitRecover)
+{
+ cout << test_filename << ".MultiQueueNoQueueCommitRecover: " << flush;
+ tpct.testMultiQueueNoQueueCommitRecover();
+ cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(MultiQueueNoQueueAbortRecover)
+{
+ cout << test_filename << ".MultiQueueNoQueueAbortRecover: " << flush;
+ tpct.testMultiQueueNoQueueAbortRecover();
+ cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(MultiQueueSomeQueueCommitRecover)
+{
+ cout << test_filename << ".MultiQueueSomeQueueCommitRecover: " << flush;
+ tpct.testMultiQueueSomeQueueCommitRecover();
+ cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(MultiQueueSomeQueueAbortRecover)
+{
+ cout << test_filename << ".MultiQueueSomeQueueAbortRecover: " << flush;
+ tpct.testMultiQueueSomeQueueAbortRecover();
+ cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(MultiQueueAllQueueCommitRecover)
+{
+ cout << test_filename << ".MultiQueueAllQueueCommitRecover: " << flush;
+ tpct.testMultiQueueAllQueueCommitRecover();
+ cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(MultiQueueAllQueueAbortRecover)
+{
+ cout << test_filename << ".MultiQueueAllQueueAbortRecover: " << flush;
+ tpct.testMultiQueueAllQueueAbortRecover();
+ cout << "ok" << endl;
+}
+
QPID_AUTO_TEST_SUITE_END()
16 years, 5 months
rhmessaging commits: r2197 - store/branches/mrg-1.0/cpp.
by rhmessaging-commits@lists.jboss.org
Author: kpvdr
Date: 2008-07-14 10:53:45 -0400 (Mon, 14 Jul 2008)
New Revision: 2197
Modified:
store/branches/mrg-1.0/cpp/README
Log:
Backport of trunk r.2185-2186: minior updates to README
Modified: store/branches/mrg-1.0/cpp/README
===================================================================
--- store/branches/mrg-1.0/cpp/README 2008-07-14 14:42:52 UTC (rev 2196)
+++ store/branches/mrg-1.0/cpp/README 2008-07-14 14:53:45 UTC (rev 2197)
@@ -1,4 +1,4 @@
-== MessageStore plugin for qpid c++ broker using Berkeley DB ==
+== MessageStore plugin for qpid c++ broker ==
Pre-requisites:
- qpid broker headers and libraries (see below)
@@ -28,6 +28,6 @@
On a Debian-based system, install prerequisites via:
apt-get install libdb4.3++-dev
-On Debian, I need to get this file,
+On Debian, I need to get this file:
/usr/include/db_cxx.h
16 years, 5 months
rhmessaging commits: r2196 - store/branches/mrg-1.0/cpp/tests.
by rhmessaging-commits@lists.jboss.org
Author: kpvdr
Date: 2008-07-14 10:42:52 -0400 (Mon, 14 Jul 2008)
New Revision: 2196
Modified:
store/branches/mrg-1.0/cpp/tests/system_test.sh
Log:
Backport of trunk r.2178: Fixed script so that system test will fail if broker fails to start.
Modified: store/branches/mrg-1.0/cpp/tests/system_test.sh
===================================================================
--- store/branches/mrg-1.0/cpp/tests/system_test.sh 2008-07-14 14:23:12 UTC (rev 2195)
+++ store/branches/mrg-1.0/cpp/tests/system_test.sh 2008-07-14 14:42:52 UTC (rev 2196)
@@ -47,13 +47,13 @@
BROKER_OPTS="--no-module-dir --load-module=$LIBBDBSTORE --data-dir=$TMPDIR --auth=no --wcache-page-size 16"
run_tests() {
for p in `seq 1 8`; do
- $abs_srcdir/start_broker "$@" ${BROKER_OPTS} || return 1
+ $abs_srcdir/start_broker "$@" ${BROKER_OPTS} || { echo "FAIL broker start"; return 1; }
python "$abs_srcdir/persistence.py" -s "$xml_spec" -b localhost:`cat qpidd.port` -p $p -r 3 || fail=1;
$abs_srcdir/stop_broker
done
}
echo 'Journal (AIO) persistence...'
-run_tests
+run_tests || fail=1
exit $fail
16 years, 5 months
rhmessaging commits: r2195 - in store/branches/mrg-1.0/cpp: lib/jrnl and 1 other directories.
by rhmessaging-commits@lists.jboss.org
Author: kpvdr
Date: 2008-07-14 10:23:12 -0400 (Mon, 14 Jul 2008)
New Revision: 2195
Modified:
store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp
store/branches/mrg-1.0/cpp/lib/BdbMessageStore.h
store/branches/mrg-1.0/cpp/lib/TxnCtxt.h
store/branches/mrg-1.0/cpp/lib/jrnl/data_tok.cpp
store/branches/mrg-1.0/cpp/lib/jrnl/data_tok.hpp
store/branches/mrg-1.0/cpp/lib/jrnl/rrfc.cpp
store/branches/mrg-1.0/cpp/lib/jrnl/rrfc.hpp
store/branches/mrg-1.0/cpp/lib/jrnl/txn_map.cpp
store/branches/mrg-1.0/cpp/lib/jrnl/wrfc.cpp
store/branches/mrg-1.0/cpp/lib/jrnl/wrfc.hpp
store/branches/mrg-1.0/cpp/tests/TwoPhaseCommitTest.cpp
store/branches/mrg-1.0/cpp/tests/persistence.py
Log:
Backport of trunk r.2177: Fixed problems with transaction recover: DataToken in TxnCtxt was not being restored; also highest rid found during restore was not taking account of the new preparedXid instance which shares the messageIdSequence. Other minor bugfixes and tidy-ups.
Modified: store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp 2008-07-14 13:59:22 UTC (rev 2194)
+++ store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp 2008-07-14 14:23:12 UTC (rev 2195)
@@ -29,6 +29,9 @@
#include "qpid/log/Statement.h"
#include "qpid/management/PackageMrgstore.h"
+#define MAX_AIO_SLEEPS 1000 // ~1 second
+#define AIO_SLEEP_TIME 1000 // 1 milisecond
+
using namespace rhm::bdbstore;
using namespace qpid::broker;
using boost::static_pointer_cast;
@@ -58,6 +61,7 @@
jrnlFsizePgs(defJrnlFileSizePgs),
wcache_pgsize_sblks(JRNL_WMGR_DEF_PAGE_SIZE),
wcache_num_pages(JRNL_WMGR_DEF_PAGES),
+ highestRid(0),
isInit(false),
envPath(envpath)
@@ -460,10 +464,18 @@
txn.abort();
THROW_STORE_EXCEPTION_2("Error on recovery", e);
}
+
//recover transactions:
for (txn_list::iterator i = prepared.begin(); i != prepared.end(); i++) {
TPCTxnCtxt* tpcc = new TPCTxnCtxt(i->xid, &messageIdSequence);
+
+ // Restore data token state in TxnCtxt
+ xid_rid_map_citr citr = preparedMap.find(i->xid);
+ if (citr == preparedMap.end()) THROW_STORE_EXCEPTION("XID not found in preparedMap");
+ tpcc->recoverDtok(citr->second, i->xid);
+ tpcc->addXidRecord(preparedXidStorePtr.get());
+
RecoverableTransaction::shared_ptr dtx = registry.recoverTransaction(i->xid, std::auto_ptr<TPCTransactionContext>(tpcc));
if (i->enqueues.get()) {
for (LockedMappings::iterator j = i->enqueues->begin(); j != i->enqueues->end(); j++) {
@@ -492,7 +504,6 @@
IdDbt key;
Dbt value;
//read all queues
- u_int64_t highestRid = 0;
while (queues.next(key, value)) {
Buffer buffer(reinterpret_cast<char*>(value.get_data()), value.get_size());
//create a Queue instance
@@ -524,7 +535,11 @@
queue_index[key.id] = queue;
maxQueueId = max(key.id, maxQueueId);
}
+
+ // NOTE: highestRid is set by both recoverQueues() and collectPreparedXids() as
+ // the messageIdSequence is used for both queue journals and the preparedXid journal.
messageIdSequence.reset(highestRid + 1);
+
queueIdSequence.reset(maxQueueId + 1);
}
@@ -605,9 +620,6 @@
}
-#define MAX_AIO_SLEEPS 1000 // ~1 second
-#define AIO_SLEEP_TIME 1000 // 1 milisecond
-
void BdbMessageStore::recoverMessages(TxnCtxt& /*txn*/, qpid::broker::RecoveryManager& recovery,
qpid::broker::RecoverableQueue::shared_ptr& queue, txn_list& locked, message_index& prepared)
{
@@ -768,19 +780,6 @@
}
}
-void BdbMessageStore::readXids(Db& db, std::set<string>& xids)
-{
- Cursor c;
- c.open(db, 0);
-
- Dbt key;
- Dbt ignore;
- while (c.next(key, ignore)) {
- std::string xid(reinterpret_cast<char*>(key.get_data()), key.get_size());
- xids.insert(xid);
- }
-}
-
void BdbMessageStore::readLockedMappings(Db& db, txn_lock_map& mappings)
{
Cursor c;
@@ -798,15 +797,50 @@
{
if (journal::jdir::exists(storeDir + "/rhm/pxid/prepared_xid.jinf"))
{
- u_int64_t highest_rid;
+ u_int64_t thisHighestRid;
preparedXidStorePtr->recover(defXidStoreNumJrnlFiles, defXidStoreJrnlFileSizePgs * JRNL_RMGR_PAGE_SIZE,
JRNL_WMGR_DEF_PAGE_SIZE * JRNL_WMGR_DEF_PAGES / wcache_pgsize_sblks, defXidStoreWCachePageSize,
- 0, highest_rid, 0);
-
- std::vector<std::string> xv;
- preparedXidStorePtr->get_open_txn_list(xv);
- for (std::vector<std::string>::const_iterator itr = xv.begin(); itr < xv.end(); itr++)
- xids.insert(*itr);
+ 0, thisHighestRid, 0);
+ if (thisHighestRid > highestRid)
+ highestRid = thisHighestRid;
+ try {
+ void* dbuff = NULL; size_t dbuffSize = 0;
+ void* xidbuff = NULL; size_t xidbuffSize = 0;
+ bool transientFlag = false;
+ bool externalFlag = false;
+ DataTokenImpl dtokp;
+ bool done = false;
+ long aio_sleep_cnt = 0;
+ while (!done) {
+ dtokp.reset();
+ dtokp.set_wstate(DataTokenImpl::ENQ);
+ rhm::journal::iores res = preparedXidStorePtr->read_data_record(&dbuff, dbuffSize, &xidbuff, xidbuffSize, transientFlag, externalFlag, &dtokp);
+ switch (res) {
+ case rhm::journal::RHM_IORES_SUCCESS:
+ if (xidbuffSize > 0) {
+ xids.insert(std::string((const char*)xidbuff, xidbuffSize));
+ preparedMap[std::string((const char*)xidbuff, xidbuffSize)] = dtokp.rid();
+ ::free(xidbuff);
+ } else {
+ THROW_STORE_EXCEPTION("No XID found in BdbMessageStore::collectPreparedXids()");
+ }
+ aio_sleep_cnt = 0;
+ break;
+ case rhm::journal::RHM_IORES_PAGE_AIOWAIT:
+ if (++aio_sleep_cnt > MAX_AIO_SLEEPS)
+ THROW_STORE_EXCEPTION("Timeout waiting for AIO in BdbMessageStore::collectPreparedXids()");
+ ::usleep(AIO_SLEEP_TIME);
+ break;
+ case rhm::journal::RHM_IORES_EMPTY:
+ done = true;
+ break;
+ default:
+ assert( "Store Error: Unexpected msg state");
+ }
+ }
+ } catch (const journal::jexception& e) {
+ THROW_STORE_EXCEPTION(std::string("Prepared XID journal: collectPreparedXids() failed: ") + e.what());
+ }
preparedXidStorePtr->recover_complete(); // start journal.
}
@@ -1146,9 +1180,13 @@
try {
// Nothing to do if not prepared
chkInitPreparedXidStore();
- if (txn.getDtok().is_enqueued())
- preparedXidStorePtr->dequeue_txn_data_record(&txn.getDtok(), txn.getXid());
-
+ if (txn.getDtok()->is_enqueued()) {
+ txn.incrDtokRef();
+ DataTokenImpl* dtokp = txn.getDtok();
+ dtokp->set_dequeue_rid(dtokp->rid());
+ dtokp->set_rid(messageIdSequence.next());
+ preparedXidStorePtr->dequeue_txn_data_record(txn.getDtok(), txn.getXid());
+ }
txn.complete(commit);
} catch (const std::exception& e) {
QPID_LOG(error, "Error completing xid " << txn.getXid() << ": " << e.what());
@@ -1160,19 +1198,15 @@
{
checkInit();
// pass sequence number for c/a
- TxnCtxt* txn(new TxnCtxt(&messageIdSequence ));
- return auto_ptr<TransactionContext>(txn);
+ return auto_ptr<TransactionContext>(new TxnCtxt(&messageIdSequence));
}
std::auto_ptr<qpid::broker::TPCTransactionContext> BdbMessageStore::begin(const std::string& xid)
{
checkInit();
- IdSequence* jtx = NULL;
- jtx = &messageIdSequence;
-
+ IdSequence* jtx = &messageIdSequence;
// pass sequence number for c/a
- TPCTxnCtxt* txn(new TPCTxnCtxt(xid, jtx));
- return auto_ptr<TPCTransactionContext>(txn);
+ return auto_ptr<TPCTransactionContext>(new TPCTxnCtxt(xid, jtx));
}
void BdbMessageStore::prepare(qpid::broker::TPCTransactionContext& ctxt)
@@ -1183,7 +1217,11 @@
try {
chkInitPreparedXidStore();
- preparedXidStorePtr->enqueue_txn_data_record(0, 0, 0, &txn->getDtok(), txn->getXid(), false);
+ txn->incrDtokRef();
+ DataTokenImpl* dtokp = txn->getDtok();
+ dtokp->set_external_rid(true);
+ dtokp->set_rid(messageIdSequence.next());
+ preparedXidStorePtr->enqueue_txn_data_record(0, 0, 0, dtokp, txn->getXid(), false);
txn->addXidRecord(preparedXidStorePtr.get());
// make sure all the data is written to disk before returning
Modified: store/branches/mrg-1.0/cpp/lib/BdbMessageStore.h
===================================================================
--- store/branches/mrg-1.0/cpp/lib/BdbMessageStore.h 2008-07-14 13:59:22 UTC (rev 2194)
+++ store/branches/mrg-1.0/cpp/lib/BdbMessageStore.h 2008-07-14 14:23:12 UTC (rev 2195)
@@ -25,29 +25,19 @@
#define _BdbMessageStore_
#include <string>
+
#include "db-inc.h"
-//#include "BufferValue.h"
#include "Cursor.h"
#include "IdDbt.h"
-//#include "IdSequence.h"
+#include "IdSequence.h"
+#include "JournalImpl.h"
+#include "jrnl/jcfg.hpp"
#include "PreparedTransaction.h"
-//#include "StoreException.h"
-#include "TxnCtxt.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/MessageStore.h"
#include "qpid/management/Manageable.h"
-//#include <qpid/sys/Monitor.h>
-//#include <qpid/sys/Time.h>
-//#include <map>
-//#include <set>
-//#include <iostream>
-//#include <boost/format.hpp>
-//#include <boost/intrusive_ptr.hpp>
-//#include <boost/ptr_container/ptr_list.hpp>
#include "qpid/management/Store.h"
-#include "jrnl/jcfg.hpp"
-#include "JournalImpl.h"
-#include "IdSequence.h"
+#include "TxnCtxt.h"
// Assume DB_VERSION_MAJOR == 4
#if (DB_VERSION_MINOR == 2)
@@ -57,7 +47,6 @@
namespace rhm {
namespace bdbstore {
- using std::string;
/**
* An implementation of the MessageStore interface based on Berkeley DB
@@ -71,6 +60,9 @@
typedef LockedMappings::map txn_lock_map;
typedef boost::ptr_list<PreparedTransaction> txn_list;
+ typedef std::map<std::string, u_int64_t> xid_rid_map;
+ typedef xid_rid_map::const_iterator xid_rid_map_citr;
+
// Default store settings
static const u_int16_t defNumJrnlFiles = 8;
static const u_int32_t defJrnlFileSizePgs = 24;
@@ -98,6 +90,8 @@
u_int32_t jrnlFsizePgs;
u_int32_t wcache_pgsize_sblks;
u_int16_t wcache_num_pages;
+ xid_rid_map preparedMap;
+ u_int64_t highestRid;
bool isInit;
const char* envPath;
static qpid::sys::Duration defJournalGetEventsTimeout;
@@ -121,7 +115,6 @@
int enqueueMessage(TxnCtxt& txn, IdDbt& msgId, qpid::broker::RecoverableMessage::shared_ptr& msg,
queue_index& index, txn_list& locked, message_index& prepared);
void recoverXids(txn_list& txns);
- void readXids(Db& db, std::set<string>& xids);
void readLockedMappings(Db& db, txn_lock_map& mappings);
TxnCtxt* check(qpid::broker::TransactionContext* ctxt);
void store(const qpid::broker::PersistableQueue* queue, TxnCtxt* txn,
@@ -152,21 +145,34 @@
// journal functions
void createJrnlQueue(const qpid::broker::PersistableQueue& queue);
- string getJrnlDir(const qpid::broker::PersistableQueue& queue); //for exmaple /var/rhm/ + queueDir/
- string getJrnlDir(const char* queueName);
- string getJrnlBaseDir();
- string getBdbBaseDir();
- string getPxidBaseDir();
+ std::string getJrnlDir(const qpid::broker::PersistableQueue& queue); //for exmaple /var/rhm/ + queueDir/
+ std::string getJrnlDir(const char* queueName);
+ std::string getJrnlBaseDir();
+ std::string getBdbBaseDir();
+ std::string getPxidBaseDir();
inline void checkInit() {
if (!isInit) init("/var", defNumJrnlFiles, defJrnlFileSizePgs, defWCachePageSize); isInit = true;
}
void chkInitPreparedXidStore();
+
+ // debug aid for printing XIDs that may contain non-printable chars
+ static std::string xid2str(const std::string xid) {
+ std::ostringstream oss;
+ oss << std::hex << std::setfill('0');
+ for (unsigned i=0; i<xid.size(); i++) {
+ if (isprint(xid[i]))
+ oss << xid[i];
+ else
+ oss << "/" << std::setw(2) << (int)((char)xid[i]);
+ }
+ return oss.str();
+ }
public:
struct Options : public qpid::Options {
Options(const std::string& name="Store Options");
- string clusterName;
- string storeDir;
+ std::string clusterName;
+ std::string storeDir;
bool storeAsync;
bool storeForce;
uint16_t numJrnlFiles;
@@ -222,7 +228,7 @@
u_int32_t outstandingQueueAIO(const qpid::broker::PersistableQueue& queue);
- void collectPreparedXids(std::set<string>& xids);
+ void collectPreparedXids(std::set<std::string>& xids);
std::auto_ptr<qpid::broker::TransactionContext> begin();
std::auto_ptr<qpid::broker::TPCTransactionContext> begin(const std::string& xid);
Modified: store/branches/mrg-1.0/cpp/lib/TxnCtxt.h
===================================================================
--- store/branches/mrg-1.0/cpp/lib/TxnCtxt.h 2008-07-14 13:59:22 UTC (rev 2194)
+++ store/branches/mrg-1.0/cpp/lib/TxnCtxt.h 2008-07-14 14:23:12 UTC (rev 2195)
@@ -57,7 +57,7 @@
ipqdef impactedQueues; // list of Queues used in the txn
mutable qpid::sys::Mutex Lock;
IdSequence* loggedtx;
- DataTokenImpl dtok;
+ boost::intrusive_ptr<DataTokenImpl> dtokp;
AutoScopedLock globalHolder;
/**
@@ -83,7 +83,6 @@
jc->txn_abort(dtokp.get(), getXid());
}
} catch (const journal::jexception& e) {
- //std::cout << "Error commit" << e << std::endl;
THROW_STORE_EXCEPTION(std::string("Error commit") + e.what());
}
}
@@ -93,7 +92,7 @@
public:
- TxnCtxt(IdSequence* _loggedtx=NULL) : loggedtx(_loggedtx), txn(0) {
+ TxnCtxt(IdSequence* _loggedtx=NULL) : loggedtx(_loggedtx), dtokp(new DataTokenImpl), txn(0) {
if (loggedtx) {
std::stringstream s;
s << "rhm-tid" << this;
@@ -129,7 +128,6 @@
jc->get_wr_events();
}
} catch (const journal::jexception& e) {
- //std::cout << " TxnCtxt: Error sync 3" << e << std::flush;
THROW_STORE_EXCEPTION(std::string("Error sync") + e.what());
}
}
@@ -165,7 +163,14 @@
void deleteXidRecord() { impactedQueues.clear(); }
void addXidRecord(qpid::broker::ExternalQueueStore* queue) { impactedQueues.insert(queue); }
void complete(bool commit) { completeTXN(commit); }
- DataTokenImpl& getDtok() { return dtok; }
+ DataTokenImpl* getDtok() { return dtokp.get(); }
+ void incrDtokRef() { dtokp->addRef(); }
+ void recoverDtok(const u_int64_t rid, const std::string xid) {
+ dtokp->set_rid(rid);
+ dtokp->set_wstate(DataTokenImpl::ENQ);
+ dtokp->set_xid(xid);
+ dtokp->set_external_rid(true);
+ }
};
class TPCTxnCtxt : public TxnCtxt, public qpid::broker::TPCTransactionContext
Modified: store/branches/mrg-1.0/cpp/lib/jrnl/data_tok.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/jrnl/data_tok.cpp 2008-07-14 13:59:22 UTC (rev 2194)
+++ store/branches/mrg-1.0/cpp/lib/jrnl/data_tok.cpp 2008-07-14 14:23:12 UTC (rev 2195)
@@ -30,6 +30,7 @@
#include "jrnl/data_tok.hpp"
+#include <iomanip>
#include "jrnl/jerrno.hpp"
#include "jrnl/jexception.hpp"
#include <sstream>
@@ -50,6 +51,7 @@
_dblks_written(0),
_dblks_read(0),
_pg_cnt(0),
+ _fid(0),
_rid(0),
_xid(),
_dequeue_rid(0),
@@ -163,9 +165,31 @@
_dblks_written = 0;
_dblks_read = 0;
_pg_cnt = 0;
+ _fid = 0;
_rid = 0;
_xid.clear();
}
+// debug aid
+std::string
+data_tok::status_str() const
+{
+ std::ostringstream oss;
+ oss << std::hex << std::setfill('0');
+ oss << "dtok id=0x" << _cnt << "; ws=" << wstate_str() << "; rs=" << rstate_str();
+ oss << "; fid=0x" << _fid << "; rid=0x" << _rid << "; xid=";
+ for (unsigned i=0; i<_xid.size(); i++)
+ {
+ if (isprint(_xid[i]))
+ oss << _xid[i];
+ else
+ oss << "/" << std::setw(2) << (int)((char)_xid[i]);
+ }
+ oss << "; drid=0x" << _dequeue_rid << " extrid=" << (_external_rid?"T":"F");
+ oss << "; ds=0x" << _dsize << "; dw=0x" << _dblks_written << "; dr=0x" << _dblks_read;
+ oss << " pc=0x" << _pg_cnt;
+ return oss.str();
+}
+
} // namespace journal
} // namespace rhm
Modified: store/branches/mrg-1.0/cpp/lib/jrnl/data_tok.hpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/jrnl/data_tok.hpp 2008-07-14 13:59:22 UTC (rev 2194)
+++ store/branches/mrg-1.0/cpp/lib/jrnl/data_tok.hpp 2008-07-14 14:23:12 UTC (rev 2195)
@@ -160,6 +160,9 @@
{ _xid.assign((const char*)xidp, xid_len); }
void reset();
+
+ // debug aid
+ std::string status_str() const;
};
} // namespace journal
Modified: store/branches/mrg-1.0/cpp/lib/jrnl/rrfc.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/jrnl/rrfc.cpp 2008-07-14 13:59:22 UTC (rev 2194)
+++ store/branches/mrg-1.0/cpp/lib/jrnl/rrfc.cpp 2008-07-14 14:23:12 UTC (rev 2195)
@@ -97,7 +97,7 @@
return _fh_arr[pg_index];
}
-const std::string
+std::string
rrfc::status_str() const
{
std::ostringstream oss;
Modified: store/branches/mrg-1.0/cpp/lib/jrnl/rrfc.hpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/jrnl/rrfc.hpp 2008-07-14 13:59:22 UTC (rev 2194)
+++ store/branches/mrg-1.0/cpp/lib/jrnl/rrfc.hpp 2008-07-14 14:23:12 UTC (rev 2195)
@@ -136,7 +136,7 @@
{ return _curr_fh->wr_aio_outstanding_dblks() > 0; }
// Debug aid
- const std::string status_str() const;
+ std::string status_str() const;
}; // class rrfc
} // namespace journal
Modified: store/branches/mrg-1.0/cpp/lib/jrnl/txn_map.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/jrnl/txn_map.cpp 2008-07-14 13:59:22 UTC (rev 2194)
+++ store/branches/mrg-1.0/cpp/lib/jrnl/txn_map.cpp 2008-07-14 14:23:12 UTC (rev 2195)
@@ -91,11 +91,8 @@
const txn_data_list
txn_map::get_tdata_list(const std::string& xid)
{
- xmap_itr itr;
- {
- slock s(&_mutex);
- itr = _map.find(xid);
- }
+ slock s(&_mutex);
+ xmap_itr itr = _map.find(xid);
if (itr == _map.end()) // not found in map
{
std::ostringstream oss;
@@ -127,11 +124,8 @@
bool
txn_map::in_map(const std::string& xid)
{
- xmap_itr itr;
- {
- slock s(&_mutex);
- itr = _map.find(xid);
- }
+ slock s(&_mutex);
+ xmap_itr itr= _map.find(xid);
if (itr == _map.end()) // not found in map
return false;
return true;
@@ -140,11 +134,8 @@
u_int32_t
txn_map::get_rid_count(const std::string& xid)
{
- xmap_itr itr;
- {
- slock s(&_mutex);
- itr = _map.find(xid);
- }
+ slock s(&_mutex);
+ xmap_itr itr = _map.find(xid);
if (itr == _map.end()) // not found in map
{
std::ostringstream oss;
Modified: store/branches/mrg-1.0/cpp/lib/jrnl/wrfc.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/jrnl/wrfc.cpp 2008-07-14 13:59:22 UTC (rev 2194)
+++ store/branches/mrg-1.0/cpp/lib/jrnl/wrfc.cpp 2008-07-14 14:23:12 UTC (rev 2195)
@@ -176,7 +176,7 @@
return findex != _fh_index && in_use;
}
-const std::string
+std::string
wrfc::status_str() const
{
std::ostringstream oss;
Modified: store/branches/mrg-1.0/cpp/lib/jrnl/wrfc.hpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/jrnl/wrfc.hpp 2008-07-14 13:59:22 UTC (rev 2194)
+++ store/branches/mrg-1.0/cpp/lib/jrnl/wrfc.hpp 2008-07-14 14:23:12 UTC (rev 2195)
@@ -128,7 +128,7 @@
bool enq_threshold(const u_int32_t enq_dsize_dblks) const;
// Debug aid
- const std::string status_str() const;
+ std::string status_str() const;
}; // class wrfc
} // namespace journal
Modified: store/branches/mrg-1.0/cpp/tests/TwoPhaseCommitTest.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/tests/TwoPhaseCommitTest.cpp 2008-07-14 13:59:22 UTC (rev 2194)
+++ store/branches/mrg-1.0/cpp/tests/TwoPhaseCommitTest.cpp 2008-07-14 14:23:12 UTC (rev 2195)
@@ -160,9 +160,6 @@
swap.check(commit);
restart();
swap.check(commit);
-
- // this test leaves xids in the store
- store->truncate();
}
void commit(Strategy& strategy)
Modified: store/branches/mrg-1.0/cpp/tests/persistence.py
===================================================================
--- store/branches/mrg-1.0/cpp/tests/persistence.py 2008-07-14 13:59:22 UTC (rev 2194)
+++ store/branches/mrg-1.0/cpp/tests/persistence.py 2008-07-14 14:23:12 UTC (rev 2195)
@@ -262,6 +262,19 @@
session = self.session
session.synchronous = False
+ # check xids from phase 6 are gone
+ txc = self.xid('c')
+ txd = self.xid('d')
+
+ xids = session.dtx_recover().in_doubt
+ ids = [x.global_id for x in xids] #TODO: come up with nicer way to test these
+
+ if txc.global_id in ids:
+ self.fail("Xid still present : %s" % (txc))
+ if txd.global_id in ids:
+ self.fail("Xid still present : %s" % (txc))
+ self.assertEqual(0, len(xids))
+
#test deletion of queue after publish
#create queue
session.queue_declare(queue = "q", auto_delete=True, durable=True)
16 years, 5 months
rhmessaging commits: r2194 - store/branches/mrg-1.0/specs.
by rhmessaging-commits@lists.jboss.org
Author: kpvdr
Date: 2008-07-14 09:59:22 -0400 (Mon, 14 Jul 2008)
New Revision: 2194
Modified:
store/branches/mrg-1.0/specs/management-schema.xml
Log:
Backport of trunk r.2174: Leftover from removal of sync option in store: removed async property from management of store class
Modified: store/branches/mrg-1.0/specs/management-schema.xml
===================================================================
--- store/branches/mrg-1.0/specs/management-schema.xml 2008-07-14 13:50:09 UTC (rev 2193)
+++ store/branches/mrg-1.0/specs/management-schema.xml 2008-07-14 13:59:22 UTC (rev 2194)
@@ -6,7 +6,6 @@
<class name="Store">
<property name="brokerRef" type="objId" references="qpid.Broker" access="RO" index="y" parentRef="y"/>
<property name="location" type="sstr" access="RO" desc="Logical directory on disk"/>
- <property name="async" type="bool" access="RO" desc="Asynchronous IO"/>
<property name="defaultInitialFileCount" type="uint16" access="RO" unit="file" desc="Default number of files initially allocated to each journal"/>
<property name="defaultDataFileSize" type="uint32" access="RO" unit="RdPg" desc="Default size of each journal data file"/>
</class>
16 years, 5 months
rhmessaging commits: r2193 - in store/branches/mrg-1.0/cpp: lib/jrnl and 3 other directories.
by rhmessaging-commits@lists.jboss.org
Author: kpvdr
Date: 2008-07-14 09:50:09 -0400 (Mon, 14 Jul 2008)
New Revision: 2193
Modified:
store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp
store/branches/mrg-1.0/cpp/lib/BdbMessageStore.h
store/branches/mrg-1.0/cpp/lib/JournalImpl.cpp
store/branches/mrg-1.0/cpp/lib/JournalImpl.h
store/branches/mrg-1.0/cpp/lib/TxnCtxt.h
store/branches/mrg-1.0/cpp/lib/jrnl/jcntl.cpp
store/branches/mrg-1.0/cpp/lib/jrnl/jcntl.hpp
store/branches/mrg-1.0/cpp/lib/jrnl/wmgr.cpp
store/branches/mrg-1.0/cpp/tests/TwoPhaseCommitTest.cpp
store/branches/mrg-1.0/cpp/tests/jrnl/_st_basic.cpp
store/branches/mrg-1.0/cpp/tests/jrnl/_st_helper_fns.hpp
store/branches/mrg-1.0/cpp/tests/jrnl/_st_read.cpp
store/branches/mrg-1.0/cpp/tests/jrnl/jtt/jrnl_instance.cpp
Log:
Backport of trunk r.2173: Moved prepared XID list from BDB to a journal instance in BdbMessageStore.
Modified: store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp 2008-07-14 13:05:07 UTC (rev 2192)
+++ store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp 2008-07-14 13:50:09 UTC (rev 2193)
@@ -22,20 +22,11 @@
*/
#include "BdbMessageStore.h"
-#include <qpid/broker/RecoveryManager.h>
-#include <qpid/broker/Message.h>
-#include <qpid/framing/Buffer.h>
-#include <qpid/log/Statement.h>
-#include <qpid/sys/Mutex.h>
-#include <algorithm>
-#include <iomanip>
-#include <sstream>
+
#include "BindingDbt.h"
+#include "BufferValue.h"
#include "IdPairDbt.h"
-#include "StringDbt.h"
-#include "JournalImpl.h"
-#include "DataTokenImpl.h"
-#include "qpid/management/ManagementAgent.h"
+#include "qpid/log/Statement.h"
#include "qpid/management/PackageMrgstore.h"
using namespace rhm::bdbstore;
@@ -63,9 +54,6 @@
mappingDb(&env, 0),
bindingDb(&env, 0),
generalDb(&env, 0),
- enqueueXidDb(&env, 0),
- dequeueXidDb(&env, 0),
- prepareXidDb(&env, 0),
numJrnlFiles(defNumJrnlFiles),
jrnlFsizePgs(defJrnlFileSizePgs),
wcache_pgsize_sblks(JRNL_WMGR_DEF_PAGE_SIZE),
@@ -125,12 +113,11 @@
if (dir.size()>0) storeDir = dir;
- string bdbdir = storeDir + "/rhm/dat/";
- journal::jdir::create_dir(bdbdir);
+ journal::jdir::create_dir(getBdbBaseDir());
+ journal::jdir::create_dir(getPxidBaseDir());
-
try {
- env.open(bdbdir.c_str(), DB_THREAD | DB_CREATE | DB_INIT_TXN | DB_INIT_LOCK | DB_INIT_LOG | DB_INIT_MPOOL | DB_USE_ENVIRON, 0);
+ env.open(getBdbBaseDir().c_str(), DB_THREAD | DB_CREATE | DB_INIT_TXN | DB_INIT_LOCK | DB_INIT_LOG | DB_INIT_MPOOL | DB_USE_ENVIRON, 0);
} catch (const DbException& e) {
if (e.get_errno() == DB_VERSION_MISMATCH)
THROW_STORE_EXCEPTION_2("Database environment mismatch: This version of bd4 does not match that which created the store database. "
@@ -150,10 +137,11 @@
open(mappingDb, txn.get(), "mappings.db", true);
open(bindingDb, txn.get(), "bindings.db", true);
open(generalDb, txn.get(), "general.db", false);
- open(enqueueXidDb, txn.get(), "enqueue_xid.db", true);
- open(dequeueXidDb, txn.get(), "dequeue_xid.db", true);
- open(prepareXidDb, txn.get(), "prepare_xid.db", false);
+ preparedXidStorePtr.reset(new JournalImpl("PreparedXidStore", getPxidBaseDir(), "prepared_xid", defJournalGetEventsTimeout, defJournalFlushTimeout));
txn.commit();
+ } catch (const journal::jexception& e) {
+ txn.abort();
+ THROW_STORE_EXCEPTION_2("Error opening preparedXidStore instance", e.what());
} catch (const DbException& e) {
txn.abort();
THROW_STORE_EXCEPTION_2("Error opening databases", e);
@@ -167,6 +155,15 @@
return true;
}
+void BdbMessageStore::chkInitPreparedXidStore()
+{
+ if (!preparedXidStorePtr->is_init())
+ {
+ u_int32_t defTotWCacheSize = JRNL_WMGR_DEF_PAGE_SIZE * JRNL_WMGR_DEF_PAGES; // in sblks. Currently 2014 sblks (1 MiB).
+ preparedXidStorePtr->initialize(defXidStoreNumJrnlFiles, defXidStoreJrnlFileSizePgs * JRNL_RMGR_PAGE_SIZE, defTotWCacheSize / wcache_pgsize_sblks, defXidStoreWCachePageSize);
+ }
+}
+
bool BdbMessageStore::init(const qpid::Options* options)
{
const Options* opts = static_cast<const Options*>(options);
@@ -253,11 +250,16 @@
for (std::list<Db*>::iterator i = dbs.begin(); i != dbs.end(); i++) {
(*i)->close(0);
}
+ if (preparedXidStorePtr->is_ready()) preparedXidStorePtr->stop(true);
} catch (const DbException& e) {
- QPID_LOG(error, "Error closing databases: " << e.what());
+ QPID_LOG(error, "Error closing BDB databases: " << e.what());
+ } catch (const journal::jexception& e) {
+ QPID_LOG(error, "Error: " << e.what());
} catch (const std::exception& e) {
- QPID_LOG(error, e.what());
- } catch (...) {}
+ QPID_LOG(error, "Error: " << e.what());
+ } catch (...) {
+ QPID_LOG(error, "Unknown error in BdbMessageStore::~BdbMessageStore()");
+ }
if (mgmtObject.get() != 0)
mgmtObject->resourceDestroy();
@@ -276,6 +278,7 @@
txn->commit(0);
try{
journal::jdir::delete_dir(getJrnlBaseDir(),true);
+ journal::jdir::delete_dir(getPxidBaseDir(),true);
}
catch (const journal::jexception& e) {
THROW_STORE_EXCEPTION(std::string("truncate() failed: ") + e.what() );
@@ -508,7 +511,7 @@
try
{
u_int64_t thisHighestRid = 0;
- jQueue->recover(numJrnlFiles, jrnlFsizePgs * JRNL_RMGR_PAGE_SIZE, wcache_num_pages, wcache_pgsize_sblks, prepared, thisHighestRid, key.id); // start recovery
+ jQueue->recover(numJrnlFiles, jrnlFsizePgs * JRNL_RMGR_PAGE_SIZE, wcache_num_pages, wcache_pgsize_sblks, &prepared, thisHighestRid, key.id); // start recovery
if (thisHighestRid > highestRid)
highestRid = thisHighestRid;
recoverMessages(txn, registry, queue, prepared, messages);
@@ -601,6 +604,10 @@
generalIdSequence.reset(maxGeneralId + 1);
}
+
+#define MAX_AIO_SLEEPS 1000 // ~1 second
+#define AIO_SLEEP_TIME 1000 // 1 milisecond
+
void BdbMessageStore::recoverMessages(TxnCtxt& /*txn*/, qpid::broker::RecoveryManager& recovery,
qpid::broker::RecoverableQueue::shared_ptr& queue, txn_list& locked, message_index& prepared)
{
@@ -678,7 +685,7 @@
}
case rhm::journal::RHM_IORES_PAGE_AIOWAIT:
if (++aio_sleep_cnt > MAX_AIO_SLEEPS)
- THROW_STORE_EXCEPTION("Timeout waiting for AIO");
+ THROW_STORE_EXCEPTION("Timeout waiting for AIO in BdbMessageStore::recoverMessages()");
::usleep(AIO_SLEEP_TIME);
break;
case rhm::journal::RHM_IORES_EMPTY:
@@ -748,11 +755,11 @@
void BdbMessageStore::recoverXids(txn_list& txns)
{
- std::set<string> prepared;
- collectPreparedXids(prepared);
+ std::set<string> preparedXidSet;
+ collectPreparedXids(preparedXidSet);
- //when using the async journal, it will abort unprepaired xids and populate the locked maps
- for (std::set<string>::iterator i = prepared.begin(); i != prepared.end(); i++) {
+ // Abort unprepaired xids and populate the locked maps
+ for (std::set<string>::iterator i = preparedXidSet.begin(); i != preparedXidSet.end(); i++) {
LockedMappings::shared_ptr enq_ptr;
enq_ptr.reset(new LockedMappings);
LockedMappings::shared_ptr deq_ptr;
@@ -789,7 +796,20 @@
void BdbMessageStore::collectPreparedXids(std::set<string>& xids)
{
- readXids(prepareXidDb, xids);
+ if (journal::jdir::exists(storeDir + "/rhm/pxid/prepared_xid.jinf"))
+ {
+ u_int64_t highest_rid;
+ preparedXidStorePtr->recover(defXidStoreNumJrnlFiles, defXidStoreJrnlFileSizePgs * JRNL_RMGR_PAGE_SIZE,
+ JRNL_WMGR_DEF_PAGE_SIZE * JRNL_WMGR_DEF_PAGES / wcache_pgsize_sblks, defXidStoreWCachePageSize,
+ 0, highest_rid, 0);
+
+ std::vector<std::string> xv;
+ preparedXidStorePtr->get_open_txn_list(xv);
+ for (std::vector<std::string>::const_iterator itr = xv.begin(); itr < xv.end(); itr++)
+ xids.insert(*itr);
+
+ preparedXidStorePtr->recover_complete(); // start journal.
+ }
}
void BdbMessageStore::stage( intrusive_ptr<PersistableMessage>& msg)
@@ -961,26 +981,18 @@
txn = check(ctxt);
} else {
txn = &implicit;
- txn->begin(env);
}
- try {
- bool newId = false;
- if (messageId == 0) {
- messageId = messageIdSequence.next();
- msg->setPersistenceId(messageId);
- newId = true;
- }
- store(&queue, txn, key, msg, newId);
+ bool newId = false;
+ if (messageId == 0) {
+ messageId = messageIdSequence.next();
+ msg->setPersistenceId(messageId);
+ newId = true;
+ }
+ store(&queue, txn, key, msg, newId);
- // add queue* to the txn map..
- if (ctxt) txn->addXidRecord(queue.getExternalQueueStore());
-
- if (!ctxt) txn->commit();
- } catch (const std::exception& e) {
- if (!ctxt) txn->abort();
- throw;
- }
+ // add queue* to the txn map..
+ if (ctxt) txn->addXidRecord(queue.getExternalQueueStore());
}
void BdbMessageStore::store(const PersistableQueue* queue,
@@ -1054,25 +1066,13 @@
txn = check(ctxt);
} else {
txn = &implicit;
- txn->begin(env);
}
- try {
-
- // add queue* to the txn map..
- if (ctxt) txn->addXidRecord(queue.getExternalQueueStore());
- async_dequeue(ctxt, msg, queue);
+ // add queue* to the txn map..
+ if (ctxt) txn->addXidRecord(queue.getExternalQueueStore());
+ async_dequeue(ctxt, msg, queue);
- msg->dequeueComplete();
- // if ( msg->isDequeueComplete() ) // clear id after last dequeue
- // msg->setPersistenceId(0);
-
- if (!ctxt) txn->commit();
-
- } catch (const std::exception& e) {
- if (!ctxt) txn->abort();
- throw;
- }
+ msg->dequeueComplete();
}
void BdbMessageStore::async_dequeue(
@@ -1143,17 +1143,15 @@
void BdbMessageStore::completed(TPCTxnCtxt& txn, bool commit)
{
- if (!txn.get()) txn.begin(env);
-
try {
+ // Nothing to do if not prepared
+ chkInitPreparedXidStore();
+ if (txn.getDtok().is_enqueued())
+ preparedXidStorePtr->dequeue_txn_data_record(&txn.getDtok(), txn.getXid());
- StringDbt key(txn.getXid());
- prepareXidDb.del(txn.get(), &key, 0);
-
txn.complete(commit);
} catch (const std::exception& e) {
QPID_LOG(error, "Error completing xid " << txn.getXid() << ": " << e.what());
- txn.abort();
throw;
}
}
@@ -1161,9 +1159,8 @@
auto_ptr<TransactionContext> BdbMessageStore::begin()
{
checkInit();
- // pass sequence number for c/a when using jrnl
+ // pass sequence number for c/a
TxnCtxt* txn(new TxnCtxt(&messageIdSequence ));
- txn->begin(env);
return auto_ptr<TransactionContext>(txn);
}
@@ -1173,9 +1170,8 @@
IdSequence* jtx = NULL;
jtx = &messageIdSequence;
- // pass sequence number for c/a when using jrnl
+ // pass sequence number for c/a
TPCTxnCtxt* txn(new TPCTxnCtxt(xid, jtx));
- txn->begin(env);
return auto_ptr<TPCTransactionContext>(txn);
}
@@ -1186,18 +1182,14 @@
if(!txn) throw InvalidTransactionContextException();
try {
- u_int8_t dummy(1);
- string xid(txn->getXid());
- Dbt key ((void*) xid.data(), xid.length());
- Dbt value(&dummy, sizeof(dummy));
+ chkInitPreparedXidStore();
+ preparedXidStorePtr->enqueue_txn_data_record(0, 0, 0, &txn->getDtok(), txn->getXid(), false);
+ txn->addXidRecord(preparedXidStorePtr.get());
// make sure all the data is written to disk before returning
txn->sync();
- prepareXidDb.put(txn->get(), &key, &value, 0);
-
- txn->commit();
} catch (const std::exception& e) {
- txn->abort();
+ QPID_LOG(error, "Error preparing xid " << txn->getXid() << ": " << e.what());
throw;
}
}
@@ -1209,7 +1201,7 @@
if (txn->isTPC()) {
completed(*dynamic_cast<TPCTxnCtxt*>(txn), true);
} else {
- txn->commit();
+ txn->complete(true);
}
}
@@ -1220,7 +1212,7 @@
if (txn->isTPC()) {
completed(*dynamic_cast<TPCTxnCtxt*>(txn), false);
} else {
- txn->abort();
+ txn->complete(false);
}
}
@@ -1326,10 +1318,24 @@
string BdbMessageStore::getJrnlBaseDir()
{
std::stringstream dir;
- dir << storeDir<< "/rhm/jrnl/" ;
+ dir << storeDir << "/rhm/jrnl/" ;
return dir.str();
}
+string BdbMessageStore::getBdbBaseDir()
+{
+ std::stringstream dir;
+ dir << storeDir << "/rhm/dat/" ;
+ return dir.str();
+}
+
+string BdbMessageStore::getPxidBaseDir()
+{
+ std::stringstream dir;
+ dir << storeDir << "/rhm/pxid/" ;
+ return dir.str();
+}
+
string BdbMessageStore::getJrnlDir(const qpid::broker::PersistableQueue& queue) //for exmaple /var/rhm/ + queueDir/
{
return getJrnlDir(queue.getName().c_str());
@@ -1370,5 +1376,3 @@
"Lower values decrease latency at the expense of throughput.")
;
}
-
-
Modified: store/branches/mrg-1.0/cpp/lib/BdbMessageStore.h
===================================================================
--- store/branches/mrg-1.0/cpp/lib/BdbMessageStore.h 2008-07-14 13:05:07 UTC (rev 2192)
+++ store/branches/mrg-1.0/cpp/lib/BdbMessageStore.h 2008-07-14 13:50:09 UTC (rev 2193)
@@ -24,26 +24,30 @@
#ifndef _BdbMessageStore_
#define _BdbMessageStore_
+#include <string>
#include "db-inc.h"
-#include "BufferValue.h"
+//#include "BufferValue.h"
#include "Cursor.h"
#include "IdDbt.h"
-#include "IdSequence.h"
+//#include "IdSequence.h"
#include "PreparedTransaction.h"
-#include "StoreException.h"
+//#include "StoreException.h"
#include "TxnCtxt.h"
-#include <qpid/broker/Broker.h>
-#include <qpid/broker/MessageStore.h>
-#include <qpid/management/Manageable.h>
-#include <qpid/sys/Monitor.h>
-#include <qpid/sys/Time.h>
-#include <map>
-#include <set>
-#include <iostream>
-#include <boost/format.hpp>
-#include <boost/intrusive_ptr.hpp>
-#include <boost/ptr_container/ptr_list.hpp>
+#include "qpid/broker/Broker.h"
+#include "qpid/broker/MessageStore.h"
+#include "qpid/management/Manageable.h"
+//#include <qpid/sys/Monitor.h>
+//#include <qpid/sys/Time.h>
+//#include <map>
+//#include <set>
+//#include <iostream>
+//#include <boost/format.hpp>
+//#include <boost/intrusive_ptr.hpp>
+//#include <boost/ptr_container/ptr_list.hpp>
#include "qpid/management/Store.h"
+#include "jrnl/jcfg.hpp"
+#include "JournalImpl.h"
+#include "IdSequence.h"
// Assume DB_VERSION_MAJOR == 4
#if (DB_VERSION_MINOR == 2)
@@ -68,9 +72,12 @@
typedef boost::ptr_list<PreparedTransaction> txn_list;
// Default store settings
- static const u_int16_t defNumJrnlFiles = 8; // TODO: make configurable
- static const u_int32_t defJrnlFileSizePgs = 24; // TODO: make configurable
- static const u_int32_t defWCachePageSize = JRNL_WMGR_DEF_PAGE_SIZE * JRNL_DBLK_SIZE * JRNL_SBLK_SIZE / 1024; // TODO: make configurable
+ static const u_int16_t defNumJrnlFiles = 8;
+ static const u_int32_t defJrnlFileSizePgs = 24;
+ static const u_int32_t defWCachePageSize = JRNL_WMGR_DEF_PAGE_SIZE * JRNL_DBLK_SIZE * JRNL_SBLK_SIZE / 1024;
+ static const u_int16_t defXidStoreNumJrnlFiles = 8;
+ static const u_int32_t defXidStoreJrnlFileSizePgs = 24;
+ static const u_int32_t defXidStoreWCachePageSize = JRNL_WMGR_DEF_PAGE_SIZE * JRNL_DBLK_SIZE * JRNL_SBLK_SIZE / 1024;
std::list<Db*> dbs;
DbEnv env;
@@ -81,9 +88,7 @@
Db mappingDb;
Db bindingDb;
Db generalDb;
- Db enqueueXidDb;
- Db dequeueXidDb;
- Db prepareXidDb;
+ boost::shared_ptr<JournalImpl> preparedXidStorePtr;
IdSequence queueIdSequence;
IdSequence exchangeIdSequence;
IdSequence generalIdSequence;
@@ -150,9 +155,12 @@
string getJrnlDir(const qpid::broker::PersistableQueue& queue); //for exmaple /var/rhm/ + queueDir/
string getJrnlDir(const char* queueName);
string getJrnlBaseDir();
+ string getBdbBaseDir();
+ string getPxidBaseDir();
inline void checkInit() {
if (!isInit) init("/var", defNumJrnlFiles, defJrnlFileSizePgs, defWCachePageSize); isInit = true;
}
+ void chkInitPreparedXidStore();
public:
struct Options : public qpid::Options {
Modified: store/branches/mrg-1.0/cpp/lib/JournalImpl.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/JournalImpl.cpp 2008-07-14 13:05:07 UTC (rev 2192)
+++ store/branches/mrg-1.0/cpp/lib/JournalImpl.cpp 2008-07-14 13:50:09 UTC (rev 2193)
@@ -152,7 +152,7 @@
const u_int32_t wcache_pgsize_sblks,
const journal::rd_aio_cb rd_cb,
const journal::wr_aio_cb wr_cb,
- boost::ptr_list<bdbstore::PreparedTransaction>& prep_tx_list,
+ boost::ptr_list<bdbstore::PreparedTransaction>* prep_tx_list_ptr,
u_int64_t& highest_rid,
u_int64_t queue_id)
{
@@ -162,34 +162,42 @@
oss1 << " wcache_pgsize_sblks=" << wcache_pgsize_sblks;
oss1 << " wcache_num_pages=" << wcache_num_pages;
log(LOG_DEBUG, oss1.str());
- // Create list of prepared xids
- std::vector<std::string> prep_xid_list;
- for (bdbstore::PreparedTransaction::list::iterator i = prep_tx_list.begin();
- i != prep_tx_list.end(); i++) {
- prep_xid_list.push_back(i->xid);
+
+ if (prep_tx_list_ptr) {
+ // Create list of prepared xids
+ std::vector<std::string> prep_xid_list;
+ for (bdbstore::PreparedTransaction::list::iterator i = prep_tx_list_ptr->begin();
+ i != prep_tx_list_ptr->end(); i++) {
+ prep_xid_list.push_back(i->xid);
+ }
+
+ jcntl::recover(num_jfiles, jfsize_sblks, wcache_num_pages, wcache_pgsize_sblks, rd_cb, wr_cb,
+ &prep_xid_list, highest_rid);
+ } else {
+ jcntl::recover(num_jfiles, jfsize_sblks, wcache_num_pages, wcache_pgsize_sblks, rd_cb, wr_cb,
+ 0, highest_rid);
}
-
- jcntl::recover(num_jfiles, jfsize_sblks, wcache_num_pages, wcache_pgsize_sblks, rd_cb, wr_cb,
- prep_xid_list, highest_rid);
// Populate PreparedTransaction lists from _tmap
- for (bdbstore::PreparedTransaction::list::iterator i = prep_tx_list.begin();
- i != prep_tx_list.end(); i++) {
- try {
- txn_data_list tdl = _tmap.get_tdata_list(i->xid);
- assert(tdl.size()); // should never be empty
- for (tdl_itr tdl_itr = tdl.begin(); tdl_itr < tdl.end(); tdl_itr++) {
- if (tdl_itr->_enq_flag) { // enqueue op
- i->enqueues->add(queue_id, tdl_itr->_rid);
- } else { // dequeue op
- i->dequeues->add(queue_id, tdl_itr->_drid);
+ if (prep_tx_list_ptr)
+ {
+ for (bdbstore::PreparedTransaction::list::iterator i = prep_tx_list_ptr->begin(); i != prep_tx_list_ptr->end(); i++) {
+ try {
+ txn_data_list tdl = _tmap.get_tdata_list(i->xid);
+ assert(tdl.size()); // should never be empty
+ for (tdl_itr tdl_itr = tdl.begin(); tdl_itr < tdl.end(); tdl_itr++) {
+ if (tdl_itr->_enq_flag) { // enqueue op
+ i->enqueues->add(queue_id, tdl_itr->_rid);
+ } else { // dequeue op
+ i->dequeues->add(queue_id, tdl_itr->_drid);
+ }
}
}
+ catch (const jexception& e) {
+ if (e.err_code() != jerrno::JERR_MAP_NOTFOUND)
+ throw;
+ }
}
- catch (const jexception& e) {
- if (e.err_code() != jerrno::JERR_MAP_NOTFOUND)
- throw;
- }
}
std::ostringstream oss2;
oss2 << "Recover phase I complete; highest rid found = 0x" << std::hex << highest_rid;
Modified: store/branches/mrg-1.0/cpp/lib/JournalImpl.h
===================================================================
--- store/branches/mrg-1.0/cpp/lib/JournalImpl.h 2008-07-14 13:05:07 UTC (rev 2192)
+++ store/branches/mrg-1.0/cpp/lib/JournalImpl.h 2008-07-14 13:50:09 UTC (rev 2193)
@@ -119,7 +119,7 @@
const u_int32_t wcache_pgsize_sblks,
const journal::rd_aio_cb rd_cb,
const journal::wr_aio_cb wr_cb,
- boost::ptr_list<bdbstore::PreparedTransaction>& prep_tx_list,
+ boost::ptr_list<bdbstore::PreparedTransaction>* prep_tx_list_ptr,
u_int64_t& highest_rid,
u_int64_t queue_id);
@@ -127,11 +127,11 @@
const u_int32_t jfsize_sblks,
const u_int16_t wcache_num_pages,
const u_int32_t wcache_pgsize_sblks,
- boost::ptr_list<bdbstore::PreparedTransaction>& prep_tx_list,
+ boost::ptr_list<bdbstore::PreparedTransaction>* prep_tx_list_ptr,
u_int64_t& highest_rid,
u_int64_t queue_id) {
recover(num_jfiles, jfsize_sblks, wcache_num_pages, wcache_pgsize_sblks, 0,
- &aio_wr_callback, prep_tx_list, highest_rid, queue_id);
+ &aio_wr_callback, prep_tx_list_ptr, highest_rid, queue_id);
}
void recover_complete();
Modified: store/branches/mrg-1.0/cpp/lib/TxnCtxt.h
===================================================================
--- store/branches/mrg-1.0/cpp/lib/TxnCtxt.h 2008-07-14 13:05:07 UTC (rev 2192)
+++ store/branches/mrg-1.0/cpp/lib/TxnCtxt.h 2008-07-14 13:50:09 UTC (rev 2193)
@@ -2,7 +2,7 @@
Copyright (C) 2007 Red Hat Software
This file is part of Red Hat Messaging.
-
+
Red Hat Messaging is free software; you can redistribute it and/or
modify it under the terms of the GNU Lesser General Public
License as published by the Free Software Foundation; either
@@ -24,38 +24,40 @@
#ifndef _TxnCtxt_
#define _TxnCtxt_
-#include "db-inc.h"
-#include <qpid/broker/MessageStore.h>
-#include <qpid/sys/Mutex.h>
-#include <boost/shared_ptr.hpp>
-#include <sstream>
-#include <memory>
-#include <vector>
-#include "JournalImpl.h"
-#include "DataTokenImpl.h"
#include <boost/format.hpp>
#include <boost/intrusive_ptr.hpp>
-#include <jrnl/jexception.hpp>
+#include <db-inc.h>
+#include <memory>
+#include <set>
+#include <sstream>
+#include <string>
+#include <unistd.h> // ::usleep()
+#include "DataTokenImpl.h"
+#include "IdSequence.h"
+#include "JournalImpl.h"
+#include "jrnl/jexception.hpp"
+#include "qpid/broker/PersistableQueue.h"
+#include "qpid/broker/TransactionalStore.h"
+#include "qpid/sys/Mutex.h"
+#include "StoreException.h"
+
namespace rhm{
namespace bdbstore{
-// find a better place to put these
-#define MAX_AIO_SLEEPS 1000
-#define AIO_SLEEP_TIME 1000
-
-
class TxnCtxt : public qpid::broker::TransactionContext
{
protected:
+
+ static qpid::sys::Mutex globalSerialiser;
+
typedef std::set<qpid::broker::ExternalQueueStore*> ipqdef;
typedef std::auto_ptr<qpid::sys::Mutex::ScopedLock> AutoScopedLock;
- static qpid::sys::Mutex globalSerialiser;
-
ipqdef impactedQueues; // list of Queues used in the txn
mutable qpid::sys::Mutex Lock;
IdSequence* loggedtx;
+ DataTokenImpl dtok;
AutoScopedLock globalHolder;
/**
@@ -63,67 +65,70 @@
*/
std::string tid;
DbTxn* txn;
-
- void completeTXN(bool commit){
+
+ void completeTXN(bool commit) {
sync();
- for (TxnCtxt::ipqdef::iterator i = impactedQueues.begin(); i != impactedQueues.end(); i++) {
+ for (TxnCtxt::ipqdef::iterator i = impactedQueues.begin(); i != impactedQueues.end(); i++) {
JournalImpl* jc = static_cast<JournalImpl*>(*i);
if (jc && loggedtx) { /* if using journal */
boost::intrusive_ptr<DataTokenImpl> dtokp(new DataTokenImpl);
dtokp->addRef();
dtokp->set_external_rid(true);
dtokp->set_rid(loggedtx->next());
- try{
+ try {
if (commit) {
jc->txn_commit(dtokp.get(), getXid());
jc->flush(true);
} else {
jc->txn_abort(dtokp.get(), getXid());
}
- } catch (const journal::jexception& e) {
+ } catch (const journal::jexception& e) {
//std::cout << "Error commit" << e << std::endl;
THROW_STORE_EXCEPTION(std::string("Error commit") + e.what());
}
-
}
- }
+ }
deleteXidRecord();
}
-
+
public:
-
- TxnCtxt(IdSequence* _loggedtx=NULL) : loggedtx(_loggedtx), txn(0) {
- if (loggedtx){
- std::stringstream s;
- s << "rhm-tid" << this;
- tid.assign(s.str());
- }
+
+ TxnCtxt(IdSequence* _loggedtx=NULL) : loggedtx(_loggedtx), txn(0) {
+ if (loggedtx) {
+ std::stringstream s;
+ s << "rhm-tid" << this;
+ tid.assign(s.str());
+ }
}
-
+
/**
* Call to make sure all the data for this txn is written to safe store
*
*@return if the data sucessfully synced.
- */
- void sync(){
+ */
+
+ virtual ~TxnCtxt() { if(txn) abort(); }
+
+#define MAX_SYNC_SLEEPS 1000 // ~1 second
+#define SYNC_SLEEP_TIME 1000 // 1 milisecond
+
+ void sync() {
bool allWritten = false;
bool firstloop = true;
- while (loggedtx && !allWritten){
- if (!firstloop) ::usleep(AIO_SLEEP_TIME); // move this into the get events call aiolib..
+ long sleep_cnt = 0L;
+ while (loggedtx && !allWritten) {
+ if (sleep_cnt > MAX_SYNC_SLEEPS) THROW_STORE_EXCEPTION(std::string("Error: timeout waiting for TxnCtxt::sync()"));
+ if (!firstloop) { ::usleep(SYNC_SLEEP_TIME); sleep_cnt++; } // move this into the get events call aiolib..
allWritten = true;
- for (TxnCtxt::ipqdef::iterator i = impactedQueues.begin(); i != impactedQueues.end(); i++) {
+ for (TxnCtxt::ipqdef::iterator i = impactedQueues.begin(); i != impactedQueues.end(); i++) {
JournalImpl* jc = static_cast<JournalImpl*>(*i);
-
- try
- {
- if (jc && !(jc->is_txn_synced(getXid())))
- {
- if (firstloop)
- jc->flush();
- allWritten = false;
- jc->get_wr_events();
- }
- } catch (const journal::jexception& e) {
+ try {
+ if (jc && !(jc->is_txn_synced(getXid()))) {
+ if (firstloop) jc->flush();
+ allWritten = false;
+ jc->get_wr_events();
+ }
+ } catch (const journal::jexception& e) {
//std::cout << " TxnCtxt: Error sync 3" << e << std::flush;
THROW_STORE_EXCEPTION(std::string("Error sync") + e.what());
}
@@ -131,34 +136,36 @@
firstloop = false;
}
}
-
- virtual ~TxnCtxt() { if(txn) abort(); }
- void begin(DbEnv& env, bool sync = false){
- env.txn_begin(0, &txn, 0);
- if (sync) globalHolder = AutoScopedLock(new qpid::sys::Mutex::ScopedLock(globalSerialiser));
+
+ void begin(DbEnv& env, bool sync = false) {
+ env.txn_begin(0, &txn, 0);
+ if (sync) globalHolder = AutoScopedLock(new qpid::sys::Mutex::ScopedLock(globalSerialiser));
}
- void commit(){
- txn->commit(0);
- txn = 0;
- completeTXN(true);
- globalHolder.reset();
+
+ void commit() {
+ if (txn) {
+ txn->commit(0);
+ txn = 0;
+ globalHolder.reset();
+ }
}
- void abort(){
+
+ void abort(){
if (txn) {
- txn->abort();
- txn = 0;
- completeTXN(false);
- globalHolder.reset();
- }
+ txn->abort();
+ txn = 0;
+ globalHolder.reset();
+ }
}
- DbTxn* get(){ return txn; }
+
+ DbTxn* get() { return txn; }
virtual bool isTPC() { return false; }
virtual const std::string& getXid() { return tid; }
- void deleteXidRecord(){ impactedQueues.clear(); }
- void addXidRecord(qpid::broker::ExternalQueueStore* queue) {
- impactedQueues.insert(queue); }
-
+ void deleteXidRecord() { impactedQueues.clear(); }
+ void addXidRecord(qpid::broker::ExternalQueueStore* queue) { impactedQueues.insert(queue); }
+ void complete(bool commit) { completeTXN(commit); }
+ DataTokenImpl& getDtok() { return dtok; }
};
class TPCTxnCtxt : public TxnCtxt, public qpid::broker::TPCTransactionContext
@@ -168,12 +175,6 @@
TPCTxnCtxt(const std::string& _xid, IdSequence* _loggedtx) : TxnCtxt(_loggedtx), xid(_xid) {}
virtual bool isTPC() { return true; }
virtual const std::string& getXid() { return xid; }
- // commit the BDB abort, abort commit the jnrl
- void commit(){ txn->commit(0); txn = 0; globalHolder.reset(); }
- void abort(){ txn->abort(); txn = 0; globalHolder.reset(); }
- void complete(bool commit){
- txn->commit(0); completeTXN(commit); txn = 0;
- }
};
}}
Modified: store/branches/mrg-1.0/cpp/lib/jrnl/jcntl.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/jrnl/jcntl.cpp 2008-07-14 13:05:07 UTC (rev 2192)
+++ store/branches/mrg-1.0/cpp/lib/jrnl/jcntl.cpp 2008-07-14 13:50:09 UTC (rev 2193)
@@ -155,7 +155,7 @@
jcntl::recover(const u_int16_t num_jfiles, const u_int32_t jfsize_sblks,
const u_int16_t wcache_num_pages, const u_int32_t wcache_pgsize_sblks,
const rd_aio_cb rd_cb, const wr_aio_cb wr_cb,
- const std::vector<std::string>& prep_txn_list, u_int64_t& highest_rid)
+ const std::vector<std::string>* prep_txn_list_ptr, u_int64_t& highest_rid)
{
_init_flag = false;
_stop_flag = false;
@@ -187,7 +187,7 @@
_jdir.verify_dir();
_rcvdat.reset(_num_jfiles);
- rcvr_janalyze(_rcvdat, prep_txn_list);
+ rcvr_janalyze(_rcvdat, prep_txn_list_ptr);
highest_rid = _rcvdat._h_rid;
if (_rcvdat._full)
throw jexception(jerrno::JERR_JCNTL_RECOVERJFULL, "jcntl", "recover");
@@ -574,7 +574,7 @@
}
void
-jcntl::rcvr_janalyze(rcvdat& rd, const std::vector<std::string>& prep_txn_list)
+jcntl::rcvr_janalyze(rcvdat& rd, const std::vector<std::string>* prep_txn_list_ptr)
{
jinf ji(_jdir.dirname() + "/" + _base_filename + "." + JRNL_INFO_EXTENSION, true);
@@ -633,16 +633,16 @@
if (rd._ffid == next_wr_fid && rd._enq_cnt_list[next_wr_fid])
rd._full = true;
- // Remove all transactions not in prep_txn_list
- std::vector<std::string> xid_list;
- _tmap.xid_list(xid_list);
- for (std::vector<std::string>::iterator itr = xid_list.begin(); itr != xid_list.end();
- itr++)
+ if (!rd._empty && prep_txn_list_ptr)
{
- std::vector<std::string>::const_iterator pitr = std::find(prep_txn_list.begin(),
- prep_txn_list.end(), *itr);
- if (pitr == prep_txn_list.end())
- _tmap.get_remove_tdata_list(*itr);
+ std::vector<std::string> xid_list;
+ _tmap.xid_list(xid_list);
+ for (std::vector<std::string>::iterator itr = xid_list.begin(); itr != xid_list.end(); itr++)
+ {
+ std::vector<std::string>::const_iterator pitr = std::find(prep_txn_list_ptr->begin(), prep_txn_list_ptr->end(), *itr);
+ if (pitr == prep_txn_list_ptr->end())
+ _tmap.get_remove_tdata_list(*itr);
+ }
}
}
}
Modified: store/branches/mrg-1.0/cpp/lib/jrnl/jcntl.hpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/jrnl/jcntl.hpp 2008-07-14 13:05:07 UTC (rev 2192)
+++ store/branches/mrg-1.0/cpp/lib/jrnl/jcntl.hpp 2008-07-14 13:50:09 UTC (rev 2193)
@@ -210,7 +210,7 @@
* \param wcache_pgsize_sblks The size in sblks of each write cache page.
* \param rd_cb Function pointer to callback function for read operations. May be 0 (NULL).
* \param wr_cb Function pointer to callback function for write operations. May be 0 (NULL).
- * \param prep_txn_list
+ * \param prep_txn_list_ptr
* \param highest_rid Returns the highest rid found in the journal during recover
*
* \exception TODO
@@ -218,7 +218,7 @@
void recover(const u_int16_t num_jfiles, const u_int32_t jfsize_sblks,
const u_int16_t wcache_num_pages, const u_int32_t wcache_pgsize_sblks,
const rd_aio_cb rd_cb, const wr_aio_cb wr_cb,
- const std::vector<std::string>& prep_txn_list, u_int64_t& highest_rid);
+ const std::vector<std::string>* prep_txn_list_ptr, u_int64_t& highest_rid);
/**
* \brief Notification to the journal that recovery is complete and that normal operation
@@ -575,6 +575,7 @@
* <b><i>false</i></b> otherwise.
*/
inline bool is_ready() const { return _init_flag and not _stop_flag; }
+ inline bool is_init() const { return _init_flag; }
inline bool is_read_only() const { return _readonly_flag; }
@@ -599,6 +600,9 @@
inline u_int16_t num_jfiles() const { return _num_jfiles; }
inline u_int32_t jfsize_sblks() const { return _jfsize_sblks; }
+
+ inline u_int32_t get_open_txn_cnt() const { return _tmap.size(); }
+ void get_open_txn_list(std::vector<std::string>& xv) { _tmap.xid_list(xv); }
// Logging
virtual void log(log_level level, const std::string& log_stmt) const;
@@ -644,7 +648,7 @@
/**
* \brief Analyze journal for recovery.
*/
- void rcvr_janalyze(rcvdat& rd, const std::vector<std::string>& prep_txn_list);
+ void rcvr_janalyze(rcvdat& rd, const std::vector<std::string>* prep_txn_list_ptr);
bool rcvr_get_next_record(u_int16_t& fid, std::ifstream* ifsp, bool& lowi, rcvdat& rd);
Modified: store/branches/mrg-1.0/cpp/lib/jrnl/wmgr.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/jrnl/wmgr.cpp 2008-07-14 13:05:07 UTC (rev 2192)
+++ store/branches/mrg-1.0/cpp/lib/jrnl/wmgr.cpp 2008-07-14 13:50:09 UTC (rev 2193)
@@ -967,8 +967,8 @@
else
{
std::ostringstream oss;
- oss << "op=" << _op_str[op] << " index=" << _pg_index << " state=";
- oss << _page_cb_arr[_pg_index].state_str();
+ oss << "jrnl=" << _jc->id() << " op=" << _op_str[op];
+ oss << " index=" << _pg_index << " pg_state=" << _page_cb_arr[_pg_index].state_str();
throw jexception(jerrno::JERR_WMGR_BADPGSTATE, oss.str(), "wmgr", "pre_write_check");
}
}
@@ -988,8 +988,8 @@
if (!dtokp->is_writable())
{
std::ostringstream oss;
- oss << "op=" << _op_str[op] << " dtok_id=" << dtokp->id();
- oss << " dtok_state=" << dtokp->wstate_str();
+ oss << "jrnl=" << _jc->id() << " op=" << _op_str[op];
+ oss << " dtok_id=" << dtokp->id() << " dtok_state=" << dtokp->wstate_str();
throw jexception(jerrno::JERR_WMGR_BADDTOKSTATE, oss.str(), "wmgr",
"pre_write_check");
}
@@ -999,8 +999,8 @@
if (!dtokp->is_dequeueable())
{
std::ostringstream oss;
- oss << "op=" << _op_str[op] << " dtok_id=" << dtokp->id();
- oss << " dtok_state=" << dtokp->wstate_str();
+ oss << "jrnl=" << _jc->id() << " op=" << _op_str[op];
+ oss << " dtok_id=" << dtokp->id() << " dtok_state=" << dtokp->wstate_str();
throw jexception(jerrno::JERR_WMGR_BADDTOKSTATE, oss.str(), "wmgr",
"pre_write_check");
}
Modified: store/branches/mrg-1.0/cpp/tests/TwoPhaseCommitTest.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/tests/TwoPhaseCommitTest.cpp 2008-07-14 13:05:07 UTC (rev 2192)
+++ store/branches/mrg-1.0/cpp/tests/TwoPhaseCommitTest.cpp 2008-07-14 13:50:09 UTC (rev 2193)
@@ -160,6 +160,9 @@
swap.check(commit);
restart();
swap.check(commit);
+
+ // this test leaves xids in the store
+ store->truncate();
}
void commit(Strategy& strategy)
@@ -294,52 +297,46 @@
public:
TwoPhaseCommitTest() : nameA("queueA"), nameB("queueB"), links(0) {}
- void testCommitSwap()
+ void testCommitEnqueue()
{
- Swap swap(this, "SwapMessageId");
- commit(swap);
+ Enqueue enqueue(this);
+ commit(enqueue);
}
- void testPrepareAndAbortSwap()
+ void testCommitDequeue()
{
- Swap swap(this, "SwapMessageId");
- abort(swap, true);
+ Dequeue dequeue(this);
+ commit(dequeue);
}
- void testAbortNoPrepareSwap()
+ void testCommitSwap()
{
Swap swap(this, "SwapMessageId");
- abort(swap, false);
+ commit(swap);
}
- void testCommitEnqueue()
- {
- Enqueue enqueue(this);
- commit(enqueue);
- }
-
void testPrepareAndAbortEnqueue()
{
Enqueue enqueue(this);
abort(enqueue, true);
}
- void testAbortNoPrepareEnqueue()
+ void testPrepareAndAbortDequeue()
{
- Enqueue enqueue(this);
- abort(enqueue, false);
+ Dequeue dequeue(this);
+ abort(dequeue, true);
}
- void testCommitDequeue()
+ void testPrepareAndAbortSwap()
{
- Dequeue dequeue(this);
- commit(dequeue);
+ Swap swap(this, "SwapMessageId");
+ abort(swap, true);
}
- void testPrepareAndAbortDequeue()
+ void testAbortNoPrepareEnqueue()
{
- Dequeue dequeue(this);
- abort(dequeue, true);
+ Enqueue enqueue(this);
+ abort(enqueue, false);
}
void testAbortNoPrepareDequeue()
@@ -348,6 +345,12 @@
abort(dequeue, false);
}
+ void testAbortNoPrepareSwap()
+ {
+ Swap swap(this, "SwapMessageId");
+ abort(swap, false);
+ }
+
void testRecoverPreparedThenCommitted()
{
recoverPrepared(true);
@@ -363,73 +366,73 @@
// === Test suite ===
-QPID_AUTO_TEST_CASE(PrepareAndAbortSwap)
+QPID_AUTO_TEST_CASE(CommitEnqueue)
{
- cout << test_filename << ".PrepareAndAbortSwap: " << flush;
- tpct.testPrepareAndAbortSwap();
+ cout << test_filename << ".CommitEnqueue: " << flush;
+ tpct.testCommitEnqueue();
cout << "ok" << endl;
}
-QPID_AUTO_TEST_CASE(CommitEnqueue)
+QPID_AUTO_TEST_CASE(CommitDequeue)
{
- cout << test_filename << ".CommitEnqueue: " << flush;
- tpct.testCommitEnqueue();
+ cout << test_filename << ".CommitDequeue: " << flush;
+ tpct.testCommitDequeue();
cout << "ok" << endl;
}
-QPID_AUTO_TEST_CASE(AbortNoPrepareEnqueue)
+QPID_AUTO_TEST_CASE(CommitSwap)
{
- cout << test_filename << ".AbortNoPrepareEnqueue: " << flush;
- tpct.testAbortNoPrepareEnqueue();
+ cout << test_filename << ".CommitSwap: " << flush;
+ tpct.testCommitSwap();
cout << "ok" << endl;
}
-QPID_AUTO_TEST_CASE(PrepareAndAbortDequeue)
+QPID_AUTO_TEST_CASE(PrepareAndAbortEnqueue)
{
- cout << test_filename << ".PrepareAndAbortDequeue: " << flush;
- tpct.testPrepareAndAbortDequeue();
+ cout << test_filename << ".PrepareAndAbortEnqueue: " << flush;
+ tpct.testPrepareAndAbortEnqueue();
cout << "ok" << endl;
}
-QPID_AUTO_TEST_CASE(RecoverPreparedThenCommitted)
+QPID_AUTO_TEST_CASE(PrepareAndAbortDequeue)
{
- cout << test_filename << ".RecoverPreparedThenCommitted: " << flush;
- tpct.testRecoverPreparedThenCommitted();
+ cout << test_filename << ".PrepareAndAbortDequeue: " << flush;
+ tpct.testPrepareAndAbortDequeue();
cout << "ok" << endl;
}
-QPID_AUTO_TEST_CASE(CommitSwap)
+QPID_AUTO_TEST_CASE(PrepareAndAbortSwap)
{
- cout << test_filename << ".CommitSwap: " << flush;
- tpct.testCommitSwap();
+ cout << test_filename << ".PrepareAndAbortSwap: " << flush;
+ tpct.testPrepareAndAbortSwap();
cout << "ok" << endl;
}
-QPID_AUTO_TEST_CASE(AbortNoPrepareSwap)
+QPID_AUTO_TEST_CASE(AbortNoPrepareEnqueue)
{
- cout << test_filename << ".AbortNoPrepareSwap: " << flush;
- tpct.testAbortNoPrepareSwap();
+ cout << test_filename << ".AbortNoPrepareEnqueue: " << flush;
+ tpct.testAbortNoPrepareEnqueue();
cout << "ok" << endl;
}
-QPID_AUTO_TEST_CASE(PrepareAndAbortEnqueue)
+QPID_AUTO_TEST_CASE(AbortNoPrepareDequeue)
{
- cout << test_filename << ".PrepareAndAbortEnqueue: " << flush;
- tpct.testPrepareAndAbortEnqueue();
+ cout << test_filename << ".AbortNoPrepareDequeue: " << flush;
+ tpct.testAbortNoPrepareDequeue();
cout << "ok" << endl;
}
-QPID_AUTO_TEST_CASE(CommitDequeue)
+QPID_AUTO_TEST_CASE(AbortNoPrepareSwap)
{
- cout << test_filename << ".CommitDequeue: " << flush;
- tpct.testCommitDequeue();
+ cout << test_filename << ".AbortNoPrepareSwap: " << flush;
+ tpct.testAbortNoPrepareSwap();
cout << "ok" << endl;
}
-QPID_AUTO_TEST_CASE(AbortNoPrepareDequeue)
+QPID_AUTO_TEST_CASE(RecoverPreparedThenCommitted)
{
- cout << test_filename << ".AbortNoPrepareDequeue: " << flush;
- tpct.testAbortNoPrepareDequeue();
+ cout << test_filename << ".RecoverPreparedThenCommitted: " << flush;
+ tpct.testRecoverPreparedThenCommitted();
cout << "ok" << endl;
}
Modified: store/branches/mrg-1.0/cpp/tests/jrnl/_st_basic.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/tests/jrnl/_st_basic.cpp 2008-07-14 13:05:07 UTC (rev 2192)
+++ store/branches/mrg-1.0/cpp/tests/jrnl/_st_basic.cpp 2008-07-14 13:50:09 UTC (rev 2193)
@@ -144,25 +144,23 @@
BOOST_CHECK_EQUAL(jc.is_read_only(), false);
}
{
- vector<string> txn_list;
u_int64_t hrid;
test_jrnl jc(test_name, test_dir, test_name);
BOOST_CHECK_EQUAL(jc.is_ready(), false);
BOOST_CHECK_EQUAL(jc.is_read_only(), false);
- jc.recover(NUM_TEST_JFILES, TEST_JFSIZE_SBLKS, txn_list, hrid);
+ jc.recover(NUM_TEST_JFILES, TEST_JFSIZE_SBLKS, 0, hrid);
BOOST_CHECK_EQUAL(jc.is_ready(), true);
BOOST_CHECK_EQUAL(jc.is_read_only(), true);
BOOST_CHECK_EQUAL(hrid, u_int64_t(0));
}
{
- vector<string> txn_list;
u_int64_t hrid;
test_jrnl jc(test_name, test_dir, test_name);
BOOST_CHECK_EQUAL(jc.is_ready(), false);
BOOST_CHECK_EQUAL(jc.is_read_only(), false);
- jc.recover(NUM_TEST_JFILES, TEST_JFSIZE_SBLKS, txn_list, hrid);
+ jc.recover(NUM_TEST_JFILES, TEST_JFSIZE_SBLKS, 0, hrid);
BOOST_CHECK_EQUAL(jc.is_ready(), true);
BOOST_CHECK_EQUAL(jc.is_read_only(), true);
BOOST_CHECK_EQUAL(hrid, u_int64_t(0));
@@ -189,11 +187,10 @@
enq_msg(jc, m, create_msg(msg, m, MSG_SIZE), false);
}
{
- vector<string> txn_list;
u_int64_t hrid;
test_jrnl jc(test_name, test_dir, test_name);
- jc.recover(NUM_TEST_JFILES, TEST_JFSIZE_SBLKS, txn_list, hrid);
+ jc.recover(NUM_TEST_JFILES, TEST_JFSIZE_SBLKS, 0, hrid);
BOOST_CHECK_EQUAL(hrid, u_int64_t(NUM_MSGS - 1));
jc.recover_complete();
for (int m=0; m<NUM_MSGS; m++)
@@ -210,7 +207,6 @@
try
{
string msg;
- vector<string> txn_list;
u_int64_t hrid;
for (int m=0; m<2*NUM_MSGS; m+=2)
@@ -221,7 +217,7 @@
jc.initialize(NUM_TEST_JFILES, TEST_JFSIZE_SBLKS); // First time only
else
{
- jc.recover(NUM_TEST_JFILES, TEST_JFSIZE_SBLKS, txn_list, hrid);
+ jc.recover(NUM_TEST_JFILES, TEST_JFSIZE_SBLKS, 0, hrid);
BOOST_CHECK_EQUAL(hrid, u_int64_t(m - 1));
jc.recover_complete();
}
@@ -229,7 +225,7 @@
}
{
test_jrnl jc(test_name, test_dir, test_name);
- jc.recover(NUM_TEST_JFILES, TEST_JFSIZE_SBLKS, txn_list, hrid);
+ jc.recover(NUM_TEST_JFILES, TEST_JFSIZE_SBLKS, 0, hrid);
BOOST_CHECK_EQUAL(hrid, u_int64_t(m));
jc.recover_complete();
deq_msg(jc, m);
@@ -265,11 +261,10 @@
}
{
string msg;
- vector<string> txn_list;
u_int64_t hrid;
test_jrnl jc(test_name, test_dir, test_name);
- jc.recover(NUM_TEST_JFILES, TEST_JFSIZE_SBLKS, txn_list, hrid);
+ jc.recover(NUM_TEST_JFILES, TEST_JFSIZE_SBLKS, 0, hrid);
// Recover non-transient msgs
for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
{
Modified: store/branches/mrg-1.0/cpp/tests/jrnl/_st_helper_fns.hpp
===================================================================
--- store/branches/mrg-1.0/cpp/tests/jrnl/_st_helper_fns.hpp 2008-07-14 13:05:07 UTC (rev 2192)
+++ store/branches/mrg-1.0/cpp/tests/jrnl/_st_helper_fns.hpp 2008-07-14 13:50:09 UTC (rev 2193)
@@ -70,7 +70,7 @@
void initialize(const u_int16_t num_jfiles, const u_int32_t jfsize_sblks)
{ jcntl::initialize(num_jfiles, jfsize_sblks, JRNL_WMGR_DEF_PAGES, JRNL_WMGR_DEF_PAGE_SIZE,
0, &aio_wr_callback); }
- void recover(const u_int16_t num_jfiles, const u_int32_t jfsize_sblks, vector<string>& txn_list,
+ void recover(const u_int16_t num_jfiles, const u_int32_t jfsize_sblks, vector<string>* txn_list,
u_int64_t& highest_rid)
{ jcntl::recover(num_jfiles, jfsize_sblks, JRNL_WMGR_DEF_PAGES, JRNL_WMGR_DEF_PAGE_SIZE, 0,
&aio_wr_callback, txn_list, highest_rid); }
Modified: store/branches/mrg-1.0/cpp/tests/jrnl/_st_read.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/tests/jrnl/_st_read.cpp 2008-07-14 13:05:07 UTC (rev 2192)
+++ store/branches/mrg-1.0/cpp/tests/jrnl/_st_read.cpp 2008-07-14 13:50:09 UTC (rev 2193)
@@ -144,7 +144,6 @@
}
{
string msg;
- vector<string> txn_list;
u_int64_t hrid;
string rmsg;
string xid;
@@ -152,7 +151,7 @@
bool externalFlag;
test_jrnl jc(test_name, test_dir, test_name);
- jc.recover(NUM_TEST_JFILES, TEST_JFSIZE_SBLKS, txn_list, hrid);
+ jc.recover(NUM_TEST_JFILES, TEST_JFSIZE_SBLKS, 0, hrid);
BOOST_CHECK_EQUAL(hrid, u_int64_t(NUM_MSGS - 1));
jc.recover_complete();
for (int m=0; m<NUM_MSGS; m++)
@@ -187,7 +186,6 @@
}
{
string msg;
- vector<string> txn_list;
u_int64_t hrid;
string rmsg;
string xid;
@@ -195,7 +193,7 @@
bool externalFlag;
test_jrnl jc(test_name, test_dir, test_name);
- jc.recover(NUM_TEST_JFILES, TEST_JFSIZE_SBLKS, txn_list, hrid);
+ jc.recover(NUM_TEST_JFILES, TEST_JFSIZE_SBLKS, 0, hrid);
BOOST_CHECK_EQUAL(hrid, u_int64_t(NUM_MSGS - 1));
for (int m=0; m<NUM_MSGS; m++)
{
@@ -209,7 +207,6 @@
}
{
string msg;
- vector<string> txn_list;
u_int64_t hrid;
string rmsg;
string xid;
@@ -217,7 +214,7 @@
bool externalFlag;
test_jrnl jc(test_name, test_dir, test_name);
- jc.recover(NUM_TEST_JFILES, TEST_JFSIZE_SBLKS, txn_list, hrid);
+ jc.recover(NUM_TEST_JFILES, TEST_JFSIZE_SBLKS, 0, hrid);
BOOST_CHECK_EQUAL(hrid, u_int64_t(NUM_MSGS - 1));
for (int m=0; m<NUM_MSGS; m++)
{
Modified: store/branches/mrg-1.0/cpp/tests/jrnl/jtt/jrnl_instance.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/tests/jrnl/jtt/jrnl_instance.cpp 2008-07-14 13:05:07 UTC (rev 2192)
+++ store/branches/mrg-1.0/cpp/tests/jrnl/jtt/jrnl_instance.cpp 2008-07-14 13:50:09 UTC (rev 2193)
@@ -110,11 +110,10 @@
{
try
{
- std::vector<std::string> prep_txn_list;
u_int64_t highest_rid;
recover(_jpp->num_jfiles(), _jpp->jfsize_sblks(), _jpp->wcache_num_pages(),
_jpp->wcache_pgsize_sblks(), aio_rd_callback, aio_wr_callback,
- prep_txn_list, highest_rid);
+ 0, highest_rid);
recover_complete();
}
catch (const rhm::journal::jexception& e)
16 years, 5 months
rhmessaging commits: r2192 - in store/branches/mrg-1.0/cpp/lib: gen/qpid/management and 1 other directory.
by rhmessaging-commits@lists.jboss.org
Author: kpvdr
Date: 2008-07-14 09:05:07 -0400 (Mon, 14 Jul 2008)
New Revision: 2192
Modified:
store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp
store/branches/mrg-1.0/cpp/lib/BdbMessageStore.h
store/branches/mrg-1.0/cpp/lib/gen/qpid/management/Store.cpp
store/branches/mrg-1.0/cpp/lib/gen/qpid/management/Store.h
Log:
Backport of trunk r.2171: Cleaned up BdbMessageStore, removed unneeded BDB sync storage.
Modified: store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp 2008-07-14 12:43:10 UTC (rev 2191)
+++ store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp 2008-07-14 13:05:07 UTC (rev 2192)
@@ -51,7 +51,6 @@
static const u_int8_t MESSAGE_MESSAGE = 1;
static const u_int8_t BASIC_MESSAGE = 2;
-bool BdbMessageStore::useAsync;
qpid::sys::Duration BdbMessageStore::defJournalGetEventsTimeout(10 * qpid::sys::TIME_MSEC); // 10ms
qpid::sys::Duration BdbMessageStore::defJournalFlushTimeout(500 * qpid::sys::TIME_MSEC); // 0.5s
qpid::sys::Mutex TxnCtxt::globalSerialiser;
@@ -88,7 +87,6 @@
mgmtObject = qpid::management::Store::shared_ptr(new qpid::management::Store (this, broker));
mgmtObject->set_location(storeDir);
- mgmtObject->set_async(useAsync);
mgmtObject->set_defaultInitialFileCount(numJrnlFiles);
mgmtObject->set_defaultDataFileSize(jrnlFsizePgs);
@@ -125,15 +123,12 @@
wcache_num_pages = defTotWCacheSize / wcache_pgsize_sblks;
}
- // TODO: remove
- useAsync = true;
if (dir.size()>0) storeDir = dir;
string bdbdir = storeDir + "/rhm/dat/";
journal::jdir::create_dir(bdbdir);
- bool ret = false;
try {
env.open(bdbdir.c_str(), DB_THREAD | DB_CREATE | DB_INIT_TXN | DB_INIT_LOCK | DB_INIT_LOG | DB_INIT_MPOOL | DB_USE_ENVIRON, 0);
} catch (const DbException& e) {
@@ -167,11 +162,6 @@
throw;
}
- // TODO: remove
- bool force = false;
- ret = mode(useAsync, force);
- if (!ret) return false;
-
isInit = true;
QPID_LOG(info, "BdbMessageStore module initialized: dir=" << dir << "; jfiles=" << jfiles << "; jfileSizePgs=" << jfileSizePgs << "; wCachePageSize=" << wCachePageSize);
return true;
@@ -249,54 +239,6 @@
return init(opts->storeDir, numJrnlFiles, jrnlFsizePgs, jrnlWrCachePageSize);
}
-// true is async
-bool BdbMessageStore::mode(const bool async, const bool force)
-{
-
- u_int32_t id (1); // key one in config is mode
- Dbt key(&id, sizeof(id));
- size_t preamble_length = sizeof(u_int32_t);
- BufferValue value(preamble_length, 0);
- u_int32_t avalue = async ? 1 : 2;
- value.buffer.putLong( avalue );
- bool same = false;
- bool hasMode = false;
-
- {
- Cursor config;
- config.open(configDb, 0);
- IdDbt rkey;
- BufferValue rvalue(preamble_length, 0);
- rvalue.buffer.record();
-
- while (config.next(rkey, rvalue)) {
- if (rkey.id == 1)
- {
- hasMode = true;
- u_int32_t valueL = rvalue.buffer.getLong();
- if (avalue == valueL){
- same = true;
- }else {
- break;
- }
- }
- }
- }
- if (same) return true;
- if (!same && !force && hasMode) return false;
- if (!same && force && hasMode) {
- truncate();
- }
-
- int status = configDb.put(0, &key, &value, DB_NOOVERWRITE | DB_AUTO_COMMIT );
- if (status == DB_KEYEXIST) {
- return false;
- } else {
- return true;
- }
- return false;
-}
-
void BdbMessageStore::open(Db& db, DbTxn* txn, const char* file, bool dupKey)
{
if(dupKey) db.set_flags(DB_DUPSORT);
@@ -346,36 +288,34 @@
if (queue.getPersistenceId()) {
THROW_STORE_EXCEPTION("Queue already created: " + queue.getName());
}
- if (usingJrnl()) {
- JournalImpl* jQueue = 0;
- FieldTable::ValuePtr value;
+ JournalImpl* jQueue = 0;
+ FieldTable::ValuePtr value;
- uint16_t localFileCount = numJrnlFiles;
- uint32_t localFileSize = jrnlFsizePgs;
+ uint16_t localFileCount = numJrnlFiles;
+ uint32_t localFileSize = jrnlFsizePgs;
- value = args.get ("qpid.file_count");
- if (value.get() != 0 && !value->empty() && value->convertsTo<int>())
- localFileCount = (uint16_t) value->get<int>();
+ value = args.get ("qpid.file_count");
+ if (value.get() != 0 && !value->empty() && value->convertsTo<int>())
+ localFileCount = (uint16_t) value->get<int>();
- value = args.get ("qpid.file_size");
- if (value.get() != 0 && !value->empty() && value->convertsTo<int>())
- localFileSize = (uint32_t) value->get<int>();
+ value = args.get ("qpid.file_size");
+ if (value.get() != 0 && !value->empty() && value->convertsTo<int>())
+ localFileSize = (uint32_t) value->get<int>();
- {
- qpid::sys::Mutex::ScopedLock sl(jrnlCreateLock);
- jQueue = new JournalImpl(queue.getName(), getJrnlDir(queue),
- string("JournalData"), defJournalGetEventsTimeout,
- defJournalFlushTimeout);
- }
- queue.setExternalQueueStore(dynamic_cast<ExternalQueueStore*>(jQueue));
- try {
- // init will create the deque's for the init...
- jQueue->initialize(localFileCount, localFileSize * JRNL_RMGR_PAGE_SIZE, wcache_num_pages, wcache_pgsize_sblks);
- } catch (const journal::jexception& e) {
- THROW_STORE_EXCEPTION(std::string("Queue ") + queue.getName() +
- ": create() failed: " + e.what());
- }
+ {
+ qpid::sys::Mutex::ScopedLock sl(jrnlCreateLock);
+ jQueue = new JournalImpl(queue.getName(), getJrnlDir(queue),
+ string("JournalData"), defJournalGetEventsTimeout,
+ defJournalFlushTimeout);
}
+ queue.setExternalQueueStore(dynamic_cast<ExternalQueueStore*>(jQueue));
+ try {
+ // init will create the deque's for the init...
+ jQueue->initialize(localFileCount, localFileSize * JRNL_RMGR_PAGE_SIZE, wcache_num_pages, wcache_pgsize_sblks);
+ } catch (const journal::jexception& e) {
+ THROW_STORE_EXCEPTION(std::string("Queue ") + queue.getName() +
+ ": create() failed: " + e.what());
+ }
try {
if (!create(queueDb, queueIdSequence, queue)) {
@@ -557,38 +497,32 @@
//set the persistenceId and update max as required
queue->setPersistenceId(key.id);
- if (usingJrnl())
+ const char* queueName = queue->getName().c_str();
+ JournalImpl* jQueue = 0;
{
- const char* queueName = queue->getName().c_str();
- JournalImpl* jQueue = 0;
- {
- qpid::sys::Mutex::ScopedLock sl(jrnlCreateLock);
- jQueue = new JournalImpl(queueName, getJrnlDir(queueName), string("JournalData"), defJournalGetEventsTimeout, defJournalFlushTimeout);
- }
- queue->setExternalQueueStore(dynamic_cast<ExternalQueueStore*>(jQueue));
+ qpid::sys::Mutex::ScopedLock sl(jrnlCreateLock);
+ jQueue = new JournalImpl(queueName, getJrnlDir(queueName), string("JournalData"), defJournalGetEventsTimeout, defJournalFlushTimeout);
+ }
+ queue->setExternalQueueStore(dynamic_cast<ExternalQueueStore*>(jQueue));
- try
- {
- u_int64_t thisHighestRid = 0;
- jQueue->recover(numJrnlFiles, jrnlFsizePgs * JRNL_RMGR_PAGE_SIZE, wcache_num_pages, wcache_pgsize_sblks, prepared, thisHighestRid, key.id); // start recovery
- if (thisHighestRid > highestRid)
- highestRid = thisHighestRid;
- recoverMessages(txn, registry, queue, prepared, messages);
- jQueue->recover_complete(); // start journal.
- } catch (const journal::jexception& e) {
- THROW_STORE_EXCEPTION(std::string("Queue ") + queueName + ": recoverQueues() failed: " + e.what());
- }
- //read all messages: done on a per queue basis if using Journal
+ try
+ {
+ u_int64_t thisHighestRid = 0;
+ jQueue->recover(numJrnlFiles, jrnlFsizePgs * JRNL_RMGR_PAGE_SIZE, wcache_num_pages, wcache_pgsize_sblks, prepared, thisHighestRid, key.id); // start recovery
+ if (thisHighestRid > highestRid)
+ highestRid = thisHighestRid;
+ recoverMessages(txn, registry, queue, prepared, messages);
+ jQueue->recover_complete(); // start journal.
+ } catch (const journal::jexception& e) {
+ THROW_STORE_EXCEPTION(std::string("Queue ") + queueName + ": recoverQueues() failed: " + e.what());
}
+ //read all messages: done on a per queue basis if using Journal
queue_index[key.id] = queue;
maxQueueId = max(key.id, maxQueueId);
}
messageIdSequence.reset(highestRid + 1);
queueIdSequence.reset(maxQueueId + 1);
-
- if (!usingJrnl()) //read all messages:
- recoverMessages(txn, registry, queue_index, prepared, messages);
}
@@ -667,7 +601,6 @@
generalIdSequence.reset(maxGeneralId + 1);
}
-// async IO version.
void BdbMessageStore::recoverMessages(TxnCtxt& /*txn*/, qpid::broker::RecoveryManager& recovery,
qpid::broker::RecoverableQueue::shared_ptr& queue, txn_list& locked, message_index& prepared)
{
@@ -784,69 +717,7 @@
}
-// bdb version
-void BdbMessageStore::recoverMessages(TxnCtxt&, RecoveryManager& recovery, queue_index& index,
- txn_list& locked, message_index& prepared)
-{
- //have to create a new txn here, and commit in batches to avoid
- //problems with large message databases
- TxnCtxt txn;
- txn.begin(env);
- Cursor messages;
- messages.open(messageDb, txn.get());
-
- IdDbt key;
- size_t preamble_length = sizeof(u_int32_t)/*header size*/;
- u_int64_t maxMessageId(1);
-
- BufferValue value(preamble_length, 0);
- value.buffer.record();
- uint count(0);
- while (messages.next(key, value)) {
- if (++count % 1000 == 0) {
- QPID_LOG(debug, "Recovering " << count << "th message...");
- //reset cursor && txn:
- messages.close();
- txn.commit();
- txn.begin(env);
- messages.open(messageDb, txn.get());
- messages->get(&key, &value, DB_SET);
- }
- //read header only to begin with
- u_int32_t headerSize = value.buffer.getLong();
- value.buffer.restore();
-
- BufferValue header(headerSize, preamble_length);
- messages.current(key, header);
-
- RecoverableMessage::shared_ptr msg = recovery.recoverMessage(header.buffer);
- msg->setPersistenceId(key.id);
-
- u_int32_t contentOffset = headerSize + preamble_length;
- u_int64_t contentSize = getRecordSize(txn.get(), messageDb, key) - contentOffset;
- if (msg->loadContent(contentSize)) {
- //now read the content
- BufferValue content(contentSize, contentOffset);
- messages.current(key, content);
- msg->decodeContent(content.buffer);
- }
-
- //find all the queues into which this message has been enqueued
- if (enqueueMessage(txn, key, msg, index, locked, prepared) == 0) {
- //message not referenced anywhere - can delete
- messages->del(0);
- } else {
- if (key.id > maxMessageId) {
- maxMessageId = key.id;
- }
- }
- }
- messages.close();
- txn.commit();
- messageIdSequence.reset(maxMessageId + 1);
-}
-
int BdbMessageStore::enqueueMessage(TxnCtxt& txn, IdDbt& msgId, RecoverableMessage::shared_ptr& msg,
queue_index& index, txn_list& locked,
message_index& prepared)
@@ -881,34 +752,12 @@
collectPreparedXids(prepared);
//when using the async journal, it will abort unprepaired xids and populate the locked maps
- if (!usingJrnl()){
- txn_lock_map enqueues;
- txn_lock_map dequeues;
- std::set<string> known;
- readXids(enqueueXidDb, known);
- readXids(dequeueXidDb, known);
-
- //abort all known but unprepared xids:
- for (std::set<string>::iterator i = known.begin(); i != known.end(); i++) {
- if (prepared.find(*i) == prepared.end()) {
- TPCTxnCtxt txn(*i, NULL);
- completed(txn, dequeueXidDb, enqueueXidDb, false);
- }
- }
- readLockedMappings(enqueueXidDb, enqueues);
- readLockedMappings(dequeueXidDb, dequeues);
- for (std::set<string>::iterator i = prepared.begin(); i != prepared.end(); i++) {
- txns.push_back(new PreparedTransaction(*i, enqueues[*i], dequeues[*i]));
- }
- } else {
- for (std::set<string>::iterator i = prepared.begin(); i != prepared.end(); i++) {
- LockedMappings::shared_ptr enq_ptr;
- enq_ptr.reset(new LockedMappings);
- LockedMappings::shared_ptr deq_ptr;
- deq_ptr.reset(new LockedMappings);
- txns.push_back(new PreparedTransaction(*i, enq_ptr, deq_ptr));
- }
-
+ for (std::set<string>::iterator i = prepared.begin(); i != prepared.end(); i++) {
+ LockedMappings::shared_ptr enq_ptr;
+ enq_ptr.reset(new LockedMappings);
+ LockedMappings::shared_ptr deq_ptr;
+ deq_ptr.reset(new LockedMappings);
+ txns.push_back(new PreparedTransaction(*i, enq_ptr, deq_ptr));
}
}
@@ -1081,7 +930,6 @@
void BdbMessageStore::flush(const qpid::broker::PersistableQueue& queue)
{
- if (!usingJrnl()) return;
if (queue.getExternalQueueStore() == 0) return;
checkInit();
std::string qn = queue.getName();
@@ -1106,7 +954,6 @@
THROW_STORE_EXCEPTION("Queue not created: " + queue.getName());
}
Dbt key (&messageId, sizeof(messageId));
- Dbt value (&queueId, sizeof(queueId));
TxnCtxt implicit;
TxnCtxt* txn = 0;
@@ -1114,11 +961,10 @@
txn = check(ctxt);
} else {
txn = &implicit;
- txn->begin(env, !usingJrnl());
+ txn->begin(env);
}
try {
-
bool newId = false;
if (messageId == 0) {
messageId = messageIdSequence.next();
@@ -1127,20 +973,9 @@
}
store(&queue, txn, key, msg, newId);
- if (usingJrnl()){
- // add queue* to the txn map..
- if (ctxt) txn->addXidRecord(queue.getExternalQueueStore());
- }else{
- msg->enqueueComplete(); // set enqueued for ack
- put(mappingDb, txn->get(), key, value);
+ // add queue* to the txn map..
+ if (ctxt) txn->addXidRecord(queue.getExternalQueueStore());
- // cct if using Journal do we need to wait for IO to complete before calling thus???
- // set enqueue comple on callback msg.enqueueComplete();
- if (txn->isTPC()) {
- record2pcOp(enqueueXidDb, dynamic_cast<TPCTxnCtxt&>(*txn), messageId, queueId);
- }
- }
-
if (!ctxt) txn->commit();
} catch (const std::exception& e) {
if (!ctxt) txn->abort();
@@ -1166,7 +1001,7 @@
try {
- if ( queue && usingJrnl()) {
+ if ( queue ) {
boost::intrusive_ptr<DataTokenImpl> dtokp(new DataTokenImpl);
dtokp->addRef();
dtokp->setSourceMessage(message);
@@ -1197,8 +1032,6 @@
} catch (const journal::jexception& e) {
THROW_STORE_EXCEPTION(std::string("Queue ") + queue->getName() + ": store() failed: " +
e.what());
- } catch (const DbException& e) {
- THROW_STORE_EXCEPTION_2("Error storing message", e);
}
}
@@ -1221,37 +1054,21 @@
txn = check(ctxt);
} else {
txn = &implicit;
- txn->begin(env, !usingJrnl());
+ txn->begin(env);
}
try {
- if (usingJrnl()){
- // add queue* to the txn map..
- if (ctxt) txn->addXidRecord(queue.getExternalQueueStore());
- async_dequeue(ctxt, msg, queue);
+ // add queue* to the txn map..
+ if (ctxt) txn->addXidRecord(queue.getExternalQueueStore());
+ async_dequeue(ctxt, msg, queue);
- msg->dequeueComplete();
- // if ( msg->isDequeueComplete() ) // clear id after last dequeue
- // msg->setPersistenceId(0);
+ msg->dequeueComplete();
+ // if ( msg->isDequeueComplete() ) // clear id after last dequeue
+ // msg->setPersistenceId(0);
- } else if (txn->isTPC()) {
- //if this is part of a 2pc transaction, then only record the dequeue now,
- //it will be applied on commit
- record2pcOp(dequeueXidDb, dynamic_cast<TPCTxnCtxt&>(*txn), messageId, queueId);
- } else {
- Dbt key (&messageId, sizeof(messageId));
- Dbt value (&queueId, sizeof(queueId));
- if (dequeue(txn->get(), key, value)) {
- msg->setPersistenceId(0);//clear id as we have now removed the message from the store
- msg->dequeueComplete(); // set dequeued for ack
- }
- }
if (!ctxt) txn->commit();
- } catch (const DbException& e) {
- if (!ctxt) txn->abort();
- THROW_STORE_EXCEPTION_2("Error dequeing message", e);
} catch (const std::exception& e) {
if (!ctxt) txn->abort();
throw;
@@ -1287,49 +1104,6 @@
}
}
-bool BdbMessageStore::dequeue(DbTxn* txn, Dbt& messageId, Dbt& queueId)
-{
- //First look up the message, this gets a lock on that table in
- //case we need to delete it (avoiding deadlocks with enqueue where
- //the locking order is messageDb then mappingDb)
- Cursor msgCursor;
- msgCursor.open(messageDb, txn);
-
- try {
- Dbt peek;
- peek.set_flags(DB_DBT_USERMEM | DB_DBT_PARTIAL);
- peek.set_ulen(0);
- int status = msgCursor->get(&messageId, &peek, DB_SET | DB_RMW);
- if (status == DB_NOTFOUND ) {
- THROW_STORE_EXCEPTION("Can't find record for message");
- } else if (status != 0 && status != DB_BUFFER_SMALL) {
- string e = "Dequeue failed (while seeking message) with unexpected status = ";
- e += DbEnv::strerror(status);
- THROW_STORE_EXCEPTION(e);
- }
- } catch (DbMemoryException& expected) {
- }
-
- Cursor cursor;
- cursor.open(mappingDb, txn);
-
- int status = cursor->get(&messageId, &queueId, DB_GET_BOTH | DB_RMW);
- if (status == 0) {
- cursor->del(0);
- } else if (status == DB_NOTFOUND ) {
- THROW_STORE_EXCEPTION("Can't find record mapping message to queue");
- } else {
- THROW_STORE_EXCEPTION("Dequeue failed with status = " + status);
- }
-
- if (isUnused(cursor, messageId)) {
- msgCursor->del(0);
- return true;
- } else {
- return false;
- }
-}
-
u_int32_t BdbMessageStore::outstandingQueueAIO(const qpid::broker::PersistableQueue& /*queue*/)
{
checkInit();
@@ -1367,29 +1141,13 @@
}
}
-void BdbMessageStore::completed(TPCTxnCtxt& txn, Db& discard, Db& apply, bool commit)
+void BdbMessageStore::completed(TPCTxnCtxt& txn, bool commit)
{
- if (!txn.get()) txn.begin(env, !usingJrnl());
+ if (!txn.get()) txn.begin(env);
try {
StringDbt key(txn.getXid());
- if (!usingJrnl()){
- //scroll through all records matching xid in apply and dequeue
- //using the message and queue id encoded in each value
- Cursor c;
- c.open(apply, txn.get());
- IdPairDbt value;
-
- for (int status = c->get(&key, &value, DB_SET); status == 0; status = c->get(&key, &value, DB_NEXT_DUP)) {
- dequeue(txn.get(), value.message, value.queue);
- }
- c.close();
-
- //delete all records matching xid
- discard.del(txn.get(), &key, 0);
- apply.del(txn.get(), &key, 0);
- }
prepareXidDb.del(txn.get(), &key, 0);
txn.complete(commit);
@@ -1405,7 +1163,7 @@
checkInit();
// pass sequence number for c/a when using jrnl
TxnCtxt* txn(new TxnCtxt(&messageIdSequence ));
- txn->begin(env, !usingJrnl());
+ txn->begin(env);
return auto_ptr<TransactionContext>(txn);
}
@@ -1413,11 +1171,11 @@
{
checkInit();
IdSequence* jtx = NULL;
- if (usingJrnl()) jtx = &messageIdSequence;
+ jtx = &messageIdSequence;
// pass sequence number for c/a when using jrnl
TPCTxnCtxt* txn(new TPCTxnCtxt(xid, jtx));
- txn->begin(env, !usingJrnl());
+ txn->begin(env);
return auto_ptr<TPCTransactionContext>(txn);
}
@@ -1449,7 +1207,7 @@
checkInit();
TxnCtxt* txn(check(&ctxt));
if (txn->isTPC()) {
- completed(*dynamic_cast<TPCTxnCtxt*>(txn), enqueueXidDb, dequeueXidDb, true);
+ completed(*dynamic_cast<TPCTxnCtxt*>(txn), true);
} else {
txn->commit();
}
@@ -1460,7 +1218,7 @@
checkInit();
TxnCtxt* txn(check(&ctxt));
if (txn->isTPC()) {
- completed(*dynamic_cast<TPCTxnCtxt*>(txn), dequeueXidDb, enqueueXidDb, false);
+ completed(*dynamic_cast<TPCTxnCtxt*>(txn), false);
} else {
txn->abort();
}
@@ -1487,14 +1245,6 @@
}
}
-void BdbMessageStore::record2pcOp(Db& db, TPCTxnCtxt& txn, u_int64_t messageId, u_int64_t queueId)
-{
- StringDbt key(txn.getXid());
- IdPairDbt value(queueId, messageId);
- put(db, txn.get(), key, value);
-}
-
-
bool BdbMessageStore::deleteKeyValuePair(Db& db, DbTxn* txn, Dbt& key, Dbt& value)
{
@@ -1607,7 +1357,7 @@
wCachePageSize(JRNL_WMGR_DEF_PAGE_SIZE * JRNL_DBLK_SIZE * JRNL_SBLK_SIZE / 1024)
{
addOptions()
- ("store-directory", qpid::optValue(storeDir, "DIR"),
+ ("store-dir", qpid::optValue(storeDir, "DIR"),
"Store directory location for persistence (instead of using --data-dir value). "
"Must be supplied if --no-data-dir is also used.")
("num-jfiles", qpid::optValue(numJrnlFiles, "N"),
Modified: store/branches/mrg-1.0/cpp/lib/BdbMessageStore.h
===================================================================
--- store/branches/mrg-1.0/cpp/lib/BdbMessageStore.h 2008-07-14 12:43:10 UTC (rev 2191)
+++ store/branches/mrg-1.0/cpp/lib/BdbMessageStore.h 2008-07-14 13:05:07 UTC (rev 2192)
@@ -88,7 +88,6 @@
IdSequence exchangeIdSequence;
IdSequence generalIdSequence;
IdSequence messageIdSequence;
- static bool useAsync;
std::string storeDir;
u_int16_t numJrnlFiles;
u_int32_t jrnlFsizePgs;
@@ -124,8 +123,6 @@
Dbt& messageId,
boost::intrusive_ptr<qpid::broker::PersistableMessage>& message,
bool newId);
- void enqueue(DbTxn* txn, Dbt& messageId, Dbt& queueId);
- bool dequeue(DbTxn* txn, Dbt& messageId, Dbt& queueId);
void async_dequeue(qpid::broker::TransactionContext* ctxt,
boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg,
const qpid::broker::PersistableQueue& queue);
@@ -134,7 +131,7 @@
bool isUnused(Cursor& cursor, Dbt& messageId);
void destroy(Db& db, const qpid::broker::Persistable& p);
bool create(Db& db, IdSequence& seq, const qpid::broker::Persistable& p);
- void completed(TPCTxnCtxt& txn, Db& discard, Db& apply, bool commit);
+ void completed(TPCTxnCtxt& txn, bool commit);
void record2pcOp(Db& db, TPCTxnCtxt& txn, u_int64_t messageId, u_int64_t queueId);
void deleteBindingsForQueue(const qpid::broker::PersistableQueue& queue);
void deleteBinding(const qpid::broker::PersistableExchange& exchange,
@@ -152,7 +149,6 @@
void createJrnlQueue(const qpid::broker::PersistableQueue& queue);
string getJrnlDir(const qpid::broker::PersistableQueue& queue); //for exmaple /var/rhm/ + queueDir/
string getJrnlDir(const char* queueName);
- static inline bool usingJrnl() {return useAsync;}
string getJrnlBaseDir();
inline void checkInit() {
if (!isInit) init("/var", defNumJrnlFiles, defJrnlFileSizePgs, defWCachePageSize); isInit = true;
Modified: store/branches/mrg-1.0/cpp/lib/gen/qpid/management/Store.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/gen/qpid/management/Store.cpp 2008-07-14 12:43:10 UTC (rev 2191)
+++ store/branches/mrg-1.0/cpp/lib/gen/qpid/management/Store.cpp 2008-07-14 13:05:07 UTC (rev 2192)
@@ -35,7 +35,7 @@
string Store::packageName = string ("mrgstore");
string Store::className = string ("store");
uint8_t Store::md5Sum[16] =
- {0x91,0xcf,0xc4,0xa7,0x9b,0x4a,0x2a,0x88,0x32,0x6f,0xef,0xec,0x82,0xd7,0x12,0x6a};
+ {0x63,0xc5,0x1a,0x81,0x18,0x8a,0x8d,0x9b,0x3e,0x96,0xf7,0x6d,0x3b,0xd0,0x51,0x14};
Store::Store (Manageable* _core, Manageable* _parent) :
ManagementObject(_core)
@@ -71,7 +71,7 @@
buf.putShortString (packageName); // Package Name
buf.putShortString (className); // Class Name
buf.putBin128 (md5Sum); // Schema Hash
- buf.putShort (5); // Config Element Count
+ buf.putShort (4); // Config Element Count
buf.putShort (0); // Inst Element Count
buf.putShort (0); // Method Count
buf.putShort (0); // Event Count
@@ -93,14 +93,6 @@
buf.put (ft);
ft = FieldTable ();
- ft.setString (NAME, "async");
- ft.setInt (TYPE, TYPE_BOOL);
- ft.setInt (ACCESS, ACCESS_RO);
- ft.setInt (INDEX, 0);
- ft.setString (DESC, "Asynchronous IO");
- buf.put (ft);
-
- ft = FieldTable ();
ft.setString (NAME, "defaultInitialFileCount");
ft.setInt (TYPE, TYPE_U16);
ft.setInt (ACCESS, ACCESS_RO);
@@ -135,7 +127,6 @@
writeTimestamps (buf);
buf.putLongLong (brokerRef);
buf.putShortString (location);
- buf.putOctet (async?1:0);
buf.putShort (defaultInitialFileCount);
buf.putLong (defaultDataFileSize);
Modified: store/branches/mrg-1.0/cpp/lib/gen/qpid/management/Store.h
===================================================================
--- store/branches/mrg-1.0/cpp/lib/gen/qpid/management/Store.h 2008-07-14 12:43:10 UTC (rev 2191)
+++ store/branches/mrg-1.0/cpp/lib/gen/qpid/management/Store.h 2008-07-14 13:05:07 UTC (rev 2192)
@@ -43,7 +43,6 @@
// Properties
uint64_t brokerRef;
std::string location;
- uint8_t async;
uint16_t defaultInitialFileCount;
uint32_t defaultDataFileSize;
@@ -89,11 +88,6 @@
location = val;
configChanged = true;
}
- inline void set_async (uint8_t val){
- sys::Mutex::ScopedLock mutex(accessLock);
- async = val;
- configChanged = true;
- }
inline void set_defaultInitialFileCount (uint16_t val){
sys::Mutex::ScopedLock mutex(accessLock);
defaultInitialFileCount = val;
16 years, 5 months