[rhmessaging-commits] rhmessaging commits: r1711 - in store/trunk/cpp: lib/jrnl and 2 other directories.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Wed Feb 20 10:58:08 EST 2008


Author: kpvdr
Date: 2008-02-20 10:58:08 -0500 (Wed, 20 Feb 2008)
New Revision: 1711

Modified:
   store/trunk/cpp/lib/BdbMessageStore.cpp
   store/trunk/cpp/lib/JournalImpl.cpp
   store/trunk/cpp/lib/JournalImpl.h
   store/trunk/cpp/lib/jrnl/enq_map.cpp
   store/trunk/cpp/lib/jrnl/txn_map.cpp
   store/trunk/cpp/lib/jrnl/wmgr.cpp
   store/trunk/cpp/tests/jrnl/jtt/jfile_chk.py
   store/trunk/cpp/tests/jrnl/jtt/test_mgr.cpp
   store/trunk/cpp/tests/jrnl/run-journal-tests
Log:
Bugfix for intermittent JMS failures; also added scoped locks to the emap and tmap classes, plus some Python tool bugfixes.

Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp	2008-02-20 14:55:14 UTC (rev 1710)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp	2008-02-20 15:58:08 UTC (rev 1711)
@@ -48,8 +48,8 @@
 static const u_int8_t MESSAGE_MESSAGE = 1;
 static const u_int8_t BASIC_MESSAGE = 2;
 bool BdbMessageStore::useAsync;
-qpid::sys::Duration BdbMessageStore::defJournalGetEventsTimeout(1000000); // 1ms
-qpid::sys::Duration BdbMessageStore::defJournalFlushTimeout(1000000000); // 1 sec
+qpid::sys::Duration BdbMessageStore::defJournalGetEventsTimeout(10 * qpid::sys::TIME_MSEC); // 10ms
+qpid::sys::Duration BdbMessageStore::defJournalFlushTimeout(500 * qpid::sys::TIME_MSEC); // 0.5s
 unsigned int TxnCtxt::count = 0;
 qpid::sys::Mutex TxnCtxt::globalSerialiser;
 
@@ -274,9 +274,8 @@
     {
         JournalImpl* jQueue = static_cast<JournalImpl*>(eqs);
     	jQueue->delete_jrnl_files();
-        queue.setExternalQueueStore(NULL); // will delete the journal if exists
+        queue.setExternalQueueStore(0); // will delete the journal if exists
     }
-    
 }
 
 void BdbMessageStore::create(const PersistableExchange& exchange)
@@ -906,7 +905,9 @@
 void BdbMessageStore::flush(const qpid::broker::PersistableQueue& queue)
 {
     if (!usingJrnl()) return;
+    if (queue.getExternalQueueStore() == 0) return;
     checkInit();
+    std::string qn = queue.getName();
     try {
         JournalImpl* jc = static_cast<JournalImpl*>(queue.getExternalQueueStore());
         if (jc){
@@ -914,7 +915,7 @@
             /*rhm::journal::iores res =*/ jc->flush();
         }
     }catch (const journal::jexception& e) {
-        THROW_STORE_EXCEPTION(std::string("Queue ") + queue.getName() + ": flush() failed: " + e.what() );
+        THROW_STORE_EXCEPTION(std::string("Queue ") + qn + ": flush() failed: " + e.what() );
     }
 }
 

Modified: store/trunk/cpp/lib/JournalImpl.cpp
===================================================================
--- store/trunk/cpp/lib/JournalImpl.cpp	2008-02-20 14:55:14 UTC (rev 1710)
+++ store/trunk/cpp/lib/JournalImpl.cpp	2008-02-20 15:58:08 UTC (rev 1711)
@@ -25,6 +25,7 @@
 
 #include "jrnl/jerrno.hpp"
 #include "jrnl/jexception.hpp"
+#include "jrnl/slock.hpp"
 #include "StoreException.h"
 #include <qpid/sys/Monitor.h>
 
