Author: kpvdr
Date: 2007-10-01 10:46:01 -0400 (Mon, 01 Oct 2007)
New Revision: 957
Modified:
store/trunk/cpp/lib/BdbMessageStore.cpp
store/trunk/cpp/lib/jrnl/jcntl.cpp
store/trunk/cpp/lib/jrnl/jcntl.hpp
store/trunk/cpp/lib/jrnl/rcvdat.hpp
store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp
Log:
Added PreparedTransactionList to jcntl::recover() fns
Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp 2007-10-01 11:51:31 UTC (rev 956)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp 2007-10-01 14:46:01 UTC (rev 957)
@@ -304,7 +304,7 @@
try
{
- jQueue->recover(); // start recovery
+ jQueue->recover(prepared); // start recovery
recoverMessages(txn, registry, queue, prepared, messages);
jQueue->recover_complete(); // start journal.
} catch (journal::jexception& e) {
Modified: store/trunk/cpp/lib/jrnl/jcntl.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.cpp 2007-10-01 11:51:31 UTC (rev 956)
+++ store/trunk/cpp/lib/jrnl/jcntl.cpp 2007-10-01 14:46:01 UTC (rev 957)
@@ -95,9 +95,7 @@
}
_datafh = ::new lfh*[JRNL_NUM_FILES];
- // NOTE: We NULL the pointer array prior to setting the pointers because exceptions
- // can be thrown during pointer initialization, and the clean() fn that will be
- // called after an exception will attempt to free any non-null pointer.
+ // NULL the pointer array first because new() can throw exceptions
::memset(_datafh, 0, sizeof(lfh*) * JRNL_NUM_FILES);
for (u_int16_t i=0; i<JRNL_NUM_FILES; i++)
{
@@ -123,20 +121,20 @@
void
jcntl::recover(std::deque<data_tok*>* rd_dtokl, const aio_cb rd_cb,
std::deque<data_tok*>* wr_dtokl,
- const aio_cb wr_cb) throw (jexception)
+ const aio_cb wr_cb, boost::ptr_list<bdbstore::PreparedTransaction>&
prep_tx_list)
+ throw (jexception)
{
+ // Create list of prepared xids
+ std::set<std::string> prep_xid_list;
+ for (bdbstore::PreparedTransaction::list::iterator i = prep_tx_list.begin();
+ i != prep_tx_list.end(); i++);
+// prep_xid_list.insert(i->??);
+
// Verify journal dir and journal files
_jdir.verify_dir();
_rcvdat.reset();
_emap.clear();
rcvr_janalyze(_rcvdat);
-//std::cout << "em=" << _emap.size() << "; rd="
<< rd._ffid << ":0x" << std::hex << rd._fro <<
std::dec << "-" << rd._lfid << ":0x" <<
std::hex << rd._eo << std::dec << " ";
-//std::vector<u_int64_t> kv;
-//std::vector<u_int64_t>::iterator ki;
-//_emap.rid_list(kv);
-//std::cout << " rids=[";
-//for (ki=kv.begin(); ki<kv.end(); ki++) {if (ki!=kv.begin()) std::cout <<
", "; std::cout << *ki;}
-//std::cout << "]" << std::flush;
if (_datafh)
{
@@ -147,9 +145,7 @@
}
_datafh = ::new lfh*[JRNL_NUM_FILES];
- // NOTE: We NULL the pointer array prior to setting the pointers because exceptions
- // can be thrown during pointer initialization, and the clean() fn that will be
- // called after an exception will attempt to free any non-null pointer.
+ // NULL the pointer array first because new() can throw exceptions
::memset(_datafh, 0, sizeof(lfh*) * JRNL_NUM_FILES);
for (u_int16_t i=0; i<JRNL_NUM_FILES; i++)
{
@@ -162,7 +158,6 @@
// NOTE: The write RFC must initialize first. This sets all the file handle object
// (lfh) counters and pointers for both read and write, since write activity
// constrains read activity (i.e. one can't read what has not yet been written).
-// _wrfc.initialize(JRNL_NUM_FILES, (nlfh**)_datafh, _rcvdat._lfid, _rcvdat._h_rid +
1);
_wrfc.initialize(JRNL_NUM_FILES, (nlfh**)_datafh, &_rcvdat);
_rrfc.initialize(JRNL_NUM_FILES, (nlfh**)_datafh, _rcvdat._ffid);
_rmgr.initialize(rd_dtokl, rd_cb);
Modified: store/trunk/cpp/lib/jrnl/jcntl.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.hpp 2007-10-01 11:51:31 UTC (rev 956)
+++ store/trunk/cpp/lib/jrnl/jcntl.hpp 2007-10-01 14:46:01 UTC (rev 957)
@@ -49,6 +49,8 @@
#include <jrnl/wmgr.hpp>
#include <jrnl/wrfc.hpp>
#include <qpid/broker/PersistableQueue.h>
+#include <PreparedTransaction.h>
+#include <boost/ptr_container/ptr_list.hpp>
namespace rhm
{
@@ -221,7 +223,8 @@
* \exception TODO
*/
void recover(std::deque<data_tok*>* rd_dtokl, const aio_cb rd_cb,
- std::deque<data_tok*>* wr_dtokl, const aio_cb wr_cb) throw (jexception);
+ std::deque<data_tok*>* wr_dtokl, const aio_cb wr_cb,
+ boost::ptr_list<bdbstore::PreparedTransaction>& prep_tx_list) throw
(jexception);
/**
* \brief Recover using internal default callbacks and data_tok lists.
@@ -230,10 +233,11 @@
*
* \exception TODO
*/
- void recover() throw (jexception)
+ void recover(boost::ptr_list<bdbstore::PreparedTransaction>&
prep_tx_list)
+ throw (jexception)
{
recover(&_aio_rd_cmpl_dtok_list, &aio_rd_callback,
&_aio_wr_cmpl_dtok_list,
- &aio_wr_callback );
+ &aio_wr_callback, prep_tx_list);
}
/**
@@ -379,8 +383,6 @@
/**
* \brief Discard (skip) next record to be read without reading or retrieving it.
*
- * \param dtokp Pointer to data token which contains the details of the enqueue
operation.
- *
* \exception TODO
*/
const iores discard_data_record() throw (jexception);
Modified: store/trunk/cpp/lib/jrnl/rcvdat.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rcvdat.hpp 2007-10-01 11:51:31 UTC (rev 956)
+++ store/trunk/cpp/lib/jrnl/rcvdat.hpp 2007-10-01 14:46:01 UTC (rev 957)
@@ -32,6 +32,7 @@
#ifndef rhm_journal_rcvdat_hpp
#define rhm_journal_rcvdat_hpp
+#include <map>
#include <vector>
namespace rhm
@@ -41,6 +42,10 @@
struct rcvdat
{
+ typedef std::vector<u_int64_t> rid_list;
+ typedef std::pair<rid_list, rid_list> enq_deq_rid_list;
+ typedef std::map<std::string, enq_deq_rid_list> enq_deq_map;
+
bool _empty; ///< Journal data files empty
u_int16_t _ffid; ///< First file id
size_t _fro; ///< First record offset in ffid
@@ -48,6 +53,8 @@
size_t _eo; ///< End offset (first byte past last record)
u_int64_t _h_rid; ///< Highest rid found
std::vector<u_int32_t> _enq_cnt_list; ///< Number enqueued records
found for each file
+ enq_deq_map _edm; ///< Map of enqueue and dequeue rids for each xid
+
rcvdat():
_empty(true),
_ffid(0),
@@ -55,7 +62,8 @@
_lfid(0),
_eo(0),
_h_rid(0),
- _enq_cnt_list(JRNL_NUM_FILES)
+ _enq_cnt_list(JRNL_NUM_FILES),
+ _edm()
{}
void reset()
{
@@ -66,6 +74,7 @@
_eo=0;
_h_rid=0;
_enq_cnt_list.clear();
+ _edm.clear();
}
};
}
Modified: store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp 2007-10-01 11:51:31 UTC (rev 956)
+++ store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp 2007-10-01 14:46:01 UTC (rev 957)
@@ -34,6 +34,9 @@
#include "msg_consumer.hpp"
#include "jtest.hpp"
+#include <PreparedTransaction.h>
+#include <boost/ptr_container/ptr_list.hpp>
+
#define MAX_MSG_SIZE 127
#define NUM_MSGS 5
#define MAX_AIO_SLEEPS 500
@@ -155,6 +158,7 @@
void EmptyRecoverTest()
{
+ boost::ptr_list<rhm::bdbstore::PreparedTransaction> txn_list;
//Stack
char* test_name = "EmptyRecoverTest_Stack";
try
@@ -165,11 +169,11 @@
}
{
rhm::journal::jcntl jc(test_name, "jdata", test_name);
- jc.recover();
+ jc.recover(txn_list);
}
{
rhm::journal::jcntl jc(test_name, "jdata", test_name);
- jc.recover();
+ jc.recover(txn_list);
jc.recover_complete();
}
}
@@ -194,14 +198,14 @@
{
rhm::journal::jcntl* jcp = new rhm::journal::jcntl(test_name,
"jdata", test_name);
CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.",
jcp != NULL);
- jcp->recover();
+ jcp->recover(txn_list);
delete jcp;
jcp = NULL;
}
{
rhm::journal::jcntl* jcp = new rhm::journal::jcntl(test_name,
"jdata", test_name);
CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.",
jcp != NULL);
- jcp->recover();
+ jcp->recover(txn_list);
jcp->recover_complete();
delete jcp;
}
@@ -257,6 +261,7 @@
void RecoverReadTest()
{
+ boost::ptr_list<rhm::bdbstore::PreparedTransaction> txn_list;
//Stack
char* test_name = "RecoverReadTest_Stack";
try
@@ -269,7 +274,7 @@
}
{
rhm::journal::jcntl jc(test_name, "jdata", test_name);
- jc.recover();
+ jc.recover(txn_list);
for (int m=0; m<NUM_MSGS; m++)
CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
create_msg(msg, m).compare(read_msg(&jc)) == 0);
@@ -298,7 +303,7 @@
{
rhm::journal::jcntl* jcp = new rhm::journal::jcntl(test_name,
"jdata", test_name);
CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.",
jcp != NULL);
- jcp->recover();
+ jcp->recover(txn_list);
for (int m=0; m<NUM_MSGS; m++)
CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
create_msg(msg, m).compare(read_msg(jcp)) == 0);
@@ -317,6 +322,7 @@
void RecoveredReadTest()
{
+ boost::ptr_list<rhm::bdbstore::PreparedTransaction> txn_list;
//Stack
char* test_name = "RecoveredReadTest_Stack";
try
@@ -329,7 +335,7 @@
}
{
rhm::journal::jcntl jc(test_name, "jdata", test_name);
- jc.recover();
+ jc.recover(txn_list);
for (int m=0; m<NUM_MSGS; m++)
CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
create_msg(msg, m).compare(read_msg(&jc)) == 0);
@@ -362,7 +368,7 @@
{
rhm::journal::jcntl* jcp = new rhm::journal::jcntl(test_name,
"jdata", test_name);
CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.",
jcp != NULL);
- jcp->recover();
+ jcp->recover(txn_list);
for (int m=0; m<NUM_MSGS; m++)
CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
create_msg(msg, m).compare(read_msg(jcp)) == 0);
@@ -385,6 +391,7 @@
void RecoveredDequeueTest()
{
+ boost::ptr_list<rhm::bdbstore::PreparedTransaction> txn_list;
//Stack
char* test_name = "RecoveredDequeueTest_Stack";
try
@@ -397,7 +404,7 @@
}
{
rhm::journal::jcntl jc(test_name, "jdata", test_name);
- jc.recover();
+ jc.recover(txn_list);
for (int m=0; m<NUM_MSGS; m++)
CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
create_msg(msg, m).compare(read_msg(&jc)) == 0);
@@ -432,7 +439,7 @@
{
rhm::journal::jcntl* jcp = new rhm::journal::jcntl(test_name,
"jdata", test_name);
CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.",
jcp != NULL);
- jcp->recover();
+ jcp->recover(txn_list);
for (int m=0; m<NUM_MSGS; m++)
CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
create_msg(msg, m).compare(read_msg(jcp)) == 0);
@@ -457,6 +464,7 @@
void ComplexRecoveryTest1()
{
+ boost::ptr_list<rhm::bdbstore::PreparedTransaction> txn_list;
//Stack
char* test_name = "ComplexRecoveryTest1_Stack";
try
@@ -477,7 +485,7 @@
}
{
rhm::journal::jcntl jc(test_name, "jdata", test_name);
- jc.recover();
+ jc.recover(txn_list);
for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
create_msg(msg, m).compare(read_msg(&jc)) == 0);
@@ -530,7 +538,7 @@
{
rhm::journal::jcntl* jcp = new rhm::journal::jcntl(test_name,
"jdata", test_name);
CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.",
jcp != NULL);
- jcp->recover();
+ jcp->recover(txn_list);
for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
create_msg(msg, m).compare(read_msg(jcp)) == 0);