[rhmessaging-commits] rhmessaging commits: r2875 - in store/trunk/cpp/lib: jrnl and 1 other directory.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Mon Nov 24 14:41:06 EST 2008


Author: kpvdr
Date: 2008-11-24 14:41:05 -0500 (Mon, 24 Nov 2008)
New Revision: 2875

Modified:
   store/trunk/cpp/lib/JournalImpl.cpp
   store/trunk/cpp/lib/MessageStoreImpl.cpp
   store/trunk/cpp/lib/jrnl/jerrno.cpp
   store/trunk/cpp/lib/jrnl/jerrno.hpp
   store/trunk/cpp/lib/jrnl/rmgr.cpp
   store/trunk/cpp/lib/jrnl/rmgr.hpp
   store/trunk/cpp/lib/jrnl/rrfc.cpp
Log:
Fix for BZ472215 "qpidd rmgr::get_events() threw JERR__AIO: AIO error". Also fix for txtest failures where journal is forced to run in extern mode.

Modified: store/trunk/cpp/lib/JournalImpl.cpp
===================================================================
--- store/trunk/cpp/lib/JournalImpl.cpp	2008-11-24 19:40:44 UTC (rev 2874)
+++ store/trunk/cpp/lib/JournalImpl.cpp	2008-11-24 19:41:05 UTC (rev 2875)
@@ -246,8 +246,14 @@
     {
         // Free any previous msg
         free_read_buffers();
+
+        // TODO: This is a brutal approach - very inefficient and slow. Rather introduce a system of remembering
+        // jumpover points and allow the read to jump back to the first known jumpover point - but this needs
+        // a mechanism in rrfc to accomplish it. Also helpful is a struct containing a journal address - a
+        // combination of lid/offset.
         if (rid < lastReadRid)
             _rmgr.invalidate();
+
         _dlen = 0;
         _dtok.reset();
         _dtok.set_wstate(DataTokenImpl::ENQ);
@@ -262,9 +268,9 @@
             iores res = read_data_record(&_datap, _dlen, &_xidp, xlen, transient, _external, &_dtok);
             switch (res) {
                 case journal::RHM_IORES_SUCCESS:
-                    if (_dtok.rid() < rid) {
+                    if (_dtok.rid() != rid) {
                         free_read_buffers();
-                        // reset data token for next read
+                        // Reset data token for next read
                         _dlen = 0;
                         _dtok.reset();
                         _dtok.set_wstate(DataTokenImpl::ENQ);
@@ -283,19 +289,23 @@
                         std::stringstream ss;
                         ss << "read_data_record() returned " << journal::iores_str(res);
                         ss << "; exceeded maximum wait time";
-                        throw jexception(0, ss.str().c_str(), "JournalImpl", "loadMsgContent");
+                        throw jexception(journal::jerrno::JERR__TIMEOUT, ss.str().c_str(), "JournalImpl",
+                            "loadMsgContent");
                     }
                     break;
                 default:
                     std::stringstream ss;
                     ss << "read_data_record() returned " << journal::iores_str(res);
-                    throw jexception(0, ss.str().c_str(), "JournalImpl", "loadMsgContent");
+                    throw jexception(journal::jerrno::JERR__UNEXPRESPONSE, ss.str().c_str(), "JournalImpl",
+                        "loadMsgContent");
             }
         }
         if (!rid_found) {
             std::stringstream ss;
-            ss << "read_data_record() was unable to find rid " << rid << "; last rid found was " << _dtok.rid();
-            throw jexception(0, ss.str().c_str(), "JournalImpl", "loadMsgContent");
+            ss << "read_data_record() was unable to find rid 0x" << std::hex << rid << std::dec;
+            ss << " (" << rid << "); last rid found was 0x" << std::hex << _dtok.rid() << std::dec;
+            ss << " (" << _dtok.rid() << ")";
+            throw jexception(journal::jerrno::JERR__RECNFOUND, ss.str().c_str(), "JournalImpl", "loadMsgContent");
         }
     }
     if (_external)

Modified: store/trunk/cpp/lib/MessageStoreImpl.cpp
===================================================================
--- store/trunk/cpp/lib/MessageStoreImpl.cpp	2008-11-24 19:40:44 UTC (rev 2874)
+++ store/trunk/cpp/lib/MessageStoreImpl.cpp	2008-11-24 19:41:05 UTC (rev 2875)
@@ -1257,8 +1257,7 @@
                 }
             }
         } catch (const journal::jexception& e) {
-            THROW_STORE_EXCEPTION(std::string("Queue ") + queue.getName() +
-                                  ": loadContent() failed: " + e.what());
+            THROW_STORE_EXCEPTION(std::string("Queue ") + queue.getName() + ": loadContent() failed: " + e.what());
         }
         TxnCtxt txn;
         txn.begin(env, true);

