[rhmessaging-commits] rhmessaging commits: r1824 - in store/trunk/cpp/lib: jrnl and 1 other directory.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Tue Apr 1 14:18:44 EDT 2008


Author: kpvdr
Date: 2008-04-01 14:18:44 -0400 (Tue, 01 Apr 2008)
New Revision: 1824

Modified:
   store/trunk/cpp/lib/BdbMessageStore.cpp
   store/trunk/cpp/lib/JournalImpl.cpp
   store/trunk/cpp/lib/JournalImpl.h
   store/trunk/cpp/lib/jrnl/enums.hpp
   store/trunk/cpp/lib/jrnl/jcfg.hpp
   store/trunk/cpp/lib/jrnl/jcntl.cpp
   store/trunk/cpp/lib/jrnl/jcntl.hpp
   store/trunk/cpp/lib/jrnl/rcvdat.hpp
Log:
Added logging to journal. Logging is sent to qpid logging when used from class JournalImpl. Also extended upper limit on journal file size from 64MiB to 2GiB. Max files per journal remains unchnaged at 64.

Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp	2008-04-01 18:00:57 UTC (rev 1823)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp	2008-04-01 18:18:44 UTC (rev 1824)
@@ -130,22 +130,23 @@
         numJrnlFiles = JRNL_MIN_NUM_FILES;
         QPID_LOG(warning, "parameter num-jfiles (" << opts->numJrnlFiles << ") below allowable minimum (" << numJrnlFiles << "); changing this parameter to minimum value.");
     }
-    else if (numJrnlFiles > 64)
+    else if (numJrnlFiles > JRNL_MAX_NUM_FILES)
     {
-        numJrnlFiles = 64;
+        numJrnlFiles = JRNL_MAX_NUM_FILES;
         QPID_LOG(warning, "parameter num-jfiles (" << opts->numJrnlFiles << ") above allowable maximum (" << numJrnlFiles << "); changing this parameter to maximum value.");
     }
 
     u_int32_t jrnlFsizePgs = opts->jrnlFsizePgs;
     u_int32_t jrnlMinFsizePgs = JRNL_MIN_FILE_SIZE / JRNL_RMGR_PAGE_SIZE;
+    u_int32_t jrnlMaxFsizePgs = JRNL_MAX_FILE_SIZE / JRNL_RMGR_PAGE_SIZE;
     if (jrnlFsizePgs < jrnlMinFsizePgs)
     {
         jrnlFsizePgs = jrnlMinFsizePgs;
         QPID_LOG(warning, "parameter jfile-size-pgs (" << opts->jrnlFsizePgs << ") below allowable minimum (" << jrnlFsizePgs << "); changing this parameter to minimum value.");
     }
-    else if (jrnlFsizePgs > 1024) // (pgs) = 64MiB max file size
+    else if (jrnlFsizePgs > jrnlMaxFsizePgs)
     {
-        jrnlFsizePgs = 1024;
+        jrnlFsizePgs = jrnlMaxFsizePgs;
         QPID_LOG(warning, "parameter jfile-size-pgs (" << opts->jrnlFsizePgs << ") above allowable maximum (" << jrnlFsizePgs << "); changing this parameter to maximum value.");
     }
 

Modified: store/trunk/cpp/lib/JournalImpl.cpp
===================================================================
--- store/trunk/cpp/lib/JournalImpl.cpp	2008-04-01 18:00:57 UTC (rev 1823)
+++ store/trunk/cpp/lib/JournalImpl.cpp	2008-04-01 18:18:44 UTC (rev 1824)
@@ -28,6 +28,7 @@
 #include "jrnl/slock.hpp"
 #include "StoreException.h"
 #include <qpid/sys/Monitor.h>
+#include <qpid/log/Statement.h>
 
 using namespace rhm::bdbstore;
 using namespace rhm::journal;
