Author: kpvdr
Date: 2008-12-02 16:29:00 -0500 (Tue, 02 Dec 2008)
New Revision: 2908
Added:
store/trunk/cpp/lib/jrnl/aio_callback.hpp
Removed:
store/trunk/cpp/lib/jrnl/aio_cb.hpp
Modified:
store/trunk/cpp/lib/JournalImpl.cpp
store/trunk/cpp/lib/JournalImpl.h
store/trunk/cpp/lib/Makefile.am
store/trunk/cpp/lib/jrnl/jcntl.cpp
store/trunk/cpp/lib/jrnl/jcntl.hpp
store/trunk/cpp/lib/jrnl/pmgr.cpp
store/trunk/cpp/lib/jrnl/pmgr.hpp
store/trunk/cpp/lib/jrnl/rmgr.cpp
store/trunk/cpp/lib/jrnl/rmgr.hpp
store/trunk/cpp/lib/jrnl/wmgr.cpp
store/trunk/cpp/lib/jrnl/wmgr.hpp
store/trunk/cpp/tests/jrnl/_st_helper_fns.hpp
store/trunk/cpp/tests/jrnl/jtt/jrnl_instance.cpp
store/trunk/cpp/tests/jrnl/jtt/jrnl_instance.hpp
Log:
Fix for BZ 472928 - "Enqueue completion lost/not signalled if queue is deleted".
A reorganization in an attempt to fix the bug which removes the C-style static function
call for callbacks and replaces it with a more c++-styled object pointer, and which did
not affect the problem is retained.
Modified: store/trunk/cpp/lib/JournalImpl.cpp
===================================================================
--- store/trunk/cpp/lib/JournalImpl.cpp 2008-12-02 20:44:34 UTC (rev 2907)
+++ store/trunk/cpp/lib/JournalImpl.cpp 2008-12-02 21:29:00 UTC (rev 2908)
@@ -101,7 +101,7 @@
JournalImpl::~JournalImpl()
{
if (_init_flag && !_stop_flag){
- try { stop(true); }
+ try { stop(true); } // NOTE: This will *block* until all outstanding disk aio calls
are complete!
catch (const jexception& e) { log(LOG_ERROR, e.what()); }
}
(dynamic_cast<GetEventsFireEvent*>(getEventsFireEventsPtr.get()))->cancel();
@@ -129,16 +129,14 @@
const u_int32_t jfsize_sblks,
const u_int16_t wcache_num_pages,
const u_int32_t wcache_pgsize_sblks,
- const journal::rd_aio_cb rd_cb,
- const journal::wr_aio_cb wr_cb)
+ journal::aio_callback* const cbp)
{
std::ostringstream oss;
oss << "Initialize; num_jfiles=" << num_jfiles << "
jfsize_sblks=" << jfsize_sblks;
oss << " wcache_pgsize_sblks=" << wcache_pgsize_sblks;
oss << " wcache_num_pages=" << wcache_num_pages;
log(LOG_DEBUG, oss.str());
- jcntl::initialize(num_jfiles, auto_expand, ae_max_jfiles, jfsize_sblks,
wcache_num_pages, wcache_pgsize_sblks,
- rd_cb, wr_cb);
+ jcntl::initialize(num_jfiles, auto_expand, ae_max_jfiles, jfsize_sblks,
wcache_num_pages, wcache_pgsize_sblks, cbp);
log(LOG_DEBUG, "Initialization complete");
if (_mgmtObject != 0)
@@ -158,8 +156,7 @@
const u_int32_t jfsize_sblks,
const u_int16_t wcache_num_pages,
const u_int32_t wcache_pgsize_sblks,
- const journal::rd_aio_cb rd_cb,
- const journal::wr_aio_cb wr_cb,
+ journal::aio_callback* const cbp,
boost::ptr_list<msgstore::PreparedTransaction>*
prep_tx_list_ptr,
u_int64_t& highest_rid,
u_int64_t queue_id)
@@ -188,10 +185,10 @@
}
jcntl::recover(num_jfiles, auto_expand, ae_max_jfiles, jfsize_sblks,
wcache_num_pages, wcache_pgsize_sblks,
- rd_cb, wr_cb, &prep_xid_list, highest_rid);
+ cbp, &prep_xid_list, highest_rid);
} else {
jcntl::recover(num_jfiles, auto_expand, ae_max_jfiles, jfsize_sblks,
wcache_num_pages, wcache_pgsize_sblks,
- rd_cb, wr_cb, 0, highest_rid);
+ cbp, 0, highest_rid);
}
// Populate PreparedTransaction lists from _tmap
@@ -482,6 +479,37 @@
}
void
+JournalImpl::wr_aio_cb(std::vector<data_tok*>& dtokl)
+{
+ for (std::vector<data_tok*>::const_iterator i=dtokl.begin(); i!=dtokl.end();
i++)
+ {
+ DataTokenImpl* dtokp = static_cast<DataTokenImpl*>(*i);
+ if (/*!is_stopped() &&*/ dtokp->getSourceMessage())
+ {
+ switch (dtokp->wstate())
+ {
+ case data_tok::ENQ:
+ dtokp->getSourceMessage()->enqueueComplete();
+ break;
+ case data_tok::DEQ:
+/* Don't need to signal until we have a way to ack completion of dequeue in AMQP
+ dtokp->getSourceMessage()->dequeueComplete();
+ if ( dtokp->getSourceMessage()->isDequeueComplete() ) // clear
id after last dequeue
+ dtokp->getSourceMessage()->setPersistenceId(0);
+*/
+ break;
+ default: ;
+ }
+ }
+ dtokp->release();
+ }
+}
+
+void
+JournalImpl::rd_aio_cb(std::vector<u_int16_t>& /*pil*/)
+{}
+
+void
JournalImpl::free_read_buffers()
{
if (_xidp) {
@@ -526,36 +554,6 @@
}
}
-// static AIO callback fns
-
-void
-JournalImpl::aio_wr_callback(jcntl* journal, std::vector<data_tok*>& dtokl)
-{
- for (std::vector<data_tok*>::const_iterator i=dtokl.begin(); i!=dtokl.end();
i++)
- {
- DataTokenImpl* dtokp = static_cast<DataTokenImpl*>(*i);
- if (!journal->is_stopped() && dtokp->getSourceMessage())
- {
- switch (dtokp->wstate())
- {
- case data_tok::ENQ:
- dtokp->getSourceMessage()->enqueueComplete();
- break;
- case data_tok::DEQ:
-/* Don't need to signal until we have a way to ack completion of dequeue in AMQP
- dtokp->getSourceMessage()->dequeueComplete();
- if ( dtokp->getSourceMessage()->isDequeueComplete() ) // clear
id after last dequeue
- dtokp->getSourceMessage()->setPersistenceId(0);
-*/
- break;
- default:
- ;
- }
- }
- dtokp->release();
- }
-}
-
qpid::management::Manageable::status_t JournalImpl::ManagementMethod (uint32_t methodId,
qpid::management::Args& /*args*/,
std::string&
/*text*/)
@@ -575,7 +573,3 @@
return status;
}
-
-// void
-// JournalImpl::aio_rd_callback(jcntl* /*journal*/, std::vector<u_int16_t>&
/*pil*/)
-// {}
Modified: store/trunk/cpp/lib/JournalImpl.h
===================================================================
--- store/trunk/cpp/lib/JournalImpl.h 2008-12-02 20:44:34 UTC (rev 2907)
+++ store/trunk/cpp/lib/JournalImpl.h 2008-12-02 21:29:00 UTC (rev 2908)
@@ -25,6 +25,7 @@
#define _JournalImpl_
#include <set>
+#include "jrnl/data_tok.hpp"
#include "jrnl/jcntl.hpp"
#include "jrnl/slock.hpp"
#include "DataTokenImpl.h"
@@ -68,7 +69,7 @@
inline void cancel() { mrg::journal::slock s(&_gefe_mutex); parent = 0;
}
};
- class JournalImpl : public qpid::broker::ExternalQueueStore, public
journal::jcntl
+ class JournalImpl : public qpid::broker::ExternalQueueStore, public
journal::jcntl, public journal::aio_callback
{
private:
static qpid::broker::Timer* journalTimerPtr;
@@ -107,8 +108,7 @@
const u_int32_t jfsize_sblks,
const u_int16_t wcache_num_pages,
const u_int32_t wcache_pgsize_sblks,
- const journal::rd_aio_cb rd_cb,
- const journal::wr_aio_cb wr_cb);
+ journal::aio_callback* const cbp);
inline void initialize(const u_int16_t num_jfiles,
const bool auto_expand,
@@ -117,7 +117,7 @@
const u_int16_t wcache_num_pages,
const u_int32_t wcache_pgsize_sblks) {
initialize(num_jfiles, auto_expand, ae_max_jfiles, jfsize_sblks,
wcache_num_pages, wcache_pgsize_sblks,
- 0, &aio_wr_callback);
+ this);
}
void recover(const u_int16_t num_jfiles,
@@ -126,8 +126,7 @@
const u_int32_t jfsize_sblks,
const u_int16_t wcache_num_pages,
const u_int32_t wcache_pgsize_sblks,
- const journal::rd_aio_cb rd_cb,
- const journal::wr_aio_cb wr_cb,
+ journal::aio_callback* const cbp,
boost::ptr_list<msgstore::PreparedTransaction>*
prep_tx_list_ptr,
u_int64_t& highest_rid,
u_int64_t queue_id);
@@ -142,7 +141,7 @@
u_int64_t& highest_rid,
u_int64_t queue_id) {
recover(num_jfiles, auto_expand, ae_max_jfiles, jfsize_sblks,
wcache_num_pages, wcache_pgsize_sblks,
- 0, &aio_wr_callback, prep_tx_list_ptr, highest_rid,
queue_id);
+ this, prep_tx_list_ptr, highest_rid, queue_id);
}
void recover_complete();
@@ -188,6 +187,10 @@
void getEventsFire();
void flushFire();
+ // AIO callbacks
+ void wr_aio_cb(std::vector<journal::data_tok*>& dtokl);
+ void rd_aio_cb(std::vector<u_int16_t>& pil);
+
qpid::management::ManagementObject* GetManagementObject (void) const
{ return _mgmtObject; }
@@ -206,9 +209,6 @@
}
void handleIoResult(const journal::iores r);
- static void aio_wr_callback(jcntl* journal,
std::vector<journal::data_tok*>& dtokl);
- // static void aio_rd_callback(jcntl* journal,
std::vector<u_int16_t>& pil);
-
// Management instrumentation callbacks overridden from jcntl
inline void instr_incr_outstanding_aio_cnt() {
if (_mgmtObject != 0) _mgmtObject->inc_outstandingAIOs();
Modified: store/trunk/cpp/lib/Makefile.am
===================================================================
--- store/trunk/cpp/lib/Makefile.am 2008-12-02 20:44:34 UTC (rev 2907)
+++ store/trunk/cpp/lib/Makefile.am 2008-12-02 21:29:00 UTC (rev 2908)
@@ -68,7 +68,7 @@
jrnl/wmgr.cpp \
jrnl/wrfc.cpp \
jrnl/aio.hpp \
- jrnl/aio_cb.hpp \
+ jrnl/aio_callback.hpp \
jrnl/arr_cnt.hpp \
jrnl/cvar.hpp \
jrnl/data_tok.hpp \
Added: store/trunk/cpp/lib/jrnl/aio_callback.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/aio_callback.hpp (rev 0)
+++ store/trunk/cpp/lib/jrnl/aio_callback.hpp 2008-12-02 21:29:00 UTC (rev 2908)
@@ -0,0 +1,55 @@
+/**
+* \file aio_callback.hpp
+*
+* Red Hat Messaging - Message Journal
+*
+* This file contains the definition for the AIO callback function
+* pointer.
+*
+* Copyright 2007, 2008 Red Hat, Inc.
+*
+* This file is part of Red Hat Messaging.
+*
+* Red Hat Messaging is free software; you can redistribute it and/or
+* modify it under the terms of the GNU Lesser General Public
+* License as published by the Free Software Foundation; either
+* version 2.1 of the License, or (at your option) any later version.
+*
+* This library is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this library; if not, write to the Free Software
+* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
+* USA
+*
+* The GNU Lesser General Public License is available in the file COPYING.
+*/
+
+#ifndef mrg_journal_aio_callback_hpp
+#define mrg_journal_aio_callback_hpp
+
+#include <vector>
+#include <sys/types.h>
+
+namespace mrg
+{
+namespace journal
+{
+
+ class data_tok;
+
+ class aio_callback
+ {
+ public:
+ virtual void wr_aio_cb(std::vector<data_tok*>& dtokl) = 0;
+ virtual void rd_aio_cb(std::vector<u_int16_t>& pil) = 0;
+
+ };
+
+} // namespace journal
+} // namespace mrg
+
+#endif // ifndef mrg_journal_aio_callback_hpp
Deleted: store/trunk/cpp/lib/jrnl/aio_cb.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/aio_cb.hpp 2008-12-02 20:44:34 UTC (rev 2907)
+++ store/trunk/cpp/lib/jrnl/aio_cb.hpp 2008-12-02 21:29:00 UTC (rev 2908)
@@ -1,53 +0,0 @@
-/**
-* \file aio_cb.hpp
-*
-* Red Hat Messaging - Message Journal
-*
-* This file contains the definition for the AIO callback function
-* pointer.
-*
-* Copyright 2007, 2008 Red Hat, Inc.
-*
-* This file is part of Red Hat Messaging.
-*
-* Red Hat Messaging is free software; you can redistribute it and/or
-* modify it under the terms of the GNU Lesser General Public
-* License as published by the Free Software Foundation; either
-* version 2.1 of the License, or (at your option) any later version.
-*
-* This library is distributed in the hope that it will be useful,
-* but WITHOUT ANY WARRANTY; without even the implied warranty of
-* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-* Lesser General Public License for more details.
-*
-* You should have received a copy of the GNU Lesser General Public
-* License along with this library; if not, write to the Free Software
-* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
-* USA
-*
-* The GNU Lesser General Public License is available in the file COPYING.
-*/
-
-#ifndef mrg_journal_aio_cb_hpp
-#define mrg_journal_aio_cb_hpp
-
-#include <vector>
-#include <sys/types.h>
-
-namespace mrg
-{
-namespace journal
-{
-
- class jcntl;
- class data_tok;
-
- /**
- * \brief Callback function pointer to be called when AIO events arrive.
- */
- typedef void (*wr_aio_cb)(jcntl* journal, std::vector<data_tok*>& dtokl);
- typedef void (*rd_aio_cb)(jcntl* journal, std::vector<u_int16_t>& pil);
-} // namespace journal
-} // namespace mrg
-
-#endif // ifndef mrg_journal_aio_cb_hpp
Modified: store/trunk/cpp/lib/jrnl/jcntl.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.cpp 2008-12-02 20:44:34 UTC (rev 2907)
+++ store/trunk/cpp/lib/jrnl/jcntl.cpp 2008-12-02 21:29:00 UTC (rev 2908)
@@ -89,7 +89,7 @@
void
jcntl::initialize(const u_int16_t num_jfiles, const bool auto_expand, const u_int16_t
ae_max_jfiles,
const u_int32_t jfsize_sblks, const u_int16_t wcache_num_pages, const u_int32_t
wcache_pgsize_sblks,
- const rd_aio_cb rd_cb, const wr_aio_cb wr_cb)
+ aio_callback* const cbp)
{
_init_flag = false;
_stop_flag = false;
@@ -117,9 +117,8 @@
_wrfc.initialize(_jfsize_sblks);
_rrfc.initialize();
_rrfc.set_findex(0);
- _rmgr.initialize(rd_cb);
- _wmgr.initialize(wr_cb, wcache_pgsize_sblks, wcache_num_pages, JRNL_WMGR_MAXDTOKPP,
- JRNL_WMGR_MAXWAITUS);
+ _rmgr.initialize(cbp);
+ _wmgr.initialize(cbp, wcache_pgsize_sblks, wcache_num_pages, JRNL_WMGR_MAXDTOKPP,
JRNL_WMGR_MAXWAITUS);
// Write info file (<basename>.jinf) to disk
write_infofile();
@@ -130,7 +129,8 @@
void
jcntl::recover(const u_int16_t num_jfiles, const bool auto_expand, const u_int16_t
ae_max_jfiles,
const u_int32_t jfsize_sblks, const u_int16_t wcache_num_pages, const u_int32_t
wcache_pgsize_sblks,
- const rd_aio_cb rd_cb, const wr_aio_cb wr_cb, const
std::vector<std::string>* prep_txn_list_ptr,
+// const rd_aio_cb rd_cb, const wr_aio_cb wr_cb, const
std::vector<std::string>* prep_txn_list_ptr,
+ aio_callback* const cbp, const std::vector<std::string>*
prep_txn_list_ptr,
u_int64_t& highest_rid)
{
_init_flag = false;
@@ -163,8 +163,8 @@
_wrfc.initialize(_jfsize_sblks, &_rcvdat);
_rrfc.initialize();
_rrfc.set_findex(_rcvdat.ffid());
- _rmgr.initialize(rd_cb);
- _wmgr.initialize(wr_cb, wcache_pgsize_sblks, wcache_num_pages, JRNL_WMGR_MAXDTOKPP,
JRNL_WMGR_MAXWAITUS,
+ _rmgr.initialize(cbp);
+ _wmgr.initialize(cbp, wcache_pgsize_sblks, wcache_num_pages, JRNL_WMGR_MAXDTOKPP,
JRNL_WMGR_MAXWAITUS,
(_rcvdat._lffull ? 0 : _rcvdat._eo));
_readonly_flag = true;
Modified: store/trunk/cpp/lib/jrnl/jcntl.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.hpp 2008-12-02 20:44:34 UTC (rev 2907)
+++ store/trunk/cpp/lib/jrnl/jcntl.hpp 2008-12-02 21:29:00 UTC (rev 2908)
@@ -191,14 +191,13 @@
* \param jfsize_sblks The size of each journal file expressed in softblocks.
* \param wcache_num_pages The number of write cache pages to create.
* \param wcache_pgsize_sblks The size in sblks of each write cache page.
- * \param rd_cb Function pointer to callback function for read operations. May be
0 (NULL).
- * \param wr_cb Function pointer to callback function for write operations. May be
0 (NULL).
+ * \param cbp Pointer to object containing callback functions for read and write
operations. May be 0 (NULL).
*
* \exception TODO
*/
void initialize(const u_int16_t num_jfiles, const bool auto_expand, const
u_int16_t ae_max_jfiles,
const u_int32_t jfsize_sblks, const u_int16_t wcache_num_pages, const
u_int32_t wcache_pgsize_sblks,
- const rd_aio_cb rd_cb, const wr_aio_cb wr_cb);
+ aio_callback* const cbp);
/**
* /brief Initialize journal by recovering state from previously written journal.
@@ -227,8 +226,7 @@
* \param jfsize_sblks The size of each journal file expressed in softblocks.
* \param wcache_num_pages The number of write cache pages to create.
* \param wcache_pgsize_sblks The size in sblks of each write cache page.
- * \param rd_cb Function pointer to callback function for read operations. May be
0 (NULL).
- * \param wr_cb Function pointer to callback function for write operations. May be
0 (NULL).
+ * \param cbp Pointer to object containing callback functions for read and write
operations. May be 0 (NULL).
* \param prep_txn_list_ptr
* \param highest_rid Returns the highest rid found in the journal during recover
*
@@ -236,8 +234,7 @@
*/
void recover(const u_int16_t num_jfiles, const bool auto_expand, const u_int16_t
ae_max_jfiles,
const u_int32_t jfsize_sblks, const u_int16_t wcache_num_pages, const
u_int32_t wcache_pgsize_sblks,
- const rd_aio_cb rd_cb, const wr_aio_cb wr_cb, const
std::vector<std::string>* prep_txn_list_ptr,
- u_int64_t& highest_rid);
+ aio_callback* const cbp, const std::vector<std::string>*
prep_txn_list_ptr, u_int64_t& highest_rid);
/**
* \brief Notification to the journal that recovery is complete and that normal
operation
Modified: store/trunk/cpp/lib/jrnl/pmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/pmgr.cpp 2008-12-02 20:44:34 UTC (rev 2907)
+++ store/trunk/cpp/lib/jrnl/pmgr.cpp 2008-12-02 21:29:00 UTC (rev 2908)
@@ -90,6 +90,7 @@
_pg_cntr(0),
_pg_offset_dblks(0),
_aio_evt_rem(0),
+ _cbp(0),
_enq_rec(),
_deq_rec(),
_txn_rec()
@@ -101,7 +102,7 @@
}
void
-pmgr::initialize(const u_int32_t cache_pgsize_sblks, const u_int16_t cache_num_pages)
+pmgr::initialize(aio_callback* const cbp, const u_int32_t cache_pgsize_sblks, const
u_int16_t cache_num_pages)
{
// As static use of this class keeps old values around, clean up first...
pmgr::clean();
@@ -111,6 +112,7 @@
_aio_evt_rem = 0;
_cache_pgsize_sblks = cache_pgsize_sblks;
_cache_num_pages = cache_num_pages;
+ _cbp = cbp;
// 1. Allocate page memory (as a single block)
std::size_t cache_pgsize = _cache_num_pages * _cache_pgsize_sblks * _sblksize;
Modified: store/trunk/cpp/lib/jrnl/pmgr.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/pmgr.hpp 2008-12-02 20:44:34 UTC (rev 2907)
+++ store/trunk/cpp/lib/jrnl/pmgr.hpp 2008-12-02 21:29:00 UTC (rev 2908)
@@ -42,6 +42,7 @@
#include <deque>
#include "jrnl/aio.hpp"
+#include "jrnl/aio_callback.hpp"
#include "jrnl/data_tok.hpp"
#include "jrnl/deq_rec.hpp"
#include "jrnl/enq_map.hpp"
@@ -110,6 +111,7 @@
u_int32_t _pg_cntr; ///< Page counter; determines if file rotation
req'd
u_int32_t _pg_offset_dblks; ///< Page offset (used so far) in data blocks
u_int32_t _aio_evt_rem; ///< Remaining AIO events
+ aio_callback* _cbp; ///< Pointer to callback object
enq_rec _enq_rec; ///< Enqueue record used for
encoding/decoding
deq_rec _deq_rec; ///< Dequeue record used for
encoding/decoding
@@ -126,8 +128,8 @@
inline u_int16_t cache_num_pages() const { return _cache_num_pages; }
protected:
- virtual void initialize(const u_int32_t cache_pgsize_sblks,
- const u_int16_t cache_num_pages);
+ virtual void initialize(aio_callback* const cbp, const u_int32_t
cache_pgsize_sblks,
+ const u_int16_t cache_num_pages);
virtual void rotate_page() = 0;
virtual void clean();
};
Modified: store/trunk/cpp/lib/jrnl/rmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rmgr.cpp 2008-12-02 20:44:34 UTC (rev 2907)
+++ store/trunk/cpp/lib/jrnl/rmgr.cpp 2008-12-02 21:29:00 UTC (rev 2908)
@@ -57,10 +57,9 @@
}
void
-rmgr::initialize(const rd_aio_cb rd_cb)
+rmgr::initialize(aio_callback* const cbp)
{
- _cb = rd_cb;
- initialize();
+ pmgr::initialize(cbp, JRNL_RMGR_PAGE_SIZE, JRNL_RMGR_PAGES);
clean();
// Allocate memory for reading file header
if (::posix_memalign(&_fhdr_buffer, _sblksize, _sblksize))
@@ -318,8 +317,8 @@
}
// Perform AIO return callback
- if (_cb && ret)
- (_cb)(_jc, pil);
+ if (_cbp && ret)
+ _cbp->rd_aio_cb(pil);
return ret;
}
@@ -365,12 +364,6 @@
_pg_offset_dblks = 0;
}
-void
-rmgr::initialize()
-{
- pmgr::initialize(JRNL_RMGR_PAGE_SIZE, JRNL_RMGR_PAGES);
-}
-
iores
rmgr::pre_read_check(data_tok* dtokp)
{
Modified: store/trunk/cpp/lib/jrnl/rmgr.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rmgr.hpp 2008-12-02 20:44:34 UTC (rev 2907)
+++ store/trunk/cpp/lib/jrnl/rmgr.hpp 2008-12-02 21:29:00 UTC (rev 2908)
@@ -40,7 +40,6 @@
}
#include <cstring>
-#include "jrnl/aio_cb.hpp"
#include "jrnl/enums.hpp"
#include "jrnl/file_hdr.hpp"
#include "jrnl/pmgr.hpp"
@@ -63,7 +62,6 @@
private:
rrfc& _rrfc; ///< Ref to read rotating file controller
rec_hdr _hdr; ///< Header used to determind record type
- rd_aio_cb _cb; ///< Callback function pointer for AIO events
void* _fhdr_buffer; ///< Buffer used for fhdr reads
aio_cb* _fhdr_aio_cb_ptr; ///< iocb pointer for fhdr reads
@@ -74,7 +72,7 @@
rmgr(jcntl* jc, enq_map& emap, txn_map& tmap, rrfc& rrfc);
virtual ~rmgr();
- void initialize(const rd_aio_cb rd_cb);
+ void initialize(aio_callback* const cbp);
iores read(void** const datapp, std::size_t& dsize, void** const xidpp,
std::size_t& xidsize, bool& transient, bool& external,
data_tok* dtokp,
bool ignore_pending_txns);
@@ -90,7 +88,6 @@
*/
private:
- void initialize();
void clean();
void init_validation();
iores pre_read_check(data_tok* dtokp);
Modified: store/trunk/cpp/lib/jrnl/wmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wmgr.cpp 2008-12-02 20:44:34 UTC (rev 2907)
+++ store/trunk/cpp/lib/jrnl/wmgr.cpp 2008-12-02 21:29:00 UTC (rev 2908)
@@ -89,7 +89,7 @@
}
void
-wmgr::initialize(const wr_aio_cb wr_cb, const u_int32_t wcache_pgsize_sblks,
+wmgr::initialize(aio_callback* const cbp, const u_int32_t wcache_pgsize_sblks,
const u_int16_t wcache_num_pages, const u_int32_t max_dtokpp, const u_int32_t
max_iowait_us,
std::size_t eo)
{
@@ -99,9 +99,8 @@
_commit_busy = false;
_max_dtokpp = max_dtokpp;
_max_io_wait_us = max_iowait_us;
- _cb = wr_cb;
- initialize(wcache_pgsize_sblks, wcache_num_pages);
+ initialize(cbp, wcache_pgsize_sblks, wcache_num_pages);
_jfsize_dblks = _jc->jfsize_sblks() * JRNL_SBLK_SIZE;
_jfsize_pgs = _jc->jfsize_sblks() / _cache_pgsize_sblks;
@@ -866,8 +865,8 @@
pcbp->_state = state;
// Perform AIO return callback
- if (_cb && tot_data_toks)
- (_cb)(_jc, dtokl);
+ if (_cbp && tot_data_toks)
+ _cbp->wr_aio_cb(dtokl);
}
else // File header writes have no pcb
{
@@ -901,9 +900,9 @@
}
void
-wmgr::initialize(const u_int32_t wcache_pgsize_sblks, const u_int16_t wcache_num_pages)
+wmgr::initialize(aio_callback* const cbp, const u_int32_t wcache_pgsize_sblks, const
u_int16_t wcache_num_pages)
{
- pmgr::initialize(wcache_pgsize_sblks, wcache_num_pages);
+ pmgr::initialize(cbp, wcache_pgsize_sblks, wcache_num_pages);
wmgr::clean();
_num_jfiles = _jc->num_jfiles();
if (::posix_memalign(&_fhdr_base_ptr, _sblksize, _sblksize * _num_jfiles))
Modified: store/trunk/cpp/lib/jrnl/wmgr.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wmgr.hpp 2008-12-02 20:44:34 UTC (rev 2907)
+++ store/trunk/cpp/lib/jrnl/wmgr.hpp 2008-12-02 21:29:00 UTC (rev 2908)
@@ -40,7 +40,6 @@
}
#include <cstring>
-#include "jrnl/aio_cb.hpp"
#include "jrnl/enums.hpp"
#include "jrnl/pmgr.hpp"
#include "jrnl/wrfc.hpp"
@@ -96,7 +95,6 @@
deq_rec _deq_rec; ///< Dequeue record used for
encoding/decoding
txn_rec _txn_rec; ///< Transaction record used for
encoding/decoding
std::set<std::string> _txn_pending_set; ///< Set containing xids of
pending commits/aborts
- wr_aio_cb _cb; ///< Callback function pointer for AIO events
public:
wmgr(jcntl* jc, enq_map& emap, txn_map& tmap, wrfc& wrfc);
@@ -104,7 +102,7 @@
const u_int32_t max_iowait_us);
virtual ~wmgr();
- void initialize(wr_aio_cb wr_cb, const u_int32_t wcache_pgsize_sblks,
+ void initialize(aio_callback* const cbp, const u_int32_t wcache_pgsize_sblks,
const u_int16_t wcache_num_pages, const u_int32_t max_dtokpp,
const u_int32_t max_iowait_us, std::size_t eo = 0);
iores enqueue(const void* const data_buff, const std::size_t tot_data_len,
@@ -125,7 +123,8 @@
const std::string status_str() const;
private:
- void initialize(const u_int32_t wcache_pgsize_sblks, const u_int16_t
wcache_num_pages);
+ void initialize(aio_callback* const cbp, const u_int32_t wcache_pgsize_sblks,
+ const u_int16_t wcache_num_pages);
iores pre_write_check(const _op_type op, const data_tok* const dtokp,
const std::size_t xidsize = 0, const std::size_t dsize = 0, const bool
external = false)
const;
Modified: store/trunk/cpp/tests/jrnl/_st_helper_fns.hpp
===================================================================
--- store/trunk/cpp/tests/jrnl/_st_helper_fns.hpp 2008-12-02 20:44:34 UTC (rev 2907)
+++ store/trunk/cpp/tests/jrnl/_st_helper_fns.hpp 2008-12-02 21:29:00 UTC (rev 2908)
@@ -61,7 +61,7 @@
bool done() { if (flag || _wstate == NONE) return true; else { flag = true; return
false; } }
};
-class test_jrnl : public jcntl
+class test_jrnl : public jcntl, public aio_callback
{
public:
test_jrnl(const std::string& jid, const std::string& jdir, const
std::string& base_filename) :
@@ -70,16 +70,15 @@
void initialize(const u_int16_t num_jfiles, const bool ae, const u_int16_t
ae_max_jfiles,
const u_int32_t jfsize_sblks)
{
- jcntl::initialize(num_jfiles, ae, ae_max_jfiles, jfsize_sblks,
JRNL_WMGR_DEF_PAGES, JRNL_WMGR_DEF_PAGE_SIZE, 0,
- &aio_wr_callback);
+ jcntl::initialize(num_jfiles, ae, ae_max_jfiles, jfsize_sblks,
JRNL_WMGR_DEF_PAGES, JRNL_WMGR_DEF_PAGE_SIZE,
+ this);
_jdir.create_dir();
}
void recover(const u_int16_t num_jfiles, const bool ae, const u_int16_t
ae_max_jfiles, const u_int32_t jfsize_sblks,
vector<string>* txn_list, u_int64_t& highest_rid)
- { jcntl::recover(num_jfiles, ae, ae_max_jfiles, jfsize_sblks, JRNL_WMGR_DEF_PAGES,
JRNL_WMGR_DEF_PAGE_SIZE, 0,
- &aio_wr_callback, txn_list, highest_rid); }
-private:
- static void aio_wr_callback(jcntl*, std::vector<data_tok*>& dtokl)
+ { jcntl::recover(num_jfiles, ae, ae_max_jfiles, jfsize_sblks, JRNL_WMGR_DEF_PAGES,
JRNL_WMGR_DEF_PAGE_SIZE, this,
+ txn_list, highest_rid); }
+ void wr_aio_cb(std::vector<data_tok*>& dtokl)
{
for (std::vector<data_tok*>::const_iterator i=dtokl.begin();
i!=dtokl.end(); i++)
{
@@ -88,6 +87,7 @@
delete dtp;
}
}
+ void rd_aio_cb(std::vector<u_int16_t>& /*pil*/) {}
};
/*
Modified: store/trunk/cpp/tests/jrnl/jtt/jrnl_instance.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/jtt/jrnl_instance.cpp 2008-12-02 20:44:34 UTC (rev 2907)
+++ store/trunk/cpp/tests/jrnl/jtt/jrnl_instance.cpp 2008-12-02 21:29:00 UTC (rev 2908)
@@ -112,7 +112,7 @@
{
u_int64_t highest_rid;
recover(_jpp->num_jfiles(), _jpp->is_ae(), _jpp->ae_max_jfiles(),
_jpp->jfsize_sblks(),
- _jpp->wcache_num_pages(), _jpp->wcache_pgsize_sblks(),
aio_rd_callback, aio_wr_callback,
+ _jpp->wcache_num_pages(), _jpp->wcache_pgsize_sblks(), this,
0, highest_rid);
recover_complete();
}
@@ -120,14 +120,14 @@
{
if (e.err_code() == mrg::journal::jerrno::JERR_JDIR_STAT)
initialize(_jpp->num_jfiles(), _jpp->is_ae(),
_jpp->ae_max_jfiles(), _jpp->jfsize_sblks(),
- _jpp->wcache_num_pages(), _jpp->wcache_pgsize_sblks(),
aio_rd_callback, aio_wr_callback);
+ _jpp->wcache_num_pages(), _jpp->wcache_pgsize_sblks(),
this);
else
throw;
}
}
else
initialize(_jpp->num_jfiles(), _jpp->is_ae(), _jpp->ae_max_jfiles(),
_jpp->jfsize_sblks(),
- _jpp->wcache_num_pages(), _jpp->wcache_pgsize_sblks(),
aio_rd_callback, aio_wr_callback);
+ _jpp->wcache_num_pages(), _jpp->wcache_pgsize_sblks(), this);
}
catch (const mrg::journal::jexception& e) { _tcrp->add_exception(e); }
catch (const std::exception& e) { _tcrp->add_exception(e.what()); }
@@ -412,20 +412,45 @@
_deq_list_cv.broadcast();
}
-// static AIO callback fns
+// AIO callbacks
void
-jrnl_instance::aio_rd_callback(jcntl* journal, std::vector<u_int16_t>&
/*pil*/)
+jrnl_instance::wr_aio_cb(std::vector<journal::data_tok*>& dtokl)
{
- jrnl_instance::handle_rd_callback(journal);
+ for (std::vector<journal::data_tok*>::const_iterator i=dtokl.begin();
i!=dtokl.end(); i++)
+ {
+ if ((*i)->wstate() == journal::data_tok::ENQ || (*i)->wstate() ==
journal::data_tok::DEQ)
+ {
+ journal::data_tok* dtokp = *i;
+ if (dtokp->wstate() == journal::data_tok::ENQ)
+ {
+ if (_args_ptr->read_mode.val() == read_arg::NONE)
+ {
+ mrg::journal::slock sl(&_deq_list_mutex);
+ _dtok_deq_list.push_back(dtokp);
+ _deq_list_cv.broadcast();
+ }
+ else
+ {
+ mrg::journal::slock sl(&_rd_list_mutex);
+ _dtok_rd_list.push_back(dtokp);
+ _rd_list_cv.broadcast();
+ }
+ }
+ else // DEQ
+ {
+ mrg::journal::slock sl(&_wr_full_mutex);
+ _wr_full_cv.broadcast();
+ }
+ }
+ }
}
void
-jrnl_instance::aio_wr_callback(jcntl* journal, std::vector<journal::data_tok*>&
dtokl)
+jrnl_instance::rd_aio_cb(std::vector<u_int16_t>& /*pil*/)
{
- for (std::vector<journal::data_tok*>::const_iterator i=dtokl.begin();
i!=dtokl.end(); i++)
- if ((*i)->wstate() == journal::data_tok::ENQ || (*i)->wstate() ==
journal::data_tok::DEQ)
- jrnl_instance::handle_wr_callback(journal, *i);
+ mrg::journal::slock sl(&_rd_aio_mutex);
+ _rd_aio_cv.broadcast();
}
} // namespace jtt
Modified: store/trunk/cpp/tests/jrnl/jtt/jrnl_instance.hpp
===================================================================
--- store/trunk/cpp/tests/jrnl/jtt/jrnl_instance.hpp 2008-12-02 20:44:34 UTC (rev 2907)
+++ store/trunk/cpp/tests/jrnl/jtt/jrnl_instance.hpp 2008-12-02 21:29:00 UTC (rev 2908)
@@ -41,7 +41,7 @@
namespace jtt
{
- class jrnl_instance : public mrg::journal::jcntl
+ class jrnl_instance : public mrg::journal::jcntl, public mrg::journal::aio_callback
{
public:
typedef boost::shared_ptr<jrnl_instance> shared_ptr;
@@ -87,41 +87,11 @@
void run_tc() throw ();
void tc_wait_compl() throw ();
- private:
- inline void handle_rd_callback()
- {
- mrg::journal::slock sl(&_rd_aio_mutex);
- _rd_aio_cv.broadcast();
- }
- static inline void handle_rd_callback(jcntl* jp)
- { static_cast<jrnl_instance*>(jp)->handle_rd_callback(); }
+ // AIO callbacks
+ void wr_aio_cb(std::vector<journal::data_tok*>& dtokl);
+ void rd_aio_cb(std::vector<u_int16_t>& pil);
- inline void handle_wr_callback(mrg::journal::data_tok* dtokp)
- {
- if (dtokp->wstate() == journal::data_tok::ENQ)
- {
- if (_args_ptr->read_mode.val() == read_arg::NONE)
- {
- mrg::journal::slock sl(&_deq_list_mutex);
- _dtok_deq_list.push_back(dtokp);
- _deq_list_cv.broadcast();
- }
- else
- {
- mrg::journal::slock sl(&_rd_list_mutex);
- _dtok_rd_list.push_back(dtokp);
- _rd_list_cv.broadcast();
- }
- }
- else // DEQ
- {
- mrg::journal::slock sl(&_wr_full_mutex);
- _wr_full_cv.broadcast();
- }
- }
- static inline void handle_wr_callback(jcntl* jp, mrg::journal::data_tok* dtokp)
- { static_cast<jrnl_instance*>(jp)->handle_wr_callback(dtokp); }
-
+ private:
void run_enq() throw ();
inline static void* run_enq(void* p)
{ static_cast<jrnl_instance*>(p)->run_enq(); return 0; }
@@ -141,9 +111,9 @@
void panic();
- // static callbacks
- static void aio_rd_callback(jcntl* journal, std::vector<u_int16_t>&
pil);
- static void aio_wr_callback(jcntl* journal,
std::vector<journal::data_tok*>& dtokl);
+// // static callbacks
+// static void aio_rd_callback(jcntl* journal, std::vector<u_int16_t>&
pil);
+// static void aio_wr_callback(jcntl* journal,
std::vector<journal::data_tok*>& dtokl);
};
} // namespace jtt