Modified: store/trunk/cpp/lib/jrnl/jerrno.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jerrno.cpp	2008-11-24 19:40:44 UTC (rev 2874)
+++ store/trunk/cpp/lib/jrnl/jerrno.cpp	2008-11-24 19:41:05 UTC (rev 2875)
@@ -51,6 +51,9 @@
 const u_int32_t jerrno::JERR__FILEIO            = 0x0104;
 const u_int32_t jerrno::JERR__RTCLOCK           = 0x0105;
 const u_int32_t jerrno::JERR__PTHREAD           = 0x0106;
+const u_int32_t jerrno::JERR__TIMEOUT           = 0x0107;
+const u_int32_t jerrno::JERR__UNEXPRESPONSE     = 0x0108;
+const u_int32_t jerrno::JERR__RECNFOUND         = 0x0109;
 
 // class jcntl
 const u_int32_t jerrno::JERR_JCNTL_STOPPED      = 0x0200;
@@ -138,6 +141,9 @@
     _err_map[JERR__FILEIO] = "JERR__FILEIO: File read or write failure.";
     _err_map[JERR__RTCLOCK] = "JERR__RTCLOCK: Reading real-time clock failed.";
     _err_map[JERR__PTHREAD] = "JERR__PTHREAD: pthread failure.";
+    _err_map[JERR__TIMEOUT] = "JERR__TIMEOUT: Timeout waiting for event.";
+    _err_map[JERR__UNEXPRESPONSE] = "JERR__UNEXPRESPONSE: Unexpected response to call or event.";
+    _err_map[JERR__RECNFOUND] = "JERR__RECNFOUND: Record not found.";
 
     // class jcntl
     _err_map[JERR_JCNTL_STOPPED] = "JERR_JCNTL_STOPPED: Operation on stopped journal.";

Modified: store/trunk/cpp/lib/jrnl/jerrno.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jerrno.hpp	2008-11-24 19:40:44 UTC (rev 2874)
+++ store/trunk/cpp/lib/jrnl/jerrno.hpp	2008-11-24 19:41:05 UTC (rev 2875)
@@ -69,6 +69,9 @@
         static const u_int32_t JERR__FILEIO;            ///< File read or write failure
         static const u_int32_t JERR__RTCLOCK;           ///< Reading real-time clock failed
         static const u_int32_t JERR__PTHREAD;           ///< pthread failure
+        static const u_int32_t JERR__TIMEOUT;           ///< Timeout waiting for an event
+        static const u_int32_t JERR__UNEXPRESPONSE;     ///< Unexpected response to call or event
+        static const u_int32_t JERR__RECNFOUND;         ///< Record not found
 
         // class jcntl
         static const u_int32_t JERR_JCNTL_STOPPED;      ///< Operation on stopped journal

Modified: store/trunk/cpp/lib/jrnl/rmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rmgr.cpp	2008-11-24 19:40:44 UTC (rev 2874)
+++ store/trunk/cpp/lib/jrnl/rmgr.cpp	2008-11-24 19:41:05 UTC (rev 2875)
@@ -199,8 +199,7 @@
                             oss << "rid=0x" << std::setw(16) << _hdr._rid;
                             oss << "; dtok_rid=" << std::setw(16) << dtokp->rid();
                             oss << "; dtok_id=0x" << std::setw(8) << dtokp->id();
-                            throw jexception(jerrno::JERR_RMGR_RIDMISMATCH, oss.str(), "rmgr",
-                                    "read");
+                            throw jexception(jerrno::JERR_RMGR_RIDMISMATCH, oss.str(), "rmgr", "read");
                         }
                     }
                     else
@@ -332,12 +331,38 @@
 rmgr::invalidate()
 {
     if (_rrfc.is_valid())
+        _rrfc.set_invalid();
+}
+
+#define MAX_AIO_SLEEPS 1000  // 10 sec 
+#define AIO_SLEEP_TIME 10000 // 10 ms
+void
+rmgr::init_validation()
+{
+    // Wait for any outstanding AIO read operations to complete before synchronizing
+    int aio_sleep_cnt = 0;
+    while (_aio_evt_rem)
     {
-        for (int i=0; i<_cache_num_pages; i++)
-            _page_cb_arr[i]._state = UNUSED;
-        _rrfc.unset_findex();
-    	_pg_offset_dblks = 0;
+        get_events();
+        if (_aio_evt_rem)
+        {
+            if (++aio_sleep_cnt <= MAX_AIO_SLEEPS)
+            {
+                get_events();
+                usleep(AIO_SLEEP_TIME);
+            }
+            else
+                throw jexception(jerrno::JERR__TIMEOUT,
+                    "Invalidate timed out waiting for outstanding read aio to return", "rmgr", "invalidate");
+        }
     }
+
+    // Reset all read states and pointers
+    for (int i=0; i<_cache_num_pages; i++)
+        _page_cb_arr[i]._state = UNUSED;
+    _rrfc.unset_findex();
+    _pg_index = 0;
+    _pg_offset_dblks = 0;
 }
 
 void
