[rhmessaging-commits] rhmessaging commits: r979 - in store/trunk/cpp: lib/jrnl and 1 other directories.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Tue Oct 9 12:01:15 EDT 2007


Author: kpvdr
Date: 2007-10-09 12:01:15 -0400 (Tue, 09 Oct 2007)
New Revision: 979

Modified:
   store/trunk/cpp/lib/Makefile.am
   store/trunk/cpp/lib/jrnl/enq_map.cpp
   store/trunk/cpp/lib/jrnl/enq_map.hpp
   store/trunk/cpp/lib/jrnl/jerrno.cpp
   store/trunk/cpp/lib/jrnl/jerrno.hpp
   store/trunk/cpp/lib/jrnl/rmgr.cpp
   store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp
   store/trunk/cpp/tests/jrnl/janalyze.py
Log:
Bugfix in rmgr::consume_xid_rec(); Added boolean transaction lock to enq-map.

Modified: store/trunk/cpp/lib/Makefile.am
===================================================================
--- store/trunk/cpp/lib/Makefile.am	2007-10-09 15:55:12 UTC (rev 978)
+++ store/trunk/cpp/lib/Makefile.am	2007-10-09 16:01:15 UTC (rev 979)
@@ -1,4 +1,4 @@
-AM_CXXFLAGS = $(WARNING_CFLAGS) $(APR_CXXFLAGS) $(QPID_CXXFLAGS) -pthread
+AM_CXXFLAGS = $(WARNING_CFLAGS) $(APR_CXXFLAGS) $(QPID_CXXFLAGS) -DRHM_CLEAN -pthread
  
 lib_LTLIBRARIES = libbdbstore.la
 

