[rhmessaging-commits] rhmessaging commits: r2156 - store/trunk/cpp/lib.
rhmessaging-commits at lists.jboss.org
rhmessaging-commits at lists.jboss.org
Mon Jun 23 15:44:16 EDT 2008
Author: aconway
Date: 2008-06-23 15:44:16 -0400 (Mon, 23 Jun 2008)
New Revision: 2156
Modified:
store/trunk/cpp/lib/BdbMessageStore.cpp
store/trunk/cpp/lib/BdbMessageStore.h
store/trunk/cpp/lib/DataTokenImpl.h
Log:
Const-correctness fixes in MessageStore.h
Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp 2008-06-17 03:47:04 UTC (rev 2155)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp 2008-06-23 19:44:16 UTC (rev 2156)
@@ -109,20 +109,20 @@
u_int32_t defTotWCacheSize = JRNL_WMGR_DEF_PAGE_SIZE * JRNL_WMGR_DEF_PAGES; // in sblks. Currently 2014 sblks (1 MiB).
switch (wCachePageSize)
{
- case 1:
- case 2:
- case 4:
- // 256 KiB total cache
- wcache_num_pages = defTotWCacheSize / wcache_pgsize_sblks / 4;
- break;
- case 8:
- case 16:
- // 512 KiB total cache
- wcache_num_pages = defTotWCacheSize / wcache_pgsize_sblks / 2;
- break;
- default: // 32, 64, 128
- // 1 MiB total cache
- wcache_num_pages = defTotWCacheSize / wcache_pgsize_sblks;
+ case 1:
+ case 2:
+ case 4:
+ // 256 KiB total cache
+ wcache_num_pages = defTotWCacheSize / wcache_pgsize_sblks / 4;
+ break;
+ case 8:
+ case 16:
+ // 512 KiB total cache
+ wcache_num_pages = defTotWCacheSize / wcache_pgsize_sblks / 2;
+ break;
+ default: // 32, 64, 128
+ // 1 MiB total cache
+ wcache_num_pages = defTotWCacheSize / wcache_pgsize_sblks;
}
useAsync = async;
@@ -138,8 +138,8 @@
} catch (const DbException& e) {
if (e.get_errno() == DB_VERSION_MISMATCH)
THROW_STORE_EXCEPTION_2("Database environment mismatch: This version of bd4 does not match that which created the store database. "
- "(If recovery is not important, delete the contents of the store directory. Otherwise, try upgrading the database using "
- "db_upgrade or using db_recover - but the db4-utils package must also be installed to use these utilities.)", e);
+ "(If recovery is not important, delete the contents of the store directory. Otherwise, try upgrading the database using "
+ "db_upgrade or using db_recover - but the db4-utils package must also be installed to use these utilities.)", e);
THROW_STORE_EXCEPTION_2("Error opening environment", e);
}
@@ -206,40 +206,40 @@
u_int32_t jrnlWrCachePageSize = opts->wCachePageSize;
switch (jrnlWrCachePageSize)
{
- case 1:
- case 2:
- case 4:
- case 8:
- case 16:
- case 32:
- case 64:
- case 128:
- break;
- default:
- u_int32_t oldJrnlWrCachePageSize = jrnlWrCachePageSize;
- if (oldJrnlWrCachePageSize == 0)
- {
- // For zero value, use default
- jrnlWrCachePageSize = JRNL_WMGR_DEF_PAGE_SIZE * JRNL_DBLK_SIZE * JRNL_SBLK_SIZE / 1024;
- QPID_LOG(warning, "parameter wcache-page-size (" << oldJrnlWrCachePageSize << ") must be a power of 2 between 1 and 128; changing this parameter to default value (" << jrnlWrCachePageSize << ")");
- }
- else
- {
- // For any positive value, use closest value
- if (oldJrnlWrCachePageSize < 6)
- jrnlWrCachePageSize = 4;
- else if (oldJrnlWrCachePageSize < 12)
- jrnlWrCachePageSize = 8;
- else if (oldJrnlWrCachePageSize < 24)
- jrnlWrCachePageSize = 16;
- else if (oldJrnlWrCachePageSize < 48)
- jrnlWrCachePageSize = 32;
- else if (oldJrnlWrCachePageSize < 96)
- jrnlWrCachePageSize = 64;
- else if (oldJrnlWrCachePageSize > 128)
- jrnlWrCachePageSize = 128;
- QPID_LOG(warning, "parameter wcache-page-size (" << oldJrnlWrCachePageSize << ") must be a power of 2 between 1 and 128; changing this parameter to closest allowable value (" << jrnlWrCachePageSize << ")");
- }
+ case 1:
+ case 2:
+ case 4:
+ case 8:
+ case 16:
+ case 32:
+ case 64:
+ case 128:
+ break;
+ default:
+ u_int32_t oldJrnlWrCachePageSize = jrnlWrCachePageSize;
+ if (oldJrnlWrCachePageSize == 0)
+ {
+ // For zero value, use default
+ jrnlWrCachePageSize = JRNL_WMGR_DEF_PAGE_SIZE * JRNL_DBLK_SIZE * JRNL_SBLK_SIZE / 1024;
+ QPID_LOG(warning, "parameter wcache-page-size (" << oldJrnlWrCachePageSize << ") must be a power of 2 between 1 and 128; changing this parameter to default value (" << jrnlWrCachePageSize << ")");
+ }
+ else
+ {
+ // For any positive value, use closest value
+ if (oldJrnlWrCachePageSize < 6)
+ jrnlWrCachePageSize = 4;
+ else if (oldJrnlWrCachePageSize < 12)
+ jrnlWrCachePageSize = 8;
+ else if (oldJrnlWrCachePageSize < 24)
+ jrnlWrCachePageSize = 16;
+ else if (oldJrnlWrCachePageSize < 48)
+ jrnlWrCachePageSize = 32;
+ else if (oldJrnlWrCachePageSize < 96)
+ jrnlWrCachePageSize = 64;
+ else if (oldJrnlWrCachePageSize > 128)
+ jrnlWrCachePageSize = 128;
+ QPID_LOG(warning, "parameter wcache-page-size (" << oldJrnlWrCachePageSize << ") must be a power of 2 between 1 and 128; changing this parameter to closest allowable value (" << jrnlWrCachePageSize << ")");
+ }
}
return init(opts->storeDir, opts->storeAsync, opts->storeForce, numJrnlFiles, jrnlFsizePgs, jrnlWrCachePageSize);
@@ -740,15 +740,15 @@
break;
}
case rhm::journal::RHM_IORES_PAGE_AIOWAIT:
- if (++aio_sleep_cnt > MAX_AIO_SLEEPS)
- THROW_STORE_EXCEPTION("Timeout waiting for AIO");
- ::usleep(AIO_SLEEP_TIME);
- break;
+ if (++aio_sleep_cnt > MAX_AIO_SLEEPS)
+ THROW_STORE_EXCEPTION("Timeout waiting for AIO");
+ ::usleep(AIO_SLEEP_TIME);
+ break;
case rhm::journal::RHM_IORES_EMPTY:
- read = false;
- break; // done with all messages. ((add call in jrnl to test that _emap is empty.
+ read = false;
+ break; // done with all messages. ((add call in jrnl to test that _emap is empty.
default:
- assert( "Store Error: Unexpected msg state");
+ assert( "Store Error: Unexpected msg state");
} // switch
} // while
} catch (const journal::jexception& e) {
@@ -939,7 +939,7 @@
readXids(prepareXidDb, xids);
}
-void BdbMessageStore::stage( intrusive_ptr<PersistableMessage>& msg)
+void BdbMessageStore::stage(const intrusive_ptr<PersistableMessage>& msg)
{
checkInit();
TxnCtxt txn;
@@ -1092,7 +1092,8 @@
}
}
-void BdbMessageStore::enqueue(TransactionContext* ctxt, intrusive_ptr<PersistableMessage>& msg,
+void BdbMessageStore::enqueue(TransactionContext* ctxt,
+ const intrusive_ptr<PersistableMessage>& msg,
const PersistableQueue& queue)
{
checkInit();
@@ -1146,7 +1147,7 @@
void BdbMessageStore::store(const PersistableQueue* queue,
TxnCtxt* txn, Dbt& messageId,
- intrusive_ptr<PersistableMessage>& message,
+ const intrusive_ptr<PersistableMessage>& message,
bool newId)
{
u_int32_t headerSize = message->encodedHeaderSize();
@@ -1165,9 +1166,9 @@
if ( queue && usingJrnl()) {
boost::intrusive_ptr<DataTokenImpl> dtokp(new DataTokenImpl);
dtokp->addRef();
- dtokp->setSourceMessage(message);
+ dtokp->setSourceMessage(message);
dtokp->set_external_rid(true);
- dtokp->set_rid(message->getPersistenceId()); // set the messageID into the Journal header (record-id)
+ dtokp->set_rid(message->getPersistenceId()); // set the messageID into the Journal header (record-id)
JournalImpl* jc = static_cast<JournalImpl*>(queue->getExternalQueueStore());
if (txn->getXid().empty()){
@@ -1198,7 +1199,8 @@
}
}
-void BdbMessageStore::dequeue(TransactionContext* ctxt, intrusive_ptr<PersistableMessage>& msg,
+void BdbMessageStore::dequeue(TransactionContext* ctxt,
+ const intrusive_ptr<PersistableMessage>& msg,
const PersistableQueue& queue)
{
checkInit();
@@ -1256,7 +1258,7 @@
void BdbMessageStore::async_dequeue(
TransactionContext* ctxt,
- intrusive_ptr<PersistableMessage>& msg,
+ const intrusive_ptr<PersistableMessage>& msg,
const PersistableQueue& queue)
{
boost::intrusive_ptr<DataTokenImpl> ddtokp(new DataTokenImpl);
Modified: store/trunk/cpp/lib/BdbMessageStore.h
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.h 2008-06-17 03:47:04 UTC (rev 2155)
+++ store/trunk/cpp/lib/BdbMessageStore.h 2008-06-23 19:44:16 UTC (rev 2156)
@@ -1,24 +1,24 @@
/*
- Copyright (C) 2007 Red Hat Software
+ Copyright (C) 2007 Red Hat Software
- This file is part of Red Hat Messaging.
+ This file is part of Red Hat Messaging.
- Red Hat Messaging is free software; you can redistribute it and/or
- modify it under the terms of the GNU Lesser General Public
- License as published by the Free Software Foundation; either
- version 2.1 of the License, or (at your option) any later version.
+ Red Hat Messaging is free software; you can redistribute it and/or
+ modify it under the terms of the GNU Lesser General Public
+ License as published by the Free Software Foundation; either
+ version 2.1 of the License, or (at your option) any later version.
- This library is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- Lesser General Public License for more details.
+ This library is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ Lesser General Public License for more details.
- You should have received a copy of the GNU Lesser General Public
- License along with this library; if not, write to the Free Software
- Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
- USA
+ You should have received a copy of the GNU Lesser General Public
+ License along with this library; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
+ USA
- The GNU Lesser General Public License is available in the file COPYING.
+ The GNU Lesser General Public License is available in the file COPYING.
*/
#ifndef _BdbMessageStore_
@@ -52,189 +52,189 @@
#endif
namespace rhm {
- namespace bdbstore {
- using std::string;
+namespace bdbstore {
+using std::string;
- /**
- * An implementation of the MessageStore interface based on Berkeley DB
- */
- class BdbMessageStore : public qpid::broker::MessageStore, public qpid::management::Manageable
- {
- typedef std::map<u_int64_t, qpid::broker::RecoverableQueue::shared_ptr> queue_index;
- typedef std::map<u_int64_t, qpid::broker::RecoverableExchange::shared_ptr> exchange_index;
- typedef std::map<u_int64_t, qpid::broker::RecoverableMessage::shared_ptr> message_index;
+/**
+ * An implementation of the MessageStore interface based on Berkeley DB
+ */
+class BdbMessageStore : public qpid::broker::MessageStore, public qpid::management::Manageable
+{
+ typedef std::map<u_int64_t, qpid::broker::RecoverableQueue::shared_ptr> queue_index;
+ typedef std::map<u_int64_t, qpid::broker::RecoverableExchange::shared_ptr> exchange_index;
+ typedef std::map<u_int64_t, qpid::broker::RecoverableMessage::shared_ptr> message_index;
- typedef LockedMappings::map txn_lock_map;
- typedef boost::ptr_list<PreparedTransaction> txn_list;
+ typedef LockedMappings::map txn_lock_map;
+ typedef boost::ptr_list<PreparedTransaction> txn_list;
- // Default store settings
- static const bool defUseAsync = false;
- static const bool defForceStoreConversion = false;
- static const u_int16_t defNumJrnlFiles = 8; // TODO: make configurable
- static const u_int32_t defJrnlFileSizePgs = 24; // TODO: make configurable
- static const u_int32_t defWCachePageSize = JRNL_WMGR_DEF_PAGE_SIZE * JRNL_DBLK_SIZE * JRNL_SBLK_SIZE / 1024; // TODO: make configurable
+ // Default store settings
+ static const bool defUseAsync = false;
+ static const bool defForceStoreConversion = false;
+ static const u_int16_t defNumJrnlFiles = 8; // TODO: make configurable
+ static const u_int32_t defJrnlFileSizePgs = 24; // TODO: make configurable
+ static const u_int32_t defWCachePageSize = JRNL_WMGR_DEF_PAGE_SIZE * JRNL_DBLK_SIZE * JRNL_SBLK_SIZE / 1024; // TODO: make configurable
- std::list<Db*> dbs;
- DbEnv env;
- Db queueDb;
- Db configDb;
- Db exchangeDb;
- Db messageDb;
- Db mappingDb;
- Db bindingDb;
- Db generalDb;
- Db enqueueXidDb;
- Db dequeueXidDb;
- Db prepareXidDb;
- IdSequence queueIdSequence;
- IdSequence exchangeIdSequence;
- IdSequence generalIdSequence;
- IdSequence messageIdSequence;
- static bool useAsync;
- std::string storeDir;
- u_int16_t numJrnlFiles;
- u_int32_t jrnlFsizePgs;
- u_int32_t wcache_pgsize_sblks;
- u_int16_t wcache_num_pages;
- bool isInit;
- const char* envPath;
- static qpid::sys::Duration defJournalGetEventsTimeout;
- static qpid::sys::Duration defJournalFlushTimeout;
- qpid::management::Store::shared_ptr mgmtObject;
- qpid::sys::Mutex jrnlCreateLock;
+ std::list<Db*> dbs;
+ DbEnv env;
+ Db queueDb;
+ Db configDb;
+ Db exchangeDb;
+ Db messageDb;
+ Db mappingDb;
+ Db bindingDb;
+ Db generalDb;
+ Db enqueueXidDb;
+ Db dequeueXidDb;
+ Db prepareXidDb;
+ IdSequence queueIdSequence;
+ IdSequence exchangeIdSequence;
+ IdSequence generalIdSequence;
+ IdSequence messageIdSequence;
+ static bool useAsync;
+ std::string storeDir;
+ u_int16_t numJrnlFiles;
+ u_int32_t jrnlFsizePgs;
+ u_int32_t wcache_pgsize_sblks;
+ u_int16_t wcache_num_pages;
+ bool isInit;
+ const char* envPath;
+ static qpid::sys::Duration defJournalGetEventsTimeout;
+ static qpid::sys::Duration defJournalFlushTimeout;
+ qpid::management::Store::shared_ptr mgmtObject;
+ qpid::sys::Mutex jrnlCreateLock;
- bool mode(const bool mode, const bool force);
- void recoverQueues(TxnCtxt& txn, qpid::broker::RecoveryManager& recovery, queue_index& index,
- txn_list& locked, message_index& messages);
- void recoverMessages(TxnCtxt& txn, qpid::broker::RecoveryManager& recovery, queue_index& index,
- txn_list& locked, message_index& prepared);
- void recoverMessages(TxnCtxt& txn, qpid::broker::RecoveryManager& recovery,
- qpid::broker::RecoverableQueue::shared_ptr& queue,
- txn_list& locked, message_index& prepared);
- qpid::broker::RecoverableMessage::shared_ptr getExternMessage(qpid::broker::RecoveryManager& recovery,
- uint64_t mId, unsigned& headerSize);
- void recoverExchanges(TxnCtxt& txn, qpid::broker::RecoveryManager& recovery, exchange_index& index);
- void recoverBindings(TxnCtxt& txn, exchange_index& exchanges, queue_index& queues);
- void recoverGeneral(TxnCtxt& txn, qpid::broker::RecoveryManager& recovery);
- int enqueueMessage(TxnCtxt& txn, IdDbt& msgId, qpid::broker::RecoverableMessage::shared_ptr& msg,
- queue_index& index, txn_list& locked, message_index& prepared);
- void recoverXids(txn_list& txns);
- void readXids(Db& db, std::set<string>& xids);
- void readLockedMappings(Db& db, txn_lock_map& mappings);
- TxnCtxt* check(qpid::broker::TransactionContext* ctxt);
- void store(const qpid::broker::PersistableQueue* queue, TxnCtxt* txn,
- Dbt& messageId,
- boost::intrusive_ptr<qpid::broker::PersistableMessage>& message,
- bool newId);
- void enqueue(DbTxn* txn, Dbt& messageId, Dbt& queueId);
- bool dequeue(DbTxn* txn, Dbt& messageId, Dbt& queueId);
- void async_dequeue(qpid::broker::TransactionContext* ctxt,
- boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg,
- const qpid::broker::PersistableQueue& queue);
- bool deleteIfUnused(Cursor& cursor, DbTxn* txn, Dbt& messageId);
- bool deleteIfUnused(DbTxn* txn, Dbt& messageId);
- bool isUnused(Cursor& cursor, Dbt& messageId);
- void destroy(Db& db, const qpid::broker::Persistable& p);
- bool create(Db& db, IdSequence& seq, const qpid::broker::Persistable& p);
- void completed(TPCTxnCtxt& txn, Db& discard, Db& apply, bool commit);
- void record2pcOp(Db& db, TPCTxnCtxt& txn, u_int64_t messageId, u_int64_t queueId);
- void deleteBindingsForQueue(const qpid::broker::PersistableQueue& queue);
- void deleteBinding(const qpid::broker::PersistableExchange& exchange,
- const qpid::broker::PersistableQueue& queue,
- const std::string& key);
+ bool mode(const bool mode, const bool force);
+ void recoverQueues(TxnCtxt& txn, qpid::broker::RecoveryManager& recovery, queue_index& index,
+ txn_list& locked, message_index& messages);
+ void recoverMessages(TxnCtxt& txn, qpid::broker::RecoveryManager& recovery, queue_index& index,
+ txn_list& locked, message_index& prepared);
+ void recoverMessages(TxnCtxt& txn, qpid::broker::RecoveryManager& recovery,
+ qpid::broker::RecoverableQueue::shared_ptr& queue,
+ txn_list& locked, message_index& prepared);
+ qpid::broker::RecoverableMessage::shared_ptr getExternMessage(qpid::broker::RecoveryManager& recovery,
+ uint64_t mId, unsigned& headerSize);
+ void recoverExchanges(TxnCtxt& txn, qpid::broker::RecoveryManager& recovery, exchange_index& index);
+ void recoverBindings(TxnCtxt& txn, exchange_index& exchanges, queue_index& queues);
+ void recoverGeneral(TxnCtxt& txn, qpid::broker::RecoveryManager& recovery);
+ int enqueueMessage(TxnCtxt& txn, IdDbt& msgId, qpid::broker::RecoverableMessage::shared_ptr& msg,
+ queue_index& index, txn_list& locked, message_index& prepared);
+ void recoverXids(txn_list& txns);
+ void readXids(Db& db, std::set<string>& xids);
+ void readLockedMappings(Db& db, txn_lock_map& mappings);
+ TxnCtxt* check(qpid::broker::TransactionContext* ctxt);
+ void store(const qpid::broker::PersistableQueue* queue, TxnCtxt* txn,
+ Dbt& messageId,
+ const boost::intrusive_ptr<qpid::broker::PersistableMessage>& message,
+ bool newId);
+ void enqueue(DbTxn* txn, Dbt& messageId, Dbt& queueId);
+ bool dequeue(DbTxn* txn, Dbt& messageId, Dbt& queueId);
+ void async_dequeue(qpid::broker::TransactionContext* ctxt,
+ const boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg,
+ const qpid::broker::PersistableQueue& queue);
+ bool deleteIfUnused(Cursor& cursor, DbTxn* txn, Dbt& messageId);
+ bool deleteIfUnused(DbTxn* txn, Dbt& messageId);
+ bool isUnused(Cursor& cursor, Dbt& messageId);
+ void destroy(Db& db, const qpid::broker::Persistable& p);
+ bool create(Db& db, IdSequence& seq, const qpid::broker::Persistable& p);
+ void completed(TPCTxnCtxt& txn, Db& discard, Db& apply, bool commit);
+ void record2pcOp(Db& db, TPCTxnCtxt& txn, u_int64_t messageId, u_int64_t queueId);
+ void deleteBindingsForQueue(const qpid::broker::PersistableQueue& queue);
+ void deleteBinding(const qpid::broker::PersistableExchange& exchange,
+ const qpid::broker::PersistableQueue& queue,
+ const std::string& key);
- u_int64_t getRecordSize(Db& db, Dbt& key);
- u_int64_t getRecordSize(DbTxn* txn, Db& db, Dbt& key);
- void put(Db& db, DbTxn* txn, Dbt& key, Dbt& value);
- bool deleteKeyValuePair(Db& db, DbTxn* txn, Dbt& key, Dbt& value);
- void open(Db& db, DbTxn* txn, const char* file, bool dupKey);
+ u_int64_t getRecordSize(Db& db, Dbt& key);
+ u_int64_t getRecordSize(DbTxn* txn, Db& db, Dbt& key);
+ void put(Db& db, DbTxn* txn, Dbt& key, Dbt& value);
+ bool deleteKeyValuePair(Db& db, DbTxn* txn, Dbt& key, Dbt& value);
+ void open(Db& db, DbTxn* txn, const char* file, bool dupKey);
- // journal functions
- void createJrnlQueue(const qpid::broker::PersistableQueue& queue);
- string getJrnlDir(const qpid::broker::PersistableQueue& queue); //for exmaple /var/rhm/ + queueDir/
- string getJrnlDir(const char* queueName);
- static inline bool usingJrnl() {return useAsync;}
- string getJrnlBaseDir();
- inline void checkInit() {
- if (!isInit) init("/var", defUseAsync, defForceStoreConversion, defNumJrnlFiles, defJrnlFileSizePgs, defWCachePageSize); isInit = true;
- }
+ // journal functions
+ void createJrnlQueue(const qpid::broker::PersistableQueue& queue);
+ string getJrnlDir(const qpid::broker::PersistableQueue& queue); //for exmaple /var/rhm/ + queueDir/
+ string getJrnlDir(const char* queueName);
+ static inline bool usingJrnl() {return useAsync;}
+ string getJrnlBaseDir();
+ inline void checkInit() {
+ if (!isInit) init("/var", defUseAsync, defForceStoreConversion, defNumJrnlFiles, defJrnlFileSizePgs, defWCachePageSize); isInit = true;
+ }
- public:
- struct Options : public qpid::Options {
- Options(const std::string& name="Store Options");
- string clusterName;
- string storeDir;
- bool storeAsync;
- bool storeForce;
- uint16_t numJrnlFiles;
- uint32_t jrnlFsizePgs;
- uint32_t wCachePageSize;
- };
+ public:
+ struct Options : public qpid::Options {
+ Options(const std::string& name="Store Options");
+ string clusterName;
+ string storeDir;
+ bool storeAsync;
+ bool storeForce;
+ uint16_t numJrnlFiles;
+ uint32_t jrnlFsizePgs;
+ uint32_t wCachePageSize;
+ };
- typedef boost::shared_ptr<BdbMessageStore> shared_ptr;
+ typedef boost::shared_ptr<BdbMessageStore> shared_ptr;
- BdbMessageStore(const char* envpath = 0);
- virtual ~BdbMessageStore();
- bool init(const qpid::Options* options);
- bool init(const std::string& dir, const bool async, const bool force, u_int16_t jfiles, u_int32_t jfileSizePgs, uint32_t wCachePageSize);
- void initManagement (qpid::broker::Broker* broker);
+ BdbMessageStore(const char* envpath = 0);
+ virtual ~BdbMessageStore();
+ bool init(const qpid::Options* options);
+ bool init(const std::string& dir, const bool async, const bool force, u_int16_t jfiles, u_int32_t jfileSizePgs, uint32_t wCachePageSize);
+ void initManagement (qpid::broker::Broker* broker);
- void truncate();
+ void truncate();
- void create(qpid::broker::PersistableQueue& queue,
- const qpid::framing::FieldTable& args);
- void destroy(qpid::broker::PersistableQueue& queue);
+ void create(qpid::broker::PersistableQueue& queue,
+ const qpid::framing::FieldTable& args);
+ void destroy(qpid::broker::PersistableQueue& queue);
- void create(const qpid::broker::PersistableExchange& queue,
- const qpid::framing::FieldTable& args);
- void destroy(const qpid::broker::PersistableExchange& queue);
+ void create(const qpid::broker::PersistableExchange& queue,
+ const qpid::framing::FieldTable& args);
+ void destroy(const qpid::broker::PersistableExchange& queue);
- void bind(const qpid::broker::PersistableExchange& exchange,
- const qpid::broker::PersistableQueue& queue,
- const std::string& key, const qpid::framing::FieldTable& args);
- void unbind(const qpid::broker::PersistableExchange& exchange,
- const qpid::broker::PersistableQueue& queue,
- const std::string& key, const qpid::framing::FieldTable& args);
+ void bind(const qpid::broker::PersistableExchange& exchange,
+ const qpid::broker::PersistableQueue& queue,
+ const std::string& key, const qpid::framing::FieldTable& args);
+ void unbind(const qpid::broker::PersistableExchange& exchange,
+ const qpid::broker::PersistableQueue& queue,
+ const std::string& key, const qpid::framing::FieldTable& args);
- void create(const qpid::broker::PersistableConfig& config);
- void destroy(const qpid::broker::PersistableConfig& config);
+ void create(const qpid::broker::PersistableConfig& config);
+ void destroy(const qpid::broker::PersistableConfig& config);
- void recover(qpid::broker::RecoveryManager& queues);
+ void recover(qpid::broker::RecoveryManager& queues);
- void stage(boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg);
- void destroy(qpid::broker::PersistableMessage& msg);
- void appendContent(boost::intrusive_ptr<const qpid::broker::PersistableMessage>& msg, const std::string& data);
- void loadContent(const qpid::broker::PersistableQueue& queue,
- boost::intrusive_ptr<const qpid::broker::PersistableMessage>& msg,
- std::string& data, u_int64_t offset, u_int32_t length);
+ void stage(const boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg);
+ void destroy(qpid::broker::PersistableMessage& msg);
+ void appendContent(boost::intrusive_ptr<const qpid::broker::PersistableMessage>& msg, const std::string& data);
+ void loadContent(const qpid::broker::PersistableQueue& queue,
+ boost::intrusive_ptr<const qpid::broker::PersistableMessage>& msg,
+ std::string& data, u_int64_t offset, u_int32_t length);
- void enqueue(qpid::broker::TransactionContext* ctxt,
- boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg,
- const qpid::broker::PersistableQueue& queue);
- void dequeue(qpid::broker::TransactionContext* ctxt,
- boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg,
- const qpid::broker::PersistableQueue& queue);
- void flush(const qpid::broker::PersistableQueue& queue);
+ void enqueue(qpid::broker::TransactionContext* ctxt,
+ const boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg,
+ const qpid::broker::PersistableQueue& queue);
+ void dequeue(qpid::broker::TransactionContext* ctxt,
+ const boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg,
+ const qpid::broker::PersistableQueue& queue);
+ void flush(const qpid::broker::PersistableQueue& queue);
- u_int32_t outstandingQueueAIO(const qpid::broker::PersistableQueue& queue);
+ u_int32_t outstandingQueueAIO(const qpid::broker::PersistableQueue& queue);
- void collectPreparedXids(std::set<string>& xids);
+ void collectPreparedXids(std::set<string>& xids);
- std::auto_ptr<qpid::broker::TransactionContext> begin();
- std::auto_ptr<qpid::broker::TPCTransactionContext> begin(const std::string& xid);
- void prepare(qpid::broker::TPCTransactionContext& ctxt);
- void commit(qpid::broker::TransactionContext& ctxt);
- void abort(qpid::broker::TransactionContext& ctxt);
+ std::auto_ptr<qpid::broker::TransactionContext> begin();
+ std::auto_ptr<qpid::broker::TPCTransactionContext> begin(const std::string& xid);
+ void prepare(qpid::broker::TPCTransactionContext& ctxt);
+ void commit(qpid::broker::TransactionContext& ctxt);
+ void abort(qpid::broker::TransactionContext& ctxt);
- qpid::management::ManagementObject::shared_ptr GetManagementObject (void) const
- { return mgmtObject; }
+ qpid::management::ManagementObject::shared_ptr GetManagementObject (void) const
+ { return mgmtObject; }
- qpid::management::Manageable::status_t ManagementMethod (uint32_t, qpid::management::Args&)
- { return qpid::management::Manageable::STATUS_OK; }
- };
- }
+ qpid::management::Manageable::status_t ManagementMethod (uint32_t, qpid::management::Args&)
+ { return qpid::management::Manageable::STATUS_OK; }
+};
}
+}
#endif
Modified: store/trunk/cpp/lib/DataTokenImpl.h
===================================================================
--- store/trunk/cpp/lib/DataTokenImpl.h 2008-06-17 03:47:04 UTC (rev 2155)
+++ store/trunk/cpp/lib/DataTokenImpl.h 2008-06-23 19:44:16 UTC (rev 2156)
@@ -1,24 +1,24 @@
/*
- Copyright (C) 2007 Red Hat Software
+ Copyright (C) 2007 Red Hat Software
- This file is part of Red Hat Messaging.
+ This file is part of Red Hat Messaging.
- Red Hat Messaging is free software; you can redistribute it and/or
- modify it under the terms of the GNU Lesser General Public
- License as published by the Free Software Foundation; either
- version 2.1 of the License, or (at your option) any later version.
+ Red Hat Messaging is free software; you can redistribute it and/or
+ modify it under the terms of the GNU Lesser General Public
+ License as published by the Free Software Foundation; either
+ version 2.1 of the License, or (at your option) any later version.
- This library is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- Lesser General Public License for more details.
+ This library is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ Lesser General Public License for more details.
- You should have received a copy of the GNU Lesser General Public
- License along with this library; if not, write to the Free Software
- Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
- USA
+ You should have received a copy of the GNU Lesser General Public
+ License along with this library; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
+ USA
- The GNU Lesser General Public License is available in the file COPYING.
+ The GNU Lesser General Public License is available in the file COPYING.
*/
#ifndef _DataTokenImpl_
@@ -29,23 +29,23 @@
#include <qpid/broker/PersistableMessage.h>
namespace rhm {
- namespace bdbstore {
+namespace bdbstore {
- class DataTokenImpl : public journal::data_tok, public qpid::RefCounted
- {
- private:
- boost::intrusive_ptr<qpid::broker::PersistableMessage> sourceMsg;
- public:
- DataTokenImpl();
- virtual ~DataTokenImpl();
+class DataTokenImpl : public journal::data_tok, public qpid::RefCounted
+{
+ private:
+ boost::intrusive_ptr<qpid::broker::PersistableMessage> sourceMsg;
+ public:
+ DataTokenImpl();
+ virtual ~DataTokenImpl();
- inline boost::intrusive_ptr<qpid::broker::PersistableMessage>& getSourceMessage()
- { return sourceMsg; }
- inline void setSourceMessage(boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg)
- { sourceMsg = msg; }
- };
+ inline boost::intrusive_ptr<qpid::broker::PersistableMessage>& getSourceMessage()
+ { return sourceMsg; }
+ inline void setSourceMessage(const boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg)
+ { sourceMsg = msg; }
+};
- } // namespace bdbstore
- } // namespace rhm
+} // namespace bdbstore
+} // namespace rhm
#endif
More information about the rhmessaging-commits
mailing list