Author: cctrieloff
Date: 2007-09-07 20:22:36 -0400 (Fri, 07 Sep 2007)
New Revision: 913
Modified:
store/trunk/cpp/lib/BdbMessageStore.cpp
store/trunk/cpp/lib/BdbMessageStore.h
store/trunk/cpp/lib/Makefile.am
store/trunk/cpp/lib/StoreException.h
store/trunk/cpp/lib/jrnl/aio_cb.hpp
store/trunk/cpp/lib/jrnl/data_tok.cpp
store/trunk/cpp/lib/jrnl/data_tok.hpp
store/trunk/cpp/lib/jrnl/jcntl.cpp
store/trunk/cpp/lib/jrnl/jcntl.hpp
store/trunk/cpp/lib/jrnl/jdir.cpp
store/trunk/cpp/lib/jrnl/jdir.hpp
store/trunk/cpp/lib/jrnl/pmgr.cpp
store/trunk/cpp/lib/jrnl/pmgr.hpp
store/trunk/cpp/lib/jrnl/rmgr.cpp
store/trunk/cpp/lib/jrnl/rmgr.hpp
store/trunk/cpp/lib/jrnl/wmgr.cpp
store/trunk/cpp/lib/jrnl/wmgr.hpp
store/trunk/cpp/tests/jrnl/JournalTest.cpp
store/trunk/cpp/tests/jrnl/Makefile.am
store/trunk/cpp/tests/jrnl/jtest.cpp
store/trunk/cpp/tests/jrnl/jtest.hpp
store/trunk/cpp/tests/jrnl/msg_consumer.cpp
store/trunk/cpp/tests/jrnl/msg_consumer.hpp
store/trunk/cpp/tests/jrnl/msg_producer.cpp
store/trunk/cpp/tests/jrnl/msg_producer.hpp
Log:
- added create / delete for async journal
- added enqueue logic
- integration updates with async journal
- still builds and tests using bdb as default - set by runtime flag
- async callback handlers implemented.
Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp 2007-09-07 10:27:55 UTC (rev 912)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp 2007-09-08 00:22:36 UTC (rev 913)
@@ -30,6 +30,7 @@
#include "BindingDbt.h"
#include "IdPairDbt.h"
#include "StringDbt.h"
+#include <jrnl/jcntl.hpp>
using namespace rhm::bdbstore;
using namespace qpid::broker;
@@ -111,24 +112,51 @@
txn->commit(0);
}
-void BdbMessageStore::create(const PersistableQueue& queue)
+void BdbMessageStore::create(PersistableQueue& queue)
{
if (queue.getPersistenceId()) {
THROW_STORE_EXCEPTION("Queue already created: " + queue.getName());
}
+
+
+ if (usingJrnl())
+ {
+ journal::jcntl* jQueue = new journal::jcntl(queue.getName(), getJrnlDir(queue),
string("JournalData"));
+ queue.setExternalQueueStore(dynamic_cast<ExternalQueueStore*>(jQueue));
+
+ try
+ {
+ // init will create the deque's for the init...
+ jQueue->initialize();
+
+ } catch (journal::jexception& e) {
+ std::string s;
+ THROW_STORE_EXCEPTION(e.to_string(s) + queue.getName());
+ }
+ }
+
try {
if (!create(queueDb, queueIdSequence, queue)) {
- THROW_STORE_EXCEPTION("Queue already exists: " + queue.getName());
+ THROW_STORE_EXCEPTION("Queue already exists: " + queue.getName());
}
} catch (DbException& e) {
- THROW_STORE_EXCEPTION_2("Error creating queue named " +
queue.getName(), e);
+ THROW_STORE_EXCEPTION_2("Error creating queue named " + queue.getName(),
e);
}
}
-void BdbMessageStore::destroy(const PersistableQueue& queue)
+void BdbMessageStore::destroy(PersistableQueue& queue)
{
destroy(queueDb, queue);
+ qpid::broker::ExternalQueueStore* eqs = queue.getExternalQueueStore();
+ journal::jcntl* jQueue = static_cast<journal::jcntl*>(eqs);
+ if (jQueue)
+ {
+ jQueue->delete_jrnl();
+ delete(jQueue);
+ queue.setExternalQueueStore(NULL);
+ }
+
}
void BdbMessageStore::create(const PersistableExchange& exchange)
@@ -472,7 +500,7 @@
try {
Dbt key (&messageId, sizeof(messageId));
messageId = messageIdSequence.next();
- store(txn.get(), key, msg);
+ store(NULL, txn.get(), key, msg);
msg.setPersistenceId(messageId);
txn.commit();
} catch (std::exception& e) {
@@ -592,14 +620,18 @@
try {
if (messageId == 0) {
messageId = messageIdSequence.next();
- store(txn->get(), key, msg);
+ store(&queue, txn->get(), key, msg);
msg.setPersistenceId(messageId);
- msg.enqueueComplete(); // set enqueued for ack
+ if (!usingJrnl())
+ msg.enqueueComplete(); // set enqueued for ack
}
- /// cct mapping db
- put(mappingDb, txn->get(), key, value);
- if (txn->isTPC()) {
+ if (!usingJrnl())
+ put(mappingDb, txn->get(), key, value);
+
+ // cct if using Journal do we need to wait for IO to complete before calling thus???
+ // set enqueue comple on callback msg.enqueueComplete();
+ if (txn->isTPC()) {
record2pcOp(enqueueXidDb, dynamic_cast<TPCTxnCtxt&>(*txn),
messageId, queueId);
}
@@ -610,21 +642,57 @@
}
}
-void BdbMessageStore::store(DbTxn* txn, Dbt& messageId, PersistableMessage&
message)
+void BdbMessageStore::store(const PersistableQueue* queue, DbTxn* txn, Dbt&
messageId, PersistableMessage& message)
{
u_int32_t headerSize = message.encodedHeaderSize();
- u_int32_t totalSize = message.encodedSize();
- char* buff = static_cast<char*>(::alloca(totalSize + sizeof(u_int32_t)/*header
length*/));
- Buffer buffer(buff, headerSize + sizeof(u_int32_t));
+ u_int64_t size = message.encodedSize() + sizeof(u_int32_t);
+ char* buff = static_cast<char*>(::alloca(size)); // long + headers + content
+ Buffer buffer(buff,size);
buffer.putLong(headerSize);
message.encode(buffer);
//buffer.flip();
+
+ try {
- Dbt data(buff, totalSize + sizeof(u_int32_t));
- try {
-
- /// cct message db
- messageDb.put(txn, &messageId, &data, DB_NOOVERWRITE);
+ if (queue && usingJrnl()){
+
+ // cct TODO -- delete this in the callback...
+ journal::data_tok* dtokp = new journal::data_tok();
+ dtokp->setSourceMessage (&message);
+ dtokp->set_rid(message.getPersistenceId()); // set the messageID into the Journal
header (record-id)
+
+ unsigned aio_sleep_cnt = 0;
+ bool written = false;
+ while (!written)
+ {
+ journal::jcntl* jc =
static_cast<journal::jcntl*>(queue->getExternalQueueStore());
+ rhm::journal::iores eres = jc->enqueue_data(buff, size, dtokp);
+ switch (eres)
+ {
+ case rhm::journal::RHM_IORES_SUCCESS:
+ if (dtokp->wstate() >= rhm::journal::data_tok::ENQ_SUBM)
+ written = true;
+ break;
+ case rhm::journal::RHM_IORES_AIO_WAIT:
+ if (aio_sleep_cnt >= MAX_AIO_SLEEPS)
+ THROW_STORE_EXCEPTION("Error storing message -- AIO timeout for: " +
queue->getName());
+ usleep(AIO_SLEEP_TIME);
+ jc->get_wr_events();
+ aio_sleep_cnt++;
+ break;
+ case rhm::journal::RHM_IORES_FULL:
+ THROW_STORE_FULL_EXCEPTION("Error storing message -- Journal full :"
+ queue->getName());
+ break;
+ default:
+ assert( "Unexpected msg state");
+ }
+ }
+ } else {
+ /// cct message db
+ Dbt data(buff,size);
+ messageDb.put(txn, &messageId, &data, DB_NOOVERWRITE);
+ }
+
} catch (DbException& e) {
THROW_STORE_EXCEPTION_2("Error storing message", e);
}
@@ -860,3 +928,18 @@
THROW_STORE_EXCEPTION("Deletion failed: " +
string(DbEnv::strerror(status)));
}
}
+
+/**
+ * Build the journal directory path for a queue.
+ *
+ * The result has the form "/var/rhm/<bucket>/<queue name>/", where <bucket>
+ * is atol(queue name) % 20 written zero-filled to a field width of four.
+ *
+ * \param queue Queue whose name is used to derive the path.
+ * \return The directory path, with a trailing '/'.
+ */
+string BdbMessageStore::getJrnlDir(const qpid::broker::PersistableQueue& queue)
+{
+    const string& queueName = queue.getName();
+    const long bucket = atol(queueName.c_str()) % 20;
+
+    std::stringstream path;
+    path << "/var/rhm/" << std::setfill('0') << std::setw(4) << bucket
+         << "/" << queueName << "/";
+    return path.str();
+}
+
+
+
+
Modified: store/trunk/cpp/lib/BdbMessageStore.h
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.h 2007-09-07 10:27:55 UTC (rev 912)
+++ store/trunk/cpp/lib/BdbMessageStore.h 2007-09-08 00:22:36 UTC (rev 913)
@@ -43,7 +43,11 @@
namespace rhm {
namespace bdbstore {
using std::string;
-
+
+ #define MAX_AIO_SLEEPS 300
+ #define AIO_SLEEP_TIME 1000
+
+
/**
* An implementation of the MessageStore interface based on Berkeley DB
*/
@@ -81,7 +85,7 @@
void readXids(Db& db, std::set<string>& xids);
void readLockedMappings(Db& db, txn_lock_map& mappings);
TxnCtxt* check(qpid::broker::TransactionContext* ctxt);
- void store(DbTxn* txn, Dbt& messageId,
qpid::broker::PersistableMessage& message);
+ void store(const qpid::broker::PersistableQueue* queue, DbTxn* txn, Dbt&
messageId, qpid::broker::PersistableMessage& message);
void enqueue(DbTxn* txn, Dbt& messageId, Dbt& queueId);
bool dequeue(DbTxn* txn, Dbt& messageId, Dbt& queueId);
bool deleteIfUnused(Cursor& cursor, DbTxn* txn, Dbt& messageId);
@@ -95,6 +99,12 @@
void put(Db& db, DbTxn* txn, Dbt& key, Dbt& value);
bool deleteKeyValuePair(Db& db, DbTxn* txn, Dbt& key, Dbt&
value);
void open(Db& db, DbTxn* txn, const char* file, bool dupKey);
+
+
+ // journal functions
+ void createJrnlQueue(const qpid::broker::PersistableQueue& queue);
+ string getJrnlDir(const qpid::broker::PersistableQueue& queue); //for exmaple
/var/rhm/ + queueDir/
+ inline bool usingJrnl() {return false;} // make configurable
public:
BdbMessageStore(const char* envpath = 0);
@@ -102,8 +112,8 @@
void truncate();
- void create(const qpid::broker::PersistableQueue& queue);
- void destroy(const qpid::broker::PersistableQueue& queue);
+ void create(qpid::broker::PersistableQueue& queue);
+ void destroy(qpid::broker::PersistableQueue& queue);
void create(const qpid::broker::PersistableExchange& queue);
void destroy(const qpid::broker::PersistableExchange& queue);
Modified: store/trunk/cpp/lib/Makefile.am
===================================================================
--- store/trunk/cpp/lib/Makefile.am 2007-09-07 10:27:55 UTC (rev 912)
+++ store/trunk/cpp/lib/Makefile.am 2007-09-08 00:22:36 UTC (rev 913)
@@ -72,6 +72,8 @@
jrnl/wmgr.hpp \
jrnl/wrfc.hpp
+
+
BUILT_SOURCES = db-inc.h
db-inc.h: Makefile.in
echo '#include <$(DB_CXX_HEADER_PREFIX)db_cxx.h>' > $@-t
Modified: store/trunk/cpp/lib/StoreException.h
===================================================================
--- store/trunk/cpp/lib/StoreException.h 2007-09-07 10:27:55 UTC (rev 912)
+++ store/trunk/cpp/lib/StoreException.h 2007-09-08 00:22:36 UTC (rev 913)
@@ -37,11 +37,19 @@
const char* what() const throw() { return text.c_str(); }
};
+/**
+ * StoreException subtype that lets callers distinguish a "store is full"
+ * failure from other store errors. Both constructors simply forward their
+ * arguments to the matching StoreException constructor.
+ */
+class StoreFullException : public StoreException
+{
+public:
+    /** Construct from a message text only. */
+    StoreFullException(const std::string& _text)
+        : StoreException(_text)
+    {}
+
+    /** Construct from a message text and the DbException that caused it. */
+    StoreFullException(const std::string& _text, DbException& cause)
+        : StoreException(_text, cause)
+    {}
+
+    ~StoreFullException() throw() {}
+};
+
#define THROW_STORE_EXCEPTION(MESSAGE) throw
StoreException(boost::str(boost::format("%s (%s:%d)") % (MESSAGE) % __FILE__ %
__LINE__))
#define THROW_STORE_EXCEPTION_2(MESSAGE, EXCEPTION) throw
StoreException(boost::str(boost::format("%s (%s:%d)") % (MESSAGE) % __FILE__ %
__LINE__), EXCEPTION)
+#define THROW_STORE_FULL_EXCEPTION(MESSAGE) throw
StoreFullException(boost::str(boost::format("%s (%s:%d)") % (MESSAGE) % __FILE__
% __LINE__))
-
}}
#endif
Modified: store/trunk/cpp/lib/jrnl/aio_cb.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/aio_cb.hpp 2007-09-07 10:27:55 UTC (rev 912)
+++ store/trunk/cpp/lib/jrnl/aio_cb.hpp 2007-09-08 00:22:36 UTC (rev 913)
@@ -37,10 +37,13 @@
{
namespace journal
{
+
+ class jcntl;
+
/**
* \brief Callback function pointer to be called when AIO events arrive.
*/
- typedef void (*aio_cb)(u_int32_t num_dtoks);
+ typedef void (*aio_cb)(jcntl* journal, u_int32_t num_dtoks);
}
}
Modified: store/trunk/cpp/lib/jrnl/data_tok.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/data_tok.cpp 2007-09-07 10:27:55 UTC (rev 912)
+++ store/trunk/cpp/lib/jrnl/data_tok.cpp 2007-09-08 00:22:36 UTC (rev 913)
@@ -50,7 +50,8 @@
_rstate(UNREAD),
_dsize(0),
_dblks_proc(0),
- _rid(0)
+ _rid(0),
+ _sourceMsg(NULL)
{}
data_tok::~data_tok()
Modified: store/trunk/cpp/lib/jrnl/data_tok.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/data_tok.hpp 2007-09-07 10:27:55 UTC (rev 912)
+++ store/trunk/cpp/lib/jrnl/data_tok.hpp 2007-09-08 00:22:36 UTC (rev 913)
@@ -41,11 +41,20 @@
}
}
+namespace qpid
+{
+namespace broker
+{
+class PersistableMessage;
+}
+}
+
#include <sys/types.h>
#include <jrnl/jexception.hpp>
namespace rhm
{
+
namespace journal
{
@@ -84,11 +93,15 @@
size_t _dsize; ///< Data size in bytes
u_int32_t _dblks_proc; ///< Data blocks read/written
u_int64_t _rid; ///< RID of data set by enqueue operation
+ qpid::broker::PersistableMessage* _sourceMsg; ///< Pointer back to source Message in
Broker
public:
data_tok();
~data_tok();
+ inline qpid::broker::PersistableMessage* getSourceMessage(){return _sourceMsg;}
+ inline void setSourceMessage(qpid::broker::PersistableMessage* msg) {_sourceMsg = msg;}
+
inline const u_int64_t id() const { return _icnt; }
inline const write_state wstate() const {return _wstate; }
const char* wstate_str() const;
Modified: store/trunk/cpp/lib/jrnl/jcntl.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.cpp 2007-09-07 10:27:55 UTC (rev 912)
+++ store/trunk/cpp/lib/jrnl/jcntl.cpp 2007-09-08 00:22:36 UTC (rev 913)
@@ -39,6 +39,7 @@
#include <jrnl/jerrno.hpp>
#include <jrnl/jinf.hpp>
#include <sstream>
+#include <qpid/broker/PersistableMessage.h>
namespace rhm
{
@@ -58,8 +59,8 @@
_emap(),
_rrfc(),
_wrfc(),
- _rmgr(_emap, _rrfc),
- _wmgr(_emap, _wrfc)
+ _rmgr(this, _emap, _rrfc),
+ _wmgr(this, _emap, _wrfc)
{}
jcntl::~jcntl()
@@ -116,6 +117,18 @@
_init_flag = true;
}
+/**
+ * Shut the journal down and remove its files: calls stop() and then
+ * _jdir.delete_dir().
+ *
+ * NOTE(review): outstanding AIO is neither waited for nor cancelled between
+ * the two steps yet.
+ */
+void
+jcntl::delete_jrnl() throw (jexception)
+{
+    stop();
+    // TODO: wait for AIO or issue cancel before the files are removed.
+    _jdir.delete_dir();
+}
+
+
const iores
jcntl::enqueue_data(const void* const dbuf, const size_t dlen, data_tok* const dtok)
throw (jexception)
@@ -190,5 +203,50 @@
of.close();
}
+/**
+ * Internal AIO write-completion callback (used when initialize() is called
+ * without arguments).
+ *
+ * Takes over every token currently held in journal->_aio_wr_cmpl_dtok_list,
+ * leaving that list empty, and for at most num_dtoks of them notifies the
+ * token's source message: enqueueComplete() when the write state is ENQ,
+ * dequeueComplete() when it is DEQ. Tokens beyond num_dtoks are dropped from
+ * the list without notification. The tokens themselves are not deleted here.
+ *
+ * \param journal   Journal whose completed-write token list is drained.
+ * \param num_dtoks Number of completed tokens reported by the write manager.
+ */
+void
+jcntl::aio_wr_callback(jcntl* journal, u_int32_t num_dtoks)
+{
+    // Swapping with an empty deque has the same effect as copy + clear(),
+    // without copying the elements.
+    std::deque<rhm::journal::data_tok*> this_dtok_list;
+    this_dtok_list.swap(journal->_aio_wr_cmpl_dtok_list);
+
+    // Guard on empty(): front() on an empty deque is undefined behaviour, and
+    // num_dtoks is not guaranteed to match the list size.
+    for (u_int32_t i = 0; i < num_dtoks && !this_dtok_list.empty(); i++)
+    {
+        data_tok* const dtokp = this_dtok_list.front();
+        this_dtok_list.pop_front();
+        if (dtokp == NULL)
+            continue;
+
+        // Tokens that were not created by the message store carry no source
+        // message (wmgr checks for this too); there is nobody to notify.
+        qpid::broker::PersistableMessage* const msg = dtokp->getSourceMessage();
+        if (msg == NULL)
+            continue;
+
+        const data_tok::write_state st = dtokp->wstate();
+        if (st == data_tok::ENQ)
+        {
+            msg->enqueueComplete();
+            /// cct --- if TPC work out what to do !!!
+        }
+        else if (st == data_tok::DEQ)
+        {
+            msg->dequeueComplete();
+        }
+    }
+}
+
+/**
+ * Internal AIO read-completion callback (used when initialize() is called
+ * without arguments).
+ *
+ * Takes over every token currently held in journal->_aio_rd_cmpl_dtok_list,
+ * leaving that list empty, and walks at most num_dtoks of them. No action is
+ * taken on the tokens yet; the recovery / lazy-load hook is still a TODO.
+ *
+ * \param journal   Journal whose completed-read token list is drained.
+ * \param num_dtoks Number of completed tokens reported by the read manager.
+ */
+void
+jcntl::aio_rd_callback(jcntl* journal, u_int32_t num_dtoks)
+{
+    // Swapping with an empty deque has the same effect as copy + clear(),
+    // without copying the elements.
+    std::deque<rhm::journal::data_tok*> this_dtok_list;
+    this_dtok_list.swap(journal->_aio_rd_cmpl_dtok_list);
+
+    // Guard on empty(): front() on an empty deque is undefined behaviour, and
+    // num_dtoks is not guaranteed to match the list size.
+    for (u_int32_t i = 0; i < num_dtoks && !this_dtok_list.empty(); i++)
+    {
+        data_tok* const dtokp = this_dtok_list.front();
+        this_dtok_list.pop_front();
+        if (dtokp != NULL
+            && dtokp->wstate() == data_tok::ENQ
+            && dtokp->rstate() == data_tok::READ)
+        {
+            // cct call the recovery manager. / lazyload..
+        }
+    }
+}
+
+
+
} // namespace journal
} // namespace rhm
Modified: store/trunk/cpp/lib/jrnl/jcntl.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.hpp 2007-09-07 10:27:55 UTC (rev 912)
+++ store/trunk/cpp/lib/jrnl/jcntl.hpp 2007-09-08 00:22:36 UTC (rev 913)
@@ -41,11 +41,13 @@
}
}
+#include <deque>
#include <jrnl/jdir.hpp>
#include <jrnl/lfh.hpp>
#include <jrnl/rmgr.hpp>
#include <jrnl/wmgr.hpp>
#include <jrnl/wrfc.hpp>
+#include <qpid/broker/PersistableQueue.h>
namespace rhm
{
@@ -63,7 +65,7 @@
* which is used per data block written to the journal, and is used to track its
status through
* the AIO enqueue, read and dequeue process.
*/
- class jcntl
+ class jcntl : public qpid::broker::ExternalQueueStore
{
private:
/**
@@ -126,6 +128,9 @@
rmgr _rmgr; ///< Read page manager which manages AIO
wmgr _wmgr; ///< Write page manager which manages AIO
+ std::deque<rhm::journal::data_tok*> _aio_rd_cmpl_dtok_list; ///<
Internally mamanged deque
+ std::deque<rhm::journal::data_tok*> _aio_wr_cmpl_dtok_list; ///<
Internally mamanged deque
+
public:
/**
* \brief Journal constructor.
@@ -152,6 +157,9 @@
* <b>NOTE: Any existing journal will be ignored by this
operation.</b> To use recover
* the data from an existing journal, use recover().
*
+ * <b>NOTE: If NULL is passed for the deques, they are created and deleted internally.</b>
+ * <b>NOTE: If NULL is passed for the callbacks, internal ones will be used.</b>
+ *
* \param rd_dtokl deque for storing data tokens retruning from read AIO
operations.
* \param rd_cb Function pointer to callback function for read operations. May be
NULL.
* \param wr_dtokl deque for storing data tokens retruning from enqueue and
dequeue (write)
@@ -159,9 +167,24 @@
* \param wr_cb Function pointer to callback function for write operations. May be
NULL.
*/
void initialize(std::deque<data_tok*>* rd_dtokl, const aio_cb rd_cb,
- std::deque<data_tok*>* wr_dtokl, const aio_cb wr_cb) throw
(jexception);
+ std::deque<data_tok*>* wr_dtokl, const aio_cb wr_cb) throw (jexception);
+ void initialize() {
+ initialize(&_aio_rd_cmpl_dtok_list, &aio_rd_callback,
&_aio_wr_cmpl_dtok_list, &aio_wr_callback );
+ }
+
/**
+ * \brief Delete the journal directory of files matching the base filename
+ * by moving them into a subdirectory
+ *
+ * Stops AIO, stop journal, deletes files.....
+ *
+ * \exception The directory handle could not be closed.
+ */
+ void delete_jrnl() throw (jexception);
+
+
+ /**
* \brief Enqueues (writes) data to the journal.
*
* \param dbuf Pointer to data to be written to journal.
@@ -265,6 +288,8 @@
inline const std::string& base_filename() const { return _base_filename; }
+
+
private:
/**
* \brief Check status of journal before allowing certain operations.
@@ -276,6 +301,17 @@
*/
void write_infofile() const throw (jexception);
+ /**
+ * Internal write callback
+ */
+ static void aio_wr_callback(jcntl* journal, u_int32_t num_dtoks);
+
+ /**
+ * Internal read callback
+ */
+ static void aio_rd_callback(jcntl* journal, u_int32_t num_dtoks);
+
+
};
} // namespace journal
Modified: store/trunk/cpp/lib/jrnl/jdir.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jdir.cpp 2007-09-07 10:27:55 UTC (rev 912)
+++ store/trunk/cpp/lib/jrnl/jdir.cpp 2007-09-08 00:22:36 UTC (rev 913)
@@ -62,6 +62,14 @@
}
}
+/**
+ * Delete the journal directory contents.
+ *
+ * For now this only forwards to clear_dir() with create_flag set to false.
+ * cct hack -- TODO kpvdr to do the real deal...
+ */
+void
+jdir::delete_dir() throw (jexception)
+{
+    const bool create_flag = false;
+    clear_dir(create_flag);
+}
+
+
void
jdir::clear_dir(const bool create_flag) throw (jexception)
{
Modified: store/trunk/cpp/lib/jrnl/jdir.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jdir.hpp 2007-09-07 10:27:55 UTC (rev 912)
+++ store/trunk/cpp/lib/jrnl/jdir.hpp 2007-09-08 00:22:36 UTC (rev 913)
@@ -88,6 +88,17 @@
void clear_dir(const bool create_flag = true) throw (jexception);
/**
+ * \brief Delete the journal directory of files matching the base filename
+ * by moving them into a subdirectory.
+ *
+ * \exception The journal directory could not be opened.
+ * \exception The move of files from the journal directory to the created backup
+ * directory failed.
+ * \exception The directory handle could not be closed.
+ */
+ void delete_dir() throw (jexception);
+
+ /**
* \brief Create bakup directory that is next in sequence.
*
* Search for directory using pattern "_basename.bak.XXXX" where XXXX is
a hexadecimal
Modified: store/trunk/cpp/lib/jrnl/pmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/pmgr.cpp 2007-09-07 10:27:55 UTC (rev 912)
+++ store/trunk/cpp/lib/jrnl/pmgr.cpp 2007-09-08 00:22:36 UTC (rev 913)
@@ -73,9 +73,10 @@
const u_int32_t pmgr::_sblksize = JRNL_SBLK_SIZE * JRNL_DBLK_SIZE;
-pmgr::pmgr(enq_map& emap, const u_int32_t pagesize, const u_int16_t pages):
+pmgr::pmgr(jcntl* jc, enq_map& emap, const u_int32_t pagesize, const u_int16_t
pages):
_pagesize(pagesize),
_pages(pages),
+ _jc(jc),
_emap(emap),
_dtokl(NULL),
_page_base_ptr(NULL),
@@ -91,10 +92,11 @@
_cb(NULL)
{}
-pmgr::pmgr(enq_map& emap, const u_int32_t pagesize, const u_int16_t pages,
+pmgr::pmgr(jcntl* jc, enq_map& emap, const u_int32_t pagesize, const u_int16_t
pages,
std::deque<data_tok*>* const dtokl) throw (jexception):
_pagesize(pagesize),
_pages(pages),
+ _jc(jc),
_emap(emap),
_dtokl(dtokl),
_page_base_ptr(NULL),
Modified: store/trunk/cpp/lib/jrnl/pmgr.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/pmgr.hpp 2007-09-07 10:27:55 UTC (rev 912)
+++ store/trunk/cpp/lib/jrnl/pmgr.hpp 2007-09-08 00:22:36 UTC (rev 913)
@@ -38,6 +38,7 @@
namespace journal
{
class pmgr;
+ class jcntl;
}
}
@@ -109,6 +110,7 @@
static const u_int32_t _sblksize; ///< Disk softblock size
const u_int32_t _pagesize; ///< Size of page cache pages
const u_int16_t _pages; ///< Number of page cache pages
+ jcntl* _jc; ///< Pointer to journal controller
enq_map& _emap; ///< Ref to enqueue map
std::deque<data_tok*>* _dtokl; ///< Pointer to external data token
list
void* _page_base_ptr; ///< Base pointer to page memory
@@ -127,8 +129,8 @@
aio_cb _cb; ///< Callback function pointer for AIO events
public:
- pmgr(enq_map& emap, const u_int32_t pagesize, const u_int16_t pages);
- pmgr(enq_map& emap, const u_int32_t pagesize, const u_int16_t pages,
+ pmgr(jcntl* jc, enq_map& emap, const u_int32_t pagesize, const u_int16_t
pages);
+ pmgr(jcntl* jc, enq_map& emap, const u_int32_t pagesize, const u_int16_t
pages,
std::deque<data_tok*>* const dtokl) throw (jexception);
virtual ~pmgr();
Modified: store/trunk/cpp/lib/jrnl/rmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rmgr.cpp 2007-09-07 10:27:55 UTC (rev 912)
+++ store/trunk/cpp/lib/jrnl/rmgr.cpp 2007-09-08 00:22:36 UTC (rev 913)
@@ -42,14 +42,14 @@
namespace journal
{
-rmgr::rmgr(enq_map& emap, rrfc& rrfc):
- pmgr(emap, JRNL_RMGR_PAGE_SIZE, JRNL_RMGR_PAGES),
+rmgr::rmgr(jcntl* jc, enq_map& emap, rrfc& rrfc):
+ pmgr(jc, emap, JRNL_RMGR_PAGE_SIZE, JRNL_RMGR_PAGES),
_rrfc(rrfc),
_hdr()
{}
-rmgr::rmgr(enq_map& emap, rrfc& rrfc, std::deque<data_tok*>* const dtokl)
throw (jexception):
- pmgr(emap, JRNL_RMGR_PAGE_SIZE, JRNL_RMGR_PAGES, dtokl),
+rmgr::rmgr(jcntl* jc, enq_map& emap, rrfc& rrfc, std::deque<data_tok*>*
const dtokl) throw (jexception):
+ pmgr(jc, emap, JRNL_RMGR_PAGE_SIZE, JRNL_RMGR_PAGES, dtokl),
_rrfc(rrfc),
_hdr()
{}
@@ -168,7 +168,7 @@
// Perform AIO return callback
if (_cb)
- (_cb)(s);
+ (_cb)(_jc, s);
}
return tot_data_toks;
Modified: store/trunk/cpp/lib/jrnl/rmgr.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rmgr.hpp 2007-09-07 10:27:55 UTC (rev 912)
+++ store/trunk/cpp/lib/jrnl/rmgr.hpp 2007-09-08 00:22:36 UTC (rev 913)
@@ -62,8 +62,8 @@
hdr _hdr; ///< Header used to determind record type
public:
- rmgr(enq_map& emap, rrfc& rrfc);
- rmgr(enq_map& emap, rrfc& rrfc, std::deque<data_tok*>* const dtokl)
throw (jexception);
+ rmgr(jcntl* jc, enq_map& emap, rrfc& rrfc);
+ rmgr(jcntl* jc, enq_map& emap, rrfc& rrfc, std::deque<data_tok*>*
const dtokl) throw (jexception);
~rmgr();
void initialize(std::deque<data_tok*>* const dtokl, const aio_cb rd_cb)
throw (jexception);
Modified: store/trunk/cpp/lib/jrnl/wmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wmgr.cpp 2007-09-07 10:27:55 UTC (rev 912)
+++ store/trunk/cpp/lib/jrnl/wmgr.cpp 2007-09-08 00:22:36 UTC (rev 913)
@@ -42,8 +42,8 @@
namespace journal
{
-wmgr::wmgr(enq_map& emap, wrfc& wrfc):
- pmgr(emap, JRNL_WMGR_PAGE_SIZE, JRNL_WMGR_PAGES),
+wmgr::wmgr(jcntl* jc, enq_map& emap, wrfc& wrfc):
+ pmgr(jc, emap, JRNL_WMGR_PAGE_SIZE, JRNL_WMGR_PAGES),
_wrfc(wrfc),
_max_dtokpp(0),
_max_io_wait_us(0),
@@ -52,9 +52,9 @@
_enq_busy(false)
{}
-wmgr::wmgr(enq_map& emap, wrfc& wrfc, std::deque<data_tok*>* const dtokl,
+wmgr::wmgr(jcntl* jc, enq_map& emap, wrfc& wrfc, std::deque<data_tok*>*
const dtokl,
const u_int32_t max_dtokpp, const u_int32_t max_iowait_us) throw (jexception):
- pmgr(emap, JRNL_WMGR_PAGE_SIZE, JRNL_WMGR_PAGES, dtokl),
+ pmgr(jc, emap, JRNL_WMGR_PAGE_SIZE, JRNL_WMGR_PAGES, dtokl),
_wrfc(wrfc),
_max_dtokpp(max_dtokpp),
_max_io_wait_us(max_iowait_us),
@@ -105,7 +105,12 @@
else
_enq_busy = true;
- u_int64_t rid = cont ? _wrfc.rid() - 1 : _wrfc.get_incr_rid();
+ u_int64_t rid;
+ if (dtok->getSourceMessage())
+ rid = dtok->rid();
+ else
+ rid = cont ? _wrfc.rid() - 1 : _wrfc.get_incr_rid();
+
_data_rec.reset(rid, mbuf, dlen);
if (!cont)
dtok->set_rid(rid);
@@ -388,7 +393,7 @@
// Perform AIO return callback
if (_cb && tot_data_toks)
- (_cb)(tot_data_toks);
+ (_cb)(_jc, tot_data_toks);
return tot_data_toks;
}
Modified: store/trunk/cpp/lib/jrnl/wmgr.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wmgr.hpp 2007-09-07 10:27:55 UTC (rev 912)
+++ store/trunk/cpp/lib/jrnl/wmgr.hpp 2007-09-08 00:22:36 UTC (rev 913)
@@ -79,8 +79,8 @@
bool _enq_busy; ///< Flag true if enqueue is in progress
public:
- wmgr(enq_map& emap, wrfc& wrfc);
- wmgr(enq_map& emap, wrfc& wrfc, std::deque<data_tok*>* const
dtokl,
+ wmgr(jcntl* jc, enq_map& emap, wrfc& wrfc);
+ wmgr(jcntl* jc, enq_map& emap, wrfc& wrfc, std::deque<data_tok*>*
const dtokl,
const u_int32_t max_dtokpp, const u_int32_t max_iowait_us) throw
(jexception);
~wmgr();
Modified: store/trunk/cpp/tests/jrnl/JournalTest.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/JournalTest.cpp 2007-09-07 10:27:55 UTC (rev 912)
+++ store/trunk/cpp/tests/jrnl/JournalTest.cpp 2007-09-08 00:22:36 UTC (rev 913)
@@ -63,7 +63,7 @@
CPPUNIT_TEST(Test_023);
CPPUNIT_TEST(Test_024);
CPPUNIT_TEST(Test_025);
- CPPUNIT_TEST(Test_026);
+ CPPUNIT_TEST(Test_026);
CPPUNIT_TEST_SUITE_END();
jtest t;
Modified: store/trunk/cpp/tests/jrnl/Makefile.am
===================================================================
--- store/trunk/cpp/tests/jrnl/Makefile.am 2007-09-07 10:27:55 UTC (rev 912)
+++ store/trunk/cpp/tests/jrnl/Makefile.am 2007-09-08 00:22:36 UTC (rev 913)
@@ -21,7 +21,7 @@
abs_builddir=@abs_builddir@
-AM_CXXFLAGS = $(WARNING_CFLAGS) $(CPPUNIT_CXXFLAGS) -pthread -DRHM_CLEAN -DRHM_WRONLY
-DRHM_TESTVALS
+AM_CXXFLAGS = $(WARNING_CFLAGS) $(CPPUNIT_CXXFLAGS) $(QPID_CXXFLAGS) -pthread -DRHM_CLEAN
-DRHM_WRONLY -DRHM_TESTVALS
INCLUDES=-I../../lib
Modified: store/trunk/cpp/tests/jrnl/jtest.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/jtest.cpp 2007-09-07 10:27:55 UTC (rev 912)
+++ store/trunk/cpp/tests/jrnl/jtest.cpp 2007-09-08 00:22:36 UTC (rev 913)
@@ -219,9 +219,9 @@
// static method
void
-jtest::mp_aio_cb(u_int32_t num_dtoks)
+jtest::mp_aio_cb(rhm::journal::jcntl* jc, u_int32_t num_dtoks)
{
- jtest::_mp.aio_callback(num_dtoks);
+ jtest::_mp.aio_callback(jc, num_dtoks);
}
// static method
Modified: store/trunk/cpp/tests/jrnl/jtest.hpp
===================================================================
--- store/trunk/cpp/tests/jrnl/jtest.hpp 2007-09-07 10:27:55 UTC (rev 912)
+++ store/trunk/cpp/tests/jrnl/jtest.hpp 2007-09-08 00:22:36 UTC (rev 913)
@@ -97,7 +97,7 @@
string& report_time(string& str) const;
// AIO callback functions
- static void mp_aio_cb(u_int32_t num_dtoks);
+ static void mp_aio_cb(rhm::journal::jcntl* jc, u_int32_t num_dtoks);
};
#endif
Modified: store/trunk/cpp/tests/jrnl/msg_consumer.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/msg_consumer.cpp 2007-09-07 10:27:55 UTC (rev 912)
+++ store/trunk/cpp/tests/jrnl/msg_consumer.cpp 2007-09-08 00:22:36 UTC (rev 913)
@@ -182,7 +182,7 @@
}
void
-msg_consumer::aio_callback(u_int32_t num_dtoks)
+msg_consumer::aio_callback(rhm::journal::jcntl* /*jc*/, u_int32_t num_dtoks)
{
std::cout << "msg_consumer::aio_cb(" << num_dtoks <<
")" << std::endl;
}
Modified: store/trunk/cpp/tests/jrnl/msg_consumer.hpp
===================================================================
--- store/trunk/cpp/tests/jrnl/msg_consumer.hpp 2007-09-07 10:27:55 UTC (rev 912)
+++ store/trunk/cpp/tests/jrnl/msg_consumer.hpp 2007-09-08 00:22:36 UTC (rev 913)
@@ -76,7 +76,7 @@
u_int32_t consume(rhm::journal::jcntl& _jcntl, const u_int32_t numMsgs,
const size_t min_msg_size, const size_t max_msg_size) throw
(rhm::journal::jexception);
u_int32_t consume(_c_args* args) throw (rhm::journal::jexception);
- void aio_callback(u_int32_t num_dtoks);
+ void aio_callback(rhm::journal::jcntl* jc, u_int32_t num_dtoks);
const inline u_int64_t get_tot_dblks() const { return _tot_dblks; }
const inline u_int64_t get_tot_dsize() const { return _tot_dsize; }
Modified: store/trunk/cpp/tests/jrnl/msg_producer.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/msg_producer.cpp 2007-09-07 10:27:55 UTC (rev 912)
+++ store/trunk/cpp/tests/jrnl/msg_producer.cpp 2007-09-08 00:22:36 UTC (rev 913)
@@ -216,7 +216,7 @@
}
void
-msg_producer::aio_callback(u_int32_t num_dtoks)
+msg_producer::aio_callback(rhm::journal::jcntl* /*jc*/, u_int32_t num_dtoks)
{
assert(num_dtoks == _aio_cmpl_dtok_list.size());
std::deque<rhm::journal::data_tok*>
this_dtok_list(_aio_cmpl_dtok_list.begin(),
Modified: store/trunk/cpp/tests/jrnl/msg_producer.hpp
===================================================================
--- store/trunk/cpp/tests/jrnl/msg_producer.hpp 2007-09-07 10:27:55 UTC (rev 912)
+++ store/trunk/cpp/tests/jrnl/msg_producer.hpp 2007-09-08 00:22:36 UTC (rev 913)
@@ -86,7 +86,7 @@
u_int32_t produce(rhm::journal::jcntl& jc, const size_t minMsgSize, const size_t
maxMsgSize)
throw (rhm::journal::jexception);
u_int32_t produce(_p_args* args) throw (rhm::journal::jexception);
- void aio_callback(u_int32_t num_dtoks);
+ void aio_callback(rhm::journal::jcntl* jc, u_int32_t num_dtoks);
const inline u_int64_t get_tot_dblks() const { return _tot_dblks; }
const inline u_int64_t get_tot_dsize() const { return _tot_dsize; }