[rhmessaging-commits] rhmessaging commits: r1262 - in store/trunk/cpp: lib/jrnl and 1 other directories.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Wed Nov 7 14:25:50 EST 2007


Author: kpvdr
Date: 2007-11-07 14:25:50 -0500 (Wed, 07 Nov 2007)
New Revision: 1262

Modified:
   store/trunk/cpp/lib/BdbMessageStore.cpp
   store/trunk/cpp/lib/jrnl/enq_rec.cpp
   store/trunk/cpp/lib/jrnl/jcntl.cpp
   store/trunk/cpp/lib/jrnl/pmgr.cpp
   store/trunk/cpp/lib/jrnl/rmgr.cpp
   store/trunk/cpp/lib/jrnl/rmgr.hpp
   store/trunk/cpp/lib/jrnl/wmgr.cpp
   store/trunk/cpp/lib/jrnl/wrfc.cpp
   store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp
Log:
Bugfix: transient and external header flags were not being handled correctly. Also drastic simplification to JournalSystemTests.

Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp	2007-11-07 17:19:21 UTC (rev 1261)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp	2007-11-07 19:25:50 UTC (rev 1262)
@@ -931,13 +931,13 @@
 				rhm::journal::iores eres;
 				if (txn->getXid().empty()){
 					if (message.isContentReleased()){
-						eres = jc->enqueue_extern_data_record(0, dtokp.get(), false);
+						eres = jc->enqueue_extern_data_record(size, dtokp.get(), false);
 					}else {
 						eres = jc->enqueue_data_record(buff, size, size, dtokp.get(), false);
 					}
 				}else {
 					if (message.isContentReleased()){
-					   eres = jc->enqueue_extern_txn_data_record(0, dtokp.get(), txn->getXid(), false);
+					   eres = jc->enqueue_extern_txn_data_record(size, dtokp.get(), txn->getXid(), false);
 					} else {
 					    eres = jc->enqueue_txn_data_record(buff, size, size, dtokp.get(), txn->getXid(), false);
 					}

Modified: store/trunk/cpp/lib/jrnl/enq_rec.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/enq_rec.cpp	2007-11-07 17:19:21 UTC (rev 1261)
+++ store/trunk/cpp/lib/jrnl/enq_rec.cpp	2007-11-07 19:25:50 UTC (rev 1262)
@@ -506,7 +506,10 @@
         *datapp = NULL;
         return 0;
     }
-    *datapp = (void*)((char*)_buff + _enq_hdr._xidsize);
+    if (_enq_hdr.is_external())
+        *datapp = NULL;
+    else
+        *datapp = (void*)((char*)_buff + _enq_hdr._xidsize);
     return _enq_hdr._dsize;
 }
 
@@ -539,6 +542,8 @@
 const size_t
 enq_rec::rec_size() const
 {
+    if (_enq_hdr.is_external())
+        return enq_hdr::size() + _enq_hdr._xidsize + rec_tail::size();
     return enq_hdr::size() + _enq_hdr._xidsize + _enq_hdr._dsize + rec_tail::size();
 }
 

Modified: store/trunk/cpp/lib/jrnl/jcntl.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.cpp	2007-11-07 17:19:21 UTC (rev 1261)
+++ store/trunk/cpp/lib/jrnl/jcntl.cpp	2007-11-07 19:25:50 UTC (rev 1262)
@@ -176,6 +176,7 @@
         _datafh[i]->reset(&_rcvdat);
     _wrfc.initialize(JRNL_NUM_FILES, (nlfh**)_datafh, &_rcvdat);
     _rrfc.initialize(JRNL_NUM_FILES, (nlfh**)_datafh, _rcvdat._ffid);
+    _rmgr.recover_complete();
     _readonly_flag = false;
 }
 
@@ -440,17 +441,22 @@
                     done = er.rcv_decode(h, ifsp, cum_size_read);
                     jfile_cycle(fid, ifsp, rd, false);
                 }
-                rd._enq_cnt_list[fid]++;
-                if (er.xid_size())
+//std::cout << " E";
+                if (!er.is_transient()) // Ignore transient msgs
                 {
-                    er.get_xid(&xidp);
-                    assert(xidp != NULL);
-                    std::string xid((char*)xidp, er.xid_size());
-                    _tmap.insert_txn_data(xid, txn_data(h._rid, 0, fid, true));
-                    free(xidp);
+                    rd._enq_cnt_list[fid]++;
+                    if (er.xid_size())
+                    {
+                        er.get_xid(&xidp);
+                        assert(xidp != NULL);
+                        std::string xid((char*)xidp, er.xid_size());
+                        _tmap.insert_txn_data(xid, txn_data(h._rid, 0, fid, true));
+                        free(xidp);
+                    }
+                    else
+                        _emap.insert_fid(h._rid, fid);
                 }
-                else
-                    _emap.insert_fid(h._rid, fid);
+//else std::cout << "t";
                 if (rd._h_rid < h._rid)
                     rd._h_rid = h._rid;
             }
@@ -463,6 +469,7 @@
                     done = dr.rcv_decode(h, ifsp, cum_size_read);
                     jfile_cycle(fid, ifsp, rd, false);
                 }
+//std::cout << " D";
                 if (dr.xid_size())
                 {
                     // If the enqueue is part of a pending txn, it will not yet be in emap
@@ -503,6 +510,7 @@
                    done = ar.rcv_decode(h, ifsp, cum_size_read);
                     jfile_cycle(fid, ifsp, rd, false);
                 }
+//std::cout << " A";
                 // Delete this txn from tmap, unlock any locked records in emap
                 ar.get_xid(&xidp);
                 assert(xidp != NULL);
@@ -536,6 +544,7 @@
                     done = cr.rcv_decode(h, ifsp, cum_size_read);
                     jfile_cycle(fid, ifsp, rd, false);
                 }
+//std::cout << " C";
                 // Delete this txn from tmap, process records into emap
                 cr.get_xid(&xidp);
                 assert(xidp != NULL);
@@ -559,11 +568,13 @@
             break;
         case RHM_JDAT_EMPTY_MAGIC:
             {
+//std::cout << " X";
                 u_int32_t rec_dblks = jrec::size_dblks(sizeof(hdr));
                 ifsp->ignore(rec_dblks * JRNL_DBLK_SIZE - sizeof(hdr));
             }
         break;
         case 0:
+//std::cout << " 0";
             rd._lfid = fid;
             rd._eo = ifsp->tellg();
             return false;

Modified: store/trunk/cpp/lib/jrnl/pmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/pmgr.cpp	2007-11-07 17:19:21 UTC (rev 1261)
+++ store/trunk/cpp/lib/jrnl/pmgr.cpp	2007-11-07 19:25:50 UTC (rev 1262)
@@ -139,7 +139,6 @@
     _pg_offset_dblks = 0;
     _aio_evt_rem = 0;
     clean();
-//    _emap.clear();
 
     // 1. Allocate page memory (as a single block)
     size_t pagesize = _pages * _pagesize * _sblksize;

Modified: store/trunk/cpp/lib/jrnl/rmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rmgr.cpp	2007-11-07 17:19:21 UTC (rev 1261)
+++ store/trunk/cpp/lib/jrnl/rmgr.cpp	2007-11-07 19:25:50 UTC (rev 1262)
@@ -215,7 +215,7 @@
 rmgr::read(void** const datapp, size_t& dsize, void** const xidpp, size_t& xidsize, bool& transient,
         bool& external, data_tok* dtokp) throw (jexception)
 {
-//std::cout << " rmgr::read() ro=" << (_jc->is_read_only()?"T":"F") << std::flush;
+//std::cout << " rmgr::read() ro=" << (_jc->is_read_only()?"T":"F") << " po=" << _pg_offset_dblks << " ems=" << _emap.size() << std::flush;
 
     iores res = pre_read_check(dtokp);
     if (res != RHM_IORES_SUCCESS)
@@ -465,6 +465,14 @@
 }
 
 void
