[rhmessaging-commits] rhmessaging commits: r1771 - in store/trunk/cpp: lib/jrnl and 2 other directories.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Mon Mar 17 17:11:20 EDT 2008


Author: kpvdr
Date: 2008-03-17 17:11:19 -0400 (Mon, 17 Mar 2008)
New Revision: 1771

Modified:
   store/trunk/cpp/lib/BdbMessageStore.cpp
   store/trunk/cpp/lib/JournalImpl.cpp
   store/trunk/cpp/lib/jrnl/enums.hpp
   store/trunk/cpp/lib/jrnl/jcntl.cpp
   store/trunk/cpp/lib/jrnl/jcntl.hpp
   store/trunk/cpp/lib/jrnl/nlfh.cpp
   store/trunk/cpp/lib/jrnl/nlfh.hpp
   store/trunk/cpp/lib/jrnl/rmgr.cpp
   store/trunk/cpp/lib/jrnl/wmgr.cpp
   store/trunk/cpp/lib/jrnl/wmgr.hpp
   store/trunk/cpp/lib/jrnl/wrfc.cpp
   store/trunk/cpp/lib/jrnl/wrfc.hpp
   store/trunk/cpp/tests/jrnl/_st_basic.cpp
   store/trunk/cpp/tests/jrnl/_st_helper_fns.hpp
   store/trunk/cpp/tests/jrnl/jtt/jrnl_instance.cpp
   store/trunk/cpp/tests/jrnl/run-journal-tests
Log:
Bugfix for BZ437880: Small total message journal capacity can cause premature file overwrite. Added an outstanding aio count to class nlfh and an additional aio return enumeration to cover this case. Modified the return handler logic in class jcntl to handle this new return value.

Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp	2008-03-07 16:53:30 UTC (rev 1770)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp	2008-03-17 21:11:19 UTC (rev 1771)
@@ -579,16 +579,16 @@
                   aio_sleep_cnt = 0;
                   break;
               }
-              case rhm::journal::RHM_IORES_AIO_WAIT:
-                if (++aio_sleep_cnt > MAX_AIO_SLEEPS)
-                    THROW_STORE_EXCEPTION("Timeout waiting for AIO");
-                ::usleep(AIO_SLEEP_TIME);
-                break;
+              case rhm::journal::RHM_IORES_PAGE_AIOWAIT:
+                  if (++aio_sleep_cnt > MAX_AIO_SLEEPS)
+                      THROW_STORE_EXCEPTION("Timeout waiting for AIO");
+                  ::usleep(AIO_SLEEP_TIME);
+                  break;
               case rhm::journal::RHM_IORES_EMPTY:
-                read = false;
-                break; // done with all messages. ((add call in jrnl to test that _emap is empty. 
+                  read = false;
+                  break; // done with all messages. ((add call in jrnl to test that _emap is empty. 
               default:
-                assert( "Store Error: Unexpected msg state");
+                  assert( "Store Error: Unexpected msg state");
             } // switch
         } // while
     } catch (const journal::jexception& e) {

Modified: store/trunk/cpp/lib/JournalImpl.cpp
===================================================================
--- store/trunk/cpp/lib/JournalImpl.cpp	2008-03-07 16:53:30 UTC (rev 1770)
+++ store/trunk/cpp/lib/JournalImpl.cpp	2008-03-17 21:11:19 UTC (rev 1771)
@@ -147,7 +147,7 @@
                     &_dtok);
             if (res == journal::RHM_IORES_SUCCESS) {
                 done = true;
-            } else if (res == journal::RHM_IORES_AIO_WAIT) {
+            } else if (res == journal::RHM_IORES_PAGE_AIOWAIT) {
                 if (++aio_sleep_cnt <= MAX_AIO_SLEEPS) {
                     get_wr_events();
                     usleep(AIO_SLEEP_TIME);

Modified: store/trunk/cpp/lib/jrnl/enums.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/enums.hpp	2008-03-07 16:53:30 UTC (rev 1770)
+++ store/trunk/cpp/lib/jrnl/enums.hpp	2008-03-17 21:11:19 UTC (rev 1771)
@@ -44,7 +44,8 @@
     enum _iores
     {
         RHM_IORES_SUCCESS = 0,  ///< Success: IO operation completed noramlly.
-        RHM_IORES_AIO_WAIT,     ///< IO operation suspended - all pages are waiting for AIO.
+        RHM_IORES_PAGE_AIOWAIT, ///< IO operation suspended - next page is waiting for AIO.
+        RHM_IORES_FILE_AIOWAIT, ///< IO operation suspended - next file is waiting for AIO.
         RHM_IORES_EMPTY,        ///< During read operations, nothing further is available to read.
         RHM_IORES_ENQCAPTHRESH, ///< Enqueue capacity threshold (limit) reached.
         RHM_IORES_FULL,         ///< During write operations, the journal files are full.
@@ -59,7 +60,8 @@
         switch (res)
         {
             case RHM_IORES_SUCCESS: return "RHM_IORES_SUCCESS";
-            case RHM_IORES_AIO_WAIT: return "RHM_IORES_AIO_WAIT";
+            case RHM_IORES_PAGE_AIOWAIT: return "RHM_IORES_PAGE_AIOWAIT";
+            case RHM_IORES_FILE_AIOWAIT: return "RHM_IORES_FILE_AIOWAIT";
             case RHM_IORES_EMPTY: return "RHM_IORES_EMPTY";
             case RHM_IORES_ENQCAPTHRESH: return "RHM_IORES_ENQCAPTHRESH";
             case RHM_IORES_FULL: return "RHM_IORES_FULL";

Modified: store/trunk/cpp/lib/jrnl/jcntl.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.cpp	2008-03-07 16:53:30 UTC (rev 1770)
+++ store/trunk/cpp/lib/jrnl/jcntl.cpp	2008-03-17 21:11:19 UTC (rev 1771)
@@ -217,7 +217,7 @@
         slock s(&_wr_mutex);
         iores r;
         while (handle_aio_wait(_wmgr.enqueue(data_buff, tot_data_len, this_data_len, dtokp, 0,
-                0, transient, false), r));
+                0, transient, false), r, dtokp));
         return r;
     }
 }
