[rhmessaging-commits] rhmessaging commits: r1450 - in store/trunk/cpp/lib: jrnl and 1 other directory.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Mon Dec 10 14:34:57 EST 2007


Author: kpvdr
Date: 2007-12-10 14:34:57 -0500 (Mon, 10 Dec 2007)
New Revision: 1450

Added:
   store/trunk/cpp/lib/jrnl/slock.hpp
Modified:
   store/trunk/cpp/lib/BdbMessageStore.cpp
   store/trunk/cpp/lib/Makefile.am
   store/trunk/cpp/lib/jrnl/jcntl.cpp
   store/trunk/cpp/lib/jrnl/jcntl.hpp
   store/trunk/cpp/lib/jrnl/jexception.hpp
   store/trunk/cpp/lib/jrnl/rcvdat.hpp
   store/trunk/cpp/lib/jrnl/wmgr.cpp
   store/trunk/cpp/lib/jrnl/wrfc.cpp
Log:
Fixed error in owi initialization on recover (which gave a 50% chance of a recover exception). Also replaced mutexes with a simple scoped lock class for write mutex. Added catch for condition RHM_IORES_FULL in dequeue.

Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp	2007-12-10 17:05:06 UTC (rev 1449)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp	2007-12-10 19:34:57 UTC (rev 1450)
@@ -1026,8 +1026,8 @@
                     usleep(AIO_SLEEP_TIME); // TODO add sleep time to get events call as option
                     break;
                   case rhm::journal::RHM_IORES_FULL:
-                    std::cerr << "Error storing message -- Journal full on queue \"" << queue->getName() << "\"." << std::endl << std::flush;
-                    THROW_STORE_FULL_EXCEPTION("Error storing message -- Journal full :" + queue->getName());
+                    std::cerr << "Enqueue: Error storing record -- Journal full on queue \"" << queue->getName() << "\"." << std::endl << std::flush;
+                    THROW_STORE_FULL_EXCEPTION("Enqueue: Error storing record -- Journal full on queue \"" + queue->getName() + "\".");
                     break;
                   default:
                     assert( "Store Error: Unexpected msg state");
@@ -1167,6 +1167,10 @@
                 THROW_STORE_EXCEPTION("Timeout waiting for mutex: RHM_IORES_BUSY");
             usleep(AIO_SLEEP_TIME); // TODO add sleep time to get events call as option
             break;
+         case rhm::journal::RHM_IORES_FULL:
+            std::cerr << "Dequeue: Error storing record -- Journal full on queue \"" << queue.getName() << "\"." << std::endl << std::flush;
+            THROW_STORE_FULL_EXCEPTION("Dequeue: Error storing record -- Journal full on queue \"" + queue.getName() + "\".");
+            break;
           default:
             assert( "Store Error: Unexpected msg state");
         }

Modified: store/trunk/cpp/lib/Makefile.am
===================================================================
--- store/trunk/cpp/lib/Makefile.am	2007-12-10 17:05:06 UTC (rev 1449)
+++ store/trunk/cpp/lib/Makefile.am	2007-12-10 19:34:57 UTC (rev 1450)
@@ -76,6 +76,7 @@
   jrnl/rcvdat.hpp			\
   jrnl/rmgr.hpp			\
   jrnl/rrfc.hpp			\
+  jrnl/slock.hpp        \
   jrnl/txn_map.hpp			\
   jrnl/txn_rec.hpp			\
   jrnl/wmgr.hpp			\

Modified: store/trunk/cpp/lib/jrnl/jcntl.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.cpp	2007-12-10 17:05:06 UTC (rev 1449)
+++ store/trunk/cpp/lib/jrnl/jcntl.cpp	2007-12-10 19:34:57 UTC (rev 1450)
@@ -139,14 +139,14 @@
     _rcvdat.reset();
     _emap.clear();
     _tmap.clear();
-//std::cout << "Starting journal analysis..." << std::endl;
+
     rcvr_janalyze(_rcvdat, prep_txn_list);
     highest_rid = _rcvdat._h_rid;
     if (_rcvdat._full)
         throw jexception(jerrno::JERR_JCNTL_RECOVERJFULL, "jcntl", "recover_complete");
 
-// Debug info, but may be useful to print with a flag
-//_rcvdat.print(_jid);
+    // Debug info; may be useful to print with a flag
+    // _rcvdat.print(_jid);
 
     if (_datafh)
     {
@@ -177,7 +177,6 @@
     
     _readonly_flag = true;
     _init_flag = true;
-//std::cout << "Journal analysis complete." << std::endl;
 }
 
 void