+rmgr::recover_complete()
+{
+    _pg_index = 0;
+    _pg_cntr = 0;
+    _pg_offset_dblks = 0;
+}
+
+void
 rmgr::initialize() throw (jexception)
 {
     pmgr::initialize();
@@ -554,7 +562,10 @@
     {
         enq_hdr ehdr;
         ::memcpy(&ehdr, rptr, sizeof(enq_hdr));
-        dtokp->set_dsize(ehdr._xidsize + ehdr._dsize + sizeof(enq_hdr) + sizeof(rec_tail));
+        if (ehdr.is_external())
+            dtokp->set_dsize(ehdr._xidsize + sizeof(enq_hdr) + sizeof(rec_tail));
+        else
+            dtokp->set_dsize(ehdr._xidsize + ehdr._dsize + sizeof(enq_hdr) + sizeof(rec_tail));
     }
     else if (h._magic == RHM_JDAT_DEQ_MAGIC)
     {

Modified: store/trunk/cpp/lib/jrnl/rmgr.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rmgr.hpp	2007-11-07 17:19:21 UTC (rev 1261)
+++ store/trunk/cpp/lib/jrnl/rmgr.hpp	2007-11-07 19:25:50 UTC (rev 1262)
@@ -74,6 +74,7 @@
         const iores read(void** const datapp, size_t& dsize, void** const xidpp, size_t& xidsize,
                 bool& transient, bool& external, data_tok* dtokp) throw (jexception);
         const u_int32_t get_events(page_state state = AIO_COMPLETE) throw (jexception);
+        void recover_complete();
 
     private:
         void initialize() throw (jexception);

Modified: store/trunk/cpp/lib/jrnl/wmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wmgr.cpp	2007-11-07 17:19:21 UTC (rev 1261)
+++ store/trunk/cpp/lib/jrnl/wmgr.cpp	2007-11-07 19:25:50 UTC (rev 1262)
@@ -96,6 +96,7 @@
         data_tok* dtokp, const void* const xid_ptr, const size_t xid_len, const bool transient,
         const bool external) throw (jexception)
 {
+//std::cout << "wmgr::enqueue() dl=" << tot_data_len << " xl=" << xid_len << " t=" << (transient?"T":"F") << " e=" << (external?"T":"F") << " msg=" << (data_buff?std::string((const char*)data_buff, tot_data_len):"<null>") << std::endl;
     if (xid_len)
         assert(xid_ptr != NULL);
 

Modified: store/trunk/cpp/lib/jrnl/wrfc.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wrfc.cpp	2007-11-07 17:19:21 UTC (rev 1261)
+++ store/trunk/cpp/lib/jrnl/wrfc.cpp	2007-11-07 19:25:50 UTC (rev 1262)
@@ -52,13 +52,6 @@
 
 wrfc::~wrfc() {}
 
-// void
-// wrfc::initialize(u_int32_t nfiles, nlfh** fh_arr, u_int32_t fh_index, u_int64_t rid) throw (jexception)
-// {
-//     rrfc::initialize(nfiles, fh_arr, fh_index);
-//     _rid = rid;
-//     _reset_ok = false;
-// }
 void
 wrfc::initialize(u_int32_t nfiles, nlfh** fh_arr, rcvdat* rdp) throw (jexception)
 {

Modified: store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp	2007-11-07 17:19:21 UTC (rev 1261)
+++ store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp	2007-11-07 19:25:50 UTC (rev 1262)
@@ -29,10 +29,9 @@
 * The GNU Lesser General Public License is available in the file COPYING.
 */
 
-#include "../test_plugin.h"
-#include "msg_producer.hpp"
-#include "msg_consumer.hpp"
-#include "jtest.hpp"
+#include "JournalSystemTests.hpp"
+//#include "msg_producer.hpp"
+//#include "msg_consumer.hpp"
 #include <vector>
 
 #define NUM_MSGS 5
@@ -41,1419 +40,784 @@
 #define MSG_SIZE 100
 #define XID_SIZE 64
 
-class JournalSystemTests : public CppUnit::TestCase  
+using namespace std;
+
+void
+JournalSystemTests::InstantiationTest()
 {
-    CPPUNIT_TEST_SUITE(JournalSystemTests);
-    CPPUNIT_TEST(InstantiationTest);
-    CPPUNIT_TEST(InitializationTest);
-    CPPUNIT_TEST(EmptyRecoverTest);
-    CPPUNIT_TEST(EnqueueTest);
-    CPPUNIT_TEST(TxnEnqueueTest);
-    CPPUNIT_TEST(RecoverReadTest);
-    CPPUNIT_TEST(TxnRecoverReadTest);
-    CPPUNIT_TEST(RecoveredReadTest);
-    CPPUNIT_TEST(TxnRecoveredReadTest);
-    CPPUNIT_TEST(RecoveredDequeueTest);
-    CPPUNIT_TEST(TxnRecoveredDequeueTest);
-    CPPUNIT_TEST(ComplexRecoveryTest1);
-    CPPUNIT_TEST(TxnComplexRecoveryTest1);
-    CPPUNIT_TEST(EncodeTest_000);
-    CPPUNIT_TEST(EncodeTest_001);
-    CPPUNIT_TEST(EncodeTest_002);
-    CPPUNIT_TEST(EncodeTest_003);
-    CPPUNIT_TEST(EncodeTest_004);
-    CPPUNIT_TEST(EncodeTest_005);
-    CPPUNIT_TEST(EncodeTest_006);
-    CPPUNIT_TEST(EncodeTest_007);
-    CPPUNIT_TEST(EncodeTest_008);
-    CPPUNIT_TEST(EncodeTest_009);
-    CPPUNIT_TEST(EncodeTest_010);
-    CPPUNIT_TEST(EncodeTest_011);
-    CPPUNIT_TEST(EncodeTest_012);
-    CPPUNIT_TEST(EncodeTest_013);
-    CPPUNIT_TEST(EncodeTest_014);
-    CPPUNIT_TEST(EncodeTest_015);
-    CPPUNIT_TEST(EncodeTest_016);
-    CPPUNIT_TEST(EncodeTest_017);
-    CPPUNIT_TEST(EncodeTest_018);
-    CPPUNIT_TEST(EncodeTest_019);
-    CPPUNIT_TEST(EncodeTest_020);
-    CPPUNIT_TEST(EncodeTest_021);
-    CPPUNIT_TEST(EncodeTest_022);
-    CPPUNIT_TEST(EncodeTest_023);
-    CPPUNIT_TEST(EncodeTest_024);
-    CPPUNIT_TEST(EncodeTest_025);
-    CPPUNIT_TEST(EncodeTest_026); 
-//     CPPUNIT_TEST(EncodeTest_027); // Until race condition fixed
-//     CPPUNIT_TEST(EncodeTest_028); // Until race condition fixed
-    CPPUNIT_TEST_SUITE_END();
+    try
+    {
+        char* test_name = "InstantiationTest";
+        rhm::journal::jcntl jc(test_name, "jdata", test_name);
+    }
+    catch (const rhm::journal::jexception& e)
+    {
+        stringstream ss;
+        ss << e;
+        CPPUNIT_FAIL(ss.str());
+    }
+}
 
-    jtest t;
-    std::string msg;
-    std::string xid;
-    void* mbuff;
-    size_t msize;
-    void* xidbuff;
-    size_t xidsize;
-    bool transientFlag;
-    bool externalFlag;
-
-public:
-
-    void InstantiationTest()
+void
+JournalSystemTests::InitializationTest()
+{
+    try
     {
-        //Stack
-        char* test_name = "InstantiationTest_Stack";
-        try
-        {
-            rhm::journal::jcntl jc(test_name, "jdata", test_name);
-        }
-        catch (const rhm::journal::jexception& e)
-        {
-            std::stringstream ss;
-            ss << e;
-            CPPUNIT_FAIL(ss.str());
-        }
-        // Heap
-        test_name = "InstantiationTest_Heap";
-        rhm::journal::jcntl* jcp = NULL;
-        try
-        {
-            jcp = new rhm::journal::jcntl(test_name, "jdata", test_name);
-            CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp != NULL);
-            delete jcp;
-        }
-        catch (const rhm::journal::jexception& e)
-        {
-            std::stringstream ss;
-            ss << e;
-            CPPUNIT_FAIL(ss.str());
-        }
+        char* test_name = "InitializationTest";
+        rhm::journal::jcntl jc(test_name, "jdata", test_name);
+        jc.initialize();
     }
+    catch (const rhm::journal::jexception& e)
+    {
+        stringstream ss;
+        ss << e;
+        CPPUNIT_FAIL(ss.str());
+    }
+}
 
-    void InitializationTest()
+void
+JournalSystemTests::EmptyRecoverTest()
+{
+    try
     {
-        //Stack
-        char* test_name = "InitializationTest_Stack";
-        try
+        vector<string> txn_list;
+        char* test_name = "EmptyRecoverTest";
         {
             rhm::journal::jcntl jc(test_name, "jdata", test_name);
             jc.initialize();
         }
-        catch (const rhm::journal::jexception& e)
         {
-            std::stringstream ss;
-            ss << e;
-            CPPUNIT_FAIL(ss.str());
+            rhm::journal::jcntl jc(test_name, "jdata", test_name);
+            jc.recover(txn_list);
         }
-        // Heap
-        test_name = "InitializationTest_Heap";
-        rhm::journal::jcntl* jcp = NULL;
-        try
         {
-            jcp = new rhm::journal::jcntl(test_name, "jdata", test_name);
-            CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp != NULL);
-            jcp->initialize();
-            delete jcp;
+            rhm::journal::jcntl jc(test_name, "jdata", test_name);
+            jc.recover(txn_list);
+            jc.recover_complete();
         }
-        catch (const rhm::journal::jexception& e)
-        {
-            if (jcp)
-                delete jcp;
-            std::stringstream ss;
-            ss << e;
-            CPPUNIT_FAIL(ss.str());
-        }
     }
+    catch (const rhm::journal::jexception& e)
+    {
+        stringstream ss;
+        ss << e;
+        CPPUNIT_FAIL(ss.str());
+    }
+}
 
-    void EmptyRecoverTest()
+void
+JournalSystemTests::EnqueueTest()
+{
+    try
     {
-        std::vector<std::string> txn_list;
-        //Stack
-        char* test_name = "EmptyRecoverTest_Stack";
-        try
-        {
-            {
-                rhm::journal::jcntl jc(test_name, "jdata", test_name);
-                jc.initialize();
-            }
-            {
-                rhm::journal::jcntl jc(test_name, "jdata", test_name);
-                jc.recover(txn_list);
-            }
-            {
-                rhm::journal::jcntl jc(test_name, "jdata", test_name);
-                jc.recover(txn_list);
-                jc.recover_complete();
-            }
-        }
-        catch (const rhm::journal::jexception& e)
-        {
-            std::stringstream ss;
-            ss << e;
-            CPPUNIT_FAIL(ss.str());
-        }
-        // Heap
-        test_name = "EmptyRecoverTest_Heap";
-        rhm::journal::jcntl* jcp = NULL;
-        try
-        {
-            {
-                rhm::journal::jcntl* jcp = new rhm::journal::jcntl(test_name, "jdata", test_name);
-                CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp != NULL);
-                jcp->initialize();
-                delete jcp;
-                jcp = NULL;
-            }
-            {
-                rhm::journal::jcntl* jcp = new rhm::journal::jcntl(test_name, "jdata", test_name);
-                CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp != NULL);
-                jcp->recover(txn_list);
-                delete jcp;
-                jcp = NULL;
-            }
-            {
-                rhm::journal::jcntl* jcp = new rhm::journal::jcntl(test_name, "jdata", test_name);
-                CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp != NULL);
-                jcp->recover(txn_list);
-                jcp->recover_complete();
-                delete jcp;
-            }
-        }
-        catch (const rhm::journal::jexception& e)
-        {
-            if (jcp)
-                delete jcp;
-            std::stringstream ss;
-            ss << e;
-            CPPUNIT_FAIL(ss.str());
-        }
+        char* test_name = "EnqueueTest";
+        rhm::journal::jcntl jc(test_name, "jdata", test_name);
+        jc.initialize();
+
+        // Non-txn
+        for (int m=0; m<NUM_MSGS; m++)
+            enq_msg(&jc, create_msg(msg, m, MSG_SIZE), false);
+
+        // Txn
+        create_xid(xid, 0, XID_SIZE);
+        for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
+            enq_txn_msg(&jc, create_msg(msg, m, MSG_SIZE), xid, false);
+        txn_commit(&jc, xid);
     }
+    catch (const rhm::journal::jexception& e)
+    {
+        stringstream ss;
+        ss << e;
+        CPPUNIT_FAIL(ss.str());
+    }
+}
 
-    void EnqueueTest()
+void
+JournalSystemTests::RecoverReadTest()
+{
+    vector<string> txn_list;
+    try
     {
-        //Stack
-        char* test_name = "EnqueueTest_Stack";
-        try
+        // Non-txn
+        char* test_name = "RecoverReadTest";
         {
             rhm::journal::jcntl jc(test_name, "jdata", test_name);
             jc.initialize();
             for (int m=0; m<NUM_MSGS; m++)
                 enq_msg(&jc, create_msg(msg, m, MSG_SIZE), false);
         }
-        catch (const rhm::journal::jexception& e)
         {
-            std::stringstream ss;
-            ss << e;
-            CPPUNIT_FAIL(ss.str());
-        }
-        // Heap
-        test_name = "EnqueueTest_Heap";
-        rhm::journal::jcntl* jcp = NULL;
-        try
-        {
-            jcp = new rhm::journal::jcntl(test_name, "jdata", test_name);
-            CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp != NULL);
-            jcp->initialize();
+            rhm::journal::jcntl jc(test_name, "jdata", test_name);
+            jc.recover(txn_list);
             for (int m=0; m<NUM_MSGS; m++)
-                enq_msg(jcp, create_msg(msg, m, MSG_SIZE), false);
-            delete jcp;
+            {
+                read_msg(&jc);
+                CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
+                        create_msg(msg, m, MSG_SIZE).compare(string((char*)mbuff, msize)) == 0);
+                cleanMessage();
+            }
         }
-        catch (const rhm::journal::jexception& e)
-        {
-            if (jcp)
-                delete jcp;
-            std::stringstream ss;
-            ss << e;
-            CPPUNIT_FAIL(ss.str());
-        }
-    }
 