@@ -230,7 +230,7 @@
         slock s(&_wr_mutex);
         iores r;
         while (handle_aio_wait(_wmgr.enqueue(0, tot_data_len, 0, dtokp, 0, 0, transient,
-                true), r));
+                true), r, dtokp));
         return r;
     }
 }
@@ -245,7 +245,7 @@
         slock s(&_wr_mutex);
         iores r;
         while (handle_aio_wait(_wmgr.enqueue(data_buff, tot_data_len, this_data_len, dtokp,
-                xid.data(), xid.size(), transient, false), r));
+                xid.data(), xid.size(), transient, false), r, dtokp));
         return r;
     }
 }
@@ -259,7 +259,7 @@
         slock s(&_wr_mutex);
         iores r;
         while (handle_aio_wait(_wmgr.enqueue(0, tot_data_len, 0, dtokp, xid.data(), xid.size(),
-                transient, true), r));
+                transient, true), r, dtokp));
         return r;
     }
 }
@@ -294,7 +294,7 @@
     {
         slock s(&_wr_mutex);
         iores r;
-        while (handle_aio_wait(_wmgr.dequeue(dtokp, 0, 0), r));
+        while (handle_aio_wait(_wmgr.dequeue(dtokp, 0, 0), r, dtokp));
         return r;
     }
 }
@@ -306,7 +306,7 @@
     {
         slock s(&_wr_mutex);
         iores r;
-        while (handle_aio_wait(_wmgr.dequeue(dtokp, xid.data(), xid.size()), r));
+        while (handle_aio_wait(_wmgr.dequeue(dtokp, xid.data(), xid.size()), r, dtokp));
         return r;
     }
 }
@@ -318,7 +318,7 @@
     {
         slock s(&_wr_mutex);
         iores r;
-        while (handle_aio_wait(_wmgr.abort(dtokp, xid.data(), xid.size()), r));
+        while (handle_aio_wait(_wmgr.abort(dtokp, xid.data(), xid.size()), r, dtokp));
         return r;
     }
 }
@@ -330,7 +330,7 @@
     {
         slock s(&_wr_mutex);
         iores r;
-        while (handle_aio_wait(_wmgr.commit(dtokp, xid.data(), xid.size()), r));
+        while (handle_aio_wait(_wmgr.commit(dtokp, xid.data(), xid.size()), r, dtokp));
         return r;
     }
 }
@@ -438,12 +438,13 @@
 }
 
 const bool
-jcntl::handle_aio_wait(const iores res, iores& resout)
+jcntl::handle_aio_wait(const iores res, iores& resout, const data_tok* dtp)
 {
+    // TODO: factor out the common while loops below into a common fn
+    u_int32_t cnt = 0;
     resout = res;
-    if (res == RHM_IORES_AIO_WAIT)
+    if (res == RHM_IORES_PAGE_AIOWAIT)
     {
-        u_int32_t cnt = 0;
         while (_wmgr.curr_pg_blocked())
         {
             _wmgr.get_events(pmgr::UNUSED);
@@ -457,6 +458,25 @@
         }
         return true;
     }
+    else if (res == RHM_IORES_FILE_AIOWAIT)
+    {
+        while (_wmgr.curr_file_blocked())
+        {
+            _wmgr.get_events(pmgr::UNUSED);
+            if (cnt++ > MAX_AIO_CMPL_SLEEPS)
+            {
+                // TODO: Log this!
+                std::cout << "**** JERR_JCNTL_AIOCMPLWAIT *** " << _wmgr.status_str() << std::endl;
+                throw jexception(jerrno::JERR_JCNTL_AIOCMPLWAIT, "jcntl", "handle_aio_wait");
+            }
+            ::usleep(AIO_CMPL_SLEEP);
+        }
+        _wrfc.reset();
+        resout = RHM_IORES_SUCCESS;
+        data_tok::write_state ws = dtp->wstate();
+        return ws == data_tok::ENQ_PART || ws == data_tok::DEQ_PART || ws == data_tok::ABORT_PART ||
+                ws == data_tok::COMMIT_PART;
+    }
     return false;
 }
 

