[rhmessaging-commits] rhmessaging commits: r2232 - in store/trunk/cpp/lib: jrnl and 1 other directory.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Tue Jul 29 16:13:17 EDT 2008


Author: kpvdr
Date: 2008-07-29 16:13:17 -0400 (Tue, 29 Jul 2008)
New Revision: 2232

Added:
   store/trunk/cpp/lib/jrnl/aio.cpp
   store/trunk/cpp/lib/jrnl/aio.hpp
Modified:
   store/trunk/cpp/lib/Makefile.am
   store/trunk/cpp/lib/jrnl/pmgr.cpp
   store/trunk/cpp/lib/jrnl/pmgr.hpp
   store/trunk/cpp/lib/jrnl/rmgr.cpp
   store/trunk/cpp/lib/jrnl/rmgr.hpp
   store/trunk/cpp/lib/jrnl/wmgr.cpp
   store/trunk/cpp/lib/jrnl/wmgr.hpp
Log:
Placed all libaio functions into static class aio.

Modified: store/trunk/cpp/lib/Makefile.am
===================================================================
--- store/trunk/cpp/lib/Makefile.am	2008-07-28 15:27:55 UTC (rev 2231)
+++ store/trunk/cpp/lib/Makefile.am	2008-07-29 20:13:17 UTC (rev 2232)
@@ -39,6 +39,7 @@
   StoreException.h				\
   StringDbt.h					\
   TxnCtxt.h					\
+  jrnl/aio.cpp                  \
   jrnl/arr_cnt.cpp				\
   jrnl/cvar.cpp					\
   jrnl/data_tok.cpp				\
@@ -62,6 +63,7 @@
   jrnl/txn_rec.cpp				\
   jrnl/wmgr.cpp					\
   jrnl/wrfc.cpp					\
+  jrnl/aio.hpp                  \
   jrnl/aio_cb.hpp				\
   jrnl/arr_cnt.hpp				\
   jrnl/cvar.hpp					\

Added: store/trunk/cpp/lib/jrnl/aio.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/aio.cpp	                        (rev 0)
+++ store/trunk/cpp/lib/jrnl/aio.cpp	2008-07-29 20:13:17 UTC (rev 2232)
@@ -0,0 +1,11 @@
+#include "aio.hpp"
+
+
+namespace rhm
+{
+namespace journal
+{
+
+
+} // namespace journal
+} // namespace rhm

Added: store/trunk/cpp/lib/jrnl/aio.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/aio.hpp	                        (rev 0)
+++ store/trunk/cpp/lib/jrnl/aio.hpp	2008-07-29 20:13:17 UTC (rev 2232)
@@ -0,0 +1,86 @@
+
+#ifndef rhm_journal_aio_hpp
+#define rhm_journal_aio_hpp
+
+#include <libaio.h>
+#include <cstring>
+#include <sys/types.h>
+#include <string.h>
+
+namespace rhm
+{
+namespace journal
+{
+
+typedef iocb aio_cb;
+typedef io_event aio_event;
+
+class aio
+{
+public:
+    static inline int queue_init(int maxevents, io_context_t* ctxp)
+    {
+        return ::io_queue_init(maxevents, ctxp);
+    }
+
+    static inline int queue_release(io_context_t ctx)
+    {
+        return ::io_queue_release(ctx);
+    }
+
+    static inline int submit(io_context_t ctx, long nr, aio_cb* aios[])
+    {
+        return ::io_submit(ctx, nr, aios);
+    }
+
+    static inline int getevents(io_context_t ctx, long min_nr, long nr, aio_event* events,
+            timespec* timeout)
+    {
+        return ::io_getevents(ctx, min_nr, nr, events, timeout);
+    }
+
+    static inline void prep_pread(aio_cb* aiocbp, int fd, void* buf, std::size_t count,
+            int64_t offset)
+    {
+        ::io_prep_pread(aiocbp, fd, buf, count, offset);
+    }
+
+    // Special version of libaio's io_prep_pread() which preserves the value of the data
+    // pointer. This allows iocbs to be initialized with a pointer that can be re-used.
+    static inline void prep_pread_2(aio_cb* aiocbp, int fd, void* buf, std::size_t count,
+            int64_t offset)
+    {
+        std::memset((void*)((char*)aiocbp + sizeof(void*)), 0, sizeof(aio_cb) - sizeof(void*));
+        aiocbp->aio_fildes = fd;
+        aiocbp->aio_lio_opcode = IO_CMD_PREAD;
+        aiocbp->aio_reqprio = 0;
+        aiocbp->u.c.buf = buf;
+        aiocbp->u.c.nbytes = count;
+        aiocbp->u.c.offset = offset;
+    }
+
+    static inline void prep_pwrite(aio_cb* aiocbp, int fd, void* buf, std::size_t count,
+            int64_t offset)
+    {
+        ::io_prep_pwrite(aiocbp, fd, buf, count, offset);
+    }
+
+    // Special version of libaio's io_prep_pwrite() which preserves the value of the data
+    // pointer. This allows iocbs to be initialized with a pointer that can be re-used.
+    static inline void prep_pwrite_2(aio_cb* aiocbp, int fd, void* buf, std::size_t count,
+            int64_t offset)
+    {
+        std::memset((void*)((char*)aiocbp + sizeof(void*)), 0, sizeof(aio_cb) - sizeof(void*));
+        aiocbp->aio_fildes = fd;
+        aiocbp->aio_lio_opcode = IO_CMD_PWRITE;
+        aiocbp->aio_reqprio = 0;
+        aiocbp->u.c.buf = buf;
+        aiocbp->u.c.nbytes = count;
+        aiocbp->u.c.offset = offset;
+    }
+};
+
+} // namespace journal
+} // namespace rhm
+
+#endif