-    void TxnEnqueueTest()
-    {
-        //Stack
-        char* test_name = "TxnEnqueueTest_Stack";
-        try
+        // Txn
+        test_name = "TxnRecoverReadTest";
         {
             rhm::journal::jcntl jc(test_name, "jdata", test_name);
             jc.initialize();
-            create_xid(xid, 0, XID_SIZE);
+            create_xid(xid, 1, XID_SIZE);
+            txn_list.push_back(xid);
             for (int m=0; m<NUM_MSGS; m++)
                 enq_txn_msg(&jc, create_msg(msg, m, MSG_SIZE), xid, false);
             txn_commit(&jc, xid);
         }
-        catch (const rhm::journal::jexception& e)
         {
-            std::stringstream ss;
-            ss << e;
-            CPPUNIT_FAIL(ss.str());
-        }
-        // Heap
-        test_name = "TxnEnqueueTest_Heap";
-        rhm::journal::jcntl* jcp = NULL;
-        try
-        {
-            jcp = new rhm::journal::jcntl(test_name, "jdata", test_name);
-            CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp != NULL);
-            jcp->initialize();
+            rhm::journal::jcntl jc(test_name, "jdata", test_name);
+            jc.recover(txn_list);
             for (int m=0; m<NUM_MSGS; m++)
-                enq_txn_msg(jcp, create_msg(msg, m, MSG_SIZE), xid, false);
-            txn_commit(jcp, xid);
-            delete jcp;
+            {
+                read_msg(&jc);
+                CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
+                        create_msg(msg, m, MSG_SIZE).compare(string((char*)mbuff, msize)) == 0);
+                cleanMessage();
+            }
         }
-        catch (const rhm::journal::jexception& e)
-        {
-            if (jcp)
-                delete jcp;
-            std::stringstream ss;
-            ss << e;
-            CPPUNIT_FAIL(ss.str());
-        }
     }
+    catch (const rhm::journal::jexception& e)
+    {
+        stringstream ss;
+        ss << e;
+        CPPUNIT_FAIL(ss.str());
+    }
+}
 
-    void RecoverReadTest()
+void
+JournalSystemTests::RecoveredReadTest()
+{
+    vector<string> txn_list;
+    try
     {
-        std::vector<std::string> txn_list;
-        //Stack
-        char* test_name = "RecoverReadTest_Stack";
-        try
+        // Non-txn
+        char* test_name = "RecoveredReadTest";
         {
+            rhm::journal::jcntl jc(test_name, "jdata", test_name);
+            jc.initialize();
+            for (int m=0; m<NUM_MSGS; m++)
+                enq_msg(&jc, create_msg(msg, m, MSG_SIZE), false);
+        }
+        {
+            rhm::journal::jcntl jc(test_name, "jdata", test_name);
+            jc.recover(txn_list);
+            for (int m=0; m<NUM_MSGS; m++)
             {
-                rhm::journal::jcntl jc(test_name, "jdata", test_name);
-                jc.initialize();
-                for (int m=0; m<NUM_MSGS; m++)
-                    enq_msg(&jc, create_msg(msg, m, MSG_SIZE), false);
+                read_msg(&jc);
+                CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
+                        create_msg(msg, m, MSG_SIZE).compare(string((char*)mbuff, msize)) == 0);
+                cleanMessage();
             }
+            jc.recover_complete();
+            for (int m=0; m<NUM_MSGS; m++)
             {
-                rhm::journal::jcntl jc(test_name, "jdata", test_name);
-                jc.recover(txn_list);
-                for (int m=0; m<NUM_MSGS; m++)
-                {
-                    read_msg(&jc);
-                    CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
-                            create_msg(msg, m, MSG_SIZE).compare(string((char*)mbuff, msize)) == 0);
-                    cleanMessage();
-                }
+                read_msg(&jc);
+                CPPUNIT_ASSERT_MESSAGE("Message corrupt after recovery.",
+                        create_msg(msg, m, MSG_SIZE).compare(string((char*)mbuff, msize)) == 0);
+                cleanMessage();
             }
         }
-        catch (const rhm::journal::jexception& e)
+
+        // Txn
+        test_name = "TxnRecoveredReadTest";
         {
-            std::stringstream ss;
-            ss << e;
-            CPPUNIT_FAIL(ss.str());
+            rhm::journal::jcntl jc(test_name, "jdata", test_name);
+            jc.initialize();
+            create_xid(xid, 2, XID_SIZE);
+            txn_list.push_back(xid);
+            for (int m=0; m<NUM_MSGS; m++)
+                enq_txn_msg(&jc, create_msg(msg, m, MSG_SIZE), xid, false);
+            txn_commit(&jc, xid);
         }
-        // Heap
-        test_name = "RecoverReadTest_Heap";
-        rhm::journal::jcntl* jcp = NULL;
-        try
         {
+            rhm::journal::jcntl jc(test_name, "jdata", test_name);
+            jc.recover(txn_list);
+            for (int m=0; m<NUM_MSGS; m++)
             {
-                jcp = new rhm::journal::jcntl(test_name, "jdata", test_name);
-                CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp != NULL);
-                jcp->initialize();
-                for (int m=0; m<NUM_MSGS; m++)
-                    enq_msg(jcp, create_msg(msg, m, MSG_SIZE), false);
-                delete jcp;
-                jcp = NULL;
+                read_msg(&jc);
+                CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
+                        create_msg(msg, m, MSG_SIZE).compare(string((char*)mbuff, msize)) == 0);
+                cleanMessage();
             }
+            jc.recover_complete();
+            for (int m=0; m<NUM_MSGS; m++)
             {
-                rhm::journal::jcntl* jcp = new rhm::journal::jcntl(test_name, "jdata", test_name);
-                CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp != NULL);
-                jcp->recover(txn_list);
-                for (int m=0; m<NUM_MSGS; m++)
-                {
-                    read_msg(jcp);
-                    std::string msg((char*)mbuff, msize);
-                    CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
-                            create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
-                    cleanMessage();
-                }
-                delete jcp;
+                read_msg(&jc);
+                CPPUNIT_ASSERT_MESSAGE("Message corrupt after recovery.",
+                        create_msg(msg, m, MSG_SIZE).compare(string((char*)mbuff, msize)) == 0);
+                cleanMessage();
             }
         }
-        catch (const rhm::journal::jexception& e)
-        {
-            if (jcp)
-                delete jcp;
-            std::stringstream ss;
-            ss << e;
-            CPPUNIT_FAIL(ss.str());
-        }
     }
+    catch (const rhm::journal::jexception& e)
+    {
+        stringstream ss;
+        ss << e;
+        CPPUNIT_FAIL(ss.str());
+    }
+}
 
-    void TxnRecoverReadTest()
+void
+JournalSystemTests::RecoveredDequeueTest()
+{
+    vector<string> txn_list;
+    try
     {
-        std::vector<std::string> txn_list;
-        //Stack
-        char* test_name = "TxnRecoverReadTest_Stack";
-        try
+        // Non-txn
+        char* test_name = "RecoveredDequeueTest";
         {
-            {
-                rhm::journal::jcntl jc(test_name, "jdata", test_name);
-                jc.initialize();
-                create_xid(xid, 1, XID_SIZE);
-                txn_list.push_back(xid);
-                for (int m=0; m<NUM_MSGS; m++)
-                    enq_txn_msg(&jc, create_msg(msg, m, MSG_SIZE), xid, false);
-                txn_commit(&jc, xid);
-            }
-            {
-                rhm::journal::jcntl jc(test_name, "jdata", test_name);
-                jc.recover(txn_list);
-                for (int m=0; m<NUM_MSGS; m++)
-                {
-                    read_msg(&jc);
-                    CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
-                            create_msg(msg, m, MSG_SIZE).compare(string((char*)mbuff, msize)) == 0);
-                    cleanMessage();
-                }
-            }
+            rhm::journal::jcntl jc(test_name, "jdata", test_name);
+            jc.initialize();
+            for (int m=0; m<NUM_MSGS; m++)
+                enq_msg(&jc, create_msg(msg, m, MSG_SIZE), false);
         }
-        catch (const rhm::journal::jexception& e)
         {
-            std::stringstream ss;
-            ss << e;
-            CPPUNIT_FAIL(ss.str());
-        }
-        // Heap
-        test_name = "TxnRecoverReadTest_Heap";
-        rhm::journal::jcntl* jcp = NULL;
-        try
-        {
+            rhm::journal::jcntl jc(test_name, "jdata", test_name);
+            jc.recover(txn_list);
+            for (int m=0; m<NUM_MSGS; m++)
             {
-                jcp = new rhm::journal::jcntl(test_name, "jdata", test_name);
-                CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp != NULL);
-                jcp->initialize();
-                for (int m=0; m<NUM_MSGS; m++)
-                    enq_txn_msg(jcp, create_msg(msg, m, MSG_SIZE), xid, false);
-                txn_commit(jcp, xid);
-                delete jcp;
-                jcp = NULL;
+                read_msg(&jc);
+                CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
+                        create_msg(msg, m, MSG_SIZE).compare(string((char*)mbuff, msize)) == 0);
+                cleanMessage();
             }
+            jc.recover_complete();
+            for (int m=0; m<NUM_MSGS; m++)
             {
-                rhm::journal::jcntl* jcp = new rhm::journal::jcntl(test_name, "jdata", test_name);
-                CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp != NULL);
-                jcp->recover(txn_list);
-                for (int m=0; m<NUM_MSGS; m++)
-                {
-                    read_msg(jcp);
-                    std::string msg((char*)mbuff, msize);
-                    CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
-                            create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
-                    cleanMessage();
-                }
-                delete jcp;
+                read_msg(&jc);
+                CPPUNIT_ASSERT_MESSAGE("Message corrupt after recovery.",
+                        create_msg(msg, m, MSG_SIZE).compare(string((char*)mbuff, msize)) == 0);
+                cleanMessage();
             }
+            for (int m=0; m<NUM_MSGS; m++)
+                deq_msg(&jc, m);
         }
-        catch (const rhm::journal::jexception& e)
-        {
-            if (jcp)
-                delete jcp;
-            std::stringstream ss;
-            ss << e;
-            CPPUNIT_FAIL(ss.str());
-        }
-    }
 