Modified: store/trunk/cpp/lib/jrnl/jcntl.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.hpp	2008-03-07 16:53:30 UTC (rev 1770)
+++ store/trunk/cpp/lib/jrnl/jcntl.hpp	2008-03-17 21:11:19 UTC (rev 1771)
@@ -605,7 +605,7 @@
         * \brief Call that blocks until at least one message returns; used to wait for
         *     AIO wait conditions to clear.
         */
-        const bool handle_aio_wait(const iores res, iores& resout);
+        const bool handle_aio_wait(const iores res, iores& resout, const data_tok* dtp);
 
         /**
         * \brief Analyze journal for recovery.

Modified: store/trunk/cpp/lib/jrnl/nlfh.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/nlfh.cpp	2008-03-07 16:53:30 UTC (rev 1770)
+++ store/trunk/cpp/lib/jrnl/nlfh.cpp	2008-03-17 21:11:19 UTC (rev 1771)
@@ -55,11 +55,12 @@
         _rd_cmpl_cnt_dblks(0),
 #ifdef RHM_RDONLY
         _wr_subm_cnt_dblks(JRNL_SBLK_SIZE * (jfsize_dblks + 1)),
-        _wr_cmpl_cnt_dblks(JRNL_SBLK_SIZE * (jfsize_dblks + 1))
+        _wr_cmpl_cnt_dblks(JRNL_SBLK_SIZE * (jfsize_dblks + 1)),
 #else
         _wr_subm_cnt_dblks(0),
-        _wr_cmpl_cnt_dblks(0)
+        _wr_cmpl_cnt_dblks(0),
 #endif
+        _aio_cnt(0)
 {}
 
 nlfh::nlfh(const std::string& fbasename, const u_int16_t fid, const u_int32_t jfsize_sblks,
@@ -74,11 +75,12 @@
         _rd_cmpl_cnt_dblks(0),
 #ifdef RHM_RDONLY
         _wr_subm_cnt_dblks(JRNL_SBLK_SIZE * (jfsize_dblks + 1)),
-        _wr_cmpl_cnt_dblks(JRNL_SBLK_SIZE * (jfsize_dblks + 1))
+        _wr_cmpl_cnt_dblks(JRNL_SBLK_SIZE * (jfsize_dblks + 1)),
 #else
         _wr_subm_cnt_dblks(0),
-        _wr_cmpl_cnt_dblks(0)
+        _wr_cmpl_cnt_dblks(0),
 #endif
+        _aio_cnt(0)
 {
     initialize(fbasename, fid, jfsize_sblks, ro);
     open_fh();
@@ -221,7 +223,8 @@
     }
 #ifndef RHM_WRONLY
     // Journal overflow test - checks if the file to be reset still contains enqueued records
-    if (_rec_enqcnt)
+    // or outstanding aios
+    if (_rec_enqcnt || _aio_cnt)
         return false;
 #endif
 #ifndef RHM_RDONLY
@@ -320,6 +323,15 @@
     return _wr_cmpl_cnt_dblks;
 }
 
+const u_int16_t
+nlfh::decr_aio_cnt()
+{
+    if(_aio_cnt == 0)
+        throw jexception(jerrno::JERR__UNDERFLOW, "Decremented _aio_cnt when already zero", "nlfh",
+                "decr_aio_cnt");
+    return --_aio_cnt;
+}
+
 // Debug function
 const std::string
 nlfh::status_str() const
@@ -327,6 +339,7 @@
     std::ostringstream oss;
     oss << "fid=" << _fid << " ws=" << _wr_subm_cnt_dblks << " wc=" << _wr_cmpl_cnt_dblks;
     oss << " rs=" << _rd_subm_cnt_dblks << " rc=" << _rd_cmpl_cnt_dblks;
+    oss << " ec=" << _rec_enqcnt << " ac=" << _aio_cnt;
     return oss.str();
 }
 

Modified: store/trunk/cpp/lib/jrnl/nlfh.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/nlfh.hpp	2008-03-07 16:53:30 UTC (rev 1770)
+++ store/trunk/cpp/lib/jrnl/nlfh.hpp	2008-03-17 21:11:19 UTC (rev 1771)
@@ -66,6 +66,7 @@
         u_int32_t _rd_cmpl_cnt_dblks;   ///< Read file count (data blocks) for completed AIO
         u_int32_t _wr_subm_cnt_dblks;   ///< Write file count (data blocks) for submitted AIO
         u_int32_t _wr_cmpl_cnt_dblks;   ///< Write file count (data blocks) for completed AIO
+        u_int16_t _aio_cnt;             ///< Outstanding AIO operations on this file
 
     public:
         nlfh(const u_int32_t jfsize_sblks);
@@ -104,6 +105,10 @@
         inline const size_t wr_cmpl_offs() const { return _wr_cmpl_cnt_dblks * JRNL_DBLK_SIZE; }
         const u_int32_t add_wr_cmpl_cnt_dblks(u_int32_t a);
 
+        inline const u_int16_t aio_cnt() const { return _aio_cnt; }
+        inline const u_int16_t incr_aio_cnt() { return ++_aio_cnt; }
+        const u_int16_t decr_aio_cnt();
+
         // Derived helper functions
 
         inline const bool rd_empty() const { return _wr_cmpl_cnt_dblks == 0; }

Modified: store/trunk/cpp/lib/jrnl/rmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rmgr.cpp	2008-03-07 16:53:30 UTC (rev 1770)
+++ store/trunk/cpp/lib/jrnl/rmgr.cpp	2008-03-17 21:11:19 UTC (rev 1771)
@@ -91,7 +91,7 @@
         if (_page_cb_arr[_pg_index]._state != AIO_COMPLETE)
         {
             aio_cycle();
-            return RHM_IORES_AIO_WAIT;
+            return RHM_IORES_PAGE_AIOWAIT;
         }
         void* rptr = (void*)((char*)_page_ptr_arr[_pg_index] +
                 (_pg_offset_dblks * JRNL_DBLK_SIZE));
@@ -185,7 +185,7 @@
         if (_page_cb_arr[_pg_index]._state != AIO_COMPLETE)
         {
             aio_cycle();
-            return RHM_IORES_AIO_WAIT;
+            return RHM_IORES_PAGE_AIOWAIT;
         }
         void* rptr = (void*)((char*)_page_ptr_arr[_pg_index] +
                 (_pg_offset_dblks * JRNL_DBLK_SIZE));
@@ -225,7 +225,7 @@
     if (dtokp->rstate() == data_tok::SKIP_PART)
     {
         if (_page_cb_arr[_pg_index]._state != AIO_COMPLETE)
-            return RHM_IORES_AIO_WAIT;
+            return RHM_IORES_PAGE_AIOWAIT;
         const iores res = skip(dtokp);
         if (res != RHM_IORES_SUCCESS)
         {
@@ -256,7 +256,7 @@
         if (_page_cb_arr[_pg_index]._state != AIO_COMPLETE)
         {
             aio_cycle();
-            return RHM_IORES_AIO_WAIT;
+            return RHM_IORES_PAGE_AIOWAIT;
         }
         void* rptr = (void*)((char*)_page_ptr_arr[_pg_index] +
                 (_pg_offset_dblks * JRNL_DBLK_SIZE));
@@ -476,7 +476,7 @@
     if (_page_cb_arr[_pg_index]._state != AIO_COMPLETE)
     {
         aio_cycle();   // check if any AIOs have returned
-        return RHM_IORES_AIO_WAIT;
+        return RHM_IORES_PAGE_AIOWAIT;
     }
 
     // Read data from this page, first block will have header and data size.
@@ -493,7 +493,7 @@
         {
             dtokp->set_rstate(data_tok::READ_PART);
             dtokp->set_dsize(_enq_rec.data_size());
-            return RHM_IORES_AIO_WAIT;
+            return RHM_IORES_PAGE_AIOWAIT;
         }
 
         rptr = (void*)((char*)_page_ptr_arr[_pg_index]);
@@ -584,7 +584,7 @@
             if (_page_cb_arr[_pg_index]._state != AIO_COMPLETE)
             {
                 dtokp->set_rstate(data_tok::SKIP_PART);
-                return RHM_IORES_AIO_WAIT;
+                return RHM_IORES_PAGE_AIOWAIT;
             }
         }
         else

Modified: store/trunk/cpp/lib/jrnl/wmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wmgr.cpp	2008-03-07 16:53:30 UTC (rev 1770)
+++ store/trunk/cpp/lib/jrnl/wmgr.cpp	2008-03-17 21:11:19 UTC (rev 1771)
@@ -128,7 +128,7 @@
         return res;
 
     bool cont = false;
-    if (_enq_busy) // If enqueue() exited last time with RHM_IORES_FULL or RHM_IORES_AIO_WAIT
+    if (_enq_busy) // If enqueue() exited last time with RHM_IORES_FULL or RHM_IORES_PAGE_AIOWAIT
     {
         if (dtokp->wstate() == data_tok::ENQ_PART)
             cont = true;
@@ -223,7 +223,7 @@
 
             if (_page_cb_arr[_pg_index]._state == AIO_PENDING && !done)
             {
-                res = RHM_IORES_AIO_WAIT;
+                res = RHM_IORES_PAGE_AIOWAIT;
                 done = true;
             }
 
@@ -262,7 +262,7 @@
         return res;
 
     bool cont = false;
-    if (_deq_busy) // If dequeue() exited last time with RHM_IORES_FULL or RHM_IORES_AIO_WAIT
+    if (_deq_busy) // If dequeue() exited last time with RHM_IORES_FULL or RHM_IORES_PAGE_AIOWAIT
     {
         if (dtokp->wstate() == data_tok::DEQ_PART)
             cont = true;
@@ -366,7 +366,7 @@
 
             if (_page_cb_arr[_pg_index]._state == AIO_PENDING && !done)
             {
-                res = RHM_IORES_AIO_WAIT;
+                res = RHM_IORES_PAGE_AIOWAIT;
                 done = true;
             }
 
@@ -405,7 +405,7 @@
         return res;
 
     bool cont = false;
-    if (_abort_busy) // If abort() exited last time with RHM_IORES_FULL or RHM_IORES_AIO_WAIT
+    if (_abort_busy) // If abort() exited last time with RHM_IORES_FULL or RHM_IORES_PAGE_AIOWAIT
     {
         if (dtokp->wstate() == data_tok::ABORT_PART)
             cont = true;
@@ -507,7 +507,7 @@
 
             if (_page_cb_arr[_pg_index]._state == AIO_PENDING && !done)
             {
-                res = RHM_IORES_AIO_WAIT;
+                res = RHM_IORES_PAGE_AIOWAIT;
                 done = true;
             }
 
@@ -546,7 +546,7 @@
         return res;
 
     bool cont = false;
-    if (_commit_busy) // If commit() exited last time with RHM_IORES_FULL or RHM_IORES_AIO_WAIT
+    if (_commit_busy) // If commit() exited last time with RHM_IORES_FULL or RHM_IORES_PAGE_AIOWAIT
     {
         if (dtokp->wstate() == data_tok::COMMIT_PART)
             cont = true;
@@ -644,7 +644,7 @@
 
             if (_page_cb_arr[_pg_index]._state == AIO_PENDING && !done)
             {
-                res = RHM_IORES_AIO_WAIT;
+                res = RHM_IORES_PAGE_AIOWAIT;
                 done = true;
             }
 
@@ -690,7 +690,7 @@
     if (_cached_offset_dblks)
     {
         if (_page_cb_arr[_pg_index]._state == AIO_PENDING)
-            res = RHM_IORES_AIO_WAIT;
+            res = RHM_IORES_PAGE_AIOWAIT;
         else
         {
             if (_page_cb_arr[_pg_index]._state != IN_USE)
@@ -718,6 +718,7 @@
             if (::io_submit(_ioctx, 1, &this_iocb_ptr) < 0)
                 throw jexception(jerrno::JERR__AIO, "wmgr", "write_flush");
             _wrfc.add_subm_cnt_dblks(_cached_offset_dblks);
+            _wrfc.incr_aio_cnt();
             _aio_evt_rem++;
             _cached_offset_dblks = 0;
 
@@ -735,10 +736,8 @@
 const iores
 wmgr::rotate_file()
 {
-    if (!_wrfc.rotate())
-        return RHM_IORES_FULL;
     _pg_cntr = 0;
-    return RHM_IORES_SUCCESS;
+    return _wrfc.rotate();
 }
 
 const u_int32_t
@@ -861,6 +860,7 @@
             // NOTE: We cannot use _wrfc here, as it may have rotated since submitting count.
             // Use stored pointer to nlfh in the pcb instead.
             pcbp->_wfh->add_wr_cmpl_cnt_dblks(pcbp->_wdblks);
+            pcbp->_wfh->decr_aio_cnt();
 
             // Clean up this pcb's data_tok list
             pcbp->_pdtokl->clear();
@@ -951,7 +951,7 @@
         if (_page_cb_arr[_pg_index]._state == UNUSED)
             _page_cb_arr[_pg_index]._state = IN_USE;
         else if (_page_cb_arr[_pg_index]._state == AIO_PENDING)
-            return RHM_IORES_AIO_WAIT;
+            return RHM_IORES_PAGE_AIOWAIT;
         else
         {
             std::ostringstream oss;

Modified: store/trunk/cpp/lib/jrnl/wmgr.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wmgr.hpp	2008-03-07 16:53:30 UTC (rev 1770)
+++ store/trunk/cpp/lib/jrnl/wmgr.hpp	2008-03-17 21:11:19 UTC (rev 1771)
@@ -113,7 +113,9 @@
         const iores flush();
         const u_int32_t get_events(page_state state);
         const bool is_txn_synced(const std::string& xid);
-        inline const bool curr_pg_blocked() const { return _page_cb_arr[_pg_index]._state != UNUSED; }
+        inline const bool curr_pg_blocked() const
+                { return _page_cb_arr[_pg_index]._state != UNUSED; }
+        inline const bool curr_file_blocked() const { return _wrfc.aio_cnt() > 0; }
         
         // Debug aid
         const std::string status_str() const;

Modified: store/trunk/cpp/lib/jrnl/wrfc.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wrfc.cpp	2008-03-07 16:53:30 UTC (rev 1770)
+++ store/trunk/cpp/lib/jrnl/wrfc.cpp	2008-03-17 21:11:19 UTC (rev 1771)
@@ -84,7 +84,7 @@
         _enq_cap_offs_dblks = _fsize_dblks;
 }
 
-bool
+const iores
 wrfc::rotate()
 {
     if (!_nfiles)
@@ -96,7 +96,11 @@
         _owi = !_owi;
     }
     _curr_fh = _fh_arr[_fh_index];
-    return reset(); //Checks if file is still in use (ie not fully dequeued yet)
+    if (_curr_fh->aio_cnt())
+        return RHM_IORES_FILE_AIOWAIT;
+    if (!reset()) //Checks if file is still in use (ie not fully dequeued yet)
+        return RHM_IORES_FULL;
+    return RHM_IORES_SUCCESS;
 }
 
 const bool

Modified: store/trunk/cpp/lib/jrnl/wrfc.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wrfc.hpp	2008-03-07 16:53:30 UTC (rev 1770)
+++ store/trunk/cpp/lib/jrnl/wrfc.hpp	2008-03-17 21:11:19 UTC (rev 1771)
@@ -39,6 +39,7 @@
 }
 }
 
+#include <jrnl/enums.hpp>
 #include <jrnl/rrfc.hpp>
 
 namespace rhm
@@ -80,7 +81,7 @@
         * \brief Rotate active file handle to next file in rotating file group.
         * \exception jerrno::JERR__NINIT if called before calling initialize().
         */
