Author: kpvdr
Date: 2008-06-30 11:27:47 -0400 (Mon, 30 Jun 2008)
New Revision: 2171
Modified:
store/trunk/cpp/lib/BdbMessageStore.cpp
store/trunk/cpp/lib/BdbMessageStore.h
store/trunk/cpp/lib/gen/qpid/management/Store.cpp
store/trunk/cpp/lib/gen/qpid/management/Store.h
Log:
Cleaned up BdbMessageStore, removed unneeded BDB sync storage
Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp 2008-06-30 13:04:03 UTC (rev 2170)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp 2008-06-30 15:27:47 UTC (rev 2171)
@@ -51,7 +51,6 @@
static const u_int8_t MESSAGE_MESSAGE = 1;
static const u_int8_t BASIC_MESSAGE = 2;
-bool BdbMessageStore::useAsync;
qpid::sys::Duration BdbMessageStore::defJournalGetEventsTimeout(10 *
qpid::sys::TIME_MSEC); // 10ms
qpid::sys::Duration BdbMessageStore::defJournalFlushTimeout(500 * qpid::sys::TIME_MSEC);
// 0.5s
qpid::sys::Mutex TxnCtxt::globalSerialiser;
@@ -88,7 +87,6 @@
mgmtObject = qpid::management::Store::shared_ptr(new qpid::management::Store
(this, broker));
mgmtObject->set_location(storeDir);
- mgmtObject->set_async(useAsync);
mgmtObject->set_defaultInitialFileCount(numJrnlFiles);
mgmtObject->set_defaultDataFileSize(jrnlFsizePgs);
@@ -125,15 +123,12 @@
wcache_num_pages = defTotWCacheSize / wcache_pgsize_sblks;
}
- // TODO: remove
- useAsync = true;
if (dir.size()>0) storeDir = dir;
string bdbdir = storeDir + "/rhm/dat/";
journal::jdir::create_dir(bdbdir);
- bool ret = false;
try {
env.open(bdbdir.c_str(), DB_THREAD | DB_CREATE | DB_INIT_TXN | DB_INIT_LOCK |
DB_INIT_LOG | DB_INIT_MPOOL | DB_USE_ENVIRON, 0);
} catch (const DbException& e) {
@@ -167,11 +162,6 @@
throw;
}
- // TODO: remove
- bool force = false;
- ret = mode(useAsync, force);
- if (!ret) return false;
-
isInit = true;
QPID_LOG(info, "BdbMessageStore module initialized: dir=" << dir
<< "; jfiles=" << jfiles << "; jfileSizePgs="
<< jfileSizePgs << "; wCachePageSize=" << wCachePageSize);
return true;
@@ -249,54 +239,6 @@
return init(opts->storeDir, numJrnlFiles, jrnlFsizePgs, jrnlWrCachePageSize);
}
-// true is async
-bool BdbMessageStore::mode(const bool async, const bool force)
-{
-
- u_int32_t id (1); // key one in config is mode
- Dbt key(&id, sizeof(id));
- size_t preamble_length = sizeof(u_int32_t);
- BufferValue value(preamble_length, 0);
- u_int32_t avalue = async ? 1 : 2;
- value.buffer.putLong( avalue );
- bool same = false;
- bool hasMode = false;
-
- {
- Cursor config;
- config.open(configDb, 0);
- IdDbt rkey;
- BufferValue rvalue(preamble_length, 0);
- rvalue.buffer.record();
-
- while (config.next(rkey, rvalue)) {
- if (rkey.id == 1)
- {
- hasMode = true;
- u_int32_t valueL = rvalue.buffer.getLong();
- if (avalue == valueL){
- same = true;
- }else {
- break;
- }
- }
- }
- }
- if (same) return true;
- if (!same && !force && hasMode) return false;
- if (!same && force && hasMode) {
- truncate();
- }
-
- int status = configDb.put(0, &key, &value, DB_NOOVERWRITE | DB_AUTO_COMMIT
);
- if (status == DB_KEYEXIST) {
- return false;
- } else {
- return true;
- }
- return false;
-}
-
void BdbMessageStore::open(Db& db, DbTxn* txn, const char* file, bool dupKey)
{
if(dupKey) db.set_flags(DB_DUPSORT);
@@ -346,36 +288,34 @@
if (queue.getPersistenceId()) {
THROW_STORE_EXCEPTION("Queue already created: " + queue.getName());
}
- if (usingJrnl()) {
- JournalImpl* jQueue = 0;
- FieldTable::ValuePtr value;
+ JournalImpl* jQueue = 0;
+ FieldTable::ValuePtr value;
- uint16_t localFileCount = numJrnlFiles;
- uint32_t localFileSize = jrnlFsizePgs;
+ uint16_t localFileCount = numJrnlFiles;
+ uint32_t localFileSize = jrnlFsizePgs;
- value = args.get ("qpid.file_count");
- if (value.get() != 0 && !value->empty() &&
value->convertsTo<int>())
- localFileCount = (uint16_t) value->get<int>();
+ value = args.get ("qpid.file_count");
+ if (value.get() != 0 && !value->empty() &&
value->convertsTo<int>())
+ localFileCount = (uint16_t) value->get<int>();
- value = args.get ("qpid.file_size");
- if (value.get() != 0 && !value->empty() &&
value->convertsTo<int>())
- localFileSize = (uint32_t) value->get<int>();
+ value = args.get ("qpid.file_size");
+ if (value.get() != 0 && !value->empty() &&
value->convertsTo<int>())
+ localFileSize = (uint32_t) value->get<int>();
- {
- qpid::sys::Mutex::ScopedLock sl(jrnlCreateLock);
- jQueue = new JournalImpl(queue.getName(), getJrnlDir(queue),
- string("JournalData"),
defJournalGetEventsTimeout,
- defJournalFlushTimeout);
- }
- queue.setExternalQueueStore(dynamic_cast<ExternalQueueStore*>(jQueue));
- try {
- // init will create the deque's for the init...
- jQueue->initialize(localFileCount, localFileSize * JRNL_RMGR_PAGE_SIZE,
wcache_num_pages, wcache_pgsize_sblks);
- } catch (const journal::jexception& e) {
- THROW_STORE_EXCEPTION(std::string("Queue ") + queue.getName() +
- ": create() failed: " + e.what());
- }
+ {
+ qpid::sys::Mutex::ScopedLock sl(jrnlCreateLock);
+ jQueue = new JournalImpl(queue.getName(), getJrnlDir(queue),
+ string("JournalData"),
defJournalGetEventsTimeout,
+ defJournalFlushTimeout);
}
+ queue.setExternalQueueStore(dynamic_cast<ExternalQueueStore*>(jQueue));
+ try {
+ // init will create the deque's for the init...
+ jQueue->initialize(localFileCount, localFileSize * JRNL_RMGR_PAGE_SIZE,
wcache_num_pages, wcache_pgsize_sblks);
+ } catch (const journal::jexception& e) {
+ THROW_STORE_EXCEPTION(std::string("Queue ") + queue.getName() +
+ ": create() failed: " + e.what());
+ }
try {
if (!create(queueDb, queueIdSequence, queue)) {
@@ -557,38 +497,32 @@
//set the persistenceId and update max as required
queue->setPersistenceId(key.id);
- if (usingJrnl())
+ const char* queueName = queue->getName().c_str();
+ JournalImpl* jQueue = 0;
{
- const char* queueName = queue->getName().c_str();
- JournalImpl* jQueue = 0;
- {
- qpid::sys::Mutex::ScopedLock sl(jrnlCreateLock);
- jQueue = new JournalImpl(queueName, getJrnlDir(queueName),
string("JournalData"), defJournalGetEventsTimeout, defJournalFlushTimeout);
- }
-
queue->setExternalQueueStore(dynamic_cast<ExternalQueueStore*>(jQueue));
+ qpid::sys::Mutex::ScopedLock sl(jrnlCreateLock);
+ jQueue = new JournalImpl(queueName, getJrnlDir(queueName),
string("JournalData"), defJournalGetEventsTimeout, defJournalFlushTimeout);
+ }
+
queue->setExternalQueueStore(dynamic_cast<ExternalQueueStore*>(jQueue));
- try
- {
- u_int64_t thisHighestRid = 0;
- jQueue->recover(numJrnlFiles, jrnlFsizePgs * JRNL_RMGR_PAGE_SIZE,
wcache_num_pages, wcache_pgsize_sblks, prepared, thisHighestRid, key.id); // start
recovery
- if (thisHighestRid > highestRid)
- highestRid = thisHighestRid;
- recoverMessages(txn, registry, queue, prepared, messages);
- jQueue->recover_complete(); // start journal.
- } catch (const journal::jexception& e) {
- THROW_STORE_EXCEPTION(std::string("Queue ") + queueName +
": recoverQueues() failed: " + e.what());
- }
- //read all messages: done on a per queue basis if using Journal
+ try
+ {
+ u_int64_t thisHighestRid = 0;
+ jQueue->recover(numJrnlFiles, jrnlFsizePgs * JRNL_RMGR_PAGE_SIZE,
wcache_num_pages, wcache_pgsize_sblks, prepared, thisHighestRid, key.id); // start recovery
+ if (thisHighestRid > highestRid)
+ highestRid = thisHighestRid;
+ recoverMessages(txn, registry, queue, prepared, messages);
+ jQueue->recover_complete(); // start journal.
+ } catch (const journal::jexception& e) {
+ THROW_STORE_EXCEPTION(std::string("Queue ") + queueName + ": recoverQueues() failed: " + e.what());
}
+ //read all messages: done on a per queue basis if using Journal
queue_index[key.id] = queue;
maxQueueId = max(key.id, maxQueueId);
}
messageIdSequence.reset(highestRid + 1);
queueIdSequence.reset(maxQueueId + 1);
-
- if (!usingJrnl()) //read all messages:
- recoverMessages(txn, registry, queue_index, prepared, messages);
}
@@ -667,7 +601,6 @@
generalIdSequence.reset(maxGeneralId + 1);
}
-// async IO version.
void BdbMessageStore::recoverMessages(TxnCtxt& /*txn*/,
qpid::broker::RecoveryManager& recovery,
qpid::broker::RecoverableQueue::shared_ptr&
queue, txn_list& locked, message_index& prepared)
{
@@ -784,69 +717,7 @@
}
-// bdb version
-void BdbMessageStore::recoverMessages(TxnCtxt&, RecoveryManager& recovery,
queue_index& index,
- txn_list& locked, message_index& prepared)
-{
- //have to create a new txn here, and commit in batches to avoid
- //problems with large message databases
- TxnCtxt txn;
- txn.begin(env);
- Cursor messages;
- messages.open(messageDb, txn.get());
-
- IdDbt key;
- size_t preamble_length = sizeof(u_int32_t)/*header size*/;
- u_int64_t maxMessageId(1);
-
- BufferValue value(preamble_length, 0);
- value.buffer.record();
- uint count(0);
- while (messages.next(key, value)) {
- if (++count % 1000 == 0) {
- QPID_LOG(debug, "Recovering " << count << "th
message...");
- //reset cursor && txn:
- messages.close();
- txn.commit();
- txn.begin(env);
- messages.open(messageDb, txn.get());
- messages->get(&key, &value, DB_SET);
- }
- //read header only to begin with
- u_int32_t headerSize = value.buffer.getLong();
- value.buffer.restore();
-
- BufferValue header(headerSize, preamble_length);
- messages.current(key, header);
-
- RecoverableMessage::shared_ptr msg = recovery.recoverMessage(header.buffer);
- msg->setPersistenceId(key.id);
-
- u_int32_t contentOffset = headerSize + preamble_length;
- u_int64_t contentSize = getRecordSize(txn.get(), messageDb, key) -
contentOffset;
- if (msg->loadContent(contentSize)) {
- //now read the content
- BufferValue content(contentSize, contentOffset);
- messages.current(key, content);
- msg->decodeContent(content.buffer);
- }
-
- //find all the queues into which this message has been enqueued
- if (enqueueMessage(txn, key, msg, index, locked, prepared) == 0) {
- //message not referenced anywhere - can delete
- messages->del(0);
- } else {
- if (key.id > maxMessageId) {
- maxMessageId = key.id;
- }
- }
- }
- messages.close();
- txn.commit();
- messageIdSequence.reset(maxMessageId + 1);
-}
-
int BdbMessageStore::enqueueMessage(TxnCtxt& txn, IdDbt& msgId,
RecoverableMessage::shared_ptr& msg,
queue_index& index, txn_list& locked,
message_index& prepared)
@@ -881,34 +752,12 @@
collectPreparedXids(prepared);
//when using the async journal, it will abort unprepaired xids and populate the locked maps
- if (!usingJrnl()){
- txn_lock_map enqueues;
- txn_lock_map dequeues;
- std::set<string> known;
- readXids(enqueueXidDb, known);
- readXids(dequeueXidDb, known);
-
- //abort all known but unprepared xids:
- for (std::set<string>::iterator i = known.begin(); i != known.end(); i++) {
- if (prepared.find(*i) == prepared.end()) {
- TPCTxnCtxt txn(*i, NULL);
- completed(txn, dequeueXidDb, enqueueXidDb, false);
- }
- }
- readLockedMappings(enqueueXidDb, enqueues);
- readLockedMappings(dequeueXidDb, dequeues);
- for (std::set<string>::iterator i = prepared.begin(); i != prepared.end();
i++) {
- txns.push_back(new PreparedTransaction(*i, enqueues[*i], dequeues[*i]));
- }
- } else {
- for (std::set<string>::iterator i = prepared.begin(); i != prepared.end();
i++) {
- LockedMappings::shared_ptr enq_ptr;
- enq_ptr.reset(new LockedMappings);
- LockedMappings::shared_ptr deq_ptr;
- deq_ptr.reset(new LockedMappings);
- txns.push_back(new PreparedTransaction(*i, enq_ptr, deq_ptr));
- }
-
+ for (std::set<string>::iterator i = prepared.begin(); i != prepared.end(); i++)
{
+ LockedMappings::shared_ptr enq_ptr;
+ enq_ptr.reset(new LockedMappings);
+ LockedMappings::shared_ptr deq_ptr;
+ deq_ptr.reset(new LockedMappings);
+ txns.push_back(new PreparedTransaction(*i, enq_ptr, deq_ptr));
}
}
@@ -1081,7 +930,6 @@
void BdbMessageStore::flush(const qpid::broker::PersistableQueue& queue)
{
- if (!usingJrnl()) return;
if (queue.getExternalQueueStore() == 0) return;
checkInit();
std::string qn = queue.getName();
@@ -1107,7 +955,6 @@
THROW_STORE_EXCEPTION("Queue not created: " + queue.getName());
}
Dbt key (&messageId, sizeof(messageId));
- Dbt value (&queueId, sizeof(queueId));
TxnCtxt implicit;
TxnCtxt* txn = 0;
@@ -1115,11 +962,10 @@
txn = check(ctxt);
} else {
txn = &implicit;
- txn->begin(env, !usingJrnl());
+ txn->begin(env);
}
try {
-
bool newId = false;
if (messageId == 0) {
messageId = messageIdSequence.next();
@@ -1128,20 +974,9 @@
}
store(&queue, txn, key, msg, newId);
- if (usingJrnl()){
- // add queue* to the txn map..
- if (ctxt) txn->addXidRecord(queue.getExternalQueueStore());
- }else{
- msg->enqueueComplete(); // set enqueued for ack
- put(mappingDb, txn->get(), key, value);
+ // add queue* to the txn map..
+ if (ctxt) txn->addXidRecord(queue.getExternalQueueStore());
- // cct if using Journal do we need to wait for IO to complete before calling
thus???
- // set enqueue comple on callback msg.enqueueComplete();
- if (txn->isTPC()) {
- record2pcOp(enqueueXidDb, dynamic_cast<TPCTxnCtxt&>(*txn),
messageId, queueId);
- }
- }
-
if (!ctxt) txn->commit();
} catch (const std::exception& e) {
if (!ctxt) txn->abort();
@@ -1167,7 +1002,7 @@
try {
- if ( queue && usingJrnl()) {
+ if ( queue ) {
boost::intrusive_ptr<DataTokenImpl> dtokp(new DataTokenImpl);
dtokp->addRef();
dtokp->setSourceMessage(message);
@@ -1198,8 +1033,6 @@
} catch (const journal::jexception& e) {
THROW_STORE_EXCEPTION(std::string("Queue ") + queue->getName() +
": store() failed: " +
e.what());
- } catch (const DbException& e) {
- THROW_STORE_EXCEPTION_2("Error storing message", e);
}
}
@@ -1223,37 +1056,21 @@
txn = check(ctxt);
} else {
txn = &implicit;
- txn->begin(env, !usingJrnl());
+ txn->begin(env);
}
try {
- if (usingJrnl()){
- // add queue* to the txn map..
- if (ctxt) txn->addXidRecord(queue.getExternalQueueStore());
- async_dequeue(ctxt, msg, queue);
+ // add queue* to the txn map..
+ if (ctxt) txn->addXidRecord(queue.getExternalQueueStore());
+ async_dequeue(ctxt, msg, queue);
- msg->dequeueComplete();
- // if ( msg->isDequeueComplete() ) // clear id after last dequeue
- // msg->setPersistenceId(0);
+ msg->dequeueComplete();
+ // if ( msg->isDequeueComplete() ) // clear id after last dequeue
+ // msg->setPersistenceId(0);
- } else if (txn->isTPC()) {
- //if this is part of a 2pc transaction, then only record the dequeue now,
- //it will be applied on commit
- record2pcOp(dequeueXidDb, dynamic_cast<TPCTxnCtxt&>(*txn),
messageId, queueId);
- } else {
- Dbt key (&messageId, sizeof(messageId));
- Dbt value (&queueId, sizeof(queueId));
- if (dequeue(txn->get(), key, value)) {
- msg->setPersistenceId(0);//clear id as we have now removed the message
from the store
- msg->dequeueComplete(); // set dequeued for ack
- }
- }
if (!ctxt) txn->commit();
- } catch (const DbException& e) {
- if (!ctxt) txn->abort();
- THROW_STORE_EXCEPTION_2("Error dequeing message", e);
} catch (const std::exception& e) {
if (!ctxt) txn->abort();
throw;
@@ -1289,49 +1106,6 @@
}
}
-bool BdbMessageStore::dequeue(DbTxn* txn, Dbt& messageId, Dbt& queueId)
-{
- //First look up the message, this gets a lock on that table in
- //case we need to delete it (avoiding deadlocks with enqueue where
- //the locking order is messageDb then mappingDb)
- Cursor msgCursor;
- msgCursor.open(messageDb, txn);
-
- try {
- Dbt peek;
- peek.set_flags(DB_DBT_USERMEM | DB_DBT_PARTIAL);
- peek.set_ulen(0);
- int status = msgCursor->get(&messageId, &peek, DB_SET | DB_RMW);
- if (status == DB_NOTFOUND ) {
- THROW_STORE_EXCEPTION("Can't find record for message");
- } else if (status != 0 && status != DB_BUFFER_SMALL) {
- string e = "Dequeue failed (while seeking message) with unexpected
status = ";
- e += DbEnv::strerror(status);
- THROW_STORE_EXCEPTION(e);
- }
- } catch (DbMemoryException& expected) {
- }
-
- Cursor cursor;
- cursor.open(mappingDb, txn);
-
- int status = cursor->get(&messageId, &queueId, DB_GET_BOTH | DB_RMW);
- if (status == 0) {
- cursor->del(0);
- } else if (status == DB_NOTFOUND ) {
- THROW_STORE_EXCEPTION("Can't find record mapping message to
queue");
- } else {
- THROW_STORE_EXCEPTION("Dequeue failed with status = " + status);
- }
-
- if (isUnused(cursor, messageId)) {
- msgCursor->del(0);
- return true;
- } else {
- return false;
- }
-}
-
u_int32_t BdbMessageStore::outstandingQueueAIO(const qpid::broker::PersistableQueue&
/*queue*/)
{
checkInit();
@@ -1369,29 +1143,13 @@
}
}
-void BdbMessageStore::completed(TPCTxnCtxt& txn, Db& discard, Db& apply, bool
commit)
+void BdbMessageStore::completed(TPCTxnCtxt& txn, bool commit)
{
- if (!txn.get()) txn.begin(env, !usingJrnl());
+ if (!txn.get()) txn.begin(env);
try {
StringDbt key(txn.getXid());
- if (!usingJrnl()){
- //scroll through all records matching xid in apply and dequeue
- //using the message and queue id encoded in each value
- Cursor c;
- c.open(apply, txn.get());
- IdPairDbt value;
-
- for (int status = c->get(&key, &value, DB_SET); status == 0;
status = c->get(&key, &value, DB_NEXT_DUP)) {
- dequeue(txn.get(), value.message, value.queue);
- }
- c.close();
-
- //delete all records matching xid
- discard.del(txn.get(), &key, 0);
- apply.del(txn.get(), &key, 0);
- }
prepareXidDb.del(txn.get(), &key, 0);
txn.complete(commit);
@@ -1407,7 +1165,7 @@
checkInit();
// pass sequence number for c/a when using jrnl
TxnCtxt* txn(new TxnCtxt(&messageIdSequence ));
- txn->begin(env, !usingJrnl());
+ txn->begin(env);
return auto_ptr<TransactionContext>(txn);
}
@@ -1415,11 +1173,11 @@
{
checkInit();
IdSequence* jtx = NULL;
- if (usingJrnl()) jtx = &messageIdSequence;
+ jtx = &messageIdSequence;
// pass sequence number for c/a when using jrnl
TPCTxnCtxt* txn(new TPCTxnCtxt(xid, jtx));
- txn->begin(env, !usingJrnl());
+ txn->begin(env);
return auto_ptr<TPCTransactionContext>(txn);
}
@@ -1451,7 +1209,7 @@
checkInit();
TxnCtxt* txn(check(&ctxt));
if (txn->isTPC()) {
- completed(*dynamic_cast<TPCTxnCtxt*>(txn), enqueueXidDb, dequeueXidDb,
true);
+ completed(*dynamic_cast<TPCTxnCtxt*>(txn), true);
} else {
txn->commit();
}
@@ -1462,7 +1220,7 @@
checkInit();
TxnCtxt* txn(check(&ctxt));
if (txn->isTPC()) {
- completed(*dynamic_cast<TPCTxnCtxt*>(txn), dequeueXidDb, enqueueXidDb,
false);
+ completed(*dynamic_cast<TPCTxnCtxt*>(txn), false);
} else {
txn->abort();
}
@@ -1489,14 +1247,6 @@
}
}
-void BdbMessageStore::record2pcOp(Db& db, TPCTxnCtxt& txn, u_int64_t messageId,
u_int64_t queueId)
-{
- StringDbt key(txn.getXid());
- IdPairDbt value(queueId, messageId);
- put(db, txn.get(), key, value);
-}
-
-
bool BdbMessageStore::deleteKeyValuePair(Db& db, DbTxn* txn, Dbt& key, Dbt&
value)
{
@@ -1609,7 +1359,7 @@
wCachePageSize(JRNL_WMGR_DEF_PAGE_SIZE * JRNL_DBLK_SIZE * JRNL_SBLK_SIZE / 1024)
{
addOptions()
- ("store-directory", qpid::optValue(storeDir, "DIR"),
+ ("store-dir", qpid::optValue(storeDir, "DIR"),
"Store directory location for persistence (instead of using --data-dir value). "
"Must be supplied if --no-data-dir is also used.")
("num-jfiles", qpid::optValue(numJrnlFiles, "N"),
Modified: store/trunk/cpp/lib/BdbMessageStore.h
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.h 2008-06-30 13:04:03 UTC (rev 2170)
+++ store/trunk/cpp/lib/BdbMessageStore.h 2008-06-30 15:27:47 UTC (rev 2171)
@@ -88,7 +88,6 @@
IdSequence exchangeIdSequence;
IdSequence generalIdSequence;
IdSequence messageIdSequence;
- static bool useAsync;
std::string storeDir;
u_int16_t numJrnlFiles;
u_int32_t jrnlFsizePgs;
@@ -124,8 +123,6 @@
Dbt& messageId,
const boost::intrusive_ptr<qpid::broker::PersistableMessage>&
message,
bool newId);
- void enqueue(DbTxn* txn, Dbt& messageId, Dbt& queueId);
- bool dequeue(DbTxn* txn, Dbt& messageId, Dbt& queueId);
void async_dequeue(qpid::broker::TransactionContext* ctxt,
const
boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg,
const qpid::broker::PersistableQueue& queue);
@@ -134,7 +131,7 @@
bool isUnused(Cursor& cursor, Dbt& messageId);
void destroy(Db& db, const qpid::broker::Persistable& p);
bool create(Db& db, IdSequence& seq, const qpid::broker::Persistable&
p);
- void completed(TPCTxnCtxt& txn, Db& discard, Db& apply, bool commit);
+ void completed(TPCTxnCtxt& txn, bool commit);
void record2pcOp(Db& db, TPCTxnCtxt& txn, u_int64_t messageId, u_int64_t
queueId);
void deleteBindingsForQueue(const qpid::broker::PersistableQueue& queue);
void deleteBinding(const qpid::broker::PersistableExchange& exchange,
@@ -152,7 +149,6 @@
void createJrnlQueue(const qpid::broker::PersistableQueue& queue);
string getJrnlDir(const qpid::broker::PersistableQueue& queue); //for exmaple /var/rhm/ + queueDir/
string getJrnlDir(const char* queueName);
- static inline bool usingJrnl() {return useAsync;}
string getJrnlBaseDir();
inline void checkInit() {
if (!isInit) init("/var", defNumJrnlFiles, defJrnlFileSizePgs,
defWCachePageSize); isInit = true;
Modified: store/trunk/cpp/lib/gen/qpid/management/Store.cpp
===================================================================
--- store/trunk/cpp/lib/gen/qpid/management/Store.cpp 2008-06-30 13:04:03 UTC (rev 2170)
+++ store/trunk/cpp/lib/gen/qpid/management/Store.cpp 2008-06-30 15:27:47 UTC (rev 2171)
@@ -35,7 +35,7 @@
string Store::packageName = string ("mrgstore");
string Store::className = string ("store");
uint8_t Store::md5Sum[16] =
- {0x91,0xcf,0xc4,0xa7,0x9b,0x4a,0x2a,0x88,0x32,0x6f,0xef,0xec,0x82,0xd7,0x12,0x6a};
+ {0x63,0xc5,0x1a,0x81,0x18,0x8a,0x8d,0x9b,0x3e,0x96,0xf7,0x6d,0x3b,0xd0,0x51,0x14};
Store::Store (Manageable* _core, Manageable* _parent) :
ManagementObject(_core)
@@ -71,7 +71,7 @@
buf.putShortString (packageName); // Package Name
buf.putShortString (className); // Class Name
buf.putBin128 (md5Sum); // Schema Hash
- buf.putShort (5); // Config Element Count
+ buf.putShort (4); // Config Element Count
buf.putShort (0); // Inst Element Count
buf.putShort (0); // Method Count
buf.putShort (0); // Event Count
@@ -93,14 +93,6 @@
buf.put (ft);
ft = FieldTable ();
- ft.setString (NAME, "async");
- ft.setInt (TYPE, TYPE_BOOL);
- ft.setInt (ACCESS, ACCESS_RO);
- ft.setInt (INDEX, 0);
- ft.setString (DESC, "Asynchronous IO");
- buf.put (ft);
-
- ft = FieldTable ();
ft.setString (NAME, "defaultInitialFileCount");
ft.setInt (TYPE, TYPE_U16);
ft.setInt (ACCESS, ACCESS_RO);
@@ -135,7 +127,6 @@
writeTimestamps (buf);
buf.putLongLong (brokerRef);
buf.putShortString (location);
- buf.putOctet (async?1:0);
buf.putShort (defaultInitialFileCount);
buf.putLong (defaultDataFileSize);
Modified: store/trunk/cpp/lib/gen/qpid/management/Store.h
===================================================================
--- store/trunk/cpp/lib/gen/qpid/management/Store.h 2008-06-30 13:04:03 UTC (rev 2170)
+++ store/trunk/cpp/lib/gen/qpid/management/Store.h 2008-06-30 15:27:47 UTC (rev 2171)
@@ -43,7 +43,6 @@
// Properties
uint64_t brokerRef;
std::string location;
- uint8_t async;
uint16_t defaultInitialFileCount;
uint32_t defaultDataFileSize;
@@ -89,11 +88,6 @@
location = val;
configChanged = true;
}
- inline void set_async (uint8_t val){
- sys::Mutex::ScopedLock mutex(accessLock);
- async = val;
- configChanged = true;
- }
inline void set_defaultInitialFileCount (uint16_t val){
sys::Mutex::ScopedLock mutex(accessLock);
defaultInitialFileCount = val;