[rhmessaging-commits] rhmessaging commits: r1450 - in store/trunk/cpp/lib: jrnl and 1 other directory.
rhmessaging-commits at lists.jboss.org
rhmessaging-commits at lists.jboss.org
Mon Dec 10 14:34:57 EST 2007
Author: kpvdr
Date: 2007-12-10 14:34:57 -0500 (Mon, 10 Dec 2007)
New Revision: 1450
Added:
store/trunk/cpp/lib/jrnl/slock.hpp
Modified:
store/trunk/cpp/lib/BdbMessageStore.cpp
store/trunk/cpp/lib/Makefile.am
store/trunk/cpp/lib/jrnl/jcntl.cpp
store/trunk/cpp/lib/jrnl/jcntl.hpp
store/trunk/cpp/lib/jrnl/jexception.hpp
store/trunk/cpp/lib/jrnl/rcvdat.hpp
store/trunk/cpp/lib/jrnl/wmgr.cpp
store/trunk/cpp/lib/jrnl/wrfc.cpp
Log:
Fixed error in owi initialization on recover (which gave a 50% chance of a recover exception). Also replaced mutexes with a simple scoped lock class for write mutex. Added catch for condition RHM_IORES_FULL in dequeue.
Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp 2007-12-10 17:05:06 UTC (rev 1449)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp 2007-12-10 19:34:57 UTC (rev 1450)
@@ -1026,8 +1026,8 @@
usleep(AIO_SLEEP_TIME); // TODO add sleep time to get events call as option
break;
case rhm::journal::RHM_IORES_FULL:
- std::cerr << "Error storing message -- Journal full on queue \"" << queue->getName() << "\"." << std::endl << std::flush;
- THROW_STORE_FULL_EXCEPTION("Error storing message -- Journal full :" + queue->getName());
+ std::cerr << "Enqueue: Error storing record -- Journal full on queue \"" << queue->getName() << "\"." << std::endl << std::flush;
+ THROW_STORE_FULL_EXCEPTION("Enqueue: Error storing record -- Journal full on queue \"" + queue->getName() + "\".");
break;
default:
assert( "Store Error: Unexpected msg state");
@@ -1167,6 +1167,10 @@
THROW_STORE_EXCEPTION("Timeout waiting for mutex: RHM_IORES_BUSY");
usleep(AIO_SLEEP_TIME); // TODO add sleep time to get events call as option
break;
+ case rhm::journal::RHM_IORES_FULL:
+ std::cerr << "Dequeue: Error storing record -- Journal full on queue \"" << queue.getName() << "\"." << std::endl << std::flush;
+ THROW_STORE_FULL_EXCEPTION("Dequeue: Error storing record -- Journal full on queue \"" + queue.getName() + "\".");
+ break;
default:
assert( "Store Error: Unexpected msg state");
}
Modified: store/trunk/cpp/lib/Makefile.am
===================================================================
--- store/trunk/cpp/lib/Makefile.am 2007-12-10 17:05:06 UTC (rev 1449)
+++ store/trunk/cpp/lib/Makefile.am 2007-12-10 19:34:57 UTC (rev 1450)
@@ -76,6 +76,7 @@
jrnl/rcvdat.hpp \
jrnl/rmgr.hpp \
jrnl/rrfc.hpp \
+ jrnl/slock.hpp \
jrnl/txn_map.hpp \
jrnl/txn_rec.hpp \
jrnl/wmgr.hpp \
Modified: store/trunk/cpp/lib/jrnl/jcntl.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.cpp 2007-12-10 17:05:06 UTC (rev 1449)
+++ store/trunk/cpp/lib/jrnl/jcntl.cpp 2007-12-10 19:34:57 UTC (rev 1450)
@@ -139,14 +139,14 @@
_rcvdat.reset();
_emap.clear();
_tmap.clear();
-//std::cout << "Starting journal analysis..." << std::endl;
+
rcvr_janalyze(_rcvdat, prep_txn_list);
highest_rid = _rcvdat._h_rid;
if (_rcvdat._full)
throw jexception(jerrno::JERR_JCNTL_RECOVERJFULL, "jcntl", "recover_complete");
-// Debug info, but may be useful to print with a flag
-//_rcvdat.print(_jid);
+ // Debug info; may be useful to print with a flag
+ // _rcvdat.print(_jid);
if (_datafh)
{
@@ -177,7 +177,6 @@
_readonly_flag = true;
_init_flag = true;
-//std::cout << "Journal analysis complete." << std::endl;
}
void
@@ -191,7 +190,6 @@
_rrfc.initialize(_num_jfiles, (nlfh**)_datafh, _rcvdat._ffid);
_rmgr.recover_complete(_rcvdat._fro);
_readonly_flag = false;
-//std::cout << "Journal revovery complete." << std::endl;
}
void
@@ -206,32 +204,17 @@
jcntl::enqueue_data_record(const void* const data_buff, const size_t tot_data_len,
const size_t this_data_len, data_tok* dtokp, const bool transient)
{
- iores res;
check_wstatus("enqueue_data_record");
- pthread_mutex_lock(&_mutex);
- try
- {
- res = _wmgr.enqueue(data_buff, tot_data_len, this_data_len, dtokp, NULL, 0, transient,
- false);
- }
- catch(const std::exception& e) { pthread_mutex_unlock(&_mutex); throw; }
- pthread_mutex_unlock(&_mutex);
- return res;
+ slock s(&_mutex);
+ return _wmgr.enqueue(data_buff, tot_data_len, this_data_len, dtokp, NULL, 0, transient, false);
}
const iores
jcntl::enqueue_extern_data_record(const size_t tot_data_len, data_tok* dtokp, const bool transient)
{
- iores res;
check_wstatus("enqueue_extern_data_record");
- pthread_mutex_lock(&_mutex);
- try
- {
- res = _wmgr.enqueue(NULL, tot_data_len, 0, dtokp, NULL, 0, transient, true);
- }
- catch(const std::exception& e) { pthread_mutex_unlock(&_mutex); throw; }
- pthread_mutex_unlock(&_mutex);
- return res;
+ slock s(&_mutex);
+ return _wmgr.enqueue(NULL, tot_data_len, 0, dtokp, NULL, 0, transient, true);
}
const iores
@@ -239,33 +222,19 @@
const size_t this_data_len, data_tok* dtokp, const std::string& xid,
const bool transient)
{
- iores res;
check_wstatus("enqueue_tx_data_record");
- pthread_mutex_lock(&_mutex);
- try
- {
- res = _wmgr.enqueue(data_buff, tot_data_len, this_data_len, dtokp, xid.data(), xid.size(),
- transient, false);
- }
- catch(const std::exception& e) { pthread_mutex_unlock(&_mutex); throw; }
- pthread_mutex_unlock(&_mutex);
- return res;
+ slock s(&_mutex);
+ return _wmgr.enqueue(data_buff, tot_data_len, this_data_len, dtokp, xid.data(), xid.size(),
+ transient, false);
}
const iores
jcntl::enqueue_extern_txn_data_record(const size_t tot_data_len, data_tok* dtokp,
const std::string& xid, const bool transient)
{
- iores res;
check_wstatus("enqueue_extern_txn_data_record");
- pthread_mutex_lock(&_mutex);
- try
- {
- res = _wmgr.enqueue(NULL, tot_data_len, 0, dtokp, xid.data(), xid.size(), transient, true);
- }
- catch(const std::exception& e) { pthread_mutex_unlock(&_mutex); throw; }
- pthread_mutex_unlock(&_mutex);
- return res;
+ slock s(&_mutex);
+ return _wmgr.enqueue(NULL, tot_data_len, 0, dtokp, xid.data(), xid.size(), transient, true);
}
const iores
@@ -294,49 +263,33 @@
const iores
jcntl::dequeue_data_record(data_tok* const dtokp)
{
- iores res;
check_wstatus("dequeue_data");
- pthread_mutex_lock(&_mutex);
- try { res = _wmgr.dequeue(dtokp, NULL, 0); }
- catch(const std::exception& e) { pthread_mutex_unlock(&_mutex); throw; }
- pthread_mutex_unlock(&_mutex);
- return res;
+ slock s(&_mutex);
+ return _wmgr.dequeue(dtokp, NULL, 0);
}
const iores
jcntl::dequeue_txn_data_record(data_tok* const dtokp, const std::string& xid)
{
- iores res;
check_wstatus("dequeue_data");
- pthread_mutex_lock(&_mutex);
- try { res = _wmgr.dequeue(dtokp, xid.data(), xid.size()); }
- catch(const std::exception& e) { pthread_mutex_unlock(&_mutex); throw; }
- pthread_mutex_unlock(&_mutex);
- return res;
+ slock s(&_mutex);
+ return _wmgr.dequeue(dtokp, xid.data(), xid.size());
}
const iores
jcntl::txn_abort(data_tok* const dtokp, const std::string& xid)
{
- iores res;
check_wstatus("txn_abort");
- pthread_mutex_lock(&_mutex);
- try { res = _wmgr.abort(dtokp, xid.data(), xid.size()); }
- catch(const std::exception& e) { pthread_mutex_unlock(&_mutex); throw; }
- pthread_mutex_unlock(&_mutex);
- return res;
+ slock s(&_mutex);
+ return _wmgr.abort(dtokp, xid.data(), xid.size());
}
const iores
jcntl::txn_commit(data_tok* const dtokp, const std::string& xid)
{
- iores res;
check_wstatus("txn_commit");
- pthread_mutex_lock(&_mutex);
- try { res = _wmgr.commit(dtokp, xid.data(), xid.size()); }
- catch(const std::exception& e) { pthread_mutex_unlock(&_mutex); throw; }
- pthread_mutex_unlock(&_mutex);
- return res;
+ slock s(&_mutex);
+ return _wmgr.commit(dtokp, xid.data(), xid.size());
}
const bool
@@ -348,22 +301,10 @@
const u_int32_t
jcntl::get_wr_events()
{
- u_int32_t res;
- int ret = pthread_mutex_trylock(&_mutex);
- if (ret)
- {
- if (ret != EBUSY)
- {
- std::stringstream ss;
- ss << "pthread_mutex_trylock() returned " << errno << " (" << strerror(errno) << ")";
- throw jexception(jerrno::JERR__PTHREAD, ss.str().c_str(), "jcntl", "get_wr_events");
- }
- return 0; // already locked, return immediately
- }
- try { res = _wmgr.get_events(pmgr::UNUSED); }
- catch(const std::exception& e) { pthread_mutex_unlock(&_mutex); throw; }
- pthread_mutex_unlock(&_mutex);
- return res;
+ stlock t(&_mutex);
+ if (t.locked())
+ return _wmgr.get_events(pmgr::UNUSED);
+ return 0;
}
const u_int32_t
@@ -395,10 +336,8 @@
return;
if (_readonly_flag)
throw jexception(jerrno::JERR_JCNTL_READONLY, "jcntl", "flush");
- pthread_mutex_lock(&_mutex);
- try { _wmgr.flush(); }
- catch(const std::exception& e) { pthread_mutex_unlock(&_mutex); throw; }
- pthread_mutex_unlock(&_mutex);
+ slock s(&_mutex);
+ _wmgr.flush();
}
// Private functions
@@ -491,7 +430,8 @@
{
u_int16_t fid = rd._ffid;
std::ifstream ifs;
- while (rcvr_get_next_record(fid, &ifs, rd));
+ bool lowi = rd._owi; // local copy of owi to be used during analysis
+ while (rcvr_get_next_record(fid, &ifs, lowi, rd));
// Check for journal full condition
u_int16_t next_wr_fid = (rd._lfid + 1) % _num_jfiles;
@@ -512,12 +452,12 @@
}
const bool
-jcntl::rcvr_get_next_record(u_int16_t& fid, std::ifstream* ifsp, rcvdat& rd)
+jcntl::rcvr_get_next_record(u_int16_t& fid, std::ifstream* ifsp, bool& lowi, rcvdat& rd)
{
size_t cum_size_read = 0;
void* xidp = NULL;
hdr h;
- if (!jfile_cycle(fid, ifsp, rd, true))
+ if (!jfile_cycle(fid, ifsp, lowi, rd, true))
return false;
std::streampos read_pos = ifsp->tellg();
ifsp->read((char*)&h, sizeof(hdr));
@@ -526,7 +466,7 @@
case RHM_JDAT_ENQ_MAGIC:
{
enq_rec er;
- if (!decode(er, fid, ifsp, cum_size_read, h, rd, read_pos))
+ if (!decode(er, fid, ifsp, cum_size_read, h, lowi, rd, read_pos))
return false;
if (!er.is_transient()) // Ignore transient msgs
{
@@ -547,7 +487,7 @@
case RHM_JDAT_DEQ_MAGIC:
{
deq_rec dr;
- if (!decode(dr, fid, ifsp, cum_size_read, h, rd, read_pos))
+ if (!decode(dr, fid, ifsp, cum_size_read, h, lowi, rd, read_pos))
return false;
if (dr.xid_size())
{
@@ -580,7 +520,7 @@
case RHM_JDAT_TXA_MAGIC:
{
txn_rec ar;
- if (!decode(ar, fid, ifsp, cum_size_read, h, rd, read_pos))
+ if (!decode(ar, fid, ifsp, cum_size_read, h, lowi, rd, read_pos))
return false;
// Delete this txn from tmap, unlock any locked records in emap
ar.get_xid(&xidp);
@@ -607,7 +547,7 @@
case RHM_JDAT_TXC_MAGIC:
{
txn_rec cr;
- if (!decode(cr, fid, ifsp, cum_size_read, h, rd, read_pos))
+ if (!decode(cr, fid, ifsp, cum_size_read, h, lowi, rd, read_pos))
return false;
// Delete this txn from tmap, process records into emap
cr.get_xid(&xidp);
@@ -656,9 +596,9 @@
const bool
jcntl::decode(jrec& rec, u_int16_t& fid, std::ifstream* ifsp, size_t& cum_size_read, hdr& h,
- rcvdat& rd, std::streampos& rec_offset)
+ bool& lowi, rcvdat& rd, std::streampos& rec_offset)
{
- if (!check_owi(fid, h, rd, rec_offset))
+ if (!check_owi(fid, h, lowi, rd, rec_offset))
return false;
bool done = false;
while (!done)
@@ -673,14 +613,14 @@
rd._eo = rec_offset;
return false;
}
- if (!jfile_cycle(fid, ifsp, rd, false))
+ if (!jfile_cycle(fid, ifsp, lowi, rd, false))
return false;
}
return true;
}
const bool
-jcntl::jfile_cycle(u_int16_t& fid, std::ifstream* ifsp, rcvdat& rd, const bool jump_fro)
+jcntl::jfile_cycle(u_int16_t& fid, std::ifstream* ifsp, bool& lowi, rcvdat& rd, const bool jump_fro)
{
if (ifsp->is_open())
{
@@ -692,7 +632,7 @@
if (fid >= _num_jfiles)
{
fid = 0;
- rd._owi = !rd._owi; // Flip owi
+ lowi = !lowi; // Flip local owi
}
if (fid == rd._ffid) // used up all journal files
return false;
@@ -728,9 +668,9 @@
}
const bool
-jcntl::check_owi(const u_int16_t fid, hdr& h, rcvdat& rd, std::streampos& read_pos)
+jcntl::check_owi(const u_int16_t fid, hdr& h, bool& lowi, rcvdat& rd, std::streampos& read_pos)
{
- if (rd._ffid ? h.get_owi() == rd._owi : h.get_owi() != rd._owi) // Overwrite indicator changed
+ if (rd._ffid ? h.get_owi() == lowi : h.get_owi() != lowi) // Overwrite indicator changed
{
u_int16_t expected_fid = rd._ffid ? rd._ffid - 1 : _num_jfiles - 1;
if (fid == expected_fid)
Modified: store/trunk/cpp/lib/jrnl/jcntl.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.hpp 2007-12-10 17:05:06 UTC (rev 1449)
+++ store/trunk/cpp/lib/jrnl/jcntl.hpp 2007-12-10 19:34:57 UTC (rev 1450)
@@ -46,9 +46,9 @@
#include <jrnl/lfh.hpp>
#include <jrnl/rcvdat.hpp>
#include <jrnl/rmgr.hpp>
+#include <jrnl/slock.hpp>
#include <jrnl/wmgr.hpp>
#include <jrnl/wrfc.hpp>
-#include <pthread.h>
#include <qpid/broker/PersistableQueue.h>
namespace rhm
@@ -548,12 +548,16 @@
void flush();
inline const u_int32_t get_enq_cnt() const { return _emap.size(); }
+
inline const u_int32_t get_wr_outstanding_aio_dblks() const
{ return _wrfc.aio_outstanding_dblks(); }
+
inline const u_int32_t get_wr_outstanding_aio_dblks(u_int16_t pi) const
{ return _wrfc.file_handle(pi)->wr_aio_outstanding_dblks(); }
+
inline const u_int32_t get_rd_outstanding_aio_dblks() const
{ return _rrfc.aio_outstanding_dblks(); }
+
inline const u_int32_t get_rd_outstanding_aio_dblks(u_int16_t pi) const
{ return _rrfc.file_handle(pi)->rd_aio_outstanding_dblks(); }
@@ -639,15 +643,17 @@
*/
void rcvr_janalyze(rcvdat& rd, const std::vector<std::string>& prep_txn_list);
- const bool rcvr_get_next_record(u_int16_t& fid, std::ifstream* ifsp, rcvdat& rd);
+ const bool rcvr_get_next_record(u_int16_t& fid, std::ifstream* ifsp, bool& lowi,
+ rcvdat& rd);
const bool decode(jrec& rec, u_int16_t& fid, std::ifstream* ifsp, size_t& cum_size_read,
- hdr& h, rcvdat& rd, std::streampos& rec_offset);
+ hdr& h, bool& lowi, rcvdat& rd, std::streampos& rec_offset);
- const bool jfile_cycle(u_int16_t& fid, std::ifstream* ifsp, rcvdat& rd,
+ const bool jfile_cycle(u_int16_t& fid, std::ifstream* ifsp, bool& lowi, rcvdat& rd,
const bool jump_fro);
- const bool check_owi(const u_int16_t fid, hdr& h, rcvdat& rd, std::streampos& read_pos);
+ const bool check_owi(const u_int16_t fid, hdr& h, bool& lowi, rcvdat& rd,
+ std::streampos& read_pos);
void check_journal_alignment(const u_int16_t fid, std::streampos& rec_offset);
Modified: store/trunk/cpp/lib/jrnl/jexception.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jexception.hpp 2007-12-10 17:05:06 UTC (rev 1449)
+++ store/trunk/cpp/lib/jrnl/jexception.hpp 2007-12-10 19:34:57 UTC (rev 1450)
@@ -49,13 +49,20 @@
// Macro for formatting commom system errors
#define FORMAT_SYSERR(errno) " errno=" << errno << " (" << strerror(errno) << ")"
-#define MALLOC_CHK(ptr, var, cls, fn) if (ptr == NULL) { \
+
+#define MALLOC_CHK(ptr, var, cls, fn) if(ptr == NULL) { \
clean(); \
std::stringstream ss; \
ss << var << ": malloc() failed: " << FORMAT_SYSERR(errno); \
throw jexception(jerrno::JERR__MALLOC, ss.str().c_str(), cls, fn); \
}
+#define PTHREAD_CHK(err, pfn, cls, fn) if(err != 0) { \
+ std::stringstream ss; \
+ ss << pfn << " failed: " << FORMAT_SYSERR(err); \
+ throw jexception(jerrno::JERR__PTHREAD, ss.str().c_str(), cls, fn); \
+ }
+
#define MAX_MSG_SIZE 1024
#define MAX_THROWING_SIZE 128
Modified: store/trunk/cpp/lib/jrnl/rcvdat.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rcvdat.hpp 2007-12-10 17:05:06 UTC (rev 1449)
+++ store/trunk/cpp/lib/jrnl/rcvdat.hpp 2007-12-10 19:34:57 UTC (rev 1450)
@@ -83,6 +83,8 @@
void print(std::string& jid)
{
std::cout << "Jorunal file analysis (jid=\"" << jid << "\"):" << std::endl;
+ std::cout << " Overwrite indicator (_owi) = " << (_owi ? "TRUE" : "FALSE") <<
+ std::endl;
std::cout << " Journal empty (_empty) = " << (_empty ? "TRUE" : "FALSE") <<
std::endl;
std::cout << " First fid (_ffid) = " << _ffid << std::endl;
Added: store/trunk/cpp/lib/jrnl/slock.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/slock.hpp (rev 0)
+++ store/trunk/cpp/lib/jrnl/slock.hpp 2007-12-10 19:34:57 UTC (rev 1450)
@@ -0,0 +1,86 @@
+/**
+* \file slock.hpp
+*
+* Red Hat Messaging - Message Journal
+*
+* Messaging journal scoped lock classes rhm::journal::slock and
+* rhm::journal::stlock. See class documentation for details.
+*
+* \author Kim van der Riet
+*
+* Copyright 2007 Red Hat, Inc.
+*
+* This file is part of Red Hat Messaging.
+*
+* Red Hat Messaging is free software; you can redistribute it and/or
+* modify it under the terms of the GNU Lesser General Public
+* License as published by the Free Software Foundation; either
+* version 2.1 of the License, or (at your option) any later version.
+*
+* This library is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this library; if not, write to the Free Software
+* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
+* USA
+*
+* The GNU Lesser General Public License is available in the file COPYING.
+*/
+
+#ifndef rhm_journal_slock_hpp
+#define rhm_journal_slock_hpp
+
+#include <pthread.h>
+#include <jrnl/jerrno.hpp>
+#include <jrnl/jexception.hpp>
+
+namespace rhm
+{
+namespace journal
+{
+
+ // Ultra-simple scoped lock class: locks the mutex on construction, auto-releases it when the object goes out-of-scope
+ class slock
+ {
+ protected:
+ pthread_mutex_t* _m; // mutex supplied by the caller; this class only locks and unlocks it
+ public:
+ inline slock(pthread_mutex_t* m) : _m(m)
+ {
+ PTHREAD_CHK(pthread_mutex_lock(_m), "pthread_mutex_lock", "slock", "slock"); // throws jexception (JERR__PTHREAD) on a non-zero return. NOTE(review): PTHREAD_CHK expands its first argument more than once, so on failure the pthread call is re-invoked while the message is formatted
+ }
+ inline ~slock()
+ {
+ PTHREAD_CHK(pthread_mutex_unlock(_m), "pthread_mutex_unlock", "slock", "~slock"); // NOTE(review): PTHREAD_CHK throws, so this destructor can throw if the unlock fails - confirm this is intended
+ }
+ };
+
+ // Ultra-simple scoped try-lock class: attempts pthread_mutex_trylock() on construction, auto-releases mutex (only if it was obtained) when it goes out-of-scope
+ class stlock
+ {
+ protected:
+ pthread_mutex_t* _m; // mutex supplied by the caller; this class only try-locks and unlocks it
+ bool _locked; // true only when the constructor's trylock returned 0
+ public:
+ inline stlock(pthread_mutex_t* m) : _m(m), _locked(false)
+ {
+ int ret = pthread_mutex_trylock(_m); // return code is kept so that EBUSY can be told apart from other failures
+ _locked = (ret == 0); // check if lock obtained
+ if (!_locked && ret != EBUSY) PTHREAD_CHK(ret, "pthread_mutex_trylock", "stlock",
+ "stlock"); // EBUSY is not treated as an error (locked() will return false); any other non-zero code throws jexception. NOTE(review): EBUSY needs an errno header that this file does not include directly - confirm it arrives via the other includes
+ }
+ inline ~stlock()
+ {
+ if (_locked) // unlock only if this object actually obtained the mutex
+ PTHREAD_CHK(pthread_mutex_unlock(_m), "pthread_mutex_unlock", "stlock", "~stlock"); // NOTE(review): PTHREAD_CHK throws, so this destructor can throw if the unlock fails; the macro also expands the unlock call more than once on failure
+ }
+ inline const bool locked() const { return _locked; } // lets the caller check whether the mutex was obtained before touching protected state
+ };
+
+} // namespace journal
+} // namespace rhm
+
+#endif // ifndef rhm_journal_slock_hpp
Modified: store/trunk/cpp/lib/jrnl/wmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wmgr.cpp 2007-12-10 17:05:06 UTC (rev 1449)
+++ store/trunk/cpp/lib/jrnl/wmgr.cpp 2007-12-10 19:34:57 UTC (rev 1450)
@@ -118,13 +118,13 @@
if (this_data_len != tot_data_len && !external)
return RHM_IORES_NOTIMPL;
- if (_deq_busy || _abort_busy || _commit_busy)
- return RHM_IORES_BUSY;
-
iores res = pre_write_check(WMGR_ENQUEUE, dtokp);
if (res != RHM_IORES_SUCCESS)
return res;
+ if (_deq_busy || _abort_busy || _commit_busy)
+ return RHM_IORES_BUSY;
+
bool cont = false;
if (_enq_busy) // If enqueue() exited last time with RHM_IORES_FULL or RHM_IORES_AIO_WAIT
{
@@ -254,13 +254,13 @@
if (xid_len)
assert(xid_ptr != NULL);
- if (_enq_busy || _abort_busy || _commit_busy)
- return RHM_IORES_BUSY;
-
iores res = pre_write_check(WMGR_DEQUEUE, dtokp);
if (res != RHM_IORES_SUCCESS)
return res;
+ if (_enq_busy || _abort_busy || _commit_busy)
+ return RHM_IORES_BUSY;
+
bool cont = false;
if (_deq_busy) // If dequeue() exited last time with RHM_IORES_FULL or RHM_IORES_AIO_WAIT
{
Modified: store/trunk/cpp/lib/jrnl/wrfc.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wrfc.cpp 2007-12-10 17:05:06 UTC (rev 1449)
+++ store/trunk/cpp/lib/jrnl/wrfc.cpp 2007-12-10 19:34:57 UTC (rev 1450)
@@ -61,6 +61,7 @@
rrfc::initialize(nfiles, fh_arr, rdp->_lfid);
_rid = rdp->_h_rid + 1;
_reset_ok = true;
+ _owi = rdp->_owi;
}
else
{
More information about the rhmessaging-commits
mailing list