-        bool rotate();
+        const iores rotate();
 
         inline const u_int64_t rid() const { return _rid; }
         inline const u_int64_t get_incr_rid() { return _rid++; }
@@ -102,6 +103,10 @@
         inline const u_int32_t add_cmpl_cnt_dblks(u_int32_t a)
                 { return _curr_fh->add_wr_cmpl_cnt_dblks(a); }
 
+        inline const u_int16_t aio_cnt() const { return _curr_fh->aio_cnt(); }
+        inline const u_int16_t incr_aio_cnt() { return _curr_fh->incr_aio_cnt(); }
+        inline const u_int16_t decr_aio_cnt() { return _curr_fh->decr_aio_cnt(); }
+
         inline const bool empty() const { return _curr_fh->wr_empty(); }
         inline const u_int32_t remaining_dblks() const { return _curr_fh->wr_remaining_dblks(); }
         inline const bool is_full() const { return _curr_fh->is_wr_full(); };

Modified: store/trunk/cpp/tests/jrnl/_st_basic.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/_st_basic.cpp	2008-03-07 16:53:30 UTC (rev 1770)
+++ store/trunk/cpp/tests/jrnl/_st_basic.cpp	2008-03-17 21:11:19 UTC (rev 1771)
@@ -119,12 +119,13 @@
         jcntl jc(test_name, JRNL_DIR, test_name, NUM_TEST_JFILES, TEST_JFSIZE_SBLKS);
         jrnl_init(jc);
         unsigned n = num_msgs_to_full(NUM_TEST_JFILES, TEST_JFSIZE_SBLKS * JRNL_SBLK_SIZE,
-                MSG_REC_SIZE_DBLKS);
+                MSG_REC_SIZE_DBLKS, true);
         for (unsigned m=0; m<3*2*n; m+=2) // overwrite files 3 times
         {
             enq_msg(jc, m, create_msg(msg, m, MSG_SIZE), false);
             deq_msg(jc, m);
         }
