[rhmessaging-commits] rhmessaging commits: r1265 - store/trunk/cpp/lib.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Wed Nov 7 22:59:29 EST 2007


Author: kpvdr
Date: 2007-11-07 22:59:29 -0500 (Wed, 07 Nov 2007)
New Revision: 1265

Modified:
   store/trunk/cpp/lib/JournalImpl.cpp
   store/trunk/cpp/lib/JournalImpl.h
Log:
Added temp JournalImpl::loadMsgContent() fn for holding last read msg from journal so that it can be read in parts.

Modified: store/trunk/cpp/lib/JournalImpl.cpp
===================================================================
--- store/trunk/cpp/lib/JournalImpl.cpp	2007-11-07 20:11:49 UTC (rev 1264)
+++ store/trunk/cpp/lib/JournalImpl.cpp	2007-11-08 03:59:29 UTC (rev 1265)
@@ -22,6 +22,7 @@
 */
 
 #include "JournalImpl.h"
+
 #include "jrnl/jerrno.hpp"
 #include <qpid/sys/Monitor.h>
 
@@ -41,7 +42,11 @@
                          jcntl(journalId, journalDirectory, journalBaseFilename),
                          getEventsTimerSetFlag(false),
                          writeActivityFlag(false),
-                         flushTriggeredFlag(true)
+                         flushTriggeredFlag(true),
+                         _datap(0),
+                         _dlen(0),
+                         _dtok(),
+                         _external(false)
 {
     getEventsFireEventsPtr = new GetEventsFireEvent(this, getEventsTimeout);
     inactivityFireEventPtr = new InactivityFireEvent(this, flushTimeout);
@@ -91,6 +96,93 @@
     }
 }
 
+#define MAX_AIO_SLEEPS 500
+#define AIO_SLEEP_TIME 1000000
+/**
+ * Temporary helper: reads the record identified by rid from the journal, keeps it in member
+ * state, and appends the requested slice of its content to data.
+ *
+ * The journal is read only when rid differs from the rid held in _dtok, so consecutive calls
+ * with the same rid reuse the buffer from the first read and can fetch a message in parts.
+ *
+ * @param rid    Record id of the message to load.
+ * @param data   Receives the content by append; left untouched for an external record.
+ * @param offset Byte offset into the record's data at which copying starts.
+ * @param length Number of bytes to copy.
+ * @return true if read_data_record() flagged the record as external (nothing is appended),
+ *         false if the requested bytes were appended to data.
+ * @throws journal::jexception if read_data_record() returns anything other than
+ *         RHM_IORES_SUCCESS or RHM_IORES_AIO_WAIT, if RHM_IORES_AIO_WAIT is still returned
+ *         after MAX_AIO_SLEEPS waits, or if offset/length fall outside the record's data.
+ */
+const bool
+JournalImpl::loadMsgContent(u_int64_t rid, std::string& data, size_t offset, size_t length)
+        throw (journal::jexception)
+{
+    if (_dtok.rid() != rid)
+    {
+        // A different record is requested: clear the cached state and read it afresh.
+        // NOTE(review): _dtok is tagged with rid before the read, so if the read throws, a
+        // retry with the same rid appears to skip the read -- confirm this is acceptable.
+        _datap = 0;
+        _dlen = 0;
+        _dtok.reset();
+        _dtok.set_rid(rid);
+        _dtok.set_wstate(journal::data_tok::ENQ);
+        _external = false;
+        void* xidp = 0;
+        size_t xlen = 0;
+        bool transient = false;
+        bool done = false;
+        unsigned aio_sleep_cnt = 0;
+        while (!done)
+        {
+            iores res = read_data_record(&_datap, _dlen, &xidp, xlen, transient, _external, &_dtok);
+            if (res == journal::RHM_IORES_SUCCESS) {
+                done = true;
+            } else if (res == journal::RHM_IORES_AIO_WAIT) {
+                // AIO_WAIT result: call get_wr_events() and sleep before retrying, at most
+                // MAX_AIO_SLEEPS times with AIO_SLEEP_TIME microseconds between attempts.
+                // NOTE(review): POSIX only requires usleep() to accept values below 1000000;
+                // confirm the current AIO_SLEEP_TIME is accepted on every target platform.
+                if (++aio_sleep_cnt <= MAX_AIO_SLEEPS) {
+                    get_wr_events();
+                    usleep(AIO_SLEEP_TIME);
+                } else {
+                    std::stringstream ss;
+                    ss << "read_data_record() returned " << journal::pmgr::iores_str(res);
+                    ss << "; exceeded maximum wait time";
+                    throw jexception(ss.str());
+                }
+            } else {
+                std::stringstream ss;
+                ss << "read_data_record() returned " << journal::pmgr::iores_str(res);
+                throw jexception(ss.str());
+            }
+        }
+        // Hand the buffer to _master, which owns it until it is reassigned on a later read:
+        // xidp is used when an xid is present (xlen != 0), otherwise _datap.
+        // NOTE(review): presumably xid and data share one allocation starting at xidp, and
+        // boost::shared_ptr<char> frees with delete -- verify both against read_data_record().
+        if (xlen) {
+            _master = boost::shared_ptr<char>((char*)xidp);
+        } else {
+            _master = boost::shared_ptr<char>((char*)_datap);
+        }
+    }
+    if (_external)
+        return true;
+    // Test against the remaining size rather than offset + length: the sum can wrap around
+    // for large size_t values and would then let an out-of-range request through.
+    if (offset > _dlen || length > _dlen - offset) {
+        std::stringstream ss;
+        ss << "loadMsgContent(): offset + length exceeds available message size";
+        throw jexception(ss.str());
+    }
+    data.append((const char*)_datap + offset, length);
+    return false;
+}
+
 const iores
 JournalImpl::enqueue_data_record(const void* const data_buff, const size_t tot_data_len,
         const size_t this_data_len, data_tok* dtokp, const bool transient)

Modified: store/trunk/cpp/lib/JournalImpl.h
===================================================================
--- store/trunk/cpp/lib/JournalImpl.h	2007-11-07 20:11:49 UTC (rev 1264)
+++ store/trunk/cpp/lib/JournalImpl.h	2007-11-08 03:59:29 UTC (rev 1265)
@@ -73,6 +73,13 @@
             bool writeActivityFlag;
             bool flushTriggeredFlag;
             qpid::broker::TimerTaskA::intrusive_ptr inactivityFireEventPtr;
+            
+            // Temporary member state used by loadMsgContent() (declared below)
+            boost::shared_ptr<char> _master; // owner of the buffer read for the cached record
+            void* _datap;                    // start of the cached record's data
+            size_t _dlen;                    // length in bytes of the data at _datap
+            journal::data_tok _dtok;         // data token whose rid identifies the cached record
+            bool _external;                  // external flag reported by read_data_record()
 
         public:
             JournalImpl(const std::string& journalId,
@@ -93,6 +100,12 @@
                 recover(&_aio_rd_cmpl_dtok_list, &aio_rd_callback, &_aio_wr_cmpl_dtok_list,
                     &aio_wr_callback, prep_tx_list, queue_id);
             }
+            
+            // Temporary fn: reads and keeps the last msg read from the journal so that it can be
+            // appended to data in chunks (length bytes from offset per call). To be replaced when
+            // direct journal reads are ready. Returns true if external (no append), else false.
+            const bool loadMsgContent(u_int64_t rid, std::string& data, size_t offset,
+                    size_t length) throw (journal::jexception);
 
             // Overrides for write inactivity timer
             const journal::iores enqueue_data_record(const void* const data_buff,




More information about the rhmessaging-commits mailing list