@@ -65,10 +66,25 @@
     assert (journalTimerPtr != 0);
     journalTimerPtr->start();
     journalTimerPtr->add(inactivityFireEventPtr);
+
+    log(LOG_NOTICE, "Instantiation");
+    std::ostringstream oss1;
+    oss1 << "Journal directory = \"" << journalDirectory << "\"";
+    log(LOG_INFO, oss1.str());
+    std::ostringstream oss2;
+    oss2 << "Base file name = \"" << journalBaseFilename << "\"";
+    log(LOG_DEBUG, oss2.str());
+    std::ostringstream oss3;
+    oss3 << "Number of journal files = " << num_jfiles;
+    log(LOG_DEBUG, oss3.str());
+    std::ostringstream oss4;
+    oss4 << "Journal file size (sblks) = " << jfsize_sblks;
+    log(LOG_DEBUG, oss4.str());
 }
 
 JournalImpl::~JournalImpl()
 {
+    log(LOG_DEBUG, "Destroyed");
     if (_init_flag && !_stop_flag){
     	try { stop(true); }
         catch (const jexception& e) { std::cerr << e << std::endl; }
@@ -86,10 +102,21 @@
 }
 
 void
-JournalImpl::recover(const journal::rd_aio_cb rd_cb, const wr_aio_cb wr_cb,
+JournalImpl::initialize(const journal::rd_aio_cb rd_cb, const journal::wr_aio_cb wr_cb)
+{
+    log(LOG_DEBUG, "Initialize");
+    jcntl::initialize(rd_cb, wr_cb);
+    log(LOG_DEBUG, "Initialization complete");
+}
+
+void
+JournalImpl::recover(const journal::rd_aio_cb rd_cb, const journal::wr_aio_cb wr_cb,
         boost::ptr_list<bdbstore::PreparedTransaction>& prep_tx_list, u_int64_t& highest_rid,
         u_int64_t queue_id)
 {
+    std::ostringstream oss1;
+    oss1 << "Recover, queue_id = 0x" << std::hex << queue_id;
+    log(LOG_DEBUG, oss1.str());
     // Create list of prepared xids
     std::vector<std::string> prep_xid_list;
     for (bdbstore::PreparedTransaction::list::iterator i = prep_tx_list.begin();
@@ -118,6 +145,9 @@
                 throw;
         }
     }
+    std::ostringstream oss2;
+    oss2 << "Recover complete; highest rid found = 0x" << std::hex << highest_rid;
+    log(LOG_DEBUG, oss2.str());
 }
 
 #define MAX_AIO_SLEEPS 500
@@ -249,6 +279,27 @@
 }
 
 void