+        jc.stop(true);
     }
     catch(const exception& e) { BOOST_FAIL(e.what()); }
     cout << "ok" << endl;

Modified: store/trunk/cpp/tests/jrnl/_st_helper_fns.hpp
===================================================================
--- store/trunk/cpp/tests/jrnl/_st_helper_fns.hpp	2008-03-07 16:53:30 UTC (rev 1770)
+++ store/trunk/cpp/tests/jrnl/_st_helper_fns.hpp	2008-03-17 21:11:19 UTC (rev 1771)
@@ -35,7 +35,7 @@
 #define NUM_TEST_JFILES 4
 #define NUM_DEFAULT_JFILES  8
 #define JRNL_DEFAULT_FSIZE  24 // Multiples of JRNL_RMGR_PAGE_SIZE
-#define TEST_JFSIZE_SBLKS 96
+#define TEST_JFSIZE_SBLKS 128
 #define DEFAULT_JFSIZE_SBLKS (JRNL_DEFAULT_FSIZE * JRNL_RMGR_PAGE_SIZE)
 #define NUM_MSGS 5
 #define MSG_REC_SIZE_DBLKS 2
@@ -67,7 +67,7 @@
 handle_jcntl_response(const iores res, jcntl& jc, unsigned& aio_sleep_cnt, const std::string& ctxt,
         const iores exp_ret)
 {
-    if (res == RHM_IORES_AIO_WAIT)
+    if (res == RHM_IORES_PAGE_AIOWAIT)
     {
         if (++aio_sleep_cnt <= MAX_AIO_SLEEPS)
         {
@@ -102,9 +102,8 @@
     ctxt << "enq_msg(" << rid << ")";
     data_tok* dtp = new data_tok;
     BOOST_CHECK_MESSAGE(dtp != 0, "Data token allocation failed (dtp == 0).");
-    unsigned aio_sleep_cnt = 0;
-    while (handle_jcntl_response(jc.enqueue_data_record(msg.c_str(), msg.size(), msg.size(),
-            dtp, transient), jc, aio_sleep_cnt, ctxt.str(), exp_ret));
+    check_iores(ctxt.str(), jc.enqueue_data_record(msg.c_str(), msg.size(), msg.size(), dtp,
+            transient), exp_ret);
     return dtp->rid();
 }
 
@@ -116,9 +115,7 @@
     ctxt << "enq_extern_msg(" << rid << ")";
     data_tok* dtp = new data_tok;
     BOOST_CHECK_MESSAGE(dtp != 0, "Data token allocation failed (dtp == 0).");
-    unsigned aio_sleep_cnt = 0;
-    while (handle_jcntl_response(jc.enqueue_extern_data_record(msg_size, dtp, transient), jc,
-            aio_sleep_cnt, ctxt.str(), exp_ret));
+    check_iores(ctxt.str(), jc.enqueue_extern_data_record(msg_size, dtp, transient), exp_ret);
     return dtp->rid();
 }
 
@@ -130,9 +127,8 @@
     ctxt << "enq_txn_msg(" << rid << ")";
     data_tok* dtp = new data_tok;
     BOOST_CHECK_MESSAGE(dtp != 0, "Data token allocation failed (dtp == 0).");
-    unsigned aio_sleep_cnt = 0;
-    while (handle_jcntl_response(jc.enqueue_txn_data_record(msg.c_str(), msg.size(), msg.size(),
-            dtp, xid, transient), jc, aio_sleep_cnt, ctxt.str(), exp_ret));
+    check_iores(ctxt.str(), jc.enqueue_txn_data_record(msg.c_str(), msg.size(), msg.size(), dtp,
+            xid, transient), exp_ret);
     return dtp->rid();
 }
 