-    void RecoveredReadTest()
-    {
-        std::vector<std::string> txn_list;
-        //Stack
-        char* test_name = "RecoveredReadTest_Stack";
-        try
+        // Txn
+        test_name = "TxnRecoveredDequeueTest";
         {
-            {
-                rhm::journal::jcntl jc(test_name, "jdata", test_name);
-                jc.initialize();
-                for (int m=0; m<NUM_MSGS; m++)
-                    enq_msg(&jc, create_msg(msg, m, MSG_SIZE), false);
-            }
-            {
-                rhm::journal::jcntl jc(test_name, "jdata", test_name);
-                jc.recover(txn_list);
-                for (int m=0; m<NUM_MSGS; m++)
-                {
-                    read_msg(&jc);
-                    CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
-                            create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
-                    cleanMessage();
-                }
-                jc.recover_complete();
-                for (int m=0; m<NUM_MSGS; m++)
-                {
-                    read_msg(&jc);
-                    CPPUNIT_ASSERT_MESSAGE("Message corrupt after recovery.",
-                            create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
-                    cleanMessage();
-                }
-            }
+            rhm::journal::jcntl jc(test_name, "jdata", test_name);
+            jc.initialize();
+            create_xid(xid, 3, XID_SIZE);
+            txn_list.push_back(xid);
+            for (int m=0; m<NUM_MSGS; m++)
+                enq_txn_msg(&jc, create_msg(msg, m, MSG_SIZE), xid, false);
+            txn_commit(&jc, xid);
         }
-        catch (const rhm::journal::jexception& e)
         {
-            std::stringstream ss;
-            ss << e;
-            CPPUNIT_FAIL(ss.str());
-        }
-        // Heap
-        test_name = "RecoveredReadTest_Heap";
-        rhm::journal::jcntl* jcp = NULL;
-        try
-        {
+            rhm::journal::jcntl jc(test_name, "jdata", test_name);
+            jc.recover(txn_list);
+            for (int m=0; m<NUM_MSGS; m++)
             {
-                jcp = new rhm::journal::jcntl(test_name, "jdata", test_name);
-                CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp != NULL);
-                jcp->initialize();
-                for (int m=0; m<NUM_MSGS; m++)
-                    enq_msg(jcp, create_msg(msg, m, MSG_SIZE), false);
-                delete jcp;
-                jcp = NULL;
+                read_msg(&jc);
+                CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
+                        create_msg(msg, m, MSG_SIZE).compare(string((char*)mbuff, msize)) == 0);
+                cleanMessage();
             }
+            jc.recover_complete();
+            for (int m=0; m<NUM_MSGS; m++)
             {
-                rhm::journal::jcntl* jcp = new rhm::journal::jcntl(test_name, "jdata", test_name);
-                CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp != NULL);
-                jcp->recover(txn_list);
-                for (int m=0; m<NUM_MSGS; m++)
-                {
-                    read_msg(jcp);
-                    CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
-                            create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
-                    cleanMessage();
-                }
-                jcp->recover_complete();
-                for (int m=0; m<NUM_MSGS; m++)
-                {
-                    read_msg(jcp);
-                    CPPUNIT_ASSERT_MESSAGE("Message corrupt after recovery.",
-                            create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
-                    cleanMessage();
-                }
-                delete jcp;
+                read_msg(&jc);
+                CPPUNIT_ASSERT_MESSAGE("Message corrupt after recovery.",
+                        create_msg(msg, m, MSG_SIZE).compare(string((char*)mbuff, msize)) == 0);
+                cleanMessage();
             }
+            for (int m=0; m<NUM_MSGS; m++)
+                deq_msg(&jc, m);
         }
-        catch (const rhm::journal::jexception& e)
-        {
-            if (jcp)
-                delete jcp;
-            std::stringstream ss;
-            ss << e;
-            CPPUNIT_FAIL(ss.str());
-        }
     }
+    catch (const rhm::journal::jexception& e)
+    {
+        stringstream ss;
+        ss << e;
+        CPPUNIT_FAIL(ss.str());
+    }
+}
 
-    void TxnRecoveredReadTest()
+void
+JournalSystemTests::HeaderFlagsTest()
+{
+    vector<string> txn_list;
+    try
     {
-        std::vector<std::string> txn_list;
-        //Stack
-        char* test_name = "TxnRecoveredReadTest_Stack";
-        try
+        // Non-txn
+        char* test_name = "FlagsRecoverdTest";
         {
+            rhm::journal::jcntl jc(test_name, "jdata", test_name);
+            jc.initialize();
+            // Transient msgs - should not recover
+            for (int m=0; m<NUM_MSGS; m++)
+                enq_msg(&jc, create_msg(msg, m, MSG_SIZE), true);
+            // Persistent msgs
+            for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
+                enq_msg(&jc, create_msg(msg, m, MSG_SIZE), false);
+            // Transient extern msgs - should not recover
+            for (int m=NUM_MSGS*2; m<NUM_MSGS*3; m++)
+                enq_extern_msg(&jc, true);
+            // Persistnet extern msgs
+            for (int m=NUM_MSGS*3; m<NUM_MSGS*4; m++)
+                enq_extern_msg(&jc, false);
+        }
+        {
+            rhm::journal::jcntl jc(test_name, "jdata", test_name);
+            jc.recover(txn_list);
+            // Recover non-transient msgs
+            for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
             {
-                rhm::journal::jcntl jc(test_name, "jdata", test_name);
-                jc.initialize();
-                create_xid(xid, 2, XID_SIZE);
-                txn_list.push_back(xid);
-                for (int m=0; m<NUM_MSGS; m++)
-                    enq_txn_msg(&jc, create_msg(msg, m, MSG_SIZE), xid, false);
-                txn_commit(&jc, xid);
+                read_msg(&jc);
+                CPPUNIT_ASSERT_MESSAGE("Transient message recovered.", transientFlag == false);
+                CPPUNIT_ASSERT_MESSAGE("External flag incorrect.", externalFlag == false);
+                CPPUNIT_ASSERT_MESSAGE("Non-transient message corrupt during recover.",
+                        create_msg(msg, m, MSG_SIZE).compare(string((char*)mbuff, msize)) == 0);
+                cleanMessage();
             }
+            // Recover non-transient extern msgs
+            for (int m=NUM_MSGS*3; m<NUM_MSGS*4; m++)
             {
-                rhm::journal::jcntl jc(test_name, "jdata", test_name);
-                jc.recover(txn_list);
-                for (int m=0; m<NUM_MSGS; m++)
-                {
-                    read_msg(&jc);
-                    CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
-                            create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
-                    cleanMessage();
-                }
-                jc.recover_complete();
-                for (int m=0; m<NUM_MSGS; m++)
-                {
-                    read_msg(&jc);
-                    CPPUNIT_ASSERT_MESSAGE("Message corrupt after recovery.",
-                            create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
-                    cleanMessage();
-                }
+                read_msg(&jc);
+                CPPUNIT_ASSERT_MESSAGE("Transient message recovered.", transientFlag == false);
+                CPPUNIT_ASSERT_MESSAGE("External flag incorrect.", externalFlag == true);
+                CPPUNIT_ASSERT_MESSAGE("External message returned non-null pointer.",
+                        mbuff == NULL);
+                cleanMessage();
             }
-        }
-        catch (const rhm::journal::jexception& e)
-        {
-            std::stringstream ss;
-            ss << e;
-            CPPUNIT_FAIL(ss.str());
-        }
-        // Heap
-        test_name = "TxnRecoveredReadTest_Heap";
-        rhm::journal::jcntl* jcp = NULL;
-        try
-        {
+            jc.recover_complete();
+            // Read recovered non-transient msgs
+            for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
             {
-                jcp = new rhm::journal::jcntl(test_name, "jdata", test_name);
-                CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp != NULL);
-                jcp->initialize();
-                for (int m=0; m<NUM_MSGS; m++)
-                    enq_txn_msg(jcp, create_msg(msg, m, MSG_SIZE), xid, false);
-                txn_commit(jcp, xid);
-                delete jcp;
-                jcp = NULL;
+                read_msg(&jc);
+                CPPUNIT_ASSERT_MESSAGE("Transient message recovered.", transientFlag == false);
+                CPPUNIT_ASSERT_MESSAGE("External flag incorrect.", externalFlag == false);
+                CPPUNIT_ASSERT_MESSAGE("Non-transient message corrupt during recover.",
+                        create_msg(msg, m, MSG_SIZE).compare(string((char*)mbuff, msize)) == 0);
+                cleanMessage();
             }
+            // Read recovered non-transient extern msgs
+            for (int m=NUM_MSGS*3; m<NUM_MSGS*4; m++)
             {
-                rhm::journal::jcntl* jcp = new rhm::journal::jcntl(test_name, "jdata", test_name);
-                CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp != NULL);
-                jcp->recover(txn_list);
-                for (int m=0; m<NUM_MSGS; m++)
-                {
-                    read_msg(jcp);
-                    CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
-                            create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
-                    cleanMessage();
-                }
-                jcp->recover_complete();
-                for (int m=0; m<NUM_MSGS; m++)
-                {
-                    read_msg(jcp);
-                    CPPUNIT_ASSERT_MESSAGE("Message corrupt after recovery.",
-                            create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
-                    cleanMessage();
-                }
-                delete jcp;
+                read_msg(&jc);
+                CPPUNIT_ASSERT_MESSAGE("Transient message recovered.", transientFlag == false);
+                CPPUNIT_ASSERT_MESSAGE("External flag incorrect.", externalFlag == true);
+                CPPUNIT_ASSERT_MESSAGE("External message returned non-null pointer.",
+                        mbuff == NULL);
+                cleanMessage();
             }
+            // Dequeue recovered messages
+            for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
+                deq_msg(&jc, m);
+            for (int m=NUM_MSGS*3; m<NUM_MSGS*4; m++)
+                deq_msg(&jc, m);
         }
-        catch (const rhm::journal::jexception& e)
+
+        // Txn
+        test_name = "TxnFlagsRecoverdTest";
         {
-            if (jcp)
-                delete jcp;
-            std::stringstream ss;
-            ss << e;
-            CPPUNIT_FAIL(ss.str());
+            rhm::journal::jcntl jc(test_name, "jdata", test_name);
+            jc.initialize();
+            create_xid(xid, 4, XID_SIZE);
+            txn_list.push_back(xid);
+            // Transient msgs - should not recover
+            for (int m=0; m<NUM_MSGS; m++)
+                enq_txn_msg(&jc, create_msg(msg, m, MSG_SIZE), xid, true);
+            // Persistent msgs
+            for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
+                enq_txn_msg(&jc, create_msg(msg, m, MSG_SIZE), xid, false);
+            // Transient extern msgs - should not recover
+            for (int m=NUM_MSGS*2; m<NUM_MSGS*3; m++)
+                enq_extern_txn_msg(&jc, xid, true);
+            // Persistnet extern msgs
+            for (int m=NUM_MSGS*3; m<NUM_MSGS*4; m++)
+                enq_extern_txn_msg(&jc, xid, false);
+            txn_commit(&jc, xid);
         }
-    }
-
-    void RecoveredDequeueTest()
-    {
-        std::vector<std::string> txn_list;
-        //Stack
-        char* test_name = "RecoveredDequeueTest_Stack";
-        try
         {
+            rhm::journal::jcntl jc(test_name, "jdata", test_name);
+            jc.recover(txn_list);
+            // Recover non-transient msgs
+            for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
             {
-                rhm::journal::jcntl jc(test_name, "jdata", test_name);
-                jc.initialize();
-                for (int m=0; m<NUM_MSGS; m++)
-                    enq_msg(&jc, create_msg(msg, m, MSG_SIZE), false);
+                read_msg(&jc);
+                CPPUNIT_ASSERT_MESSAGE("Transient message recovered.", transientFlag == false);
+                CPPUNIT_ASSERT_MESSAGE("External flag incorrect.", externalFlag == false);
+                CPPUNIT_ASSERT_MESSAGE("Non-transient message corrupt during recover.",
+                        create_msg(msg, m, MSG_SIZE).compare(string((char*)mbuff, msize)) == 0);
+                cleanMessage();
             }
+            // Recover non-transient extern msgs
+            for (int m=NUM_MSGS*3; m<NUM_MSGS*4; m++)
             {
-                rhm::journal::jcntl jc(test_name, "jdata", test_name);
-                jc.recover(txn_list);
-                for (int m=0; m<NUM_MSGS; m++)
-                {
-                    read_msg(&jc);
-                    CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
-                            create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
-                    cleanMessage();
-                }
-                jc.recover_complete();
-                for (int m=0; m<NUM_MSGS; m++)
-                {
-                    read_msg(&jc);
-                    CPPUNIT_ASSERT_MESSAGE("Message corrupt after recovery.",
-                            create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
-                    cleanMessage();
-                }
-                for (int m=0; m<NUM_MSGS; m++)
-                    deq_msg(&jc, m);
+                read_msg(&jc);
+                CPPUNIT_ASSERT_MESSAGE("Transient message recovered.", transientFlag == false);
+                CPPUNIT_ASSERT_MESSAGE("External flag incorrect.", externalFlag == true);
+                CPPUNIT_ASSERT_MESSAGE("External message returned non-null pointer.",
+                        mbuff == NULL);
+                cleanMessage();
             }
-        }
-        catch (const rhm::journal::jexception& e)
-        {
-            std::stringstream ss;
-            ss << e;
-            CPPUNIT_FAIL(ss.str());
-        }
-        // Heap
-        test_name = "RecoveredDequeueTest_Heap";
-        rhm::journal::jcntl* jcp = NULL;
-        try
-        {
+            jc.recover_complete();
+            // Read recovered non-transient msgs
+            for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
             {
-                jcp = new rhm::journal::jcntl(test_name, "jdata", test_name);
-                CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp != NULL);
-                jcp->initialize();
-                for (int m=0; m<NUM_MSGS; m++)
-                    enq_msg(jcp, create_msg(msg, m, MSG_SIZE), false);
-                delete jcp;
-                jcp = NULL;
+                read_msg(&jc);
+                CPPUNIT_ASSERT_MESSAGE("Transient message recovered.", transientFlag == false);
+                CPPUNIT_ASSERT_MESSAGE("External flag incorrect.", externalFlag == false);
+                CPPUNIT_ASSERT_MESSAGE("Non-transient message corrupt during recover.",
+                        create_msg(msg, m, MSG_SIZE).compare(string((char*)mbuff, msize)) == 0);
+                cleanMessage();
             }
+            // Read recovered non-transient extern msgs
+            for (int m=NUM_MSGS*3; m<NUM_MSGS*4; m++)
             {
-                rhm::journal::jcntl* jcp = new rhm::journal::jcntl(test_name, "jdata", test_name);
-                CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp != NULL);
-                jcp->recover(txn_list);
-                for (int m=0; m<NUM_MSGS; m++)
-                {
-                    read_msg(jcp);
-                    CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
-                            create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
-                    cleanMessage();
-                }
-                jcp->recover_complete();
-                for (int m=0; m<NUM_MSGS; m++)
-                {
-                    read_msg(jcp);
-                    CPPUNIT_ASSERT_MESSAGE("Message corrupt after recovery.",
-                            create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
-                    cleanMessage();
-                }
-                for (int m=0; m<NUM_MSGS; m++)
-                    deq_msg(jcp, m);
-                delete jcp;
+                read_msg(&jc);
+                CPPUNIT_ASSERT_MESSAGE("Transient message recovered.", transientFlag == false);
+                CPPUNIT_ASSERT_MESSAGE("External flag incorrect.", externalFlag == true);
+                CPPUNIT_ASSERT_MESSAGE("External message returned non-null pointer.",
+                        mbuff == NULL);
+                cleanMessage();
             }
+            // Dequeue recovered messages
+            for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
+                deq_msg(&jc, m);
+            for (int m=NUM_MSGS*3; m<NUM_MSGS*4; m++)
+                deq_msg(&jc, m);
         }
-        catch (const rhm::journal::jexception& e)
-        {
-            if (jcp)
-                delete jcp;
-            std::stringstream ss;
-            ss << e;
-            CPPUNIT_FAIL(ss.str());
-        }
     }
+    catch (const rhm::journal::jexception& e)
+    {
+        stringstream ss;
+        ss << e;
+        CPPUNIT_FAIL(ss.str());
+    }
+}
 