+JournalImpl::log(journal::log_level ll, const std::string& log_stmt)
+{
+    log(ll, log_stmt.c_str());
+}
+
+void
+JournalImpl::log(journal::log_level ll, const char* const log_stmt)
+{
+    switch (ll)
+    {
+        case LOG_TRACE:  QPID_LOG(trace, "Journal \"" << _jid << "\": " << log_stmt); break;
+        case LOG_DEBUG:  QPID_LOG(debug, "Journal \"" << _jid << "\": " << log_stmt); break;
+        case LOG_INFO:  QPID_LOG(info, "Journal \"" << _jid << "\": " << log_stmt); break;
+        case LOG_NOTICE:  QPID_LOG(notice, "Journal \"" << _jid << "\": " << log_stmt); break;
+        case LOG_WARN:  QPID_LOG(warning, "Journal \"" << _jid << "\": " << log_stmt); break;
+        case LOG_ERROR: QPID_LOG(error, "Journal \"" << _jid << "\": " << log_stmt); break;
+        case LOG_CRITICAL: QPID_LOG(critical, "Journal \"" << _jid << "\": " << log_stmt); break;
+    }
+}
+
+void
 JournalImpl::getEventsFire()
 {
     slock s(&_getf_mutex);

Modified: store/trunk/cpp/lib/JournalImpl.h
===================================================================
--- store/trunk/cpp/lib/JournalImpl.h	2008-04-01 18:00:57 UTC (rev 1823)
+++ store/trunk/cpp/lib/JournalImpl.h	2008-04-01 18:18:44 UTC (rev 1824)
@@ -95,6 +95,7 @@
 
             inline void initialize() { jcntl::initialize(0, &aio_wr_callback); } 
 
+            void initialize(const journal::rd_aio_cb rd_cb, const journal::wr_aio_cb wr_cb);
             void recover(const journal::rd_aio_cb rd_cb, const journal::wr_aio_cb wr_cb,
                     boost::ptr_list<bdbstore::PreparedTransaction>& prep_tx_list,
                     u_int64_t& highest_rid, u_int64_t queue_id);
@@ -135,6 +136,10 @@
 
             void stop(bool block_till_aio_cmpl = false);
 
+            // Logging
+            void log(journal::log_level level, const std::string& log_stmt);
+            void log(journal::log_level level, const char* const log_stmt);
+
             // Overrides for get_events timer
             const journal::iores flush(const bool block_till_aio_cmpl = false);
 

Modified: store/trunk/cpp/lib/jrnl/enums.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/enums.hpp	2008-04-01 18:00:57 UTC (rev 1823)
+++ store/trunk/cpp/lib/jrnl/enums.hpp	2008-04-01 18:18:44 UTC (rev 1824)
@@ -74,6 +74,34 @@
         return "<iores unknown>";
     }
 
+    enum _log_level
+    {
+        LOG_TRACE = 0,
+        LOG_DEBUG,
+        LOG_INFO,
+        LOG_NOTICE,
+        LOG_WARN,
+        LOG_ERROR,
+        LOG_CRITICAL
+    };
+    typedef _log_level log_level;
+
+    static inline const char* log_level_str(log_level ll)
+    {
+        switch (ll)
+        {
+            case LOG_TRACE: return "TRACE";
+            case LOG_DEBUG: return "DEBUG";
+            case LOG_INFO: return "INFO";
+            case LOG_NOTICE: return "NOTICE";
+            case LOG_WARN: return "WARN";
+            case LOG_ERROR: return "ERROR";
+            case LOG_CRITICAL: return "CRITICAL";
+        }
+        return "<log level unknown>";
+    }
+
+
 } // namespace journal
 } // namespace rhm
 

Modified: store/trunk/cpp/lib/jrnl/jcfg.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcfg.hpp	2008-04-01 18:00:57 UTC (rev 1823)
+++ store/trunk/cpp/lib/jrnl/jcfg.hpp	2008-04-01 18:18:44 UTC (rev 1824)
@@ -60,7 +60,9 @@
 #define JRNL_DBLK_SIZE          128         ///< Data block size in bytes (CANNOT BE LESS THAN 32!)
 #define JRNL_SBLK_SIZE          4           ///< Disk softblock size in multiples of JRNL_DBLK_SIZE
 #define JRNL_MIN_FILE_SIZE      128         ///< Min. jrnl file size in sblks (excl. file_hdr)
+#define JRNL_MAX_FILE_SIZE      4194304     ///< Max. jrnl file size in sblks (excl. file_hdr)
 #define JRNL_MIN_NUM_FILES      4           ///< Min. number of journal files
+#define JRNL_MAX_NUM_FILES      64          ///< Max. number of journal files
 #define JRNL_ENQ_THRESHOLD      80          ///< Percent full when enqueue connection will be closed
 
 // NOTE: JRNL_RMGR_PAGE_SIZE must be a multiple of JRNL_WMGR_PAGE_SIZE.

Modified: store/trunk/cpp/lib/jrnl/jcntl.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.cpp	2008-04-01 18:00:57 UTC (rev 1823)
+++ store/trunk/cpp/lib/jrnl/jcntl.cpp	2008-04-01 18:18:44 UTC (rev 1824)
@@ -153,8 +153,7 @@
     if (_rcvdat._full)
         throw jexception(jerrno::JERR_JCNTL_RECOVERJFULL, "jcntl", "recover");
 