@@ -144,9 +140,8 @@
     ctxt << "enq_extern_txn_msg(" << rid << ")";
     data_tok* dtp = new data_tok;
     BOOST_CHECK_MESSAGE(dtp != 0, "Data token allocation failed (dtp == 0).");
-    unsigned aio_sleep_cnt = 0;
-    while (handle_jcntl_response(jc.enqueue_extern_txn_data_record(msg_size, dtp, xid, transient),
-            jc, aio_sleep_cnt, ctxt.str(), exp_ret));
+    check_iores(ctxt.str(), jc.enqueue_extern_txn_data_record(msg_size, dtp, xid, transient),
+            exp_ret);
     return dtp->rid();
 }
 
@@ -159,9 +154,7 @@
     BOOST_CHECK_MESSAGE(dtp != 0, "Data token allocation failed (dtp == 0).");
     dtp->set_wstate(data_tok::ENQ);
     dtp->set_rid(drid);
-    unsigned aio_sleep_cnt = 0;
-    while (handle_jcntl_response(jc.dequeue_data_record(dtp), jc, aio_sleep_cnt, ctxt.str(),
-            exp_ret));
+    check_iores(ctxt.str(), jc.dequeue_data_record(dtp), exp_ret);
     return dtp->rid();
 }
 
@@ -175,9 +168,7 @@
     BOOST_CHECK_MESSAGE(dtp != 0, "Data token allocation failed (dtp == 0).");
     dtp->set_wstate(data_tok::ENQ);
     dtp->set_rid(drid);