Modified: store/trunk/cpp/lib/jrnl/pmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/pmgr.cpp	2008-07-28 15:27:55 UTC (rev 2231)
+++ store/trunk/cpp/lib/jrnl/pmgr.cpp	2008-07-29 20:13:17 UTC (rev 2232)
@@ -83,8 +83,8 @@
         _page_base_ptr(0),
         _page_ptr_arr(0),
         _page_cb_arr(0),
-        _iocb_arr(0),
-        _ioevt_arr(0),
+        _aio_cb_arr(0),
+        _aio_event_arr(0),
         _ioctx(0),
         _pg_index(0),
         _pg_cntr(0),
@@ -132,8 +132,8 @@
     std::memset(_page_cb_arr, 0, _cache_num_pages * sizeof(page_cb));
 
     // 5. Allocate IO control block (iocb) array
-    _iocb_arr = (iocb*)std::malloc(_cache_num_pages * sizeof(iocb));
-    MALLOC_CHK(_iocb_arr, "_iocb_arr", "pmgr", "initialize");
+    _aio_cb_arr = (aio_cb*)std::malloc(_cache_num_pages * sizeof(aio_cb));
+    MALLOC_CHK(_aio_cb_arr, "_aio_cb_arr", "pmgr", "initialize");
 
     // 6. Set page pointers in _page_ptr_arr, _page_cb_arr and iocbs to pages within page block
     for (u_int16_t i=0; i<_cache_num_pages; i++)
@@ -144,16 +144,16 @@
         _page_cb_arr[i]._pbuff = _page_ptr_arr[i];
         _page_cb_arr[i]._pdtokl = new std::deque<data_tok*>;
         _page_cb_arr[i]._pdtokl->clear();
-        _iocb_arr[i].data = (void*)&_page_cb_arr[i];
+        _aio_cb_arr[i].data = (void*)&_page_cb_arr[i];
     }
 
     // 7. Allocate io_event array, max one event per cache page plus one for each file
     const u_int16_t max_aio_evts = _cache_num_pages + _jc->num_jfiles();
-    _ioevt_arr = (io_event*)std::malloc(max_aio_evts * sizeof(io_event));
-    MALLOC_CHK(_ioevt_arr, "_ioevt_arr", "pmgr", "initialize");
+    _aio_event_arr = (aio_event*)std::malloc(max_aio_evts * sizeof(aio_event));
+    MALLOC_CHK(_aio_event_arr, "_aio_event_arr", "pmgr", "initialize");
 
     // 8. Initialize AIO context