-    void TxnRecoveredDequeueTest()
+void
+JournalSystemTests::ComplexRecoveryTest1()
+{
+    vector<string> txn_list;
+    try
     {
-        std::vector<std::string> txn_list;
-        //Stack
-        char* test_name = "TxnRecoveredDequeueTest_Stack";
-        try
+        // Non-txn
+        char* test_name = "ComplexRecoveryTest1";
         {
+            rhm::journal::jcntl jc(test_name, "jdata", test_name);
+            jc.initialize();
+            
+            // Enqueue 2n, then dequeue first n msgs; check that only last n readable
+            // rids: 0 to NUM_MSGS*2 - 1
+            for (int m=0; m<NUM_MSGS*2; m++)
+                enq_msg(&jc, create_msg(msg, m, MSG_SIZE), false);
+            // rids: NUM_MSGS*2 to NUM_MSGS*3 - 1
+            for (int m=0; m<NUM_MSGS; m++)
+                deq_msg(&jc, m);
+            jc.flush();
+            for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
             {
-                rhm::journal::jcntl jc(test_name, "jdata", test_name);
-                jc.initialize();
-                create_xid(xid, 3, XID_SIZE);
-                txn_list.push_back(xid);
-                for (int m=0; m<NUM_MSGS; m++)
-                    enq_txn_msg(&jc, create_msg(msg, m, MSG_SIZE), xid, false);
-                txn_commit(&jc, xid);
+                read_msg(&jc);
+                CPPUNIT_ASSERT_MESSAGE("Message corrupt before recover.",
+                        create_msg(msg, m, MSG_SIZE).compare(string((char*)mbuff, msize)) == 0);
+                cleanMessage();
             }
-            {
-                rhm::journal::jcntl jc(test_name, "jdata", test_name);
-                jc.recover(txn_list);
-                for (int m=0; m<NUM_MSGS; m++)
-                {
-                    read_msg(&jc);
-                    CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
-                            create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
-                    cleanMessage();
-                }
-                jc.recover_complete();
-                for (int m=0; m<NUM_MSGS; m++)
-                {
-                    read_msg(&jc);
-                    CPPUNIT_ASSERT_MESSAGE("Message corrupt after recovery.",
-                            create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
-                    cleanMessage();
-                }
-                for (int m=0; m<NUM_MSGS; m++)
-                    deq_msg(&jc, m);
-            }
         }
-        catch (const rhm::journal::jexception& e)
         {
-            std::stringstream ss;
-            ss << e;
-            CPPUNIT_FAIL(ss.str());
-        }
-        // Heap
-        test_name = "TxnRecoveredDequeueTest_Heap";
-        rhm::journal::jcntl* jcp = NULL;
-        try
-        {
+            rhm::journal::jcntl jc(test_name, "jdata", test_name);
+            jc.recover(txn_list);
+
+            // Check that only last n readable (as before)
+            for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
             {
-                jcp = new rhm::journal::jcntl(test_name, "jdata", test_name);
-                CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp != NULL);
-                jcp->initialize();
-                for (int m=0; m<NUM_MSGS; m++)
-                    enq_txn_msg(jcp, create_msg(msg, m, MSG_SIZE), xid, false);
-                txn_commit(jcp, xid);
-                delete jcp;
-                jcp = NULL;
+                read_msg(&jc);
+                CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
+                        create_msg(msg, m, MSG_SIZE).compare(string((char*)mbuff, msize)) == 0);
+                cleanMessage();
             }
+            jc.recover_complete();
+
+            // Enqueue another n msgs
+            // rids: NUM_MSGS*3 to NUM_MSGS*4 - 1
+            for (int m=NUM_MSGS*2; m<NUM_MSGS*3; m++)
+                enq_msg(&jc, create_msg(msg, m, MSG_SIZE), false);
+            jc.flush();
+            
+            // Check that 2n messages are now readable
+            for (int m=NUM_MSGS; m<NUM_MSGS*3; m++)
             {
-                rhm::journal::jcntl* jcp = new rhm::journal::jcntl(test_name, "jdata", test_name);
-                CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp != NULL);
-                jcp->recover(txn_list);
-                for (int m=0; m<NUM_MSGS; m++)
-                {
-                    read_msg(jcp);
-                    CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
-                            create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
-                    cleanMessage();
-                }
-                jcp->recover_complete();
-                for (int m=0; m<NUM_MSGS; m++)
-                {
-                    read_msg(jcp);
-                    CPPUNIT_ASSERT_MESSAGE("Message corrupt after recovery.",
-                            create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
-                    cleanMessage();
-                }
-                for (int m=0; m<NUM_MSGS; m++)
-                    deq_msg(jcp, m);
-                delete jcp;
+                read_msg(&jc);
+                CPPUNIT_ASSERT_MESSAGE("Message corrupt after recover.",
+                        create_msg(msg, m, MSG_SIZE).compare(string((char*)mbuff, msize)) == 0);
+                cleanMessage();
             }
+
+            // Dequeue all remaining messages
+            // rids: NUM_MSGS*4 to NUM_MSGS*6 - 1
+            for (int m=NUM_MSGS; m<NUM_MSGS*3; m++)
+                deq_msg(&jc, m);
         }
-        catch (const rhm::journal::jexception& e)
-        {
-            if (jcp)
-                delete jcp;
-            std::stringstream ss;
-            ss << e;
-            CPPUNIT_FAIL(ss.str());
-        }
-    }
 