-    unsigned aio_sleep_cnt = 0;
-    while (handle_jcntl_response(jc.dequeue_txn_data_record(dtp, xid), jc, aio_sleep_cnt,
-            ctxt.str(), exp_ret));
+    check_iores(ctxt.str(), jc.dequeue_txn_data_record(dtp, xid), exp_ret);
     return dtp->rid();
 }
 
@@ -186,9 +177,7 @@
 {
     data_tok* dtp = new data_tok;
     BOOST_CHECK_MESSAGE(dtp != 0, "Data token allocation failed (dtp == 0).");
-
-    unsigned aio_sleep_cnt = 0;
-    while (handle_jcntl_response(jc.txn_abort(dtp, xid), jc, aio_sleep_cnt, "txn_abort", exp_ret));
+    check_iores("txn_abort", jc.txn_abort(dtp, xid), exp_ret);
     return dtp->rid();
 }
 
@@ -197,10 +186,7 @@
 {
     data_tok* dtp = new data_tok;
     BOOST_CHECK_MESSAGE(dtp != 0, "Data token allocation failed (dtp == 0).");
-
-    unsigned aio_sleep_cnt = 0;
-    while (handle_jcntl_response(jc.txn_commit(dtp, xid), jc, aio_sleep_cnt, "txn_commit",
-            exp_ret));
+    check_iores("txn_commit", jc.txn_commit(dtp, xid), exp_ret);
     return dtp->rid();
 }
 