-    // Debug info; should be sent to log file
-    // std::cout << _rcvdat.to_string(_jid) << std::endl << std::flush;
+    this->log(LOG_DEBUG, _rcvdat.to_string(_jid));
 
     // TODO - place this in a finalize() fn? - see ~jcntl() & initialize()...
     if (_datafh)
@@ -405,6 +404,21 @@
 }
 
 void
+jcntl::log(log_level ll, const std::string& log_stmt)
+{
+    log(ll, log_stmt.c_str());
+}
+
+void
+jcntl::log(log_level ll, const char* const log_stmt)
+{
+    if (ll > LOG_INFO)
+    {
+        std::cout << log_level_str(ll) << ": Journal \"" << _jid << "\": " << log_stmt << std::endl;
+    }
+}
+
+void
 jcntl::chk_wr_frot()
 {
     if (_wrfc.index() == _rrfc.index())
@@ -476,8 +490,10 @@
             _wmgr.get_events(pmgr::UNUSED);
             if (cnt++ > MAX_AIO_CMPL_SLEEPS)
             {
-                // TODO: Log this!
-                std::cout << "**** JERR_JCNTL_AIOCMPLWAIT *** " << _wmgr.status_str() << std::endl;
+                std::ostringstream oss;
+                oss << "get_events() returned JERR_JCNTL_AIOCMPLWAIT; wmgr_status: ";
+                oss << _wmgr.status_str();
+                this->log(LOG_CRITICAL, oss.str());
                 throw jexception(jerrno::JERR_JCNTL_AIOCMPLWAIT, "jcntl", "handle_aio_wait");
             }
             ::usleep(AIO_CMPL_SLEEP);
@@ -491,8 +507,10 @@
             _wmgr.get_events(pmgr::UNUSED);
             if (cnt++ > MAX_AIO_CMPL_SLEEPS)
             {
-                // TODO: Log this!
-                std::cout << "**** JERR_JCNTL_AIOCMPLWAIT *** " << _wmgr.status_str() << std::endl;
+                std::ostringstream oss;
+                oss << "get_events() returned JERR_JCNTL_AIOCMPLWAIT; wmgr_status: ";
+                oss << _wmgr.status_str();
+                this->log(LOG_CRITICAL, oss.str());
                 throw jexception(jerrno::JERR_JCNTL_AIOCMPLWAIT, "jcntl", "handle_aio_wait");
             }
             ::usleep(AIO_CMPL_SLEEP);
@@ -515,17 +533,21 @@
     // use the jinf data.
     if (_num_jfiles != ji.num_jfiles())
     {
+        std::ostringstream oss;
+        oss << "Recovery found " << ji.num_jfiles() <<
+                " files (different from --num-jfiles value of " << _num_jfiles << ").";
+        this->log(LOG_WARN, oss.str());
         _num_jfiles = ji.num_jfiles();
         _rcvdat._enq_cnt_list.resize(_num_jfiles);
-        std::cout << "WARNING: Recovery found " << _num_jfiles <<
-                " files (different from --num-jfiles parameter value)." << std::endl;
     }
     if (_jfsize_sblks != ji.jfsize_sblks())
     {
+        std::ostringstream oss;
+        oss << "Recovery found file size = " << (ji.jfsize_sblks() / JRNL_RMGR_PAGE_SIZE) <<
+                " (different from --jfile-size-pgs value of " <<
+                (_jfsize_sblks / JRNL_RMGR_PAGE_SIZE) << ").";
+        this->log(LOG_WARN, oss.str());
         _jfsize_sblks = ji.jfsize_sblks();
-        std::cout << "WARNING: Recovery found file size = " <<
-                (_jfsize_sblks / JRNL_RMGR_PAGE_SIZE) <<
-                " (different from --jfile-size-pgs parameter value)." << std::endl;
     }
 
     try
@@ -831,11 +853,13 @@
     unsigned sblk_offs = file_pos % (JRNL_DBLK_SIZE * JRNL_SBLK_SIZE);
     if (sblk_offs)
     {
-        // TODO: Connect the following with logger:
-        std::cout << std::hex  << "INFO: " << _jid << ": Bad record alignment found at fid=0x" <<
-                fid << " offs=0x" << file_pos << " (likely journal overwrite boundary); " <<
-                (JRNL_SBLK_SIZE - (sblk_offs/JRNL_DBLK_SIZE)) <<
-                " filler record(s) required." << std::endl;
+        {
+            std::ostringstream oss;
+            oss << std::hex << "Bad record alignment found at fid=0x" << fid;
+            oss << " offs=0x" << file_pos << " (likely journal overwrite boundary); " << std::dec;
+            oss << (JRNL_SBLK_SIZE - (sblk_offs/JRNL_DBLK_SIZE)) << " filler record(s) required.";
+            this->log(LOG_WARN, oss.str());
+        }
         const u_int32_t xmagic = RHM_JDAT_EMPTY_MAGIC;
         std::ostringstream oss;
         oss << _jdir.dirname() << "/" << _base_filename << ".";
@@ -857,14 +881,14 @@
         {
             ofsp.write((const char*)buff, JRNL_DBLK_SIZE);
             assert(!ofsp.fail());
-            // TODO: Connect the following with logger:
-            std::cout << "INFO: * Wrote filler record at offs=0x" << file_pos << std::endl;
+            std::ostringstream oss;
+            oss << "Wrote filler record at offs=0x" << std::hex << file_pos << std::dec;
+            this->log(LOG_NOTICE, oss.str());
             file_pos = ofsp.tellp();
         }
         ofsp.close();
         ::free(buff);
-        // TODO: Connect the following with logger:
-        std::cout << "INFO: Bad record alignment fixed." << std::endl;
+        this->log(LOG_INFO, "Bad record alignment fixed.");
     }
 }
 