-    if (int ret = ::io_queue_init(max_aio_evts, &_ioctx))
+    if (int ret = aio::queue_init(max_aio_evts, &_ioctx))
     {
         std::ostringstream oss;
         oss << "io_queue_init() failed: " << std::strerror(-ret) << " (" << ret << ")";
@@ -167,7 +167,7 @@
     // clean up allocated memory here
 
     if (_ioctx)
-        ::io_queue_release(_ioctx);
+        aio::queue_release(_ioctx);
 
     std::free(_page_base_ptr);
     _page_base_ptr = 0;
@@ -183,11 +183,11 @@
     std::free(_page_cb_arr);
     _page_cb_arr = 0;
 
-    std::free(_iocb_arr);
-    _iocb_arr = 0;
+    std::free(_aio_cb_arr);
+    _aio_cb_arr = 0;
 
-    std::free(_ioevt_arr);
-    _ioevt_arr = 0;
+    std::free(_aio_event_arr);
+    _aio_event_arr = 0;
 }
 
 const char*

Modified: store/trunk/cpp/lib/jrnl/pmgr.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/pmgr.hpp	2008-07-28 15:27:55 UTC (rev 2231)
+++ store/trunk/cpp/lib/jrnl/pmgr.hpp	2008-07-29 20:13:17 UTC (rev 2232)
@@ -41,7 +41,7 @@
 }
 
 #include <deque>
-#include <libaio.h>
+#include "jrnl/aio.hpp"
 #include "jrnl/data_tok.hpp"
 #include "jrnl/deq_rec.hpp"
 #include "jrnl/enq_map.hpp"
@@ -103,8 +103,8 @@
         void* _page_base_ptr;           ///< Base pointer to page memory
         void** _page_ptr_arr;           ///< Array of pointers to cache_num_pages in page memory
         page_cb* _page_cb_arr;          ///< Array of page_cb structs
-        iocb* _iocb_arr;                ///< Array of iocb structs
-        io_event* _ioevt_arr;           ///< Array of io_events
+        aio_cb* _aio_cb_arr;            ///< Array of iocb structs
+        aio_event* _aio_event_arr;      ///< Array of io_events
         io_context_t _ioctx;            ///< AIO context for read/write operations
         u_int16_t _pg_index;            ///< Index of current page being used
         u_int32_t _pg_cntr;             ///< Page counter; determines if file rotation req'd

Modified: store/trunk/cpp/lib/jrnl/rmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rmgr.cpp	2008-07-28 15:27:55 UTC (rev 2231)
+++ store/trunk/cpp/lib/jrnl/rmgr.cpp	2008-07-29 20:13:17 UTC (rev 2232)
@@ -48,7 +48,7 @@
         _hdr(),
         _valid(false),
         _fhdr_buffer(0),
-        _iocbp(0),
+        _fhdr_aio_cb_ptr(0),
         _fhdr_rd_outstanding(false)
 {}
 
@@ -71,8 +71,8 @@
         oss << FORMAT_SYSERR(errno);
         throw jexception(jerrno::JERR__MALLOC, oss.str(), "rmgr", "initialize");
     }
-    _iocbp = new iocb;
-    std::memset(_iocbp, 0, sizeof(iocb*));
+    _fhdr_aio_cb_ptr = new aio_cb;
+    std::memset(_fhdr_aio_cb_ptr, 0, sizeof(aio_cb*));
 }
 
 void
@@ -81,10 +81,10 @@
     std::free(_fhdr_buffer);
     _fhdr_buffer = 0;
 
-    if (_iocbp)
+    if (_fhdr_aio_cb_ptr)
     {
-        delete _iocbp;
-        _iocbp = 0;
+        delete _fhdr_aio_cb_ptr;
+        _fhdr_aio_cb_ptr = 0;
     }
 }
 