@@ -191,7 +190,6 @@
     _rrfc.initialize(_num_jfiles, (nlfh**)_datafh, _rcvdat._ffid);
     _rmgr.recover_complete(_rcvdat._fro);
     _readonly_flag = false;
-//std::cout << "Journal revovery complete." << std::endl;
 }
 
 void 
@@ -206,32 +204,17 @@
 jcntl::enqueue_data_record(const void* const data_buff, const size_t tot_data_len,
         const size_t this_data_len, data_tok* dtokp, const bool transient)
 {
-    iores res;
     check_wstatus("enqueue_data_record");
-    pthread_mutex_lock(&_mutex);
-    try
-    {
-        res = _wmgr.enqueue(data_buff, tot_data_len, this_data_len, dtokp, NULL, 0, transient,
-                false);
-    }
-    catch(const std::exception& e) { pthread_mutex_unlock(&_mutex); throw; }
-    pthread_mutex_unlock(&_mutex);
-    return res;
+    slock s(&_mutex);
+    return _wmgr.enqueue(data_buff, tot_data_len, this_data_len, dtokp, NULL, 0, transient, false);
 }
 
 const iores
 jcntl::enqueue_extern_data_record(const size_t tot_data_len, data_tok* dtokp, const bool transient)
 {
-    iores res;
     check_wstatus("enqueue_extern_data_record");
-    pthread_mutex_lock(&_mutex);
-    try
-    {
-        res = _wmgr.enqueue(NULL, tot_data_len, 0, dtokp, NULL, 0, transient, true);
-    }
-    catch(const std::exception& e) { pthread_mutex_unlock(&_mutex); throw; }
-    pthread_mutex_unlock(&_mutex);
-    return res;
+    slock s(&_mutex);
+    return _wmgr.enqueue(NULL, tot_data_len, 0, dtokp, NULL, 0, transient, true);
 }
 
 const iores
@@ -239,33 +222,19 @@
         const size_t this_data_len, data_tok* dtokp, const std::string& xid,
         const bool transient)
 {
-    iores res;
     check_wstatus("enqueue_tx_data_record");
-    pthread_mutex_lock(&_mutex);
-    try
-    {
-        res = _wmgr.enqueue(data_buff, tot_data_len, this_data_len, dtokp, xid.data(), xid.size(),
-                transient, false);
-    }
-    catch(const std::exception& e) { pthread_mutex_unlock(&_mutex); throw; }
-    pthread_mutex_unlock(&_mutex);
-    return res;
+    slock s(&_mutex);
+    return _wmgr.enqueue(data_buff, tot_data_len, this_data_len, dtokp, xid.data(), xid.size(),
+            transient, false);
 }
 
 const iores
 jcntl::enqueue_extern_txn_data_record(const size_t tot_data_len, data_tok* dtokp,
         const std::string& xid, const bool transient)
 {
-    iores res;
     check_wstatus("enqueue_extern_txn_data_record");
-    pthread_mutex_lock(&_mutex);
-    try
-    {
-        res = _wmgr.enqueue(NULL, tot_data_len, 0, dtokp, xid.data(), xid.size(), transient, true);
-    }
-    catch(const std::exception& e) { pthread_mutex_unlock(&_mutex); throw; }
-    pthread_mutex_unlock(&_mutex);
-    return res;
+    slock s(&_mutex);
+    return _wmgr.enqueue(NULL, tot_data_len, 0, dtokp, xid.data(), xid.size(), transient, true);
 }
 
 const iores