Modified: store/trunk/cpp/lib/jrnl/jcntl.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.hpp	2008-04-01 18:00:57 UTC (rev 1823)
+++ store/trunk/cpp/lib/jrnl/jcntl.hpp	2008-04-01 18:18:44 UTC (rev 1824)
@@ -586,6 +586,10 @@
 
         inline const u_int32_t jfsize_sblks() const { return _jfsize_sblks; }
 
+        // Logging
+        virtual void log(log_level level, const std::string& log_stmt);
+        virtual void log(log_level level, const char* const log_stmt);
+
         // these are _rmgr to _wmgr interactions, remove when _rmgr contains ref to _wmgr:        
         void chk_wr_frot();
         inline const u_int32_t unflushed_dblks() { return _wmgr.unflushed_dblks(); }

Modified: store/trunk/cpp/lib/jrnl/rcvdat.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rcvdat.hpp	2008-04-01 18:00:57 UTC (rev 1823)
+++ store/trunk/cpp/lib/jrnl/rcvdat.hpp	2008-04-01 18:18:44 UTC (rev 1824)
@@ -87,7 +87,7 @@
             std::string to_string(std::string& jid)
             {
                 std::ostringstream oss;
-                oss << "Jorunal file analysis (jid=\"" << jid << "\"):" << std::endl;
+                oss << "Recover file analysis (jid=\"" << jid << "\"):" << std::endl;
                 oss << "  Overwrite indicator (_owi) = " << (_owi ? "TRUE" : "FALSE") << std::endl;
                 oss << "  First rotation (_frot) = " << (_frot ? "TRUE" : "FALSE") << std::endl;
                 oss << "  Journal empty (_empty) = " << (_empty ? "TRUE" : "FALSE") << std::endl;




More information about the rhmessaging-commits mailing list