@@ -253,7 +253,7 @@
 rmgr::get_events(page_state state)
 {
     int ret = 0;
-    if ((ret = ::io_getevents(_ioctx, 0, _cache_num_pages + _jc->num_jfiles(), _ioevt_arr, 0)) < 0)
+    if ((ret = aio::getevents(_ioctx, 0, _cache_num_pages + _jc->num_jfiles(), _aio_event_arr, 0)) < 0)
     {
         if (ret == -EINTR) // No events
             return 0;
@@ -273,17 +273,17 @@
             throw jexception(jerrno::JERR__UNDERFLOW, oss.str(), "rmgr", "get_events");
         }
         _aio_evt_rem--;
-        iocb* iocbp = _ioevt_arr[i].obj; // This I/O control block (iocb)
-        page_cb* pcbp = (page_cb*)(iocbp->data); // This page control block (pcb)
-        long aioret = (long)_ioevt_arr[i].res;
+        aio_cb* aiocbp = _aio_event_arr[i].obj; // This I/O control block (iocb)
+        page_cb* pcbp = (page_cb*)(aiocbp->data); // This page control block (pcb)
+        long aioret = (long)_aio_event_arr[i].res;
         if (aioret < 0)
         {
             std::ostringstream oss;
             oss << "AIO read operation failed: " << std::strerror(-aioret) << " (" << aioret << ")";
-            oss << " [pg=" << pcbp->_index << " buf=" << iocbp->u.c.buf;
-            oss << " rsize=0x" << std::hex << iocbp->u.c.nbytes;
-            oss << " offset=0x" << iocbp->u.c.offset << std::dec;
-            oss << " fh=" << iocbp->aio_fildes << "]";
+            oss << " [pg=" << pcbp->_index << " buf=" << aiocbp->u.c.buf;
+            oss << " rsize=0x" << std::hex << aiocbp->u.c.nbytes;
+            oss << " offset=0x" << aiocbp->u.c.offset << std::dec;
+            oss << " fh=" << aiocbp->aio_fildes << "]";
             throw jexception(jerrno::JERR__AIO, oss.str(), "rmgr", "get_events");
         }
 
@@ -292,7 +292,7 @@
             // Increment the completed read offset
             // NOTE: We cannot use _rrfc here, as it may have rotated since submitting count.
             // Use stored pointer to nlfh in the pcb instead.
-            pcbp->_rdblks = iocbp->u.c.nbytes / JRNL_DBLK_SIZE;
+            pcbp->_rdblks = aiocbp->u.c.nbytes / JRNL_DBLK_SIZE;
             pcbp->_rfh->add_rd_cmpl_cnt_dblks(pcbp->_rdblks);
             pcbp->_state = state;
             pil[i] = pcbp->_index;
@@ -588,10 +588,10 @@
             int16_t pi = (i + first_uninit) % _cache_num_pages;
             // TODO: For perf, combine contiguous pages into single read
             //   1 or 2 AIOs needed depending on whether read block folds
-            iocb* this_iocb_ptr = &_iocb_arr[pi];
-            rhm_prep_pread(this_iocb_ptr, _rrfc.fh(), _page_ptr_arr[pi],
+            aio_cb* aiocbp = &_aio_cb_arr[pi];
+            aio::prep_pread_2(aiocbp, _rrfc.fh(), _page_ptr_arr[pi],
                     rd_size * JRNL_DBLK_SIZE, _rrfc.subm_offs());
-            if (::io_submit(_ioctx, 1, &this_iocb_ptr) < 0)
+            if (aio::submit(_ioctx, 1, &aiocbp) < 0)
                 throw jexception(jerrno::JERR__AIO, "rmgr", "init_aio_reads");
             _rrfc.add_subm_cnt_dblks(rd_size);
             _aio_evt_rem++;
@@ -653,8 +653,8 @@
 rmgr::init_file_header_read(u_int16_t fid)
 {
     int rfh = _rrfc.file_handle(fid)->rd_fh();
-    rhm_prep_pread(_iocbp, rfh, _fhdr_buffer, _sblksize, 0);
-    if (::io_submit(_ioctx, 1, &_iocbp) < 0)
+    aio::prep_pread_2(_fhdr_aio_cb_ptr, rfh, _fhdr_buffer, _sblksize, 0);
+    if (aio::submit(_ioctx, 1, &_fhdr_aio_cb_ptr) < 0)
         throw jexception(jerrno::JERR__AIO, "rmgr", "init_file_header_read");
     _aio_evt_rem++;
     _rrfc.add_subm_cnt_dblks(JRNL_SBLK_SIZE);

Modified: store/trunk/cpp/lib/jrnl/rmgr.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rmgr.hpp	2008-07-28 15:27:55 UTC (rev 2231)
+++ store/trunk/cpp/lib/jrnl/rmgr.hpp	2008-07-29 20:13:17 UTC (rev 2232)
@@ -61,15 +61,15 @@
     class rmgr : public pmgr
     {
     private:
-        rrfc& _rrfc;            ///< Ref to read rotating file controller
-        rec_hdr _hdr;           ///< Header used to determind record type
-        rd_aio_cb _cb;          ///< Callback function pointer for AIO events
+        rrfc& _rrfc;                ///< Ref to read rotating file controller
+        rec_hdr _hdr;               ///< Header used to determind record type
+        rd_aio_cb _cb;              ///< Callback function pointer for AIO events
 
-        bool _valid;            ///< Flag is true when read pages contain vailid data
-        void* _fhdr_buffer;     ///< Buffer used for fhdr reads
-        iocb* _iocbp;           ///< iocb pointer for fhdr reads
-        file_hdr _fhdr;         ///< file header instance for reading file headers
-        bool _fhdr_rd_outstanding; ///< true if a fhdr read is outstanding
+        bool _valid;                ///< Flag is true when read pages contain vailid data
+        void* _fhdr_buffer;         ///< Buffer used for fhdr reads
+        aio_cb* _fhdr_aio_cb_ptr;   ///< iocb pointer for fhdr reads
+        file_hdr _fhdr;             ///< file header instance for reading file headers
+        bool _fhdr_rd_outstanding;  ///< true if a fhdr read is outstanding
 
     public:
         rmgr(jcntl* jc, enq_map& emap, txn_map& tmap, rrfc& rrfc);
@@ -106,22 +106,6 @@
         void set_params_null(void** const datapp, std::size_t& dsize, void** const xidpp,
                 std::size_t& xidsize);
         void init_file_header_read(u_int16_t fid);
-
-
-        // Special version of libaio's io_prep_pread() which preserves the value of the data
-        // pointer. This allows iocbs to be initialized with a pointer that can be re-used. Note
-        // that C++ does not support type long long, this has been replaced with int64_t.
-        static inline void rhm_prep_pread(struct iocb *iocb, int fd, void *buf, std::size_t count,
-                int64_t offset)
-        {
-            std::memset((void*)((char*)iocb + sizeof(void*)), 0, sizeof(*iocb) - sizeof(void*));
-            iocb->aio_fildes = fd;
-            iocb->aio_lio_opcode = IO_CMD_PREAD;
-            iocb->aio_reqprio = 0;
-            iocb->u.c.buf = buf;
-            iocb->u.c.nbytes = count;
-            iocb->u.c.offset = offset;
-        }
     };
 
 } // namespace journal

Modified: store/trunk/cpp/lib/jrnl/wmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wmgr.cpp	2008-07-28 15:27:55 UTC (rev 2231)
+++ store/trunk/cpp/lib/jrnl/wmgr.cpp	2008-07-29 20:13:17 UTC (rev 2232)
@@ -51,7 +51,7 @@
         _max_io_wait_us(0),
         _fhdr_base_ptr(0),
         _fhdr_ptr_arr(0),
-        _iocba(0),
+        _fhdr_aio_cb_arr(0),
         _cached_offset_dblks(0),
         _jfsize_dblks(0),
         _jfsize_pgs(0),
@@ -71,7 +71,7 @@
         _max_io_wait_us(max_iowait_us),
         _fhdr_base_ptr(0),
         _fhdr_ptr_arr(0),
-        _iocba(0),
+        _fhdr_aio_cb_arr(0),
         _cached_offset_dblks(0),
         _jfsize_dblks(0),
         _jfsize_pgs(0),
@@ -707,14 +707,14 @@
             dblk_roundup();
 
             std::size_t pg_offs = (_pg_offset_dblks - _cached_offset_dblks) * JRNL_DBLK_SIZE;
-            rhm_prep_pwrite(&_iocb_arr[_pg_index], _wrfc.fh(),
+            aio_cb* aiocbp = &_aio_cb_arr[_pg_index];
+            aio::prep_pwrite_2(aiocbp, _wrfc.fh(),
                 (char*)_page_ptr_arr[_pg_index] + pg_offs, _cached_offset_dblks * JRNL_DBLK_SIZE,
                 _wrfc.subm_offs());
-            iocb* this_iocb_ptr = &_iocb_arr[_pg_index];
-            page_cb* pcbp = (page_cb*)(this_iocb_ptr->data); // This page control block (pcb)
+            page_cb* pcbp = (page_cb*)(aiocbp->data); // This page control block (pcb)
             pcbp->_wdblks = _cached_offset_dblks;
             pcbp->_wfh = _wrfc.file_handle();
-            if (::io_submit(_ioctx, 1, &this_iocb_ptr) < 0)
+            if (aio::submit(_ioctx, 1, &aiocbp) < 0)
                 throw jexception(jerrno::JERR__AIO, "wmgr", "write_flush");
             _wrfc.add_subm_cnt_dblks(_cached_offset_dblks);
             _wrfc.incr_aio_cnt();
@@ -746,7 +746,7 @@
 wmgr::get_events(page_state state)
 {
     int ret = 0;
-    if ((ret = ::io_getevents(_ioctx, 0, _cache_num_pages + _jc->num_jfiles(), _ioevt_arr, 0)) < 0)
+    if ((ret = aio::getevents(_ioctx, 0, _cache_num_pages + _jc->num_jfiles(), _aio_event_arr, 0)) < 0)
     {
         std::ostringstream oss;
         oss << "io_getevents() failed: " << std::strerror(-ret) << " (" << ret << ")";
@@ -763,9 +763,9 @@
             throw jexception(jerrno::JERR__UNDERFLOW, oss.str(), "wmgr", "get_events");
         }
         _aio_evt_rem--;
-        iocb* iocbp = _ioevt_arr[i].obj; // This I/O control block (iocb)
-        page_cb* pcbp = (page_cb*)(iocbp->data); // This page control block (pcb)
-        long aioret = (long)_ioevt_arr[i].res;
+        aio_cb* aiocbp = _aio_event_arr[i].obj; // This I/O control block (iocb)
+        page_cb* pcbp = (page_cb*)(aiocbp->data); // This page control block (pcb)
+        long aioret = (long)_aio_event_arr[i].res;
         if (aioret < 0)
         {
             std::ostringstream oss;
@@ -774,11 +774,11 @@
                 oss << "pg=" << pcbp->_index;
             else
             {
-                file_hdr* fhp = (file_hdr*)iocbp->u.c.buf;
+                file_hdr* fhp = (file_hdr*)aiocbp->u.c.buf;
                 oss << "fid=" << fhp->_fid;
             }
-            oss << " size=" << iocbp->u.c.nbytes;
-            oss << " offset=" << iocbp->u.c.offset << " fh=" << iocbp->aio_fildes << "]";
+            oss << " size=" << aiocbp->u.c.nbytes;
+            oss << " offset=" << aiocbp->u.c.offset << " fh=" << aiocbp->aio_fildes << "]";
             throw jexception(jerrno::JERR__AIO, oss.str(), "wmgr", "get_events");
         }
         if (pcbp) // Page writes have pcb
@@ -872,7 +872,7 @@
         else // File header writes have no pcb
         {
             // get fid from original file header record, update pointers for that fid
-            file_hdr* fhp = (file_hdr*)iocbp->u.c.buf;
+            file_hdr* fhp = (file_hdr*)aiocbp->u.c.buf;
             u_int32_t fid = fhp->_fid;
             nlfh* nlfhp = _wrfc.file_handle(fid);
             nlfhp->add_wr_cmpl_cnt_dblks(JRNL_SBLK_SIZE);
@@ -915,13 +915,13 @@
     }
     _fhdr_ptr_arr = (void**)std::malloc(_num_jfiles * sizeof(void*));
     MALLOC_CHK(_fhdr_ptr_arr, "_fhdr_ptr_arr", "wmgr", "initialize");
-    _iocba = (iocb**)std::malloc(sizeof(iocb*) * _num_jfiles);
-    MALLOC_CHK(_iocba, "_iocba", "wmgr", "initialize");
-    std::memset(_iocba, 0, sizeof(iocb*) * _num_jfiles);
+    _fhdr_aio_cb_arr = (aio_cb**)std::malloc(sizeof(aio_cb*) * _num_jfiles);
+    MALLOC_CHK(_fhdr_aio_cb_arr, "_fhdr_aio_cb_arr", "wmgr", "initialize");
+    std::memset(_fhdr_aio_cb_arr, 0, sizeof(aio_cb*) * _num_jfiles);
     for (u_int16_t i=0; i<_num_jfiles; i++)
     {
         _fhdr_ptr_arr[i] = (void*)((char*)_fhdr_base_ptr + _sblksize * i);
-        _iocba[i] = new iocb;
+        _fhdr_aio_cb_arr[i] = new aio_cb;
     }
     _page_cb_arr[0]._state = IN_USE;
     _ddtokl.clear();
@@ -1034,9 +1034,9 @@
 #ifdef RHM_CLEAN
     std::memset((char*)_fhdr_ptr_arr[fid] + sizeof(fhdr), RHM_CLEAN_CHAR, _sblksize - sizeof(fhdr));
 #endif
-    iocb* iocbp = _iocba[fid];
-    ::io_prep_pwrite(iocbp, _wrfc.fh(), _fhdr_ptr_arr[fid], _sblksize, 0);
-    if (::io_submit(_ioctx, 1, &iocbp) < 0)
+    aio_cb* aiocbp = _fhdr_aio_cb_arr[fid];
+    aio::prep_pwrite(aiocbp, _wrfc.fh(), _fhdr_ptr_arr[fid], _sblksize, 0);
+    if (aio::submit(_ioctx, 1, &aiocbp) < 0)
         throw jexception(jerrno::JERR__AIO, "wmgr", "write_fhdr");
     _aio_evt_rem++;
     _wrfc.add_subm_cnt_dblks(JRNL_SBLK_SIZE);
@@ -1064,12 +1064,12 @@
     std::free(_fhdr_ptr_arr);
     _fhdr_ptr_arr = 0;
 
-    if (_iocba)
+    if (_fhdr_aio_cb_arr)
     {
         for (u_int32_t i=0; i<_num_jfiles; i++)
-            delete _iocba[i];
-        std::free(_iocba);
-        _iocba = 0;
+            delete _fhdr_aio_cb_arr[i];
+        std::free(_fhdr_aio_cb_arr);
+        _fhdr_aio_cb_arr = 0;
     }
 }
 

Modified: store/trunk/cpp/lib/jrnl/wmgr.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wmgr.hpp	2008-07-28 15:27:55 UTC (rev 2231)
+++ store/trunk/cpp/lib/jrnl/wmgr.hpp	2008-07-29 20:13:17 UTC (rev 2232)
@@ -75,7 +75,7 @@
         u_int32_t _max_io_wait_us;      ///< Max wait in microseconds till submit
         void* _fhdr_base_ptr;           ///< Base pointer to file header memory
         void** _fhdr_ptr_arr;           ///< Array of pointers to file headers memory
-        iocb** _iocba;                  ///< Array of iocb pointers for file header writes
+        aio_cb** _fhdr_aio_cb_arr;      ///< Array of iocb pointers for file header writes
         u_int32_t _cached_offset_dblks; ///< Amount of unwritten data in page (dblocks)
         std::deque<data_tok*> _ddtokl;  ///< Deferred dequeue data_tok list
         u_int32_t _jfsize_dblks;        ///< Journal file size in dblks (NOT sblks!)
@@ -136,21 +136,6 @@
         void write_fhdr(u_int64_t rid, u_int32_t fid, std::size_t fro);
         void rotate_page();
         void clean();
-
-        // Special version of libaio's io_prep_pwrite() which preserves the value of the data
-        // pointer. This allows iocbs to be initialized with a pointer that can be re-used. Note
-        // that C++ does not support type long long, this has been replaced with int64_t.
-        static inline void rhm_prep_pwrite(struct iocb *iocb, int fd, void *buf,
-                std::size_t count, int64_t offset)
-        {
-            std::memset((void*)((char*)iocb + sizeof(void*)), 0, sizeof(*iocb) - sizeof(void*));
-            iocb->aio_fildes = fd;
-            iocb->aio_lio_opcode = IO_CMD_PWRITE;
-            iocb->aio_reqprio = 0;
-            iocb->u.c.buf = buf;
-            iocb->u.c.nbytes = count;
-            iocb->u.c.offset = offset;
-        }
     };
 
 } // namespace journal




More information about the rhmessaging-commits mailing list