@@ -294,49 +263,33 @@
 const iores
 jcntl::dequeue_data_record(data_tok* const dtokp)
 {
-    iores res;
     check_wstatus("dequeue_data");
-    pthread_mutex_lock(&_mutex);
-    try { res = _wmgr.dequeue(dtokp, NULL, 0); }
-    catch(const std::exception& e) { pthread_mutex_unlock(&_mutex); throw; }
-    pthread_mutex_unlock(&_mutex);
-    return res;
+    slock s(&_mutex);
+    return _wmgr.dequeue(dtokp, NULL, 0);
 }
 
 const iores
 jcntl::dequeue_txn_data_record(data_tok* const dtokp, const std::string& xid)
 {
-    iores res;
     check_wstatus("dequeue_data");
-    pthread_mutex_lock(&_mutex);
-    try { res = _wmgr.dequeue(dtokp, xid.data(), xid.size()); }
-    catch(const std::exception& e) { pthread_mutex_unlock(&_mutex); throw; }
-    pthread_mutex_unlock(&_mutex);
-    return res;
+    slock s(&_mutex);
+    return _wmgr.dequeue(dtokp, xid.data(), xid.size());
 }
 
 const iores
 jcntl::txn_abort(data_tok* const dtokp, const std::string& xid)
 {
-    iores res;
     check_wstatus("txn_abort");
-    pthread_mutex_lock(&_mutex);
-    try { res = _wmgr.abort(dtokp, xid.data(), xid.size()); }
-    catch(const std::exception& e) { pthread_mutex_unlock(&_mutex); throw; }
-    pthread_mutex_unlock(&_mutex);
-    return res;
+    slock s(&_mutex);
+    return _wmgr.abort(dtokp, xid.data(), xid.size());
 }
 
 const iores
 jcntl::txn_commit(data_tok* const dtokp, const std::string& xid)
 {
-    iores res;
     check_wstatus("txn_commit");
-    pthread_mutex_lock(&_mutex);
-    try { res = _wmgr.commit(dtokp, xid.data(), xid.size()); }
-    catch(const std::exception& e) { pthread_mutex_unlock(&_mutex); throw; }
-    pthread_mutex_unlock(&_mutex);
-    return res;
+    slock s(&_mutex);
+    return _wmgr.commit(dtokp, xid.data(), xid.size());
 }
 
 const bool
@@ -348,22 +301,10 @@
 const u_int32_t
 jcntl::get_wr_events()
 {
-    u_int32_t res;
-    int ret = pthread_mutex_trylock(&_mutex);
-    if (ret)
-    {
-        if (ret != EBUSY)
-        {
-            std::stringstream ss;
-            ss << "pthread_mutex_trylock() returned " << errno << " (" << strerror(errno) << ")";
-            throw jexception(jerrno::JERR__PTHREAD, ss.str().c_str(), "jcntl", "get_wr_events");
-        }
-        return 0; // already locked, return immediately
-    }
-    try { res = _wmgr.get_events(pmgr::UNUSED); }
-    catch(const std::exception& e) { pthread_mutex_unlock(&_mutex); throw; }
-    pthread_mutex_unlock(&_mutex);
-    return res;
+    stlock t(&_mutex);
+    if (t.locked())
+        return _wmgr.get_events(pmgr::UNUSED);
+    return 0;
 }
 
 const u_int32_t
@@ -395,10 +336,8 @@
         return;
     if (_readonly_flag)
         throw jexception(jerrno::JERR_JCNTL_READONLY, "jcntl", "flush");
-    pthread_mutex_lock(&_mutex);
-    try { _wmgr.flush(); }
-    catch(const std::exception& e) { pthread_mutex_unlock(&_mutex); throw; }
-    pthread_mutex_unlock(&_mutex);
+    slock s(&_mutex);
+    _wmgr.flush();
 }
 
 // Private functions
@@ -491,7 +430,8 @@
     {
         u_int16_t fid = rd._ffid;
         std::ifstream ifs;
-        while (rcvr_get_next_record(fid, &ifs, rd));
+        bool lowi = rd._owi; // local copy of owi to be used during analysis
+        while (rcvr_get_next_record(fid, &ifs, lowi, rd));
         
         // Check for journal full condition
         u_int16_t next_wr_fid = (rd._lfid + 1) % _num_jfiles;
@@ -512,12 +452,12 @@
 }
 
 const bool
