Author: kpvdr
Date: 2007-11-21 09:07:18 -0500 (Wed, 21 Nov 2007)
New Revision: 1349
Modified:
store/trunk/cpp/docs/
store/trunk/cpp/lib/BdbMessageStore.cpp
store/trunk/cpp/lib/JournalImpl.cpp
store/trunk/cpp/lib/JournalImpl.h
store/trunk/cpp/lib/jrnl/jcntl.cpp
store/trunk/cpp/lib/jrnl/jcntl.hpp
store/trunk/cpp/lib/jrnl/rcvdat.hpp
store/trunk/cpp/tests/jrnl/
store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp
Log:
Fix to correctly set the messageId after a restore based on the highest RID found in the
journal.
Property changes on: store/trunk/cpp/docs
___________________________________________________________________
Name: svn:ignore
- Makefile.in
+ Makefile.in
Makefile
html
man
latex
Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp 2007-11-21 04:11:54 UTC (rev 1348)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp 2007-11-21 14:07:18 UTC (rev 1349)
@@ -384,8 +384,10 @@
try
{
- jQueue->recover(prepared, key.id); // start recovery
+ u_int64_t highestRid = 0;
+ jQueue->recover(prepared, highestRid, key.id); // start recovery
recoverMessages(txn, registry, queue, prepared, messages);
+ messageIdSequence.reset(highestRid + 1);
jQueue->recover_complete(); // start journal.
} catch (const journal::jexception& e) {
THROW_STORE_EXCEPTION(std::string("Queue ") + queueName +
": recoverQueues() failed: " + e.what());
@@ -463,7 +465,6 @@
{
size_t preambleLength = sizeof(u_int32_t)/*header size*/;
- u_int64_t maxMessageId(1);
JournalImpl* jc =
static_cast<JournalImpl*>(queue->getExternalQueueStore());
DataTokenImpl dtokp;
@@ -519,10 +520,6 @@
queue->recover(msg);
}
- if (dtokp.rid() > maxMessageId) {
- maxMessageId = dtokp.rid();
- }
-
dtokp.reset();
dtokp.set_wstate(DataTokenImpl::ENQ);
@@ -549,7 +546,6 @@
THROW_STORE_EXCEPTION(std::string("Queue ") + queue->getName() +
": recoverMessages() failed: " + e.what());
}
- messageIdSequence.reset(maxMessageId + 1);
}
RecoverableMessage::shared_ptr
BdbMessageStore::getExternMessage(qpid::broker::RecoveryManager& recovery,
@@ -939,6 +935,7 @@
bool written = false;
unsigned aio_sleep_cnt = 0;
+ unsigned busy_sleep_cnt = 0;
while (!written)
{
JournalImpl* jc =
static_cast<JournalImpl*>(queue->getExternalQueueStore());
@@ -962,13 +959,19 @@
if (dtokp.get()->wstate() >= DataTokenImpl::ENQ_SUBM)
written = true;
aio_sleep_cnt = 0;
+ busy_sleep_cnt = 0;
break;
case rhm::journal::RHM_IORES_AIO_WAIT:
if (++aio_sleep_cnt > MAX_AIO_SLEEPS)
- THROW_STORE_EXCEPTION("Timeout waiting for AIO");
+ THROW_STORE_EXCEPTION("Timeout waiting for AIO:
RHM_IORES_AIO_WAIT");
usleep(AIO_SLEEP_TIME); // TODO move sleep to wait for IO in get
events
jc->get_wr_events();
break;
+ case rhm::journal::RHM_IORES_BUSY:
+ if (++busy_sleep_cnt > MAX_AIO_SLEEPS)
+ THROW_STORE_EXCEPTION("Timeout waiting for mutex:
RHM_IORES_BUSY");
+ usleep(AIO_SLEEP_TIME); // TODO add sleep time to get events call
as option
+ break;
case rhm::journal::RHM_IORES_FULL:
// Temporary error msg till exception handler core problem solved...
std::cerr << "Error storing message -- Journal full on queue \""
<< queue->getName() << "\"" << std::endl <<
std::flush;
@@ -1095,13 +1098,13 @@
break;
case rhm::journal::RHM_IORES_AIO_WAIT:
if (++aio_sleep_cnt > MAX_AIO_SLEEPS)
- THROW_STORE_EXCEPTION("Timeout waiting for AIO");
+ THROW_STORE_EXCEPTION("Timeout waiting for AIO:
RHM_IORES_AIO_WAIT");
usleep(AIO_SLEEP_TIME); // TODO add sleep time to get events call as
option
jc->get_wr_events();
break;
case rhm::journal::RHM_IORES_BUSY:
if (++busy_sleep_cnt > MAX_AIO_SLEEPS)
- THROW_STORE_EXCEPTION("Timeout waiting for AIO");
+ THROW_STORE_EXCEPTION("Timeout waiting for mutex:
RHM_IORES_BUSY");
usleep(AIO_SLEEP_TIME); // TODO add sleep time to get events call as
option
break;
default:
Modified: store/trunk/cpp/lib/JournalImpl.cpp
===================================================================
--- store/trunk/cpp/lib/JournalImpl.cpp 2007-11-21 04:11:54 UTC (rev 1348)
+++ store/trunk/cpp/lib/JournalImpl.cpp 2007-11-21 14:07:18 UTC (rev 1349)
@@ -71,8 +71,8 @@
void
JournalImpl::recover(std::deque<data_tok*>* rd_dtokl, const aio_cb rd_cb,
std::deque<data_tok*>* wr_dtokl, const aio_cb wr_cb,
- boost::ptr_list<bdbstore::PreparedTransaction>& prep_tx_list, u_int64_t
queue_id)
- throw (jexception)
+ boost::ptr_list<bdbstore::PreparedTransaction>& prep_tx_list,
u_int64_t& highest_rid,
+ u_int64_t queue_id) throw (jexception)
{
// Create list of prepared xids
std::vector<std::string> prep_xid_list;
@@ -81,7 +81,7 @@
prep_xid_list.push_back(i->xid);
}
- jcntl::recover(rd_dtokl, rd_cb, wr_dtokl, wr_cb, prep_xid_list);
+ jcntl::recover(rd_dtokl, rd_cb, wr_dtokl, wr_cb, prep_xid_list, highest_rid);
// Populate PreparedTransaction lists from _tmap
for (bdbstore::PreparedTransaction::list::iterator i = prep_tx_list.begin();
Modified: store/trunk/cpp/lib/JournalImpl.h
===================================================================
--- store/trunk/cpp/lib/JournalImpl.h 2007-11-21 04:11:54 UTC (rev 1348)
+++ store/trunk/cpp/lib/JournalImpl.h 2007-11-21 14:07:18 UTC (rev 1349)
@@ -92,13 +92,13 @@
void recover(std::deque<journal::data_tok*>* rd_dtokl, const
journal::aio_cb rd_cb,
std::deque<journal::data_tok*>* wr_dtokl, const journal::aio_cb wr_cb,
boost::ptr_list<bdbstore::PreparedTransaction>&
prep_tx_list,
- u_int64_t queue_id) throw (journal::jexception);
+ u_int64_t& highest_rid, u_int64_t queue_id) throw
(journal::jexception);
void recover(boost::ptr_list<bdbstore::PreparedTransaction>&
prep_tx_list,
- u_int64_t queue_id) throw (journal::jexception)
+ u_int64_t& highest_rid, u_int64_t queue_id) throw
(journal::jexception)
{
recover(&_aio_rd_cmpl_dtok_list, &aio_rd_callback,
&_aio_wr_cmpl_dtok_list,
- &aio_wr_callback, prep_tx_list, queue_id);
+ &aio_wr_callback, prep_tx_list, highest_rid, queue_id);
}
// Temporary fn to read and save last msg read from journal so it can be
assigned
Modified: store/trunk/cpp/lib/jrnl/jcntl.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.cpp 2007-11-21 04:11:54 UTC (rev 1348)
+++ store/trunk/cpp/lib/jrnl/jcntl.cpp 2007-11-21 14:07:18 UTC (rev 1349)
@@ -127,7 +127,8 @@
void
jcntl::recover(std::deque<data_tok*>* rdtoklp, const aio_cb rd_cb,
std::deque<data_tok*>* wdtoklp,
- const aio_cb wr_cb, const std::vector<std::string>& prep_txn_list)
throw (jexception)
+ const aio_cb wr_cb, const std::vector<std::string>& prep_txn_list,
u_int64_t& highest_rid)
+ throw (jexception)
{
// Verify journal dir and journal files
_jdir.verify_dir();
@@ -135,6 +136,9 @@
_emap.clear();
_tmap.clear();
rcvr_janalyze(_rcvdat, prep_txn_list);
+ highest_rid = _rcvdat._h_rid;
+ if (_rcvdat._full)
+ throw jexception(jerrno::JERR_JCNTL_RECOVERJFULL, "jcntl",
"recover_complete");
// Debug info, but may be useful to print with a flag
//_rcvdat.print();
@@ -174,7 +178,7 @@
jcntl::recover_complete() throw (jexception)
{
if (!_readonly_flag)
- throw jexception(jerrno::JERR_JCNTL_NOTRECOVERED, "jcntl",
"recovered");
+ throw jexception(jerrno::JERR_JCNTL_NOTRECOVERED, "jcntl",
"recover_complete");
for (u_int16_t i=0; i<JRNL_NUM_FILES; i++)
_datafh[i]->reset(&_rcvdat);
_wrfc.initialize(JRNL_NUM_FILES, (nlfh**)_datafh, &_rcvdat);
@@ -408,6 +412,12 @@
u_int16_t fid = rd._ffid;
std::ifstream ifs;
while (rcvr_get_next_record(fid, &ifs, rd));
+
+ // Check for journal full condition
+ u_int16_t next_wr_fid = (rd._lfid + 1) % JRNL_NUM_FILES;
+ if (rd._ffid == next_wr_fid && rd._enq_cnt_list[next_wr_fid])
+ rd._full = true;
+
std::vector<std::string> xid_list;
_tmap.xid_list(xid_list);
for (std::vector<std::string>::iterator itr = xid_list.begin(); itr !=
xid_list.end();
@@ -492,8 +502,9 @@
{
try
{
- _emap.get_remove_fid(dr.deq_rid());
- rd._enq_cnt_list[fid]--;
+ u_int16_t enq_fid = _emap.get_remove_fid(dr.deq_rid());
+//std::cout << enq_fid;
+ rd._enq_cnt_list[enq_fid]--;
}
catch (const jexception& e)
{
@@ -533,7 +544,7 @@
throw e;
}
if (itr->_enq_flag)
- rd._enq_cnt_list[fid]--;
+ rd._enq_cnt_list[itr->_fid]--;
}
free(xidp);
if (rd._h_rid < h._rid)
@@ -579,7 +590,7 @@
}
break;
case 0:
-//std::cout << " 0";
+//std::cout << " 0 ";
rd._lfid = fid;
rd._eo = ifsp->tellg();
return false;
@@ -668,8 +679,8 @@
;
}
}
+ intrusive_ptr_release(dtokp);
this_dtok_list.pop_front();
- intrusive_ptr_release(dtokp);
}
}
@@ -691,8 +702,8 @@
// cct call the recovery manager. / lazyload..
}
}
+ intrusive_ptr_release( dtokp);
this_dtok_list.pop_front();
- intrusive_ptr_release( dtokp);
}
}
Modified: store/trunk/cpp/lib/jrnl/jcntl.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.hpp 2007-11-21 04:11:54 UTC (rev 1348)
+++ store/trunk/cpp/lib/jrnl/jcntl.hpp 2007-11-21 14:07:18 UTC (rev 1349)
@@ -221,12 +221,13 @@
* AIO operations.
* \param wr_cb Function pointer to callback function for write operations. May be
NULL.
* \param prep_txn_list
+ * \param highest_rid Returns the highest rid found in the journal during recover
*
* \exception TODO
*/
void recover(std::deque<data_tok*>* rdtoklp, const aio_cb rd_cb,
std::deque<data_tok*>* wdtoklp, const aio_cb wr_cb,
- const std::vector<std::string>& prep_txn_list) throw (jexception);
+ const std::vector<std::string>& prep_txn_list, u_int64_t&
highest_rid) throw (jexception);
/**
* \brief Recover using internal default callbacks and data_tok lists.
@@ -235,10 +236,11 @@
*
* \exception TODO
*/
- void recover(const std::vector<std::string>& prep_txn_list) throw
(jexception)
+ void recover(const std::vector<std::string>& prep_txn_list,
u_int64_t& highest_rid)
+ throw (jexception)
{
recover(&_aio_rd_cmpl_dtok_list, &aio_rd_callback,
&_aio_wr_cmpl_dtok_list,
- &aio_wr_callback, prep_txn_list);
+ &aio_wr_callback, prep_txn_list, highest_rid);
}
/**
Modified: store/trunk/cpp/lib/jrnl/rcvdat.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rcvdat.hpp 2007-11-21 04:11:54 UTC (rev 1348)
+++ store/trunk/cpp/lib/jrnl/rcvdat.hpp 2007-11-21 14:07:18 UTC (rev 1349)
@@ -48,6 +48,7 @@
u_int16_t _lfid; ///< Last file id
size_t _eo; ///< End offset (first byte past last record)
u_int64_t _h_rid; ///< Highest rid found
+ bool _full; ///< Journal is full
std::vector<u_int32_t> _enq_cnt_list; ///< Number enqueued records
found for each file
rcvdat():
@@ -57,6 +58,7 @@
_lfid(0),
_eo(0),
_h_rid(0),
+ _full(false),
_enq_cnt_list(JRNL_NUM_FILES, 0)
{}
@@ -68,6 +70,7 @@
_lfid=0;
_eo=0;
_h_rid=0;
+ _full = false;
for (unsigned f=0; f<_enq_cnt_list.size(); f++)
_enq_cnt_list[f] = 0;
}
@@ -84,6 +87,7 @@
std::cout << " End offset (_eo) = 0x" << std::hex
<< _eo << std::dec << " (" <<
(_eo/JRNL_DBLK_SIZE) << " dblks)" <<
std::endl;
std::cout << " Highest rid (_h_rid) = " << _h_rid
<< std::endl;
+ std::cout << " Journal full (_full) = " << (_full
? "TRUE" : "FALSE") << std::endl;
std::cout << " Enqueued records (txn & non-txn):"
<< std::endl;
for (unsigned i=0; i<_enq_cnt_list.size(); i++)
std::cout << " File " << i << ":
" << _enq_cnt_list[i] << std::endl;
Property changes on: store/trunk/cpp/tests/jrnl
___________________________________________________________________
Name: svn:ignore
- .deps
.libs
Makefile
Makefile.in
jtest
unit_test_jerrno
unit_test_jexception
unit_test_jinf
+ .deps
.libs
Makefile
Makefile.in
jtest
unit_test_enq_map
unit_test_jerrno
unit_test_jexception
unit_test_jdir
unit_test_file_hdr
unit_test_jinf
unit_test_txn_map
Modified: store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp 2007-11-21 04:11:54 UTC (rev 1348)
+++ store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp 2007-11-21 14:07:18 UTC (rev 1349)
@@ -81,6 +81,7 @@
try
{
vector<string> txn_list;
+ u_int64_t highest_rid;
char* test_name = "EmptyRecoverTest";
{
rhm::journal::jcntl jc(test_name, "jdata", test_name);
@@ -88,11 +89,11 @@
}
{
rhm::journal::jcntl jc(test_name, "jdata", test_name);
- jc.recover(txn_list);
+ jc.recover(txn_list, highest_rid);
}
{
rhm::journal::jcntl jc(test_name, "jdata", test_name);
- jc.recover(txn_list);
+ jc.recover(txn_list, highest_rid);
jc.recover_complete();
}
}
@@ -135,6 +136,7 @@
JournalSystemTests::RecoverReadTest()
{
vector<string> txn_list;
+ u_int64_t highest_rid;
try
{
// Non-txn
@@ -147,7 +149,7 @@
}
{
rhm::journal::jcntl jc(test_name, "jdata", test_name);
- jc.recover(txn_list);
+ jc.recover(txn_list, highest_rid);
for (int m=0; m<NUM_MSGS; m++)
{
read_msg(&jc);
@@ -170,7 +172,7 @@
}
{
rhm::journal::jcntl jc(test_name, "jdata", test_name);
- jc.recover(txn_list);
+ jc.recover(txn_list, highest_rid);
for (int m=0; m<NUM_MSGS; m++)
{
read_msg(&jc);
@@ -192,6 +194,7 @@
JournalSystemTests::RecoveredReadTest()
{
vector<string> txn_list;
+ u_int64_t highest_rid;
try
{
// Non-txn
@@ -204,7 +207,7 @@
}
{
rhm::journal::jcntl jc(test_name, "jdata", test_name);
- jc.recover(txn_list);
+ jc.recover(txn_list, highest_rid);
for (int m=0; m<NUM_MSGS; m++)
{
read_msg(&jc);
@@ -235,7 +238,7 @@
}
{
rhm::journal::jcntl jc(test_name, "jdata", test_name);
- jc.recover(txn_list);
+ jc.recover(txn_list, highest_rid);
for (int m=0; m<NUM_MSGS; m++)
{
read_msg(&jc);
@@ -265,6 +268,7 @@
JournalSystemTests::RecoveredDequeueTest()
{
vector<string> txn_list;
+ u_int64_t highest_rid;
try
{
// Non-txn
@@ -277,7 +281,7 @@
}
{
rhm::journal::jcntl jc(test_name, "jdata", test_name);
- jc.recover(txn_list);
+ jc.recover(txn_list, highest_rid);
for (int m=0; m<NUM_MSGS; m++)
{
read_msg(&jc);
@@ -310,7 +314,7 @@
}
{
rhm::journal::jcntl jc(test_name, "jdata", test_name);
- jc.recover(txn_list);
+ jc.recover(txn_list, highest_rid);
for (int m=0; m<NUM_MSGS; m++)
{
read_msg(&jc);
@@ -342,6 +346,7 @@
JournalSystemTests::HeaderFlagsTest()
{
vector<string> txn_list;
+ u_int64_t highest_rid;
try
{
// Non-txn
@@ -364,7 +369,7 @@
}
{
rhm::journal::jcntl jc(test_name, "jdata", test_name);
- jc.recover(txn_list);
+ jc.recover(txn_list, highest_rid);
// Recover non-transient msgs
for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
{
@@ -436,7 +441,7 @@
}
{
rhm::journal::jcntl jc(test_name, "jdata", test_name);
- jc.recover(txn_list);
+ jc.recover(txn_list, highest_rid);
// Recover non-transient msgs
for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
{
@@ -497,6 +502,7 @@
JournalSystemTests::ComplexRecoveryTest1()
{
vector<string> txn_list;
+ u_int64_t highest_rid;
try
{
// Non-txn
@@ -523,7 +529,7 @@
}
{
rhm::journal::jcntl jc(test_name, "jdata", test_name);
- jc.recover(txn_list);
+ jc.recover(txn_list, highest_rid);
// Check that only last n readable (as before)
for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
@@ -587,7 +593,7 @@
}
{
rhm::journal::jcntl jc(test_name, "jdata", test_name);
- jc.recover(txn_list);
+ jc.recover(txn_list, highest_rid);
// Check that only last n readable (as before)
for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)