-    void ComplexRecoveryTest1()
-    {
-        std::vector<std::string> txn_list;
-        //Stack
-        char* test_name = "ComplexRecoveryTest1_Stack";
-        try
+        // Txn
+        test_name = "TxnComplexRecoveryTest1";
         {
+            rhm::journal::jcntl jc(test_name, "jdata", test_name);
+            jc.initialize();
+            
+            // Enqueue 2n, then dequeue first n msgs; check that only last n readable
+            // rids: 0 to NUM_MSGS - 1
+            for (int m=0; m<NUM_MSGS; m++)
+                enq_msg(&jc, create_msg(msg, m, MSG_SIZE), false);
+            // rids: NUM_MSGS to NUM_MSGS*2 - 1
+            create_xid(xid, 5, XID_SIZE);
+            txn_list.push_back(xid);
+            for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
+                enq_txn_msg(&jc, create_msg(msg, m, MSG_SIZE), xid, false);
+            // rids: NUM_MSGS*2 to NUM_MSGS*3 - 1
+            for (int m=0; m<NUM_MSGS; m++)
+                deq_msg(&jc, m);
+            // rid: NUM_MSGS*3
+            txn_commit(&jc, xid);
+            jc.flush();
+            for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
             {
-                rhm::journal::jcntl jc(test_name, "jdata", test_name);
-                jc.initialize();
-                // rids: 0 to NUM_MSGS*2 - 1
-                for (int m=0; m<NUM_MSGS*2; m++)
-                    enq_msg(&jc, create_msg(msg, m, MSG_SIZE), false);
-                // rids: NUM_MSGS*2 to NUM_MSGS*3 - 1
-                for (int m=0; m<NUM_MSGS; m++)
-                    deq_msg(&jc, m);
-                jc.flush();
-                for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
-                {
-                    read_msg(&jc);
-                    CPPUNIT_ASSERT_MESSAGE("Message corrupt before recover.",
-                            create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
-                    cleanMessage();
-                }
+                read_msg(&jc);
+                CPPUNIT_ASSERT_MESSAGE("Message corrupt before recover.",
+                        create_msg(msg, m, MSG_SIZE).compare(string((char*)mbuff, msize)) == 0);
+                cleanMessage();
             }
-            {
-                rhm::journal::jcntl jc(test_name, "jdata", test_name);
-                jc.recover(txn_list);
-                for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
-                {
-                    read_msg(&jc);
-                    CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
-                            create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
-                    cleanMessage();
-                }
-                jc.recover_complete();
-                // rids: NUM_MSGS*3 to NUM_MSGS*4 - 1
-                for (int m=NUM_MSGS*3; m<NUM_MSGS*4; m++)
-                    enq_msg(&jc, create_msg(msg, m, MSG_SIZE), false);
-                jc.flush();
-                for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
-                {
-                    read_msg(&jc);
-                    CPPUNIT_ASSERT_MESSAGE("Message corrupt after recover.",
-                            create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
-                    cleanMessage();
-                }
-                for (int m=NUM_MSGS*3; m<NUM_MSGS*4; m++)
-                {
-                    read_msg(&jc);
-                    CPPUNIT_ASSERT_MESSAGE("Message corrupt after recover.",
-                            create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
-                    cleanMessage();
-                }
-                // rids: NUM_MSGS*4 to NUM_MSGS*6 - 1
-                for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
-                    deq_msg(&jc, m);
-                for (int m=NUM_MSGS*3; m<NUM_MSGS*4; m++)
-                    deq_msg(&jc, m);
-            }
         }
-        catch (const rhm::journal::jexception& e)
         {
-            std::stringstream ss;
-            ss << e;
-            CPPUNIT_FAIL(ss.str());
-        }
-        // Heap
-        test_name = "RecoveredDequeueTest_Heap";
-        rhm::journal::jcntl* jcp = NULL;
-        try
-        {
+            rhm::journal::jcntl jc(test_name, "jdata", test_name);
+            jc.recover(txn_list);
+
+            // Check that only last n readable (as before)
+            for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
             {
-                jcp = new rhm::journal::jcntl(test_name, "jdata", test_name);
-                CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp != NULL);
-                jcp->initialize();
-                // rids: 0 to NUM_MSGS*2 - 1
-                for (int m=0; m<NUM_MSGS*2; m++)
-                    enq_msg(jcp, create_msg(msg, m, MSG_SIZE), false);
-                // rids: NUM_MSGS*2 to NUM_MSGS*3 - 1
-                for (int m=0; m<NUM_MSGS; m++)
-                    deq_msg(jcp, m);
-                jcp->flush();
-                for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
-                {
-                    read_msg(jcp);
-                    CPPUNIT_ASSERT_MESSAGE("Message corrupt before recover.",
-                            create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
-                    cleanMessage();
-                }
-                delete jcp;
-                jcp = NULL;
+                read_msg(&jc);
+                CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
+                        create_msg(msg, m, MSG_SIZE).compare(string((char*)mbuff, msize)) == 0);
+                cleanMessage();
             }
-            {
-                rhm::journal::jcntl* jcp = new rhm::journal::jcntl(test_name, "jdata", test_name);
-                CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp != NULL);
-                jcp->recover(txn_list);
-                for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
-                {
-                    read_msg(jcp);
-                    CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
-                            create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
-                    cleanMessage();
-                }
-                jcp->recover_complete();
-                // rids: NUM_MSGS*3 to NUM_MSGS*4 - 1
-                for (int m=NUM_MSGS*3; m<NUM_MSGS*4; m++)
-                    enq_msg(jcp, create_msg(msg, m, MSG_SIZE), false);
-                jcp->flush();
-                for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
-                {
-                    read_msg(jcp);
-                    CPPUNIT_ASSERT_MESSAGE("Message corrupt after recovery.",
-                            create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
-                    cleanMessage();
-                }
-                for (int m=NUM_MSGS*3; m<NUM_MSGS*4; m++)
-                {
-                    read_msg(jcp);
-                    CPPUNIT_ASSERT_MESSAGE("Message corrupt after recovery.",
-                            create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
-                    cleanMessage();
-                }
-                // rids: NUM_MSGS*4 to NUM_MSGS*6 - 1
-                for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
-                    deq_msg(jcp, m);
-                for (int m=NUM_MSGS*3; m<NUM_MSGS*4; m++)
-                    deq_msg(jcp, m);
-                delete jcp;
-            }
-        }
-        catch (const rhm::journal::jexception& e)
-        {
-            if (jcp)
-                delete jcp;
-            std::stringstream ss;
-            ss << e;
-            CPPUNIT_FAIL(ss.str());
-        }
-    }
+            jc.recover_complete();
 
-    void TxnComplexRecoveryTest1()
-    {
-        std::vector<std::string> txn_list;
-        //Stack
-        char* test_name = "TxnComplexRecoveryTest1_Stack";
-        try
-        {
+            // Enqueue another n msgs
+            // rids: NUM_MSGS*3+1 to NUM_MSGS*4
+            for (int m=NUM_MSGS*3+1; m<NUM_MSGS*4+1; m++)
+                enq_msg(&jc, create_msg(msg, m, MSG_SIZE), false);
+            jc.flush();
+            
+            // Check that 2n messages are now readable
+            for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
             {
-                rhm::journal::jcntl jc(test_name, "jdata", test_name);
-                jc.initialize();
-                // rids: 0 to NUM_MSGS - 1
-                for (int m=0; m<NUM_MSGS; m++)
-                    enq_msg(&jc, create_msg(msg, m, MSG_SIZE), false);
-                // rids: NUM_MSGS to NUM_MSGS*2 - 1
-                create_xid(xid, 4, XID_SIZE);
-                txn_list.push_back(xid);
-                for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
-                    enq_txn_msg(&jc, create_msg(msg, m, MSG_SIZE), xid, false);
-                // rids: NUM_MSGS*2 to NUM_MSGS*3 - 1
-                for (int m=0; m<NUM_MSGS; m++)
-                    deq_msg(&jc, m);
-                // rid: NUM_MSGS*3
-                txn_commit(&jc, xid);
-                jc.flush();
-                for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
-                {
-                    read_msg(&jc);
-                    CPPUNIT_ASSERT_MESSAGE("Message corrupt before recover.",
-                            create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
-                    cleanMessage();
-                }
+                read_msg(&jc);
+                CPPUNIT_ASSERT_MESSAGE("Message corrupt after recover.",
+                        create_msg(msg, m, MSG_SIZE).compare(string((char*)mbuff, msize)) == 0);
+                cleanMessage();
             }
+            for (int m=NUM_MSGS*3+1; m<NUM_MSGS*4+1; m++)
             {
-                rhm::journal::jcntl jc(test_name, "jdata", test_name);
-                jc.recover(txn_list);
-                for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
-                {
-                    read_msg(&jc);
-                    CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
-                            create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
-                    cleanMessage();
-                }
-                jc.recover_complete();
-                // rids: NUM_MSGS*3+1 to NUM_MSGS*4
-                for (int m=NUM_MSGS*3+1; m<NUM_MSGS*4+1; m++)
-                    enq_msg(&jc, create_msg(msg, m, MSG_SIZE), false);
-                jc.flush();
-                for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
-                {
-                    read_msg(&jc);
-                    CPPUNIT_ASSERT_MESSAGE("Message corrupt after recover.",
-                            create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
-                    cleanMessage();
-                }
-                for (int m=NUM_MSGS*3+1; m<NUM_MSGS*4+1; m++)
-                {
-                    read_msg(&jc);
-                    CPPUNIT_ASSERT_MESSAGE("Message corrupt after recover.",
-                            create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
-                    cleanMessage();
-                }
-                // rids: NUM_MSGS*4+1 to NUM_MSGS*6
-                for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
-                    deq_msg(&jc, m);
-                for (int m=NUM_MSGS*3+1; m<NUM_MSGS*4+1; m++)
-                    deq_msg(&jc, m);
+                read_msg(&jc);
+                CPPUNIT_ASSERT_MESSAGE("Message corrupt after recover.",
+                        create_msg(msg, m, MSG_SIZE).compare(string((char*)mbuff, msize)) == 0);
+                cleanMessage();
             }
+
+            // Dequeue all remaining messages
+            // rids: NUM_MSGS*4+1 to NUM_MSGS*6
+            for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
+                deq_msg(&jc, m);
+            for (int m=NUM_MSGS*3+1; m<NUM_MSGS*4+1; m++)
+                deq_msg(&jc, m);
         }
-        catch (const rhm::journal::jexception& e)
-        {
-            std::stringstream ss;
-            ss << e;
-            CPPUNIT_FAIL(ss.str());
-        }
-        // Heap
-        test_name = "TxnComplexRecoveryTest1_Heap";
-        rhm::journal::jcntl* jcp = NULL;
-        try
-        {
-            {
-                jcp = new rhm::journal::jcntl(test_name, "jdata", test_name);
-                CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp != NULL);
-                jcp->initialize();
-                // rids: 0 to NUM_MSGS*2 - 1
-                for (int m=0; m<NUM_MSGS; m++)
-                    enq_msg(jcp, create_msg(msg, m, MSG_SIZE), false);
-                create_xid(xid, 4, XID_SIZE);
-                // rids: NUM_MSGS to NUM_MSGS*2 - 1
-                for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
-                    enq_txn_msg(jcp, create_msg(msg, m, MSG_SIZE), xid, false);
-                // rids: NUM_MSGS*2 to NUM_MSGS*3 - 1
-                for (int m=0; m<NUM_MSGS; m++)
-                    deq_msg(jcp, m);
-                // rid: NUM_MSGS*3
-                txn_commit(jcp, xid);
-                jcp->flush();
-                for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
-                {
-                    read_msg(jcp);
-                    CPPUNIT_ASSERT_MESSAGE("Message corrupt before recover.",
-                            create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
-                    cleanMessage();
-                }
-                delete jcp;
-                jcp = NULL;
-            }
-            {
-                rhm::journal::jcntl* jcp = new rhm::journal::jcntl(test_name, "jdata", test_name);
-                CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp != NULL);
-                jcp->recover(txn_list);
-                for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
-                {
-                    read_msg(jcp);
-                    CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
-                            create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
-                    cleanMessage();
-                }
-                jcp->recover_complete();
-                // rids: NUM_MSGS*3+1 to NUM_MSGS*4
-                for (int m=NUM_MSGS*3+1; m<NUM_MSGS*4+1; m++)
-                    enq_msg(jcp, create_msg(msg, m, MSG_SIZE), false);
-                jcp->flush();
-                for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
-                {
-                    read_msg(jcp);
-                    CPPUNIT_ASSERT_MESSAGE("Message corrupt after recovery.",
-                            create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
-                    cleanMessage();
-                }
-                for (int m=NUM_MSGS*3+1; m<NUM_MSGS*4+1; m++)
-                {
-                    read_msg(jcp);
-                    CPPUNIT_ASSERT_MESSAGE("Message corrupt after recovery.",
-                            create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
-                    cleanMessage();
-                }
-                // rids: NUM_MSGS*4+1 to NUM_MSGS*6
-                for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
-                    deq_msg(jcp, m);
-                for (int m=NUM_MSGS*3+1; m<NUM_MSGS*4+1; m++)
-                    deq_msg(jcp, m);
-                delete jcp;
-            }
-        }
-        catch (const rhm::journal::jexception& e)
-        {
-            if (jcp)
-                delete jcp;
-            std::stringstream ss;
-            ss << e;
-            CPPUNIT_FAIL(ss.str());
-        }
     }
-
-    void EncodeTest_000()
+    catch (const rhm::journal::jexception& e)
     {
-        runEncodeTest(0, 0, 0, false, 0, 0, false, false, 2, "Empty journal");
+        stringstream ss;
+        ss << e;
+        CPPUNIT_FAIL(ss.str());
     }
+}
 
-    void EncodeTest_001()
-    {
-        runEncodeTest(1, 10, 10, false, 0, 0, false, false, 2, "1*(10 bytes)");
-    }
+// === Private helper functions ===
 
-    void EncodeTest_002()
-    {
-        runEncodeTest(1, 10, 10, true, 0, 0, false, false, 2, "1*(10 bytes), auto-deq");
-    }
+void
+JournalSystemTests::enq_msg(rhm::journal::jcntl* jc, const string msg, const bool transient)
+{
+    rhm::journal::data_tok* dtp = new rhm::journal::data_tok;
+    CPPUNIT_ASSERT_MESSAGE("Data Token heap intantiation failed.", dtp != NULL);
 
-    void EncodeTest_003()
-    {
-        runEncodeTest(10, 10, 10, false, 0, 0, false, false, 2, "10*(10 bytes)");
-    }
+    unsigned aio_sleep_cnt = 0;
+    while (handle_jcntl_response(jc->enqueue_data_record(msg.c_str(), msg.size(), msg.size(),
+            dtp, transient), jc, aio_sleep_cnt, dtp));
+}
 
-    void EncodeTest_004()
-    {
-        runEncodeTest(10, 10, 10, true, 0, 0, false, false, 2, "10*(10 bytes), auto-deq");
-    }
+void
+JournalSystemTests::enq_extern_msg(rhm::journal::jcntl* jc, const bool transient)
+{
+    rhm::journal::data_tok* dtp = new rhm::journal::data_tok;
+    CPPUNIT_ASSERT_MESSAGE("Data Token heap intantiation failed.", dtp != NULL);
 
-    void EncodeTest_005()
-    {
-        runEncodeTest(10, 92, 92, false, 0, 0, false, false, 2, "10*(1 dblk exact fit)");
-    }
+    unsigned aio_sleep_cnt = 0;
+    while (handle_jcntl_response(jc->enqueue_extern_data_record(msg.size(),
+            dtp, transient), jc, aio_sleep_cnt, dtp));
+}
 
-    void EncodeTest_006()
-    {
-        runEncodeTest(10, 92, 92, true, 0, 0, false, false, 2, "10*(1 dblk exact fit), auto-deq");
-    }
+void
+JournalSystemTests::enq_txn_msg(rhm::journal::jcntl* jc, const string msg,
+        const string xid, const bool transient)
+{
+    rhm::journal::data_tok* dtp = new rhm::journal::data_tok;
+    CPPUNIT_ASSERT_MESSAGE("Data Token heap intantiation failed.", dtp != NULL);
 
-    void EncodeTest_007()
-    {
-        runEncodeTest(10, 93, 93, false, 0, 0, false, false, 2, "10*(1 dblk + 1 byte)");
-    }
+    unsigned aio_sleep_cnt = 0;
+    while (handle_jcntl_response(jc->enqueue_txn_data_record(msg.c_str(), msg.size(),
+            msg.size(), dtp, xid, transient), jc, aio_sleep_cnt, dtp));
+}
 
-    void EncodeTest_008()
-    {
-        runEncodeTest(10, 93, 93, true, 0, 0, false, false, 2, "10*(1 dblk + 1 byte), auto-deq");
-    }
+void
+JournalSystemTests::enq_extern_txn_msg(rhm::journal::jcntl* jc, const string xid,
+        const bool transient)
+{
+    rhm::journal::data_tok* dtp = new rhm::journal::data_tok;
+    CPPUNIT_ASSERT_MESSAGE("Data Token heap intantiation failed.", dtp != NULL);
 
-    void EncodeTest_009()
-    {
-        runEncodeTest(10, 476, 476, false, 0, 0, false, false, 2, "10*(1 sblk exact fit)");
-    }
+    unsigned aio_sleep_cnt = 0;
+    while (handle_jcntl_response(jc->enqueue_extern_txn_data_record(msg.size(), dtp, xid,
+            transient), jc, aio_sleep_cnt, dtp));
+}
 
-    void EncodeTest_010()
-    {
-        runEncodeTest(10, 476, 476, true, 0, 0, false, false, 2, "10*(1 sblk exact fit), auto-deq");
-    }
+void
+JournalSystemTests::deq_msg(rhm::journal::jcntl* jc, u_int64_t rid)
+{
+    rhm::journal::data_tok* dtp = new rhm::journal::data_tok;
+    CPPUNIT_ASSERT_MESSAGE("Data Token heap intantiation failed.", dtp != NULL);
+    dtp->set_wstate(rhm::journal::data_tok::ENQ);
+    dtp->set_rid(rid);
 
-    void EncodeTest_011()
-    {
-        runEncodeTest(10, 477, 477, false, 0, 0, false, false, 2, "10*(1 sblk + 1 byte)");
-    }
+    unsigned aio_sleep_cnt = 0;
+    while (handle_jcntl_response(jc->dequeue_data_record(dtp), jc, aio_sleep_cnt, dtp));
+}
 
-    void EncodeTest_012()
-    {
-        runEncodeTest(10, 477, 477, true, 0, 0, false, false, 2, "10*(1 sblk + 1 byte), auto-deq");
-    }
+void
+JournalSystemTests::deq_txn_msg(rhm::journal::jcntl* jc, u_int64_t rid, const string xid)
+{
+    rhm::journal::data_tok* dtp = new rhm::journal::data_tok;
+    CPPUNIT_ASSERT_MESSAGE("Data Token heap intantiation failed.", dtp != NULL);
+    dtp->set_wstate(rhm::journal::data_tok::ENQ);
+    dtp->set_rid(rid);
 
-    void EncodeTest_013()
-    {
-        runEncodeTest(8, 4060, 4060, false, 0, 0, false, false, 2, "8*(1/8 page)");
-    }
+    unsigned aio_sleep_cnt = 0;
+    while (handle_jcntl_response(jc->dequeue_txn_data_record(dtp, xid),
+            jc, aio_sleep_cnt, dtp));
+}
 
-    void EncodeTest_014()
-    {
-        runEncodeTest(9, 4060, 4060, false, 0, 0, false, false, 2, "9*(1/8 page)");
-    }
+void
+JournalSystemTests::txn_abort(rhm::journal::jcntl* jc, const string xid)
+{
+    rhm::journal::data_tok* dtp = new rhm::journal::data_tok;
+    CPPUNIT_ASSERT_MESSAGE("Data Token heap intantiation failed.", dtp != NULL);
 
-    void EncodeTest_015()
-    {
-        runEncodeTest(8, 4061, 4061, false, 0, 0, false, false, 2, "8*(1/8 page + 1 byte)");
-    }
+    unsigned aio_sleep_cnt = 0;
+    while (handle_jcntl_response(jc->txn_abort(dtp, xid), jc, aio_sleep_cnt, dtp));
+}
 
-    void EncodeTest_016()
-    {
-        runEncodeTest(8, 3932, 3932, true, 0, 0, false, false, 2,
-                "8*(1/8 page - 1 dblk for deq record), auto-deq");
-    }
+void
+JournalSystemTests::txn_commit(rhm::journal::jcntl* jc, const string xid)
+{
+    rhm::journal::data_tok* dtp = new rhm::journal::data_tok;
+    CPPUNIT_ASSERT_MESSAGE("Data Token heap intantiation failed.", dtp != NULL);
 
-    void EncodeTest_017()
-    {
-        runEncodeTest(9, 3932, 3932, true, 0, 0, false, false, 2,
-                "9*(1/8 page - 1 dblk for deq record), auto-deq");
-    }
+    unsigned aio_sleep_cnt = 0;
+    while (handle_jcntl_response(jc->txn_commit(dtp, xid), jc, aio_sleep_cnt, dtp));
+}
 
-    void EncodeTest_018()
-    {
-        runEncodeTest(8, 3933, 3933, true, 0, 0, false, false, 2,
-                "8*(1/8 page - 1 dblk for deq record + 1 byte), auto-deq");
-    }
+char*
+JournalSystemTests::read_msg(rhm::journal::jcntl* jc)
+{
+    rhm::journal::data_tok* dtp = new rhm::journal::data_tok;
+    CPPUNIT_ASSERT_MESSAGE("Data Token heap intantiation failed.", dtp != NULL);
+    dtp->set_wstate(rhm::journal::data_tok::ENQ);
 
-    void EncodeTest_019()
-    {
-        runEncodeTest(32, 32732, 32732, false, 0, 0, false, false, 2, "32*(1 page exact fit)");
-    }
+    unsigned aio_sleep_cnt = 0;
+    while (handle_jcntl_response(jc->read_data_record(&mbuff, msize, &xidbuff, xidsize,
+            transientFlag, externalFlag, dtp), jc, aio_sleep_cnt, dtp));
+    return (char*)mbuff;
+}
 
-    void EncodeTest_020()
+bool
+JournalSystemTests::handle_jcntl_response(rhm::journal::iores res, rhm::journal::jcntl* jc,
+        unsigned& aio_sleep_cnt, rhm::journal::data_tok* dtp)
+{
+    switch (res)
     {
-        runEncodeTest(33, 32732, 32732, false, 0, 0, false, false, 2, "33*(1 page exact fit)");
+        case rhm::journal::RHM_IORES_SUCCESS:
+            //((char*)mbuff)[msize] = '\0';
+            return false;
+        case rhm::journal::RHM_IORES_AIO_WAIT:
+            if (++aio_sleep_cnt <= MAX_AIO_SLEEPS)
+            {
+                jc->get_wr_events();
+                usleep(AIO_SLEEP_TIME);
+            }
+            else
+            {
+                delete dtp;
+                CPPUNIT_FAIL("Timeout on RHM_IORES_AIO_WAIT.");
+            }
+            break;
+        case rhm::journal::RHM_IORES_EMPTY:
+            delete dtp;
+            CPPUNIT_FAIL("RHM_IORES_EMPTY");
+        case rhm::journal::RHM_IORES_FULL:
+            delete dtp;
+            CPPUNIT_FAIL("RHM_IORES_FULL");
+        case rhm::journal::RHM_IORES_BUSY:
+            delete dtp;
+            CPPUNIT_FAIL("RHM_IORES_BUSY");
+        case rhm::journal::RHM_IORES_TXPENDING:
+            delete dtp;
+            CPPUNIT_FAIL("RHM_IORES_TXPENDING");
+        default:
+            delete dtp;
+            CPPUNIT_FAIL("unknown return value");
     }
+    return true;
+}
 
-    void EncodeTest_021()
-    {
-        runEncodeTest(22, 49116, 49116, false, 0, 0, false, false, 2, "22*(1.5 pages)");
-    }
+// static fn
+string&
+JournalSystemTests::create_msg(string& s, int msg_num, int len)
+{
+    stringstream ss;
+    ss << "Message_" << setfill('0') << setw(4) << msg_num << "_";
+    for (int i=14; i<=len; i++)
+        ss << (char)('0' + i%10);
+    s.assign(ss.str());
+    return s;
+}
 
-    void EncodeTest_022()
-    {
-        runEncodeTest(22, 48988, 48988, true, 0, 0, false, false, 2,
-                "22*(1.5 pages - 1 dblk for deq record), auto-deq");
-    }
+// static fn
+string&
+JournalSystemTests::create_xid(string& s, int msg_num, int len)
+{
+    stringstream ss;
+    ss << "XID_" << setfill('0') << setw(4) << msg_num << "_";
+    for (int i=9; i<len; i++)
+        ss << (char)('a' + i%26);
+    s.assign(ss.str());
+    return s;
+}
 
-    void EncodeTest_023()
+void JournalSystemTests::cleanMessage()
+{
+    if (xidbuff)
     {
-        runEncodeTest(48, 32732, 32732, false, 0, 0, false, false, 2, "48*(1 page exact fit)");
+        free(xidbuff);
+        xidbuff = NULL;
+        mbuff = NULL;
     }
-
-    void EncodeTest_024()
+    else if (mbuff)
     {
-        runEncodeTest(49, 32732, 32732, false, 0, 0, false, false, 2, "49*(1 page exact fit)");
+        free (mbuff);
+        mbuff = NULL;
     }
-
-    void EncodeTest_025()
-    {
-        runEncodeTest(20, 81884, 81884, false, 0, 0, false, false, 2, "20*(2.5 pages)");
-    }
-
-    void EncodeTest_026()
-    {
-        runEncodeTest(20, 81756, 81756, true, 0, 0, false, false, 2,
-                "20*(2.5 pages - 1 dblk for deq record), auto-deq");
-    }
-
-    void EncodeTest_027()
-    {
-        runEncodeTest(16, 786268, 786268, true, 0, 0, false, false, 2,
-                "16*(24 pages = 1/2 file); Total = 8 files exactly (full journal filespace)");
-    }
-
-    void EncodeTest_028()
-    {
-        runEncodeTest(17, 786268, 786268, true, 0, 0, false, false, 2,
-                "17*(24 pages = 1/2 file); Total = 8 files + file 0 overwritten by 1/2 file");
-    }
-
-private:
-
-    void enq_msg(rhm::journal::jcntl* jc, const std::string msg, const bool transient)
-    {
-        rhm::journal::data_tok* dtp = new rhm::journal::data_tok;
-        CPPUNIT_ASSERT_MESSAGE("Data Token heap intantiation failed.", dtp != NULL);
-
-        unsigned aio_sleep_cnt = 0;
-        while (handle_jcntl_response(jc->enqueue_data_record(msg.c_str(), msg.size(), msg.size(),
-                dtp, transient), jc, aio_sleep_cnt, dtp));
-    }
-
-    void enq_extern_msg(rhm::journal::jcntl* jc, const bool transient)
-    {
-        rhm::journal::data_tok* dtp = new rhm::journal::data_tok;
-        CPPUNIT_ASSERT_MESSAGE("Data Token heap intantiation failed.", dtp != NULL);
-
-        unsigned aio_sleep_cnt = 0;
-        while (handle_jcntl_response(jc->enqueue_extern_data_record(msg.size(),
-                dtp, transient), jc, aio_sleep_cnt, dtp));
-    }
-
-    void enq_txn_msg(rhm::journal::jcntl* jc, const std::string msg, const std::string xid,
-            const bool transient)
-    {
-        rhm::journal::data_tok* dtp = new rhm::journal::data_tok;
-        CPPUNIT_ASSERT_MESSAGE("Data Token heap intantiation failed.", dtp != NULL);
-
-        unsigned aio_sleep_cnt = 0;
-        while (handle_jcntl_response(jc->enqueue_txn_data_record(msg.c_str(), msg.size(),
-                msg.size(), dtp, xid, transient), jc, aio_sleep_cnt, dtp));
-    }
-
-    void enq_extern_txn_msg(rhm::journal::jcntl* jc, const std::string xid, const bool transient)
-    {
-        rhm::journal::data_tok* dtp = new rhm::journal::data_tok;
-        CPPUNIT_ASSERT_MESSAGE("Data Token heap intantiation failed.", dtp != NULL);
-
-        unsigned aio_sleep_cnt = 0;
-        while (handle_jcntl_response(jc->enqueue_extern_txn_data_record(msg.size(), dtp, xid,
-                transient), jc, aio_sleep_cnt, dtp));
-    }
-
-    void deq_msg(rhm::journal::jcntl* jc, u_int64_t rid)
-    {
-        rhm::journal::data_tok* dtp = new rhm::journal::data_tok;
-        CPPUNIT_ASSERT_MESSAGE("Data Token heap intantiation failed.", dtp != NULL);
-        dtp->set_wstate(rhm::journal::data_tok::ENQ);
-        dtp->set_rid(rid);
-
-        unsigned aio_sleep_cnt = 0;
-        while (handle_jcntl_response(jc->dequeue_data_record(dtp), jc, aio_sleep_cnt, dtp));
-    }
-
-    void deq_txn_msg(rhm::journal::jcntl* jc, u_int64_t rid, const std::string xid)
-    {
-        rhm::journal::data_tok* dtp = new rhm::journal::data_tok;
-        CPPUNIT_ASSERT_MESSAGE("Data Token heap intantiation failed.", dtp != NULL);
-        dtp->set_wstate(rhm::journal::data_tok::ENQ);
-        dtp->set_rid(rid);
-
-        unsigned aio_sleep_cnt = 0;
-        while (handle_jcntl_response(jc->dequeue_txn_data_record(dtp, xid),
-                jc, aio_sleep_cnt, dtp));
-    }
-    
-    void txn_abort(rhm::journal::jcntl* jc, const std::string xid)
-    {
-        rhm::journal::data_tok* dtp = new rhm::journal::data_tok;
-        CPPUNIT_ASSERT_MESSAGE("Data Token heap intantiation failed.", dtp != NULL);
-
-        unsigned aio_sleep_cnt = 0;
-        while (handle_jcntl_response(jc->txn_abort(dtp, xid), jc, aio_sleep_cnt, dtp));
-    }
-    
-    void txn_commit(rhm::journal::jcntl* jc, const std::string xid)
-    {
-        rhm::journal::data_tok* dtp = new rhm::journal::data_tok;
-        CPPUNIT_ASSERT_MESSAGE("Data Token heap intantiation failed.", dtp != NULL);
-
-        unsigned aio_sleep_cnt = 0;
-        while (handle_jcntl_response(jc->txn_commit(dtp, xid), jc, aio_sleep_cnt, dtp));
-    }
-
-    char* read_msg(rhm::journal::jcntl* jc)
-    {
-        rhm::journal::data_tok* dtp = new rhm::journal::data_tok;
-        CPPUNIT_ASSERT_MESSAGE("Data Token heap intantiation failed.", dtp != NULL);
-        dtp->set_wstate(rhm::journal::data_tok::ENQ);
-
-        unsigned aio_sleep_cnt = 0;
-        while (handle_jcntl_response(jc->read_data_record(&mbuff, msize, &xidbuff, xidsize,
-                transientFlag, externalFlag, dtp), jc, aio_sleep_cnt, dtp));
-        return (char*)mbuff;
-    }
-    
-    bool handle_jcntl_response(rhm::journal::iores res, rhm::journal::jcntl* jc,
-            unsigned& aio_sleep_cnt, rhm::journal::data_tok* dtp)
-    {
-        switch (res)
-        {
-            case rhm::journal::RHM_IORES_SUCCESS:
-                //((char*)mbuff)[msize] = '\0';
-                return false;
-            case rhm::journal::RHM_IORES_AIO_WAIT:
-                if (++aio_sleep_cnt <= MAX_AIO_SLEEPS)
-                {
-                    jc->get_wr_events();
-                    usleep(AIO_SLEEP_TIME);
-                }
-                else
-                {
-                    delete dtp;
-                    CPPUNIT_FAIL("Timeout on RHM_IORES_AIO_WAIT.");
-                }
-                break;
-            case rhm::journal::RHM_IORES_EMPTY:
-                delete dtp;
-                CPPUNIT_FAIL("RHM_IORES_EMPTY");
-            case rhm::journal::RHM_IORES_FULL:
-                delete dtp;
-                CPPUNIT_FAIL("RHM_IORES_FULL");
-            case rhm::journal::RHM_IORES_BUSY:
-                delete dtp;
-                CPPUNIT_FAIL("RHM_IORES_BUSY");
-            case rhm::journal::RHM_IORES_TXPENDING:
-                delete dtp;
-                CPPUNIT_FAIL("RHM_IORES_TXPENDING");
-            default:
-                delete dtp;
-                CPPUNIT_FAIL("unknown return value");
-        }
-        return true;
-    }
-    
-    static std::string& create_msg(std::string& s, int msg_num, int len)
-    {
-        std::stringstream ss;
-        ss << "Message_" << std::setfill('0') << std::setw(4) << msg_num << "_";
-        for (int i=14; i<=len; i++)
-            ss << (char)('0' + i%10);
-        s.assign(ss.str());
-        return s;
-    }
-    
-    static std::string& create_xid(std::string& s, int msg_num, int len)
-    {
-        std::stringstream ss;
-        ss << "XID_" << std::setfill('0') << std::setw(4) << msg_num << "_";
-        for (int i=9; i<len; i++)
-            ss << (char)('a' + i%26);
-        s.assign(ss.str());
-        return s;
-    }
-
-    void runEncodeTest(const unsigned num_msgs, const unsigned min_msg_size,
-            const unsigned max_msg_szie, const bool auto_deq, const unsigned min_xid_size,
-            const unsigned max_xid_size, const bool transient, const bool external,
-            const unsigned iterations, char* test_descr)
-    {
-        std::cout << "  [" << test_descr << "] " << std::flush;
-        jtest::targs ta(num_msgs, min_msg_size, max_msg_szie, auto_deq, min_xid_size,
-                max_xid_size, transient, external, test_descr);
-        for (unsigned i=0; i<iterations; i++)
-        {
-            std::cout << "." << std::flush;
-            try
-            {
-                t.initialize(ta);
-                t.run();
-            }
-            catch (rhm::journal::jexception e)
-            {
-                t.finalize();
-                CPPUNIT_FAIL(e.to_string());
-            }
-            t.finalize();
-        }
-    }
-    
-    void cleanMessage()
-    {
-        if (xidbuff)
-        {
-            free(xidbuff);
-            xidbuff = NULL;
-            mbuff = NULL;
-        }
-        else if (mbuff)
-        {
-            free (mbuff);
-            mbuff = NULL;
-        }
-    }
-};
-
-// Make this test suite a plugin.
-CPPUNIT_PLUGIN_IMPLEMENT();
-CPPUNIT_TEST_SUITE_REGISTRATION(JournalSystemTests);
+}




More information about the rhmessaging-commits mailing list