@@ -237,23 +223,26 @@
 
 const u_int32_t
 num_msgs_to_full(const u_int16_t num_files, const u_int32_t file_size_dblks,
-        const u_int32_t msg_rec_size_dblks)
+        const u_int32_t msg_rec_size_dblks, bool include_deq)
 {
-   return u_int32_t(::floor(1.0 * num_files * file_size_dblks / msg_rec_size_dblks));
+    u_int32_t rec_size_dblks = msg_rec_size_dblks;
+    if (include_deq)
+        rec_size_dblks++;
+    return u_int32_t(::floor(1.0 * num_files * file_size_dblks / rec_size_dblks));
 }
 
 const u_int32_t
 num_msgs_to_threshold(const u_int16_t num_files, const u_int32_t file_size_dblks,
         const u_int32_t msg_rec_size_dblks)
 {
-   return u_int32_t(::floor(1.0 * num_files * file_size_dblks * JRNL_ENQ_THRESHOLD /
+    return u_int32_t(::floor(1.0 * num_files * file_size_dblks * JRNL_ENQ_THRESHOLD /
            msg_rec_size_dblks / 100));
 }
 
 const u_int32_t
 num_dequeues_rem(const u_int16_t num_files, const u_int32_t file_size_sblks)
 {
-   return u_int32_t(::ceil(num_files * file_size_sblks * (1 - (1.0 * JRNL_ENQ_THRESHOLD / 100))));
+    return u_int32_t(::ceil(num_files * file_size_sblks * (1 - (1.0 * JRNL_ENQ_THRESHOLD / 100))));
 }
 
 const string&

Modified: store/trunk/cpp/tests/jrnl/jtt/jrnl_instance.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/jtt/jrnl_instance.cpp	2008-03-07 16:53:30 UTC (rev 1770)
+++ store/trunk/cpp/tests/jrnl/jtt/jrnl_instance.cpp	2008-03-17 21:11:19 UTC (rev 1771)
@@ -279,7 +279,7 @@
                             dptr = 0;
                             xptr = 0;
                             break;
-                        case rhm::journal::RHM_IORES_AIO_WAIT:
+                        case rhm::journal::RHM_IORES_PAGE_AIOWAIT:
                             if (get_rd_events() == 0)
                             {
                                 rhm::journal::slock sl(&_rd_aio_mutex);

Modified: store/trunk/cpp/tests/jrnl/run-journal-tests
===================================================================
--- store/trunk/cpp/tests/jrnl/run-journal-tests	2008-03-07 16:53:30 UTC (rev 1770)
+++ store/trunk/cpp/tests/jrnl/run-journal-tests	2008-03-17 21:11:19 UTC (rev 1771)
@@ -3,23 +3,23 @@
 . ../setup
 
 fail=0
-num_jrnls=3
+num_jrnls=1
 
 # Run jtt using default test set
 echo
-echo "***** Mode 1: New journal instance, no recover *****"
+echo "===== Mode 1: New journal instance, no recover ====="
 rm -rf /tmp/test_0*
 $pwd/jtt/jtt --csv $pwd/jtt/jtt.csv --format-chk --num-jrnls $num_jrnls || fail = 1
 echo
-echo "***** Mode 2: Re-use journal instance, no recover *****"
+echo "===== Mode 2: Re-use journal instance, no recover ====="
 rm -rf /tmp/test_0*
 $pwd/jtt/jtt --csv $pwd/jtt/jtt.csv --reuse-instance --format-chk --num-jrnls $num_jrnls || fail = 1
 echo
-echo "***** Mode 3: New journal instance, recover previous test journal *****"
+echo "===== Mode 3: New journal instance, recover previous test journal ====="
 rm -rf /tmp/test_0*
 $pwd/jtt/jtt --csv $pwd/jtt/jtt.csv --recover-mode --format-chk --num-jrnls $num_jrnls || fail = 1
 echo
-echo "***** Mode 4: Re-use journal instance, recover previous test journal *****"
+echo "===== Mode 4: Re-use journal instance, recover previous test journal ====="
 rm -rf /tmp/test_0*
 $pwd/jtt/jtt --csv $pwd/jtt/jtt.csv --reuse-instance --recover-mode --format-chk --num-jrnls $num_jrnls || fail = 1
 echo




More information about the rhmessaging-commits mailing list