-jcntl::rcvr_get_next_record(u_int16_t& fid, std::ifstream* ifsp, rcvdat& rd)
+jcntl::rcvr_get_next_record(u_int16_t& fid, std::ifstream* ifsp, bool& lowi, rcvdat& rd)
 {
     size_t cum_size_read = 0;
     void* xidp = NULL;
     hdr h;
-    if (!jfile_cycle(fid, ifsp, rd, true))
+    if (!jfile_cycle(fid, ifsp, lowi, rd, true))
         return false;
     std::streampos read_pos = ifsp->tellg();
     ifsp->read((char*)&h, sizeof(hdr));
@@ -526,7 +466,7 @@
         case RHM_JDAT_ENQ_MAGIC:
             {
                 enq_rec er;
-                if (!decode(er, fid, ifsp, cum_size_read, h, rd, read_pos))
+                if (!decode(er, fid, ifsp, cum_size_read, h, lowi, rd, read_pos))
                     return false;
                 if (!er.is_transient()) // Ignore transient msgs
                 {
@@ -547,7 +487,7 @@
         case RHM_JDAT_DEQ_MAGIC:
             {
                 deq_rec dr;
-                if (!decode(dr, fid, ifsp, cum_size_read, h, rd, read_pos))
+                if (!decode(dr, fid, ifsp, cum_size_read, h, lowi, rd, read_pos))
                     return false;
                 if (dr.xid_size())
                 {
@@ -580,7 +520,7 @@
         case RHM_JDAT_TXA_MAGIC:
             {
                 txn_rec ar;
-                if (!decode(ar, fid, ifsp, cum_size_read, h, rd, read_pos))
+                if (!decode(ar, fid, ifsp, cum_size_read, h, lowi, rd, read_pos))
                     return false;
                 // Delete this txn from tmap, unlock any locked records in emap
                 ar.get_xid(&xidp);
@@ -607,7 +547,7 @@
         case RHM_JDAT_TXC_MAGIC:
             {
                 txn_rec cr;
-                if (!decode(cr, fid, ifsp, cum_size_read, h, rd, read_pos))
+                if (!decode(cr, fid, ifsp, cum_size_read, h, lowi, rd, read_pos))
                     return false;
                 // Delete this txn from tmap, process records into emap
                 cr.get_xid(&xidp);
@@ -656,9 +596,9 @@
 
 const bool
 jcntl::decode(jrec& rec, u_int16_t& fid, std::ifstream* ifsp, size_t& cum_size_read, hdr& h,
-        rcvdat& rd, std::streampos& rec_offset)
+        bool& lowi, rcvdat& rd, std::streampos& rec_offset)
 {
-    if (!check_owi(fid, h, rd, rec_offset))
+    if (!check_owi(fid, h, lowi, rd, rec_offset))
         return false;
     bool done = false;
     while (!done)
@@ -673,14 +613,14 @@
             rd._eo = rec_offset;
             return false;
         }
-        if (!jfile_cycle(fid, ifsp, rd, false))
+        if (!jfile_cycle(fid, ifsp, lowi, rd, false))
             return false;
     }
     return true;
 }
 
 const bool
-jcntl::jfile_cycle(u_int16_t& fid, std::ifstream* ifsp, rcvdat& rd, const bool jump_fro)
+jcntl::jfile_cycle(u_int16_t& fid, std::ifstream* ifsp, bool& lowi, rcvdat& rd, const bool jump_fro)
 {
     if (ifsp->is_open())
     {
@@ -692,7 +632,7 @@
             if (fid >= _num_jfiles)
             {
                 fid = 0;
-                rd._owi = !rd._owi; // Flip owi
+                lowi = !lowi; // Flip local owi
             }
             if (fid == rd._ffid) // used up all journal files
                 return false;
@@ -728,9 +668,9 @@
 }
         
 const bool
-jcntl::check_owi(const u_int16_t fid, hdr& h, rcvdat& rd, std::streampos& read_pos)
+jcntl::check_owi(const u_int16_t fid, hdr& h, bool& lowi, rcvdat& rd, std::streampos& read_pos)
 {
-    if (rd._ffid ? h.get_owi() == rd._owi : h.get_owi() != rd._owi) // Overwrite indicator changed
+    if (rd._ffid ? h.get_owi() == lowi : h.get_owi() != lowi) // Overwrite indicator changed
     {
         u_int16_t expected_fid = rd._ffid ? rd._ffid - 1 : _num_jfiles - 1;
         if (fid == expected_fid)

Modified: store/trunk/cpp/lib/jrnl/jcntl.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.hpp	2007-12-10 17:05:06 UTC (rev 1449)
+++ store/trunk/cpp/lib/jrnl/jcntl.hpp	2007-12-10 19:34:57 UTC (rev 1450)
@@ -46,9 +46,9 @@
 #include <jrnl/lfh.hpp>
 #include <jrnl/rcvdat.hpp>
 #include <jrnl/rmgr.hpp>
+#include <jrnl/slock.hpp>
 #include <jrnl/wmgr.hpp>
 #include <jrnl/wrfc.hpp>
-#include <pthread.h>
 #include <qpid/broker/PersistableQueue.h>
 
 namespace rhm
@@ -548,12 +548,16 @@
         void flush();
 
         inline const u_int32_t get_enq_cnt() const { return _emap.size(); }
+
         inline const u_int32_t get_wr_outstanding_aio_dblks() const
                 { return _wrfc.aio_outstanding_dblks(); }
+
         inline const u_int32_t get_wr_outstanding_aio_dblks(u_int16_t pi) const
                 { return _wrfc.file_handle(pi)->wr_aio_outstanding_dblks(); }
+
         inline const u_int32_t get_rd_outstanding_aio_dblks() const
                 { return _rrfc.aio_outstanding_dblks(); }
+
         inline const u_int32_t get_rd_outstanding_aio_dblks(u_int16_t pi) const
                 { return _rrfc.file_handle(pi)->rd_aio_outstanding_dblks(); }
 
@@ -639,15 +643,17 @@
         */
         void rcvr_janalyze(rcvdat& rd, const std::vector<std::string>& prep_txn_list);
 
-        const bool rcvr_get_next_record(u_int16_t& fid, std::ifstream* ifsp, rcvdat& rd);
+        const bool rcvr_get_next_record(u_int16_t& fid, std::ifstream* ifsp, bool& lowi,
+                rcvdat& rd);
 
         const bool decode(jrec& rec, u_int16_t& fid, std::ifstream* ifsp, size_t& cum_size_read,
-                hdr& h, rcvdat& rd, std::streampos& rec_offset);
+                hdr& h, bool& lowi, rcvdat& rd, std::streampos& rec_offset);
 
-        const bool jfile_cycle(u_int16_t& fid, std::ifstream* ifsp, rcvdat& rd,
+        const bool jfile_cycle(u_int16_t& fid, std::ifstream* ifsp, bool& lowi, rcvdat& rd,
                 const bool jump_fro);
         
-        const bool check_owi(const u_int16_t fid, hdr& h, rcvdat& rd, std::streampos& read_pos);
+        const bool check_owi(const u_int16_t fid, hdr& h, bool& lowi, rcvdat& rd,
+                std::streampos& read_pos);
         
         void check_journal_alignment(const u_int16_t fid, std::streampos& rec_offset);
 

Modified: store/trunk/cpp/lib/jrnl/jexception.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jexception.hpp	2007-12-10 17:05:06 UTC (rev 1449)
+++ store/trunk/cpp/lib/jrnl/jexception.hpp	2007-12-10 19:34:57 UTC (rev 1450)
@@ -49,13 +49,20 @@
 
 // Macro for formatting commom system errors
 #define FORMAT_SYSERR(errno) " errno=" << errno << " (" << strerror(errno) << ")"
-#define MALLOC_CHK(ptr, var, cls, fn) if (ptr == NULL) { \
+
+#define MALLOC_CHK(ptr, var, cls, fn) if(ptr == NULL) { \
     clean(); \
     std::stringstream ss; \
     ss << var << ": malloc() failed: " << FORMAT_SYSERR(errno); \
     throw jexception(jerrno::JERR__MALLOC, ss.str().c_str(), cls, fn); \
     }
 
+#define PTHREAD_CHK(err, pfn, cls, fn) if(err != 0) { \
+    std::stringstream ss; \
+    ss << pfn << " failed: " << FORMAT_SYSERR(err); \
+    throw jexception(jerrno::JERR__PTHREAD, ss.str().c_str(), cls, fn); \
+    }
+    
 #define MAX_MSG_SIZE 1024
 #define MAX_THROWING_SIZE 128
 

Modified: store/trunk/cpp/lib/jrnl/rcvdat.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rcvdat.hpp	2007-12-10 17:05:06 UTC (rev 1449)
+++ store/trunk/cpp/lib/jrnl/rcvdat.hpp	2007-12-10 19:34:57 UTC (rev 1450)
@@ -83,6 +83,8 @@
             void print(std::string& jid)
             {
                 std::cout << "Jorunal file analysis (jid=\"" << jid << "\"):" << std::endl;
+                std::cout << "  Overwrite indicator (_owi) = " << (_owi ? "TRUE" : "FALSE") <<
+                        std::endl;
                 std::cout << "  Journal empty (_empty) = " << (_empty ? "TRUE" : "FALSE") <<
                         std::endl;
                 std::cout << "  First fid (_ffid) = " << _ffid << std::endl;

Added: store/trunk/cpp/lib/jrnl/slock.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/slock.hpp	                        (rev 0)
+++ store/trunk/cpp/lib/jrnl/slock.hpp	2007-12-10 19:34:57 UTC (rev 1450)
@@ -0,0 +1,86 @@
+/**
+* \file slock.hpp
+*
+* Red Hat Messaging - Message Journal
+*
+* Messaging journal top-level control and interface class
+* rhm::journal::slock. See class documentation for details.
+*
+* \author Kim van der Riet
+*
+* Copyright 2007 Red Hat, Inc.
+*
+* This file is part of Red Hat Messaging.
+*
+* Red Hat Messaging is free software; you can redistribute it and/or
+* modify it under the terms of the GNU Lesser General Public
+* License as published by the Free Software Foundation; either
+* version 2.1 of the License, or (at your option) any later version.
+*
+* This library is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this library; if not, write to the Free Software
+* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301
+* USA
+*
+* The GNU Lesser General Public License is available in the file COPYING.
+*/
+
+#ifndef rhm_journal_slock_hpp
+#define rhm_journal_slock_hpp
+
+#include <pthread.h>
+#include <jrnl/jerrno.hpp>
+#include <jrnl/jexception.hpp>
+
+namespace rhm
+{
+namespace journal
+{
+
+    // Ultra-simple scoped lock class, auto-releases mutex when it goes out-of-scope
+    class slock
+    {
+    protected:
+        pthread_mutex_t* _m;
+    public:
+        inline slock(pthread_mutex_t* m) : _m(m)
+        {
+            PTHREAD_CHK(pthread_mutex_lock(_m), "pthread_mutex_lock", "slock", "slock");
+        }
+        inline ~slock()
+        {
+            PTHREAD_CHK(pthread_mutex_unlock(_m), "pthread_mutex_unlock", "slock", "~slock");
+        }
+    };
+
+    // Ultra-simple scoped try-lock class, auto-releases mutex when it goes out-of-scope
+    class stlock
+    {
+    protected:
+        pthread_mutex_t* _m;
+        bool _locked;
+    public:
+        inline stlock(pthread_mutex_t* m) : _m(m), _locked(false)
+        {
+            int ret = pthread_mutex_trylock(_m);
+            _locked = (ret == 0); // check if lock obtained
+            if (!_locked && ret != EBUSY) PTHREAD_CHK(ret, "pthread_mutex_trylock", "stlock",
+                    "stlock");
+        }
+        inline ~stlock()
+        {
+            if (_locked)
+                PTHREAD_CHK(pthread_mutex_unlock(_m), "pthread_mutex_unlock", "stlock", "~stlock");
+        }
+        inline const bool locked() const { return _locked; }
+    };
+
+} // namespace journal
+} // namespace rhm
+
+#endif // ifndef rhm_journal_slock_hpp

Modified: store/trunk/cpp/lib/jrnl/wmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wmgr.cpp	2007-12-10 17:05:06 UTC (rev 1449)
+++ store/trunk/cpp/lib/jrnl/wmgr.cpp	2007-12-10 19:34:57 UTC (rev 1450)
@@ -118,13 +118,13 @@
     if (this_data_len != tot_data_len && !external)
         return RHM_IORES_NOTIMPL;
 
-    if (_deq_busy || _abort_busy || _commit_busy)
-        return RHM_IORES_BUSY;
-
     iores res = pre_write_check(WMGR_ENQUEUE, dtokp);
     if (res != RHM_IORES_SUCCESS)
         return res;
 
+    if (_deq_busy || _abort_busy || _commit_busy)
+        return RHM_IORES_BUSY;
+
     bool cont = false;
     if (_enq_busy) // If enqueue() exited last time with RHM_IORES_FULL or RHM_IORES_AIO_WAIT
     {
@@ -254,13 +254,13 @@
     if (xid_len)
         assert(xid_ptr != NULL);
 
-    if (_enq_busy || _abort_busy || _commit_busy)
-        return RHM_IORES_BUSY;
-
     iores res = pre_write_check(WMGR_DEQUEUE, dtokp);
     if (res != RHM_IORES_SUCCESS)
         return res;
 
+    if (_enq_busy || _abort_busy || _commit_busy)
+        return RHM_IORES_BUSY;
+
     bool cont = false;
     if (_deq_busy) // If dequeue() exited last time with RHM_IORES_FULL or RHM_IORES_AIO_WAIT
     {

Modified: store/trunk/cpp/lib/jrnl/wrfc.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wrfc.cpp	2007-12-10 17:05:06 UTC (rev 1449)
+++ store/trunk/cpp/lib/jrnl/wrfc.cpp	2007-12-10 19:34:57 UTC (rev 1450)
@@ -61,6 +61,7 @@
         rrfc::initialize(nfiles, fh_arr, rdp->_lfid);
         _rid = rdp->_h_rid + 1;
         _reset_ok = true;
+        _owi = rdp->_owi;
     }
     else
     {




More information about the rhmessaging-commits mailing list