[rhmessaging-commits] rhmessaging commits: r1481 - in store/trunk/cpp/lib: jrnl and 1 other directory.
rhmessaging-commits at lists.jboss.org
rhmessaging-commits at lists.jboss.org
Thu Dec 13 15:17:34 EST 2007
Author: kpvdr
Date: 2007-12-13 15:17:34 -0500 (Thu, 13 Dec 2007)
New Revision: 1481
Modified:
store/trunk/cpp/lib/BdbMessageStore.cpp
store/trunk/cpp/lib/BdbMessageStore.h
store/trunk/cpp/lib/DataTokenImpl.h
store/trunk/cpp/lib/JournalImpl.cpp
store/trunk/cpp/lib/JournalImpl.h
store/trunk/cpp/lib/jrnl/data_tok.cpp
store/trunk/cpp/lib/jrnl/data_tok.hpp
store/trunk/cpp/lib/jrnl/jcntl.cpp
store/trunk/cpp/lib/jrnl/jcntl.hpp
store/trunk/cpp/lib/jrnl/slock.hpp
store/trunk/cpp/lib/jrnl/wmgr.cpp
store/trunk/cpp/lib/jrnl/wmgr.hpp
Log:
Further tidy-up: additional decoupling between the journal and qpid.
Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp 2007-12-13 19:24:38 UTC (rev 1480)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp 2007-12-13 20:17:34 UTC (rev 1481)
@@ -31,6 +31,8 @@
#include "BindingDbt.h"
#include "IdPairDbt.h"
#include "StringDbt.h"
+#include "JournalImpl.h"
+#include "DataTokenImpl.h"
#include <boost/intrusive_ptr.hpp>
using namespace rhm::bdbstore;
@@ -993,6 +995,7 @@
boost::intrusive_ptr<DataTokenImpl> dtokp(new DataTokenImpl);
dtokp->addRef();
dtokp->setSourceMessage(message);
+ dtokp->set_external_rid(true);
dtokp->set_rid(message->getPersistenceId()); // set the messageID into the Journal header (record-id)
bool written = false;
@@ -1137,6 +1140,7 @@
boost::intrusive_ptr<DataTokenImpl> ddtokp(new DataTokenImpl);
ddtokp->addRef();
ddtokp->setSourceMessage(msg);
+ ddtokp->set_external_rid(true);
ddtokp->set_rid(messageIdSequence.next());
ddtokp->set_dequeue_rid(msg->getPersistenceId());
ddtokp->set_wstate(DataTokenImpl::ENQ);
Modified: store/trunk/cpp/lib/BdbMessageStore.h
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.h 2007-12-13 19:24:38 UTC (rev 1480)
+++ store/trunk/cpp/lib/BdbMessageStore.h 2007-12-13 20:17:34 UTC (rev 1481)
@@ -42,8 +42,6 @@
#include <boost/format.hpp>
#include <boost/intrusive_ptr.hpp>
#include <boost/ptr_container/ptr_list.hpp>
-#include "JournalImpl.h"
-#include "DataTokenImpl.h"
namespace rhm {
namespace bdbstore {
Modified: store/trunk/cpp/lib/DataTokenImpl.h
===================================================================
--- store/trunk/cpp/lib/DataTokenImpl.h 2007-12-13 19:24:38 UTC (rev 1480)
+++ store/trunk/cpp/lib/DataTokenImpl.h 2007-12-13 20:17:34 UTC (rev 1481)
@@ -25,15 +25,24 @@
#define _DataTokenImpl_
#include "jrnl/data_tok.hpp"
+#include <boost/intrusive_ptr.hpp>
+#include <qpid/broker/PersistableMessage.h>
namespace rhm {
namespace bdbstore {
- class DataTokenImpl : public journal::data_tok
+ class DataTokenImpl : public journal::data_tok, public qpid::RefCounted
{
+ private:
+ boost::intrusive_ptr<qpid::broker::PersistableMessage> sourceMsg;
public:
DataTokenImpl();
virtual ~DataTokenImpl();
+
+ inline boost::intrusive_ptr<qpid::broker::PersistableMessage>& getSourceMessage()
+ { return sourceMsg; }
+ inline void setSourceMessage(boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg)
+ { sourceMsg = msg; }
};
} // namespace bdbstore
Modified: store/trunk/cpp/lib/JournalImpl.cpp
===================================================================
--- store/trunk/cpp/lib/JournalImpl.cpp 2007-12-13 19:24:38 UTC (rev 1480)
+++ store/trunk/cpp/lib/JournalImpl.cpp 2007-12-13 20:17:34 UTC (rev 1481)
@@ -128,7 +128,7 @@
_dlen = 0;
_dtok.reset();
_dtok.set_rid(rid);
- _dtok.set_wstate(journal::data_tok::ENQ);
+ _dtok.set_wstate(DataTokenImpl::ENQ);
_external = false;
size_t xlen = 0;
bool transient = false;
@@ -307,7 +307,7 @@
jip->_aio_wr_cmpl_dtok_list.clear();
for (u_int32_t i=0; i<num_dtoks; i++)
{
- data_tok*& dtokp = this_dtok_list.front();
+ DataTokenImpl* dtokp = static_cast<DataTokenImpl*>(this_dtok_list.front());
if (!journal->is_stopped() && dtokp->getSourceMessage())
{
switch (dtokp->wstate())
@@ -341,7 +341,7 @@
jip->_aio_rd_cmpl_dtok_list.clear();
for (u_int32_t i=0; i<num_dtoks; i++)
{
- data_tok*& dtokp = this_dtok_list.front();
+ DataTokenImpl* dtokp = static_cast<DataTokenImpl*>(this_dtok_list.front());
if (!journal->is_stopped() && dtokp->getSourceMessage())
{
if (dtokp->wstate() == data_tok::ENQ && dtokp->rstate() == data_tok::READ)
Modified: store/trunk/cpp/lib/JournalImpl.h
===================================================================
--- store/trunk/cpp/lib/JournalImpl.h 2007-12-13 19:24:38 UTC (rev 1480)
+++ store/trunk/cpp/lib/JournalImpl.h 2007-12-13 20:17:34 UTC (rev 1481)
@@ -26,9 +26,10 @@
#include <set>
#include "jrnl/jcntl.hpp"
-#include "jrnl/data_tok.hpp"
+#include "DataTokenImpl.h"
#include "PreparedTransaction.h"
#include <qpid/broker/Timer.h>
+#include <qpid/broker/PersistableQueue.h>
#include <qpid/sys/Time.h>
#include <boost/ptr_container/ptr_list.hpp>
#include <boost/intrusive_ptr.hpp>
@@ -62,7 +63,7 @@
inline void cancel() { parent=0; }
};
- class JournalImpl : public journal::jcntl
+ class JournalImpl : public qpid::broker::ExternalQueueStore, public journal::jcntl
{
private:
static qpid::broker::Timer journalTimer;
Modified: store/trunk/cpp/lib/jrnl/data_tok.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/data_tok.cpp 2007-12-13 19:24:38 UTC (rev 1480)
+++ store/trunk/cpp/lib/jrnl/data_tok.cpp 2007-12-13 20:17:34 UTC (rev 1481)
@@ -53,7 +53,7 @@
_dblks_read(0),
_rid(0),
_xid(),
- _sourceMsg(NULL)
+ _external_rid(false)
{
pthread_mutex_init(&_mutex, NULL);
pthread_mutex_lock(&_mutex);
Modified: store/trunk/cpp/lib/jrnl/data_tok.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/data_tok.hpp 2007-12-13 19:24:38 UTC (rev 1480)
+++ store/trunk/cpp/lib/jrnl/data_tok.hpp 2007-12-13 20:17:34 UTC (rev 1481)
@@ -41,18 +41,8 @@
}
}
-namespace qpid
-{
-namespace broker
-{
-class PersistableMessage;
-}
-}
-
-#include <boost/intrusive_ptr.hpp>
#include <pthread.h>
-#include <qpid/broker/PersistableMessage.h>
-#include <qpid/RefCounted.h>
+#include <string>
#include <sys/types.h>
namespace rhm
@@ -66,7 +56,7 @@
* \brief Data block token (data_tok) used to track wstate of a data block through asynchronous
* I/O process
*/
- class data_tok : public qpid::RefCounted
+ class data_tok// : public qpid::RefCounted
{
public:
// TODO: Fix this, separate write state from operation
@@ -101,7 +91,7 @@
READ ///< Data block is fully read
};
- private:
+ protected:
pthread_mutex_t _mutex;
static u_int64_t _cnt;
u_int64_t _icnt;
@@ -114,17 +104,12 @@
u_int64_t _rid; ///< RID of data set by enqueue operation
std::string _xid; ///< XID set by enqueue operation
u_int64_t _dequeue_rid; ///< RID of data set by dequeue operation
- boost::intrusive_ptr<qpid::broker::PersistableMessage> _sourceMsg; ///< Pointer back to source Message in Broker
+ bool _external_rid; ///< Flag to indicate external setting of rid
public:
data_tok();
virtual ~data_tok();
- inline boost::intrusive_ptr<qpid::broker::PersistableMessage>& getSourceMessage()
- { return _sourceMsg; }
- inline void setSourceMessage(boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg)
- { _sourceMsg = msg; }
-
inline const u_int64_t id() const { return _icnt; }
inline const write_state wstate() const { return _wstate; }
const char* wstate_str() const;
@@ -157,6 +142,8 @@
inline void set_rid(const u_int64_t rid) { _rid = rid; }
inline const u_int64_t dequeue_rid() const {return _dequeue_rid; }
inline void set_dequeue_rid(const u_int64_t rid) { _dequeue_rid = rid; }
+ inline const bool external_rid() const { return _external_rid; }
+ inline void set_external_rid(const bool external_rid) { _external_rid = external_rid; }
inline const bool has_xid() const { return !_xid.empty(); }
inline const std::string& xid() const { return _xid; }
Modified: store/trunk/cpp/lib/jrnl/jcntl.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.cpp 2007-12-13 19:24:38 UTC (rev 1480)
+++ store/trunk/cpp/lib/jrnl/jcntl.cpp 2007-12-13 20:17:34 UTC (rev 1481)
@@ -34,9 +34,11 @@
#include <jrnl/jcntl.hpp>
#include <algorithm>
+#include <assert.h>
#include <cerrno>
#include <fstream>
#include <iomanip>
+#include <iostream>
#include <jrnl/file_hdr.hpp>
#include <jrnl/jerrno.hpp>
#include <jrnl/jinf.hpp>
Modified: store/trunk/cpp/lib/jrnl/jcntl.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.hpp 2007-12-13 19:24:38 UTC (rev 1480)
+++ store/trunk/cpp/lib/jrnl/jcntl.hpp 2007-12-13 20:17:34 UTC (rev 1481)
@@ -65,7 +65,7 @@
* which is used per data block written to the journal, and is used to track its status through
* the AIO enqueue, read and dequeue process.
*/
- class jcntl : public qpid::broker::ExternalQueueStore
+ class jcntl
{
protected:
/**
Modified: store/trunk/cpp/lib/jrnl/slock.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/slock.hpp 2007-12-13 19:24:38 UTC (rev 1480)
+++ store/trunk/cpp/lib/jrnl/slock.hpp 2007-12-13 20:17:34 UTC (rev 1481)
@@ -34,6 +34,7 @@
#define rhm_journal_slock_hpp
#include <pthread.h>
+#include <errno.h>
#include <jrnl/jerrno.hpp>
#include <jrnl/jexception.hpp>
Modified: store/trunk/cpp/lib/jrnl/wmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wmgr.cpp 2007-12-13 19:24:38 UTC (rev 1480)
+++ store/trunk/cpp/lib/jrnl/wmgr.cpp 2007-12-13 20:17:34 UTC (rev 1481)
@@ -141,7 +141,8 @@
else
_enq_busy = true;
- u_int64_t rid = initialize_rid(cont, dtokp);
+ u_int64_t rid = dtokp->external_rid() ? dtokp->rid() :
+ (cont ? _wrfc.rid() - 1 : _wrfc.get_incr_rid());
_enq_rec.reset(rid, data_buff, tot_data_len, xid_ptr, xid_len, _wrfc.owi(), transient,
external);
if (!cont)
@@ -280,13 +281,10 @@
dtokp->set_dblocks_written(0); // Reset dblks_written from previous op
}
- // TODO: Tidy this up!
-// u_int64_t rid = initialize_rid(cont, dtokp);
-// _deq_rec.reset(rid, dtokp->rid(), xid_ptr, xid_len);
- u_int64_t rid = dtokp->getSourceMessage() ? dtokp->rid() :
- (cont ? _wrfc.rid() - 1 : _wrfc.get_incr_rid());
- u_int64_t dequeue_rid = dtokp->getSourceMessage() ? dtokp->dequeue_rid() : dtokp->rid();
- if (!dtokp->getSourceMessage())
+ const bool ext_rid = dtokp->external_rid();
+ u_int64_t rid = ext_rid ? dtokp->rid() : (cont ? _wrfc.rid() - 1 : _wrfc.get_incr_rid());
+ u_int64_t dequeue_rid = ext_rid ? dtokp->dequeue_rid() : dtokp->rid();
+ if (!ext_rid)
{
dtokp->set_rid(rid);
dtokp->set_dequeue_rid(dequeue_rid);
@@ -991,18 +989,6 @@
return RHM_IORES_SUCCESS;
}
-const u_int64_t
-wmgr::initialize_rid(const bool cont, data_tok* dtokp)
-{
- if (dtokp->getSourceMessage())
- {
- u_int64_t rid = dtokp->rid();
- assert(rid != 0);
- return rid;
- }
- return cont ? _wrfc.rid() - 1 : _wrfc.get_incr_rid();
-}
-
void
wmgr::dblk_roundup()
{
Modified: store/trunk/cpp/lib/jrnl/wmgr.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wmgr.hpp 2007-12-13 19:24:38 UTC (rev 1480)
+++ store/trunk/cpp/lib/jrnl/wmgr.hpp 2007-12-13 20:17:34 UTC (rev 1481)
@@ -121,7 +121,6 @@
const iores pre_write_check(const _op_type op, const data_tok* const dtokp,
const size_t xidsize = 0, const size_t dsize = 0, const bool external = false)
const;
- const u_int64_t initialize_rid(const bool cont, data_tok* dtokp);
const iores write_flush();
const iores rotate_file();
void dblk_roundup();
More information about the rhmessaging-commits
mailing list