[rhmessaging-commits] rhmessaging commits: r2200 - store/branches/mrg-1.0/cpp/lib.
rhmessaging-commits at lists.jboss.org
rhmessaging-commits at lists.jboss.org
Mon Jul 14 14:05:24 EDT 2008
Author: kpvdr
Date: 2008-07-14 14:05:24 -0400 (Mon, 14 Jul 2008)
New Revision: 2200
Modified:
store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp
store/branches/mrg-1.0/cpp/lib/BdbMessageStore.h
store/branches/mrg-1.0/cpp/lib/TxnCtxt.h
Log:
Whitespace changes only - misc tidyups and respacing which makes BdbMessageStore.cpp/.h and TxnCtxt.h similar to trunk.
Modified: store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp 2008-07-14 17:32:13 UTC (rev 2199)
+++ store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp 2008-07-14 18:05:24 UTC (rev 2200)
@@ -110,20 +110,20 @@
u_int32_t defTotWCacheSize = JRNL_WMGR_DEF_PAGE_SIZE * JRNL_WMGR_DEF_PAGES; // in sblks. Currently 2014 sblks (1 MiB).
switch (wCachePageSize)
{
- case 1:
- case 2:
- case 4:
- // 256 KiB total cache
- wcache_num_pages = defTotWCacheSize / wcache_pgsize_sblks / 4;
- break;
- case 8:
- case 16:
- // 512 KiB total cache
- wcache_num_pages = defTotWCacheSize / wcache_pgsize_sblks / 2;
- break;
- default: // 32, 64, 128
- // 1 MiB total cache
- wcache_num_pages = defTotWCacheSize / wcache_pgsize_sblks;
+ case 1:
+ case 2:
+ case 4:
+ // 256 KiB total cache
+ wcache_num_pages = defTotWCacheSize / wcache_pgsize_sblks / 4;
+ break;
+ case 8:
+ case 16:
+ // 512 KiB total cache
+ wcache_num_pages = defTotWCacheSize / wcache_pgsize_sblks / 2;
+ break;
+ default: // 32, 64, 128
+ // 1 MiB total cache
+ wcache_num_pages = defTotWCacheSize / wcache_pgsize_sblks;
}
if (dir.size()>0) storeDir = dir;
@@ -136,8 +136,8 @@
} catch (const DbException& e) {
if (e.get_errno() == DB_VERSION_MISMATCH)
THROW_STORE_EXCEPTION_2("Database environment mismatch: This version of bd4 does not match that which created the store database. "
- "(If recovery is not important, delete the contents of the store directory. Otherwise, try upgrading the database using "
- "db_upgrade or using db_recover - but the db4-utils package must also be installed to use these utilities.)", e);
+ "(If recovery is not important, delete the contents of the store directory. Otherwise, try upgrading the database using "
+ "db_upgrade or using db_recover - but the db4-utils package must also be installed to use these utilities.)", e);
THROW_STORE_EXCEPTION_2("Error opening environment", e);
}
@@ -204,37 +204,37 @@
u_int32_t jrnlWrCachePageSize = opts->wCachePageSize;
switch (jrnlWrCachePageSize)
{
- case 1:
- case 2:
- case 4:
- case 8:
- case 16:
- case 32:
- case 64:
- case 128:
- break;
- default:
- u_int32_t oldJrnlWrCachePageSize = jrnlWrCachePageSize;
- if (oldJrnlWrCachePageSize == 0) {
- // For zero value, use default
- jrnlWrCachePageSize = JRNL_WMGR_DEF_PAGE_SIZE * JRNL_DBLK_SIZE * JRNL_SBLK_SIZE / 1024;
- QPID_LOG(warning, "parameter wcache-page-size (" << oldJrnlWrCachePageSize << ") must be a power of 2 between 1 and 128; changing this parameter to default value (" << jrnlWrCachePageSize << ")");
- } else {
- // For any positive value, use closest value
- if (oldJrnlWrCachePageSize < 6)
- jrnlWrCachePageSize = 4;
- else if (oldJrnlWrCachePageSize < 12)
- jrnlWrCachePageSize = 8;
- else if (oldJrnlWrCachePageSize < 24)
- jrnlWrCachePageSize = 16;
- else if (oldJrnlWrCachePageSize < 48)
- jrnlWrCachePageSize = 32;
- else if (oldJrnlWrCachePageSize < 96)
- jrnlWrCachePageSize = 64;
- else if (oldJrnlWrCachePageSize > 128)
- jrnlWrCachePageSize = 128;
- QPID_LOG(warning, "parameter wcache-page-size (" << oldJrnlWrCachePageSize << ") must be a power of 2 between 1 and 128; changing this parameter to closest allowable value (" << jrnlWrCachePageSize << ")");
- }
+ case 1:
+ case 2:
+ case 4:
+ case 8:
+ case 16:
+ case 32:
+ case 64:
+ case 128:
+ break;
+ default:
+ u_int32_t oldJrnlWrCachePageSize = jrnlWrCachePageSize;
+ if (oldJrnlWrCachePageSize == 0) {
+ // For zero value, use default
+ jrnlWrCachePageSize = JRNL_WMGR_DEF_PAGE_SIZE * JRNL_DBLK_SIZE * JRNL_SBLK_SIZE / 1024;
+ QPID_LOG(warning, "parameter wcache-page-size (" << oldJrnlWrCachePageSize << ") must be a power of 2 between 1 and 128; changing this parameter to default value (" << jrnlWrCachePageSize << ")");
+ } else {
+ // For any positive value, use closest value
+ if (oldJrnlWrCachePageSize < 6)
+ jrnlWrCachePageSize = 4;
+ else if (oldJrnlWrCachePageSize < 12)
+ jrnlWrCachePageSize = 8;
+ else if (oldJrnlWrCachePageSize < 24)
+ jrnlWrCachePageSize = 16;
+ else if (oldJrnlWrCachePageSize < 48)
+ jrnlWrCachePageSize = 32;
+ else if (oldJrnlWrCachePageSize < 96)
+ jrnlWrCachePageSize = 64;
+ else if (oldJrnlWrCachePageSize > 128)
+ jrnlWrCachePageSize = 128;
+ QPID_LOG(warning, "parameter wcache-page-size (" << oldJrnlWrCachePageSize << ") must be a power of 2 between 1 and 128; changing this parameter to closest allowable value (" << jrnlWrCachePageSize << ")");
+ }
}
return init(opts->storeDir, numJrnlFiles, jrnlFsizePgs, jrnlWrCachePageSize);
@@ -725,15 +725,15 @@
break;
}
case rhm::journal::RHM_IORES_PAGE_AIOWAIT:
- if (++aio_sleep_cnt > MAX_AIO_SLEEPS)
- THROW_STORE_EXCEPTION("Timeout waiting for AIO in BdbMessageStore::recoverMessages()");
- ::usleep(AIO_SLEEP_TIME);
- break;
+ if (++aio_sleep_cnt > MAX_AIO_SLEEPS)
+ THROW_STORE_EXCEPTION("Timeout waiting for AIO in BdbMessageStore::recoverMessages()");
+ ::usleep(AIO_SLEEP_TIME);
+ break;
case rhm::journal::RHM_IORES_EMPTY:
- read = false;
- break; // done with all messages. (add call in jrnl to test that _emap is empty.)
+ read = false;
+ break; // done with all messages. (add call in jrnl to test that _emap is empty.)
default:
- assert( "Store Error: Unexpected msg state");
+ assert("Store Error: Unexpected msg state");
} // switch
} // while
} catch (const journal::jexception& e) {
@@ -1041,7 +1041,8 @@
}
}
-void BdbMessageStore::enqueue(TransactionContext* ctxt, intrusive_ptr<PersistableMessage>& msg,
+void BdbMessageStore::enqueue(TransactionContext* ctxt,
+ intrusive_ptr<PersistableMessage>& msg,
const PersistableQueue& queue)
{
checkInit();
@@ -1092,9 +1093,9 @@
if (queue) {
boost::intrusive_ptr<DataTokenImpl> dtokp(new DataTokenImpl);
dtokp->addRef();
- dtokp->setSourceMessage(message);
+ dtokp->setSourceMessage(message);
dtokp->set_external_rid(true);
- dtokp->set_rid(message->getPersistenceId()); // set the messageID into the Journal header (record-id)
+ dtokp->set_rid(message->getPersistenceId()); // set the messageID into the Journal header (record-id)
JournalImpl* jc = static_cast<JournalImpl*>(queue->getExternalQueueStore());
if (txn->getXid().empty()) {
@@ -1123,7 +1124,8 @@
}
}
-void BdbMessageStore::dequeue(TransactionContext* ctxt, intrusive_ptr<PersistableMessage>& msg,
+void BdbMessageStore::dequeue(TransactionContext* ctxt,
+ intrusive_ptr<PersistableMessage>& msg,
const PersistableQueue& queue)
{
checkInit();
Modified: store/branches/mrg-1.0/cpp/lib/BdbMessageStore.h
===================================================================
--- store/branches/mrg-1.0/cpp/lib/BdbMessageStore.h 2008-07-14 17:32:13 UTC (rev 2199)
+++ store/branches/mrg-1.0/cpp/lib/BdbMessageStore.h 2008-07-14 18:05:24 UTC (rev 2200)
@@ -1,24 +1,24 @@
/*
- Copyright (C) 2007 Red Hat Software
+ Copyright (C) 2007 Red Hat Software
- This file is part of Red Hat Messaging.
+ This file is part of Red Hat Messaging.
- Red Hat Messaging is free software; you can redistribute it and/or
- modify it under the terms of the GNU Lesser General Public
- License as published by the Free Software Foundation; either
- version 2.1 of the License, or (at your option) any later version.
+ Red Hat Messaging is free software; you can redistribute it and/or
+ modify it under the terms of the GNU Lesser General Public
+ License as published by the Free Software Foundation; either
+ version 2.1 of the License, or (at your option) any later version.
- This library is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- Lesser General Public License for more details.
+ This library is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ Lesser General Public License for more details.
- You should have received a copy of the GNU Lesser General Public
- License along with this library; if not, write to the Free Software
- Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
- USA
+ You should have received a copy of the GNU Lesser General Public
+ License along with this library; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
+ USA
- The GNU Lesser General Public License is available in the file COPYING.
+ The GNU Lesser General Public License is available in the file COPYING.
*/
#ifndef _BdbMessageStore_
@@ -46,286 +46,286 @@
#endif
namespace rhm {
- namespace bdbstore {
+namespace bdbstore {
- /**
- * An implementation of the MessageStore interface based on Berkeley DB
- */
- class BdbMessageStore : public qpid::broker::MessageStore, public qpid::management::Manageable
- {
- protected:
- typedef std::map<u_int64_t, qpid::broker::RecoverableQueue::shared_ptr> queue_index;
- typedef std::map<u_int64_t, qpid::broker::RecoverableExchange::shared_ptr> exchange_index;
- typedef std::map<u_int64_t, qpid::broker::RecoverableMessage::shared_ptr> message_index;
+/**
+ * An implementation of the MessageStore interface based on Berkeley DB
+ */
+class BdbMessageStore : public qpid::broker::MessageStore, public qpid::management::Manageable
+{
+ protected:
+ typedef std::map<u_int64_t, qpid::broker::RecoverableQueue::shared_ptr> queue_index;
+ typedef std::map<u_int64_t, qpid::broker::RecoverableExchange::shared_ptr> exchange_index;
+ typedef std::map<u_int64_t, qpid::broker::RecoverableMessage::shared_ptr> message_index;
- typedef LockedMappings::map txn_lock_map;
- typedef boost::ptr_list<PreparedTransaction> txn_list;
+ typedef LockedMappings::map txn_lock_map;
+ typedef boost::ptr_list<PreparedTransaction> txn_list;
- // Structs for preparedXidStore recover state
- struct PreparedRecoverStruct {
- u_int64_t rid;
- bool deq_flag;
- bool commit_flag;
- PreparedRecoverStruct(const u_int64_t _rid, const bool _deq_flag, const bool _commit_flag);
- };
- typedef PreparedRecoverStruct PreparedRecover;
- typedef std::map<std::string, PreparedRecover> PreparedRecoverMap;
- typedef PreparedRecoverMap::const_iterator PreparedRecoverMapCitr;
- typedef std::pair<std::string, PreparedRecoverStruct> PreparedRecoverMapPair;
+ // Structs for preparedXidStore recover state
+ struct PreparedRecoverStruct {
+ u_int64_t rid;
+ bool deq_flag;
+ bool commit_flag;
+ PreparedRecoverStruct(const u_int64_t _rid, const bool _deq_flag, const bool _commit_flag);
+ };
+ typedef PreparedRecoverStruct PreparedRecover;
+ typedef std::map<std::string, PreparedRecover> PreparedRecoverMap;
+ typedef PreparedRecoverMap::const_iterator PreparedRecoverMapCitr;
+ typedef std::pair<std::string, PreparedRecoverStruct> PreparedRecoverMapPair;
- // Default store settings
- static const u_int16_t defNumJrnlFiles = 8;
- static const u_int32_t defJrnlFileSizePgs = 24;
- static const u_int32_t defWCachePageSize = JRNL_WMGR_DEF_PAGE_SIZE * JRNL_DBLK_SIZE * JRNL_SBLK_SIZE / 1024;
- static const u_int16_t defXidStoreNumJrnlFiles = 8;
- static const u_int32_t defXidStoreJrnlFileSizePgs = 24;
- static const u_int32_t defXidStoreWCachePageSize = JRNL_WMGR_DEF_PAGE_SIZE * JRNL_DBLK_SIZE * JRNL_SBLK_SIZE / 1024;
+ // Default store settings
+ static const u_int16_t defNumJrnlFiles = 8;
+ static const u_int32_t defJrnlFileSizePgs = 24;
+ static const u_int32_t defWCachePageSize = JRNL_WMGR_DEF_PAGE_SIZE * JRNL_DBLK_SIZE * JRNL_SBLK_SIZE / 1024;
+ static const u_int16_t defXidStoreNumJrnlFiles = 8;
+ static const u_int32_t defXidStoreJrnlFileSizePgs = 24;
+ static const u_int32_t defXidStoreWCachePageSize = JRNL_WMGR_DEF_PAGE_SIZE * JRNL_DBLK_SIZE * JRNL_SBLK_SIZE / 1024;
- std::list<Db*> dbs;
- DbEnv env;
- Db queueDb;
- Db configDb;
- Db exchangeDb;
- Db messageDb;
- Db mappingDb;
- Db bindingDb;
- Db generalDb;
+ std::list<Db*> dbs;
+ DbEnv env;
+ Db queueDb;
+ Db configDb;
+ Db exchangeDb;
+ Db messageDb;
+ Db mappingDb;
+ Db bindingDb;
+ Db generalDb;
- // Pointer to prepared XID journal instance
- boost::shared_ptr<JournalImpl> preparedXidStorePtr;
- PreparedRecoverMap preparedXidStoreRecoverMap;
+ // Pointer to prepared XID journal instance
+ boost::shared_ptr<JournalImpl> preparedXidStorePtr;
+ PreparedRecoverMap preparedXidStoreRecoverMap;
- IdSequence queueIdSequence;
- IdSequence exchangeIdSequence;
- IdSequence generalIdSequence;
- IdSequence messageIdSequence;
- std::string storeDir;
- u_int16_t numJrnlFiles;
- u_int32_t jrnlFsizePgs;
- u_int32_t wcache_pgsize_sblks;
- u_int16_t wcache_num_pages;
- u_int64_t highestRid;
- bool isInit;
- const char* envPath;
- static qpid::sys::Duration defJournalGetEventsTimeout;
- static qpid::sys::Duration defJournalFlushTimeout;
- qpid::management::Store::shared_ptr mgmtObject;
- qpid::sys::Mutex jrnlCreateLock;
+ IdSequence queueIdSequence;
+ IdSequence exchangeIdSequence;
+ IdSequence generalIdSequence;
+ IdSequence messageIdSequence;
+ std::string storeDir;
+ u_int16_t numJrnlFiles;
+ u_int32_t jrnlFsizePgs;
+ u_int32_t wcache_pgsize_sblks;
+ u_int16_t wcache_num_pages;
+ u_int64_t highestRid;
+ bool isInit;
+ const char* envPath;
+ static qpid::sys::Duration defJournalGetEventsTimeout;
+ static qpid::sys::Duration defJournalFlushTimeout;
+ qpid::management::Store::shared_ptr mgmtObject;
+ qpid::sys::Mutex jrnlCreateLock;
- void recoverQueues(TxnCtxt& txn,
- qpid::broker::RecoveryManager& recovery,
- queue_index& index,
- txn_list& locked,
- message_index& messages);
- void recoverMessages(TxnCtxt& txn,
- qpid::broker::RecoveryManager& recovery,
- queue_index& index,
- txn_list& locked,
- message_index& prepared);
- void recoverMessages(TxnCtxt& txn,
- qpid::broker::RecoveryManager& recovery,
- qpid::broker::RecoverableQueue::shared_ptr& queue,
- txn_list& locked,
- message_index& prepared);
- qpid::broker::RecoverableMessage::shared_ptr getExternMessage(qpid::broker::RecoveryManager& recovery,
- uint64_t mId, unsigned& headerSize);
- void recoverExchanges(TxnCtxt& txn,
- qpid::broker::RecoveryManager& recovery,
- exchange_index& index);
- void recoverBindings(TxnCtxt& txn,
- exchange_index& exchanges,
- queue_index& queues);
- void recoverGeneral(TxnCtxt& txn,
- qpid::broker::RecoveryManager& recovery);
- int enqueueMessage(TxnCtxt& txn,
- IdDbt& msgId,
- qpid::broker::RecoverableMessage::shared_ptr& msg,
- queue_index& index,
- txn_list& locked,
- message_index& prepared);
- void recoverPreparedXidJournal();
- void getPreparedXidMap(PreparedRecoverMap& prepXidMap);
- void recoverXids(txn_list& txns);
- void readLockedMappings(Db& db,
- txn_lock_map& mappings);
- TxnCtxt* check(qpid::broker::TransactionContext* ctxt);
- void store(const qpid::broker::PersistableQueue* queue,
- TxnCtxt* txn,
- Dbt& messageId,
- boost::intrusive_ptr<qpid::broker::PersistableMessage>& message,
- bool newId);
- void async_dequeue(qpid::broker::TransactionContext* ctxt,
- boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg,
- const qpid::broker::PersistableQueue& queue);
- bool deleteIfUnused(Cursor& cursor,
- DbTxn* txn,
- Dbt& messageId);
- bool deleteIfUnused(DbTxn* txn,
- Dbt& messageId);
- bool isUnused(Cursor& cursor,
- Dbt& messageId);
- void destroy(Db& db,
- const qpid::broker::Persistable& p);
- bool create(Db& db,
- IdSequence& seq,
- const qpid::broker::Persistable& p);
- void completed(TxnCtxt& txn,
- bool commit);
- void record2pcOp(Db& db,
- TPCTxnCtxt& txn,
- u_int64_t messageId,
- u_int64_t queueId);
- void deleteBindingsForQueue(const qpid::broker::PersistableQueue& queue);
- void deleteBinding(const qpid::broker::PersistableExchange& exchange,
- const qpid::broker::PersistableQueue& queue,
- const std::string& key);
+ void recoverQueues(TxnCtxt& txn,
+ qpid::broker::RecoveryManager& recovery,
+ queue_index& index,
+ txn_list& locked,
+ message_index& messages);
+ void recoverMessages(TxnCtxt& txn,
+ qpid::broker::RecoveryManager& recovery,
+ queue_index& index,
+ txn_list& locked,
+ message_index& prepared);
+ void recoverMessages(TxnCtxt& txn,
+ qpid::broker::RecoveryManager& recovery,
+ qpid::broker::RecoverableQueue::shared_ptr& queue,
+ txn_list& locked,
+ message_index& prepared);
+ qpid::broker::RecoverableMessage::shared_ptr getExternMessage(qpid::broker::RecoveryManager& recovery,
+ uint64_t mId,
+ unsigned& headerSize);
+ void recoverExchanges(TxnCtxt& txn,
+ qpid::broker::RecoveryManager& recovery,
+ exchange_index& index);
+ void recoverBindings(TxnCtxt& txn,
+ exchange_index& exchanges,
+ queue_index& queues);
+ void recoverGeneral(TxnCtxt& txn,
+ qpid::broker::RecoveryManager& recovery);
+ int enqueueMessage(TxnCtxt& txn,
+ IdDbt& msgId,
+ qpid::broker::RecoverableMessage::shared_ptr& msg,
+ queue_index& index,
+ txn_list& locked,
+ message_index& prepared);
+ void recoverPreparedXidJournal();
+ void getPreparedXidMap(PreparedRecoverMap& prepXidMap);
+ void recoverXids(txn_list& txns);
+ void readLockedMappings(Db& db,
+ txn_lock_map& mappings);
+ TxnCtxt* check(qpid::broker::TransactionContext* ctxt);
+ void store(const qpid::broker::PersistableQueue* queue,
+ TxnCtxt* txn,
+ Dbt& messageId,
+ boost::intrusive_ptr<qpid::broker::PersistableMessage>& message,
+ bool newId);
+ void async_dequeue(qpid::broker::TransactionContext* ctxt,
+ boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg,
+ const qpid::broker::PersistableQueue& queue);
+ bool deleteIfUnused(Cursor& cursor,
+ DbTxn* txn,
+ Dbt& messageId);
+ bool deleteIfUnused(DbTxn* txn,
+ Dbt& messageId);
+ bool isUnused(Cursor& cursor,
+ Dbt& messageId);
+ void destroy(Db& db,
+ const qpid::broker::Persistable& p);
+ bool create(Db& db,
+ IdSequence& seq,
+ const qpid::broker::Persistable& p);
+ void completed(TxnCtxt& txn,
+ bool commit);
+ void record2pcOp(Db& db,
+ TPCTxnCtxt& txn,
+ u_int64_t messageId,
+ u_int64_t queueId);
+ void deleteBindingsForQueue(const qpid::broker::PersistableQueue& queue);
+ void deleteBinding(const qpid::broker::PersistableExchange& exchange,
+ const qpid::broker::PersistableQueue& queue,
+ const std::string& key);
- u_int64_t getRecordSize(Db& db,
- Dbt& key);
- u_int64_t getRecordSize(DbTxn* txn,
- Db& db,
- Dbt& key);
- void put(Db& db,
- DbTxn* txn,
- Dbt& key,
- Dbt& value);
- bool deleteKeyValuePair(Db& db,
- DbTxn* txn,
- Dbt& key,
- Dbt& value);
- void open(Db& db,
- DbTxn* txn,
- const char* file,
- bool dupKey);
+ u_int64_t getRecordSize(Db& db,
+ Dbt& key);
+ u_int64_t getRecordSize(DbTxn* txn,
+ Db& db,
+ Dbt& key);
+ void put(Db& db,
+ DbTxn* txn,
+ Dbt& key,
+ Dbt& value);
+ bool deleteKeyValuePair(Db& db,
+ DbTxn* txn,
+ Dbt& key,
+ Dbt& value);
+ void open(Db& db,
+ DbTxn* txn,
+ const char* file,
+ bool dupKey);
- // journal functions
- void createJrnlQueue(const qpid::broker::PersistableQueue& queue);
- std::string getJrnlDir(const qpid::broker::PersistableQueue& queue); //for exmaple /var/rhm/ + queueDir/
- std::string getJrnlDir(const char* queueName);
- std::string getJrnlBaseDir();
- std::string getBdbBaseDir();
- std::string getPxidBaseDir();
- inline void checkInit() {
- if (!isInit) init("/var", defNumJrnlFiles, defJrnlFileSizePgs, defWCachePageSize); isInit = true;
- }
- void chkInitPreparedXidStore();
-
- // debug aid for printing XIDs that may contain non-printable chars
- static std::string xid2str(const std::string xid) {
- std::ostringstream oss;
- oss << std::hex << std::setfill('0');
- for (unsigned i=0; i<xid.size(); i++) {
- if (isprint(xid[i]))
- oss << xid[i];
- else
- oss << "/" << std::setw(2) << (int)((char)xid[i]);
- }
- return oss.str();
- }
+ // journal functions
+ void createJrnlQueue(const qpid::broker::PersistableQueue& queue);
+ std::string getJrnlDir(const qpid::broker::PersistableQueue& queue); //for exmaple /var/rhm/ + queueDir/
+ std::string getJrnlDir(const char* queueName);
+ std::string getJrnlBaseDir();
+ std::string getBdbBaseDir();
+ std::string getPxidBaseDir();
+ inline void checkInit() {
+ if (!isInit) init("/var", defNumJrnlFiles, defJrnlFileSizePgs, defWCachePageSize); isInit = true;
+ }
+ void chkInitPreparedXidStore();
- public:
- struct Options : public qpid::Options {
- Options(const std::string& name="Store Options");
- std::string clusterName;
- std::string storeDir;
- bool storeAsync;
- bool storeForce;
- uint16_t numJrnlFiles;
- uint32_t jrnlFsizePgs;
- uint32_t wCachePageSize;
- };
+ // debug aid for printing XIDs that may contain non-printable chars
+ static std::string xid2str(const std::string xid) {
+ std::ostringstream oss;
+ oss << std::hex << std::setfill('0');
+ for (unsigned i=0; i<xid.size(); i++) {
+ if (isprint(xid[i]))
+ oss << xid[i];
+ else
+ oss << "/" << std::setw(2) << (int)((char)xid[i]);
+ }
+ return oss.str();
+ }
- typedef boost::shared_ptr<BdbMessageStore> shared_ptr;
+ public:
+ struct Options : public qpid::Options {
+ Options(const std::string& name="Store Options");
+ std::string clusterName;
+ std::string storeDir;
+ bool storeAsync;
+ bool storeForce;
+ uint16_t numJrnlFiles;
+ uint32_t jrnlFsizePgs;
+ uint32_t wCachePageSize;
+ };
- BdbMessageStore(const char* envpath = 0);
+ typedef boost::shared_ptr<BdbMessageStore> shared_ptr;
- virtual ~BdbMessageStore();
+ BdbMessageStore(const char* envpath = 0);
- bool init(const qpid::Options* options);
+ virtual ~BdbMessageStore();
- bool init(const std::string& dir,
- u_int16_t jfiles,
- u_int32_t jfileSizePgs,
- uint32_t wCachePageSize);
+ bool init(const qpid::Options* options);
- void initManagement (qpid::broker::Broker* broker);
+ bool init(const std::string& dir,
+ u_int16_t jfiles,
+ u_int32_t jfileSizePgs,
+ uint32_t wCachePageSize);
- void truncate();
+ void initManagement (qpid::broker::Broker* broker);
- void create(qpid::broker::PersistableQueue& queue,
- const qpid::framing::FieldTable& args);
+ void truncate();
- void destroy(qpid::broker::PersistableQueue& queue);
+ void create(qpid::broker::PersistableQueue& queue,
+ const qpid::framing::FieldTable& args);
- void create(const qpid::broker::PersistableExchange& queue,
- const qpid::framing::FieldTable& args);
+ void destroy(qpid::broker::PersistableQueue& queue);
- void destroy(const qpid::broker::PersistableExchange& queue);
+ void create(const qpid::broker::PersistableExchange& queue,
+ const qpid::framing::FieldTable& args);
- void bind(const qpid::broker::PersistableExchange& exchange,
- const qpid::broker::PersistableQueue& queue,
- const std::string& key,
- const qpid::framing::FieldTable& args);
+ void destroy(const qpid::broker::PersistableExchange& queue);
- void unbind(const qpid::broker::PersistableExchange& exchange,
- const qpid::broker::PersistableQueue& queue,
- const std::string& key,
- const qpid::framing::FieldTable& args);
+ void bind(const qpid::broker::PersistableExchange& exchange,
+ const qpid::broker::PersistableQueue& queue,
+ const std::string& key,
+ const qpid::framing::FieldTable& args);
- void create(const qpid::broker::PersistableConfig& config);
+ void unbind(const qpid::broker::PersistableExchange& exchange,
+ const qpid::broker::PersistableQueue& queue,
+ const std::string& key,
+ const qpid::framing::FieldTable& args);
- void destroy(const qpid::broker::PersistableConfig& config);
+ void create(const qpid::broker::PersistableConfig& config);
- void recover(qpid::broker::RecoveryManager& queues);
+ void destroy(const qpid::broker::PersistableConfig& config);
- void stage(boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg);
+ void recover(qpid::broker::RecoveryManager& queues);
- void destroy(qpid::broker::PersistableMessage& msg);
+ void stage(boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg);
- void appendContent(boost::intrusive_ptr<const qpid::broker::PersistableMessage>& msg,
- const std::string& data);
+ void destroy(qpid::broker::PersistableMessage& msg);
- void loadContent(const qpid::broker::PersistableQueue& queue,
- boost::intrusive_ptr<const qpid::broker::PersistableMessage>& msg,
- std::string& data,
- u_int64_t offset,
- u_int32_t length);
+ void appendContent(boost::intrusive_ptr<const qpid::broker::PersistableMessage>& msg,
+ const std::string& data);
- void enqueue(qpid::broker::TransactionContext* ctxt,
- boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg,
- const qpid::broker::PersistableQueue& queue);
+ void loadContent(const qpid::broker::PersistableQueue& queue,
+ boost::intrusive_ptr<const qpid::broker::PersistableMessage>& msg,
+ std::string& data,
+ u_int64_t offset,
+ u_int32_t length);
- void dequeue(qpid::broker::TransactionContext* ctxt,
- boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg,
- const qpid::broker::PersistableQueue& queue);
+ void enqueue(qpid::broker::TransactionContext* ctxt,
+ boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg,
+ const qpid::broker::PersistableQueue& queue);
- void flush(const qpid::broker::PersistableQueue& queue);
+ void dequeue(qpid::broker::TransactionContext* ctxt,
+ boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg,
+ const qpid::broker::PersistableQueue& queue);
- u_int32_t outstandingQueueAIO(const qpid::broker::PersistableQueue& queue);
+ void flush(const qpid::broker::PersistableQueue& queue);
- void collectPreparedXids(std::set<std::string>& xids);
+ u_int32_t outstandingQueueAIO(const qpid::broker::PersistableQueue& queue);
- std::auto_ptr<qpid::broker::TransactionContext> begin();
+ void collectPreparedXids(std::set<std::string>& xids);
- std::auto_ptr<qpid::broker::TPCTransactionContext> begin(const std::string& xid);
+ std::auto_ptr<qpid::broker::TransactionContext> begin();
- void prepare(qpid::broker::TPCTransactionContext& ctxt);
+ std::auto_ptr<qpid::broker::TPCTransactionContext> begin(const std::string& xid);
- void localPrepare(TxnCtxt* ctxt);
+ void prepare(qpid::broker::TPCTransactionContext& ctxt);
- void commit(qpid::broker::TransactionContext& ctxt);
+ void localPrepare(TxnCtxt* ctxt);
- void abort(qpid::broker::TransactionContext& ctxt);
+ void commit(qpid::broker::TransactionContext& ctxt);
- qpid::management::ManagementObject::shared_ptr GetManagementObject (void) const
- { return mgmtObject; }
+ void abort(qpid::broker::TransactionContext& ctxt);
- inline qpid::management::Manageable::status_t ManagementMethod (uint32_t, qpid::management::Args&)
- { return qpid::management::Manageable::STATUS_OK; }
- }; // class BdbMessageStore
-
- } // namespace bdbstore
+ qpid::management::ManagementObject::shared_ptr GetManagementObject (void) const
+ { return mgmtObject; }
+
+ inline qpid::management::Manageable::status_t ManagementMethod (uint32_t, qpid::management::Args&)
+ { return qpid::management::Manageable::STATUS_OK; }
+}; // class BdbMessageStore
+
+} // namespace bdbstore
} // namespace rhm
-
#endif
Modified: store/branches/mrg-1.0/cpp/lib/TxnCtxt.h
===================================================================
--- store/branches/mrg-1.0/cpp/lib/TxnCtxt.h 2008-07-14 17:32:13 UTC (rev 2199)
+++ store/branches/mrg-1.0/cpp/lib/TxnCtxt.h 2008-07-14 18:05:24 UTC (rev 2200)
@@ -1,24 +1,24 @@
/*
- Copyright (C) 2007 Red Hat Software
+ Copyright (C) 2007 Red Hat Software
- This file is part of Red Hat Messaging.
+ This file is part of Red Hat Messaging.
- Red Hat Messaging is free software; you can redistribute it and/or
- modify it under the terms of the GNU Lesser General Public
- License as published by the Free Software Foundation; either
- version 2.1 of the License, or (at your option) any later version.
+ Red Hat Messaging is free software; you can redistribute it and/or
+ modify it under the terms of the GNU Lesser General Public
+ License as published by the Free Software Foundation; either
+ version 2.1 of the License, or (at your option) any later version.
- This library is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- Lesser General Public License for more details.
+ This library is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ Lesser General Public License for more details.
- You should have received a copy of the GNU Lesser General Public
- License along with this library; if not, write to the Free Software
- Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
- USA
+ You should have received a copy of the GNU Lesser General Public
+ License along with this library; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
+ USA
- The GNU Lesser General Public License is available in the file COPYING.
+ The GNU Lesser General Public License is available in the file COPYING.
*/
#ifndef _TxnCtxt_
@@ -170,7 +170,7 @@
DbTxn* get() { return txn; }
virtual bool isTPC() { return false; }
virtual const std::string& getXid() { return tid; }
-
+
void addXidRecord(qpid::broker::ExternalQueueStore* queue) { impactedQueues.insert(queue); }
inline void prepare(JournalImpl* _preparedXidStorePtr) { preparedXidStorePtr = _preparedXidStorePtr; }
void complete(bool commit) { completeTxn(commit); }
More information about the rhmessaging-commits
mailing list