@@ -56,6 +57,7 @@
                          _dtok(),
                          _external(false)
 {
+    ::pthread_mutex_init(&_getf_mutex, 0);
     getEventsFireEventsPtr = new GetEventsFireEvent(this, getEventsTimeout);
     inactivityFireEventPtr = new InactivityFireEvent(this, flushTimeout);
     journalTimer.start();
@@ -77,6 +79,7 @@
         ::free(_datap);
         _datap = 0;
     }
+    ::pthread_mutex_destroy(&_getf_mutex);
 }
 
 void
@@ -232,13 +235,12 @@
 }
 
 const iores
-JournalImpl::flush()
+JournalImpl::flush(const bool block_till_aio_cmpl)
 {
-    const iores res = jcntl::flush();
-    if (_wmgr.get_aio_evt_rem() && !getEventsTimerSetFlag) {
-        getEventsFireEventsPtr->addRef();
-        journalTimer.add(getEventsFireEventsPtr);
-        getEventsTimerSetFlag = true;
+    const iores res = jcntl::flush(block_till_aio_cmpl);
+    {
+        slock s(&_getf_mutex);
+        if (_wmgr.get_aio_evt_rem() && !getEventsTimerSetFlag) { setGetEventTimer(); }
     }
     return res;
 }
@@ -246,22 +248,12 @@
 void
 JournalImpl::getEventsFire()
 {
-    if (_wmgr.get_aio_evt_rem()) {
-        try { jcntl::get_wr_events(); }
-        catch (const jexception& e)
-        {
-            // Another thread has already called get_wr_events() and it is still busy, ignore
-            if (e.err_code() != jerrno::JERR__PTHREAD) {
-                throw;
-            }
-        }
-    }
+    slock s(&_getf_mutex);
     getEventsTimerSetFlag = false;
     if (_wmgr.get_aio_evt_rem()) {
-        getEventsFireEventsPtr->addRef();
-        journalTimer.add(getEventsFireEventsPtr);
-        getEventsTimerSetFlag = true;
+        jcntl::get_wr_events();
     }
+    if (_wmgr.get_aio_evt_rem()) { setGetEventTimer(); }
 }
 
 void

Modified: store/trunk/cpp/lib/JournalImpl.h
===================================================================
--- store/trunk/cpp/lib/JournalImpl.h	2008-02-20 14:55:14 UTC (rev 1710)
+++ store/trunk/cpp/lib/JournalImpl.h	2008-02-20 15:58:08 UTC (rev 1711)
@@ -70,6 +70,7 @@
 
             bool getEventsTimerSetFlag;
             qpid::intrusive_ptr<qpid::broker::TimerTask> getEventsFireEventsPtr;
+            pthread_mutex_t _getf_mutex; // getEventsTimerSetFlag mutex
 
             bool writeActivityFlag;
             bool flushTriggeredFlag;
@@ -135,13 +136,19 @@
             void stop(bool block_till_aio_cmpl = false);
 
             // Overrides for get_events timer
-            const journal::iores flush();
+            const journal::iores flush(const bool block_till_aio_cmpl = false);
 
             // TimerTask callback
             void getEventsFire();
             void flushFire();
 
         private:
+            inline void setGetEventTimer()
+            {
+                getEventsFireEventsPtr->addRef();
+                journalTimer.add(getEventsFireEventsPtr);
+                getEventsTimerSetFlag = true;
+            }
             void handleIoResult(const journal::iores r);
             static void aio_wr_callback(jcntl* journal, std::vector<journal::data_tok*>& dtokl);
             // static void aio_rd_callback(jcntl* journal, std::vector<u_int16_t>& pil);

Modified: store/trunk/cpp/lib/jrnl/enq_map.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/enq_map.cpp	2008-02-20 14:55:14 UTC (rev 1710)
+++ store/trunk/cpp/lib/jrnl/enq_map.cpp	2008-02-20 15:58:08 UTC (rev 1711)
@@ -33,6 +33,7 @@
 #include <iomanip>
 #include <sstream>
 #include <jrnl/jerrno.hpp>
+#include <jrnl/slock.hpp>
 
 
 namespace rhm
@@ -60,10 +61,12 @@
 void
 enq_map::insert_fid(const u_int64_t rid, const u_int16_t fid, const bool locked)
 {
+    std::pair<emap_itr, bool> ret;
     fid_lock_pair rec(fid, locked);
-    pthread_mutex_lock(&_mutex);
-    std::pair<emap_itr, bool> ret = _map.insert(emap_param(rid, rec));
-    pthread_mutex_unlock(&_mutex);
+    {
+        slock s(&_mutex);
+        ret = _map.insert(emap_param(rid, rec));
+    }
     if (ret.second == false)
     {
         std::ostringstream oss;
@@ -75,9 +78,11 @@
 const u_int16_t
 enq_map::get_fid(const u_int64_t rid)
 {
-    pthread_mutex_lock(&_mutex);
-    emap_itr itr = _map.find(rid);
-    pthread_mutex_unlock(&_mutex);
+    emap_itr itr;
+    {
+        slock s(&_mutex);
+        itr = _map.find(rid);
+    }
     if (itr == _map.end()) // not found in map
     {
         std::ostringstream oss;
@@ -96,34 +101,33 @@
 const u_int16_t
 enq_map::get_remove_fid(const u_int64_t rid, const bool txn_flag)
 {
-    pthread_mutex_lock(&_mutex);
+    slock s(&_mutex);
     emap_itr itr = _map.find(rid);
     if (itr == _map.end()) // not found in map
     {
-        pthread_mutex_unlock(&_mutex);
         std::ostringstream oss;
         oss << std::hex << "rid=0x" << rid;
         throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "enq_map", "get_remove_fid");
     }
     if (itr->second.second && !txn_flag) // locked, but not a commit/abort
     {
-        pthread_mutex_unlock(&_mutex);
         std::ostringstream oss;
         oss << std::hex << "rid=0x" << rid;
         throw jexception(jerrno::JERR_MAP_LOCKED, oss.str(), "enq_map", "get_remove_fid");
     }
     u_int16_t fid = itr->second.first;
     _map.erase(itr);
-    pthread_mutex_unlock(&_mutex);
     return fid;
 }
 
 const bool
 enq_map::is_enqueued(const u_int64_t rid)
 {
-    pthread_mutex_lock(&_mutex);
-    emap_itr itr = _map.find(rid);
-    pthread_mutex_unlock(&_mutex);
+    emap_itr itr;
+    {
+        slock s(&_mutex);
+        itr = _map.find(rid);
+    }
     if (itr == _map.end()) // not found in map
         return false;
     if (itr->second.second) // locked
@@ -134,41 +138,39 @@
 void
 enq_map::lock(const u_int64_t rid)
 {
-    pthread_mutex_lock(&_mutex);
+    slock s(&_mutex);
     emap_itr itr = _map.find(rid);
     if (itr == _map.end()) // not found in map
     {
-        pthread_mutex_unlock(&_mutex);
         std::ostringstream oss;
         oss << std::hex << "rid=0x" << rid;
         throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "enq_map", "get_remove_fid");
     }
     itr->second.second = true;
-    pthread_mutex_unlock(&_mutex);
 }
 
 void
 enq_map::unlock(const u_int64_t rid)
 {
-    pthread_mutex_lock(&_mutex);
+    slock s(&_mutex);
     emap_itr itr = _map.find(rid);
     if (itr == _map.end()) // not found in map
     {
-        pthread_mutex_unlock(&_mutex);
         std::ostringstream oss;
         oss << std::hex << "rid=0x" << rid;
         throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "enq_map", "get_remove_fid");
     }
     itr->second.second = false;
-    pthread_mutex_unlock(&_mutex);
 }
 
 const bool
 enq_map::is_locked(const u_int64_t rid)
 {
-    pthread_mutex_lock(&_mutex);
-    emap_itr itr = _map.find(rid);
-    pthread_mutex_unlock(&_mutex);
+    emap_itr itr;
+    {
+        slock s(&_mutex);
+        itr = _map.find(rid);
+    }
     if (itr == _map.end()) // not found in map
     {
         std::ostringstream oss;
@@ -182,20 +184,22 @@
 enq_map::rid_list(std::vector<u_int64_t>& rv)
 {
     rv.clear();
-    pthread_mutex_lock(&_mutex);
-    for (emap_itr itr = _map.begin(); itr != _map.end(); itr++)
-        rv.push_back(itr->first);
-    pthread_mutex_unlock(&_mutex);
+    {
+        slock s(&_mutex);
+        for (emap_itr itr = _map.begin(); itr != _map.end(); itr++)
+            rv.push_back(itr->first);
+    }
 }
 
 void
 enq_map::fid_list(std::vector<u_int16_t>& fv)
 {
     fv.clear();
-    pthread_mutex_lock(&_mutex);
-    for (emap_itr itr = _map.begin(); itr != _map.end(); itr++)
-        fv.push_back(itr->second.first);
-    pthread_mutex_unlock(&_mutex);
+    {
+        slock s(&_mutex);
+        for (emap_itr itr = _map.begin(); itr != _map.end(); itr++)
+            fv.push_back(itr->second.first);
+    }
 }
 
 } // namespace journal

Modified: store/trunk/cpp/lib/jrnl/txn_map.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/txn_map.cpp	2008-02-20 14:55:14 UTC (rev 1710)
+++ store/trunk/cpp/lib/jrnl/txn_map.cpp	2008-02-20 15:58:08 UTC (rev 1711)
@@ -34,6 +34,7 @@
 #include <sstream>
 #include <jrnl/jerrno.hpp>
 #include <jrnl/jexception.hpp>
+#include <jrnl/slock.hpp>
 
 namespace rhm
 {
@@ -64,7 +65,7 @@
 txn_map::insert_txn_data(const std::string& xid, const txn_data& td)
 {
     bool ok = true;
-    pthread_mutex_lock(&_mutex);
+    slock s(&_mutex);
     xmap_itr itr = _map.find(xid);
     if (itr == _map.end()) // not found in map
     {
@@ -76,16 +77,17 @@
     }
     else
         itr->second.push_back(td);
-    pthread_mutex_unlock(&_mutex);
     return ok;
 }
 
 const txn_data_list
 txn_map::get_tdata_list(const std::string& xid)
 {
-    pthread_mutex_lock(&_mutex);
-    xmap_itr itr = _map.find(xid);
-    pthread_mutex_unlock(&_mutex);
+    xmap_itr itr;
+    {
+        slock s(&_mutex);
+        itr = _map.find(xid);
+    }
     if (itr == _map.end()) // not found in map
     {
         std::ostringstream oss;
@@ -98,11 +100,10 @@
 const txn_data_list
 txn_map::get_remove_tdata_list(const std::string& xid)
 {
-    pthread_mutex_lock(&_mutex);
+    slock s(&_mutex);
     xmap_itr itr = _map.find(xid);
     if (itr == _map.end()) // not found in map
     {
-        pthread_mutex_unlock(&_mutex);
         std::ostringstream oss;
         oss << std::hex << "xid=" << xid_format(xid);
         throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "txn_map",
@@ -110,16 +111,17 @@
     }
     txn_data_list list = itr->second;
     _map.erase(itr);
-    pthread_mutex_unlock(&_mutex);
     return list;
 }
 
 const bool
 txn_map::in_map(const std::string& xid)
 {
-    pthread_mutex_lock(&_mutex);
-    xmap_itr itr = _map.find(xid);
-    pthread_mutex_unlock(&_mutex);
+    xmap_itr itr;
+    {
+        slock s(&_mutex);
+        itr = _map.find(xid);
+    }
     if (itr == _map.end()) // not found in map
         return false;
     return true;
@@ -128,9 +130,11 @@
 const u_int32_t
 txn_map::get_rid_count(const std::string& xid)
 {
-    pthread_mutex_lock(&_mutex);
-    xmap_itr itr = _map.find(xid);
-    pthread_mutex_unlock(&_mutex);
+    xmap_itr itr;
+    {
+        slock s(&_mutex);
+        itr = _map.find(xid);
+    }
     if (itr == _map.end()) // not found in map
     {
         std::ostringstream oss;
@@ -143,11 +147,10 @@
 const bool
 txn_map::is_txn_synced(const std::string& xid)
 {
-    pthread_mutex_lock(&_mutex);
+    slock s(&_mutex);
     xmap_itr itr = _map.find(xid);
     if (itr == _map.end()) // not found in map
     {
-        pthread_mutex_unlock(&_mutex);
         std::ostringstream oss;
         oss << std::hex << "xid=" << xid_format(xid);
         throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "txn_map", "is_txn_synced");
@@ -161,7 +164,6 @@
             break;
         }
     }
-    pthread_mutex_unlock(&_mutex);
     return is_synced;
 }
 
@@ -170,23 +172,24 @@
 {
     bool ok = true;
     bool found = false;
-    pthread_mutex_lock(&_mutex);
-    xmap_itr itr = _map.find(xid);
-    if (itr == _map.end()) // not found in map
-        ok = false;
-    else
     {
-        for (tdl_itr litr = itr->second.begin(); litr < itr->second.end(); litr++)
+        slock s(&_mutex);
+        xmap_itr itr = _map.find(xid);
+        if (itr == _map.end()) // not found in map
+            ok = false;
+        else
         {
-            if (litr->_rid == rid)
+            for (tdl_itr litr = itr->second.begin(); litr < itr->second.end(); litr++)
             {
-                found = true;
-                litr->_aio_compl = true;
-                break;
+                if (litr->_rid == rid)
+                {
+                    found = true;
+                    litr->_aio_compl = true;
+                    break;
+                }
             }
         }
     }
-    pthread_mutex_unlock(&_mutex);
     if (ok && !found)
     {
         std::ostringstream oss;
@@ -200,10 +203,11 @@
 txn_map::xid_list(std::vector<std::string>& xv)
 {
     xv.clear();
-    pthread_mutex_lock(&_mutex);
-    for (xmap_itr itr = _map.begin(); itr != _map.end(); itr++)
-        xv.push_back(itr->first);
-    pthread_mutex_unlock(&_mutex);
+    {
+        slock s(&_mutex);
+        for (xmap_itr itr = _map.begin(); itr != _map.end(); itr++)
+            xv.push_back(itr->first);
+    }
 }
 
 // static fn

Modified: store/trunk/cpp/lib/jrnl/wmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wmgr.cpp	2008-02-20 14:55:14 UTC (rev 1710)
+++ store/trunk/cpp/lib/jrnl/wmgr.cpp	2008-02-20 15:58:08 UTC (rev 1711)
@@ -37,6 +37,8 @@
 #include <jrnl/jcntl.hpp>
 #include <jrnl/jerrno.hpp>
 
+#include <iostream> // debug
+
 namespace rhm
 {
 namespace journal

Modified: store/trunk/cpp/tests/jrnl/jtt/jfile_chk.py
===================================================================
--- store/trunk/cpp/tests/jrnl/jtt/jfile_chk.py	2008-02-20 14:55:14 UTC (rev 1710)
+++ store/trunk/cpp/tests/jrnl/jtt/jfile_chk.py	2008-02-20 15:58:08 UTC (rev 1711)
@@ -210,7 +210,7 @@
         f.read(rem_in_blk(f, dblk_size))
 
     def check(self):
-        if self.empty() or self.magic[-1] not in ['a', 'c', 'd', 'e', 'f', 'x']:
+        if self.empty() or self.magic[:3] != 'RHM' or self.magic[-1] not in ['a', 'c', 'd', 'e', 'f', 'x']:
             return True
         if self.ver != hdr_ver and self.magic[-1] != 'x':
             raise Exception('%s: Invalid header version: found %d, expected %d.' % (self, self.ver, hdr_ver))

Modified: store/trunk/cpp/tests/jrnl/jtt/test_mgr.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/jtt/test_mgr.cpp	2008-02-20 14:55:14 UTC (rev 1710)
+++ store/trunk/cpp/tests/jrnl/jtt/test_mgr.cpp	2008-02-20 15:58:08 UTC (rev 1711)
@@ -98,8 +98,13 @@
                     oss << path << "jfile_chk.py -q";
                     oss << " -d " << jpp->jdir();
                     oss << " -b " << jpp->base_filename();
-                    oss << " -c" << _args.test_case_csv_file_name;
-                    oss << " -t" << (*tci)->test_case_num();
+                    // TODO: When jfile_check.py can handle previously recovered journals for
+                    // specific tests, then remove this exclusion.
+                    if (!_args.recover_mode)
+                    {
+                        oss << " -c" << _args.test_case_csv_file_name;
+                        oss << " -t" << (*tci)->test_case_num();
+                    }
                     bool res = system(oss.str().c_str()) == 0;
                         (*tci)->set_fmt_chk_res(res, jpp->jid());
                 }

Modified: store/trunk/cpp/tests/jrnl/run-journal-tests
===================================================================
--- store/trunk/cpp/tests/jrnl/run-journal-tests	2008-02-20 14:55:14 UTC (rev 1710)
+++ store/trunk/cpp/tests/jrnl/run-journal-tests	2008-02-20 15:58:08 UTC (rev 1711)
@@ -6,8 +6,38 @@
 
 # Run jtt using default test set
 echo
+echo "***** Mode 1: New journal instance, no recover, single journal *****"
+rm -rf /tmp/test_0*
 $pwd/jtt/jtt --csv $pwd/jtt/jtt.csv --format-chk || fail = 1
 echo
+#echo "***** Mode 2: New journal instance, no recover, multiple journals *****"
+#rm -rf /tmp/test_0*
+#$pwd/jtt/jtt --csv $pwd/jtt/jtt.csv --format-chk --num-jrnls 5 || fail = 1
+#echo
+#echo "***** Mode 3: Reuse journal instance, no recover, single journal *****"
+#rm -rf /tmp/test_0*
+#$pwd/jtt/jtt --csv $pwd/jtt/jtt.csv --reuse-instance --format-chk || fail = 1
+#echo
+#echo "***** Mode 4: Reuse journal instance, no recover, multiple journals *****"
+#rm -rf /tmp/test_0*
+#$pwd/jtt/jtt --csv $pwd/jtt/jtt.csv --reuse-instance --format-chk --num-jrnls 5 || fail = 1
+#echo
+#echo "***** Mode 5: New journal instance, recover previous test journal, single journal *****"
+#rm -rf /tmp/test_0*
+#$pwd/jtt/jtt --csv $pwd/jtt/jtt.csv --recover-mode --format-chk || fail = 1
+#echo
+#echo "***** Mode 6: New journal instance, recover previous test journal, multiple journals *****"
+#rm -rf /tmp/test_0*
+#$pwd/jtt/jtt --csv $pwd/jtt/jtt.csv --recover-mode --format-chk --num-jrnls 5 || fail = 1
+#echo
+#echo "***** Mode 7: Reuse journal instance, recover previous test journal, single journal *****"
+#rm -rf /tmp/test_0*
+#$pwd/jtt/jtt --csv $pwd/jtt/jtt.csv --reuse-instance --recover-mode --format-chk || fail = 1
+#echo
+#echo "***** Mode 8: Reuse journal instance, recover previous test journal, multiple journals *****"
+#rm -rf /tmp/test_0*
+#$pwd/jtt/jtt --csv $pwd/jtt/jtt.csv --reuse-instance --recover-mode --format-chk --num-jrnls 5 || fail = 1
+#echo
 
 # Run cpp-unit tests
 LD_PRELOAD=$pwd/.libs/libdlclose_noop.so DllPlugInTester -c -b $pwd/.libs/Journal*Tests.so || fail=1




More information about the rhmessaging-commits mailing list