[rhmessaging-commits] rhmessaging commits: r1481 - in store/trunk/cpp/lib: jrnl and 1 other directory.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Thu Dec 13 15:17:34 EST 2007


Author: kpvdr
Date: 2007-12-13 15:17:34 -0500 (Thu, 13 Dec 2007)
New Revision: 1481

Modified:
   store/trunk/cpp/lib/BdbMessageStore.cpp
   store/trunk/cpp/lib/BdbMessageStore.h
   store/trunk/cpp/lib/DataTokenImpl.h
   store/trunk/cpp/lib/JournalImpl.cpp
   store/trunk/cpp/lib/JournalImpl.h
   store/trunk/cpp/lib/jrnl/data_tok.cpp
   store/trunk/cpp/lib/jrnl/data_tok.hpp
   store/trunk/cpp/lib/jrnl/jcntl.cpp
   store/trunk/cpp/lib/jrnl/jcntl.hpp
   store/trunk/cpp/lib/jrnl/slock.hpp
   store/trunk/cpp/lib/jrnl/wmgr.cpp
   store/trunk/cpp/lib/jrnl/wmgr.hpp
Log:
Further tidy-up: additional decoupling between the journal and qpid.

Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp	2007-12-13 19:24:38 UTC (rev 1480)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp	2007-12-13 20:17:34 UTC (rev 1481)
@@ -31,6 +31,8 @@
 #include "BindingDbt.h"
 #include "IdPairDbt.h"
 #include "StringDbt.h"
+#include "JournalImpl.h"
+#include "DataTokenImpl.h"
 #include <boost/intrusive_ptr.hpp>
 
 using namespace rhm::bdbstore;
@@ -993,6 +995,7 @@
             boost::intrusive_ptr<DataTokenImpl> dtokp(new DataTokenImpl);
             dtokp->addRef();
 	        dtokp->setSourceMessage(message);
+            dtokp->set_external_rid(true);
 	        dtokp->set_rid(message->getPersistenceId()); // set the messageID into the Journal header (record-id)
 
             bool written = false;
@@ -1137,6 +1140,7 @@
     boost::intrusive_ptr<DataTokenImpl> ddtokp(new DataTokenImpl);
     ddtokp->addRef();
     ddtokp->setSourceMessage(msg);
+    ddtokp->set_external_rid(true);
     ddtokp->set_rid(messageIdSequence.next()); 
     ddtokp->set_dequeue_rid(msg->getPersistenceId());
     ddtokp->set_wstate(DataTokenImpl::ENQ);

Modified: store/trunk/cpp/lib/BdbMessageStore.h
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.h	2007-12-13 19:24:38 UTC (rev 1480)
+++ store/trunk/cpp/lib/BdbMessageStore.h	2007-12-13 20:17:34 UTC (rev 1481)
@@ -42,8 +42,6 @@
 #include <boost/format.hpp>
 #include <boost/intrusive_ptr.hpp>
 #include <boost/ptr_container/ptr_list.hpp>