@@ -525,6 +550,7 @@
 {
     int16_t first_uninit = -1;
     u_int16_t num_uninit = 0;
+    u_int16_t num_compl = 0;
     bool outstanding = false;
     // Index must start with current buffer and cycle around so that first
     // uninitialized buffer is initialized first
@@ -543,12 +569,17 @@
             case AIO_PENDING:
                 outstanding = true;
                 break;
+            case AIO_COMPLETE:
+                num_compl++;
+                break;
             default:;
         }
     }
     iores res = RHM_IORES_SUCCESS;
     if (num_uninit)
         res = init_aio_reads(first_uninit, num_uninit);
+    else if (num_compl == _cache_num_pages) // This condition exists after invalidation
+        res = init_aio_reads(0, _cache_num_pages);
     if (outstanding)
         get_events();
     return res;
@@ -564,6 +595,7 @@
 
         if (!_rrfc.is_valid())
         {
+            init_validation();
             _jc->get_earliest_fid(); // calls _rrfc.set_findex()
             // If this file has not yet been written to, return RHM_IORES_EMPTY
             if (_rrfc.is_void() && !_rrfc.is_wr_aio_outstanding())
@@ -584,6 +616,7 @@
         // space into all contiguous empty pages in one AIO operation.
 
         u_int32_t file_rem_dblks = _rrfc.remaining_dblks();
+        file_rem_dblks -= file_rem_dblks % JRNL_SBLK_SIZE; // round down to closest sblk boundary
         u_int32_t pg_size_dblks = JRNL_RMGR_PAGE_SIZE * JRNL_SBLK_SIZE;
         u_int32_t rd_size = file_rem_dblks > pg_size_dblks ? pg_size_dblks : file_rem_dblks;
         if (rd_size)
@@ -592,8 +625,7 @@
             // TODO: For perf, combine contiguous pages into single read
             //   1 or 2 AIOs needed depending on whether read block folds
             aio_cb* aiocbp = &_aio_cb_arr[pi];
-            aio::prep_pread_2(aiocbp, _rrfc.fh(), _page_ptr_arr[pi],
-                    rd_size * JRNL_DBLK_SIZE, _rrfc.subm_offs());
+            aio::prep_pread_2(aiocbp, _rrfc.fh(), _page_ptr_arr[pi], rd_size * JRNL_DBLK_SIZE, _rrfc.subm_offs());
             if (aio::submit(_ioctx, 1, &aiocbp) < 0)
                 throw jexception(jerrno::JERR__AIO, "rmgr", "init_aio_reads");
             _rrfc.add_subm_cnt_dblks(rd_size);

Modified: store/trunk/cpp/lib/jrnl/rmgr.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rmgr.hpp	2008-11-24 19:40:44 UTC (rev 2874)
+++ store/trunk/cpp/lib/jrnl/rmgr.hpp	2008-11-24 19:41:05 UTC (rev 2875)
@@ -92,6 +92,7 @@
     private:
         void initialize();
         void clean();
+        void init_validation();
         iores pre_read_check(data_tok* dtokp);
         iores read_enq(rec_hdr& h, void* rptr, data_tok* dtokp);
         void consume_xid_rec(rec_hdr& h, void* rptr, data_tok* dtokp);

Modified: store/trunk/cpp/lib/jrnl/rrfc.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rrfc.cpp	2008-11-24 19:40:44 UTC (rev 2874)
+++ store/trunk/cpp/lib/jrnl/rrfc.cpp	2008-11-24 19:41:05 UTC (rev 2875)
@@ -79,10 +79,7 @@
     u_int16_t next_fc_index = _fc_index + 1;
     if (next_fc_index == _lfmp->num_jfiles())
         next_fc_index = 0;
-    fcntl* next_fc = _lfmp->get_fcntlp(next_fc_index);
-    _fc_index = next_fc_index;
-    _curr_fc = next_fc;
-    open_fh(_curr_fc->fname());
+    set_findex(next_fc_index);
     return RHM_IORES_SUCCESS;
 }
 




More information about the rhmessaging-commits mailing list