Modified: store/trunk/cpp/lib/jrnl/enq_map.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/enq_map.cpp	2007-10-09 15:55:12 UTC (rev 978)
+++ store/trunk/cpp/lib/jrnl/enq_map.cpp	2007-10-09 16:01:15 UTC (rev 979)
@@ -55,9 +55,16 @@
 void
 enq_map::insert_fid(const u_int64_t rid, const u_int16_t fid) throw (jexception)
 {
+    insert_fid(rid, fid, false);
+}
+
+void
+enq_map::insert_fid(const u_int64_t rid, const u_int16_t fid, bool locked) throw (jexception)
+{
+    std::pair<u_int16_t, bool> rec(fid, locked);
     pthread_mutex_lock(&_mutex);
-    std::pair<std::map<u_int64_t, u_int16_t>::iterator, bool> ret =
-            _map.insert(std::pair<u_int64_t, u_int16_t>(rid, fid));
+    std::pair<std::map<u_int64_t, std::pair<u_int16_t, bool> >::iterator, bool> ret =
+            _map.insert(std::pair<u_int64_t, std::pair<u_int16_t, bool> >(rid, rec));
     pthread_mutex_unlock(&_mutex);
     if (ret.second == false)
     {
@@ -72,43 +79,102 @@
 enq_map::get_fid(const u_int64_t rid) throw (jexception)
 {
     pthread_mutex_lock(&_mutex);
-    std::map<u_int64_t, u_int16_t>::iterator itr = _map.find(rid);
+    std::map<u_int64_t, std::pair<u_int16_t, bool> >::iterator itr = _map.find(rid);
     pthread_mutex_unlock(&_mutex);
-    if (itr == _map.end())
+    if (itr == _map.end()) // not found in map
     {
         std::stringstream ss;
-        ss << std::hex << std::setfill('0');
-        ss << "rid=0x" << std::setw(16) << rid;
+        ss << std::hex << "rid=0x" << rid;
         throw jexception(jerrno::JERR_EMAP_NOTFOUND, ss.str(), "enq_map", "get_fid");
     }
-    return itr->second;
+    if (itr->second.second) // locked
+    {
+        std::stringstream ss;
+        ss << std::hex << "rid=0x" << rid;
+        throw jexception(jerrno::JERR_EMAP_LOCKED, ss.str(), "enq_map", "get_fid");
+    }
+    return itr->second.first;
 }
 
 const u_int16_t
 enq_map::get_remove_fid(const u_int64_t rid) throw (jexception)
 {
     pthread_mutex_lock(&_mutex);
-    std::map<u_int64_t, u_int16_t>::iterator itr = _map.find(rid);
-    if (itr == _map.end())
+    std::map<u_int64_t, std::pair<u_int16_t, bool> >::iterator itr = _map.find(rid);
+    if (itr == _map.end()) // not found in map
     {
         pthread_mutex_unlock(&_mutex);
         std::stringstream ss;
-        ss << std::hex << std::setfill('0');
-        ss << "rid=0x" << std::setw(16) << rid;
+        ss << std::hex << "rid=0x" << rid;
         throw jexception(jerrno::JERR_EMAP_NOTFOUND, ss.str(), "enq_map", "get_remove_fid");
     }
-    u_int16_t fid = itr->second;
+    if (itr->second.second) // locked
+    {
+        pthread_mutex_unlock(&_mutex);
+        std::stringstream ss;
+        ss << std::hex << "rid=0x" << rid;
+        throw jexception(jerrno::JERR_EMAP_LOCKED, ss.str(), "enq_map", "get_remove_fid");
+    }
+    u_int16_t fid = itr->second.first;
     _map.erase(itr);
     pthread_mutex_unlock(&_mutex);
     return fid;
 }
 
 void
+enq_map::lock(const u_int64_t rid) // sets the transaction-locked flag on rid's map entry
+{
+    pthread_mutex_lock(&_mutex);
+    std::map<u_int64_t, std::pair<u_int16_t, bool> >::iterator itr = _map.find(rid);
+    if (itr == _map.end()) // not found in map
+    {
+        pthread_mutex_unlock(&_mutex); // release before throwing so the mutex is not left held
+        std::stringstream ss;
+        ss << std::hex << "rid=0x" << rid;
+        throw jexception(jerrno::JERR_EMAP_NOTFOUND, ss.str(), "enq_map", "lock");
+    }
+    itr->second.second = true; // pair is (fid, locked)
+    pthread_mutex_unlock(&_mutex);
+}
+
+void
+enq_map::unlock(const u_int64_t rid) // clears the transaction-locked flag on rid's map entry
+{
+    pthread_mutex_lock(&_mutex);
+    std::map<u_int64_t, std::pair<u_int16_t, bool> >::iterator itr = _map.find(rid);
+    if (itr == _map.end()) // not found in map
+    {
+        pthread_mutex_unlock(&_mutex); // release before throwing so the mutex is not left held
+        std::stringstream ss;
+        ss << std::hex << "rid=0x" << rid;
+        throw jexception(jerrno::JERR_EMAP_NOTFOUND, ss.str(), "enq_map", "unlock");
+    }
+    itr->second.second = false; // pair is (fid, locked)
+    pthread_mutex_unlock(&_mutex);
+}
+
+const bool
+enq_map::is_locked(const u_int64_t rid) // returns the transaction-locked flag stored for rid
+{
+    pthread_mutex_lock(&_mutex);
+    std::map<u_int64_t, std::pair<u_int16_t, bool> >::iterator itr = _map.find(rid);
+    const bool found = itr != _map.end(); // read map state only while the mutex is held
+    const bool locked = found && itr->second.second; // pair is (fid, locked)
+    pthread_mutex_unlock(&_mutex);
+    if (found)
+        return locked;
+    std::stringstream ss; // not found in map
+    ss << std::hex << "rid=0x" << rid;
+    throw jexception(jerrno::JERR_EMAP_NOTFOUND, ss.str(), "enq_map", "is_locked");
+}
+
+void
 enq_map::rid_list(std::vector<u_int64_t>& rv)
 {
     rv.clear();
     pthread_mutex_lock(&_mutex);
-    for (std::map<u_int64_t, u_int16_t>::iterator itr = _map.begin(); itr != _map.end(); itr++)
+    for (std::map<u_int64_t, std::pair<u_int16_t, bool> >::iterator itr = _map.begin();
+            itr != _map.end(); itr++)
         rv.push_back(itr->first);
     pthread_mutex_unlock(&_mutex);
 }
@@ -118,8 +184,9 @@
 {
     fv.clear();
     pthread_mutex_lock(&_mutex);
-    for (std::map<u_int64_t, u_int16_t>::iterator itr = _map.begin(); itr != _map.end(); itr++)
-        fv.push_back(itr->second);
+    for (std::map<u_int64_t, std::pair<u_int16_t, bool> >::iterator itr = _map.begin();
+            itr != _map.end(); itr++)
+        fv.push_back(itr->second.first);
     pthread_mutex_unlock(&_mutex);
 }
 

Modified: store/trunk/cpp/lib/jrnl/enq_map.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/enq_map.hpp	2007-10-09 15:55:12 UTC (rev 978)
+++ store/trunk/cpp/lib/jrnl/enq_map.hpp	2007-10-09 16:01:15 UTC (rev 979)
@@ -53,13 +53,13 @@
 
     /**
     * \class enq_map
-    * \brief Class for storing the file id (fid) for each enqueued data block using
-    *     the record id (rid) as a key.
+    * \brief Class for storing the file id (fid) and a transaction locked flag for each enqueued
+    *     data block using the record id (rid) as a key.
     */
     class enq_map
     {
     private:
-        std::map<u_int64_t, u_int16_t> _map;
+        std::map<u_int64_t, std::pair<u_int16_t, bool> > _map;
         pthread_mutex_t _mutex;
 
     public:
@@ -67,8 +67,12 @@
         ~enq_map();
 
         void insert_fid(const u_int64_t rid, const u_int16_t fid) throw (jexception);
+        void insert_fid(const u_int64_t rid, const u_int16_t fid, bool locked) throw (jexception);
         const u_int16_t get_fid(const u_int64_t rid) throw (jexception);
         const u_int16_t get_remove_fid(const u_int64_t rid) throw (jexception);
+        void lock(const u_int64_t rid);
+        void unlock(const u_int64_t rid);
+        const bool is_locked(const u_int64_t rid);
         inline void clear() { _map.clear(); }
         inline const bool empty() const { return _map.empty(); }
         inline const u_int16_t size() const { return (u_int16_t)_map.size(); }

Modified: store/trunk/cpp/lib/jrnl/jerrno.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jerrno.cpp	2007-10-09 15:55:12 UTC (rev 978)
+++ store/trunk/cpp/lib/jrnl/jerrno.cpp	2007-10-09 16:01:15 UTC (rev 979)
@@ -108,6 +108,7 @@
 // class enq_map
 const u_int32_t jerrno::JERR_EMAP_DUPLICATE     = 0x0b00;
 const u_int32_t jerrno::JERR_EMAP_NOTFOUND      = 0x0b01;
+const u_int32_t jerrno::JERR_EMAP_LOCKED        = 0x0b02;
 
 // class jinf
 const u_int32_t jerrno::JERR_JINF_CVALIDFAIL    = 0x0c00;
@@ -206,6 +207,8 @@
             "Attempted to insert enqueue record using duplicate key.");
     _err_map[JERR_EMAP_NOTFOUND] = std::string("JERR_EMAP_NOTFOUND: "
             "Key not found in enqueue map.");
+    _err_map[JERR_EMAP_LOCKED] = std::string("JERR_EMAP_LOCKED: "
+            "Record ID locked by a pending transaction.");
 
     // class jinf
     _err_map[JERR_JINF_CVALIDFAIL] = std::string("JERR_JINF_CVALIDFAIL: "

Modified: store/trunk/cpp/lib/jrnl/jerrno.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jerrno.hpp	2007-10-09 15:55:12 UTC (rev 978)
+++ store/trunk/cpp/lib/jrnl/jerrno.hpp	2007-10-09 16:01:15 UTC (rev 979)
@@ -125,6 +125,7 @@
         // class enq_map
         static const u_int32_t JERR_EMAP_DUPLICATE;     ///< Attempted to insert using duplicate key
         static const u_int32_t JERR_EMAP_NOTFOUND;      ///< Key not found in map
+        static const u_int32_t JERR_EMAP_LOCKED;        ///< rid locked by pending txn
 
         // class jinf
         static const u_int32_t JERR_JINF_CVALIDFAIL;    ///< Compatibility validation failure

Modified: store/trunk/cpp/lib/jrnl/rmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rmgr.cpp	2007-10-09 15:55:12 UTC (rev 978)
+++ store/trunk/cpp/lib/jrnl/rmgr.cpp	2007-10-09 16:01:15 UTC (rev 979)
@@ -248,6 +248,7 @@
     // Read header, determine next record type
     while (true)
     {
+//std::string s;
 //std::cout << " [f pi=" << _pg_index << " d=" << dblks_rem() << " f=" << (_rrfc.empty()?"T":"F") << " status:" << _rrfc.file_handle()->status_str(s) << "]" << std::flush;
         if(dblks_rem() == 0 && _rrfc.is_compl() && !_rrfc.is_wr_aio_outstanding())
         {
@@ -292,7 +293,7 @@
 #endif
                 if (is_enq) // ok, this record is enqueued, check it, then read it...
                 {
-//std::cout << " $" << std::flush;
+//std::cout << "e" << std::flush;
                     if (dtokp->rid())
                     {
                         if (_hdr._rid != dtokp->rid())
@@ -329,7 +330,7 @@
                     return res;
                 }
                 else // skip this record, it is already dequeued
-//{ std::cout << " %" << std::flush;
+//{ std::cout << "d" << std::flush;
                     consume_xid_rec(_hdr, rptr, dtokp);
 //}
                 break;
@@ -510,7 +511,7 @@
     {
         enq_hdr ehdr;
         ::memcpy(&ehdr, rptr, sizeof(enq_hdr));
-        dtokp->set_dsize(ehdr._xidsize + sizeof(enq_hdr) + sizeof(rec_tail));
+        dtokp->set_dsize(ehdr._xidsize + ehdr._dsize + sizeof(enq_hdr) + sizeof(rec_tail));
     }
     else if (h._magic == RHM_JDAT_DEQ_MAGIC)
     {
@@ -579,8 +580,6 @@
             if (_page_cb_arr[_pg_index]._state != AIO_COMPLETE)
             {
                 dtokp->set_rstate(data_tok::SKIP_PART);
-                // Use data_tok::dblocks_proc field to save how many skip bloks still to go...
-//                dtokp->set_dblocks_read(dsize_dblks - tot_dblk_cnt);
 //std::cout << " w]" << std::flush;
                 return RHM_IORES_AIO_WAIT;
             }

Modified: store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp	2007-10-09 15:55:12 UTC (rev 978)
+++ store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp	2007-10-09 16:01:15 UTC (rev 979)
@@ -51,7 +51,7 @@
     CPPUNIT_TEST(RecoverReadTest);
     CPPUNIT_TEST(RecoveredReadTest);
     CPPUNIT_TEST(RecoveredDequeueTest);
-//    CPPUNIT_TEST(ComplexRecoveryTest1);
+    CPPUNIT_TEST(ComplexRecoveryTest1);
     CPPUNIT_TEST(EncodeTest_000);
     CPPUNIT_TEST(EncodeTest_001);
     CPPUNIT_TEST(EncodeTest_002);

Modified: store/trunk/cpp/tests/jrnl/janalyze.py
===================================================================
--- store/trunk/cpp/tests/jrnl/janalyze.py	2007-10-09 15:55:12 UTC (rev 978)
+++ store/trunk/cpp/tests/jrnl/janalyze.py	2007-10-09 16:01:15 UTC (rev 979)
@@ -25,6 +25,7 @@
 from struct import unpack, calcsize
 from time import gmtime, strftime
 
+base_file_name = 'test'
 dblk_size = 128
 sblk_size = 4 * dblk_size
 file_size = (3072 + 1) * sblk_size
@@ -359,8 +360,6 @@
         return ''
 
     def __str__(self):
-#        if self.enq_tail == None:
-#            return '%s %s [no tail]' % (Hdr.__str__(self), self.dsize, dstr)
         return '%s %s%s %s %s' % (Hdr.__str__(self), print_xid(self.xidsize, self.xid), print_data(self.dsize, self.data), self.enq_tail, self.print_flags())
 
 
@@ -510,7 +509,7 @@
                 self.last_file = self.file_num == self.file_start - 1
         if self.file_num < 0 or self.file_num >= num_files:
             raise Exception('Bad file number %d' % self.file_num)
-        file_name = 'jdata/test.%04d.jdat' % self.file_num
+        file_name = 'jdata/' + base_file_name + '.%04d.jdat' % self.file_num
         self.f = open(file_name)
         self.fhdr = load(self.f, Hdr)
         if seek_flag and self.f.tell() != self.fro:
@@ -558,7 +557,7 @@
         tss = ''
         print 'Analyzing journal files:'
         for i in range(0, num_files):
-            file_name = 'jdata/test.%04d.jdat' % i
+            file_name = 'jdata/' + base_file_name + '.%04d.jdat' % i
             f = open(file_name)
             fhdr = load(f, Hdr)
             if fhdr.empty():




More information about the rhmessaging-commits mailing list