[rhmessaging-commits] rhmessaging commits: r1437 - in store/trunk/cpp/lib: jrnl and 1 other directory.
rhmessaging-commits at lists.jboss.org
rhmessaging-commits at lists.jboss.org
Thu Dec 6 13:24:12 EST 2007
Author: aconway
Date: 2007-12-06 13:24:12 -0500 (Thu, 06 Dec 2007)
New Revision: 1437
Modified:
store/trunk/cpp/lib/BdbMessageStore.cpp
store/trunk/cpp/lib/JournalImpl.cpp
store/trunk/cpp/lib/JournalImpl.h
store/trunk/cpp/lib/TxnCtxt.h
store/trunk/cpp/lib/jrnl/data_tok.cpp
store/trunk/cpp/lib/jrnl/data_tok.hpp
store/trunk/cpp/lib/jrnl/jcntl.cpp
Log:
Replaced TimerA with Timer, use intrusive_ptr for Timer.
Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp 2007-12-06 17:59:43 UTC (rev 1436)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp 2007-12-06 18:24:12 UTC (rev 1437)
@@ -1,24 +1,24 @@
/*
- Copyright (C) 2007 Red Hat Software
+ Copyright (C) 2007 Red Hat Software
- This file is part of Red Hat Messaging.
+ This file is part of Red Hat Messaging.
- Red Hat Messaging is free software; you can redistribute it and/or
- modify it under the terms of the GNU Lesser General Public
- License as published by the Free Software Foundation; either
- version 2.1 of the License, or (at your option) any later version.
+ Red Hat Messaging is free software; you can redistribute it and/or
+ modify it under the terms of the GNU Lesser General Public
+ License as published by the Free Software Foundation; either
+ version 2.1 of the License, or (at your option) any later version.
- This library is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- Lesser General Public License for more details.
+ This library is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ Lesser General Public License for more details.
- You should have received a copy of the GNU Lesser General Public
- License along with this library; if not, write to the Free Software
- Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
- USA
+ You should have received a copy of the GNU Lesser General Public
+ License along with this library; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
+ USA
- The GNU Lesser General Public License is available in the file COPYING.
+ The GNU Lesser General Public License is available in the file COPYING.
*/
#include "BdbMessageStore.h"
@@ -62,8 +62,8 @@
prepareXidDb(&env, 0),
numJrnlFiles(8),
jrnlFsizePgs(24),
- isInit(false),
- envPath(envpath)
+ isInit(false),
+ envPath(envpath)
{
@@ -72,15 +72,15 @@
bool BdbMessageStore::init(const std::string& dir, const bool async, const bool force, u_int16_t jfiles, u_int32_t jfileSizePgs)
{
- if (isInit) return true;
+ if (isInit) return true;
numJrnlFiles = jfiles;
jrnlFsizePgs = jfileSizePgs;
- useAsync = async;
- if (dir.size()>0) storeDir = dir;
+ useAsync = async;
+ if (dir.size()>0) storeDir = dir;
- string bdbdir = storeDir + "/rhm/dat/";
- journal::jdir::create_dir(bdbdir);
+ string bdbdir = storeDir + "/rhm/dat/";
+ journal::jdir::create_dir(bdbdir);
bool ret = false;
@@ -111,11 +111,11 @@
txn.abort();
throw;
}
- ret = mode(useAsync, force);
- if (!ret) return false;
+ ret = mode(useAsync, force);
+ if (!ret) return false;
- isInit = true;
- return true;
+ isInit = true;
+ return true;
}
bool BdbMessageStore::init(const qpid::Options* options)
@@ -154,17 +154,17 @@
bool BdbMessageStore::mode(const bool async, const bool force)
{
- u_int32_t id (1); // key one in config is mode
+ u_int32_t id (1); // key one in config is mode
Dbt key(&id, sizeof(id));
size_t preamble_length = sizeof(u_int32_t);
BufferValue value(preamble_length, 0);
- u_int32_t avalue = async ? 1 : 2;
- value.buffer.putLong( avalue );
- bool same = false;
- bool hasMode = false;
+ u_int32_t avalue = async ? 1 : 2;
+ value.buffer.putLong( avalue );
+ bool same = false;
+ bool hasMode = false;
{
- Cursor config;
+ Cursor config;
config.open(configDb, 0);
IdDbt rkey;
BufferValue rvalue(preamble_length, 0);
@@ -172,21 +172,21 @@
while (config.next(rkey, rvalue)) {
if (rkey.id == 1)
- {
- hasMode = true;
- u_int32_t valueL = rvalue.buffer.getLong();
- if (avalue == valueL){
- same = true;
- }else {
- break;
- }
+ {
+ hasMode = true;
+ u_int32_t valueL = rvalue.buffer.getLong();
+ if (avalue == valueL){
+ same = true;
+ }else {
+ break;
+ }
}
- }
+ }
}
if (same) return true;
- if (!same && !force && hasMode) return false;
- if (!same && force && hasMode) {
- truncate();
+ if (!same && !force && hasMode) return false;
+ if (!same && force && hasMode) {
+ truncate();
}
int status = configDb.put(0, &key, &value, DB_NOOVERWRITE | DB_AUTO_COMMIT );
@@ -195,12 +195,12 @@
} else {
return true;
}
- return false;
+ return false;
}
void BdbMessageStore::open(Db& db, DbTxn* txn, const char* file, bool dupKey)
{
- if(dupKey) db.set_flags(DB_DUPSORT);
+ if(dupKey) db.set_flags(DB_DUPSORT);
db.open(txn, file, 0, DB_BTREE, DB_CREATE | DB_THREAD, 0);
dbs.push_back(&db);
}
@@ -230,43 +230,43 @@
}
txn->commit(0);
- try{
+ try{
journal::jdir::delete_dir(getJrnlBaseDir(),true);
}
- catch (const journal::jexception& e) {
+ catch (const journal::jexception& e) {
THROW_STORE_EXCEPTION(std::string("truncate() failed: ") + e.what() );
}
}
void BdbMessageStore::create(PersistableQueue& queue)
{
- checkInit();
+ checkInit();
if (queue.getPersistenceId()) {
THROW_STORE_EXCEPTION("Queue already created: " + queue.getName());
}
if (usingJrnl()) {
JournalImpl* jQueue = new JournalImpl(queue.getName(), getJrnlDir(queue), string("JournalData"), numJrnlFiles, jrnlFsizePgs * JRNL_RMGR_PAGE_SIZE, defJournalGetEventsTimeout, defJournalFlushTimeout);
queue.setExternalQueueStore(dynamic_cast<ExternalQueueStore*>(jQueue));
- try {
- // init will create the deque's for the init...
- jQueue->initialize();
- } catch (const journal::jexception& e) {
+ try {
+ // init will create the deque's for the init...
+ jQueue->initialize();
+ } catch (const journal::jexception& e) {
THROW_STORE_EXCEPTION(std::string("Queue ") + queue.getName() + ": create() failed: " + e.what());
}
}
try {
if (!create(queueDb, queueIdSequence, queue)) {
- THROW_STORE_EXCEPTION("Queue already exists: " + queue.getName());
+ THROW_STORE_EXCEPTION("Queue already exists: " + queue.getName());
}
} catch (const DbException& e) {
- THROW_STORE_EXCEPTION_2("Error creating queue named " + queue.getName(), e);
+ THROW_STORE_EXCEPTION_2("Error creating queue named " + queue.getName(), e);
}
}
void BdbMessageStore::destroy(PersistableQueue& queue)
{
- checkInit();
+ checkInit();
destroy(queueDb, queue);
qpid::broker::ExternalQueueStore* eqs = queue.getExternalQueueStore();
if (eqs)
@@ -280,7 +280,7 @@
void BdbMessageStore::create(const PersistableExchange& exchange)
{
- checkInit();
+ checkInit();
if (exchange.getPersistenceId()) {
THROW_STORE_EXCEPTION("Exchange already created: " + exchange.getName());
}
@@ -296,7 +296,7 @@
void BdbMessageStore::destroy(const PersistableExchange& exchange)
{
- checkInit();
+ checkInit();
destroy(exchangeDb, exchange);
//need to also delete bindings
IdDbt key(exchange.getPersistenceId());
@@ -326,9 +326,9 @@
void BdbMessageStore::bind(const PersistableExchange& e, const PersistableQueue& q,
- const std::string& k, const FieldTable& a)
+ const std::string& k, const FieldTable& a)
{
- checkInit();
+ checkInit();
IdDbt key(e.getPersistenceId());
BindingDbt value(e, q, k, a);
TxnCtxt txn;
@@ -338,9 +338,9 @@
}
void BdbMessageStore::unbind(const PersistableExchange& e, const PersistableQueue& q,
- const std::string& k, const FieldTable& a)
+ const std::string& k, const FieldTable& a)
{
- checkInit();
+ checkInit();
IdDbt key(e.getPersistenceId());
BindingDbt value(e, q, k, a);
@@ -357,7 +357,7 @@
void BdbMessageStore::recover(RecoveryManager& registry)
{
- checkInit();
+ checkInit();
txn_list prepared;
recoverXids(prepared);
@@ -384,17 +384,17 @@
//recover transactions:
for (txn_list::iterator i = prepared.begin(); i != prepared.end(); i++) {
- TPCTxnCtxt* tpcc = new TPCTxnCtxt(i->xid, &messageIdSequence);
- RecoverableTransaction::shared_ptr dtx = registry.recoverTransaction(i->xid, std::auto_ptr<TPCTransactionContext>(tpcc));
+ TPCTxnCtxt* tpcc = new TPCTxnCtxt(i->xid, &messageIdSequence);
+ RecoverableTransaction::shared_ptr dtx = registry.recoverTransaction(i->xid, std::auto_ptr<TPCTransactionContext>(tpcc));
if (i->enqueues.get()) {
for (LockedMappings::iterator j = i->enqueues->begin(); j != i->enqueues->end(); j++) {
- tpcc->addXidRecord(queues[j->first]->getExternalQueueStore());
+ tpcc->addXidRecord(queues[j->first]->getExternalQueueStore());
dtx->enqueue(queues[j->first], messages[j->second]);
}
}
if (i->dequeues.get()) {
for (LockedMappings::iterator j = i->dequeues->begin(); j != i->dequeues->end(); j++) {
- tpcc->addXidRecord(queues[j->first]->getExternalQueueStore());
+ tpcc->addXidRecord(queues[j->first]->getExternalQueueStore());
dtx->dequeue(queues[j->first], messages[j->second]);
}
}
@@ -403,7 +403,7 @@
}
void BdbMessageStore::recoverQueues(TxnCtxt& txn, RecoveryManager& registry, queue_index& queue_index, txn_list&
-prepared, message_index& messages)
+ prepared, message_index& messages)
{
Cursor queues;
queues.open(queueDb, txn.get());
@@ -423,22 +423,22 @@
if (usingJrnl())
{
- const char* queueName = queue->getName().c_str();
- JournalImpl* jQueue = new JournalImpl(queueName, getJrnlDir(queueName), string("JournalData"), numJrnlFiles, jrnlFsizePgs * JRNL_RMGR_PAGE_SIZE, defJournalGetEventsTimeout, defJournalFlushTimeout);
- queue->setExternalQueueStore(dynamic_cast<ExternalQueueStore*>(jQueue));
+ const char* queueName = queue->getName().c_str();
+ JournalImpl* jQueue = new JournalImpl(queueName, getJrnlDir(queueName), string("JournalData"), numJrnlFiles, jrnlFsizePgs * JRNL_RMGR_PAGE_SIZE, defJournalGetEventsTimeout, defJournalFlushTimeout);
+ queue->setExternalQueueStore(dynamic_cast<ExternalQueueStore*>(jQueue));
- try
- {
- u_int64_t thisHighestRid = 0;
- jQueue->recover(prepared, thisHighestRid, key.id); // start recovery
- if (thisHighestRid > highestRid)
- highestRid = thisHighestRid;
- recoverMessages(txn, registry, queue, prepared, messages);
- jQueue->recover_complete(); // start journal.
- } catch (const journal::jexception& e) {
- THROW_STORE_EXCEPTION(std::string("Queue ") + queueName + ": recoverQueues() failed: " + e.what());
- }
- //read all messages: done on a per queue basis if using Journal
+ try
+ {
+ u_int64_t thisHighestRid = 0;
+ jQueue->recover(prepared, thisHighestRid, key.id); // start recovery
+ if (thisHighestRid > highestRid)
+ highestRid = thisHighestRid;
+ recoverMessages(txn, registry, queue, prepared, messages);
+ jQueue->recover_complete(); // start journal.
+ } catch (const journal::jexception& e) {
+ THROW_STORE_EXCEPTION(std::string("Queue ") + queueName + ": recoverQueues() failed: " + e.what());
+ }
+ //read all messages: done on a per queue basis if using Journal
}
queue_index[key.id] = queue;
@@ -447,7 +447,7 @@
messageIdSequence.reset(highestRid + 1);
queueIdSequence.reset(maxQueueId + 1);
- if (!usingJrnl()) //read all messages:
+ if (!usingJrnl()) //read all messages:
recoverMessages(txn, registry, queue_index, prepared, messages);
}
@@ -508,15 +508,15 @@
// async IO version.
void BdbMessageStore::recoverMessages(TxnCtxt& /*txn*/, qpid::broker::RecoveryManager& recovery,
- qpid::broker::RecoverableQueue::shared_ptr& queue, txn_list& locked, message_index& prepared)
+ qpid::broker::RecoverableQueue::shared_ptr& queue, txn_list& locked, message_index& prepared)
{
size_t preambleLength = sizeof(u_int32_t)/*header size*/;
JournalImpl* jc = static_cast<JournalImpl*>(queue->getExternalQueueStore());
- DataTokenImpl dtokp;
- size_t readSize = 0;
- unsigned msg_count=0;
+ DataTokenImpl dtokp;
+ size_t readSize = 0;
+ unsigned msg_count=0;
bool read = true;
void* dbuff = NULL; size_t dbuffSize = 0;
@@ -526,77 +526,77 @@
dtokp.set_wstate(DataTokenImpl::ENQ);
- // read the message from the Journal.
+ // read the message from the Journal.
try {
-//std::cout << jc->dirname() <<"-queueName:" << queue->getName() << "-enq count:" << jc->get_enq_cnt() << std::endl;
+ //std::cout << jc->dirname() <<"-queueName:" << queue->getName() << "-enq count:" << jc->get_enq_cnt() << std::endl;
unsigned aio_sleep_cnt = 0;
- while (read) {
+ while (read) {
rhm::journal::iores res = jc->read_data_record(&dbuff, dbuffSize, &xidbuff, xidbuffSize, transientFlag, externalFlag, &dtokp);
- readSize = dtokp.dsize();
+ readSize = dtokp.dsize();
switch (res)
{
- case rhm::journal::RHM_IORES_SUCCESS:{
- msg_count++;
- RecoverableMessage::shared_ptr msg;
- char* data = (char*)dbuff;
+ case rhm::journal::RHM_IORES_SUCCESS:{
+ msg_count++;
+ RecoverableMessage::shared_ptr msg;
+ char* data = (char*)dbuff;
- unsigned headerSize;
- if (externalFlag){
- msg = getExternMessage(recovery, dtokp.rid(), headerSize); // large message external to jrnl
- } else {
- headerSize = Buffer(data, preambleLength).getLong();
- Buffer headerBuff(data+ preambleLength, headerSize); /// do we want read size or header size ????
- msg = recovery.recoverMessage(headerBuff);
- }
- msg->setPersistenceId(dtokp.rid());
+ unsigned headerSize;
+ if (externalFlag){
+ msg = getExternMessage(recovery, dtokp.rid(), headerSize); // large message external to jrnl
+ } else {
+ headerSize = Buffer(data, preambleLength).getLong();
+ Buffer headerBuff(data+ preambleLength, headerSize); /// do we want read size or header size ????
+ msg = recovery.recoverMessage(headerBuff);
+ }
+ msg->setPersistenceId(dtokp.rid());
- u_int32_t contentOffset = headerSize + preambleLength;
- u_int64_t contentSize = readSize - contentOffset;
- if (msg->loadContent(contentSize) && !externalFlag) {
- //now read the content
- Buffer contentBuff(data + contentOffset, contentSize);
- msg->decodeContent(contentBuff);
- }
+ u_int32_t contentOffset = headerSize + preambleLength;
+ u_int64_t contentSize = readSize - contentOffset;
+ if (msg->loadContent(contentSize) && !externalFlag) {
+ //now read the content
+ Buffer contentBuff(data + contentOffset, contentSize);
+ msg->decodeContent(contentBuff);
+ }
- if (PreparedTransaction::isLocked(locked, queue->getPersistenceId(), dtokp.rid()) ) {
- prepared[dtokp.rid()] = msg;
- } else {
- queue->recover(msg);
- }
+ if (PreparedTransaction::isLocked(locked, queue->getPersistenceId(), dtokp.rid()) ) {
+ prepared[dtokp.rid()] = msg;
+ } else {
+ queue->recover(msg);
+ }
- dtokp.reset();
- dtokp.set_wstate(DataTokenImpl::ENQ);
+ dtokp.reset();
+ dtokp.set_wstate(DataTokenImpl::ENQ);
- if (xidbuff)
- ::free(xidbuff);
- else if (dbuff)
- ::free(dbuff);
- aio_sleep_cnt = 0;
- break;
- }
- case rhm::journal::RHM_IORES_AIO_WAIT:
- if (++aio_sleep_cnt > MAX_AIO_SLEEPS)
- THROW_STORE_EXCEPTION("Timeout waiting for AIO");
- ::usleep(AIO_SLEEP_TIME);
- break;
- case rhm::journal::RHM_IORES_EMPTY:
- read = false;
- break; // done with all messages. ((add call in jrnl to test that _emap is empty.
- default:
- assert( "Store Error: Unexpected msg state");
+ if (xidbuff)
+ ::free(xidbuff);
+ else if (dbuff)
+ ::free(dbuff);
+ aio_sleep_cnt = 0;
+ break;
+ }
+ case rhm::journal::RHM_IORES_AIO_WAIT:
+ if (++aio_sleep_cnt > MAX_AIO_SLEEPS)
+ THROW_STORE_EXCEPTION("Timeout waiting for AIO");
+ ::usleep(AIO_SLEEP_TIME);
+ break;
+ case rhm::journal::RHM_IORES_EMPTY:
+ read = false;
+ break; // done with all messages. ((add call in jrnl to test that _emap is empty.
+ default:
+ assert( "Store Error: Unexpected msg state");
} // switch
} // while
} catch (const journal::jexception& e) {
- THROW_STORE_EXCEPTION(std::string("Queue ") + queue->getName() +
- ": recoverMessages() failed: " + e.what());
- }
+ THROW_STORE_EXCEPTION(std::string("Queue ") + queue->getName() +
+ ": recoverMessages() failed: " + e.what());
+ }
}
RecoverableMessage::shared_ptr BdbMessageStore::getExternMessage(qpid::broker::RecoveryManager& recovery,
- uint64_t messageId, unsigned& headerSize)
+ uint64_t messageId, unsigned& headerSize)
{
Dbt key (&messageId, sizeof(messageId));
size_t preamble_length = sizeof(u_int32_t)/*header size*/;
@@ -604,17 +604,17 @@
BufferValue value(preamble_length, 0);
value.buffer.record();
if (messageDb.get(0, &key, &value, 0) == DB_NOTFOUND) {
- THROW_STORE_EXCEPTION("Cannot load content. Message not known to store!");
+ THROW_STORE_EXCEPTION("Cannot load content. Message not known to store!");
}
- //read header only to begin with
+ //read header only to begin with
headerSize = value.buffer.getLong();
BufferValue header(headerSize, preamble_length);
if (messageDb.get(0, &key, &header, 0) == DB_NOTFOUND) {
- THROW_STORE_EXCEPTION("Cannot load content. Message not known to store!");
+ THROW_STORE_EXCEPTION("Cannot load content. Message not known to store!");
}
- return recovery.recoverMessage(header.buffer);
+ return recovery.recoverMessage(header.buffer);
}
@@ -695,34 +695,34 @@
std::set<string> prepared;
collectPreparedXids(prepared);
- //when using the async journal, it will abort unprepaired xids and populate the locked maps
- if (!usingJrnl()){
- txn_lock_map enqueues;
- txn_lock_map dequeues;
+ //when using the async journal, it will abort unprepaired xids and populate the locked maps
+ if (!usingJrnl()){
+ txn_lock_map enqueues;
+ txn_lock_map dequeues;
std::set<string> known;
readXids(enqueueXidDb, known);
readXids(dequeueXidDb, known);
//abort all known but unprepared xids:
for (std::set<string>::iterator i = known.begin(); i != known.end(); i++) {
- if (prepared.find(*i) == prepared.end()) {
+ if (prepared.find(*i) == prepared.end()) {
TPCTxnCtxt txn(*i, NULL);
- completed(txn, dequeueXidDb, enqueueXidDb, false);
- }
+ completed(txn, dequeueXidDb, enqueueXidDb, false);
+ }
}
- readLockedMappings(enqueueXidDb, enqueues);
- readLockedMappings(dequeueXidDb, dequeues);
- for (std::set<string>::iterator i = prepared.begin(); i != prepared.end(); i++) {
- txns.push_back(new PreparedTransaction(*i, enqueues[*i], dequeues[*i]));
- }
- } else {
- for (std::set<string>::iterator i = prepared.begin(); i != prepared.end(); i++) {
+ readLockedMappings(enqueueXidDb, enqueues);
+ readLockedMappings(dequeueXidDb, dequeues);
+ for (std::set<string>::iterator i = prepared.begin(); i != prepared.end(); i++) {
+ txns.push_back(new PreparedTransaction(*i, enqueues[*i], dequeues[*i]));
+ }
+ } else {
+ for (std::set<string>::iterator i = prepared.begin(); i != prepared.end(); i++) {
LockedMappings::shared_ptr enq_ptr;
enq_ptr.reset(new LockedMappings);
LockedMappings::shared_ptr deq_ptr;
deq_ptr.reset(new LockedMappings);
- txns.push_back(new PreparedTransaction(*i, enq_ptr, deq_ptr));
- }
+ txns.push_back(new PreparedTransaction(*i, enq_ptr, deq_ptr));
+ }
}
}
@@ -751,12 +751,12 @@
std::string xid(reinterpret_cast<char*>(key.get_data()), key.get_size());
LockedMappings::add(mappings, xid, value.queueId(), value.messageId());
/*
- txn_lock_map::iterator i = mappings.find(xid);
- if (i == mappings.end()) {
- LockedMappings::shared_ptr ptr(new LockedMappings());
- i = mappings.insert(std::make_pair(xid, ptr)).first;
- }
- i->second->add(value.queueId(), value.messageId());
+ txn_lock_map::iterator i = mappings.find(xid);
+ if (i == mappings.end()) {
+ LockedMappings::shared_ptr ptr(new LockedMappings());
+ i = mappings.insert(std::make_pair(xid, ptr)).first;
+ }
+ i->second->add(value.queueId(), value.messageId());
*/
}
}
@@ -768,7 +768,7 @@
void BdbMessageStore::stage( intrusive_ptr<PersistableMessage>& msg)
{
- checkInit();
+ checkInit();
TxnCtxt txn;
txn.begin(env, true);
@@ -779,7 +779,7 @@
messageId = messageIdSequence.next();
store(NULL, &txn, key, msg, true);
msg->setPersistenceId(messageId);
- txn.commit();
+ txn.commit();
} catch (const std::exception& e) {
txn.abort();
throw;
@@ -788,7 +788,7 @@
}
void BdbMessageStore::destroy(intrusive_ptr<PersistableMessage>& msg)
{
- checkInit();
+ checkInit();
u_int64_t messageId (msg->getPersistenceId());
if (messageId) {
Dbt key (&messageId, sizeof(messageId));
@@ -822,7 +822,7 @@
void BdbMessageStore::appendContent(intrusive_ptr<const PersistableMessage>& msg, const std::string& data)
{
- checkInit();
+ checkInit();
u_int64_t messageId (msg->getPersistenceId());
if (messageId != 0) {
try {
@@ -850,9 +850,9 @@
}
void BdbMessageStore::loadContent(const qpid::broker::PersistableQueue& queue,
- intrusive_ptr<const PersistableMessage>& msg, std::string& data, u_int64_t offset, u_int32_t length)
+ intrusive_ptr<const PersistableMessage>& msg, std::string& data, u_int64_t offset, u_int32_t length)
{
- checkInit();
+ checkInit();
u_int64_t realOffset = offset + sizeof(u_int32_t)/*header length*/+ msg->encodedHeaderSize();
u_int64_t messageId (msg->getPersistenceId());
@@ -860,10 +860,10 @@
try {
JournalImpl* jc = static_cast<JournalImpl*>(queue.getExternalQueueStore());
if (jc && jc->is_enqueued(messageId) ){
- if (jc->loadMsgContent(messageId, data, realOffset, length)){
- return;
- }
- }
+ if (jc->loadMsgContent(messageId, data, realOffset, length)){
+ return;
+ }
+ }
Dbt key (&messageId, sizeof(messageId));
char *buffer = new char[length];
Dbt value(buffer, length);
@@ -873,17 +873,17 @@
value.set_dlen(length);
int status = messageDb.get(0, &key, &value, 0);
if (status == DB_NOTFOUND) {
- delete [] buffer;
+ delete [] buffer;
THROW_STORE_EXCEPTION("Cannot load content. Message not known to store!");
} else {
data.assign(buffer, value.get_size());
- delete [] buffer;
+ delete [] buffer;
}
} catch (const DbException& e) {
THROW_STORE_EXCEPTION_2("Error loading content", e);
} catch (const journal::jexception& e) {
THROW_STORE_EXCEPTION(std::string("Queue ") + queue.getName() +
- ": loadContent() failed: " + e.what());
+ ": loadContent() failed: " + e.what());
}
} else {
THROW_STORE_EXCEPTION("Cannot load content. Message not known to store!");
@@ -893,21 +893,21 @@
void BdbMessageStore::flush(const qpid::broker::PersistableQueue& queue)
{
if (!usingJrnl()) return;
- checkInit();
+ checkInit();
try {
- JournalImpl* jc = static_cast<JournalImpl*>(queue.getExternalQueueStore());
- if (jc){
- jc->flush();
- }
+ JournalImpl* jc = static_cast<JournalImpl*>(queue.getExternalQueueStore());
+ if (jc){
+ jc->flush();
+ }
}catch (const journal::jexception& e) {
- THROW_STORE_EXCEPTION(std::string("Queue ") + queue.getName() + ": flush() failed: " + e.what() );
- }
+ THROW_STORE_EXCEPTION(std::string("Queue ") + queue.getName() + ": flush() failed: " + e.what() );
+ }
}
void BdbMessageStore::enqueue(TransactionContext* ctxt, intrusive_ptr<PersistableMessage>& msg,
- const PersistableQueue& queue)
+ const PersistableQueue& queue)
{
- checkInit();
+ checkInit();
u_int64_t queueId (queue.getPersistenceId());
u_int64_t messageId (msg->getPersistenceId());
if (queueId == 0) {
@@ -931,24 +931,24 @@
if (messageId == 0) {
messageId = messageIdSequence.next();
msg->setPersistenceId(messageId);
- newId = true;
- }
+ newId = true;
+ }
store(&queue, txn, key, msg, newId);
- if (usingJrnl()){
- // add queue* to the txn map..
- if (ctxt) txn->addXidRecord(queue.getExternalQueueStore());
- if (msg->isContentReleased()) put(mappingDb, txn->get(), key, value); // TODO - remove once jrnl is used for transient policy see **
- }else{
+ if (usingJrnl()){
+ // add queue* to the txn map..
+ if (ctxt) txn->addXidRecord(queue.getExternalQueueStore());
+ if (msg->isContentReleased()) put(mappingDb, txn->get(), key, value); // TODO - remove once jrnl is used for transient policy see **
+ }else{
msg->enqueueComplete(); // set enqueued for ack
put(mappingDb, txn->get(), key, value);
- // cct if using Journal do we need to wait for IO to complete before calling thus???
- // set enqueue comple on callback msg.enqueueComplete();
- if (txn->isTPC()) {
- record2pcOp(enqueueXidDb, dynamic_cast<TPCTxnCtxt&>(*txn), messageId, queueId);
- }
- }
+ // cct if using Journal do we need to wait for IO to complete before calling thus???
+ // set enqueue comple on callback msg.enqueueComplete();
+ if (txn->isTPC()) {
+ record2pcOp(enqueueXidDb, dynamic_cast<TPCTxnCtxt&>(*txn), messageId, queueId);
+ }
+ }
if (!ctxt) txn->commit();
} catch (const std::exception& e) {
@@ -958,27 +958,27 @@
}
void BdbMessageStore::store(const PersistableQueue* queue,
- TxnCtxt* txn, Dbt& messageId,
- intrusive_ptr<PersistableMessage>& message,
- bool newId)
+ TxnCtxt* txn, Dbt& messageId,
+ intrusive_ptr<PersistableMessage>& message,
+ bool newId)
{
u_int32_t headerSize = message->encodedHeaderSize();
u_int64_t size = message->encodedSize() + sizeof(u_int32_t);
char* buff= 0;
- if (!message->isContentReleased() )
- {
- buff = static_cast<char*>(::alloca(size)); // long + headers + content
+ if (!message->isContentReleased() )
+ {
+ buff = static_cast<char*>(::alloca(size)); // long + headers + content
Buffer buffer(buff,size);
buffer.putLong(headerSize);
message->encode(buffer);
- }
+ }
try {
- if ( queue && usingJrnl()){
-//std::cout << "E" << std::flush;
- boost::intrusive_ptr<DataTokenImpl> dtokp(new DataTokenImpl);
- dtokp->ref();
+ if ( queue && usingJrnl()){
+ //std::cout << "E" << std::flush;
+ boost::intrusive_ptr<DataTokenImpl> dtokp(new DataTokenImpl);
+ dtokp->addRef();
dtokp->setSourceMessage(message);
dtokp->set_rid(message->getPersistenceId()); // set the messageID into the Journal header (record-id)
@@ -987,71 +987,71 @@
unsigned busy_sleep_cnt = 0;
while (!written)
{
- JournalImpl* jc = static_cast<JournalImpl*>(queue->getExternalQueueStore());
- rhm::journal::iores eres;
- if (txn->getXid().empty()){
- if (message->isContentReleased()){
- eres = jc->enqueue_extern_data_record(size, dtokp.get(), false);
- }else {
- eres = jc->enqueue_data_record(buff, size, size, dtokp.get(), false);
- }
- }else {
- if (message->isContentReleased()){
- eres = jc->enqueue_extern_txn_data_record(size, dtokp.get(), txn->getXid(), false);
- } else {
- eres = jc->enqueue_txn_data_record(buff, size, size, dtokp.get(), txn->getXid(), false);
- }
- }
+ JournalImpl* jc = static_cast<JournalImpl*>(queue->getExternalQueueStore());
+ rhm::journal::iores eres;
+ if (txn->getXid().empty()){
+ if (message->isContentReleased()){
+ eres = jc->enqueue_extern_data_record(size, dtokp.get(), false);
+ }else {
+ eres = jc->enqueue_data_record(buff, size, size, dtokp.get(), false);
+ }
+ }else {
+ if (message->isContentReleased()){
+ eres = jc->enqueue_extern_txn_data_record(size, dtokp.get(), txn->getXid(), false);
+ } else {
+ eres = jc->enqueue_txn_data_record(buff, size, size, dtokp.get(), txn->getXid(), false);
+ }
+ }
switch (eres)
{
- case rhm::journal::RHM_IORES_SUCCESS:
-//std::cout << "." << std::flush;
- if (dtokp.get()->wstate() >= DataTokenImpl::ENQ_SUBM)
- written = true;
- aio_sleep_cnt = 0;
- busy_sleep_cnt = 0;
- break;
- case rhm::journal::RHM_IORES_AIO_WAIT:
-//std::cout << "w" << std::flush;
- if (++aio_sleep_cnt > MAX_AIO_SLEEPS)
- THROW_STORE_EXCEPTION("Timeout waiting for AIO: RHM_IORES_AIO_WAIT");
- usleep(AIO_SLEEP_TIME); // TODO move sleep to wait for IO in get events
- jc->get_wr_events();
- break;
- case rhm::journal::RHM_IORES_BUSY:
-//std::cout << "b" << std::flush;
- if (++busy_sleep_cnt > MAX_AIO_SLEEPS)
- THROW_STORE_EXCEPTION("Timeout waiting for mutex: RHM_IORES_BUSY");
- usleep(AIO_SLEEP_TIME); // TODO add sleep time to get events call as option
- break;
- case rhm::journal::RHM_IORES_FULL:
- std::cerr << "Error storing message -- Journal full on queue \"" << queue->getName() << "\"." << std::endl << std::flush;
- THROW_STORE_FULL_EXCEPTION("Error storing message -- Journal full :" + queue->getName());
- break;
- default:
- assert( "Store Error: Unexpected msg state");
+ case rhm::journal::RHM_IORES_SUCCESS:
+ //std::cout << "." << std::flush;
+ if (dtokp.get()->wstate() >= DataTokenImpl::ENQ_SUBM)
+ written = true;
+ aio_sleep_cnt = 0;
+ busy_sleep_cnt = 0;
+ break;
+ case rhm::journal::RHM_IORES_AIO_WAIT:
+ //std::cout << "w" << std::flush;
+ if (++aio_sleep_cnt > MAX_AIO_SLEEPS)
+ THROW_STORE_EXCEPTION("Timeout waiting for AIO: RHM_IORES_AIO_WAIT");
+ usleep(AIO_SLEEP_TIME); // TODO move sleep to wait for IO in get events
+ jc->get_wr_events();
+ break;
+ case rhm::journal::RHM_IORES_BUSY:
+ //std::cout << "b" << std::flush;
+ if (++busy_sleep_cnt > MAX_AIO_SLEEPS)
+ THROW_STORE_EXCEPTION("Timeout waiting for mutex: RHM_IORES_BUSY");
+ usleep(AIO_SLEEP_TIME); // TODO add sleep time to get events call as option
+ break;
+ case rhm::journal::RHM_IORES_FULL:
+ std::cerr << "Error storing message -- Journal full on queue \"" << queue->getName() << "\"." << std::endl << std::flush;
+ THROW_STORE_FULL_EXCEPTION("Error storing message -- Journal full :" + queue->getName());
+ break;
+ default:
+ assert( "Store Error: Unexpected msg state");
}
}
- } else {
- /// cct message db
- if (newId){ // only store in Bd if first time message is stored
- Dbt data(buff,size);
- messageDb.put(txn->get(), &messageId, &data, DB_NOOVERWRITE);
- }
- }
+ } else {
+ /// cct message db
+ if (newId){ // only store in Bd if first time message is stored
+ Dbt data(buff,size);
+ messageDb.put(txn->get(), &messageId, &data, DB_NOOVERWRITE);
+ }
+ }
} catch (const journal::jexception& e) {
- THROW_STORE_EXCEPTION(std::string("Queue ") + queue->getName() + ": store() failed: " +
- e.what());
+ THROW_STORE_EXCEPTION(std::string("Queue ") + queue->getName() + ": store() failed: " +
+ e.what());
} catch (const DbException& e) {
THROW_STORE_EXCEPTION_2("Error storing message", e);
}
}
void BdbMessageStore::dequeue(TransactionContext* ctxt, intrusive_ptr<PersistableMessage>& msg,
- const PersistableQueue& queue)
+ const PersistableQueue& queue)
{
- checkInit();
+ checkInit();
u_int64_t queueId (queue.getPersistenceId());
u_int64_t messageId (msg->getPersistenceId());
if (messageId == 0) {
@@ -1072,23 +1072,23 @@
try {
- if (usingJrnl()){
- // add queue* to the txn map..
- if (ctxt) txn->addXidRecord(queue.getExternalQueueStore());
- async_dequeue(ctxt, msg, queue);
+ if (usingJrnl()){
+ // add queue* to the txn map..
+ if (ctxt) txn->addXidRecord(queue.getExternalQueueStore());
+ async_dequeue(ctxt, msg, queue);
// added here as we are not doing it async on call back
- if (msg->isContentReleased()) // TODO remove this code once jrnl is used for transient policy see **
- {
- Dbt key (&messageId, sizeof(messageId));
+ if (msg->isContentReleased()) // TODO remove this code once jrnl is used for transient policy see **
+ {
+ Dbt key (&messageId, sizeof(messageId));
Dbt value (&queueId, sizeof(queueId));
dequeue(txn->get(), key, value);
- }
+ }
- msg->dequeueComplete();
-// if ( msg->isDequeueComplete() ) // clear id after last dequeue
-// msg->setPersistenceId(0);
+ msg->dequeueComplete();
+ // if ( msg->isDequeueComplete() ) // clear id after last dequeue
+ // msg->setPersistenceId(0);
- } else if (txn->isTPC()) {
+ } else if (txn->isTPC()) {
//if this is part of a 2pc transaction, then only record the dequeue now,
//it will be applied on commit
record2pcOp(dequeueXidDb, dynamic_cast<TPCTxnCtxt&>(*txn), messageId, queueId);
@@ -1098,7 +1098,7 @@
if (dequeue(txn->get(), key, value)) {
msg->setPersistenceId(0);//clear id as we have now removed the message from the store
msg->dequeueComplete(); // set dequeued for ack
- }
+ }
}
if (!ctxt) txn->commit();
@@ -1111,62 +1111,64 @@
}
}
-void BdbMessageStore::async_dequeue(TransactionContext* ctxt, intrusive_ptr<PersistableMessage>& msg,
- const PersistableQueue& queue)
+void BdbMessageStore::async_dequeue(
+ TransactionContext* ctxt,
+ intrusive_ptr<PersistableMessage>& msg,
+ const PersistableQueue& queue)
{
-//std::cout << "D" << std::flush;
+ //std::cout << "D" << std::flush;
bool written = false;
boost::intrusive_ptr<DataTokenImpl> ddtokp(new DataTokenImpl);
- ddtokp->ref();
- ddtokp->setSourceMessage(msg);
- ddtokp->set_rid(messageIdSequence.next());
- ddtokp->set_dequeue_rid(msg->getPersistenceId());
- ddtokp->set_wstate(DataTokenImpl::ENQ);
- JournalImpl* jc = static_cast<JournalImpl*>(queue.getExternalQueueStore());
- string tid;
+ ddtokp->addRef();
+ ddtokp->setSourceMessage(msg);
+ ddtokp->set_rid(messageIdSequence.next());
+ ddtokp->set_dequeue_rid(msg->getPersistenceId());
+ ddtokp->set_wstate(DataTokenImpl::ENQ);
+ JournalImpl* jc = static_cast<JournalImpl*>(queue.getExternalQueueStore());
+ string tid;
if (ctxt){
- TxnCtxt* txn = check(ctxt);
- tid = txn->getXid();
- }
+ TxnCtxt* txn = check(ctxt);
+ tid = txn->getXid();
+ }
unsigned aio_sleep_cnt = 0;
unsigned busy_sleep_cnt = 0;
while (!written)
{
- rhm::journal::iores dres;
- try {
- if (tid.empty()){
- dres = jc->dequeue_data_record(ddtokp.get());
- } else {
- dres = jc->dequeue_txn_data_record(ddtokp.get(), tid);
- }
- } catch (const journal::jexception& e) {
- THROW_STORE_EXCEPTION(std::string("Queue ") + queue.getName() + ": async_dequeue() failed: " + e.what());
- }
- switch (dres)
- {
- case rhm::journal::RHM_IORES_SUCCESS:
-//std::cout << "." << std::flush;
- aio_sleep_cnt = 0;
- busy_sleep_cnt = 0;
- written = true;
- break;
- case rhm::journal::RHM_IORES_AIO_WAIT:
-//std::cout << "w" << std::flush;
- if (++aio_sleep_cnt > MAX_AIO_SLEEPS)
- THROW_STORE_EXCEPTION("Timeout waiting for AIO: RHM_IORES_AIO_WAIT");
- usleep(AIO_SLEEP_TIME); // TODO add sleep time to get events call as option
- jc->get_wr_events();
- break;
- case rhm::journal::RHM_IORES_BUSY:
-//std::cout << "b" << std::flush;
- if (++busy_sleep_cnt > MAX_AIO_SLEEPS)
- THROW_STORE_EXCEPTION("Timeout waiting for mutex: RHM_IORES_BUSY");
- usleep(AIO_SLEEP_TIME); // TODO add sleep time to get events call as option
- break;
- default:
- assert( "Store Error: Unexpected msg state");
- }
+ rhm::journal::iores dres;
+ try {
+ if (tid.empty()){
+ dres = jc->dequeue_data_record(ddtokp.get());
+ } else {
+ dres = jc->dequeue_txn_data_record(ddtokp.get(), tid);
+ }
+ } catch (const journal::jexception& e) {
+ THROW_STORE_EXCEPTION(std::string("Queue ") + queue.getName() + ": async_dequeue() failed: " + e.what());
+ }
+ switch (dres)
+ {
+ case rhm::journal::RHM_IORES_SUCCESS:
+ //std::cout << "." << std::flush;
+ aio_sleep_cnt = 0;
+ busy_sleep_cnt = 0;
+ written = true;
+ break;
+ case rhm::journal::RHM_IORES_AIO_WAIT:
+ //std::cout << "w" << std::flush;
+ if (++aio_sleep_cnt > MAX_AIO_SLEEPS)
+ THROW_STORE_EXCEPTION("Timeout waiting for AIO: RHM_IORES_AIO_WAIT");
+ usleep(AIO_SLEEP_TIME); // TODO add sleep time to get events call as option
+ jc->get_wr_events();
+ break;
+ case rhm::journal::RHM_IORES_BUSY:
+ //std::cout << "b" << std::flush;
+ if (++busy_sleep_cnt > MAX_AIO_SLEEPS)
+ THROW_STORE_EXCEPTION("Timeout waiting for mutex: RHM_IORES_BUSY");
+ usleep(AIO_SLEEP_TIME); // TODO add sleep time to get events call as option
+ break;
+ default:
+ assert( "Store Error: Unexpected msg state");
+ }
}
}
@@ -1215,7 +1217,7 @@
u_int32_t BdbMessageStore::outstandingQueueAIO(const qpid::broker::PersistableQueue& /*queue*/)
{
- checkInit();
+ checkInit();
return 0;
}
@@ -1285,8 +1287,8 @@
auto_ptr<TransactionContext> BdbMessageStore::begin()
{
- checkInit();
- // pass sequence number for c/a when using jrnl
+ checkInit();
+ // pass sequence number for c/a when using jrnl
TxnCtxt* txn(new TxnCtxt(&messageIdSequence ));
txn->begin(env, !usingJrnl());
return auto_ptr<TransactionContext>(txn);
@@ -1294,11 +1296,11 @@
std::auto_ptr<qpid::broker::TPCTransactionContext> BdbMessageStore::begin(const std::string& xid)
{
- checkInit();
- IdSequence* jtx = NULL;
- if (usingJrnl()) jtx = &messageIdSequence;
+ checkInit();
+ IdSequence* jtx = NULL;
+ if (usingJrnl()) jtx = &messageIdSequence;
- // pass sequence number for c/a when using jrnl
+ // pass sequence number for c/a when using jrnl
TPCTxnCtxt* txn(new TPCTxnCtxt(xid, jtx));
txn->begin(env, !usingJrnl());
return auto_ptr<TPCTransactionContext>(txn);
@@ -1306,7 +1308,7 @@
void BdbMessageStore::prepare(qpid::broker::TPCTransactionContext& ctxt)
{
- checkInit();
+ checkInit();
TPCTxnCtxt* txn = dynamic_cast<TPCTxnCtxt*>(&ctxt);
if(!txn) throw InvalidTransactionContextException();
@@ -1316,8 +1318,8 @@
Dbt key ((void*) xid.data(), xid.length());
Dbt value(&dummy, sizeof(dummy));
- // make sure all the data is written to disk before returning
- txn->sync();
+ // make sure all the data is written to disk before returning
+ txn->sync();
prepareXidDb.put(txn->get(), &key, &value, 0);
txn->commit();
@@ -1329,8 +1331,8 @@
void BdbMessageStore::commit(TransactionContext& ctxt)
{
- checkInit();
- TxnCtxt* txn(check(&ctxt));
+ checkInit();
+ TxnCtxt* txn(check(&ctxt));
if (txn->isTPC()) {
completed(*dynamic_cast<TPCTxnCtxt*>(txn), enqueueXidDb, dequeueXidDb, true);
} else {
@@ -1340,7 +1342,7 @@
void BdbMessageStore::abort(TransactionContext& ctxt)
{
- checkInit();
+ checkInit();
TxnCtxt* txn(check(&ctxt));
if (txn->isTPC()) {
completed(*dynamic_cast<TPCTxnCtxt*>(txn), dequeueXidDb, enqueueXidDb, false);
Modified: store/trunk/cpp/lib/JournalImpl.cpp
===================================================================
--- store/trunk/cpp/lib/JournalImpl.cpp 2007-12-06 17:59:43 UTC (rev 1436)
+++ store/trunk/cpp/lib/JournalImpl.cpp 2007-12-06 18:24:12 UTC (rev 1437)
@@ -29,10 +29,13 @@
using namespace rhm::bdbstore;
using namespace rhm::journal;
-qpid::broker::TimerA JournalImpl::journalTimer;
+qpid::broker::Timer JournalImpl::journalTimer;
void InactivityFireEvent::fire() { if (parent) parent->flushFire(); }
-void GetEventsFireEvent::fire() { if (parent) parent->getEventsFire(); unref(); }
+void GetEventsFireEvent::fire() {
+ if (parent) parent->getEventsFire();
+ release();
+}
JournalImpl::JournalImpl(const std::string& journalId,
const std::string& journalDirectory,
@@ -244,7 +247,7 @@
{
jcntl::flush();
if (_wmgr.get_aio_evt_rem() && !getEventsTimerSetFlag) {
- intrusive_ptr_add_ref(getEventsFireEventsPtr.get());
+ getEventsFireEventsPtr->addRef();
journalTimer.add(getEventsFireEventsPtr);
getEventsTimerSetFlag = true;
}
@@ -265,7 +268,7 @@
}
getEventsTimerSetFlag = false;
if (_wmgr.get_aio_evt_rem()) {
- intrusive_ptr_add_ref(getEventsFireEventsPtr.get());
+ getEventsFireEventsPtr->addRef();
journalTimer.add(getEventsFireEventsPtr);
getEventsTimerSetFlag = true;
}
Modified: store/trunk/cpp/lib/JournalImpl.h
===================================================================
--- store/trunk/cpp/lib/JournalImpl.h 2007-12-06 17:59:43 UTC (rev 1436)
+++ store/trunk/cpp/lib/JournalImpl.h 2007-12-06 18:24:12 UTC (rev 1437)
@@ -38,25 +38,25 @@
class JournalImpl;
- class InactivityFireEvent : public virtual qpid::broker::TimerTaskA
+ class InactivityFireEvent : public virtual qpid::broker::TimerTask
{
JournalImpl* parent;
public:
InactivityFireEvent(JournalImpl* p, const qpid::sys::Duration timeout):
- qpid::broker::TimerTaskA(timeout), parent(p) {}
+ qpid::broker::TimerTask(timeout), parent(p) {}
virtual ~InactivityFireEvent() {}
void fire();
inline void cancel() { parent=0; }
};
- class GetEventsFireEvent : public virtual qpid::broker::TimerTaskA
+ class GetEventsFireEvent : public virtual qpid::broker::TimerTask
{
JournalImpl* parent;
public:
GetEventsFireEvent(JournalImpl* p, const qpid::sys::Duration timeout):
- qpid::broker::TimerTaskA(timeout), parent(p) {}
+ qpid::broker::TimerTask(timeout), parent(p) {}
virtual ~GetEventsFireEvent() {}
void fire();
inline void cancel() { parent=0; }
@@ -65,14 +65,14 @@
class JournalImpl : public journal::jcntl
{
private:
- static qpid::broker::TimerA journalTimer;
+ static qpid::broker::Timer journalTimer;
bool getEventsTimerSetFlag;
- qpid::broker::TimerTaskA::intrusive_ptr getEventsFireEventsPtr;
+ qpid::intrusive_ptr<qpid::broker::TimerTask> getEventsFireEventsPtr;
bool writeActivityFlag;
bool flushTriggeredFlag;
- qpid::broker::TimerTaskA::intrusive_ptr inactivityFireEventPtr;
+ qpid::intrusive_ptr<qpid::broker::TimerTask> inactivityFireEventPtr;
// temp local vars for loadMsgContent below
void* _xidp;
Modified: store/trunk/cpp/lib/TxnCtxt.h
===================================================================
--- store/trunk/cpp/lib/TxnCtxt.h 2007-12-06 17:59:43 UTC (rev 1436)
+++ store/trunk/cpp/lib/TxnCtxt.h 2007-12-06 18:24:12 UTC (rev 1437)
@@ -72,7 +72,7 @@
JournalImpl* jc = static_cast<JournalImpl*>(*i);
if (jc && loggedtx) { /* if using journal */
boost::intrusive_ptr<DataTokenImpl> dtokp(new DataTokenImpl);
- dtokp->ref();
+ dtokp->addRef();
dtokp->set_rid(loggedtx->next());
try{
if (commit)
Modified: store/trunk/cpp/lib/jrnl/data_tok.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/data_tok.cpp 2007-12-06 17:59:43 UTC (rev 1436)
+++ store/trunk/cpp/lib/jrnl/data_tok.cpp 2007-12-06 18:24:12 UTC (rev 1437)
@@ -41,25 +41,11 @@
namespace journal
{
-void intrusive_ptr_add_ref(data_tok* tok)
-{
- tok->ref();
-}
-
-void intrusive_ptr_release(data_tok* tok)
-{
- tok->unref();
- if (tok->refcnt() == 0)
- delete tok;
-}
-
-
// Static members
u_int64_t data_tok::_cnt = 0;
data_tok::data_tok():
- _ref_cnt(0),
_wstate(NONE),
_rstate(UNREAD),
_dsize(0),
Modified: store/trunk/cpp/lib/jrnl/data_tok.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/data_tok.hpp 2007-12-06 17:59:43 UTC (rev 1436)
+++ store/trunk/cpp/lib/jrnl/data_tok.hpp 2007-12-06 18:24:12 UTC (rev 1437)
@@ -33,6 +33,8 @@
#ifndef rhm_journal_data_tok_hpp
#define rhm_journal_data_tok_hpp
+#include <qpid/RefCounted.h>
+
namespace rhm
{
namespace journal
@@ -66,7 +68,7 @@
* \brief Data block token (data_tok) used to track wstate of a data block through asynchronous
* I/O process
*/
- class data_tok
+class data_tok : public qpid::RefCounted
{
public:
// TODO: Fix this, separate write state from operation
@@ -102,7 +104,6 @@
};
private:
- size_t _ref_cnt; ///< Ref count for auto cleanup
pthread_mutex_t _mutex;
static u_int64_t _cnt;
u_int64_t _icnt;
@@ -121,9 +122,6 @@
data_tok();
virtual ~data_tok();
- inline size_t refcnt(void) { return _ref_cnt;}
- inline void ref(void) { _ref_cnt++; }
- inline void unref(void) { _ref_cnt--; }
inline boost::intrusive_ptr<qpid::broker::PersistableMessage>& getSourceMessage()
{ return _sourceMsg; }
inline void setSourceMessage(boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg)
@@ -171,10 +169,6 @@
void reset();
};
-
- void intrusive_ptr_add_ref(data_tok* r);
- void intrusive_ptr_release(data_tok* r);
-
} // namespace journal
} // namespace rhm
Modified: store/trunk/cpp/lib/jrnl/jcntl.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.cpp 2007-12-06 17:59:43 UTC (rev 1436)
+++ store/trunk/cpp/lib/jrnl/jcntl.cpp 2007-12-06 18:24:12 UTC (rev 1437)
@@ -1,34 +1,34 @@
/**
-* \file jcntl.cpp
-*
-* Red Hat Messaging - Message Journal
-*
-* Messaging journal top-level control and interface class
-* rhm::journal::jcntl. See comments in file jcntl.hpp for details.
-*
-* \author Kim van der Riet
-*
-* Copyright 2007 Red Hat, Inc.
-*
-* This file is part of Red Hat Messaging.
-*
-* Red Hat Messaging is free software; you can redistribute it and/or
-* modify it under the terms of the GNU Lesser General Public
-* License as published by the Free Software Foundation; either
-* version 2.1 of the License, or (at your option) any later version.
-*
-* This library is distributed in the hope that it will be useful,
-* but WITHOUT ANY WARRANTY; without even the implied warranty of
-* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-* Lesser General Public License for more details.
-*
-* You should have received a copy of the GNU Lesser General Public
-* License along with this library; if not, write to the Free Software
-* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
-* USA
-*
-* The GNU Lesser General Public License is available in the file COPYING.
-*/
+ * \file jcntl.cpp
+ *
+ * Red Hat Messaging - Message Journal
+ *
+ * Messaging journal top-level control and interface class
+ * rhm::journal::jcntl. See comments in file jcntl.hpp for details.
+ *
+ * \author Kim van der Riet
+ *
+ * Copyright 2007 Red Hat, Inc.
+ *
+ * This file is part of Red Hat Messaging.
+ *
+ * Red Hat Messaging is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
+ * USA
+ *
+ * The GNU Lesser General Public License is available in the file COPYING.
+ */
#include <jrnl/jcntl.hpp>
@@ -51,7 +51,7 @@
// Functions
jcntl::jcntl(const std::string& jid, const std::string& jdir, const std::string& base_filename,
- const u_int16_t num_jfiles, const u_int32_t jfsize_sblks):
+ const u_int16_t num_jfiles, const u_int32_t jfsize_sblks):
_jid(jid),
_jdir(jdir, base_filename),
_base_filename(base_filename),
@@ -90,7 +90,7 @@
void
jcntl::initialize(std::deque<data_tok*>* rdtoklp, const aio_cb rd_cb,
- std::deque<data_tok*>* wdtoklp, const aio_cb wr_cb) throw (jexception)
+ std::deque<data_tok*>* wdtoklp, const aio_cb wr_cb) throw (jexception)
{
// Prepare journal dir, journal files and file handles
_jdir.clear_dir();
@@ -132,8 +132,8 @@
void
jcntl::recover(std::deque<data_tok*>* rdtoklp, const aio_cb rd_cb, std::deque<data_tok*>* wdtoklp,
- const aio_cb wr_cb, const std::vector<std::string>& prep_txn_list, u_int64_t& highest_rid)
- throw (jexception)
+ const aio_cb wr_cb, const std::vector<std::string>& prep_txn_list, u_int64_t& highest_rid)
+ throw (jexception)
{
// Verify journal dir and journal files
_jdir.verify_dir();
@@ -145,8 +145,8 @@
if (_rcvdat._full)
throw jexception(jerrno::JERR_JCNTL_RECOVERJFULL, "jcntl", "recover_complete");
-// Debug info, but may be useful to print with a flag
-//_rcvdat.print(_jid);
+ // Debug info, but may be useful to print with a flag
+ //_rcvdat.print(_jid);
if (_datafh)
{
@@ -190,7 +190,7 @@
_rrfc.initialize(_num_jfiles, (nlfh**)_datafh, _rcvdat._ffid);
_rmgr.recover_complete(_rcvdat._fro);
_readonly_flag = false;
-//std::cout << "Journal revovery complete." << std::endl;
+ //std::cout << "Journal revovery complete." << std::endl;
}
void
@@ -203,8 +203,8 @@
const iores
jcntl::enqueue_data_record(const void* const data_buff, const size_t tot_data_len,
- const size_t this_data_len, data_tok* dtokp, const bool transient)
- throw (jexception)
+ const size_t this_data_len, data_tok* dtokp, const bool transient)
+ throw (jexception)
{
iores res;
check_wstatus("enqueue_data_record");
@@ -212,7 +212,7 @@
try
{
res = _wmgr.enqueue(data_buff, tot_data_len, this_data_len, dtokp, NULL, 0, transient,
- false);
+ false);
}
catch(const std::exception& e) { pthread_mutex_unlock(&_mutex); throw; }
pthread_mutex_unlock(&_mutex);
@@ -221,7 +221,7 @@
const iores
jcntl::enqueue_extern_data_record(const size_t tot_data_len, data_tok* dtokp, const bool transient)
- throw (jexception)
+ throw (jexception)
{
iores res;
check_wstatus("enqueue_extern_data_record");
@@ -237,8 +237,8 @@
const iores
jcntl::enqueue_txn_data_record(const void* const data_buff, const size_t tot_data_len,
- const size_t this_data_len, data_tok* dtokp, const std::string& xid,
- const bool transient) throw (jexception)
+ const size_t this_data_len, data_tok* dtokp, const std::string& xid,
+ const bool transient) throw (jexception)
{
iores res;
check_wstatus("enqueue_tx_data_record");
@@ -246,7 +246,7 @@
try
{
res = _wmgr.enqueue(data_buff, tot_data_len, this_data_len, dtokp, xid.data(), xid.size(),
- transient, false);
+ transient, false);
}
catch(const std::exception& e) { pthread_mutex_unlock(&_mutex); throw; }
pthread_mutex_unlock(&_mutex);
@@ -255,7 +255,7 @@
const iores
jcntl::enqueue_extern_txn_data_record(const size_t tot_data_len, data_tok* dtokp,
- const std::string& xid, const bool transient) throw (jexception)
+ const std::string& xid, const bool transient) throw (jexception)
{
iores res;
check_wstatus("enqueue_extern_txn_data_record");
@@ -271,7 +271,7 @@
const iores
jcntl::get_data_record(const u_int64_t& rid, const size_t& dsize, const size_t& dsize_avail,
- const void** const data, bool auto_discard) throw (jexception)
+ const void** const data, bool auto_discard) throw (jexception)
{
check_rstatus("get_data_record");
return _rmgr.get(rid, dsize, dsize_avail, data, auto_discard);
@@ -286,7 +286,7 @@
const iores
jcntl::read_data_record(void** const datapp, size_t& dsize, void** const xidpp, size_t& xidsize,
- bool& transient, bool& external, data_tok* const dtokp) throw (jexception)
+ bool& transient, bool& external, data_tok* const dtokp) throw (jexception)
{
check_rstatus("read_data");
return _rmgr.read(datapp, dsize, xidpp, xidsize, transient, external, dtokp);
@@ -466,14 +466,14 @@
_num_jfiles = ji.num_jfiles();
_rcvdat._enq_cnt_list.resize(_num_jfiles);
std::cout << "WARNING: Recovery found " << _num_jfiles <<
- " files (different from --num-jfiles parameter value)." << std::endl;
+ " files (different from --num-jfiles parameter value)." << std::endl;
}
if (_jfsize_sblks != ji.jfsize_sblks())
{
_jfsize_sblks = ji.jfsize_sblks();
std::cout << "WARNING: Recovery found file size = " <<
- (_jfsize_sblks / JRNL_RMGR_PAGE_SIZE) <<
- " (different from --jfile-size-pgs parameter value)." << std::endl;
+ (_jfsize_sblks / JRNL_RMGR_PAGE_SIZE) <<
+ " (different from --jfile-size-pgs parameter value)." << std::endl;
}
try
@@ -499,16 +499,16 @@
if (rd._ffid == next_wr_fid && rd._enq_cnt_list[next_wr_fid])
rd._full = true;
- std::vector<std::string> xid_list;
- _tmap.xid_list(xid_list);
- for (std::vector<std::string>::iterator itr = xid_list.begin(); itr != xid_list.end();
- itr++)
- {
- std::vector<std::string>::const_iterator pitr = std::find(prep_txn_list.begin(),
- prep_txn_list.end(), *itr);
- if (pitr == prep_txn_list.end())
- _tmap.get_remove_tdata_list(*itr);
- }
+ std::vector<std::string> xid_list;
+ _tmap.xid_list(xid_list);
+ for (std::vector<std::string>::iterator itr = xid_list.begin(); itr != xid_list.end();
+ itr++)
+ {
+ std::vector<std::string>::const_iterator pitr = std::find(prep_txn_list.begin(),
+ prep_txn_list.end(), *itr);
+ if (pitr == prep_txn_list.end())
+ _tmap.get_remove_tdata_list(*itr);
+ }
}
}
@@ -525,157 +525,157 @@
ifsp->read((char*)&h, sizeof(hdr));
switch(h._magic)
{
- case RHM_JDAT_ENQ_MAGIC:
+ case RHM_JDAT_ENQ_MAGIC:
+ {
+ if (!check_owi(fid, h, rd, read_pos))
+ return false;
+ enq_rec er;
+ while (!done)
{
- if (!check_owi(fid, h, rd, read_pos))
+ done = er.rcv_decode(h, ifsp, cum_size_read);
+ if (!jfile_cycle(fid, ifsp, rd, true))
return false;
- enq_rec er;
- while (!done)
- {
- done = er.rcv_decode(h, ifsp, cum_size_read);
- if (!jfile_cycle(fid, ifsp, rd, true))
- return false;
- }
- if (!er.is_transient()) // Ignore transient msgs
- {
- rd._enq_cnt_list[fid]++;
- if (er.xid_size())
- {
- er.get_xid(&xidp);
- assert(xidp != NULL);
- std::string xid((char*)xidp, er.xid_size());
- _tmap.insert_txn_data(xid, txn_data(h._rid, 0, fid, true));
- ::free(xidp);
- }
- else
- _emap.insert_fid(h._rid, fid);
- }
}
- break;
- case RHM_JDAT_DEQ_MAGIC:
+ if (!er.is_transient()) // Ignore transient msgs
{
- if (!check_owi(fid, h, rd, read_pos))
- return false;
- deq_rec dr;
- while (!done)
+ rd._enq_cnt_list[fid]++;
+ if (er.xid_size())
{
- done = dr.rcv_decode(h, ifsp, cum_size_read);
- if (!jfile_cycle(fid, ifsp, rd, true))
- return false;
- }
- if (dr.xid_size())
- {
- // If the enqueue is part of a pending txn, it will not yet be in emap
- try { _emap.lock(dr.deq_rid()); }
- catch(const jexception& e)
- {
- if (e.err_code() != jerrno::JERR_MAP_NOTFOUND) throw;
- }
- dr.get_xid(&xidp);
+ er.get_xid(&xidp);
assert(xidp != NULL);
- std::string xid((char*)xidp, dr.xid_size());
- _tmap.insert_txn_data(xid, txn_data(dr.rid(), dr.deq_rid(), fid, false));
+ std::string xid((char*)xidp, er.xid_size());
+ _tmap.insert_txn_data(xid, txn_data(h._rid, 0, fid, true));
::free(xidp);
}
else
- {
- try
- {
- u_int16_t enq_fid = _emap.get_remove_fid(dr.deq_rid());
- rd._enq_cnt_list[enq_fid]--;
- }
- catch(const jexception& e)
- {
- if (e.err_code() != jerrno::JERR_MAP_NOTFOUND) throw;
- }
- }
+ _emap.insert_fid(h._rid, fid);
}
- break;
- case RHM_JDAT_TXA_MAGIC:
+ }
+ break;
+ case RHM_JDAT_DEQ_MAGIC:
+ {
+ if (!check_owi(fid, h, rd, read_pos))
+ return false;
+ deq_rec dr;
+ while (!done)
{
- if (!check_owi(fid, h, rd, read_pos))
+ done = dr.rcv_decode(h, ifsp, cum_size_read);
+ if (!jfile_cycle(fid, ifsp, rd, true))
return false;
- txn_rec ar;
- while (!done)
+ }
+ if (dr.xid_size())
+ {
+ // If the enqueue is part of a pending txn, it will not yet be in emap
+ try { _emap.lock(dr.deq_rid()); }
+ catch(const jexception& e)
{
- done = ar.rcv_decode(h, ifsp, cum_size_read);
- if (!jfile_cycle(fid, ifsp, rd, true))
- return false;
+ if (e.err_code() != jerrno::JERR_MAP_NOTFOUND) throw;
}
- // Delete this txn from tmap, unlock any locked records in emap
- ar.get_xid(&xidp);
+ dr.get_xid(&xidp);
assert(xidp != NULL);
- std::string xid((char*)xidp, ar.xid_size());
- txn_data_list tdl = _tmap.get_remove_tdata_list(xid);
- for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++)
+ std::string xid((char*)xidp, dr.xid_size());
+ _tmap.insert_txn_data(xid, txn_data(dr.rid(), dr.deq_rid(), fid, false));
+ ::free(xidp);
+ }
+ else
+ {
+ try
{
- try
- {
- if (!itr->_enq_flag)
- _emap.unlock(itr->_drid);
- }
- catch(const jexception& e)
- {
- if (e.err_code() != jerrno::JERR_MAP_NOTFOUND) throw;
- }
- if (itr->_enq_flag)
- rd._enq_cnt_list[itr->_fid]--;
+ u_int16_t enq_fid = _emap.get_remove_fid(dr.deq_rid());
+ rd._enq_cnt_list[enq_fid]--;
}
- ::free(xidp);
+ catch(const jexception& e)
+ {
+ if (e.err_code() != jerrno::JERR_MAP_NOTFOUND) throw;
+ }
}
- break;
- case RHM_JDAT_TXC_MAGIC:
+ }
+ break;
+ case RHM_JDAT_TXA_MAGIC:
+ {
+ if (!check_owi(fid, h, rd, read_pos))
+ return false;
+ txn_rec ar;
+ while (!done)
{
- if (!check_owi(fid, h, rd, read_pos))
+ done = ar.rcv_decode(h, ifsp, cum_size_read);
+ if (!jfile_cycle(fid, ifsp, rd, true))
return false;
- txn_rec cr;
- while (!done)
- {
- done = cr.rcv_decode(h, ifsp, cum_size_read);
- if (!jfile_cycle(fid, ifsp, rd, true))
- return false;
+ }
+ // Delete this txn from tmap, unlock any locked records in emap
+ ar.get_xid(&xidp);
+ assert(xidp != NULL);
+ std::string xid((char*)xidp, ar.xid_size());
+ txn_data_list tdl = _tmap.get_remove_tdata_list(xid);
+ for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++)
+ {
+ try
+ {
+ if (!itr->_enq_flag)
+ _emap.unlock(itr->_drid);
}
- // Delete this txn from tmap, process records into emap
- cr.get_xid(&xidp);
- assert(xidp != NULL);
- std::string xid((char*)xidp, cr.xid_size());
- txn_data_list tdl = _tmap.get_remove_tdata_list(xid);
- for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++)
+ catch(const jexception& e)
{
- if (itr->_enq_flag) // txn enqueue
- _emap.insert_fid(itr->_rid, itr->_fid);
- else // txn dequeue
- {
- u_int16_t fid = _emap.get_remove_fid(itr->_drid, true);
- rd._enq_cnt_list[fid]--;
- }
+ if (e.err_code() != jerrno::JERR_MAP_NOTFOUND) throw;
}
- ::free(xidp);
+ if (itr->_enq_flag)
+ rd._enq_cnt_list[itr->_fid]--;
}
- break;
- case RHM_JDAT_EMPTY_MAGIC:
+ ::free(xidp);
+ }
+ break;
+ case RHM_JDAT_TXC_MAGIC:
+ {
+ if (!check_owi(fid, h, rd, read_pos))
+ return false;
+ txn_rec cr;
+ while (!done)
{
- u_int32_t rec_dblks = jrec::size_dblks(sizeof(hdr));
- ifsp->ignore(rec_dblks * JRNL_DBLK_SIZE - sizeof(hdr));
+ done = cr.rcv_decode(h, ifsp, cum_size_read);
+ if (!jfile_cycle(fid, ifsp, rd, true))
+ return false;
}
+ // Delete this txn from tmap, process records into emap
+ cr.get_xid(&xidp);
+ assert(xidp != NULL);
+ std::string xid((char*)xidp, cr.xid_size());
+ txn_data_list tdl = _tmap.get_remove_tdata_list(xid);
+ for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++)
+ {
+ if (itr->_enq_flag) // txn enqueue
+ _emap.insert_fid(itr->_rid, itr->_fid);
+ else // txn dequeue
+ {
+ u_int16_t fid = _emap.get_remove_fid(itr->_drid, true);
+ rd._enq_cnt_list[fid]--;
+ }
+ }
+ ::free(xidp);
+ }
break;
- case 0:
+ case RHM_JDAT_EMPTY_MAGIC:
+ {
+ u_int32_t rec_dblks = jrec::size_dblks(sizeof(hdr));
+ ifsp->ignore(rec_dblks * JRNL_DBLK_SIZE - sizeof(hdr));
+ }
+ break;
+ case 0:
+ rd._lfid = fid;
+ rd._eo = ifsp->tellg();
+ return false;
+ default:
+ // Is this the last file, if so, stop as this is the overwrite boundary.
+ if (fid == (rd._ffid ? rd._ffid - 1 : _num_jfiles - 1))
+ {
rd._lfid = fid;
- rd._eo = ifsp->tellg();
+ rd._eo = read_pos;
return false;
- default:
- // Is this the last file, if so, stop as this is the overwrite boundary.
- if (fid == (rd._ffid ? rd._ffid - 1 : _num_jfiles - 1))
- {
- rd._lfid = fid;
- rd._eo = read_pos;
- return false;
- }
- std::stringstream ss;
- ss << std::hex << std::setfill('0') << "Magic=0x" << std::setw(8) << h._magic;
- ss << " fid=" << fid << " foffs=0x" << std::setw(8) << read_pos;
- throw jexception(jerrno::JERR_JCNTL_UNKNOWNMAGIC, ss.str().c_str(), "jcntl",
- "rcvr_get_next_record");
+ }
+ std::stringstream ss;
+ ss << std::hex << std::setfill('0') << "Magic=0x" << std::setw(8) << h._magic;
+ ss << " fid=" << fid << " foffs=0x" << std::setw(8) << read_pos;
+ throw jexception(jerrno::JERR_JCNTL_UNKNOWNMAGIC, ss.str().c_str(), "jcntl",
+ "rcvr_get_next_record");
}
return true;
@@ -747,7 +747,7 @@
ss << " foffs=0x" << std::setw(8) << read_pos;
ss << " expected_fid=0x" << std::setw(4) << expected_fid;
throw jexception(jerrno::JERR_JCNTL_OWIMISMATCH, ss.str().c_str(), "jcntl",
- "check_owi");
+ "check_owi");
}
if (rd._h_rid < h._rid)
rd._h_rid = h._rid;
@@ -757,33 +757,33 @@
void
jcntl::aio_wr_callback(jcntl* journal, u_int32_t num_dtoks)
{
-//kpvdr TODO -- this list needs to be mutexed...???
+ //kpvdr TODO -- this list needs to be mutexed...???
std::deque<rhm::journal::data_tok*> this_dtok_list(journal->_aio_wr_cmpl_dtok_list.begin(),
- journal->_aio_wr_cmpl_dtok_list.end());
+ journal->_aio_wr_cmpl_dtok_list.end());
journal->_aio_wr_cmpl_dtok_list.clear();
for (u_int32_t i=0; i<num_dtoks; i++)
{
data_tok*& dtokp = this_dtok_list.front();
- if (!journal->is_stopped() && dtokp->getSourceMessage())
- {
- switch (dtokp->wstate())
- {
- case data_tok::ENQ:
- dtokp->getSourceMessage()->enqueueComplete();
- break;
- case data_tok::DEQ:
-/* Don't need to signal until we have a way to ack completion of dequeue in AMQP
- dtokp->getSourceMessage()->dequeueComplete();
- if ( dtokp->getSourceMessage()->isDequeueComplete() ) // clear id after last dequeue
- dtokp->getSourceMessage()->setPersistenceId(0);
-*/
- break;
- default:
- ;
- }
- }
- intrusive_ptr_release(dtokp);
+ if (!journal->is_stopped() && dtokp->getSourceMessage())
+ {
+ switch (dtokp->wstate())
+ {
+ case data_tok::ENQ:
+ dtokp->getSourceMessage()->enqueueComplete();
+ break;
+ case data_tok::DEQ:
+ /* Don't need to signal until we have a way to ack completion of dequeue in AMQP
+ dtokp->getSourceMessage()->dequeueComplete();
+ if ( dtokp->getSourceMessage()->isDequeueComplete() ) // clear id after last dequeue
+ dtokp->getSourceMessage()->setPersistenceId(0);
+ */
+ break;
+ default:
+ ;
+ }
+ }
+ dtokp->release();
this_dtok_list.pop_front();
}
}
@@ -792,21 +792,21 @@
jcntl::aio_rd_callback(jcntl* journal, u_int32_t num_dtoks)
{
-//kpvdr TODO -- can we get rid of the copy???
+ //kpvdr TODO -- can we get rid of the copy???
std::deque<rhm::journal::data_tok*> this_dtok_list(journal->_aio_rd_cmpl_dtok_list.begin(),
- journal->_aio_rd_cmpl_dtok_list.end());
+ journal->_aio_rd_cmpl_dtok_list.end());
journal->_aio_rd_cmpl_dtok_list.clear();
for (u_int32_t i=0; i<num_dtoks; i++)
{
data_tok*& dtokp = this_dtok_list.front();
- if (!journal->is_stopped() && dtokp->getSourceMessage())
- {
- if (dtokp->wstate() == data_tok::ENQ && dtokp->rstate() == data_tok::READ)
- {
+ if (!journal->is_stopped() && dtokp->getSourceMessage())
+ {
+ if (dtokp->wstate() == data_tok::ENQ && dtokp->rstate() == data_tok::READ)
+ {
// cct call the recovery manager. / lazyload..
- }
- }
- intrusive_ptr_release( dtokp);
+ }
+ }
+ dtokp->release();
this_dtok_list.pop_front();
}
More information about the rhmessaging-commits
mailing list