-#include "JournalImpl.h"
-#include "DataTokenImpl.h"
 
 namespace rhm {
     namespace bdbstore {

Modified: store/trunk/cpp/lib/DataTokenImpl.h
===================================================================
--- store/trunk/cpp/lib/DataTokenImpl.h	2007-12-13 19:24:38 UTC (rev 1480)
+++ store/trunk/cpp/lib/DataTokenImpl.h	2007-12-13 20:17:34 UTC (rev 1481)
@@ -25,15 +25,24 @@
 #define _DataTokenImpl_
 
 #include "jrnl/data_tok.hpp"
+#include <boost/intrusive_ptr.hpp>
+#include <qpid/broker/PersistableMessage.h>
 
 namespace rhm {
     namespace bdbstore {
 
-        class DataTokenImpl : public journal::data_tok
+        class DataTokenImpl : public journal::data_tok, public qpid::RefCounted
         {
+        private:
+            boost::intrusive_ptr<qpid::broker::PersistableMessage> sourceMsg;
         public:
             DataTokenImpl();
             virtual ~DataTokenImpl();
+
+            inline boost::intrusive_ptr<qpid::broker::PersistableMessage>& getSourceMessage()
+                    { return sourceMsg; }
+            inline void setSourceMessage(boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg)
+                    { sourceMsg = msg; }
         };
 
         } // namespace bdbstore

Modified: store/trunk/cpp/lib/JournalImpl.cpp
===================================================================
--- store/trunk/cpp/lib/JournalImpl.cpp	2007-12-13 19:24:38 UTC (rev 1480)
+++ store/trunk/cpp/lib/JournalImpl.cpp	2007-12-13 20:17:34 UTC (rev 1481)
@@ -128,7 +128,7 @@
         _dlen = 0;
         _dtok.reset();
         _dtok.set_rid(rid);
-        _dtok.set_wstate(journal::data_tok::ENQ);
+        _dtok.set_wstate(DataTokenImpl::ENQ);
         _external = false;
         size_t xlen = 0;
         bool transient = false;
@@ -307,7 +307,7 @@
     jip->_aio_wr_cmpl_dtok_list.clear();
     for (u_int32_t i=0; i<num_dtoks; i++)
     {
-        data_tok*& dtokp = this_dtok_list.front();
+        DataTokenImpl* dtokp = static_cast<DataTokenImpl*>(this_dtok_list.front());
 		if (!journal->is_stopped() && dtokp->getSourceMessage())
 		{
 			switch (dtokp->wstate())
@@ -341,7 +341,7 @@
     jip->_aio_rd_cmpl_dtok_list.clear();
     for (u_int32_t i=0; i<num_dtoks; i++)
     {
-        data_tok*& dtokp = this_dtok_list.front();
+        DataTokenImpl* dtokp = static_cast<DataTokenImpl*>(this_dtok_list.front());
 		if (!journal->is_stopped() && dtokp->getSourceMessage())
 		{
         	if (dtokp->wstate() == data_tok::ENQ && dtokp->rstate() == data_tok::READ)

Modified: store/trunk/cpp/lib/JournalImpl.h
===================================================================
--- store/trunk/cpp/lib/JournalImpl.h	2007-12-13 19:24:38 UTC (rev 1480)
+++ store/trunk/cpp/lib/JournalImpl.h	2007-12-13 20:17:34 UTC (rev 1481)
@@ -26,9 +26,10 @@
 
 #include <set>
 #include "jrnl/jcntl.hpp"
-#include "jrnl/data_tok.hpp"
+#include "DataTokenImpl.h"
 #include "PreparedTransaction.h"
 #include <qpid/broker/Timer.h>
+#include <qpid/broker/PersistableQueue.h>
 #include <qpid/sys/Time.h>
 #include <boost/ptr_container/ptr_list.hpp>
 #include <boost/intrusive_ptr.hpp>
@@ -62,7 +63,7 @@
 		    inline void cancel() { parent=0; }
         };
 
-        class JournalImpl : public journal::jcntl 
+        class JournalImpl : public qpid::broker::ExternalQueueStore, public journal::jcntl
         {            
         private:
             static qpid::broker::Timer journalTimer;

Modified: store/trunk/cpp/lib/jrnl/data_tok.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/data_tok.cpp	2007-12-13 19:24:38 UTC (rev 1480)
+++ store/trunk/cpp/lib/jrnl/data_tok.cpp	2007-12-13 20:17:34 UTC (rev 1481)
@@ -53,7 +53,7 @@
     _dblks_read(0),
     _rid(0),
     _xid(),
-    _sourceMsg(NULL)
+    _external_rid(false)
 {
     pthread_mutex_init(&_mutex, NULL);
     pthread_mutex_lock(&_mutex);

Modified: store/trunk/cpp/lib/jrnl/data_tok.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/data_tok.hpp	2007-12-13 19:24:38 UTC (rev 1480)
+++ store/trunk/cpp/lib/jrnl/data_tok.hpp	2007-12-13 20:17:34 UTC (rev 1481)
@@ -41,18 +41,8 @@
 }
 }
 
-namespace qpid
-{
-namespace broker 
-{
-class PersistableMessage;
-}
-}
-
-#include <boost/intrusive_ptr.hpp>
 #include <pthread.h>
-#include <qpid/broker/PersistableMessage.h>
-#include <qpid/RefCounted.h>
+#include <string>
 #include <sys/types.h>
 
 namespace rhm
@@ -66,7 +56,7 @@
     * \brief Data block token (data_tok) used to track wstate of a data block through asynchronous
     *     I/O process
     */
-    class data_tok : public qpid::RefCounted
+    class data_tok// : public qpid::RefCounted
     {
     public:
         // TODO: Fix this, separate write state from operation
@@ -101,7 +91,7 @@
             READ        ///< Data block is fully read
         };
 
-    private:
+    protected:
         pthread_mutex_t _mutex;
         static u_int64_t _cnt;
         u_int64_t   _icnt;
@@ -114,17 +104,12 @@
         u_int64_t   _rid;           ///< RID of data set by enqueue operation
         std::string _xid;           ///< XID set by enqueue operation
         u_int64_t   _dequeue_rid;   ///< RID of data set by dequeue operation
-        boost::intrusive_ptr<qpid::broker::PersistableMessage> _sourceMsg; ///< Pointer back to source Message in Broker
+        bool        _external_rid;  ///< Flag to indicate external setting of rid
 
     public:
         data_tok();
         virtual ~data_tok();
 
-        inline boost::intrusive_ptr<qpid::broker::PersistableMessage>& getSourceMessage()
-                { return _sourceMsg; }
-        inline void setSourceMessage(boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg)
-                { _sourceMsg = msg; }
-
         inline const u_int64_t id() const { return _icnt; }
         inline const write_state wstate() const { return _wstate; }
         const char* wstate_str() const;
@@ -157,6 +142,8 @@
         inline void set_rid(const u_int64_t rid) { _rid = rid; }
         inline const u_int64_t dequeue_rid() const {return _dequeue_rid; }
         inline void set_dequeue_rid(const u_int64_t rid) { _dequeue_rid = rid; }
+        inline const bool external_rid() const { return _external_rid; }
+        inline void set_external_rid(const bool external_rid) { _external_rid = external_rid; }
 
         inline const bool has_xid() const { return !_xid.empty(); }
         inline const std::string& xid() const { return _xid; }

Modified: store/trunk/cpp/lib/jrnl/jcntl.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.cpp	2007-12-13 19:24:38 UTC (rev 1480)
+++ store/trunk/cpp/lib/jrnl/jcntl.cpp	2007-12-13 20:17:34 UTC (rev 1481)
@@ -34,9 +34,11 @@
 #include <jrnl/jcntl.hpp>
 
 #include <algorithm>
+#include <assert.h>
 #include <cerrno>
 #include <fstream>
 #include <iomanip>
+#include <iostream>
 #include <jrnl/file_hdr.hpp>
 #include <jrnl/jerrno.hpp>
 #include <jrnl/jinf.hpp>

Modified: store/trunk/cpp/lib/jrnl/jcntl.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.hpp	2007-12-13 19:24:38 UTC (rev 1480)
+++ store/trunk/cpp/lib/jrnl/jcntl.hpp	2007-12-13 20:17:34 UTC (rev 1481)
@@ -65,7 +65,7 @@
     * which is used per data block written to the journal, and is used to track its status through
     * the AIO enqueue, read and dequeue process.
     */
-    class jcntl : public qpid::broker::ExternalQueueStore 
+    class jcntl
     {
     protected:
         /**

Modified: store/trunk/cpp/lib/jrnl/slock.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/slock.hpp	2007-12-13 19:24:38 UTC (rev 1480)
+++ store/trunk/cpp/lib/jrnl/slock.hpp	2007-12-13 20:17:34 UTC (rev 1481)
@@ -34,6 +34,7 @@
 #define rhm_journal_slock_hpp
 
 #include <pthread.h>
+#include <errno.h>
 #include <jrnl/jerrno.hpp>
 #include <jrnl/jexception.hpp>
 

Modified: store/trunk/cpp/lib/jrnl/wmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wmgr.cpp	2007-12-13 19:24:38 UTC (rev 1480)
+++ store/trunk/cpp/lib/jrnl/wmgr.cpp	2007-12-13 20:17:34 UTC (rev 1481)
@@ -141,7 +141,8 @@
     else
         _enq_busy = true;
 
-    u_int64_t rid = initialize_rid(cont, dtokp);
+    u_int64_t rid = dtokp->external_rid() ? dtokp->rid() :
+            (cont ? _wrfc.rid() - 1 : _wrfc.get_incr_rid());
     _enq_rec.reset(rid, data_buff, tot_data_len, xid_ptr, xid_len, _wrfc.owi(), transient,
             external);
     if (!cont)
@@ -280,13 +281,10 @@
         dtokp->set_dblocks_written(0); // Reset dblks_written from previous op
     }
 
-    // TODO: Tidy this up!
-//    u_int64_t rid = initialize_rid(cont, dtokp);
-//    _deq_rec.reset(rid, dtokp->rid(), xid_ptr, xid_len);
-    u_int64_t rid = dtokp->getSourceMessage() ? dtokp->rid() :
-            (cont ? _wrfc.rid() - 1 : _wrfc.get_incr_rid());
-    u_int64_t dequeue_rid = dtokp->getSourceMessage() ? dtokp->dequeue_rid() : dtokp->rid();
-	if (!dtokp->getSourceMessage())
+    const bool ext_rid = dtokp->external_rid();
+    u_int64_t rid = ext_rid ? dtokp->rid() : (cont ? _wrfc.rid() - 1 : _wrfc.get_incr_rid());
+    u_int64_t dequeue_rid = ext_rid ? dtokp->dequeue_rid() : dtokp->rid();
+	if (!ext_rid)
 	{
 		dtokp->set_rid(rid);
 		dtokp->set_dequeue_rid(dequeue_rid);
@@ -991,18 +989,6 @@
     return RHM_IORES_SUCCESS;
 }
 
-const u_int64_t
-wmgr::initialize_rid(const bool cont, data_tok* dtokp)
-{
-    if (dtokp->getSourceMessage())
-    {
-        u_int64_t rid = dtokp->rid();
-        assert(rid != 0);
-        return rid;
-    }
-    return cont ? _wrfc.rid() - 1 : _wrfc.get_incr_rid();
-}
-
 void
 wmgr::dblk_roundup()
 {

Modified: store/trunk/cpp/lib/jrnl/wmgr.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wmgr.hpp	2007-12-13 19:24:38 UTC (rev 1480)
+++ store/trunk/cpp/lib/jrnl/wmgr.hpp	2007-12-13 20:17:34 UTC (rev 1481)
@@ -121,7 +121,6 @@
         const iores pre_write_check(const _op_type op, const data_tok* const dtokp,
                 const size_t xidsize = 0, const size_t dsize = 0, const bool external = false)
                 const;
-        const u_int64_t initialize_rid(const bool cont, data_tok* dtokp);
         const iores write_flush();
         const iores rotate_file();
         void dblk_roundup();




More information about the rhmessaging-commits mailing list