rhmessaging commits: r944 - in store/trunk/cpp: tests/jrnl and 1 other directory.
by rhmessaging-commits@lists.jboss.org
Author: kpvdr
Date: 2007-09-25 09:00:25 -0400 (Tue, 25 Sep 2007)
New Revision: 944
Modified:
store/trunk/cpp/lib/jrnl/jcntl.cpp
store/trunk/cpp/lib/jrnl/jcntl.hpp
store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp
Log:
Further updates to new DTX API in jcntl
Modified: store/trunk/cpp/lib/jrnl/jcntl.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.cpp 2007-09-24 20:16:06 UTC (rev 943)
+++ store/trunk/cpp/lib/jrnl/jcntl.cpp 2007-09-25 13:00:25 UTC (rev 944)
@@ -202,39 +202,52 @@
const iores
jcntl::enqueue_data_record(const void* const /*data_buff*/, const size_t /*tot_data_len*/,
- const size_t /*this_data_len*/, const void* const /*xid_buff*/, const size_t /*xid_len*/,
- data_tok* /*dtok*/) throw (jexception)
+ const size_t /*this_data_len*/, data_tok* /*dtokp*/, const std::string /*xid*/,
+ const bool /*transient*/) throw (jexception)
{
return RHM_IORES_NOTIMPL;
}
const iores
-get_next_data_record(data_tok* /*dtok*/) throw (jexception)
+jcntl::get_next_data_record(const void** const /*data*/, const size_t& /*dsize*/,
+ const size_t& /*dsize_avail*/) throw (jexception)
{
return RHM_IORES_NOTIMPL;
}
const iores
-discard_next_data_record(data_tok* /*dtok*/) throw (jexception)
+jcntl::discard_next_data_record(data_tok* /*dtokp*/) throw (jexception)
{
return RHM_IORES_NOTIMPL;
}
const iores
-jcntl::read_next_data_record(void* const dbuf, const size_t dbsize, data_tok* const dtok)
+jcntl::read_next_data_record(void* const dbuf, const size_t dbsize, data_tok* const dtokp)
throw (jexception)
{
check_rstatus("read_data");
- return _rmgr.read(dbuf, dbsize, dtok);
+ return _rmgr.read(dbuf, dbsize, dtokp);
}
const iores
-jcntl::dequeue_data_record(data_tok* const dtok) throw (jexception)
+jcntl::dequeue_data_record(data_tok* const dtokp, const std::string /*xid*/) throw (jexception)
{
check_wstatus("dequeue_data");
- return _wmgr.dequeue(dtok);
+ return _wmgr.dequeue(dtokp);
}
+const iores
+jcntl::abort_dtx(const std::string /*xid*/) throw (jexception)
+{
+ return RHM_IORES_NOTIMPL;
+}
+
+const iores
+jcntl::commit_dtx(const std::string /*xid*/) throw (jexception)
+{
+ return RHM_IORES_NOTIMPL;
+}
+
const u_int32_t
jcntl::get_wr_events() throw (jexception)
{
@@ -516,6 +529,8 @@
}
+// Static instance of empty string used as default XID parameter and which signifies no XID.
+const std::string jcntl::no_xid;
} // namespace journal
Modified: store/trunk/cpp/lib/jrnl/jcntl.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.hpp 2007-09-24 20:16:06 UTC (rev 943)
+++ store/trunk/cpp/lib/jrnl/jcntl.hpp 2007-09-25 13:00:25 UTC (rev 944)
@@ -137,6 +137,7 @@
rmgr _rmgr; ///< Read page manager which manages AIO
wmgr _wmgr; ///< Write page manager which manages AIO
rcvdat _rcvdat; ///< Recovery data used for recovery
+ static const std::string no_xid;
std::deque<rhm::journal::data_tok*> _aio_rd_cmpl_dtok_list; ///< Internally mamanged deque
std::deque<rhm::journal::data_tok*> _aio_wr_cmpl_dtok_list; ///< Internally mamanged deque
@@ -179,7 +180,7 @@
* AIO operations.
* \param wr_cb Function pointer to callback function for write operations. May be NULL.
*
- * \exception ??
+ * \exception TODO
*/
void initialize(std::deque<data_tok*>* rd_dtokl, const aio_cb rd_cb,
std::deque<data_tok*>* wr_dtokl, const aio_cb wr_cb) throw (jexception);
@@ -189,7 +190,7 @@
*
* TODO: Move to JournalImpl later
*
- * \exception ??
+ * \exception TODO
*/
void initialize() throw (jexception)
{
@@ -218,7 +219,7 @@
* AIO operations.
* \param wr_cb Function pointer to callback function for write operations. May be NULL.
*
- * \exception ??
+ * \exception TODO
*/
void recover(std::deque<data_tok*>* rd_dtokl, const aio_cb rd_cb,
std::deque<data_tok*>* wr_dtokl, const aio_cb wr_cb) throw (jexception);
@@ -228,7 +229,7 @@
*
* TODO: Move to JournalImpl later
*
- * \exception ??
+ * \exception TODO
*/
void recover() throw (jexception)
{
@@ -245,7 +246,7 @@
* may be re-read during normal operation. The read-only flag is then reset, allowing
* enqueue and dequeue operations to resume.
*
- * \exception ??
+ * \exception TODO
*/
void recover_complete() throw (jexception);
@@ -254,64 +255,87 @@
*
* Clear the journal directory of all journal files matching the base filename.
*
- * \exception ??
+ * \exception TODO
*/
void delete_jrnl_files() throw (jexception);
- /**
- * \brief Enqueues (writes) data to the journal.
- *
- * \param dbuf Pointer to data to be written to journal.
- * \param dlen Size of data (in bytes) in dbuf to be written.
- * \param dtok Pointer to data_tok instance for this data, used to track state of data
- * through journal.
- *
- * \exception ??
- */
+ // TODO *** DEPRECATED *** Replaced by enqueue_data_record() call below.
const iores enqueue_data_record(const void* const dbuf, const size_t dlen,
data_tok* const dtok) throw (jexception);
/**
- * \brief Enqueue data or part of data
+ * \brief Enqueue data.
*
- * \param data_buff
- * \param tot_data_len
- * \param this_data_len
- * \param xid_buff
- * \param xid_len
- * \param dtok
+ * Enqueue data or part thereof. If a large data block is being written, then it may be
+ * enqueued in parts by setting this_data_len to the size of the data being written in this
+ * call. The total data size must be known in advance, however, as this is written into the
+ * record header on the first record write. The state of the write (i.e. how much has been
+ * written so far) is maintained in the data token dtokp. Partial writes will return in state
+ * ENQ_PART.
*
- * \exception ??
+ * Note that a return value of anything other than RHM_IORES_SUCCESS implies that this write
+ * operation did not complete successfully or was partially completed. The action taken under
+ * these conditions depends on the value of the return. For example, RHM_IORES_AIO_WAIT
+ * implies that all pages in the write page cache are waiting for AIO operations to return,
+ * and that the call should be remade after waiting a bit.
+ *
+ * Example: If a write of 99 kB is divided into three equal parts, then the following states
+ * and returns would characterize a successful operation:
+ * dtok. dtok. dtok.
+ * operation return wstate() dsize() written() comment
+ * NONE 0 0 Value of dtok before op
+ * edr(99000, 33000) SUCCESS ENQ_PART 99000 33000 Enqueue part 1
+ * edr(99000, 33000) AIO_WAIT ENQ_PART 99000 50000 Enqueue part 2, not completed
+ * edr(99000, 33000) SUCCESS ENQ_PART 99000 66000 Enqueue part 2 again
+ * edr(99000, 33000) SUCCESS ENQ 99000 99000 Enqueue part 3
+ *
+ * \param data_buff Pointer to data to be enqueued for this enqueue operation.
+ * \param tot_data_len Total data length.
+ * \param this_data_len Amount to be written in this enqueue operation.
+ * \param dtokp Pointer to data token which contains the details of the enqueue operation.
+ * \param xid String containing xid. An empty string (i.e. length=0) will be considered
+ * non-transactional.
+ * \param transient Flag indicating transient persistence (ie, ignored on recover).
+ *
+ * \exception TODO
*/
- const iores enqueue_data_record(
- const void* const data_buff, ///< pointer to data to be enqueued in this enq op
- const size_t tot_data_len, ///< total data length
- const size_t this_data_len, ///< amount to be written in this enq op
- const void* const xid_buff, ///< pointer to xid
- const size_t xid_len, ///< xid length
- data_tok* dtok) ///< pointer to data token instance
- throw (jexception);
+ const iores enqueue_data_record(const void* const data_buff, const size_t tot_data_len,
+ const size_t this_data_len, data_tok* dtokp, const std::string xid = no_xid,
+ const bool transient = false) throw (jexception);
/**
* \brief Retrieve details of next record to be read without consuming the record.
*
* Retrieve details of next record to be read without consuming the record. A pointer to
- * the data is also returned.
+ * the data is returned, along with the data size and available data size. If a large record
+ * should span more than one page of the read page cache, then if the following page is still
+ * waiting for AIO return, then only a portion of the total data will be available for
+ * consumption. If dsize_avail < dsize, then a subsequent call will update dsize_avail if
+ * more pages have returned from AIO.
*
- * \param dtok
+ * Note that using this function does not consume the record, it merely makes the content
+ * of the read page cache available through a pointer. To mark this record as consumed,
+ * discard_next_data_record() must be called.
*
- * \exception ??
+ * \param data Pointer to data pointer which will point to the first byte of the next record
+ * data.
+ * \param dsize Reference that returns the total data size of the record data .
+ * \param dsize_avail Reference that returns the amount of the data that is available for
+ * consumption.
+ *
+ * \exception TODO
*/
- const iores get_next_data_record(data_tok* dtok) throw (jexception);
+ const iores get_next_data_record(const void** const data, const size_t& dsize,
+ const size_t& dsize_avail) throw (jexception);
/**
* \brief Discard (skip) next record to be read without reading or retrieving it.
*
- * \param dtok
+ * \param dtokp Pointer to data token which contains the details of the enqueue operation.
*
- * \exception ??
+ * \exception TODO
*/
- const iores discard_next_data_record(data_tok* dtok) throw (jexception);
+ const iores discard_next_data_record(data_tok* dtokp) throw (jexception);
/**
* \brief Reads data from the journal.
@@ -319,13 +343,13 @@
* \param dbuf Pointer to buffer into which data is to be read.
* \param dbsize Buffer capacity in bytes - i.e. Maximum size of data to be written into
* dbuf.
- * \param dtok Pointer to data_tok instance for this data, used to track state of data
+ * \param dtokp Pointer to data_tok instance for this data, used to track state of data
* through journal.
*
- * \exception ??
+ * \exception TODO
*/
const iores read_next_data_record(void* const dbuf, const size_t dbsize,
- data_tok* const dtok) throw (jexception);
+ data_tok* const dtokp) throw (jexception);
/**
* \brief Dequeues (marks as no longer needed) data record in journal.
@@ -336,12 +360,15 @@
* record to be dequeued and the write state of ENQ must be manually set in a new or reset
* instance of data_tok.
*
- * \param dtok Pointer to data_tok instance for this data, used to track state of data
+ * \param dtokp Pointer to data_tok instance for this data, used to track state of data
* through journal.
+ * \param xid String containing xid. An empty string (i.e. length=0) will be considered
+ * non-transactional.
*
- * \exception ??
+ * \exception TODO
*/
- const iores dequeue_data_record(data_tok* const dtok) throw (jexception);
+ const iores dequeue_data_record(data_tok* const dtokp, const std::string xid = no_xid)
+ throw (jexception);
/**
* \brief Abort the transaction for all records enqueued or dequeued with the matching xid.
@@ -350,12 +377,11 @@
* are effectively deleted from the journal, and can not be read. All dequeued records remain
* as though they had never been dequeued.
*
- * \param xid_buff
- * \param xid_len
+ * \param xid String containing xid.
*
- * \exception ??
+ * \exception TODO
*/
- const iores abort_xid(const void* const xid_buff, const size_t xid_len) throw (jexception);
+ const iores abort_dtx(const std::string xid) throw (jexception);
/**
* \brief Commit the transaction for all records enqueued or dequeued with the matching xid.
@@ -364,22 +390,20 @@
* records are effectively released for reading and dequeueing. All dequeued records are
* removed and can no longer be accessed.
*
- * \param xid_buff
- * \param xid_len
+ * \param xid String containing xid.
*
- * \exception ??
+ * \exception TODO
*/
- const iores commit_xid(const void* const xid_buff, const size_t xid_len) throw (jexception);
+ const iores commit_dtx(const std::string xid) throw (jexception);
/**
* \brief Check whether all the enqueue records for the given xid have reached disk.
*
- * \param xid_buff
- * \param xid_len
+ * \param xid String containing xid.
*
- * \exception ??
+ * \exception TODO
*/
- const bool is_synced(const void* const xid_buff, const size_t xid_len) throw (jexception);
+ const bool is_dtx_synced(const std::string xid) throw (jexception);
/**
* \brief Forces a check for returned AIO write events.
Modified: store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp 2007-09-24 20:16:06 UTC (rev 943)
+++ store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp 2007-09-25 13:00:25 UTC (rev 944)
@@ -42,22 +42,14 @@
class JournalSystemTests : public CppUnit::TestCase
{
CPPUNIT_TEST_SUITE(JournalSystemTests);
- CPPUNIT_TEST(InstantiationTest_Stack);
- CPPUNIT_TEST(InstantiationTest_Heap);
- CPPUNIT_TEST(InitializationTest_Stack);
- CPPUNIT_TEST(InitializationTest_Heap);
- CPPUNIT_TEST(EmptyRecoverTest_Stack);
- CPPUNIT_TEST(EmptyRecoverTest_Heap);
- CPPUNIT_TEST(EnqueueTest_Stack);
- CPPUNIT_TEST(EnqueueTest_Heap);
- CPPUNIT_TEST(RecoverReadTest_Stack);
- CPPUNIT_TEST(RecoverReadTest_Heap);
- CPPUNIT_TEST(RecoveredReadTest_Stack);
- CPPUNIT_TEST(RecoveredReadTest_Heap);
- CPPUNIT_TEST(RecoveredDequeueTest_Stack);
- CPPUNIT_TEST(RecoveredDequeueTest_Heap);
- CPPUNIT_TEST(ComplexRecoveryTest1_Stack);
- CPPUNIT_TEST(ComplexRecoveryTest1_Heap);
+ CPPUNIT_TEST(InstantiationTest);
+ CPPUNIT_TEST(InitializationTest);
+ CPPUNIT_TEST(EmptyRecoverTest);
+ CPPUNIT_TEST(EnqueueTest);
+ CPPUNIT_TEST(RecoverReadTest);
+ CPPUNIT_TEST(RecoveredReadTest);
+ CPPUNIT_TEST(RecoveredDequeueTest);
+ CPPUNIT_TEST(ComplexRecoveryTest1);
CPPUNIT_TEST(EncodeTest_000);
CPPUNIT_TEST(EncodeTest_001);
CPPUNIT_TEST(EncodeTest_002);
@@ -95,9 +87,10 @@
public:
- void InstantiationTest_Stack()
+ void InstantiationTest()
{
- const char* test_name = "InstantiationTest_Stack";
+ //Stack
+ char* test_name = "InstantiationTest_Stack";
try
{
rhm::journal::jcntl jc(test_name, "jdata", test_name);
@@ -108,11 +101,8 @@
ss << e;
CPPUNIT_FAIL(ss.str());
}
- }
-
- void InstantiationTest_Heap()
- {
- const char* test_name = "InstantiationTest_Heap";
+ // Heap
+ test_name = "InstantiationTest_Heap";
rhm::journal::jcntl* jcp = NULL;
try
{
@@ -128,9 +118,10 @@
}
}
- void InitializationTest_Stack()
+ void InitializationTest()
{
- const char* test_name = "InitializationTest_Stack";
+ //Stack
+ char* test_name = "InitializationTest_Stack";
try
{
rhm::journal::jcntl jc(test_name, "jdata", test_name);
@@ -142,11 +133,8 @@
ss << e;
CPPUNIT_FAIL(ss.str());
}
- }
-
- void InitializationTest_Heap()
- {
- const char* test_name = "InitializationTest_Heap";
+ // Heap
+ test_name = "InitializationTest_Heap";
rhm::journal::jcntl* jcp = NULL;
try
{
@@ -165,9 +153,10 @@
}
}
- void EmptyRecoverTest_Stack()
+ void EmptyRecoverTest()
{
- const char* test_name = "EmptyRecoverTest_Stack";
+ //Stack
+ char* test_name = "EmptyRecoverTest_Stack";
try
{
{
@@ -190,11 +179,8 @@
ss << e;
CPPUNIT_FAIL(ss.str());
}
- }
-
- void EmptyRecoverTest_Heap()
- {
- const char* test_name = "EmptyRecoverTest_Heap";
+ // Heap
+ test_name = "EmptyRecoverTest_Heap";
rhm::journal::jcntl* jcp = NULL;
try
{
@@ -230,9 +216,10 @@
}
}
- void EnqueueTest_Stack()
+ void EnqueueTest()
{
- const char* test_name = "EnqueueTest_Stack";
+ //Stack
+ char* test_name = "EnqueueTest_Stack";
try
{
rhm::journal::jcntl jc(test_name, "jdata", test_name);
@@ -246,11 +233,8 @@
ss << e;
CPPUNIT_FAIL(ss.str());
}
- }
-
- void EnqueueTest_Heap()
- {
- const char* test_name = "EnqueueTest_Heap";
+ // Heap
+ test_name = "EnqueueTest_Heap";
rhm::journal::jcntl* jcp = NULL;
try
{
@@ -271,9 +255,10 @@
}
}
- void RecoverReadTest_Stack()
+ void RecoverReadTest()
{
- const char* test_name = "RecoverReadTest_Stack";
+ //Stack
+ char* test_name = "RecoverReadTest_Stack";
try
{
{
@@ -296,11 +281,8 @@
ss << e;
CPPUNIT_FAIL(ss.str());
}
- }
-
- void RecoverReadTest_Heap()
- {
- const char* test_name = "RecoverReadTest_Heap";
+ // Heap
+ test_name = "RecoverReadTest_Heap";
rhm::journal::jcntl* jcp = NULL;
try
{
@@ -333,9 +315,10 @@
}
}
- void RecoveredReadTest_Stack()
+ void RecoveredReadTest()
{
- const char* test_name = "RecoveredReadTest_Stack";
+ //Stack
+ char* test_name = "RecoveredReadTest_Stack";
try
{
{
@@ -362,11 +345,8 @@
ss << e;
CPPUNIT_FAIL(ss.str());
}
- }
-
- void RecoveredReadTest_Heap()
- {
- const char* test_name = "RecoveredReadTest_Heap";
+ // Heap
+ test_name = "RecoveredReadTest_Heap";
rhm::journal::jcntl* jcp = NULL;
try
{
@@ -403,9 +383,10 @@
}
}
- void RecoveredDequeueTest_Stack()
+ void RecoveredDequeueTest()
{
- const char* test_name = "RecoveredDequeueTest_Stack";
+ //Stack
+ char* test_name = "RecoveredDequeueTest_Stack";
try
{
{
@@ -434,11 +415,8 @@
ss << e;
CPPUNIT_FAIL(ss.str());
}
- }
-
- void RecoveredDequeueTest_Heap()
- {
- const char* test_name = "RecoveredDequeueTest_Heap";
+ // Heap
+ test_name = "RecoveredDequeueTest_Heap";
rhm::journal::jcntl* jcp = NULL;
try
{
@@ -477,9 +455,10 @@
}
}
- void ComplexRecoveryTest1_Stack()
+ void ComplexRecoveryTest1()
{
- const char* test_name = "ComplexRecoveryTest1_Stack";
+ //Stack
+ char* test_name = "ComplexRecoveryTest1_Stack";
try
{
{
@@ -526,11 +505,8 @@
ss << e;
CPPUNIT_FAIL(ss.str());
}
- }
-
- void ComplexRecoveryTest1_Heap()
- {
- const char* test_name = "RecoveredDequeueTest_Heap";
+ // Heap
+ test_name = "RecoveredDequeueTest_Heap";
rhm::journal::jcntl* jcp = NULL;
try
{
16 years, 9 months
rhmessaging commits: r943 - in store/trunk/cpp: lib/jrnl and 1 other directories.
by rhmessaging-commits@lists.jboss.org
Author: kpvdr
Date: 2007-09-24 16:16:06 -0400 (Mon, 24 Sep 2007)
New Revision: 943
Modified:
store/trunk/cpp/lib/BdbMessageStore.cpp
store/trunk/cpp/lib/jrnl/jcntl.cpp
store/trunk/cpp/lib/jrnl/jcntl.hpp
store/trunk/cpp/lib/jrnl/pmgr.hpp
store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp
store/trunk/cpp/tests/jrnl/Makefile.rtest
store/trunk/cpp/tests/jrnl/msg_consumer.cpp
store/trunk/cpp/tests/jrnl/msg_producer.cpp
Log:
Added new jcntl provisional DTX interface
Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp 2007-09-24 18:04:55 UTC (rev 942)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp 2007-09-24 20:16:06 UTC (rev 943)
@@ -164,7 +164,7 @@
if (eqs)
{
journal::jcntl* jQueue = static_cast<journal::jcntl*>(eqs);
- jQueue->delete_jrnl();
+ jQueue->delete_jrnl_files();
queue.setExternalQueueStore(NULL); // will delete the journal if exists
}
@@ -314,7 +314,7 @@
{
jQueue->recover(); // start recovery
recoverMessages(txn, registry, queue, prepared, messages);
- jQueue->recovered(); // start journal.
+ jQueue->recover_complete(); // start journal.
} catch (journal::jexception& e) {
std::string s;
THROW_STORE_EXCEPTION(e.to_string(s) + queueName);
@@ -415,7 +415,7 @@
std:: cout << "loop -- uses fixed size -> FIX <-" << std::endl;
// const iores read_data(void** dbuf, size_t& dbsize, data_tok* const dtok)
- rhm::journal::iores res = jc->read_data(&buff, buffSize, &dtokp);
+ rhm::journal::iores res = jc->read_next_data_record(&buff, buffSize, &dtokp);
readSize = dtokp.dsize();
assert(readSize < buffSize); /// fail safe for hack...
@@ -798,7 +798,7 @@
while (!written)
{
journal::jcntl* jc = static_cast<journal::jcntl*>(queue->getExternalQueueStore());
- rhm::journal::iores eres = jc->enqueue_data(buff, size, dtokp);
+ rhm::journal::iores eres = jc->enqueue_data_record(buff, size, dtokp);
switch (eres)
{
case rhm::journal::RHM_IORES_SUCCESS:
@@ -904,7 +904,7 @@
{
rhm::journal::iores dres;
try {
- dres = jc->dequeue_data(ddtokp);
+ dres = jc->dequeue_data_record(ddtokp);
} catch (rhm::journal::jexception& e) {
std::string str;
delete ddtokp;
Modified: store/trunk/cpp/lib/jrnl/jcntl.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.cpp 2007-09-24 18:04:55 UTC (rev 942)
+++ store/trunk/cpp/lib/jrnl/jcntl.cpp 2007-09-24 20:16:06 UTC (rev 943)
@@ -173,7 +173,7 @@
}
void
-jcntl::recovered() throw (jexception)
+jcntl::recover_complete() throw (jexception)
{
if (!_readonly_flag)
throw jexception(jerrno::JERR_JCNTL_NOTRECOVERED, "jcntl", "recovered");
@@ -185,7 +185,7 @@
}
void
-jcntl::delete_jrnl() throw (jexception)
+jcntl::delete_jrnl_files() throw (jexception)
{
stop(true); // wait for AIO to complete
_jdir.delete_dir();
@@ -193,7 +193,7 @@
const iores
-jcntl::enqueue_data(const void* const dbuf, const size_t dlen, data_tok* const dtok)
+jcntl::enqueue_data_record(const void* const dbuf, const size_t dlen, data_tok* const dtok)
throw (jexception)
{
check_wstatus("enqueue_data");
@@ -201,14 +201,35 @@
}
const iores
-jcntl::read_data(void* const dbuf, const size_t dbsize, data_tok* const dtok) throw (jexception)
+jcntl::enqueue_data_record(const void* const /*data_buff*/, const size_t /*tot_data_len*/,
+ const size_t /*this_data_len*/, const void* const /*xid_buff*/, const size_t /*xid_len*/,
+ data_tok* /*dtok*/) throw (jexception)
{
+ return RHM_IORES_NOTIMPL;
+}
+
+const iores
+get_next_data_record(data_tok* /*dtok*/) throw (jexception)
+{
+ return RHM_IORES_NOTIMPL;
+}
+
+const iores
+discard_next_data_record(data_tok* /*dtok*/) throw (jexception)
+{
+ return RHM_IORES_NOTIMPL;
+}
+
+const iores
+jcntl::read_next_data_record(void* const dbuf, const size_t dbsize, data_tok* const dtok)
+ throw (jexception)
+{
check_rstatus("read_data");
return _rmgr.read(dbuf, dbsize, dtok);
}
const iores
-jcntl::dequeue_data(data_tok* const dtok) throw (jexception)
+jcntl::dequeue_data_record(data_tok* const dtok) throw (jexception)
{
check_wstatus("dequeue_data");
return _wmgr.dequeue(dtok);
Modified: store/trunk/cpp/lib/jrnl/jcntl.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.hpp 2007-09-24 18:04:55 UTC (rev 942)
+++ store/trunk/cpp/lib/jrnl/jcntl.hpp 2007-09-24 20:16:06 UTC (rev 943)
@@ -56,7 +56,6 @@
{
/**
- * \class jcntl
* \brief Access and control interface for the journal. This is the top-level class for the
* journal.
*
@@ -70,7 +69,6 @@
{
private:
/**
- * \var _jid
* \brief Journal ID
*
* This string uniquly identifies this journal instance. It will most likely be associated
@@ -80,7 +78,6 @@
std::string _jid;
/**
- * \var _jdir
* \brief Journal directory
*
* This string stores the path to the journal directory. It may be absolute or relative, and
@@ -90,7 +87,6 @@
jdir _jdir;
/**
- * \var _base_filename
* \brief Base filename
*
* This string contains the base filename used for the journal files. The filenames will
@@ -101,7 +97,6 @@
std::string _base_filename;
/**
- * \var _init_flag
* \brief Initialized flag
*
* This flag starts out set to false, is set to true once this object has been initilaized,
@@ -110,7 +105,6 @@
bool _init_flag;
/**
- * \var _stop_flag
* \brief Stopped flag
*
* This flag starts out false, and is set to true when stop() is called. At this point, the
@@ -119,8 +113,20 @@
*/
bool _stop_flag;
+ /**
+ * \brief Read-only state flag used during recover.
+ *
+ * When true, this flag prevents journal write operations (enqueue and dequeue), but
+ * allows read to occur. It is used durning recovery, and is reset when recovered() is
+ * called.
+ */
bool _readonly_flag;
+ /**
+ * \brief If set, calls stop() if the jouranl write pointer overruns dequeue low water
+ * marker. If not set, then attempts to write will throw exceptions until the journal
+ * file low water marker moves to the next journal file.
+ */
bool _autostop; ///< Autostop flag - stops journal when overrun occurs
// Journal control structures
@@ -161,23 +167,29 @@
* <b>NOTE: Any existing journal will be ignored by this operation.</b> To use recover
* the data from an existing journal, use recover().
*
- * <b>NOTE: if NULL is passed to the deque's they are created internally and deleted
- * intenally </b>
+ * <b>NOTE: If <i>NULL</i> is passed to the deque pointers, they will be internally created
+ * and deleted.</b>
*
- * <b>NOTE: if NULL is passed to the callbacks internal ones will be used. </b>
+ * <b>NOTE: If <i>NULL</i> is passed to the callbacks, internal default callbacks will be
+ * used.</b>
*
* \param rd_dtokl deque for storing data tokens retruning from read AIO operations.
* \param rd_cb Function pointer to callback function for read operations. May be NULL.
* \param wr_dtokl deque for storing data tokens retruning from enqueue and dequeue (write)
* AIO operations.
* \param wr_cb Function pointer to callback function for write operations. May be NULL.
+ *
+ * \exception ??
*/
void initialize(std::deque<data_tok*>* rd_dtokl, const aio_cb rd_cb,
std::deque<data_tok*>* wr_dtokl, const aio_cb wr_cb) throw (jexception);
/**
- * Initialize using internal default callbacks and data_tok lists.
+ * \brief Initialize using internal default callbacks and data_tok lists.
+ *
* TODO: Move to JournalImpl later
+ *
+ * \exception ??
*/
void initialize() throw (jexception)
{
@@ -185,29 +197,67 @@
&aio_wr_callback );
}
+ /**
+ * /brief Initialize journal by recovering state from previously written journal.
+ *
+ * Initialize journal by recovering state from previously written journal. The journal files
+ * are analyzed, and all records that have not been dequeued and that remain in the jouranl
+ * will be available for reading. The journal is placed in a read-only state until
+ * recovered() is called; any calls to enqueue or dequeue will fail with an exception
+ * in this state.
+ *
+ * <b>NOTE: If <i>NULL</i> is passed to the deque pointers, they will be internally created
+ * and deleted.</b>
+ *
+ * <b>NOTE: If <i>NULL</i> is passed to the callbacks, internal default callbacks will be
+ * used.</b>
+ *
+ * \param rd_dtokl deque for storing data tokens retruning from read AIO operations.
+ * \param rd_cb Function pointer to callback function for read operations. May be NULL.
+ * \param wr_dtokl deque for storing data tokens retruning from enqueue and dequeue (write)
+ * AIO operations.
+ * \param wr_cb Function pointer to callback function for write operations. May be NULL.
+ *
+ * \exception ??
+ */
void recover(std::deque<data_tok*>* rd_dtokl, const aio_cb rd_cb,
std::deque<data_tok*>* wr_dtokl, const aio_cb wr_cb) throw (jexception);
+ /**
+ * \brief Recover using internal default callbacks and data_tok lists.
+ *
+ * TODO: Move to JournalImpl later
+ *
+ * \exception ??
+ */
void recover() throw (jexception)
{
recover(&_aio_rd_cmpl_dtok_list, &aio_rd_callback, &_aio_wr_cmpl_dtok_list,
&aio_wr_callback );
}
-
- void recovered() throw (jexception);
+ /**
+ * \brief Notification to the journal that recovery is complete and that normal operation
+ * may resume.
+ *
+ * This call notifies the journal that recovery is complete and that normal operation
+ * may resume. The read pointers are reset so that all records read as a part of recover
+ * may be re-read during normal operation. The read-only flag is then reset, allowing
+ * enqueue and dequeue operations to resume.
+ *
+ * \exception ??
+ */
+ void recover_complete() throw (jexception);
/**
- * \brief Delete the journal directory of files matching the base filename
- * by moving them into a subdirectory
- *
- * Stops AIO, stop journal, deletes files.....
+ * \brief Stops journal and deletes all journal files.
*
- * \exception The directory handle could not be closed.
+ * Clear the journal directory of all journal files matching the base filename.
+ *
+ * \exception ??
*/
- void delete_jrnl() throw (jexception);
+ void delete_jrnl_files() throw (jexception);
-
/**
* \brief Enqueues (writes) data to the journal.
*
@@ -215,11 +265,55 @@
* \param dlen Size of data (in bytes) in dbuf to be written.
* \param dtok Pointer to data_tok instance for this data, used to track state of data
* through journal.
+ *
+ * \exception ??
*/
- const iores enqueue_data(const void* const dbuf, const size_t dlen, data_tok* const dtok)
+ const iores enqueue_data_record(const void* const dbuf, const size_t dlen,
+ data_tok* const dtok) throw (jexception);
+
+ /**
+ * \brief Enqueue data or part of data
+ *
+ * \param data_buff
+ * \param tot_data_len
+ * \param this_data_len
+ * \param xid_buff
+ * \param xid_len
+ * \param dtok
+ *
+ * \exception ??
+ */
+ const iores enqueue_data_record(
+ const void* const data_buff, ///< pointer to data to be enqueued in this enq op
+ const size_t tot_data_len, ///< total data length
+ const size_t this_data_len, ///< amount to be written in this enq op
+ const void* const xid_buff, ///< pointer to xid
+ const size_t xid_len, ///< xid length
+ data_tok* dtok) ///< pointer to data token instance
throw (jexception);
/**
+ * \brief Retrieve details of next record to be read without consuming the record.
+ *
+ * Retrieve details of next record to be read without consuming the record. A pointer to
+ * the data is also returned.
+ *
+ * \param dtok
+ *
+ * \exception ??
+ */
+ const iores get_next_data_record(data_tok* dtok) throw (jexception);
+
+ /**
+ * \brief Discard (skip) next record to be read without reading or retrieving it.
+ *
+ * \param dtok
+ *
+ * \exception ??
+ */
+ const iores discard_next_data_record(data_tok* dtok) throw (jexception);
+
+ /**
* \brief Reads data from the journal.
*
* \param dbuf Pointer to buffer into which data is to be read.
@@ -227,26 +321,67 @@
* dbuf.
* \param dtok Pointer to data_tok instance for this data, used to track state of data
* through journal.
+ *
+ * \exception ??
*/
- const iores read_data(void* const dbuf, const size_t dbsize, data_tok* const dtok)
- throw (jexception);
-
-// const iores read_data(void** dbuf, size_t& dbsize, data_tok* const dtok)
-// throw (jexception);
+ const iores read_next_data_record(void* const dbuf, const size_t dbsize,
+ data_tok* const dtok) throw (jexception);
/**
- * \brief Dequeues (marks as no longer needed) data from journal.
+ * \brief Dequeues (marks as no longer needed) data record in journal.
*
- * Dequeues (marks as no longer needed) data from journal. Note that it is imperative to use
- * the same data token instance used to enqueue this data; it contains the record ID needed
- * to correctly mark this data as dequeued in the journal.
+ * Dequeues (marks as no longer needed) data record in journal. Note that it is possible
+ * to use the same data token instance used to enqueue this data; it contains the record ID
+ * needed to correctly mark this data as dequeued in the journal. Otherwise the RID of the
+ * record to be dequeued and the write state of ENQ must be manually set in a new or reset
+ * instance of data_tok.
*
- * \param dtok Pointer to data_tok instance for this data, used to track state of data
+ * \param dtok Pointer to data_tok instance for this data, used to track state of data
* through journal.
+ *
+ * \exception ??
*/
- const iores dequeue_data(data_tok* const dtok) throw (jexception);
+ const iores dequeue_data_record(data_tok* const dtok) throw (jexception);
/**
+ * \brief Abort the transaction for all records enqueued or dequeued with the matching xid.
+ *
+ * Abort the transaction for all records enqueued with the matching xid. All enqueued records
+ * are effectively deleted from the journal, and can not be read. All dequeued records remain
+ * as though they had never been dequeued.
+ *
+ * \param xid_buff
+ * \param xid_len
+ *
+ * \exception ??
+ */
+ const iores abort_xid(const void* const xid_buff, const size_t xid_len) throw (jexception);
+
+ /**
+ * \brief Commit the transaction for all records enqueued or dequeued with the matching xid.
+ *
+ * Commit the transaction for all records enqueued with the matching xid. All enqueued
+ * records are effectively released for reading and dequeueing. All dequeued records are
+ * removed and can no longer be accessed.
+ *
+ * \param xid_buff
+ * \param xid_len
+ *
+ * \exception ??
+ */
+ const iores commit_xid(const void* const xid_buff, const size_t xid_len) throw (jexception);
+
+ /**
+ * \brief Check whether all the enqueue records for the given xid have reached disk.
+ *
+ * \param xid_buff
+ * \param xid_len
+ *
+ * \exception ??
+ */
+ const bool is_synced(const void* const xid_buff, const size_t xid_len) throw (jexception);
+
+ /**
* \brief Forces a check for returned AIO write events.
*
* Forces a check for returned AIO write events. This is normally performed by enqueue() and
Modified: store/trunk/cpp/lib/jrnl/pmgr.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/pmgr.hpp 2007-09-24 18:04:55 UTC (rev 942)
+++ store/trunk/cpp/lib/jrnl/pmgr.hpp 2007-09-24 20:16:06 UTC (rev 943)
@@ -64,7 +64,9 @@
RHM_IORES_AIO_WAIT, ///< IO operation suspended as all pages in cache are waiting for AIO.
RHM_IORES_EMPTY, ///< During read operations, nothing further is available to read.
RHM_IORES_FULL, ///< During write operations, the journal files are full.
- RHM_IORES_BUSY ///< Another blocking operation is in progress.
+ RHM_IORES_BUSY, ///< Another blocking operation is in progress.
+ RHM_IORES_TXPENDING, ///< Operation blocked by pending transaction.
+ RHM_IORES_NOTIMPL ///< Not yet implemented.
};
typedef _iores iores;
Modified: store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp 2007-09-24 18:04:55 UTC (rev 942)
+++ store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp 2007-09-24 20:16:06 UTC (rev 943)
@@ -1,5 +1,5 @@
/**
-* \file JournalTest.cpp
+* \file JournalSystemTests.cpp
*
* Red Hat Messaging - Message Journal
*
@@ -181,7 +181,7 @@
{
rhm::journal::jcntl jc(test_name, "jdata", test_name);
jc.recover();
- jc.recovered();
+ jc.recover_complete();
}
}
catch (rhm::journal::jexception& e)
@@ -216,7 +216,7 @@
rhm::journal::jcntl* jcp = new rhm::journal::jcntl(test_name, "jdata", test_name);
CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp != NULL);
jcp->recover();
- jcp->recovered();
+ jcp->recover_complete();
delete jcp;
}
}
@@ -350,7 +350,7 @@
for (int m=0; m<NUM_MSGS; m++)
CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
create_msg(msg, m).compare(read_msg(&jc)) == 0);
- jc.recovered();
+ jc.recover_complete();
for (int m=0; m<NUM_MSGS; m++)
CPPUNIT_ASSERT_MESSAGE("Message corrupt after recovery.",
create_msg(msg, m).compare(read_msg(&jc)) == 0);
@@ -386,7 +386,7 @@
for (int m=0; m<NUM_MSGS; m++)
CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
create_msg(msg, m).compare(read_msg(jcp)) == 0);
- jcp->recovered();
+ jcp->recover_complete();
for (int m=0; m<NUM_MSGS; m++)
CPPUNIT_ASSERT_MESSAGE("Message corrupt after recovery.",
create_msg(msg, m).compare(read_msg(jcp)) == 0);
@@ -420,7 +420,7 @@
for (int m=0; m<NUM_MSGS; m++)
CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
create_msg(msg, m).compare(read_msg(&jc)) == 0);
- jc.recovered();
+ jc.recover_complete();
for (int m=0; m<NUM_MSGS; m++)
CPPUNIT_ASSERT_MESSAGE("Message corrupt after recovery.",
create_msg(msg, m).compare(read_msg(&jc)) == 0);
@@ -458,7 +458,7 @@
for (int m=0; m<NUM_MSGS; m++)
CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
create_msg(msg, m).compare(read_msg(jcp)) == 0);
- jcp->recovered();
+ jcp->recover_complete();
for (int m=0; m<NUM_MSGS; m++)
CPPUNIT_ASSERT_MESSAGE("Message corrupt after recovery.",
create_msg(msg, m).compare(read_msg(jcp)) == 0);
@@ -502,7 +502,7 @@
for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
create_msg(msg, m).compare(read_msg(&jc)) == 0);
- jc.recovered();
+ jc.recover_complete();
// rids: NUM_MSGS*3 to NUM_MSGS*4 - 1
for (int m=NUM_MSGS*3; m<NUM_MSGS*4; m++)
enq_msg(&jc, create_msg(msg, m));
@@ -558,7 +558,7 @@
for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
create_msg(msg, m).compare(read_msg(jcp)) == 0);
- jcp->recovered();
+ jcp->recover_complete();
// rids: NUM_MSGS*3 to NUM_MSGS*4 - 1
for (int m=NUM_MSGS*3; m<NUM_MSGS*4; m++)
enq_msg(jcp, create_msg(msg, m));
@@ -745,7 +745,7 @@
CPPUNIT_ASSERT_MESSAGE("Data Token heap intantiation failed.", dtp != NULL);
unsigned aio_sleep_cnt = 0;
- while (handle_jcntl_response(jc->enqueue_data(msg.c_str(), msg.size(), dtp), jc,
+ while (handle_jcntl_response(jc->enqueue_data_record(msg.c_str(), msg.size(), dtp), jc,
aio_sleep_cnt, dtp));
}
@@ -757,7 +757,7 @@
dtp->set_rid(rid);
unsigned aio_sleep_cnt = 0;
- while (handle_jcntl_response(jc->dequeue_data(dtp), jc, aio_sleep_cnt, dtp));
+ while (handle_jcntl_response(jc->dequeue_data_record(dtp), jc, aio_sleep_cnt, dtp));
}
char* read_msg(rhm::journal::jcntl* jc)
@@ -770,8 +770,8 @@
dtp->set_wstate(rhm::journal::data_tok::ENQ);
unsigned aio_sleep_cnt = 0;
- while (handle_jcntl_response(jc->read_data(buff, MAX_MSG_SIZE, dtp), jc, aio_sleep_cnt,
- dtp));
+ while (handle_jcntl_response(jc->read_next_data_record(buff, MAX_MSG_SIZE, dtp), jc,
+ aio_sleep_cnt, dtp));
buff[dtp->dsize()] = '\0';
return buff;
}
Modified: store/trunk/cpp/tests/jrnl/Makefile.rtest
===================================================================
--- store/trunk/cpp/tests/jrnl/Makefile.rtest 2007-09-24 18:04:55 UTC (rev 942)
+++ store/trunk/cpp/tests/jrnl/Makefile.rtest 2007-09-24 20:16:06 UTC (rev 943)
@@ -40,6 +40,7 @@
SHELL = /bin/bash
+QPID_HOME_DIR = $(HOME)/qpid
RHM_JRNL_SRC_DIR = ../../lib/jrnl
RHM_JRNL_DOC_DIR = ../../docs
@@ -73,7 +74,7 @@
CXX = g++
CXXINCLUDES = -I. -I../../lib -I../../../../qpid/cpp/src -I../../../../qpid/cpp/src/gen
CXXFLAGS = $(RHM_DEFINES) -Wall -Wextra -Werror -Wno-shadow -Wpointer-arith -Wcast-qual -Wcast-align -Wno-long-long -Wvolatile-register-var -Winvalid-pch -Wno-system-headers -pedantic -ggdb -O0 -pthread $(CXXINCLUDES)
-LDFLAGS = -lpthread -laio -lrt -lqpidcommon -L/home/kpvdr/redhat/qpid/cpp/src/.libs
+LDFLAGS = -lpthread -laio -lrt -lqpidcommon -L$(QPID_HOME_DIR)/cpp/src/.libs
.SUFFIXES:
.SUFFIXES: .cpp .o
Modified: store/trunk/cpp/tests/jrnl/msg_consumer.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/msg_consumer.cpp 2007-09-24 18:04:55 UTC (rev 942)
+++ store/trunk/cpp/tests/jrnl/msg_consumer.cpp 2007-09-24 20:16:06 UTC (rev 943)
@@ -123,7 +123,8 @@
{
if (dtokp->wstate() >= rhm::journal::data_tok::ENQ)
{
- rhm::journal::iores res = _jcntl.read_data((void* const)_msg_buff, buffSize, dtokp);
+ rhm::journal::iores res = _jcntl.read_next_data_record((void* const)_msg_buff,
+ buffSize, dtokp);
rhm::journal::data_tok::read_state rs = dtokp->rstate();
rhm::journal::data_tok::write_state ws = dtokp->wstate();
switch (res)
Modified: store/trunk/cpp/tests/jrnl/msg_producer.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/msg_producer.cpp 2007-09-24 18:04:55 UTC (rev 942)
+++ store/trunk/cpp/tests/jrnl/msg_producer.cpp 2007-09-24 20:16:06 UTC (rev 943)
@@ -142,7 +142,7 @@
bool written = false;
while (!written)
{
- rhm::journal::iores eres = jc.enqueue_data(msg, size, dtokp);
+ rhm::journal::iores eres = jc.enqueue_data_record(msg, size, dtokp);
rhm::journal::data_tok::write_state ws = dtokp->wstate();
const char* wsstr = dtokp->wstate_str();
switch (eres)
@@ -293,7 +293,7 @@
bool written = false;
while (!written)
{
- rhm::journal::iores dres = _jcptr->dequeue_data(ddtokp);
+ rhm::journal::iores dres = _jcptr->dequeue_data_record(ddtokp);
const char* wsstr = ddtokp->wstate_str();
switch (dres)
{
16 years, 9 months
rhmessaging commits: r942 - in store/trunk/cpp: tests/jrnl and 1 other directory.
by rhmessaging-commits@lists.jboss.org
Author: kpvdr
Date: 2007-09-24 14:04:55 -0400 (Mon, 24 Sep 2007)
New Revision: 942
Added:
store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp
store/trunk/cpp/tests/jrnl/JournalUnitTests.cpp
Removed:
store/trunk/cpp/tests/jrnl/JournalTest.cpp
Modified:
store/trunk/cpp/lib/jrnl/wrfc.hpp
store/trunk/cpp/tests/jrnl/Makefile.am
store/trunk/cpp/tests/jrnl/Makefile.rtest
store/trunk/cpp/tests/jrnl/run-journal-tests
Log:
Created framework for journal unit tests, added (trivial) test for jexception class.
Modified: store/trunk/cpp/lib/jrnl/wrfc.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wrfc.hpp 2007-09-23 17:37:01 UTC (rev 941)
+++ store/trunk/cpp/lib/jrnl/wrfc.hpp 2007-09-24 18:04:55 UTC (rev 942)
@@ -67,15 +67,9 @@
* \param nfiles Number of files in the rotating file group.
* \param fh_arr Pointer to an array of file handles (nlogging_fh or subclasses),
* each of which correspond to one of the physical files.
- * \param fh_index Initial index of journal file. Default = 0.
+ * \param rdp Struct carrying restore information. Optional for non-restore use, defaults to
+ * NULL.
*/
-// void initialize(u_int32_t nfiles, nlfh** fh_arr, u_int32_t fh_index = 0,
-// #ifdef DRHM_TESTVALS
-// u_int64_t rid = u_int64_t(0xffeeddcc) << 32
-// #else
-// u_int64_t rid = 0
-// #endif
-// ) throw (jexception);
void initialize(u_int32_t nfiles, nlfh** fh_arr, rcvdat* rdp = NULL) throw (jexception);
/**
Copied: store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp (from rev 941, store/trunk/cpp/tests/jrnl/JournalTest.cpp)
===================================================================
--- store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp (rev 0)
+++ store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp 2007-09-24 18:04:55 UTC (rev 942)
@@ -0,0 +1,852 @@
+/**
+* \file JournalTest.cpp
+*
+* Red Hat Messaging - Message Journal
+*
+* This file contains the unit tests for the journal.
+*
+* \author Kim van der Riet
+*
+* Copyright 2007 Red Hat, Inc.
+*
+* This file is part of Red Hat Messaging.
+*
+* Red Hat Messaging is free software; you can redistribute it and/or
+* modify it under the terms of the GNU Lesser General Public
+* License as published by the Free Software Foundation; either
+* version 2.1 of the License, or (at your option) any later version.
+*
+* This library is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this library; if not, write to the Free Software
+* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
+* USA
+*
+* The GNU Lesser General Public License is available in the file COPYING.
+*/
+
+#include "../test_plugin.h"
+#include "msg_producer.hpp"
+#include "msg_consumer.hpp"
+#include "jtest.hpp"
+
+#define MAX_MSG_SIZE 127
+#define NUM_MSGS 5
+#define MAX_AIO_SLEEPS 500
+#define AIO_SLEEP_TIME 1000
+
+class JournalSystemTests : public CppUnit::TestCase
+{
+ CPPUNIT_TEST_SUITE(JournalSystemTests);
+ CPPUNIT_TEST(InstantiationTest_Stack);
+ CPPUNIT_TEST(InstantiationTest_Heap);
+ CPPUNIT_TEST(InitializationTest_Stack);
+ CPPUNIT_TEST(InitializationTest_Heap);
+ CPPUNIT_TEST(EmptyRecoverTest_Stack);
+ CPPUNIT_TEST(EmptyRecoverTest_Heap);
+ CPPUNIT_TEST(EnqueueTest_Stack);
+ CPPUNIT_TEST(EnqueueTest_Heap);
+ CPPUNIT_TEST(RecoverReadTest_Stack);
+ CPPUNIT_TEST(RecoverReadTest_Heap);
+ CPPUNIT_TEST(RecoveredReadTest_Stack);
+ CPPUNIT_TEST(RecoveredReadTest_Heap);
+ CPPUNIT_TEST(RecoveredDequeueTest_Stack);
+ CPPUNIT_TEST(RecoveredDequeueTest_Heap);
+ CPPUNIT_TEST(ComplexRecoveryTest1_Stack);
+ CPPUNIT_TEST(ComplexRecoveryTest1_Heap);
+ CPPUNIT_TEST(EncodeTest_000);
+ CPPUNIT_TEST(EncodeTest_001);
+ CPPUNIT_TEST(EncodeTest_002);
+ CPPUNIT_TEST(EncodeTest_003);
+ CPPUNIT_TEST(EncodeTest_004);
+ CPPUNIT_TEST(EncodeTest_005);
+ CPPUNIT_TEST(EncodeTest_006);
+ CPPUNIT_TEST(EncodeTest_007);
+ CPPUNIT_TEST(EncodeTest_008);
+ CPPUNIT_TEST(EncodeTest_009);
+ CPPUNIT_TEST(EncodeTest_010);
+ CPPUNIT_TEST(EncodeTest_011);
+ CPPUNIT_TEST(EncodeTest_012);
+ CPPUNIT_TEST(EncodeTest_013);
+ CPPUNIT_TEST(EncodeTest_014);
+ CPPUNIT_TEST(EncodeTest_015);
+ CPPUNIT_TEST(EncodeTest_016);
+ CPPUNIT_TEST(EncodeTest_017);
+ CPPUNIT_TEST(EncodeTest_018);
+ CPPUNIT_TEST(EncodeTest_019);
+ CPPUNIT_TEST(EncodeTest_020);
+ CPPUNIT_TEST(EncodeTest_021);
+ CPPUNIT_TEST(EncodeTest_022);
+ CPPUNIT_TEST(EncodeTest_023);
+ CPPUNIT_TEST(EncodeTest_024);
+ CPPUNIT_TEST(EncodeTest_025);
+ CPPUNIT_TEST(EncodeTest_026);
+ CPPUNIT_TEST(EncodeTest_027);
+// CPPUNIT_TEST(EncodeTest_028); // Until race condition fixed
+ CPPUNIT_TEST_SUITE_END();
+
+ jtest t;
+ std::string msg;
+ char buff[MAX_MSG_SIZE + 1];
+
+public:
+
+ void InstantiationTest_Stack()
+ {
+ const char* test_name = "InstantiationTest_Stack";
+ try
+ {
+ rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ }
+ catch (rhm::journal::jexception& e)
+ {
+ std::stringstream ss;
+ ss << e;
+ CPPUNIT_FAIL(ss.str());
+ }
+ }
+
+ void InstantiationTest_Heap()
+ {
+ const char* test_name = "InstantiationTest_Heap";
+ rhm::journal::jcntl* jcp = NULL;
+ try
+ {
+ jcp = new rhm::journal::jcntl(test_name, "jdata", test_name);
+ CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp != NULL);
+ delete jcp;
+ }
+ catch (rhm::journal::jexception& e)
+ {
+ std::stringstream ss;
+ ss << e;
+ CPPUNIT_FAIL(ss.str());
+ }
+ }
+
+ void InitializationTest_Stack()
+ {
+ const char* test_name = "InitializationTest_Stack";
+ try
+ {
+ rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ jc.initialize();
+ }
+ catch (rhm::journal::jexception& e)
+ {
+ std::stringstream ss;
+ ss << e;
+ CPPUNIT_FAIL(ss.str());
+ }
+ }
+
+ void InitializationTest_Heap()
+ {
+ const char* test_name = "InitializationTest_Heap";
+ rhm::journal::jcntl* jcp = NULL;
+ try
+ {
+ jcp = new rhm::journal::jcntl(test_name, "jdata", test_name);
+ CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp != NULL);
+ jcp->initialize();
+ delete jcp;
+ }
+ catch (rhm::journal::jexception& e)
+ {
+ if (jcp)
+ delete jcp;
+ std::stringstream ss;
+ ss << e;
+ CPPUNIT_FAIL(ss.str());
+ }
+ }
+
+ void EmptyRecoverTest_Stack()
+ {
+ const char* test_name = "EmptyRecoverTest_Stack";
+ try
+ {
+ {
+ rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ jc.initialize();
+ }
+ {
+ rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ jc.recover();
+ }
+ {
+ rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ jc.recover();
+ jc.recovered();
+ }
+ }
+ catch (rhm::journal::jexception& e)
+ {
+ std::stringstream ss;
+ ss << e;
+ CPPUNIT_FAIL(ss.str());
+ }
+ }
+
+ void EmptyRecoverTest_Heap()
+ {
+ const char* test_name = "EmptyRecoverTest_Heap";
+ rhm::journal::jcntl* jcp = NULL;
+ try
+ {
+ {
+ rhm::journal::jcntl* jcp = new rhm::journal::jcntl(test_name, "jdata", test_name);
+ CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp != NULL);
+ jcp->initialize();
+ delete jcp;
+ jcp = NULL;
+ }
+ {
+ rhm::journal::jcntl* jcp = new rhm::journal::jcntl(test_name, "jdata", test_name);
+ CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp != NULL);
+ jcp->recover();
+ delete jcp;
+ jcp = NULL;
+ }
+ {
+ rhm::journal::jcntl* jcp = new rhm::journal::jcntl(test_name, "jdata", test_name);
+ CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp != NULL);
+ jcp->recover();
+ jcp->recovered();
+ delete jcp;
+ }
+ }
+ catch (rhm::journal::jexception& e)
+ {
+ if (jcp)
+ delete jcp;
+ std::stringstream ss;
+ ss << e;
+ CPPUNIT_FAIL(ss.str());
+ }
+ }
+
+ void EnqueueTest_Stack()
+ {
+ const char* test_name = "EnqueueTest_Stack";
+ try
+ {
+ rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ jc.initialize();
+ for (int m=0; m<NUM_MSGS; m++)
+ enq_msg(&jc, create_msg(msg, m));
+ }
+ catch (rhm::journal::jexception& e)
+ {
+ std::stringstream ss;
+ ss << e;
+ CPPUNIT_FAIL(ss.str());
+ }
+ }
+
+ void EnqueueTest_Heap()
+ {
+ const char* test_name = "EnqueueTest_Heap";
+ rhm::journal::jcntl* jcp = NULL;
+ try
+ {
+ jcp = new rhm::journal::jcntl(test_name, "jdata", test_name);
+ CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp != NULL);
+ jcp->initialize();
+ for (int m=0; m<NUM_MSGS; m++)
+ enq_msg(jcp, create_msg(msg, m));
+ delete jcp;
+ }
+ catch (rhm::journal::jexception& e)
+ {
+ if (jcp)
+ delete jcp;
+ std::stringstream ss;
+ ss << e;
+ CPPUNIT_FAIL(ss.str());
+ }
+ }
+
+ void RecoverReadTest_Stack()
+ {
+ const char* test_name = "RecoverReadTest_Stack";
+ try
+ {
+ {
+ rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ jc.initialize();
+ for (int m=0; m<NUM_MSGS; m++)
+ enq_msg(&jc, create_msg(msg, m));
+ }
+ {
+ rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ jc.recover();
+ for (int m=0; m<NUM_MSGS; m++)
+ CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
+ create_msg(msg, m).compare(read_msg(&jc)) == 0);
+ }
+ }
+ catch (rhm::journal::jexception& e)
+ {
+ std::stringstream ss;
+ ss << e;
+ CPPUNIT_FAIL(ss.str());
+ }
+ }
+
+ void RecoverReadTest_Heap()
+ {
+ const char* test_name = "RecoverReadTest_Heap";
+ rhm::journal::jcntl* jcp = NULL;
+ try
+ {
+ {
+ jcp = new rhm::journal::jcntl(test_name, "jdata", test_name);
+ CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp != NULL);
+ jcp->initialize();
+ for (int m=0; m<NUM_MSGS; m++)
+ enq_msg(jcp, create_msg(msg, m));
+ delete jcp;
+ jcp = NULL;
+ }
+ {
+ rhm::journal::jcntl* jcp = new rhm::journal::jcntl(test_name, "jdata", test_name);
+ CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp != NULL);
+ jcp->recover();
+ for (int m=0; m<NUM_MSGS; m++)
+ CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
+ create_msg(msg, m).compare(read_msg(jcp)) == 0);
+ delete jcp;
+ }
+ }
+ catch (rhm::journal::jexception& e)
+ {
+ if (jcp)
+ delete jcp;
+ std::stringstream ss;
+ ss << e;
+ CPPUNIT_FAIL(ss.str());
+ }
+ }
+
+ void RecoveredReadTest_Stack()
+ {
+ const char* test_name = "RecoveredReadTest_Stack";
+ try
+ {
+ {
+ rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ jc.initialize();
+ for (int m=0; m<NUM_MSGS; m++)
+ enq_msg(&jc, create_msg(msg, m));
+ }
+ {
+ rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ jc.recover();
+ for (int m=0; m<NUM_MSGS; m++)
+ CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
+ create_msg(msg, m).compare(read_msg(&jc)) == 0);
+ jc.recovered();
+ for (int m=0; m<NUM_MSGS; m++)
+ CPPUNIT_ASSERT_MESSAGE("Message corrupt after recovery.",
+ create_msg(msg, m).compare(read_msg(&jc)) == 0);
+ }
+ }
+ catch (rhm::journal::jexception& e)
+ {
+ std::stringstream ss;
+ ss << e;
+ CPPUNIT_FAIL(ss.str());
+ }
+ }
+
+ void RecoveredReadTest_Heap()
+ {
+ const char* test_name = "RecoveredReadTest_Heap";
+ rhm::journal::jcntl* jcp = NULL;
+ try
+ {
+ {
+ jcp = new rhm::journal::jcntl(test_name, "jdata", test_name);
+ CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp != NULL);
+ jcp->initialize();
+ for (int m=0; m<NUM_MSGS; m++)
+ enq_msg(jcp, create_msg(msg, m));
+ delete jcp;
+ jcp = NULL;
+ }
+ {
+ rhm::journal::jcntl* jcp = new rhm::journal::jcntl(test_name, "jdata", test_name);
+ CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp != NULL);
+ jcp->recover();
+ for (int m=0; m<NUM_MSGS; m++)
+ CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
+ create_msg(msg, m).compare(read_msg(jcp)) == 0);
+ jcp->recovered();
+ for (int m=0; m<NUM_MSGS; m++)
+ CPPUNIT_ASSERT_MESSAGE("Message corrupt after recovery.",
+ create_msg(msg, m).compare(read_msg(jcp)) == 0);
+ delete jcp;
+ }
+ }
+ catch (rhm::journal::jexception& e)
+ {
+ if (jcp)
+ delete jcp;
+ std::stringstream ss;
+ ss << e;
+ CPPUNIT_FAIL(ss.str());
+ }
+ }
+
+ void RecoveredDequeueTest_Stack()
+ {
+ const char* test_name = "RecoveredDequeueTest_Stack";
+ try
+ {
+ {
+ rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ jc.initialize();
+ for (int m=0; m<NUM_MSGS; m++)
+ enq_msg(&jc, create_msg(msg, m));
+ }
+ {
+ rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ jc.recover();
+ for (int m=0; m<NUM_MSGS; m++)
+ CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
+ create_msg(msg, m).compare(read_msg(&jc)) == 0);
+ jc.recovered();
+ for (int m=0; m<NUM_MSGS; m++)
+ CPPUNIT_ASSERT_MESSAGE("Message corrupt after recovery.",
+ create_msg(msg, m).compare(read_msg(&jc)) == 0);
+ for (int m=0; m<NUM_MSGS; m++)
+ deq_msg(&jc, m);
+ }
+ }
+ catch (rhm::journal::jexception& e)
+ {
+ std::stringstream ss;
+ ss << e;
+ CPPUNIT_FAIL(ss.str());
+ }
+ }
+
+ void RecoveredDequeueTest_Heap()
+ {
+ const char* test_name = "RecoveredDequeueTest_Heap";
+ rhm::journal::jcntl* jcp = NULL;
+ try
+ {
+ {
+ jcp = new rhm::journal::jcntl(test_name, "jdata", test_name);
+ CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp != NULL);
+ jcp->initialize();
+ for (int m=0; m<NUM_MSGS; m++)
+ enq_msg(jcp, create_msg(msg, m));
+ delete jcp;
+ jcp = NULL;
+ }
+ {
+ rhm::journal::jcntl* jcp = new rhm::journal::jcntl(test_name, "jdata", test_name);
+ CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp != NULL);
+ jcp->recover();
+ for (int m=0; m<NUM_MSGS; m++)
+ CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
+ create_msg(msg, m).compare(read_msg(jcp)) == 0);
+ jcp->recovered();
+ for (int m=0; m<NUM_MSGS; m++)
+ CPPUNIT_ASSERT_MESSAGE("Message corrupt after recovery.",
+ create_msg(msg, m).compare(read_msg(jcp)) == 0);
+ for (int m=0; m<NUM_MSGS; m++)
+ deq_msg(jcp, m);
+ delete jcp;
+ }
+ }
+ catch (rhm::journal::jexception& e)
+ {
+ if (jcp)
+ delete jcp;
+ std::stringstream ss;
+ ss << e;
+ CPPUNIT_FAIL(ss.str());
+ }
+ }
+
+ void ComplexRecoveryTest1_Stack()
+ {
+ const char* test_name = "ComplexRecoveryTest1_Stack";
+ try
+ {
+ {
+ rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ jc.initialize();
+ // rids: 0 to NUM_MSGS*2 - 1
+ for (int m=0; m<NUM_MSGS*2; m++)
+ enq_msg(&jc, create_msg(msg, m));
+ // rids: NUM_MSGS*2 to NUM_MSGS*3 - 1
+ for (int m=0; m<NUM_MSGS; m++)
+ deq_msg(&jc, m);
+ jc.flush();
+ for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
+ CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
+ create_msg(msg, m).compare(read_msg(&jc)) == 0);
+ }
+ {
+ rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ jc.recover();
+ for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
+ CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
+ create_msg(msg, m).compare(read_msg(&jc)) == 0);
+ jc.recovered();
+ // rids: NUM_MSGS*3 to NUM_MSGS*4 - 1
+ for (int m=NUM_MSGS*3; m<NUM_MSGS*4; m++)
+ enq_msg(&jc, create_msg(msg, m));
+ jc.flush();
+ for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
+ CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
+ create_msg(msg, m).compare(read_msg(&jc)) == 0);
+ for (int m=NUM_MSGS*3; m<NUM_MSGS*4; m++)
+ CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
+ create_msg(msg, m).compare(read_msg(&jc)) == 0);
+ // rids: NUM_MSGS*4 to NUM_MSGS*6 - 1
+ for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
+ deq_msg(&jc, m);
+ for (int m=NUM_MSGS*3; m<NUM_MSGS*4; m++)
+ deq_msg(&jc, m);
+ }
+ }
+ catch (rhm::journal::jexception& e)
+ {
+ std::stringstream ss;
+ ss << e;
+ CPPUNIT_FAIL(ss.str());
+ }
+ }
+
+ void ComplexRecoveryTest1_Heap()
+ {
+ const char* test_name = "RecoveredDequeueTest_Heap";
+ rhm::journal::jcntl* jcp = NULL;
+ try
+ {
+ {
+ jcp = new rhm::journal::jcntl(test_name, "jdata", test_name);
+ CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp != NULL);
+ jcp->initialize();
+ // rids: 0 to NUM_MSGS*2 - 1
+ for (int m=0; m<NUM_MSGS*2; m++)
+ enq_msg(jcp, create_msg(msg, m));
+ // rids: NUM_MSGS*2 to NUM_MSGS*3 - 1
+ for (int m=0; m<NUM_MSGS; m++)
+ deq_msg(jcp, m);
+ jcp->flush();
+ for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
+ CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
+ create_msg(msg, m).compare(read_msg(jcp)) == 0);
+ delete jcp;
+ jcp = NULL;
+ }
+ {
+ rhm::journal::jcntl* jcp = new rhm::journal::jcntl(test_name, "jdata", test_name);
+ CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp != NULL);
+ jcp->recover();
+ for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
+ CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
+ create_msg(msg, m).compare(read_msg(jcp)) == 0);
+ jcp->recovered();
+ // rids: NUM_MSGS*3 to NUM_MSGS*4 - 1
+ for (int m=NUM_MSGS*3; m<NUM_MSGS*4; m++)
+ enq_msg(jcp, create_msg(msg, m));
+ jcp->flush();
+ for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
+ CPPUNIT_ASSERT_MESSAGE("Message corrupt after recovery.",
+ create_msg(msg, m).compare(read_msg(jcp)) == 0);
+ for (int m=NUM_MSGS*3; m<NUM_MSGS*4; m++)
+ CPPUNIT_ASSERT_MESSAGE("Message corrupt after recovery.",
+ create_msg(msg, m).compare(read_msg(jcp)) == 0);
+ // rids: NUM_MSGS*4 to NUM_MSGS*6 - 1
+ for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
+ deq_msg(jcp, m);
+ for (int m=NUM_MSGS*3; m<NUM_MSGS*4; m++)
+ deq_msg(jcp, m);
+ delete jcp;
+ }
+ }
+ catch (rhm::journal::jexception& e)
+ {
+ if (jcp)
+ delete jcp;
+ std::stringstream ss;
+ ss << e;
+ CPPUNIT_FAIL(ss.str());
+ }
+ }
+
+ void EncodeTest_000()
+ {
+ runEncodeTest(0, 0, 0, false, 2, "Empty journal");
+ }
+
+ void EncodeTest_001()
+ {
+ runEncodeTest(1, 10, 10, false, 2, "1*(10 bytes)");
+ }
+
+ void EncodeTest_002()
+ {
+ runEncodeTest(1, 10, 10, true, 2, "1*(10 bytes), auto-deq");
+ }
+
+ void EncodeTest_003()
+ {
+ runEncodeTest(10, 10, 10, false, 2, "10*(10 bytes)");
+ }
+
+ void EncodeTest_004()
+ {
+ runEncodeTest(10, 10, 10, true, 2, "10*(10 bytes), auto-deq");
+ }
+
+ void EncodeTest_005()
+ {
+ runEncodeTest(10, 92, 92, false, 2, "10*(1 dblk exact fit)");
+ }
+
+ void EncodeTest_006()
+ {
+ runEncodeTest(10, 92, 92, true, 2, "10*(1 dblk exact fit), auto-deq");
+ }
+
+ void EncodeTest_007()
+ {
+ runEncodeTest(10, 93, 93, false, 2, "10*(1 dblk + 1 byte)");
+ }
+
+ void EncodeTest_008()
+ {
+ runEncodeTest(10, 93, 93, true, 2, "10*(1 dblk + 1 byte), auto-deq");
+ }
+
+ void EncodeTest_009()
+ {
+ runEncodeTest(10, 476, 476, false, 2, "10*(1 sblk exact fit)");
+ }
+
+ void EncodeTest_010()
+ {
+ runEncodeTest(10, 476, 476, true, 2, "10*(1 sblk exact fit), auto-deq");
+ }
+
+ void EncodeTest_011()
+ {
+ runEncodeTest(10, 477, 477, false, 2, "10*(1 sblk + 1 byte)");
+ }
+
+ void EncodeTest_012()
+ {
+ runEncodeTest(10, 477, 477, true, 2, "10*(1 sblk + 1 byte), auto-deq");
+ }
+
+ void EncodeTest_013()
+ {
+ runEncodeTest(8, 4060, 4060, false, 2, "8*(1/8 page)");
+ }
+
+ void EncodeTest_014()
+ {
+ runEncodeTest(9, 4060, 4060, false, 2, "9*(1/8 page)");
+ }
+
+ void EncodeTest_015()
+ {
+ runEncodeTest(8, 4061, 4061, false, 2, "8*(1/8 page + 1 byte)");
+ }
+
+ void EncodeTest_016()
+ {
+ runEncodeTest(8, 3932, 3932, true, 2, "8*(1/8 page - 1 dblk for deq record), auto-deq");
+ }
+
+ void EncodeTest_017()
+ {
+ runEncodeTest(9, 3932, 3932, true, 2, "9*(1/8 page - 1 dblk for deq record), auto-deq");
+ }
+
+ void EncodeTest_018()
+ {
+ runEncodeTest(8, 3933, 3933, true, 2,
+ "8*(1/8 page - 1 dblk for deq record + 1 byte), auto-deq");
+ }
+
+ void EncodeTest_019()
+ {
+ runEncodeTest(32, 32732, 32732, false, 2, "32*(1 page exact fit)");
+ }
+
+ void EncodeTest_020()
+ {
+ runEncodeTest(33, 32732, 32732, false, 2, "33*(1 page exact fit)");
+ }
+
+ void EncodeTest_021()
+ {
+ runEncodeTest(22, 49116, 49116, false, 2, "22*(1.5 pages)");
+ }
+
+ void EncodeTest_022()
+ {
+ runEncodeTest(22, 48988, 48988, true, 2,
+ "22*(1.5 pages - 1 dblk for deq record), auto-deq");
+ }
+
+ void EncodeTest_023()
+ {
+ runEncodeTest(48, 32732, 32732, false, 2, "48*(1 page exact fit)");
+ }
+
+ void EncodeTest_024()
+ {
+ runEncodeTest(49, 32732, 32732, false, 2, "49*(1 page exact fit)");
+ }
+
+ void EncodeTest_025()
+ {
+ runEncodeTest(20, 81884, 81884, false, 2, "20*(2.5 pages)");
+ }
+
+ void EncodeTest_026()
+ {
+ runEncodeTest(20, 81756, 81756, true, 2,
+ "20*(2.5 pages - 1 dblk for deq record), auto-deq");
+ }
+
+ void EncodeTest_027()
+ {
+ runEncodeTest(16, 786268, 786268, true, 2,
+ "16*(24 pages = 1/2 file); Total = 8 files exactly (full journal filespace)");
+ }
+
+ void EncodeTest_028()
+ {
+ runEncodeTest(17, 786268, 786268, true, 2,
+ "17*(24 pages = 1/2 file); Total = 8 files + file 0 overwritten by 1/2 file");
+ }
+
+private:
+
+ void enq_msg(rhm::journal::jcntl* jc, const std::string msg)
+ {
+ rhm::journal::data_tok* dtp = new rhm::journal::data_tok;
+ CPPUNIT_ASSERT_MESSAGE("Data Token heap intantiation failed.", dtp != NULL);
+
+ unsigned aio_sleep_cnt = 0;
+ while (handle_jcntl_response(jc->enqueue_data(msg.c_str(), msg.size(), dtp), jc,
+ aio_sleep_cnt, dtp));
+ }
+
+ void deq_msg(rhm::journal::jcntl* jc, u_int64_t rid)
+ {
+ rhm::journal::data_tok* dtp = new rhm::journal::data_tok;
+ CPPUNIT_ASSERT_MESSAGE("Data Token heap intantiation failed.", dtp != NULL);
+ dtp->set_wstate(rhm::journal::data_tok::ENQ);
+ dtp->set_rid(rid);
+
+ unsigned aio_sleep_cnt = 0;
+ while (handle_jcntl_response(jc->dequeue_data(dtp), jc, aio_sleep_cnt, dtp));
+ }
+
+ char* read_msg(rhm::journal::jcntl* jc)
+ {
+ memset(buff, '*', MAX_MSG_SIZE);
+ buff[MAX_MSG_SIZE] = '\0';
+
+ rhm::journal::data_tok* dtp = new rhm::journal::data_tok;
+ CPPUNIT_ASSERT_MESSAGE("Data Token heap intantiation failed.", dtp != NULL);
+ dtp->set_wstate(rhm::journal::data_tok::ENQ);
+
+ unsigned aio_sleep_cnt = 0;
+ while (handle_jcntl_response(jc->read_data(buff, MAX_MSG_SIZE, dtp), jc, aio_sleep_cnt,
+ dtp));
+ buff[dtp->dsize()] = '\0';
+ return buff;
+ }
+
+ bool handle_jcntl_response(rhm::journal::iores res, rhm::journal::jcntl* jc,
+ unsigned& aio_sleep_cnt, rhm::journal::data_tok* dtp)
+ {
+ switch (res)
+ {
+ case rhm::journal::RHM_IORES_SUCCESS:
+ buff[dtp->dsize()] = '\0';
+ return false;
+ case rhm::journal::RHM_IORES_AIO_WAIT:
+ if (++aio_sleep_cnt <= MAX_AIO_SLEEPS)
+ {
+ jc->get_wr_events();
+ usleep(AIO_SLEEP_TIME);
+ }
+ else
+ {
+ delete dtp;
+ CPPUNIT_FAIL("dequeue_data(): timeout on RHM_IORES_AIO_WAIT.");
+ }
+ break;
+ case rhm::journal::RHM_IORES_EMPTY:
+ delete dtp;
+ CPPUNIT_FAIL("dequeue_data(): RHM_IORES_EMPTY.");
+ case rhm::journal::RHM_IORES_FULL:
+ delete dtp;
+ CPPUNIT_FAIL("dequeue_data(): RHM_IORES_FULL.");
+ case rhm::journal::RHM_IORES_BUSY:
+ delete dtp;
+ CPPUNIT_FAIL("dequeue_data(): RHM_IORES_BUSY.");
+ default:
+ delete dtp;
+ CPPUNIT_FAIL("dequeue_data(): unknown return value.");
+ }
+ return true;
+ }
+
+ static std::string& create_msg(std::string& s, int msg_num)
+ {
+ std::stringstream ss;
+ ss << "Message_" << std::setfill('0') << std::setw(4) << msg_num;
+ ss << "_4567890123456789012345678901234567890";
+ ss << "12345678901234567890123456789012345678901234567890"; // 100 chars long (2 dblks)
+ s.assign(ss.str());
+ return s;
+ }
+
+ void runEncodeTest(const unsigned num_msgs, const unsigned min_msg_size,
+ const unsigned max_msg_szie, const bool auto_deq, const unsigned iterations,
+ const char* test_descr)
+ {
+ std::cout << " [" << test_descr << "] " << std::flush;
+ jtest::targs ta(num_msgs, min_msg_size, max_msg_szie, auto_deq, test_descr);
+ for (unsigned i=0; i<iterations; i++)
+ {
+ std::cout << "." << std::flush;
+ try
+ {
+ t.initialize(ta);
+ t.run();
+ }
+ catch (rhm::journal::jexception e)
+ {
+ t.finalize();
+ std::string s;
+ CPPUNIT_FAIL(e.to_string(s));
+ }
+ t.finalize();
+ }
+ }
+};
+
+// Make this test suite a plugin.
+CPPUNIT_PLUGIN_IMPLEMENT();
+CPPUNIT_TEST_SUITE_REGISTRATION(JournalSystemTests);
Deleted: store/trunk/cpp/tests/jrnl/JournalTest.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/JournalTest.cpp 2007-09-23 17:37:01 UTC (rev 941)
+++ store/trunk/cpp/tests/jrnl/JournalTest.cpp 2007-09-24 18:04:55 UTC (rev 942)
@@ -1,851 +0,0 @@
-/**
-* \file JournalTest.cpp
-*
-* Red Hat Messaging - Message Journal
-*
-* This file contains the unit tests for the journal.
-*
-* \author Kim van der Riet
-*
-* Copyright 2007 Red Hat, Inc.
-*
-* This file is part of Red Hat Messaging.
-*
-* Red Hat Messaging is free software; you can redistribute it and/or
-* modify it under the terms of the GNU Lesser General Public
-* License as published by the Free Software Foundation; either
-* version 2.1 of the License, or (at your option) any later version.
-*
-* This library is distributed in the hope that it will be useful,
-* but WITHOUT ANY WARRANTY; without even the implied warranty of
-* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-* Lesser General Public License for more details.
-*
-* You should have received a copy of the GNU Lesser General Public
-* License along with this library; if not, write to the Free Software
-* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
-* USA
-*
-* The GNU Lesser General Public License is available in the file COPYING.
-*/
-
-#include "../test_plugin.h"
-#include "msg_producer.hpp"
-#include "msg_consumer.hpp"
-#include "jtest.hpp"
-
-#define MAX_MSG_SIZE 127
-#define NUM_MSGS 5
-#define MAX_AIO_SLEEPS 500
-#define AIO_SLEEP_TIME 1000
-
-class JournalTest : public CppUnit::TestCase
-{
- CPPUNIT_TEST_SUITE(JournalTest);
- CPPUNIT_TEST(InstantiationTest_Stack);
- CPPUNIT_TEST(InstantiationTest_Heap);
- CPPUNIT_TEST(InitializationTest_Stack);
- CPPUNIT_TEST(InitializationTest_Heap);
- CPPUNIT_TEST(EmptyRecoverTest_Stack);
- CPPUNIT_TEST(EmptyRecoverTest_Heap);
- CPPUNIT_TEST(EnqueueTest_Stack);
- CPPUNIT_TEST(EnqueueTest_Heap);
- CPPUNIT_TEST(RecoverReadTest_Stack);
- CPPUNIT_TEST(RecoverReadTest_Heap);
- CPPUNIT_TEST(RecoveredReadTest_Stack);
- CPPUNIT_TEST(RecoveredReadTest_Heap);
- CPPUNIT_TEST(RecoveredDequeueTest_Stack);
- CPPUNIT_TEST(RecoveredDequeueTest_Heap);
- CPPUNIT_TEST(ComplexRecoveryTest1_Stack);
- CPPUNIT_TEST(ComplexRecoveryTest1_Heap);
- CPPUNIT_TEST(EncodeTest_000);
- CPPUNIT_TEST(EncodeTest_001);
- CPPUNIT_TEST(EncodeTest_002);
- CPPUNIT_TEST(EncodeTest_003);
- CPPUNIT_TEST(EncodeTest_004);
- CPPUNIT_TEST(EncodeTest_005);
- CPPUNIT_TEST(EncodeTest_006);
- CPPUNIT_TEST(EncodeTest_007);
- CPPUNIT_TEST(EncodeTest_008);
- CPPUNIT_TEST(EncodeTest_009);
- CPPUNIT_TEST(EncodeTest_010);
- CPPUNIT_TEST(EncodeTest_011);
- CPPUNIT_TEST(EncodeTest_012);
- CPPUNIT_TEST(EncodeTest_013);
- CPPUNIT_TEST(EncodeTest_014);
- CPPUNIT_TEST(EncodeTest_015);
- CPPUNIT_TEST(EncodeTest_016);
- CPPUNIT_TEST(EncodeTest_017);
- CPPUNIT_TEST(EncodeTest_018);
- CPPUNIT_TEST(EncodeTest_019);
- CPPUNIT_TEST(EncodeTest_020);
- CPPUNIT_TEST(EncodeTest_021);
- CPPUNIT_TEST(EncodeTest_022);
- CPPUNIT_TEST(EncodeTest_023);
- CPPUNIT_TEST(EncodeTest_024);
- CPPUNIT_TEST(EncodeTest_025);
- CPPUNIT_TEST(EncodeTest_026);
- CPPUNIT_TEST(EncodeTest_027);
-// CPPUNIT_TEST(EncodeTest_028); // Until race condition fixed
- CPPUNIT_TEST_SUITE_END();
-
- jtest t;
- std::string msg;
- char buff[MAX_MSG_SIZE + 1];
-
-public:
-
- void InstantiationTest_Stack()
- {
- const char* test_name = "InstantiationTest_Stack";
- try
- {
- rhm::journal::jcntl jc(test_name, "jdata", test_name);
- }
- catch (rhm::journal::jexception& e)
- {
- std::stringstream ss;
- ss << e;
- CPPUNIT_FAIL(ss.str());
- }
- }
-
- void InstantiationTest_Heap()
- {
- const char* test_name = "InstantiationTest_Heap";
- rhm::journal::jcntl* jcp = NULL;
- try
- {
- jcp = new rhm::journal::jcntl(test_name, "jdata", test_name);
- CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp != NULL);
- delete jcp;
- }
- catch (rhm::journal::jexception& e)
- {
- std::stringstream ss;
- ss << e;
- CPPUNIT_FAIL(ss.str());
- }
- }
-
- void InitializationTest_Stack()
- {
- const char* test_name = "InitializationTest_Stack";
- try
- {
- rhm::journal::jcntl jc(test_name, "jdata", test_name);
- jc.initialize();
- }
- catch (rhm::journal::jexception& e)
- {
- std::stringstream ss;
- ss << e;
- CPPUNIT_FAIL(ss.str());
- }
- }
-
- void InitializationTest_Heap()
- {
- const char* test_name = "InitializationTest_Heap";
- rhm::journal::jcntl* jcp = NULL;
- try
- {
- jcp = new rhm::journal::jcntl(test_name, "jdata", test_name);
- CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp != NULL);
- jcp->initialize();
- delete jcp;
- }
- catch (rhm::journal::jexception& e)
- {
- if (jcp)
- delete jcp;
- std::stringstream ss;
- ss << e;
- CPPUNIT_FAIL(ss.str());
- }
- }
-
- void EmptyRecoverTest_Stack()
- {
- const char* test_name = "EmptyRecoverTest_Stack";
- try
- {
- {
- rhm::journal::jcntl jc(test_name, "jdata", test_name);
- jc.initialize();
- }
- {
- rhm::journal::jcntl jc(test_name, "jdata", test_name);
- jc.recover();
- }
- {
- rhm::journal::jcntl jc(test_name, "jdata", test_name);
- jc.recover();
- jc.recovered();
- }
- }
- catch (rhm::journal::jexception& e)
- {
- std::stringstream ss;
- ss << e;
- CPPUNIT_FAIL(ss.str());
- }
- }
-
- void EmptyRecoverTest_Heap()
- {
- const char* test_name = "EmptyRecoverTest_Heap";
- rhm::journal::jcntl* jcp = NULL;
- try
- {
- {
- rhm::journal::jcntl* jcp = new rhm::journal::jcntl(test_name, "jdata", test_name);
- CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp != NULL);
- jcp->initialize();
- delete jcp;
- jcp = NULL;
- }
- {
- rhm::journal::jcntl* jcp = new rhm::journal::jcntl(test_name, "jdata", test_name);
- CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp != NULL);
- jcp->recover();
- delete jcp;
- jcp = NULL;
- }
- {
- rhm::journal::jcntl* jcp = new rhm::journal::jcntl(test_name, "jdata", test_name);
- CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp != NULL);
- jcp->recover();
- jcp->recovered();
- delete jcp;
- }
- }
- catch (rhm::journal::jexception& e)
- {
- if (jcp)
- delete jcp;
- std::stringstream ss;
- ss << e;
- CPPUNIT_FAIL(ss.str());
- }
- }
-
- void EnqueueTest_Stack()
- {
- const char* test_name = "EnqueueTest_Stack";
- try
- {
- rhm::journal::jcntl jc(test_name, "jdata", test_name);
- jc.initialize();
- for (int m=0; m<NUM_MSGS; m++)
- enq_msg(&jc, create_msg(msg, m));
- }
- catch (rhm::journal::jexception& e)
- {
- std::stringstream ss;
- ss << e;
- CPPUNIT_FAIL(ss.str());
- }
- }
-
- void EnqueueTest_Heap()
- {
- const char* test_name = "EnqueueTest_Heap";
- rhm::journal::jcntl* jcp = NULL;
- try
- {
- jcp = new rhm::journal::jcntl(test_name, "jdata", test_name);
- CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp != NULL);
- jcp->initialize();
- for (int m=0; m<NUM_MSGS; m++)
- enq_msg(jcp, create_msg(msg, m));
- delete jcp;
- }
- catch (rhm::journal::jexception& e)
- {
- if (jcp)
- delete jcp;
- std::stringstream ss;
- ss << e;
- CPPUNIT_FAIL(ss.str());
- }
- }
-
- void RecoverReadTest_Stack()
- {
- const char* test_name = "RecoverReadTest_Stack";
- try
- {
- {
- rhm::journal::jcntl jc(test_name, "jdata", test_name);
- jc.initialize();
- for (int m=0; m<NUM_MSGS; m++)
- enq_msg(&jc, create_msg(msg, m));
- }
- {
- rhm::journal::jcntl jc(test_name, "jdata", test_name);
- jc.recover();
- for (int m=0; m<NUM_MSGS; m++)
- CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
- create_msg(msg, m).compare(read_msg(&jc)) == 0);
- }
- }
- catch (rhm::journal::jexception& e)
- {
- std::stringstream ss;
- ss << e;
- CPPUNIT_FAIL(ss.str());
- }
- }
-
- void RecoverReadTest_Heap()
- {
- const char* test_name = "RecoverReadTest_Heap";
- rhm::journal::jcntl* jcp = NULL;
- try
- {
- {
- jcp = new rhm::journal::jcntl(test_name, "jdata", test_name);
- CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp != NULL);
- jcp->initialize();
- for (int m=0; m<NUM_MSGS; m++)
- enq_msg(jcp, create_msg(msg, m));
- delete jcp;
- jcp = NULL;
- }
- {
- rhm::journal::jcntl* jcp = new rhm::journal::jcntl(test_name, "jdata", test_name);
- CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp != NULL);
- jcp->recover();
- for (int m=0; m<NUM_MSGS; m++)
- CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
- create_msg(msg, m).compare(read_msg(jcp)) == 0);
- delete jcp;
- }
- }
- catch (rhm::journal::jexception& e)
- {
- if (jcp)
- delete jcp;
- std::stringstream ss;
- ss << e;
- CPPUNIT_FAIL(ss.str());
- }
- }
-
- void RecoveredReadTest_Stack()
- {
- const char* test_name = "RecoveredReadTest_Stack";
- try
- {
- {
- rhm::journal::jcntl jc(test_name, "jdata", test_name);
- jc.initialize();
- for (int m=0; m<NUM_MSGS; m++)
- enq_msg(&jc, create_msg(msg, m));
- }
- {
- rhm::journal::jcntl jc(test_name, "jdata", test_name);
- jc.recover();
- for (int m=0; m<NUM_MSGS; m++)
- CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
- create_msg(msg, m).compare(read_msg(&jc)) == 0);
- jc.recovered();
- for (int m=0; m<NUM_MSGS; m++)
- CPPUNIT_ASSERT_MESSAGE("Message corrupt after recovery.",
- create_msg(msg, m).compare(read_msg(&jc)) == 0);
- }
- }
- catch (rhm::journal::jexception& e)
- {
- std::stringstream ss;
- ss << e;
- CPPUNIT_FAIL(ss.str());
- }
- }
-
- void RecoveredReadTest_Heap()
- {
- const char* test_name = "RecoveredReadTest_Heap";
- rhm::journal::jcntl* jcp = NULL;
- try
- {
- {
- jcp = new rhm::journal::jcntl(test_name, "jdata", test_name);
- CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp != NULL);
- jcp->initialize();
- for (int m=0; m<NUM_MSGS; m++)
- enq_msg(jcp, create_msg(msg, m));
- delete jcp;
- jcp = NULL;
- }
- {
- rhm::journal::jcntl* jcp = new rhm::journal::jcntl(test_name, "jdata", test_name);
- CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp != NULL);
- jcp->recover();
- for (int m=0; m<NUM_MSGS; m++)
- CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
- create_msg(msg, m).compare(read_msg(jcp)) == 0);
- jcp->recovered();
- for (int m=0; m<NUM_MSGS; m++)
- CPPUNIT_ASSERT_MESSAGE("Message corrupt after recovery.",
- create_msg(msg, m).compare(read_msg(jcp)) == 0);
- delete jcp;
- }
- }
- catch (rhm::journal::jexception& e)
- {
- if (jcp)
- delete jcp;
- std::stringstream ss;
- ss << e;
- CPPUNIT_FAIL(ss.str());
- }
- }
-
- void RecoveredDequeueTest_Stack()
- {
- const char* test_name = "RecoveredDequeueTest_Stack";
- try
- {
- {
- rhm::journal::jcntl jc(test_name, "jdata", test_name);
- jc.initialize();
- for (int m=0; m<NUM_MSGS; m++)
- enq_msg(&jc, create_msg(msg, m));
- }
- {
- rhm::journal::jcntl jc(test_name, "jdata", test_name);
- jc.recover();
- for (int m=0; m<NUM_MSGS; m++)
- CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
- create_msg(msg, m).compare(read_msg(&jc)) == 0);
- jc.recovered();
- for (int m=0; m<NUM_MSGS; m++)
- CPPUNIT_ASSERT_MESSAGE("Message corrupt after recovery.",
- create_msg(msg, m).compare(read_msg(&jc)) == 0);
- for (int m=0; m<NUM_MSGS; m++)
- deq_msg(&jc, m);
- }
- }
- catch (rhm::journal::jexception& e)
- {
- std::stringstream ss;
- ss << e;
- CPPUNIT_FAIL(ss.str());
- }
- }
-
- void RecoveredDequeueTest_Heap()
- {
- const char* test_name = "RecoveredDequeueTest_Heap";
- rhm::journal::jcntl* jcp = NULL;
- try
- {
- {
- jcp = new rhm::journal::jcntl(test_name, "jdata", test_name);
- CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp != NULL);
- jcp->initialize();
- for (int m=0; m<NUM_MSGS; m++)
- enq_msg(jcp, create_msg(msg, m));
- delete jcp;
- jcp = NULL;
- }
- {
- rhm::journal::jcntl* jcp = new rhm::journal::jcntl(test_name, "jdata", test_name);
- CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp != NULL);
- jcp->recover();
- for (int m=0; m<NUM_MSGS; m++)
- CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
- create_msg(msg, m).compare(read_msg(jcp)) == 0);
- jcp->recovered();
- for (int m=0; m<NUM_MSGS; m++)
- CPPUNIT_ASSERT_MESSAGE("Message corrupt after recovery.",
- create_msg(msg, m).compare(read_msg(jcp)) == 0);
- for (int m=0; m<NUM_MSGS; m++)
- deq_msg(jcp, m);
- delete jcp;
- }
- }
- catch (rhm::journal::jexception& e)
- {
- if (jcp)
- delete jcp;
- std::stringstream ss;
- ss << e;
- CPPUNIT_FAIL(ss.str());
- }
- }
-
- void ComplexRecoveryTest1_Stack()
- {
- const char* test_name = "ComplexRecoveryTest1_Stack";
- try
- {
- {
- rhm::journal::jcntl jc(test_name, "jdata", test_name);
- jc.initialize();
- // rids: 0 to NUM_MSGS*2 - 1
- for (int m=0; m<NUM_MSGS*2; m++)
- enq_msg(&jc, create_msg(msg, m));
- // rids: NUM_MSGS*2 to NUM_MSGS*3 - 1
- for (int m=0; m<NUM_MSGS; m++)
- deq_msg(&jc, m);
- jc.flush();
- for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
- CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
- create_msg(msg, m).compare(read_msg(&jc)) == 0);
- }
- {
- rhm::journal::jcntl jc(test_name, "jdata", test_name);
- jc.recover();
- for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
- CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
- create_msg(msg, m).compare(read_msg(&jc)) == 0);
- jc.recovered();
- // rids: NUM_MSGS*3 to NUM_MSGS*4 - 1
- for (int m=NUM_MSGS*3; m<NUM_MSGS*4; m++)
- enq_msg(&jc, create_msg(msg, m));
- jc.flush();
- for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
- CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
- create_msg(msg, m).compare(read_msg(&jc)) == 0);
- for (int m=NUM_MSGS*3; m<NUM_MSGS*4; m++)
- CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
- create_msg(msg, m).compare(read_msg(&jc)) == 0);
- // rids: NUM_MSGS*4 to NUM_MSGS*6 - 1
- for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
- deq_msg(&jc, m);
- for (int m=NUM_MSGS*3; m<NUM_MSGS*4; m++)
- deq_msg(&jc, m);
- }
- }
- catch (rhm::journal::jexception& e)
- {
- std::stringstream ss;
- ss << e;
- CPPUNIT_FAIL(ss.str());
- }
- }
-
- void ComplexRecoveryTest1_Heap()
- {
- const char* test_name = "RecoveredDequeueTest_Heap";
- rhm::journal::jcntl* jcp = NULL;
- try
- {
- {
- jcp = new rhm::journal::jcntl(test_name, "jdata", test_name);
- CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp != NULL);
- jcp->initialize();
- // rids: 0 to NUM_MSGS*2 - 1
- for (int m=0; m<NUM_MSGS*2; m++)
- enq_msg(jcp, create_msg(msg, m));
- // rids: NUM_MSGS*2 to NUM_MSGS*3 - 1
- for (int m=0; m<NUM_MSGS; m++)
- deq_msg(jcp, m);
- jcp->flush();
- for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
- CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
- create_msg(msg, m).compare(read_msg(jcp)) == 0);
- delete jcp;
- jcp = NULL;
- }
- {
- rhm::journal::jcntl* jcp = new rhm::journal::jcntl(test_name, "jdata", test_name);
- CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp != NULL);
- jcp->recover();
- for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
- CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
- create_msg(msg, m).compare(read_msg(jcp)) == 0);
- jcp->recovered();
- // rids: NUM_MSGS*3 to NUM_MSGS*4 - 1
- for (int m=NUM_MSGS*3; m<NUM_MSGS*4; m++)
- enq_msg(jcp, create_msg(msg, m));
- jcp->flush();
- for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
- CPPUNIT_ASSERT_MESSAGE("Message corrupt after recovery.",
- create_msg(msg, m).compare(read_msg(jcp)) == 0);
- for (int m=NUM_MSGS*3; m<NUM_MSGS*4; m++)
- CPPUNIT_ASSERT_MESSAGE("Message corrupt after recovery.",
- create_msg(msg, m).compare(read_msg(jcp)) == 0);
- // rids: NUM_MSGS*4 to NUM_MSGS*6 - 1
- for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
- deq_msg(jcp, m);
- for (int m=NUM_MSGS*3; m<NUM_MSGS*4; m++)
- deq_msg(jcp, m);
- delete jcp;
- }
- }
- catch (rhm::journal::jexception& e)
- {
- if (jcp)
- delete jcp;
- std::stringstream ss;
- ss << e;
- CPPUNIT_FAIL(ss.str());
- }
- }
-
- void EncodeTest_000()
- {
- runEncodeTest(0, 0, 0, false, 2, "Empty journal");
- }
-
- void EncodeTest_001()
- {
- runEncodeTest(1, 10, 10, false, 2, "1*(10 bytes)");
- }
-
- void EncodeTest_002()
- {
- runEncodeTest(1, 10, 10, true, 2, "1*(10 bytes), auto-deq");
- }
-
- void EncodeTest_003()
- {
- runEncodeTest(10, 10, 10, false, 2, "10*(10 bytes)");
- }
-
- void EncodeTest_004()
- {
- runEncodeTest(10, 10, 10, true, 2, "10*(10 bytes), auto-deq");
- }
-
- void EncodeTest_005()
- {
- runEncodeTest(10, 92, 92, false, 2, "10*(1 dblk exact fit)");
- }
-
- void EncodeTest_006()
- {
- runEncodeTest(10, 92, 92, true, 2, "10*(1 dblk exact fit), auto-deq");
- }
-
- void EncodeTest_007()
- {
- runEncodeTest(10, 93, 93, false, 2, "10*(1 dblk + 1 byte)");
- }
-
- void EncodeTest_008()
- {
- runEncodeTest(10, 93, 93, true, 2, "10*(1 dblk + 1 byte), auto-deq");
- }
-
- void EncodeTest_009()
- {
- runEncodeTest(10, 476, 476, false, 2, "10*(1 sblk exact fit)");
- }
-
- void EncodeTest_010()
- {
- runEncodeTest(10, 476, 476, true, 2, "10*(1 sblk exact fit), auto-deq");
- }
-
- void EncodeTest_011()
- {
- runEncodeTest(10, 477, 477, false, 2, "10*(1 sblk + 1 byte)");
- }
-
- void EncodeTest_012()
- {
- runEncodeTest(10, 477, 477, true, 2, "10*(1 sblk + 1 byte), auto-deq");
- }
-
- void EncodeTest_013()
- {
- runEncodeTest(8, 4060, 4060, false, 2, "8*(1/8 page)");
- }
-
- void EncodeTest_014()
- {
- runEncodeTest(9, 4060, 4060, false, 2, "9*(1/8 page)");
- }
-
- void EncodeTest_015()
- {
- runEncodeTest(8, 4061, 4061, false, 2, "8*(1/8 page + 1 byte)");
- }
-
- void EncodeTest_016()
- {
- runEncodeTest(8, 3932, 3932, true, 2, "8*(1/8 page - 1 dblk for deq record), auto-deq");
- }
-
- void EncodeTest_017()
- {
- runEncodeTest(9, 3932, 3932, true, 2, "9*(1/8 page - 1 dblk for deq record), auto-deq");
- }
-
- void EncodeTest_018()
- {
- runEncodeTest(8, 3933, 3933, true, 2,
- "8*(1/8 page - 1 dblk for deq record + 1 byte), auto-deq");
- }
-
- void EncodeTest_019()
- {
- runEncodeTest(32, 32732, 32732, false, 2, "32*(1 page exact fit)");
- }
-
- void EncodeTest_020()
- {
- runEncodeTest(33, 32732, 32732, false, 2, "33*(1 page exact fit)");
- }
-
- void EncodeTest_021()
- {
- runEncodeTest(22, 49116, 49116, false, 2, "22*(1.5 pages)");
- }
-
- void EncodeTest_022()
- {
- runEncodeTest(22, 48988, 48988, true, 2,
- "22*(1.5 pages - 1 dblk for deq record), auto-deq");
- }
-
- void EncodeTest_023()
- {
- runEncodeTest(48, 32732, 32732, false, 2, "48*(1 page exact fit)");
- }
-
- void EncodeTest_024()
- {
- runEncodeTest(49, 32732, 32732, false, 2, "49*(1 page exact fit)");
- }
-
- void EncodeTest_025()
- {
- runEncodeTest(20, 81884, 81884, false, 2, "20*(2.5 pages)");
- }
-
- void EncodeTest_026()
- {
- runEncodeTest(20, 81756, 81756, true, 2,
- "20*(2.5 pages - 1 dblk for deq record), auto-deq");
- }
-
- void EncodeTest_027()
- {
- runEncodeTest(16, 786268, 786268, true, 2,
- "16*(24 pages = 1/2 file); Total = 8 files exactly (full journal filespace)");
- }
-
- void EncodeTest_028()
- {
- runEncodeTest(17, 786268, 786268, true, 2,
- "17*(24 pages = 1/2 file); Total = 8 files + file 0 overwritten by 1/2 file");
- }
-
-private:
-
- void enq_msg(rhm::journal::jcntl* jc, const std::string msg)
- {
- rhm::journal::data_tok* dtp = new rhm::journal::data_tok;
- CPPUNIT_ASSERT_MESSAGE("Data Token heap intantiation failed.", dtp != NULL);
-
- unsigned aio_sleep_cnt = 0;
- while (handle_jcntl_response(jc->enqueue_data(msg.c_str(), msg.size(), dtp), jc,
- aio_sleep_cnt, dtp));
- }
-
- void deq_msg(rhm::journal::jcntl* jc, u_int64_t rid)
- {
- rhm::journal::data_tok* dtp = new rhm::journal::data_tok;
- CPPUNIT_ASSERT_MESSAGE("Data Token heap intantiation failed.", dtp != NULL);
- dtp->set_wstate(rhm::journal::data_tok::ENQ);
- dtp->set_rid(rid);
-
- unsigned aio_sleep_cnt = 0;
- while (handle_jcntl_response(jc->dequeue_data(dtp), jc, aio_sleep_cnt, dtp));
- }
-
- char* read_msg(rhm::journal::jcntl* jc)
- {
- memset(buff, '*', MAX_MSG_SIZE);
- buff[MAX_MSG_SIZE] = '\0';
-
- rhm::journal::data_tok* dtp = new rhm::journal::data_tok;
- CPPUNIT_ASSERT_MESSAGE("Data Token heap intantiation failed.", dtp != NULL);
- dtp->set_wstate(rhm::journal::data_tok::ENQ);
-
- unsigned aio_sleep_cnt = 0;
- while (handle_jcntl_response(jc->read_data(buff, MAX_MSG_SIZE, dtp), jc, aio_sleep_cnt, dtp));
- buff[dtp->dsize()] = '\0';
- return buff;
- }
-
- bool handle_jcntl_response(rhm::journal::iores res, rhm::journal::jcntl* jc,
- unsigned& aio_sleep_cnt, rhm::journal::data_tok* dtp)
- {
- switch (res)
- {
- case rhm::journal::RHM_IORES_SUCCESS:
- buff[dtp->dsize()] = '\0';
- return false;
- case rhm::journal::RHM_IORES_AIO_WAIT:
- if (++aio_sleep_cnt <= MAX_AIO_SLEEPS)
- {
- jc->get_wr_events();
- usleep(AIO_SLEEP_TIME);
- }
- else
- {
- delete dtp;
- CPPUNIT_FAIL("dequeue_data(): timeout on RHM_IORES_AIO_WAIT.");
- }
- break;
- case rhm::journal::RHM_IORES_EMPTY:
- delete dtp;
- CPPUNIT_FAIL("dequeue_data(): RHM_IORES_EMPTY.");
- case rhm::journal::RHM_IORES_FULL:
- delete dtp;
- CPPUNIT_FAIL("dequeue_data(): RHM_IORES_FULL.");
- case rhm::journal::RHM_IORES_BUSY:
- delete dtp;
- CPPUNIT_FAIL("dequeue_data(): RHM_IORES_BUSY.");
- default:
- delete dtp;
- CPPUNIT_FAIL("dequeue_data(): unknown return value.");
- }
- return true;
- }
-
- static std::string& create_msg(std::string& s, int msg_num)
- {
- std::stringstream ss;
- ss << "Message_" << std::setfill('0') << std::setw(4) << msg_num;
- ss << "_4567890123456789012345678901234567890";
- ss << "12345678901234567890123456789012345678901234567890"; // 100 chars long (2 dblks)
- s.assign(ss.str());
- return s;
- }
-
- void runEncodeTest(const unsigned num_msgs, const unsigned min_msg_size,
- const unsigned max_msg_szie, const bool auto_deq, const unsigned iterations,
- const char* test_descr)
- {
- std::cout << " [" << test_descr << "] " << std::flush;
- jtest::targs ta(num_msgs, min_msg_size, max_msg_szie, auto_deq, test_descr);
- for (unsigned i=0; i<iterations; i++)
- {
- std::cout << "." << std::flush;
- try
- {
- t.initialize(ta);
- t.run();
- }
- catch (rhm::journal::jexception e)
- {
- t.finalize();
- std::string s;
- CPPUNIT_FAIL(e.to_string(s));
- }
- t.finalize();
- }
- }
-};
-
-// Make this test suite a plugin.
-CPPUNIT_PLUGIN_IMPLEMENT();
-CPPUNIT_TEST_SUITE_REGISTRATION(JournalTest);
Added: store/trunk/cpp/tests/jrnl/JournalUnitTests.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/JournalUnitTests.cpp (rev 0)
+++ store/trunk/cpp/tests/jrnl/JournalUnitTests.cpp 2007-09-24 18:04:55 UTC (rev 942)
@@ -0,0 +1,125 @@
+/**
+* \file JournalUnitTests.cpp
+*
+* Red Hat Messaging - Message Journal
+*
+* This file contains the unit tests for the journal.
+*
+* \author Kim van der Riet
+*
+* Copyright 2007 Red Hat, Inc.
+*
+* This file is part of Red Hat Messaging.
+*
+* Red Hat Messaging is free software; you can redistribute it and/or
+* modify it under the terms of the GNU Lesser General Public
+* License as published by the Free Software Foundation; either
+* version 2.1 of the License, or (at your option) any later version.
+*
+* This library is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this library; if not, write to the Free Software
+* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
+* USA
+*
+* The GNU Lesser General Public License is available in the file COPYING.
+*/
+
+#include "../test_plugin.h"
+
+#include <jrnl/jexception.hpp>
+
+using namespace rhm::journal;
+using namespace std;
+
+class JournalUnitTests : public CppUnit::TestCase
+{
+ CPPUNIT_TEST_SUITE(JournalUnitTests);
+ CPPUNIT_TEST(JournalExceptionTest);
+ CPPUNIT_TEST_SUITE_END();
+
+public:
+
+ void JournalExceptionTest()
+ {
+ jexception e1(1);
+ CPPUNIT_ASSERT_MESSAGE("e1: Error code bad.", e1.err_code() == 1);
+ CPPUNIT_ASSERT_MESSAGE("e1: Additional information bad.", e1.additional_info().empty());
+ CPPUNIT_ASSERT_MESSAGE("e1: Throwing class bad.", e1.throwing_class().empty());
+ CPPUNIT_ASSERT_MESSAGE("e1: Throwing function bad.", e1.throwing_fn().empty());
+
+ jexception e2("exception2");
+ CPPUNIT_ASSERT_MESSAGE("e2: Error code bad.", e2.err_code() == 0);
+ CPPUNIT_ASSERT_MESSAGE("e2: Additional information bad.",
+ e2.additional_info().compare("exception2") == 0);
+ CPPUNIT_ASSERT_MESSAGE("e2: Throwing class bad.", e2.throwing_class().empty());
+ CPPUNIT_ASSERT_MESSAGE("e2: Throwing function bad.", e2.throwing_fn().empty());
+
+ string msg3("exception3");
+ jexception e3(msg3);
+ CPPUNIT_ASSERT_MESSAGE("e3: Error code bad.", e3.err_code() == 0);
+ CPPUNIT_ASSERT_MESSAGE("e3: Additional information bad.",
+ e3.additional_info().compare(msg3) == 0);
+ CPPUNIT_ASSERT_MESSAGE("e3: Throwing class bad.", e3.throwing_class().empty());
+ CPPUNIT_ASSERT_MESSAGE("e3: Throwing function bad.", e3.throwing_fn().empty());
+
+ jexception e4(4, "exception4");
+ CPPUNIT_ASSERT_MESSAGE("e4: Error code bad.", e4.err_code() == 4);
+ CPPUNIT_ASSERT_MESSAGE("e4: Additional information bad.",
+ e4.additional_info().compare("exception4") == 0);
+ CPPUNIT_ASSERT_MESSAGE("e4: Throwing class bad.", e4.throwing_class().empty());
+ CPPUNIT_ASSERT_MESSAGE("e4: Throwing function bad.", e4.throwing_fn().empty());
+
+ string msg5("exception5");
+ jexception e5(5, msg5);
+ CPPUNIT_ASSERT_MESSAGE("e5: Error code bad.", e5.err_code() == 5);
+ CPPUNIT_ASSERT_MESSAGE("e5: Additional information bad.",
+ e5.additional_info().compare(msg5) == 0);
+ CPPUNIT_ASSERT_MESSAGE("e5: Throwing class bad.", e5.throwing_class().empty());
+ CPPUNIT_ASSERT_MESSAGE("e5: Throwing function bad.", e5.throwing_fn().empty());
+
+ jexception e6(6, "class6", "fn6");
+ CPPUNIT_ASSERT_MESSAGE("e6: Error code bad.", e6.err_code() == 6);
+ CPPUNIT_ASSERT_MESSAGE("e6: Additional information bad.", e6.additional_info().empty());
+ CPPUNIT_ASSERT_MESSAGE("e6: Throwing class bad.",
+ e6.throwing_class().compare("class6") == 0);
+ CPPUNIT_ASSERT_MESSAGE("e6: Throwing function bad.", e6.throwing_fn().compare("fn6") == 0);
+
+ string cls7("class7");
+ string fn7("fn7");
+ jexception e7(7, cls7, fn7);
+ CPPUNIT_ASSERT_MESSAGE("e7: Error code bad.", e7.err_code() == 7);
+ CPPUNIT_ASSERT_MESSAGE("e7: Additional information bad.", e7.additional_info().empty());
+ CPPUNIT_ASSERT_MESSAGE("e7: Throwing class bad.", e7.throwing_class().compare(cls7) == 0);
+ CPPUNIT_ASSERT_MESSAGE("e7: Throwing function bad.", e7.throwing_fn().compare(fn7) == 0);
+
+ jexception e8(8, "exception8", "class8", "fn8");
+ CPPUNIT_ASSERT_MESSAGE("e8: Error code bad.", e8.err_code() == 8);
+ CPPUNIT_ASSERT_MESSAGE("e8: Additional information bad.",
+ e8.additional_info().compare("exception8") == 0);
+ CPPUNIT_ASSERT_MESSAGE("e8: Throwing class bad.",
+ e8.throwing_class().compare("class8") == 0);
+ CPPUNIT_ASSERT_MESSAGE("e8: Throwing function bad.", e8.throwing_fn().compare("fn8") == 0);
+
+ string msg9("exception9");
+ string cls9("class9");
+ string fn9("fn9");
+ jexception e9(9, msg9, cls9, fn9);
+ CPPUNIT_ASSERT_MESSAGE("e9: Error code bad.", e9.err_code() == 9);
+ CPPUNIT_ASSERT_MESSAGE("e9: Additional information bad.",
+ e9.additional_info().compare(msg9) == 0);
+ CPPUNIT_ASSERT_MESSAGE("e9: Throwing class bad.", e9.throwing_class().compare(cls9) == 0);
+ CPPUNIT_ASSERT_MESSAGE("e9: Throwing function bad.", e9.throwing_fn().compare(fn9) == 0);
+ }
+
+private:
+
+};
+
+// Make this test suite a plugin.
+CPPUNIT_PLUGIN_IMPLEMENT();
+CPPUNIT_TEST_SUITE_REGISTRATION(JournalUnitTests);
Modified: store/trunk/cpp/tests/jrnl/Makefile.am
===================================================================
--- store/trunk/cpp/tests/jrnl/Makefile.am 2007-09-23 17:37:01 UTC (rev 941)
+++ store/trunk/cpp/tests/jrnl/Makefile.am 2007-09-24 18:04:55 UTC (rev 942)
@@ -22,26 +22,30 @@
abs_builddir=@abs_builddir@
AM_CXXFLAGS = $(WARNING_CFLAGS) $(CPPUNIT_CXXFLAGS) $(QPID_CXXFLAGS) -pthread
-#AM_CXXFLAGS = $(WARNING_CFLAGS) $(CPPUNIT_CXXFLAGS) $(QPID_CXXFLAGS) -pthread -DRHM_CLEAN -DRHM_WRONLY -DRHM_TESTVALS
INCLUDES=-I../../lib
TESTS = run-journal-tests
check_LTLIBRARIES = \
- JournalTest.la \
+ JournalUnitTests.la \
+ JournalSystemTests.la \
libdlclose_noop.la
-JournalTest_la_SOURCES = \
- JournalTest.cpp \
+JournalUnitTests_la_SOURCES = \
+ JournalUnitTests.cpp
+JournalUnitTests_la_LDFLAGS = -module -rpath $(abs_builddir)
+
+JournalSystemTests_la_SOURCES = \
+ JournalSystemTests.cpp \
jtest.cpp \
msg_producer.cpp \
msg_consumer.cpp \
jtest.hpp \
msg_producer.hpp \
msg_consumer.hpp
-JournalTest_la_LDFLAGS = -lpthread -module -rpath $(abs_builddir)
-JournalTest_la_LIBADD = ../../lib/libbdbstore.la $(CPPUNIT_LIBS)
+JournalSystemTests_la_LDFLAGS = -lpthread -module -rpath $(abs_builddir)
+JournalSystemTests_la_LIBADD = ../../lib/libbdbstore.la $(CPPUNIT_LIBS)
libdlclose_noop_la_SOURCES = ../dlclose_noop.c
libdlclose_noop_la_LDFLAGS = -module -rpath $(abs_builddir)
Modified: store/trunk/cpp/tests/jrnl/Makefile.rtest
===================================================================
--- store/trunk/cpp/tests/jrnl/Makefile.rtest 2007-09-23 17:37:01 UTC (rev 941)
+++ store/trunk/cpp/tests/jrnl/Makefile.rtest 2007-09-24 18:04:55 UTC (rev 942)
@@ -122,5 +122,6 @@
clean-data:
@rm -rf jdata
+ @rm -rf rd_test_jrnls
clean-all: clean clean-data
Modified: store/trunk/cpp/tests/jrnl/run-journal-tests
===================================================================
--- store/trunk/cpp/tests/jrnl/run-journal-tests 2007-09-23 17:37:01 UTC (rev 941)
+++ store/trunk/cpp/tests/jrnl/run-journal-tests 2007-09-24 18:04:55 UTC (rev 942)
@@ -4,6 +4,6 @@
fail=0
-LD_PRELOAD=$pwd/.libs/libdlclose_noop.so DllPlugInTester -c -b $pwd/.libs/*Test.so || fail=1
+LD_PRELOAD=$pwd/.libs/libdlclose_noop.so DllPlugInTester -c -b $pwd/.libs/Journal*Tests.so || fail=1
exit $fail
16 years, 9 months
rhmessaging commits: r941 - store/trunk/cpp/tests/jrnl.
by rhmessaging-commits@lists.jboss.org
Author: kpvdr
Date: 2007-09-23 13:37:01 -0400 (Sun, 23 Sep 2007)
New Revision: 941
Removed:
store/trunk/cpp/tests/jrnl/jrnl_scope_test.cpp
Modified:
store/trunk/cpp/tests/jrnl/JournalTest.cpp
store/trunk/cpp/tests/jrnl/Makefile.rtest
store/trunk/cpp/tests/jrnl/rtest
Log:
Changed tests in jrnl_scope_test.cpp into CppUnit format and added them to JournalTest.cpp.
Modified: store/trunk/cpp/tests/jrnl/JournalTest.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/JournalTest.cpp 2007-09-22 15:09:17 UTC (rev 940)
+++ store/trunk/cpp/tests/jrnl/JournalTest.cpp 2007-09-23 17:37:01 UTC (rev 941)
@@ -34,216 +34,816 @@
#include "msg_consumer.hpp"
#include "jtest.hpp"
+#define MAX_MSG_SIZE 127
+#define NUM_MSGS 5
+#define MAX_AIO_SLEEPS 500
+#define AIO_SLEEP_TIME 1000
+
class JournalTest : public CppUnit::TestCase
{
CPPUNIT_TEST_SUITE(JournalTest);
- CPPUNIT_TEST(Test_000);
- CPPUNIT_TEST(Test_001);
- CPPUNIT_TEST(Test_002);
- CPPUNIT_TEST(Test_003);
- CPPUNIT_TEST(Test_004);
- CPPUNIT_TEST(Test_005);
- CPPUNIT_TEST(Test_006);
- CPPUNIT_TEST(Test_007);
- CPPUNIT_TEST(Test_008);
- CPPUNIT_TEST(Test_009);
- CPPUNIT_TEST(Test_010);
- CPPUNIT_TEST(Test_011);
- CPPUNIT_TEST(Test_012);
- CPPUNIT_TEST(Test_013);
- CPPUNIT_TEST(Test_014);
- CPPUNIT_TEST(Test_015);
- CPPUNIT_TEST(Test_016);
- CPPUNIT_TEST(Test_017);
- CPPUNIT_TEST(Test_018);
- CPPUNIT_TEST(Test_019);
- CPPUNIT_TEST(Test_020);
- CPPUNIT_TEST(Test_021);
- CPPUNIT_TEST(Test_022);
- CPPUNIT_TEST(Test_023);
- CPPUNIT_TEST(Test_024);
- CPPUNIT_TEST(Test_025);
- CPPUNIT_TEST(Test_026);
- CPPUNIT_TEST(Test_027);
-// CPPUNIT_TEST(Test_028); // Until race condition fixed
+ CPPUNIT_TEST(InstantiationTest_Stack);
+ CPPUNIT_TEST(InstantiationTest_Heap);
+ CPPUNIT_TEST(InitializationTest_Stack);
+ CPPUNIT_TEST(InitializationTest_Heap);
+ CPPUNIT_TEST(EmptyRecoverTest_Stack);
+ CPPUNIT_TEST(EmptyRecoverTest_Heap);
+ CPPUNIT_TEST(EnqueueTest_Stack);
+ CPPUNIT_TEST(EnqueueTest_Heap);
+ CPPUNIT_TEST(RecoverReadTest_Stack);
+ CPPUNIT_TEST(RecoverReadTest_Heap);
+ CPPUNIT_TEST(RecoveredReadTest_Stack);
+ CPPUNIT_TEST(RecoveredReadTest_Heap);
+ CPPUNIT_TEST(RecoveredDequeueTest_Stack);
+ CPPUNIT_TEST(RecoveredDequeueTest_Heap);
+ CPPUNIT_TEST(ComplexRecoveryTest1_Stack);
+ CPPUNIT_TEST(ComplexRecoveryTest1_Heap);
+ CPPUNIT_TEST(EncodeTest_000);
+ CPPUNIT_TEST(EncodeTest_001);
+ CPPUNIT_TEST(EncodeTest_002);
+ CPPUNIT_TEST(EncodeTest_003);
+ CPPUNIT_TEST(EncodeTest_004);
+ CPPUNIT_TEST(EncodeTest_005);
+ CPPUNIT_TEST(EncodeTest_006);
+ CPPUNIT_TEST(EncodeTest_007);
+ CPPUNIT_TEST(EncodeTest_008);
+ CPPUNIT_TEST(EncodeTest_009);
+ CPPUNIT_TEST(EncodeTest_010);
+ CPPUNIT_TEST(EncodeTest_011);
+ CPPUNIT_TEST(EncodeTest_012);
+ CPPUNIT_TEST(EncodeTest_013);
+ CPPUNIT_TEST(EncodeTest_014);
+ CPPUNIT_TEST(EncodeTest_015);
+ CPPUNIT_TEST(EncodeTest_016);
+ CPPUNIT_TEST(EncodeTest_017);
+ CPPUNIT_TEST(EncodeTest_018);
+ CPPUNIT_TEST(EncodeTest_019);
+ CPPUNIT_TEST(EncodeTest_020);
+ CPPUNIT_TEST(EncodeTest_021);
+ CPPUNIT_TEST(EncodeTest_022);
+ CPPUNIT_TEST(EncodeTest_023);
+ CPPUNIT_TEST(EncodeTest_024);
+ CPPUNIT_TEST(EncodeTest_025);
+ CPPUNIT_TEST(EncodeTest_026);
+ CPPUNIT_TEST(EncodeTest_027);
+// CPPUNIT_TEST(EncodeTest_028); // Until race condition fixed
CPPUNIT_TEST_SUITE_END();
jtest t;
+ std::string msg;
+ char buff[MAX_MSG_SIZE + 1];
- void runJournalTest(const unsigned num_msgs, const unsigned min_msg_size,
- const unsigned max_msg_szie, const bool auto_deq, const unsigned iterations,
- const char* test_descr)
+public:
+
+ void InstantiationTest_Stack()
{
- std::cout << " [" << test_descr << "] " << std::flush;
- jtest::targs ta(num_msgs, min_msg_size, max_msg_szie, auto_deq, test_descr);
- for (unsigned i=0; i<iterations; i++)
+ const char* test_name = "InstantiationTest_Stack";
+ try
{
- std::cout << "." << std::flush;
- try
+ rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ }
+ catch (rhm::journal::jexception& e)
+ {
+ std::stringstream ss;
+ ss << e;
+ CPPUNIT_FAIL(ss.str());
+ }
+ }
+
+ void InstantiationTest_Heap()
+ {
+ const char* test_name = "InstantiationTest_Heap";
+ rhm::journal::jcntl* jcp = NULL;
+ try
+ {
+ jcp = new rhm::journal::jcntl(test_name, "jdata", test_name);
+ CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp != NULL);
+ delete jcp;
+ }
+ catch (rhm::journal::jexception& e)
+ {
+ std::stringstream ss;
+ ss << e;
+ CPPUNIT_FAIL(ss.str());
+ }
+ }
+
+ void InitializationTest_Stack()
+ {
+ const char* test_name = "InitializationTest_Stack";
+ try
+ {
+ rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ jc.initialize();
+ }
+ catch (rhm::journal::jexception& e)
+ {
+ std::stringstream ss;
+ ss << e;
+ CPPUNIT_FAIL(ss.str());
+ }
+ }
+
+ void InitializationTest_Heap()
+ {
+ const char* test_name = "InitializationTest_Heap";
+ rhm::journal::jcntl* jcp = NULL;
+ try
+ {
+ jcp = new rhm::journal::jcntl(test_name, "jdata", test_name);
+ CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp != NULL);
+ jcp->initialize();
+ delete jcp;
+ }
+ catch (rhm::journal::jexception& e)
+ {
+ if (jcp)
+ delete jcp;
+ std::stringstream ss;
+ ss << e;
+ CPPUNIT_FAIL(ss.str());
+ }
+ }
+
+ void EmptyRecoverTest_Stack()
+ {
+ const char* test_name = "EmptyRecoverTest_Stack";
+ try
+ {
{
- t.initialize(ta);
- t.run();
+ rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ jc.initialize();
}
- catch (rhm::journal::jexception e)
{
- t.finalize();
- std::string s;
- CPPUNIT_FAIL(e.to_string(s));
+ rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ jc.recover();
}
- t.finalize();
+ {
+ rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ jc.recover();
+ jc.recovered();
+ }
}
+ catch (rhm::journal::jexception& e)
+ {
+ std::stringstream ss;
+ ss << e;
+ CPPUNIT_FAIL(ss.str());
+ }
}
-public:
- void Test_000()
+ void EmptyRecoverTest_Heap()
{
- runJournalTest(0, 0, 0, false, 2, "Empty journal");
+ const char* test_name = "EmptyRecoverTest_Heap";
+ rhm::journal::jcntl* jcp = NULL;
+ try
+ {
+ {
+ rhm::journal::jcntl* jcp = new rhm::journal::jcntl(test_name, "jdata", test_name);
+ CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp != NULL);
+ jcp->initialize();
+ delete jcp;
+ jcp = NULL;
+ }
+ {
+ rhm::journal::jcntl* jcp = new rhm::journal::jcntl(test_name, "jdata", test_name);
+ CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp != NULL);
+ jcp->recover();
+ delete jcp;
+ jcp = NULL;
+ }
+ {
+ rhm::journal::jcntl* jcp = new rhm::journal::jcntl(test_name, "jdata", test_name);
+ CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp != NULL);
+ jcp->recover();
+ jcp->recovered();
+ delete jcp;
+ }
+ }
+ catch (rhm::journal::jexception& e)
+ {
+ if (jcp)
+ delete jcp;
+ std::stringstream ss;
+ ss << e;
+ CPPUNIT_FAIL(ss.str());
+ }
}
- void Test_001()
+ void EnqueueTest_Stack()
{
- runJournalTest(1, 10, 10, false, 2, "1*(10 bytes)");
+ const char* test_name = "EnqueueTest_Stack";
+ try
+ {
+ rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ jc.initialize();
+ for (int m=0; m<NUM_MSGS; m++)
+ enq_msg(&jc, create_msg(msg, m));
+ }
+ catch (rhm::journal::jexception& e)
+ {
+ std::stringstream ss;
+ ss << e;
+ CPPUNIT_FAIL(ss.str());
+ }
}
- void Test_002()
+ void EnqueueTest_Heap()
{
- runJournalTest(1, 10, 10, true, 2, "1*(10 bytes), auto-deq");
+ const char* test_name = "EnqueueTest_Heap";
+ rhm::journal::jcntl* jcp = NULL;
+ try
+ {
+ jcp = new rhm::journal::jcntl(test_name, "jdata", test_name);
+ CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp != NULL);
+ jcp->initialize();
+ for (int m=0; m<NUM_MSGS; m++)
+ enq_msg(jcp, create_msg(msg, m));
+ delete jcp;
+ }
+ catch (rhm::journal::jexception& e)
+ {
+ if (jcp)
+ delete jcp;
+ std::stringstream ss;
+ ss << e;
+ CPPUNIT_FAIL(ss.str());
+ }
}
- void Test_003()
+ void RecoverReadTest_Stack()
{
- runJournalTest(10, 10, 10, false, 2, "10*(10 bytes)");
+ const char* test_name = "RecoverReadTest_Stack";
+ try
+ {
+ {
+ rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ jc.initialize();
+ for (int m=0; m<NUM_MSGS; m++)
+ enq_msg(&jc, create_msg(msg, m));
+ }
+ {
+ rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ jc.recover();
+ for (int m=0; m<NUM_MSGS; m++)
+ CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
+ create_msg(msg, m).compare(read_msg(&jc)) == 0);
+ }
+ }
+ catch (rhm::journal::jexception& e)
+ {
+ std::stringstream ss;
+ ss << e;
+ CPPUNIT_FAIL(ss.str());
+ }
}
- void Test_004()
+ void RecoverReadTest_Heap()
{
- runJournalTest(10, 10, 10, true, 2, "10*(10 bytes), auto-deq");
+ const char* test_name = "RecoverReadTest_Heap";
+ rhm::journal::jcntl* jcp = NULL;
+ try
+ {
+ {
+ jcp = new rhm::journal::jcntl(test_name, "jdata", test_name);
+ CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp != NULL);
+ jcp->initialize();
+ for (int m=0; m<NUM_MSGS; m++)
+ enq_msg(jcp, create_msg(msg, m));
+ delete jcp;
+ jcp = NULL;
+ }
+ {
+ rhm::journal::jcntl* jcp = new rhm::journal::jcntl(test_name, "jdata", test_name);
+ CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp != NULL);
+ jcp->recover();
+ for (int m=0; m<NUM_MSGS; m++)
+ CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
+ create_msg(msg, m).compare(read_msg(jcp)) == 0);
+ delete jcp;
+ }
+ }
+ catch (rhm::journal::jexception& e)
+ {
+ if (jcp)
+ delete jcp;
+ std::stringstream ss;
+ ss << e;
+ CPPUNIT_FAIL(ss.str());
+ }
}
- void Test_005()
+ void RecoveredReadTest_Stack()
{
- runJournalTest(10, 92, 92, false, 2, "10*(1 dblk exact fit)");
+ const char* test_name = "RecoveredReadTest_Stack";
+ try
+ {
+ {
+ rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ jc.initialize();
+ for (int m=0; m<NUM_MSGS; m++)
+ enq_msg(&jc, create_msg(msg, m));
+ }
+ {
+ rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ jc.recover();
+ for (int m=0; m<NUM_MSGS; m++)
+ CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
+ create_msg(msg, m).compare(read_msg(&jc)) == 0);
+ jc.recovered();
+ for (int m=0; m<NUM_MSGS; m++)
+ CPPUNIT_ASSERT_MESSAGE("Message corrupt after recovery.",
+ create_msg(msg, m).compare(read_msg(&jc)) == 0);
+ }
+ }
+ catch (rhm::journal::jexception& e)
+ {
+ std::stringstream ss;
+ ss << e;
+ CPPUNIT_FAIL(ss.str());
+ }
}
- void Test_006()
+ void RecoveredReadTest_Heap()
{
- runJournalTest(10, 92, 92, true, 2, "10*(1 dblk exact fit), auto-deq");
+ const char* test_name = "RecoveredReadTest_Heap";
+ rhm::journal::jcntl* jcp = NULL;
+ try
+ {
+ {
+ jcp = new rhm::journal::jcntl(test_name, "jdata", test_name);
+ CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp != NULL);
+ jcp->initialize();
+ for (int m=0; m<NUM_MSGS; m++)
+ enq_msg(jcp, create_msg(msg, m));
+ delete jcp;
+ jcp = NULL;
+ }
+ {
+ rhm::journal::jcntl* jcp = new rhm::journal::jcntl(test_name, "jdata", test_name);
+ CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp != NULL);
+ jcp->recover();
+ for (int m=0; m<NUM_MSGS; m++)
+ CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
+ create_msg(msg, m).compare(read_msg(jcp)) == 0);
+ jcp->recovered();
+ for (int m=0; m<NUM_MSGS; m++)
+ CPPUNIT_ASSERT_MESSAGE("Message corrupt after recovery.",
+ create_msg(msg, m).compare(read_msg(jcp)) == 0);
+ delete jcp;
+ }
+ }
+ catch (rhm::journal::jexception& e)
+ {
+ if (jcp)
+ delete jcp;
+ std::stringstream ss;
+ ss << e;
+ CPPUNIT_FAIL(ss.str());
+ }
}
- void Test_007()
+ void RecoveredDequeueTest_Stack()
{
- runJournalTest(10, 93, 93, false, 2, "10*(1 dblk + 1 byte)");
+ const char* test_name = "RecoveredDequeueTest_Stack";
+ try
+ {
+ {
+ rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ jc.initialize();
+ for (int m=0; m<NUM_MSGS; m++)
+ enq_msg(&jc, create_msg(msg, m));
+ }
+ {
+ rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ jc.recover();
+ for (int m=0; m<NUM_MSGS; m++)
+ CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
+ create_msg(msg, m).compare(read_msg(&jc)) == 0);
+ jc.recovered();
+ for (int m=0; m<NUM_MSGS; m++)
+ CPPUNIT_ASSERT_MESSAGE("Message corrupt after recovery.",
+ create_msg(msg, m).compare(read_msg(&jc)) == 0);
+ for (int m=0; m<NUM_MSGS; m++)
+ deq_msg(&jc, m);
+ }
+ }
+ catch (rhm::journal::jexception& e)
+ {
+ std::stringstream ss;
+ ss << e;
+ CPPUNIT_FAIL(ss.str());
+ }
}
- void Test_008()
+ void RecoveredDequeueTest_Heap()
{
- runJournalTest(10, 93, 93, true, 2, "10*(1 dblk + 1 byte), auto-deq");
+ const char* test_name = "RecoveredDequeueTest_Heap";
+ rhm::journal::jcntl* jcp = NULL;
+ try
+ {
+ {
+ jcp = new rhm::journal::jcntl(test_name, "jdata", test_name);
+ CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp != NULL);
+ jcp->initialize();
+ for (int m=0; m<NUM_MSGS; m++)
+ enq_msg(jcp, create_msg(msg, m));
+ delete jcp;
+ jcp = NULL;
+ }
+ {
+ rhm::journal::jcntl* jcp = new rhm::journal::jcntl(test_name, "jdata", test_name);
+ CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp != NULL);
+ jcp->recover();
+ for (int m=0; m<NUM_MSGS; m++)
+ CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
+ create_msg(msg, m).compare(read_msg(jcp)) == 0);
+ jcp->recovered();
+ for (int m=0; m<NUM_MSGS; m++)
+ CPPUNIT_ASSERT_MESSAGE("Message corrupt after recovery.",
+ create_msg(msg, m).compare(read_msg(jcp)) == 0);
+ for (int m=0; m<NUM_MSGS; m++)
+ deq_msg(jcp, m);
+ delete jcp;
+ }
+ }
+ catch (rhm::journal::jexception& e)
+ {
+ if (jcp)
+ delete jcp;
+ std::stringstream ss;
+ ss << e;
+ CPPUNIT_FAIL(ss.str());
+ }
}
- void Test_009()
+ void ComplexRecoveryTest1_Stack()
{
- runJournalTest(10, 476, 476, false, 2, "10*(1 sblk exact fit)");
+ const char* test_name = "ComplexRecoveryTest1_Stack";
+ try
+ {
+ {
+ rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ jc.initialize();
+ // rids: 0 to NUM_MSGS*2 - 1
+ for (int m=0; m<NUM_MSGS*2; m++)
+ enq_msg(&jc, create_msg(msg, m));
+ // rids: NUM_MSGS*2 to NUM_MSGS*3 - 1
+ for (int m=0; m<NUM_MSGS; m++)
+ deq_msg(&jc, m);
+ jc.flush();
+ for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
+ CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
+ create_msg(msg, m).compare(read_msg(&jc)) == 0);
+ }
+ {
+ rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ jc.recover();
+ for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
+ CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
+ create_msg(msg, m).compare(read_msg(&jc)) == 0);
+ jc.recovered();
+ // rids: NUM_MSGS*3 to NUM_MSGS*4 - 1
+ for (int m=NUM_MSGS*3; m<NUM_MSGS*4; m++)
+ enq_msg(&jc, create_msg(msg, m));
+ jc.flush();
+ for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
+ CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
+ create_msg(msg, m).compare(read_msg(&jc)) == 0);
+ for (int m=NUM_MSGS*3; m<NUM_MSGS*4; m++)
+ CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
+ create_msg(msg, m).compare(read_msg(&jc)) == 0);
+ // rids: NUM_MSGS*4 to NUM_MSGS*6 - 1
+ for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
+ deq_msg(&jc, m);
+ for (int m=NUM_MSGS*3; m<NUM_MSGS*4; m++)
+ deq_msg(&jc, m);
+ }
+ }
+ catch (rhm::journal::jexception& e)
+ {
+ std::stringstream ss;
+ ss << e;
+ CPPUNIT_FAIL(ss.str());
+ }
}
- void Test_010()
+ void ComplexRecoveryTest1_Heap()
{
- runJournalTest(10, 476, 476, true, 2, "10*(1 sblk exact fit), auto-deq");
+ const char* test_name = "RecoveredDequeueTest_Heap";
+ rhm::journal::jcntl* jcp = NULL;
+ try
+ {
+ {
+ jcp = new rhm::journal::jcntl(test_name, "jdata", test_name);
+ CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp != NULL);
+ jcp->initialize();
+ // rids: 0 to NUM_MSGS*2 - 1
+ for (int m=0; m<NUM_MSGS*2; m++)
+ enq_msg(jcp, create_msg(msg, m));
+ // rids: NUM_MSGS*2 to NUM_MSGS*3 - 1
+ for (int m=0; m<NUM_MSGS; m++)
+ deq_msg(jcp, m);
+ jcp->flush();
+ for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
+ CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
+ create_msg(msg, m).compare(read_msg(jcp)) == 0);
+ delete jcp;
+ jcp = NULL;
+ }
+ {
+ rhm::journal::jcntl* jcp = new rhm::journal::jcntl(test_name, "jdata", test_name);
+ CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp != NULL);
+ jcp->recover();
+ for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
+ CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
+ create_msg(msg, m).compare(read_msg(jcp)) == 0);
+ jcp->recovered();
+ // rids: NUM_MSGS*3 to NUM_MSGS*4 - 1
+ for (int m=NUM_MSGS*3; m<NUM_MSGS*4; m++)
+ enq_msg(jcp, create_msg(msg, m));
+ jcp->flush();
+ for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
+ CPPUNIT_ASSERT_MESSAGE("Message corrupt after recovery.",
+ create_msg(msg, m).compare(read_msg(jcp)) == 0);
+ for (int m=NUM_MSGS*3; m<NUM_MSGS*4; m++)
+ CPPUNIT_ASSERT_MESSAGE("Message corrupt after recovery.",
+ create_msg(msg, m).compare(read_msg(jcp)) == 0);
+ // rids: NUM_MSGS*4 to NUM_MSGS*6 - 1
+ for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
+ deq_msg(jcp, m);
+ for (int m=NUM_MSGS*3; m<NUM_MSGS*4; m++)
+ deq_msg(jcp, m);
+ delete jcp;
+ }
+ }
+ catch (rhm::journal::jexception& e)
+ {
+ if (jcp)
+ delete jcp;
+ std::stringstream ss;
+ ss << e;
+ CPPUNIT_FAIL(ss.str());
+ }
}
- void Test_011()
+ void EncodeTest_000()
{
- runJournalTest(10, 477, 477, false, 2, "10*(1 sblk + 1 byte)");
+ runEncodeTest(0, 0, 0, false, 2, "Empty journal");
}
- void Test_012()
+ void EncodeTest_001()
{
- runJournalTest(10, 477, 477, true, 2, "10*(1 sblk + 1 byte), auto-deq");
+ runEncodeTest(1, 10, 10, false, 2, "1*(10 bytes)");
}
- void Test_013()
+ void EncodeTest_002()
{
- runJournalTest(8, 4060, 4060, false, 2, "8*(1/8 page)");
+ runEncodeTest(1, 10, 10, true, 2, "1*(10 bytes), auto-deq");
}
- void Test_014()
+ void EncodeTest_003()
{
- runJournalTest(9, 4060, 4060, false, 2, "9*(1/8 page)");
+ runEncodeTest(10, 10, 10, false, 2, "10*(10 bytes)");
}
- void Test_015()
+ void EncodeTest_004()
{
- runJournalTest(8, 4061, 4061, false, 2, "8*(1/8 page + 1 byte)");
+ runEncodeTest(10, 10, 10, true, 2, "10*(10 bytes), auto-deq");
}
- void Test_016()
+ void EncodeTest_005()
{
- runJournalTest(8, 3932, 3932, true, 2, "8*(1/8 page - 1 dblk for deq record), auto-deq");
+ runEncodeTest(10, 92, 92, false, 2, "10*(1 dblk exact fit)");
}
- void Test_017()
+ void EncodeTest_006()
{
- runJournalTest(9, 3932, 3932, true, 2, "9*(1/8 page - 1 dblk for deq record), auto-deq");
+ runEncodeTest(10, 92, 92, true, 2, "10*(1 dblk exact fit), auto-deq");
}
- void Test_018()
+ void EncodeTest_007()
{
- runJournalTest(8, 3933, 3933, true, 2,
+ runEncodeTest(10, 93, 93, false, 2, "10*(1 dblk + 1 byte)");
+ }
+
+ void EncodeTest_008()
+ {
+ runEncodeTest(10, 93, 93, true, 2, "10*(1 dblk + 1 byte), auto-deq");
+ }
+
+ void EncodeTest_009()
+ {
+ runEncodeTest(10, 476, 476, false, 2, "10*(1 sblk exact fit)");
+ }
+
+ void EncodeTest_010()
+ {
+ runEncodeTest(10, 476, 476, true, 2, "10*(1 sblk exact fit), auto-deq");
+ }
+
+ void EncodeTest_011()
+ {
+ runEncodeTest(10, 477, 477, false, 2, "10*(1 sblk + 1 byte)");
+ }
+
+ void EncodeTest_012()
+ {
+ runEncodeTest(10, 477, 477, true, 2, "10*(1 sblk + 1 byte), auto-deq");
+ }
+
+ void EncodeTest_013()
+ {
+ runEncodeTest(8, 4060, 4060, false, 2, "8*(1/8 page)");
+ }
+
+ void EncodeTest_014()
+ {
+ runEncodeTest(9, 4060, 4060, false, 2, "9*(1/8 page)");
+ }
+
+ void EncodeTest_015()
+ {
+ runEncodeTest(8, 4061, 4061, false, 2, "8*(1/8 page + 1 byte)");
+ }
+
+ void EncodeTest_016()
+ {
+ runEncodeTest(8, 3932, 3932, true, 2, "8*(1/8 page - 1 dblk for deq record), auto-deq");
+ }
+
+ void EncodeTest_017()
+ {
+ runEncodeTest(9, 3932, 3932, true, 2, "9*(1/8 page - 1 dblk for deq record), auto-deq");
+ }
+
+ void EncodeTest_018()
+ {
+ runEncodeTest(8, 3933, 3933, true, 2,
"8*(1/8 page - 1 dblk for deq record + 1 byte), auto-deq");
}
- void Test_019()
+ void EncodeTest_019()
{
- runJournalTest(32, 32732, 32732, false, 2, "32*(1 page exact fit)");
+ runEncodeTest(32, 32732, 32732, false, 2, "32*(1 page exact fit)");
}
- void Test_020()
+ void EncodeTest_020()
{
- runJournalTest(33, 32732, 32732, false, 2, "33*(1 page exact fit)");
+ runEncodeTest(33, 32732, 32732, false, 2, "33*(1 page exact fit)");
}
- void Test_021()
+ void EncodeTest_021()
{
- runJournalTest(22, 49116, 49116, false, 2, "22*(1.5 pages)");
+ runEncodeTest(22, 49116, 49116, false, 2, "22*(1.5 pages)");
}
- void Test_022()
+ void EncodeTest_022()
{
- runJournalTest(22, 48988, 48988, true, 2,
+ runEncodeTest(22, 48988, 48988, true, 2,
"22*(1.5 pages - 1 dblk for deq record), auto-deq");
}
- void Test_023()
+ void EncodeTest_023()
{
- runJournalTest(48, 32732, 32732, false, 2, "48*(1 page exact fit)");
+ runEncodeTest(48, 32732, 32732, false, 2, "48*(1 page exact fit)");
}
- void Test_024()
+ void EncodeTest_024()
{
- runJournalTest(49, 32732, 32732, false, 2, "49*(1 page exact fit)");
+ runEncodeTest(49, 32732, 32732, false, 2, "49*(1 page exact fit)");
}
- void Test_025()
+ void EncodeTest_025()
{
- runJournalTest(20, 81884, 81884, false, 2, "20*(2.5 pages)");
+ runEncodeTest(20, 81884, 81884, false, 2, "20*(2.5 pages)");
}
- void Test_026()
+ void EncodeTest_026()
{
- runJournalTest(20, 81756, 81756, true, 2,
+ runEncodeTest(20, 81756, 81756, true, 2,
"20*(2.5 pages - 1 dblk for deq record), auto-deq");
}
- void Test_027()
+ void EncodeTest_027()
{
- runJournalTest(16, 786268, 786268, true, 2,
+ runEncodeTest(16, 786268, 786268, true, 2,
"16*(24 pages = 1/2 file); Total = 8 files exactly (full journal filespace)");
}
- void Test_028()
+ void EncodeTest_028()
{
- runJournalTest(17, 786268, 786268, true, 2,
+ runEncodeTest(17, 786268, 786268, true, 2,
"17*(24 pages = 1/2 file); Total = 8 files + file 0 overwritten by 1/2 file");
}
+
+private:
+
+ void enq_msg(rhm::journal::jcntl* jc, const std::string msg)
+ {
+ rhm::journal::data_tok* dtp = new rhm::journal::data_tok;
+ CPPUNIT_ASSERT_MESSAGE("Data Token heap intantiation failed.", dtp != NULL);
+
+ unsigned aio_sleep_cnt = 0;
+ while (handle_jcntl_response(jc->enqueue_data(msg.c_str(), msg.size(), dtp), jc,
+ aio_sleep_cnt, dtp));
+ }
+
+ void deq_msg(rhm::journal::jcntl* jc, u_int64_t rid)
+ {
+ rhm::journal::data_tok* dtp = new rhm::journal::data_tok;
+ CPPUNIT_ASSERT_MESSAGE("Data Token heap intantiation failed.", dtp != NULL);
+ dtp->set_wstate(rhm::journal::data_tok::ENQ);
+ dtp->set_rid(rid);
+
+ unsigned aio_sleep_cnt = 0;
+ while (handle_jcntl_response(jc->dequeue_data(dtp), jc, aio_sleep_cnt, dtp));
+ }
+
+ char* read_msg(rhm::journal::jcntl* jc)
+ {
+ memset(buff, '*', MAX_MSG_SIZE);
+ buff[MAX_MSG_SIZE] = '\0';
+
+ rhm::journal::data_tok* dtp = new rhm::journal::data_tok;
+ CPPUNIT_ASSERT_MESSAGE("Data Token heap intantiation failed.", dtp != NULL);
+ dtp->set_wstate(rhm::journal::data_tok::ENQ);
+
+ unsigned aio_sleep_cnt = 0;
+ while (handle_jcntl_response(jc->read_data(buff, MAX_MSG_SIZE, dtp), jc, aio_sleep_cnt, dtp));
+ buff[dtp->dsize()] = '\0';
+ return buff;
+ }
+
+ bool handle_jcntl_response(rhm::journal::iores res, rhm::journal::jcntl* jc,
+ unsigned& aio_sleep_cnt, rhm::journal::data_tok* dtp)
+ {
+ switch (res)
+ {
+ case rhm::journal::RHM_IORES_SUCCESS:
+ buff[dtp->dsize()] = '\0';
+ return false;
+ case rhm::journal::RHM_IORES_AIO_WAIT:
+ if (++aio_sleep_cnt <= MAX_AIO_SLEEPS)
+ {
+ jc->get_wr_events();
+ usleep(AIO_SLEEP_TIME);
+ }
+ else
+ {
+ delete dtp;
+ CPPUNIT_FAIL("dequeue_data(): timeout on RHM_IORES_AIO_WAIT.");
+ }
+ break;
+ case rhm::journal::RHM_IORES_EMPTY:
+ delete dtp;
+ CPPUNIT_FAIL("dequeue_data(): RHM_IORES_EMPTY.");
+ case rhm::journal::RHM_IORES_FULL:
+ delete dtp;
+ CPPUNIT_FAIL("dequeue_data(): RHM_IORES_FULL.");
+ case rhm::journal::RHM_IORES_BUSY:
+ delete dtp;
+ CPPUNIT_FAIL("dequeue_data(): RHM_IORES_BUSY.");
+ default:
+ delete dtp;
+ CPPUNIT_FAIL("dequeue_data(): unknown return value.");
+ }
+ return true;
+ }
+
+ static std::string& create_msg(std::string& s, int msg_num)
+ {
+ std::stringstream ss;
+ ss << "Message_" << std::setfill('0') << std::setw(4) << msg_num;
+ ss << "_4567890123456789012345678901234567890";
+ ss << "12345678901234567890123456789012345678901234567890"; // 100 chars long (2 dblks)
+ s.assign(ss.str());
+ return s;
+ }
+
+ void runEncodeTest(const unsigned num_msgs, const unsigned min_msg_size,
+ const unsigned max_msg_szie, const bool auto_deq, const unsigned iterations,
+ const char* test_descr)
+ {
+ std::cout << " [" << test_descr << "] " << std::flush;
+ jtest::targs ta(num_msgs, min_msg_size, max_msg_szie, auto_deq, test_descr);
+ for (unsigned i=0; i<iterations; i++)
+ {
+ std::cout << "." << std::flush;
+ try
+ {
+ t.initialize(ta);
+ t.run();
+ }
+ catch (rhm::journal::jexception e)
+ {
+ t.finalize();
+ std::string s;
+ CPPUNIT_FAIL(e.to_string(s));
+ }
+ t.finalize();
+ }
+ }
};
// Make this test suite a plugin.
Modified: store/trunk/cpp/tests/jrnl/Makefile.rtest
===================================================================
--- store/trunk/cpp/tests/jrnl/Makefile.rtest 2007-09-22 15:09:17 UTC (rev 940)
+++ store/trunk/cpp/tests/jrnl/Makefile.rtest 2007-09-23 17:37:01 UTC (rev 941)
@@ -68,7 +68,7 @@
msg_producer.o \
msg_consumer.o \
-JTEST_FILES = jtest jrnl_scope_test
+JTEST_FILES = jtest
CXX = g++
CXXINCLUDES = -I. -I../../lib -I../../../../qpid/cpp/src -I../../../../qpid/cpp/src/gen
@@ -104,9 +104,6 @@
jtest: $(JRNL_OBJ_FILES) $(JTEST_OBJ_FILES)
-jrnl_scope_test: RHM_DEFINES = -DRHM_CLEAN
-jrnl_scope_test: $(JRNL_OBJ_FILES) jrnl_scope_test.cpp
-
jrtest: RHM_DEFINES = -DRHM_JOWRITE -DRHM_RDONLY -DRHM_TESTVALS
jrtest: $(JTEST_FILES)
@@ -122,7 +119,6 @@
clean:
@rm -f *.o
@rm -f jtest
- @rm -f jrnl_scope_test
clean-data:
@rm -rf jdata
Deleted: store/trunk/cpp/tests/jrnl/jrnl_scope_test.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/jrnl_scope_test.cpp 2007-09-22 15:09:17 UTC (rev 940)
+++ store/trunk/cpp/tests/jrnl/jrnl_scope_test.cpp 2007-09-23 17:37:01 UTC (rev 941)
@@ -1,751 +0,0 @@
-#include <iostream>
-#include <jrnl/data_tok.hpp>
-#include <jrnl/jcntl.hpp>
-#include <sstream>
-
-using namespace std;
-
-#define TEST_ITERATIONS 5
-#define NUM_TESTS 16
-#define NUM_MSGS 5
-#define MAX_MSG_SIZE 127
-#define MAX_AIO_SLEEPS 500
-#define AIO_SLEEP_TIME 1000
-
-class jrnl_scope_test
-{
- static const char* iores_str[];
- std::string _msg;
-public:
- jrnl_scope_test() {_msg.reserve(MAX_MSG_SIZE+1);}
- ~jrnl_scope_test() {}
-
- int run()
- {
- int res;
- for (int t=1; t<=NUM_TESTS; t++)
- if ((res = run(t, TEST_ITERATIONS)))
- break;
- return res;
- }
-
- int run(int test_num, int num_iter)
- {
- try
- {
- switch(test_num)
- {
- case 1:
- return test_1(num_iter);
- case 2:
- return test_2(num_iter);
- case 3:
- return test_3(num_iter);
- case 4:
- return test_4(num_iter);
- case 5:
- return test_5(num_iter);
- case 6:
- return test_6(num_iter);
- case 7:
- return test_7(num_iter);
- case 8:
- return test_8(num_iter);
- case 9:
- return test_9(num_iter);
- case 10:
- return test_10(num_iter);
- case 11:
- return test_11(num_iter);
- case 12:
- return test_12(num_iter);
- case 13:
- return test_13(num_iter);
- case 14:
- return test_14(num_iter);
- case 15:
- return test_15(num_iter);
- case 16:
- return test_16(num_iter);
- default:
- cout << " unknown test: " << test_num << endl;
- return 1;
- }
- }
- catch (rhm::journal::jexception& e)
- {
- cout << e << endl;
- return 2;
- }
- }
-
-protected:
- int test_1(int num_iter)
- {
- cout << " 1. instance: ";
- for (int i=0; i<num_iter; i++)
- {
- cout << "." << flush;
- rhm::journal::jcntl jc("scope_test_01", "jdata", "t01");
- }
- cout << " ok" << endl;
- return 0;
- }
-
- int test_2(int num_iter)
- {
- cout << " 2. instance_ptr: ";
- {
- for (int i=0; i<num_iter; i++)
- {
- cout << "." << flush;
- rhm::journal::jcntl* jc = new rhm::journal::jcntl("scope_test_02", "jdata", "t02");
- if (!jc)
- {
- cout << " failed, jc==NULL" << endl;
- return 1;
- }
- delete jc;
- }
- }
- cout << " ok" << endl;
- return 0;
- }
-
- int test_3(int num_iter)
- {
- cout << " 3. init: ";
- for (int i=0; i<num_iter; i++)
- {
- cout << "." << flush;
- rhm::journal::jcntl jc("scope_test_03", "jdata", "t03");
- jc.initialize();
- }
- cout << " ok" << endl;
- return 0;
- }
-
- int test_4(int num_iter)
- {
- cout << " 4. init_ptr: ";
- {
- for (int i=0; i<num_iter; i++)
- {
- cout << "." << flush;
- rhm::journal::jcntl* jc = new rhm::journal::jcntl("scope_test_04", "jdata", "t04");
- if (!jc)
- {
- cout << "failed" << endl;
- return 1;
- }
- jc->initialize();
- delete jc;
- }
- }
- cout << " ok" << endl;
- return 0;
- }
-
- int test_5(int num_iter)
- {
- cout << " 5. init_recover: ";
- for (int i=0; i<num_iter; i++)
- {
- cout << "." << flush;
- {
- rhm::journal::jcntl jc("scope_test_05", "jdata", "t05");
- jc.initialize();
- }
- {
- rhm::journal::jcntl jc("scope_test_05", "jdata", "t05");
- jc.recover();
- }
- {
- rhm::journal::jcntl jc("scope_test_05", "jdata", "t05");
- jc.recover();
- jc.recovered();
- }
- }
- cout << " ok" << endl;
- return 0;
- }
-
- int test_6(int num_iter)
- {
- cout << " 6. init_recover_ptr: ";
- for (int i=0; i<num_iter; i++)
- {
- cout << "." << flush;
- {
- rhm::journal::jcntl* jc = new rhm::journal::jcntl("scope_test_06", "jdata", "t06");
- if (!jc)
- {
- cout << "failed" << endl;
- return 1;
- }
- jc->initialize();
- delete jc;
- }
- {
- rhm::journal::jcntl* jc = new rhm::journal::jcntl("scope_test_06", "jdata", "t06");
- if (!jc)
- {
- cout << "failed" << endl;
- return 1;
- }
- jc->recover();
- delete jc;
- }
- {
- rhm::journal::jcntl* jc = new rhm::journal::jcntl("scope_test_06", "jdata", "t06");
- if (!jc)
- {
- cout << "failed" << endl;
- return 1;
- }
- jc->recover();
- jc->recovered();
- delete jc;
- }
- }
- cout << " ok" << endl;
- return 0;
- }
-
- int test_7(int num_iter)
- {
- cout << " 7. enq: ";
- for (int i=0; i<num_iter; i++)
- {
- cout << "." << flush;
- {
- rhm::journal::jcntl jc("scope_test_07", "jdata", "t07");
- jc.initialize();
- for (int m=0; m<NUM_MSGS; m++)
- if (enq_msg(&jc, create_msg(_msg, m)))
- return 1;
- }
- }
- cout << " ok" << endl;
- return 0;
- }
-
- int test_8(int num_iter)
- {
- cout << " 8. enq_ptr: ";
- for (int i=0; i<num_iter; i++)
- {
- cout << "." << flush;
- {
- rhm::journal::jcntl* jc = new rhm::journal::jcntl("scope_test_08", "jdata", "t08");
- if (!jc)
- {
- cout << "failed" << endl;
- return 1;
- }
- jc->initialize();
- for (int m=0; m<NUM_MSGS; m++)
- if (enq_msg(jc, create_msg(_msg, m)))
- {
- delete jc;
- return 1;
- }
- delete jc;
- }
- }
- cout << " ok" << endl;
- return 0;
- }
-
- int test_9(int num_iter)
- {
- cout << " 9. recover_read: ";
- for (int i=0; i<num_iter; i++)
- {
- cout << "." << flush;
- {
- rhm::journal::jcntl jc("scope_test_09", "jdata", "t09");
- jc.initialize();
- for (int m=0; m<NUM_MSGS; m++) // 12288 - fills 1 file
- if (enq_msg(&jc, create_msg(_msg, m)))
- return 1;
- }
- {
- rhm::journal::jcntl jc("scope_test_09", "jdata", "t09");
- jc.recover();
- for (int m=0; m<NUM_MSGS; m++)
- if (read_msg(m, &jc))
- return 1;
- }
- }
- cout << " ok" << endl;
- return 0;
- }
-
- int test_10(int num_iter)
- {
- cout << " 10. recover_read_ptr: ";
- for (int i=0; i<num_iter; i++)
- {
- cout << "." << flush;
- {
- rhm::journal::jcntl* jc = new rhm::journal::jcntl("scope_test_10", "jdata", "t10");
- if (!jc)
- {
- cout << "failed" << endl;
- return 1;
- }
- jc->initialize();
- for (int m=0; m<NUM_MSGS; m++)
- if (enq_msg(jc, create_msg(_msg, m)))
- {
- delete jc;
- return 1;
- }
- delete jc;
- }
- {
- rhm::journal::jcntl* jc = new rhm::journal::jcntl("scope_test_10", "jdata", "t10");
- if (!jc)
- {
- cout << "failed" << endl;
- return 1;
- }
- jc->recover();
- for (int m=0; m<NUM_MSGS; m++)
- if (read_msg(m, jc))
- {
- delete jc;
- return 1;
- }
- delete jc;
- }
- }
- cout << " ok" << endl;
- return 0;
- }
-
- int test_11(int num_iter)
- {
- cout << " 11. recover_read_read: ";
- for (int i=0; i<num_iter; i++)
- {
- cout << "." << flush;
- {
- rhm::journal::jcntl jc("scope_test_11", "jdata", "t11");
- jc.initialize();
- for (int m=0; m<NUM_MSGS; m++)
- if (enq_msg(&jc, create_msg(_msg, m)))
- return 1;
- }
- {
- rhm::journal::jcntl jc("scope_test_11", "jdata", "t11");
- jc.recover();
- for (int m=0; m<NUM_MSGS; m++)
- if (read_msg(m, &jc))
- return 1;
- jc.recovered();
- for (int m=0; m<NUM_MSGS; m++)
- if (read_msg(m, &jc))
- return 1;
- }
- }
- cout << " ok" << endl;
- return 0;
- }
-
- int test_12(int num_iter)
- {
- cout << " 12. recover_read_read_ptr: ";
- for (int i=0; i<num_iter; i++)
- {
- cout << "." << flush;
- {
- rhm::journal::jcntl* jc = new rhm::journal::jcntl("scope_test_12", "jdata", "t12");
- if (!jc)
- {
- cout << "failed" << endl;
- return 1;
- }
- jc->initialize();
- for (int m=0; m<NUM_MSGS; m++)
- if (enq_msg(jc, create_msg(_msg, m)))
- {
- delete jc;
- return 1;
- }
- delete jc;
- }
- {
- rhm::journal::jcntl* jc = new rhm::journal::jcntl("scope_test_12", "jdata", "t12");
- if (!jc)
- {
- cout << "failed" << endl;
- return 1;
- }
- jc->recover();
- for (int m=0; m<NUM_MSGS; m++)
- if (read_msg(m, jc))
- {
- delete jc;
- return 1;
- }
- jc->recovered();
- for (int m=0; m<NUM_MSGS; m++)
- if (read_msg(m, jc))
- {
- delete jc;
- return 1;
- }
- delete jc;
- }
- }
- cout << " ok" << endl;
- return 0;
- }
-
- int test_13(int num_iter)
- {
- cout << " 13. recover_read_read_deq: ";
- for (int i=0; i<num_iter; i++)
- {
- cout << "." << flush;
- {
- rhm::journal::jcntl jc("scope_test_13", "jdata", "t13");
- jc.initialize();
- for (int m=0; m<NUM_MSGS; m++)
- if (enq_msg(&jc, create_msg(_msg, m)))
- return 1;
- }
- {
- rhm::journal::jcntl jc("scope_test_13", "jdata", "t13");
- jc.recover();
- for (int m=0; m<NUM_MSGS; m++)
- if (read_msg(m, &jc))
- return 1;
- jc.recovered();
- for (int m=0; m<NUM_MSGS; m++)
- if (read_msg(m, &jc))
- return 1;
- for (int m=0; m<NUM_MSGS; m++)
- if (deq_msg(&jc, m))
- return 1;
- }
- }
- cout << " ok" << endl;
- return 0;
- }
-
- int test_14(int num_iter)
- {
- cout << " 14. recover_read_read_deq_ptr: ";
- for (int i=0; i<num_iter; i++)
- {
- cout << "." << flush;
- {
- rhm::journal::jcntl* jc = new rhm::journal::jcntl("scope_test_14", "jdata", "t14");
- if (!jc)
- {
- cout << "failed" << endl;
- return 1;
- }
- jc->initialize();
- for (int m=0; m<NUM_MSGS; m++)
- if (enq_msg(jc, create_msg(_msg, m)))
- {
- delete jc;
- return 1;
- }
- delete jc;
- }
- {
- rhm::journal::jcntl* jc = new rhm::journal::jcntl("scope_test_14", "jdata", "t14");
- if (!jc)
- {
- cout << "failed" << endl;
- return 1;
- }
- jc->recover();
- for (int m=0; m<NUM_MSGS; m++)
- if (read_msg(m, jc))
- {
- delete jc;
- return 1;
- }
- jc->recovered();
- for (int m=0; m<NUM_MSGS; m++)
- if (read_msg(m, jc))
- {
- delete jc;
- return 1;
- }
- for (int m=0; m<NUM_MSGS; m++)
- if (deq_msg(jc, m))
- {
- delete jc;
- return 1;
- }
- delete jc;
- }
- }
- cout << " ok" << endl;
- return 0;
- }
-
- int test_15(int num_iter)
- {
- cout << " 15. complex recover: ";
- for (int i=0; i<num_iter; i++)
- {
- cout << "." << flush;
- {
- rhm::journal::jcntl jc("scope_test_15", "jdata", "t15");
- jc.initialize();
- // enqueue msgs, read then dequeue half
- // rids 0-9
- for (int m=0; m<NUM_MSGS*2; m++)
- if (enq_msg(&jc, create_msg(_msg, m)))
- return 1;
- // rids 10-14 dequeueing rids 0-4
- for (int m=0; m<NUM_MSGS; m++)
- if (deq_msg(&jc, m))
- return 1;
- jc.flush();
- for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
- if (read_msg(m, &jc))
- return 1;
- }
- {
- rhm::journal::jcntl jc("scope_test_15", "jdata", "t15");
- jc.recover();
- // recover other half of messages
- for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
- if (read_msg(m, &jc))
- return 1;
- jc.recovered();
- // write more messages, read and dequeue all
- // rids 15-19
- for (int m=NUM_MSGS*2; m<NUM_MSGS*3; m++)
- if (enq_msg(&jc, create_msg(_msg, m)))
- return 1;
- jc.flush();
- for (int m=NUM_MSGS; m<NUM_MSGS*3; m++)
- if (read_msg(m, &jc))
- return 1;
- // rids 20-29 dequeueing rids 5-9, 15-19
- for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
- if (deq_msg(&jc, m))
- return 1;
- for (int m=NUM_MSGS*3; m<NUM_MSGS*4; m++)
- if (deq_msg(&jc, m))
- return 1;
- }
- }
- cout << " ok" << endl;
- return 0;
- }
-
- int test_16(int num_iter)
- {
- cout << " 16. complex recover ptr: ";
- for (int i=0; i<num_iter; i++)
- {
- cout << "." << flush;
- {
- rhm::journal::jcntl* jc = new rhm::journal::jcntl("scope_test_16", "jdata", "t16");
- if (!jc)
- {
- cout << "failed" << endl;
- return 1;
- }
- jc->initialize();
- // enqueue msgs, read then dequeue half
- for (int m=0; m<NUM_MSGS*2; m++)
- if (enq_msg(jc, create_msg(_msg, m)))
- {
- delete jc;
- return 1;
- }
- for (int m=0; m<NUM_MSGS; m++)
- if (deq_msg(jc, m))
- {
- delete jc;
- return 1;
- }
- jc->flush();
- for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
- if (read_msg(m, jc))
- {
- delete jc;
- return 1;
- }
- delete jc;
- }
- {
- rhm::journal::jcntl* jc = new rhm::journal::jcntl("scope_test_16", "jdata", "t16");
- if (!jc)
- {
- cout << "failed" << endl;
- return 1;
- }
- jc->recover();
- // recover other half of messages
- for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
- if (read_msg(m, jc))
- {
- delete jc;
- return 1;
- }
- jc->recovered();
- // write more messages, read and dequeue all
- for (int m=NUM_MSGS*2; m<NUM_MSGS*3; m++)
- if (enq_msg(jc, create_msg(_msg, m)))
- {
- delete jc;
- return 1;
- }
- jc->flush();
- for (int m=NUM_MSGS; m<NUM_MSGS*3; m++)
- if (read_msg(m, jc))
- {
- delete jc;
- return 1;
- }
- for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
- if (deq_msg(jc, m))
- {
- delete jc;
- return 1;
- }
- for (int m=NUM_MSGS*3; m<NUM_MSGS*4; m++)
- if (deq_msg(jc, m))
- {
- delete jc;
- return 1;
- }
- delete jc;
- }
- }
- cout << " ok" << endl;
- return 0;
- }
-
- int enq_msg(rhm::journal::jcntl* jc, const std::string msg)
- {
- rhm::journal::data_tok* dtp = new rhm::journal::data_tok;
- rhm::journal::iores res = jc->enqueue_data(msg.c_str(), msg.size(), dtp);
- if (res != rhm::journal::RHM_IORES_SUCCESS)
- {
- cout << "jcntl::enqueue_data() returned " << iores_str[res] << ". failed" << endl;
- delete dtp;
- return 1;
- }
- return 0;
- }
-
- int deq_msg(rhm::journal::jcntl* jc, u_int64_t drid)
- {
- rhm::journal::data_tok* dtp = new rhm::journal::data_tok;
- dtp->set_wstate(rhm::journal::data_tok::ENQ);
- dtp->set_rid(drid);
- rhm::journal::iores res = jc->dequeue_data(dtp);
- if (res != rhm::journal::RHM_IORES_SUCCESS)
- {
- cout << "jcntl::dequeue_data() returned " << iores_str[res] << ". failed" << endl;
- delete dtp;
- return 1;
- }
- return 0;
- }
-
- int read_msg(int msg_num, rhm::journal::jcntl* jc)
- {
- char buff[MAX_MSG_SIZE + 1];
- memset(buff, '?', MAX_MSG_SIZE);
- buff[MAX_MSG_SIZE] = '\0';
- rhm::journal::data_tok* dtp = new rhm::journal::data_tok;
- dtp->set_wstate(rhm::journal::data_tok::ENQ);
- unsigned aio_sleep_cnt = 0;
- bool read = false;
- while (!read)
- {
- rhm::journal::iores res = jc->read_data(buff, 127, dtp);
- switch (res)
- {
- case rhm::journal::RHM_IORES_SUCCESS:
- buff[dtp->dsize()] = '\0';
- read = true;
- break;
- case rhm::journal::RHM_IORES_AIO_WAIT:
- if (++aio_sleep_cnt > MAX_AIO_SLEEPS)
- {
- cout << "Exceeded max count for RHM_IORES_AIO_WAIT. failed" << endl;
- delete dtp;
- return 1;
- }
- jc->get_wr_events();
- ::usleep(AIO_SLEEP_TIME);
- break;
- default:
- cout << "jcntl::read_data() returned " << iores_str[res] << ". failed" << endl;
- delete dtp;
- return 1;
- }
- }
- std::string s;
- create_msg(s, msg_num);
- if (s.compare(buff) != 0)
- {
- cout << "Message comparison failure: read \"" << buff << "\", expected \"" << s <<
- "\"." << endl;
- return 1;
- }
- return 0;
- }
-
- static std::string& create_msg(std::string& s, int msg_num)
- {
- std::stringstream ss;
- ss << "Message_" << std::setfill('0') << std::setw(4) << msg_num;
- ss << "_4567890123456789012345678901234567890";
- ss << "12345678901234567890123456789012345678901234567890"; // 100 chars long (2 dblks)
- s.assign(ss.str());
- return s;
- }
-};
-
-int main(int argc, char** argv)
-{
- int test_num = 0;
- int num_iter = TEST_ITERATIONS;
- if (argc > 1)
- test_num = atoi(argv[1]);
- if (argc > 2)
- num_iter = atoi(argv[2]);
- if (num_iter < 1)
- {
- cout << "num_iter=" << num_iter << endl;
- return 1;
- }
- cout << "jrnl_scope_test";
- if (test_num)
- cout << " test=" << test_num;
- cout << ":" << endl;
- jrnl_scope_test jst;
- int res = test_num ? jst.run(test_num, num_iter) : jst.run();
- if (res)
- {
- cout << "failed res=" << res << endl;
- return res;
- }
- cout << "ok" << endl;
- return 0;
-}
-
-const char* jrnl_scope_test::iores_str[] = {
- "RHM_IORES_SUCCESS",
- "RHM_IORES_AIO_WAIT",
- "RHM_IORES_EMPTY",
- "RHM_IORES_FULL",
- "RHM_IORES_BUSY"
- };
Modified: store/trunk/cpp/tests/jrnl/rtest
===================================================================
--- store/trunk/cpp/tests/jrnl/rtest 2007-09-22 15:09:17 UTC (rev 940)
+++ store/trunk/cpp/tests/jrnl/rtest 2007-09-23 17:37:01 UTC (rev 941)
@@ -56,7 +56,8 @@
RM_DIR="${RM} -rf"
TEST_PROG="./jtest"
CHK_PROG="./ftest.py"
-VALGRIND="valgrind -q --track-fds=yes --leak-check=full --leak-resolution=high --show-reachable=yes --suppressions=/usr/lib/valgrind/glibc-2.5.supp"
+VALGRIND="valgrind -q --track-fds=yes --leak-check=full --leak-resolution=high --show-reachable=yes"
+#VALGRIND="valgrind -q --track-fds=yes --leak-check=full --leak-resolution=high --show-reachable=yes --suppressions=/usr/lib/valgrind/glibc-2.5.supp"
MAKE="make -f Makefile.rtest"
16 years, 9 months
rhmessaging commits: r940 - in store/trunk/cpp: lib/jrnl and 2 other directories.
by rhmessaging-commits@lists.jboss.org
Author: kpvdr
Date: 2007-09-22 11:09:17 -0400 (Sat, 22 Sep 2007)
New Revision: 940
Modified:
store/trunk/cpp/lib/BdbMessageStore.cpp
store/trunk/cpp/lib/jrnl/nlfh.cpp
store/trunk/cpp/lib/jrnl/rmgr.cpp
store/trunk/cpp/lib/jrnl/rrfc.hpp
store/trunk/cpp/tests/Makefile.am
store/trunk/cpp/tests/jrnl/Makefile.rtest
Log:
Bugfix: replaced rrfc.is_wr_compl() with rrfc.is_wr_aio_outstanding() to fix logic error causing premeture RHM_IORES_EMPTY exit on some reads
Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp 2007-09-21 20:43:29 UTC (rev 939)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp 2007-09-22 15:09:17 UTC (rev 940)
@@ -409,71 +409,70 @@
dtokp.set_wstate(rhm::journal::data_tok::ENQ);
// read the message from the Journal.
- while (read) {
+ try {
+ while (read) {
std:: cout << "loop -- uses fixed size -> FIX <-" << std::endl;
// const iores read_data(void** dbuf, size_t& dbsize, data_tok* const dtok)
- rhm::journal::iores res;
- try {
- res = jc->read_data(&buff, buffSize, &dtokp);
- } catch (rhm::journal::jexception& e) {
- std::cout << "recover read" << e << std::endl;
- std::string str;
- THROW_STORE_EXCEPTION("Error dequeuing message" + e.to_string(str));
- }
- readSize = dtokp.dsize();
- assert(readSize < buffSize); /// fail safe for hack...
+ rhm::journal::iores res = jc->read_data(&buff, buffSize, &dtokp);
+ readSize = dtokp.dsize();
+ assert(readSize < buffSize); /// fail safe for hack...
- switch (res)
- {
- case rhm::journal::RHM_IORES_SUCCESS:{
- msg_count++;
- char* data = buff;
- unsigned headerSize = Buffer(data, preambleLength).getLong();
- Buffer headerBuff(data+ preambleLength, headerSize); /// do we want read size or header size ????
+ switch (res)
+ {
+ case rhm::journal::RHM_IORES_SUCCESS:{
+ msg_count++;
+ char* data = buff;
+ unsigned headerSize = Buffer(data, preambleLength).getLong();
+ Buffer headerBuff(data+ preambleLength, headerSize); /// do we want read size or header size ????
- RecoverableMessage::shared_ptr msg = recovery.recoverMessage(headerBuff);
- msg->setPersistenceId(dtokp.rid());
+ RecoverableMessage::shared_ptr msg = recovery.recoverMessage(headerBuff);
+ msg->setPersistenceId(dtokp.rid());
- u_int32_t contentOffset = headerSize + preambleLength;
- u_int64_t contentSize = readSize - contentOffset;
+ u_int32_t contentOffset = headerSize + preambleLength;
+ u_int64_t contentSize = readSize - contentOffset;
- if (msg->loadContent(contentSize)) {
- //now read the content
- Buffer contentBuff(data + contentOffset, contentSize);
- msg->decodeContent(contentBuff);
- }
- if (PreparedTransaction::isLocked(locked, queue->getPersistenceId(), dtokp.rid())) {
- prepared[dtokp.rid()] = msg;
- } else {
- queue->recover(msg);
- }
+ if (msg->loadContent(contentSize)) {
+ //now read the content
+ Buffer contentBuff(data + contentOffset, contentSize);
+ msg->decodeContent(contentBuff);
+ }
+ if (PreparedTransaction::isLocked(locked, queue->getPersistenceId(), dtokp.rid())) {
+ prepared[dtokp.rid()] = msg;
+ } else {
+ queue->recover(msg);
+ }
- if (dtokp.rid() > maxMessageId) {
- maxMessageId = dtokp.rid();
- }
+ if (dtokp.rid() > maxMessageId) {
+ maxMessageId = dtokp.rid();
+ }
- dtokp.reset();
- dtokp.set_wstate(rhm::journal::data_tok::ENQ);
- break;
- }
- case rhm::journal::RHM_IORES_AIO_WAIT:
- if (++aio_sleep_cnt > MAX_AIO_SLEEPS)
- {
- THROW_STORE_EXCEPTION("Store error, disk time out on recover for:" + queue->getName());
- }
- ::usleep(AIO_SLEEP_TIME);
- break;
- case rhm::journal::RHM_IORES_EMPTY:
- read = false;
- // inline const u_int32_t get_enq_cnt() const { return _emap.size(); }
- assert (jc->get_enq_cnt() == msg_count);
- break; // done with all messages. ((add call in jrnl to test that _emap is empty.
- default:
- assert( "Store Error: Unexpected msg state");
- }
- }
+ dtokp.reset();
+ dtokp.set_wstate(rhm::journal::data_tok::ENQ);
+ break;
+ }
+ case rhm::journal::RHM_IORES_AIO_WAIT:
+ if (++aio_sleep_cnt > MAX_AIO_SLEEPS)
+ {
+ THROW_STORE_EXCEPTION("Store error, disk time out on recover for:" + queue->getName());
+ }
+ ::usleep(AIO_SLEEP_TIME);
+ break;
+ case rhm::journal::RHM_IORES_EMPTY:
+ read = false;
+ // inline const u_int32_t get_enq_cnt() const { return _emap.size(); }
+ assert (jc->get_enq_cnt() == msg_count);
+ break; // done with all messages. ((add call in jrnl to test that _emap is empty.
+ default:
+ assert( "Store Error: Unexpected msg state");
+ } // switch
+ } // while
+ } catch (rhm::journal::jexception& e) {
+ std::stringstream ss;
+ ss << e;
+ THROW_STORE_EXCEPTION("Error dequeuing message: " + ss.str());
+ }
messageIdSequence.reset(maxMessageId + 1);
}
Modified: store/trunk/cpp/lib/jrnl/nlfh.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/nlfh.cpp 2007-09-21 20:43:29 UTC (rev 939)
+++ store/trunk/cpp/lib/jrnl/nlfh.cpp 2007-09-22 15:09:17 UTC (rev 940)
@@ -371,7 +371,7 @@
nlfh::status_str(std::string& s) const
{
std::stringstream ss;
- ss << "nlfh[" << _fid << "]: ws=" << _wr_subm_cnt_dblks << " wc=" << _wr_cmpl_cnt_dblks;
+ ss << "fid=" << _fid << " ws=" << _wr_subm_cnt_dblks << " wc=" << _wr_cmpl_cnt_dblks;
ss << " rs=" << _rd_subm_cnt_dblks << " rc=" << _rd_cmpl_cnt_dblks;
s.assign(ss.str());
return s;
Modified: store/trunk/cpp/lib/jrnl/rmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rmgr.cpp 2007-09-21 20:43:29 UTC (rev 939)
+++ store/trunk/cpp/lib/jrnl/rmgr.cpp 2007-09-22 15:09:17 UTC (rev 940)
@@ -71,12 +71,11 @@
//std::cout << " rmgr::read()" << std::flush;
if (_aio_evt_rem)
get_events();
-//std::cout << " [a pi=" << _pg_index << " d=" << dblks_rem() << " c=" << (_rrfc.is_compl()?"T":"F") << " wc=" << (_rrfc.is_wr_compl()?"T":"F") << "]" << std::flush;
-// if(dblks_rem() == 0 && _rrfc.is_full())
-// if(dblks_rem() == 0 && _rrfc.is_compl())
- if(dblks_rem() == 0 && _rrfc.is_compl() && _rrfc.is_wr_compl())
+//std::string s;
+//std::cout << " [a pi=" << _pg_index << " d=" << dblks_rem() << " c=" << (_rrfc.is_compl()?"T":"F") << " wo=" << (_rrfc.is_wr_aio_outstanding()?"T":"F") << " status:" << _rrfc.file_handle()->status_str(s) << "]" << std::flush;
+ if(dblks_rem() == 0 && _rrfc.is_compl() && !_rrfc.is_wr_aio_outstanding())
{
-// aio_cycle(); // check if any AIOs have returned
+ aio_cycle(); // check if any AIOs have returned
return RHM_IORES_EMPTY;
}
//std::cout << " b" << std::flush;
@@ -110,10 +109,8 @@
// Read header, determine next record type
while (true)
{
-//std::cout << " [f pi=" << _pg_index << " d=" << dblks_rem() << " f=" << (_rrfc.empty()?"T":"F") << "]" << std::flush;
-// if(dblks_rem() == 0 && _rrfc.is_full())
-// if(dblks_rem() == 0 && _rrfc.is_compl())
- if(dblks_rem() == 0 && _rrfc.is_compl() && _rrfc.is_wr_compl())
+//std::cout << " [f pi=" << _pg_index << " d=" << dblks_rem() << " f=" << (_rrfc.empty()?"T":"F") << " status:" << _rrfc.file_handle()->status_str(s) << "]" << std::flush;
+ if(dblks_rem() == 0 && _rrfc.is_compl() && !_rrfc.is_wr_aio_outstanding())
{
aio_cycle(); // check if any AIOs have returned
return RHM_IORES_EMPTY;
Modified: store/trunk/cpp/lib/jrnl/rrfc.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rrfc.hpp 2007-09-21 20:43:29 UTC (rev 939)
+++ store/trunk/cpp/lib/jrnl/rrfc.hpp 2007-09-22 15:09:17 UTC (rev 940)
@@ -124,7 +124,8 @@
inline const u_int32_t aio_outstanding_dblks()
{ return _curr_fh->rd_aio_outstanding_dblks(); }
inline const bool file_rotate() const { return _curr_fh->rd_file_rotate(); }
- inline const bool is_wr_compl() const { return _curr_fh->is_wr_compl(); }
+ inline const bool is_wr_aio_outstanding() const
+ { return _curr_fh->wr_aio_outstanding_dblks() > 0; }
}; // class rrfc
} // namespace journal
Modified: store/trunk/cpp/tests/Makefile.am
===================================================================
--- store/trunk/cpp/tests/Makefile.am 2007-09-21 20:43:29 UTC (rev 939)
+++ store/trunk/cpp/tests/Makefile.am 2007-09-22 15:09:17 UTC (rev 940)
@@ -17,8 +17,8 @@
TransactionalTest
TESTS = \
- system_test.sh \
- run-unit-tests
+ run-unit-tests \
+ system_test.sh
TESTS_ENVIRONMENT = \
QPID_DIR=$(QPID_DIR) \
Modified: store/trunk/cpp/tests/jrnl/Makefile.rtest
===================================================================
--- store/trunk/cpp/tests/jrnl/Makefile.rtest 2007-09-21 20:43:29 UTC (rev 939)
+++ store/trunk/cpp/tests/jrnl/Makefile.rtest 2007-09-22 15:09:17 UTC (rev 940)
@@ -73,7 +73,7 @@
CXX = g++
CXXINCLUDES = -I. -I../../lib -I../../../../qpid/cpp/src -I../../../../qpid/cpp/src/gen
CXXFLAGS = $(RHM_DEFINES) -Wall -Wextra -Werror -Wno-shadow -Wpointer-arith -Wcast-qual -Wcast-align -Wno-long-long -Wvolatile-register-var -Winvalid-pch -Wno-system-headers -pedantic -ggdb -O0 -pthread $(CXXINCLUDES)
-LDFLAGS = -lpthread -laio -lrt -lqpidcommon -L/home/kpvdr/qpid/cpp/src/.libs
+LDFLAGS = -lpthread -laio -lrt -lqpidcommon -L/home/kpvdr/redhat/qpid/cpp/src/.libs
.SUFFIXES:
.SUFFIXES: .cpp .o
16 years, 9 months
rhmessaging commits: r939 - in store/trunk/cpp: tests/jrnl and 1 other directory.
by rhmessaging-commits@lists.jboss.org
Author: kpvdr
Date: 2007-09-21 16:43:29 -0400 (Fri, 21 Sep 2007)
New Revision: 939
Modified:
store/trunk/cpp/lib/jrnl/data_tok.cpp
store/trunk/cpp/lib/jrnl/data_tok.hpp
store/trunk/cpp/lib/jrnl/rmgr.cpp
store/trunk/cpp/lib/jrnl/rrfc.hpp
store/trunk/cpp/tests/jrnl/Makefile.rtest
store/trunk/cpp/tests/jrnl/jrnl_scope_test.cpp
Log:
Solved rmgr::skip() bugs which caused JERR_RMGR_UNKNOWNMAGIC and other errors.
Modified: store/trunk/cpp/lib/jrnl/data_tok.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/data_tok.cpp 2007-09-21 19:12:19 UTC (rev 938)
+++ store/trunk/cpp/lib/jrnl/data_tok.cpp 2007-09-21 20:43:29 UTC (rev 939)
@@ -45,8 +45,6 @@
u_int64_t data_tok::_cnt = 0;
data_tok::data_tok():
- // FIXME: Make this thread safe!
- _icnt(_cnt++),
_wstate(NONE),
_rstate(UNREAD),
_dsize(0),
@@ -54,10 +52,17 @@
_dblks_read(0),
_rid(0),
_sourceMsg(NULL)
-{}
+{
+ pthread_mutex_init(&_mutex, NULL);
+ pthread_mutex_lock(&_mutex);
+ _icnt = _cnt++;
+ pthread_mutex_unlock(&_mutex);
+}
data_tok::~data_tok()
-{}
+{
+ pthread_mutex_destroy(&_mutex);
+}
const char*
data_tok::wstate_str() const
Modified: store/trunk/cpp/lib/jrnl/data_tok.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/data_tok.hpp 2007-09-21 19:12:19 UTC (rev 938)
+++ store/trunk/cpp/lib/jrnl/data_tok.hpp 2007-09-21 20:43:29 UTC (rev 939)
@@ -49,6 +49,7 @@
}
}
+#include <pthread.h>
#include <sys/types.h>
#include <jrnl/jexception.hpp>
@@ -87,6 +88,7 @@
};
private:
+ pthread_mutex_t _mutex;
static u_int64_t _cnt;
u_int64_t _icnt;
write_state _wstate; ///< Enqueued / dequeued state of data
Modified: store/trunk/cpp/lib/jrnl/rmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rmgr.cpp 2007-09-21 19:12:19 UTC (rev 938)
+++ store/trunk/cpp/lib/jrnl/rmgr.cpp 2007-09-21 20:43:29 UTC (rev 939)
@@ -71,11 +71,12 @@
//std::cout << " rmgr::read()" << std::flush;
if (_aio_evt_rem)
get_events();
-//std::cout << " [a pi=" << _pg_index << " d=" << dblks_rem() << " f=" << (_rrfc.empty()?"T":"F") << "]" << std::flush;
+//std::cout << " [a pi=" << _pg_index << " d=" << dblks_rem() << " c=" << (_rrfc.is_compl()?"T":"F") << " wc=" << (_rrfc.is_wr_compl()?"T":"F") << "]" << std::flush;
// if(dblks_rem() == 0 && _rrfc.is_full())
- if(dblks_rem() == 0 && _rrfc.is_compl())
+// if(dblks_rem() == 0 && _rrfc.is_compl())
+ if(dblks_rem() == 0 && _rrfc.is_compl() && _rrfc.is_wr_compl())
{
- aio_cycle(); // check if any AIOs have returned
+// aio_cycle(); // check if any AIOs have returned
return RHM_IORES_EMPTY;
}
//std::cout << " b" << std::flush;
@@ -111,7 +112,8 @@
{
//std::cout << " [f pi=" << _pg_index << " d=" << dblks_rem() << " f=" << (_rrfc.empty()?"T":"F") << "]" << std::flush;
// if(dblks_rem() == 0 && _rrfc.is_full())
- if(dblks_rem() == 0 && _rrfc.is_compl())
+// if(dblks_rem() == 0 && _rrfc.is_compl())
+ if(dblks_rem() == 0 && _rrfc.is_compl() && _rrfc.is_wr_compl())
{
aio_cycle(); // check if any AIOs have returned
return RHM_IORES_EMPTY;
@@ -190,6 +192,10 @@
{
//std::cout << " %" << std::flush;
// skip this record, it is already dequeued
+ u_int64_t* sizep = (u_int64_t*)((char*)rptr + sizeof(hdr));
+//u_int32_t dblks = jrec::size_dblks((size_t)*sizep + sizeof(enq_hdr) + sizeof(enq_tail));
+//std::cout << "{" << *sizep << "=" << dblks << "d}" << std::flush;
+ dtokp->set_dsize((size_t)*sizep);
skip(dtokp);
}
break;
@@ -287,7 +293,7 @@
aio_cycle(); // check if any AIOs have returned
return RHM_IORES_AIO_WAIT;
}
-//std::cout << " V" << std::flush;
+//std::cout << " V" << dtokp->dblocks_read() << "," << dblks_rem() << std::flush;
// Read data from this page, first block will have header and data size.
u_int32_t dblks_rd = _data_rec.decode(h, rptr, dtokp->dblocks_read(), dblks_rem());
@@ -346,18 +352,19 @@
const iores
rmgr::skip(data_tok* dtokp) throw (jexception)
{
+ u_int32_t dsize_dblks = jrec::size_dblks(dtokp->dsize() + sizeof(enq_hdr) + sizeof(enq_tail));
u_int32_t tot_dblk_cnt = dtokp->rstate() == data_tok::SKIP_PART ? dtokp->dblocks_read() : 0;
//std::cout << " S" << tot_dblk_cnt << std::flush;
while (true)
{
u_int32_t this_dblk_cnt = 0;
- if (_data_rec.rec_size_dblks() - this_dblk_cnt > dblks_rem())
+ if (dsize_dblks - tot_dblk_cnt > dblks_rem())
//{std::cout << "-1" << std::flush;
this_dblk_cnt = dblks_rem();
//}
else
//{std::cout << "-2" << std::flush;
- this_dblk_cnt = _data_rec.rec_size_dblks() - this_dblk_cnt;
+ this_dblk_cnt = dsize_dblks - tot_dblk_cnt;
//}
//std::cout << "->" << this_dblk_cnt << std::flush;
dtokp->incr_dblocks_read(this_dblk_cnt);
@@ -380,6 +387,8 @@
{
// Skip complete, put state back to unread
dtokp->set_rstate(data_tok::UNREAD);
+ dtokp->set_dsize(0);
+ dtokp->set_dblocks_read(0);
return RHM_IORES_SUCCESS;
}
}
Modified: store/trunk/cpp/lib/jrnl/rrfc.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rrfc.hpp 2007-09-21 19:12:19 UTC (rev 938)
+++ store/trunk/cpp/lib/jrnl/rrfc.hpp 2007-09-21 20:43:29 UTC (rev 939)
@@ -124,6 +124,7 @@
inline const u_int32_t aio_outstanding_dblks()
{ return _curr_fh->rd_aio_outstanding_dblks(); }
inline const bool file_rotate() const { return _curr_fh->rd_file_rotate(); }
+ inline const bool is_wr_compl() const { return _curr_fh->is_wr_compl(); }
}; // class rrfc
} // namespace journal
Modified: store/trunk/cpp/tests/jrnl/Makefile.rtest
===================================================================
--- store/trunk/cpp/tests/jrnl/Makefile.rtest 2007-09-21 19:12:19 UTC (rev 938)
+++ store/trunk/cpp/tests/jrnl/Makefile.rtest 2007-09-21 20:43:29 UTC (rev 939)
@@ -104,6 +104,7 @@
jtest: $(JRNL_OBJ_FILES) $(JTEST_OBJ_FILES)
+jrnl_scope_test: RHM_DEFINES = -DRHM_CLEAN
jrnl_scope_test: $(JRNL_OBJ_FILES) jrnl_scope_test.cpp
jrtest: RHM_DEFINES = -DRHM_JOWRITE -DRHM_RDONLY -DRHM_TESTVALS
Modified: store/trunk/cpp/tests/jrnl/jrnl_scope_test.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/jrnl_scope_test.cpp 2007-09-21 19:12:19 UTC (rev 938)
+++ store/trunk/cpp/tests/jrnl/jrnl_scope_test.cpp 2007-09-21 20:43:29 UTC (rev 939)
@@ -6,16 +6,18 @@
using namespace std;
#define TEST_ITERATIONS 5
-#define NUM_TESTS 14
+#define NUM_TESTS 16
#define NUM_MSGS 5
+#define MAX_MSG_SIZE 127
#define MAX_AIO_SLEEPS 500
#define AIO_SLEEP_TIME 1000
class jrnl_scope_test
{
static const char* iores_str[];
+ std::string _msg;
public:
- jrnl_scope_test() {}
+ jrnl_scope_test() {_msg.reserve(MAX_MSG_SIZE+1);}
~jrnl_scope_test() {}
int run()
@@ -61,6 +63,10 @@
return test_13(num_iter);
case 14:
return test_14(num_iter);
+ case 15:
+ return test_15(num_iter);
+ case 16:
+ return test_16(num_iter);
default:
cout << " unknown test: " << test_num << endl;
return 1;
@@ -216,12 +222,8 @@
rhm::journal::jcntl jc("scope_test_07", "jdata", "t07");
jc.initialize();
for (int m=0; m<NUM_MSGS; m++)
- {
- std::stringstream ss;
- ss << "Message " << m;
- if (enq_msg(&jc, ss.str().c_str(), ss.str().size()))
+ if (enq_msg(&jc, create_msg(_msg, m)))
return 1;
- }
}
}
cout << " ok" << endl;
@@ -243,15 +245,11 @@
}
jc->initialize();
for (int m=0; m<NUM_MSGS; m++)
- {
- std::stringstream ss;
- ss << "Message " << m;
- if (enq_msg(jc, ss.str().c_str(), ss.str().size()))
+ if (enq_msg(jc, create_msg(_msg, m)))
{
delete jc;
return 1;
}
- }
delete jc;
}
}
@@ -268,19 +266,15 @@
{
rhm::journal::jcntl jc("scope_test_09", "jdata", "t09");
jc.initialize();
- for (int m=0; m<NUM_MSGS; m++) // 12288
- {
- std::stringstream ss;
- ss << "Message " << m;
- if (enq_msg(&jc, ss.str().c_str(), ss.str().size()))
+ for (int m=0; m<NUM_MSGS; m++) // 12288 - fills 1 file
+ if (enq_msg(&jc, create_msg(_msg, m)))
return 1;
- }
}
{
rhm::journal::jcntl jc("scope_test_09", "jdata", "t09");
jc.recover();
for (int m=0; m<NUM_MSGS; m++)
- if (read_msg(&jc))
+ if (read_msg(m, &jc))
return 1;
}
}
@@ -303,15 +297,11 @@
}
jc->initialize();
for (int m=0; m<NUM_MSGS; m++)
- {
- std::stringstream ss;
- ss << "Message " << m;
- if (enq_msg(jc, ss.str().c_str(), ss.str().size()))
+ if (enq_msg(jc, create_msg(_msg, m)))
{
delete jc;
return 1;
}
- }
delete jc;
}
{
@@ -323,7 +313,7 @@
}
jc->recover();
for (int m=0; m<NUM_MSGS; m++)
- if (read_msg(jc))
+ if (read_msg(m, jc))
{
delete jc;
return 1;
@@ -345,22 +335,18 @@
rhm::journal::jcntl jc("scope_test_11", "jdata", "t11");
jc.initialize();
for (int m=0; m<NUM_MSGS; m++)
- {
- std::stringstream ss;
- ss << "Message " << m;
- if (enq_msg(&jc, ss.str().c_str(), ss.str().size()))
+ if (enq_msg(&jc, create_msg(_msg, m)))
return 1;
- }
}
{
rhm::journal::jcntl jc("scope_test_11", "jdata", "t11");
jc.recover();
for (int m=0; m<NUM_MSGS; m++)
- if (read_msg(&jc))
+ if (read_msg(m, &jc))
return 1;
jc.recovered();
for (int m=0; m<NUM_MSGS; m++)
- if (read_msg(&jc))
+ if (read_msg(m, &jc))
return 1;
}
}
@@ -383,15 +369,11 @@
}
jc->initialize();
for (int m=0; m<NUM_MSGS; m++)
- {
- std::stringstream ss;
- ss << "Message " << m;
- if (enq_msg(jc, ss.str().c_str(), ss.str().size()))
+ if (enq_msg(jc, create_msg(_msg, m)))
{
delete jc;
return 1;
}
- }
delete jc;
}
{
@@ -403,14 +385,14 @@
}
jc->recover();
for (int m=0; m<NUM_MSGS; m++)
- if (read_msg(jc))
+ if (read_msg(m, jc))
{
delete jc;
return 1;
}
jc->recovered();
for (int m=0; m<NUM_MSGS; m++)
- if (read_msg(jc))
+ if (read_msg(m, jc))
{
delete jc;
return 1;
@@ -432,22 +414,18 @@
rhm::journal::jcntl jc("scope_test_13", "jdata", "t13");
jc.initialize();
for (int m=0; m<NUM_MSGS; m++)
- {
- std::stringstream ss;
- ss << "Message " << m;
- if (enq_msg(&jc, ss.str().c_str(), ss.str().size()))
+ if (enq_msg(&jc, create_msg(_msg, m)))
return 1;
- }
}
{
rhm::journal::jcntl jc("scope_test_13", "jdata", "t13");
jc.recover();
for (int m=0; m<NUM_MSGS; m++)
- if (read_msg(&jc))
+ if (read_msg(m, &jc))
return 1;
jc.recovered();
for (int m=0; m<NUM_MSGS; m++)
- if (read_msg(&jc))
+ if (read_msg(m, &jc))
return 1;
for (int m=0; m<NUM_MSGS; m++)
if (deq_msg(&jc, m))
@@ -473,15 +451,11 @@
}
jc->initialize();
for (int m=0; m<NUM_MSGS; m++)
- {
- std::stringstream ss;
- ss << "Message " << m;
- if (enq_msg(jc, ss.str().c_str(), ss.str().size()))
+ if (enq_msg(jc, create_msg(_msg, m)))
{
delete jc;
return 1;
}
- }
delete jc;
}
{
@@ -493,14 +467,14 @@
}
jc->recover();
for (int m=0; m<NUM_MSGS; m++)
- if (read_msg(jc))
+ if (read_msg(m, jc))
{
delete jc;
return 1;
}
jc->recovered();
for (int m=0; m<NUM_MSGS; m++)
- if (read_msg(jc))
+ if (read_msg(m, jc))
{
delete jc;
return 1;
@@ -518,10 +492,148 @@
return 0;
}
- int enq_msg(rhm::journal::jcntl* jc, const char* msg, const size_t msg_size)
+ int test_15(int num_iter)
{
+ cout << " 15. complex recover: ";
+ for (int i=0; i<num_iter; i++)
+ {
+ cout << "." << flush;
+ {
+ rhm::journal::jcntl jc("scope_test_15", "jdata", "t15");
+ jc.initialize();
+ // enqueue msgs, read then dequeue half
+ // rids 0-9
+ for (int m=0; m<NUM_MSGS*2; m++)
+ if (enq_msg(&jc, create_msg(_msg, m)))
+ return 1;
+ // rids 10-14 dequeueing rids 0-4
+ for (int m=0; m<NUM_MSGS; m++)
+ if (deq_msg(&jc, m))
+ return 1;
+ jc.flush();
+ for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
+ if (read_msg(m, &jc))
+ return 1;
+ }
+ {
+ rhm::journal::jcntl jc("scope_test_15", "jdata", "t15");
+ jc.recover();
+ // recover other half of messages
+ for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
+ if (read_msg(m, &jc))
+ return 1;
+ jc.recovered();
+ // write more messages, read and dequeue all
+ // rids 15-19
+ for (int m=NUM_MSGS*2; m<NUM_MSGS*3; m++)
+ if (enq_msg(&jc, create_msg(_msg, m)))
+ return 1;
+ jc.flush();
+ for (int m=NUM_MSGS; m<NUM_MSGS*3; m++)
+ if (read_msg(m, &jc))
+ return 1;
+ // rids 20-29 dequeueing rids 5-9, 15-19
+ for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
+ if (deq_msg(&jc, m))
+ return 1;
+ for (int m=NUM_MSGS*3; m<NUM_MSGS*4; m++)
+ if (deq_msg(&jc, m))
+ return 1;
+ }
+ }
+ cout << " ok" << endl;
+ return 0;
+ }
+
+ int test_16(int num_iter)
+ {
+ cout << " 16. complex recover ptr: ";
+ for (int i=0; i<num_iter; i++)
+ {
+ cout << "." << flush;
+ {
+ rhm::journal::jcntl* jc = new rhm::journal::jcntl("scope_test_16", "jdata", "t16");
+ if (!jc)
+ {
+ cout << "failed" << endl;
+ return 1;
+ }
+ jc->initialize();
+ // enqueue msgs, read then dequeue half
+ for (int m=0; m<NUM_MSGS*2; m++)
+ if (enq_msg(jc, create_msg(_msg, m)))
+ {
+ delete jc;
+ return 1;
+ }
+ for (int m=0; m<NUM_MSGS; m++)
+ if (deq_msg(jc, m))
+ {
+ delete jc;
+ return 1;
+ }
+ jc->flush();
+ for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
+ if (read_msg(m, jc))
+ {
+ delete jc;
+ return 1;
+ }
+ delete jc;
+ }
+ {
+ rhm::journal::jcntl* jc = new rhm::journal::jcntl("scope_test_16", "jdata", "t16");
+ if (!jc)
+ {
+ cout << "failed" << endl;
+ return 1;
+ }
+ jc->recover();
+ // recover other half of messages
+ for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
+ if (read_msg(m, jc))
+ {
+ delete jc;
+ return 1;
+ }
+ jc->recovered();
+ // write more messages, read and dequeue all
+ for (int m=NUM_MSGS*2; m<NUM_MSGS*3; m++)
+ if (enq_msg(jc, create_msg(_msg, m)))
+ {
+ delete jc;
+ return 1;
+ }
+ jc->flush();
+ for (int m=NUM_MSGS; m<NUM_MSGS*3; m++)
+ if (read_msg(m, jc))
+ {
+ delete jc;
+ return 1;
+ }
+ for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
+ if (deq_msg(jc, m))
+ {
+ delete jc;
+ return 1;
+ }
+ for (int m=NUM_MSGS*3; m<NUM_MSGS*4; m++)
+ if (deq_msg(jc, m))
+ {
+ delete jc;
+ return 1;
+ }
+ delete jc;
+ }
+ }
+ cout << " ok" << endl;
+ return 0;
+ }
+
+ int enq_msg(rhm::journal::jcntl* jc, const std::string msg)
+ {
rhm::journal::data_tok* dtp = new rhm::journal::data_tok;
- rhm::journal::iores res = jc->enqueue_data(msg, msg_size, dtp);
+ rhm::journal::iores res = jc->enqueue_data(msg.c_str(), msg.size(), dtp);
if (res != rhm::journal::RHM_IORES_SUCCESS)
{
cout << "jcntl::enqueue_data() returned " << iores_str[res] << ". failed" << endl;
@@ -546,11 +658,11 @@
return 0;
}
- int read_msg(rhm::journal::jcntl* jc)
+ int read_msg(int msg_num, rhm::journal::jcntl* jc)
{
- char buff[128];
- memset(buff, '?', 127);
- buff[127] = '\0';
+ char buff[MAX_MSG_SIZE + 1];
+ memset(buff, '?', MAX_MSG_SIZE);
+ buff[MAX_MSG_SIZE] = '\0';
rhm::journal::data_tok* dtp = new rhm::journal::data_tok;
dtp->set_wstate(rhm::journal::data_tok::ENQ);
unsigned aio_sleep_cnt = 0;
@@ -561,6 +673,7 @@
switch (res)
{
case rhm::journal::RHM_IORES_SUCCESS:
+ buff[dtp->dsize()] = '\0';
read = true;
break;
case rhm::journal::RHM_IORES_AIO_WAIT:
@@ -570,6 +683,7 @@
delete dtp;
return 1;
}
+ jc->get_wr_events();
::usleep(AIO_SLEEP_TIME);
break;
default:
@@ -578,8 +692,26 @@
return 1;
}
}
+ std::string s;
+ create_msg(s, msg_num);
+ if (s.compare(buff) != 0)
+ {
+ cout << "Message comparison failure: read \"" << buff << "\", expected \"" << s <<
+ "\"." << endl;
+ return 1;
+ }
return 0;
}
+
+ static std::string& create_msg(std::string& s, int msg_num)
+ {
+ std::stringstream ss;
+ ss << "Message_" << std::setfill('0') << std::setw(4) << msg_num;
+ ss << "_4567890123456789012345678901234567890";
+ ss << "12345678901234567890123456789012345678901234567890"; // 100 chars long (2 dblks)
+ s.assign(ss.str());
+ return s;
+ }
};
int main(int argc, char** argv)
16 years, 9 months
rhmessaging commits: r938 - store/trunk/cpp/lib.
by rhmessaging-commits@lists.jboss.org
Author: cctrieloff
Date: 2007-09-21 15:12:19 -0400 (Fri, 21 Sep 2007)
New Revision: 938
Modified:
store/trunk/cpp/lib/BdbMessageStore.cpp
store/trunk/cpp/lib/BdbMessageStore.h
Log:
- Implementation of restore (contains a hack on read, waiting for blocked read
and ** pointer read from kim to remove hack
- Fixes to Dequeue for AIO
- General bug fixes
- dbd still default store
Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp 2007-09-21 16:41:22 UTC (rev 937)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp 2007-09-21 19:12:19 UTC (rev 938)
@@ -44,8 +44,6 @@
static const u_int8_t MESSAGE_MESSAGE = 1;
static const u_int8_t BASIC_MESSAGE = 2;
-// cct delete this !!!!
-// bool hack = false;
BdbMessageStore::BdbMessageStore(const char* envpath) : env(0),
queueDb(&env, 0),
@@ -257,10 +255,9 @@
TxnCtxt txn;
txn.begin(env);
try {
- //read all queues:
- recoverQueues(txn, registry, queues);
- //read all messages:
- recoverMessages(txn, registry, queues, prepared, messages);
+ //read all queues, calls recoversMessages
+ recoverQueues(txn, registry, queues, prepared, messages);
+
//recover exchange & bindings:
recoverExchanges(txn, registry, exchanges);
recoverBindings(txn, exchanges, queues);
@@ -289,7 +286,8 @@
registry.recoveryComplete();
}
-void BdbMessageStore::recoverQueues(TxnCtxt& txn, RecoveryManager& registry, queue_index& index)
+void BdbMessageStore::recoverQueues(TxnCtxt& txn, RecoveryManager& registry, queue_index& queue_index, txn_list&
+prepared, message_index& messages)
{
Cursor queues;
queues.open(queueDb, txn.get());
@@ -306,28 +304,32 @@
//set the persistenceId and update max as required
queue->setPersistenceId(key.id);
- const char* queueName = queue->getName().c_str();
-
if (usingJrnl())
{
-
+ const char* queueName = queue->getName().c_str();
journal::jcntl* jQueue = new journal::jcntl(queueName, getJrnlDir(queueName), string("JournalData"));
queue->setExternalQueueStore(dynamic_cast<ExternalQueueStore*>(jQueue));
- try
- {
- jQueue->recover();
- } catch (journal::jexception& e) {
- std::string s;
+ try
+ {
+ jQueue->recover(); // start recovery
+ recoverMessages(txn, registry, queue, prepared, messages);
+ jQueue->recovered(); // start journal.
+ } catch (journal::jexception& e) {
+ std::string s;
THROW_STORE_EXCEPTION(e.to_string(s) + queueName);
}
+ //read all messages: done on a per queue basis if using Journal
}
-
- index[key.id] = queue;
+ queue_index[key.id] = queue;
maxQueueId = max(key.id, maxQueueId);
}
queueIdSequence.reset(maxQueueId + 1);
+
+ if (!usingJrnl()) //read all messages:
+ recoverMessages(txn, registry, queue_index, prepared, messages);
+
}
@@ -385,6 +387,98 @@
}
}
+// async IO version.
+void BdbMessageStore::recoverMessages(TxnCtxt& /*txn*/, qpid::broker::RecoveryManager& recovery,
+ qpid::broker::RecoverableQueue::shared_ptr& queue, txn_list& locked, message_index& prepared)
+{
+
+ size_t preambleLength = sizeof(u_int32_t)/*header size*/;
+ u_int64_t maxMessageId(1);
+
+ journal::jcntl* jc = static_cast<journal::jcntl*>(queue->getExternalQueueStore());
+ journal::data_tok dtokp;
+ size_t readSize = 0;
+// char** buff = 0;
+ unsigned aio_sleep_cnt = 0;
+ unsigned msg_count=0;
+ bool read = true;
+
+// hack...
+ char buff [1024]; size_t buffSize = 1024;
+
+
+ dtokp.set_wstate(rhm::journal::data_tok::ENQ);
+ // read the message from the Journal.
+ while (read) {
+
+std:: cout << "loop -- uses fixed size -> FIX <-" << std::endl;
+
+// const iores read_data(void** dbuf, size_t& dbsize, data_tok* const dtok)
+ rhm::journal::iores res;
+ try {
+ res = jc->read_data(&buff, buffSize, &dtokp);
+ } catch (rhm::journal::jexception& e) {
+ std::cout << "recover read" << e << std::endl;
+ std::string str;
+ THROW_STORE_EXCEPTION("Error dequeuing message" + e.to_string(str));
+ }
+ readSize = dtokp.dsize();
+ assert(readSize < buffSize); /// fail safe for hack...
+
+ switch (res)
+ {
+ case rhm::journal::RHM_IORES_SUCCESS:{
+ msg_count++;
+ char* data = buff;
+ unsigned headerSize = Buffer(data, preambleLength).getLong();
+ Buffer headerBuff(data+ preambleLength, headerSize); /// do we want read size or header size ????
+
+ RecoverableMessage::shared_ptr msg = recovery.recoverMessage(headerBuff);
+ msg->setPersistenceId(dtokp.rid());
+
+ u_int32_t contentOffset = headerSize + preambleLength;
+ u_int64_t contentSize = readSize - contentOffset;
+
+ if (msg->loadContent(contentSize)) {
+ //now read the content
+ Buffer contentBuff(data + contentOffset, contentSize);
+ msg->decodeContent(contentBuff);
+ }
+ if (PreparedTransaction::isLocked(locked, queue->getPersistenceId(), dtokp.rid())) {
+ prepared[dtokp.rid()] = msg;
+ } else {
+ queue->recover(msg);
+ }
+
+ if (dtokp.rid() > maxMessageId) {
+ maxMessageId = dtokp.rid();
+ }
+
+ dtokp.reset();
+ dtokp.set_wstate(rhm::journal::data_tok::ENQ);
+ break;
+ }
+ case rhm::journal::RHM_IORES_AIO_WAIT:
+ if (++aio_sleep_cnt > MAX_AIO_SLEEPS)
+ {
+ THROW_STORE_EXCEPTION("Store error, disk time out on recover for:" + queue->getName());
+ }
+ ::usleep(AIO_SLEEP_TIME);
+ break;
+ case rhm::journal::RHM_IORES_EMPTY:
+ read = false;
+ // inline const u_int32_t get_enq_cnt() const { return _emap.size(); }
+ assert (jc->get_enq_cnt() == msg_count);
+ break; // done with all messages. ((add call in jrnl to test that _emap is empty.
+ default:
+ assert( "Store Error: Unexpected msg state");
+ }
+ }
+ messageIdSequence.reset(maxMessageId + 1);
+
+}
+
+// bdb version
void BdbMessageStore::recoverMessages(TxnCtxt& txn, RecoveryManager& recovery, queue_index& index,
txn_list& locked, message_index& prepared)
{
@@ -659,7 +753,7 @@
}
store(&queue, txn->get(), key, msg, newId);
- if (/*!hack || */ !usingJrnl()){
+ if (!usingJrnl()){
msg.enqueueComplete(); // set enqueued for ack
put(mappingDb, txn->get(), key, value);
}
@@ -691,25 +785,21 @@
//buffer.flip();
+ journal::data_tok* dtokp = NULL;
try {
- if (/*hack &&*/ queue && usingJrnl()){
-
- if (queue){
- // cct TODO -- delete this in the callback...
- journal::data_tok* dtokp = new journal::data_tok();
- dtokp->setSourceMessage (&message);
+ if ( queue && usingJrnl()){
+ dtokp = new journal::data_tok;
+ // cct TODO -- delete this in the callback...
+ dtokp->setSourceMessage (&message);
dtokp->set_rid(message.getPersistenceId()); // set the messageID into the Journal header (record-id)
unsigned aio_sleep_cnt = 0;
bool written = false;
while (!written)
{
- journal::jcntl* jc = static_cast<journal::jcntl*>(queue->getExternalQueueStore());
- char text[4];
- strcpy(text,"123");
-
- rhm::journal::iores eres = jc->enqueue_data(&text, 3, dtokp);
+ journal::jcntl* jc = static_cast<journal::jcntl*>(queue->getExternalQueueStore());
+ rhm::journal::iores eres = jc->enqueue_data(buff, size, dtokp);
switch (eres)
{
case rhm::journal::RHM_IORES_SUCCESS:
@@ -717,32 +807,36 @@
written = true;
break;
case rhm::journal::RHM_IORES_AIO_WAIT:
- if (aio_sleep_cnt >= MAX_AIO_SLEEPS)
- THROW_STORE_EXCEPTION("Error storing message -- AIO timeout for: " + queue->getName());
+ if (aio_sleep_cnt >= MAX_AIO_SLEEPS){
+ delete dtokp;
+ THROW_STORE_EXCEPTION("Error storing message -- AIO timeout for: " + queue->getName());
+ }
usleep(AIO_SLEEP_TIME);
jc->get_wr_events();
aio_sleep_cnt++;
break;
case rhm::journal::RHM_IORES_FULL:
- THROW_STORE_FULL_EXCEPTION("Error storing message -- Journal full :" + queue->getName());
+ delete dtokp;
+ THROW_STORE_FULL_EXCEPTION("Error storing message -- Journal full :" + queue->getName());
break;
default:
- assert( "Unexpected msg state");
+ delete dtokp;
+ assert( "Store Error: Unexpected msg state");
}
- }
- }
+ }
- } else {
+ } else {
/// cct message db
if (newId){ // only store in Bd if first time message is stored
Dbt data(buff,size);
- messageDb.put(txn, &messageId, &data, DB_NOOVERWRITE);
- }
- }
+ messageDb.put(txn, &messageId, &data, DB_NOOVERWRITE);
+ }
+ }
}catch ( journal::jexception& e) {
std::string str;
- std::cout << "----" << e.to_string(str) << std::endl;
- THROW_STORE_EXCEPTION("Truncate clean up failed: " +e.to_string(str) );
+ std::cout << "-------------" << e << std::endl;
+ if (dtokp) delete dtokp;
+ THROW_STORE_EXCEPTION("Enqueue failed: " +e.to_string(str) );
}catch (DbException& e) {
THROW_STORE_EXCEPTION_2("Error storing message", e);
}
@@ -777,9 +871,13 @@
Dbt key (&messageId, sizeof(messageId));
Dbt value (&queueId, sizeof(queueId));
- if (dequeue(txn->get(), key, value)) {
- msg.setPersistenceId(0);//clear id as we have now removed the message from the store
- msg.dequeueComplete(); // set dequeued for ack
+ if (usingJrnl()){
+ async_dequeue(ctxt, msg, queue);
+ }else{
+ if (dequeue(txn->get(), key, value)) {
+ msg.setPersistenceId(0);//clear id as we have now removed the message from the store
+ msg.dequeueComplete(); // set dequeued for ack
+ }
}
}
if (!ctxt) txn->commit();
@@ -793,6 +891,48 @@
}
}
+void BdbMessageStore::async_dequeue(TransactionContext* /*ctxt*/, PersistableMessage& msg, const PersistableQueue& queue)
+{
+ unsigned aio_sleep_cnt = 0;
+ bool written = false;
+ journal::data_tok* ddtokp = new journal::data_tok;
+ ddtokp->setSourceMessage (&msg);
+ ddtokp->set_rid(msg.getPersistenceId()); // message id to be dequeued
+ ddtokp->set_dequeue_rid(messageIdSequence.next());
+ ddtokp->set_wstate(journal::data_tok::ENQ);
+ journal::jcntl* jc = static_cast<journal::jcntl*>(queue.getExternalQueueStore());
+ while (!written)
+ {
+ rhm::journal::iores dres;
+ try {
+ dres = jc->dequeue_data(ddtokp);
+ } catch (rhm::journal::jexception& e) {
+ std::string str;
+ delete ddtokp;
+ THROW_STORE_EXCEPTION("Error dequeuing message" + e.to_string(str));
+ }
+ switch (dres)
+ {
+ case rhm::journal::RHM_IORES_SUCCESS:
+
+ written = true;
+ break;
+ case rhm::journal::RHM_IORES_AIO_WAIT:
+ if (aio_sleep_cnt >= MAX_AIO_SLEEPS){
+ delete ddtokp;
+ THROW_STORE_EXCEPTION("Error dequeuing message -- AIO timeout for: " + queue.getName());
+ }
+ jc->get_wr_events();
+ usleep(AIO_SLEEP_TIME);
+ aio_sleep_cnt++;
+ break;
+ default:
+ delete ddtokp;
+ assert( "Store Error: Unexpected msg state");
+ }
+ }
+}
+
bool BdbMessageStore::dequeue(DbTxn* txn, Dbt& messageId, Dbt& queueId)
{
Cursor cursor;
Modified: store/trunk/cpp/lib/BdbMessageStore.h
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.h 2007-09-21 16:41:22 UTC (rev 937)
+++ store/trunk/cpp/lib/BdbMessageStore.h 2007-09-21 19:12:19 UTC (rev 938)
@@ -74,9 +74,13 @@
IdSequence exchangeIdSequence;
IdSequence messageIdSequence;
- void recoverQueues(TxnCtxt& txn, qpid::broker::RecoveryManager& recovery, queue_index& index);
+ void recoverQueues(TxnCtxt& txn, qpid::broker::RecoveryManager& recovery, queue_index& index,
+ txn_list& locked, message_index& messages);
void recoverMessages(TxnCtxt& txn, qpid::broker::RecoveryManager& recovery, queue_index& index,
txn_list& locked, message_index& prepared);
+ void recoverMessages(TxnCtxt& txn, qpid::broker::RecoveryManager& recovery,
+ qpid::broker::RecoverableQueue::shared_ptr& queue,
+ txn_list& locked, message_index& prepared);
void recoverExchanges(TxnCtxt& txn, qpid::broker::RecoveryManager& recovery, exchange_index& index);
void recoverBindings(TxnCtxt& txn, exchange_index& exchanges, queue_index& queues);
int enqueueMessage(TxnCtxt& txn, IdDbt& msgId, qpid::broker::RecoverableMessage::shared_ptr& msg,
@@ -91,6 +95,9 @@
bool newId);
void enqueue(DbTxn* txn, Dbt& messageId, Dbt& queueId);
bool dequeue(DbTxn* txn, Dbt& messageId, Dbt& queueId);
+ void async_dequeue(qpid::broker::TransactionContext* ctxt,
+ qpid::broker::PersistableMessage& msg,
+ const qpid::broker::PersistableQueue& queue);
bool deleteIfUnused(Cursor& cursor, DbTxn* txn, Dbt& messageId);
bool deleteIfUnused(DbTxn* txn, Dbt& messageId);
void destroy(Db& db, const qpid::broker::Persistable& p);
16 years, 9 months
rhmessaging commits: r937 - store/trunk/cpp/tests.
by rhmessaging-commits@lists.jboss.org
Author: gordonsim
Date: 2007-09-21 12:41:22 -0400 (Fri, 21 Sep 2007)
New Revision: 937
Modified:
store/trunk/cpp/tests/persistence.py
store/trunk/cpp/tests/system_test.sh
Log:
Move to preview file for 0-10 (transitional has been deleted)
Remove channel_open/close with session equivalents
Use message_subscriber instead of message_get
Modified: store/trunk/cpp/tests/persistence.py
===================================================================
--- store/trunk/cpp/tests/persistence.py 2007-09-21 11:18:17 UTC (rev 936)
+++ store/trunk/cpp/tests/persistence.py 2007-09-21 16:41:22 UTC (rev 937)
@@ -56,12 +56,9 @@
channel.queue_declare(queue="queue-b", durable=True, passive=True)
#check expected messages are there
- reply = channel.message_get(destination="incoming-gets", queue="queue-a")
- self.assertExpectedGetResult("Msg0001", "A_Message1")
+ self.assertMessageOnQueue("queue-a", "Msg0001", "A_Message1")
+ self.assertMessageOnQueue("queue-b", "Msg0002", "B_Message1")
- reply = channel.message_get(destination="incoming-gets", queue="queue-b")
- self.assertExpectedGetResult("Msg0002", "B_Message1").complete()
-
self.assertEmptyQueue("queue-a")
self.assertEmptyQueue("queue-b")
@@ -86,15 +83,10 @@
channel.tx_select()
#check expected messages are there
- reply = channel.message_get(destination="incoming-gets", queue="queue-a")
- self.assertExpectedGetResult("Msg0003", "AB_Message2")
+ self.assertMessageOnQueue("queue-a", "Msg0003", "AB_Message2")
+ self.assertMessageOnQueue("queue-b", "Msg0003", "AB_Message2")
+ self.assertMessageOnQueue("queue-c", "Msg0003", "AB_Message2")
- reply = channel.message_get(destination="incoming-gets", queue="queue-b")
- self.assertExpectedGetResult("Msg0003", "AB_Message2")
-
- reply = channel.message_get(destination="incoming-gets", queue="queue-c")
- self.assertExpectedGetResult("Msg0003", "AB_Message2").complete()
-
self.assertEmptyQueue("queue-a")
self.assertEmptyQueue("queue-b")
self.assertEmptyQueue("queue-c")
@@ -141,12 +133,9 @@
channel.queue_declare(queue="queue-a", durable=True, passive=True)
channel.queue_declare(queue="queue-b", durable=True, passive=True)
- reply = channel.message_get(destination="incoming-gets", queue="queue-a", no_ack=True)
- self.assertExpectedGetResult("Msg0004", "A_Message3")
- reply = channel.message_get(destination="incoming-gets", queue="queue-a", no_ack=True)
- self.assertExpectedGetResult("Msg0005", "A_Message4")
- reply = channel.message_get(destination="incoming-gets", queue="queue-a", no_ack=True)
- self.assertExpectedGetResult("Msg0006", "A_Message5")
+ self.assertMessageOnQueue("queue-a", "Msg0004", "A_Message3")
+ self.assertMessageOnQueue("queue-a", "Msg0005", "A_Message4")
+ self.assertMessageOnQueue("queue-a", "Msg0006", "A_Message5")
self.assertEmptyQueue("queue-a")
self.assertEmptyQueue("queue-b")
@@ -158,7 +147,7 @@
except Closed, e:
self.assertChannelException(404, e.args[0])
self.channel = self.client.channel(2)
- self.channel.channel_open()
+ self.channel.session_open()
channel = self.channel
def phase5(self):
@@ -266,8 +255,11 @@
def txswap(self, src, dest, tx):
self.assertEqual(self.XA_OK, self.channel.dtx_demarcation_start(xid=tx).status)
- self.channel.message_get(destination="temp-swap", queue=src)
+ self.channel.message_subscribe(destination="temp-swap", queue=src, confirm_mode=1)
+ self.channel.message_flow(destination="temp-swap", unit=0, value=1)
+ self.channel.message_flow(destination="temp-swap", unit=1, value=0xFFFFFFFF)
msg = self.client.queue("temp-swap").get(timeout=1)
+ self.channel.message_cancel(destination="temp-swap")
self.channel.message_transfer(content=Content(properties={'routing_key':dest, 'message_id':msg.content['message_id'], 'delivery_mode':2},
body=msg.content.body))
msg.complete();
@@ -277,8 +269,8 @@
self.assertEqual(0, self.channel.queue_query(queue=name).message_count)
def assertChannelException(self, expectedCode, message):
- self.assertEqual("channel", message.method.klass.name)
- self.assertEqual("close", message.method.name)
+ self.assertEqual("session", message.method.klass.name)
+ self.assertEqual("closed", message.method.name)
self.assertEqual(expectedCode, message.reply_code)
def assertConnectionException(self, expectedCode, message):
@@ -296,11 +288,21 @@
return content
def assertExpectedGetResult(self, id, body):
- return self.assertExpectedContent(self.client.queue("incoming-gets").get(timeout=1), id, body)
+ return self.assertExpectedContent(self.client.queue("incoming-gets").get(timeout=1), id, body)
def assertEqual(self, expected, actual, msg=''):
if expected != actual: raise Exception("%s expected: %s actual: %s" % (msg, expected, actual))
+ def assertMessageOnQueue(self, queue, id, body):
+ self.channel.message_subscribe(destination="incoming-gets", queue=queue, confirm_mode=1)
+ self.channel.message_flow(destination="incoming-gets", unit=0, value=1)
+ self.channel.message_flow(destination="incoming-gets", unit=1, value=0xFFFFFFFF)
+ msg = self.client.queue("incoming-gets").get(timeout=1)
+ self.assertExpectedContent(msg, id, body)
+ msg.complete()
+ self.channel.message_cancel(destination="incoming-gets")
+
+
def __init__(self):
self.setBroker("localhost")
self.errata = []
@@ -310,7 +312,7 @@
self.client = qpid.client.Client(self.host, self.port, qpid.spec.load(self.spec, *self.errata))
self.client.start({"LOGIN": self.user, "PASSWORD": self.password})
self.channel = self.client.channel(1)
- self.channel.channel_open()
+ self.channel.session_open()
def connect_with_retry(self, retry):
for r in range(1, retry + 1):
@@ -351,7 +353,7 @@
traceback.print_exc()
- self.channel.channel_close()
+ self.channel.session_close()
return True
Modified: store/trunk/cpp/tests/system_test.sh
===================================================================
--- store/trunk/cpp/tests/system_test.sh 2007-09-21 11:18:17 UTC (rev 936)
+++ store/trunk/cpp/tests/system_test.sh 2007-09-21 16:41:22 UTC (rev 937)
@@ -43,10 +43,8 @@
. $abs_srcdir/setup
# Make sure $QPID_DIR contains what we need.
-xml_spec=$QPID_DIR/specs/amqp-transitional.0-10.xml
-spec_errata=$QPID_DIR/specs/amqp-errata.0-9.xml
-dtx_preview=$QPID_DIR/specs/amqp-dtx-preview.0-9.xml
-test -f $xml_spec -a -f $spec_errata -a -f $dtx_preview || {
+xml_spec=$QPID_DIR/specs/amqp.0-10-preview.xml
+test -f $xml_spec || {
echo "$xml_spec or $spec_errata or $dtx_preview not found: invalid \$QPID_DIR ?"; exit 1; }
export DB_HOME=dbdata
16 years, 9 months
rhmessaging commits: r936 - in store/branches/java/M2: java/bdbstore and 1 other directories.
by rhmessaging-commits@lists.jboss.org
Author: rupertlssmith
Date: 2007-09-21 07:18:17 -0400 (Fri, 21 Sep 2007)
New Revision: 936
Added:
store/branches/java/M2/java/bdbstore/etc/scripts/bdbbackuptest.sh
store/branches/java/M2/java/bdbstore/etc/scripts/bdbtest.sh
Removed:
store/branches/java/M2/java/bdbstore/bin/
store/branches/java/M2/java/bdbstore/etc/scripts/BDB-10.sh
store/branches/java/M2/java/bdbstore/etc/scripts/BDB-11.sh
store/branches/java/M2/java/bdbstore/etc/scripts/BDB-4.sh
store/branches/java/M2/java/bdbstore/etc/scripts/BDB-Mid.sh
store/branches/java/M2/java/bdbstore/etc/scripts/BDB.sh
store/branches/java/M2/java/bdbstore/etc/scripts/bdb-4/
Modified:
store/branches/java/M2/
Log:
Merged revisions 931 via svnmerge from
https://svn.jboss.org/repos/rhmessaging/store/branches/java/M2.1
........
r931 | rupertlssmith | 2007-09-21 10:55:59 +0100 (Fri, 21 Sep 2007) | 1 line
Removed old BDB tests.
........
Property changes on: store/branches/java/M2
___________________________________________________________________
Name: svnmerge-integrated
- /store/branches/java/M2.1:1-888,932-933
+ /store/branches/java/M2.1:1-888,931-933
Deleted: store/branches/java/M2/java/bdbstore/etc/scripts/BDB-10.sh
===================================================================
--- store/branches/java/M2/java/bdbstore/etc/scripts/BDB-10.sh 2007-09-21 11:13:43 UTC (rev 935)
+++ store/branches/java/M2/java/bdbstore/etc/scripts/BDB-10.sh 2007-09-21 11:18:17 UTC (rev 936)
@@ -1,13 +0,0 @@
-#!/bin/bash
-
-# Parse arguments taking all - prefixed args as JAVA_OPTS
-for arg in "$@"; do
- if [[ $arg == -java:* ]]; then
- JAVA_OPTS="${JAVA_OPTS}-`echo $arg|cut -d ':' -f 2` "
- else
- ARGS="${ARGS}$arg "
- fi
-done
-
-echo "Starting Backup Test Script"
-java -Dlog4j.configuration=backup-log4j.xml ${JAVA_OPTS} -cp qpid-bdbstore-1.0-incubating-M2-SNAPSHOT-all-test-deps.jar org.apache.qpid.server.store.berkeleydb.testclient.BackupTestClient numMessagesToAction=25 -fromdir "$QPID_WORK/test-store/" -todir "$QPID_WORK/bdb_backup/test-store/" ${ARGS}
Deleted: store/branches/java/M2/java/bdbstore/etc/scripts/BDB-11.sh
===================================================================
--- store/branches/java/M2/java/bdbstore/etc/scripts/BDB-11.sh 2007-09-21 11:13:43 UTC (rev 935)
+++ store/branches/java/M2/java/bdbstore/etc/scripts/BDB-11.sh 2007-09-21 11:18:17 UTC (rev 936)
@@ -1,12 +0,0 @@
-#!/bin/bash
-
-# Parse arguements taking all - prefixed args as JAVA_OPTS
-for arg in "$@"; do
- if [[ $arg == -java:* ]]; then
- JAVA_OPTS="${JAVA_OPTS}-`echo $arg|cut -d ':' -f 2` "
- else
- ARGS="${ARGS}$arg "
- fi
-done
-
-java -Xms256m -Dlog4j.configuration=perftests.log4j -Xmx256m -Dbadger.level=warn -Damqj.test.logging.level=warn -Damqj.logging.level=warn ${JAVA_OPTS} -cp qpid-bdbstore-1.0-incubating-M2-SNAPSHOT-all-test-deps.jar org.apache.qpid.PersistentTest ${ARGS}
Deleted: store/branches/java/M2/java/bdbstore/etc/scripts/BDB-4.sh
===================================================================
--- store/branches/java/M2/java/bdbstore/etc/scripts/BDB-4.sh 2007-09-21 11:13:43 UTC (rev 935)
+++ store/branches/java/M2/java/bdbstore/etc/scripts/BDB-4.sh 2007-09-21 11:18:17 UTC (rev 936)
@@ -1,39 +0,0 @@
-#!/bin/bash
-
-if [ -z QPID_HOME ] ; then
-
-echo "QPID_HOME must be set"
-exit 0
-fi
-
-if [ ! -e $QPID_HOME/bin/qpid-server ] ; then
-echo "QPID_HOME/bin must contatin qpid-server"
-exit 0
-fi
-
-if [ ! -e $QPID_HOME/bin/qpid-run ] ; then
-echo "QPID_HOME/bin must contatin qpid-run"
-exit 0
-fi
-
-
-configs=`pwd`
-
-pushd $QPID_HOME/bin/
-
-echo "Starting qpid server - device config"
-./qpid-server -c $configs/bdb-4/device.xml
-
-echo "Starting qpid server - filepath config"
-./qpid-server -c $configs/bdb-4/filepath.xml
-
-echo "Starting qpid server - none existent path config"
-./qpid-server -c $configs/bdb-4/noneexistantpath.xml
-
-echo "Starting qpid server - no permission config"
-./qpid-server -c $configs/bdb-4/nopermission.xml
-
-echo "Starting qpid server - Star in path config"
-./qpid-server -c $configs/bdb-4/starpath.xml
-
-popd
Deleted: store/branches/java/M2/java/bdbstore/etc/scripts/BDB-Mid.sh
===================================================================
--- store/branches/java/M2/java/bdbstore/etc/scripts/BDB-Mid.sh 2007-09-21 11:13:43 UTC (rev 935)
+++ store/branches/java/M2/java/bdbstore/etc/scripts/BDB-Mid.sh 2007-09-21 11:18:17 UTC (rev 936)
@@ -1,12 +0,0 @@
-#!/bin/bash
-
-# Parse arguements taking all - prefixed args as JAVA_OPTS
-for arg in "$@"; do
- if [[ $arg == -java:* ]]; then
- JAVA_OPTS="${JAVA_OPTS}-`echo $arg|cut -d ':' -f 2` "
- else
- ARGS="${ARGS}$arg "
- fi
-done
-
-java -Xms256m -Dlog4j.configuration=perftests.log4j -Xmx256m -Dbadger.level=warn -Damqj.test.logging.level=warn -Damqj.logging.level=warn ${JAVA_OPTS} -cp qpid-bdbstore-1.0-incubating-M2-SNAPSHOT-all-test-deps.jar org.apache.qpid.ping.PingDurableClient numMessages=55 -o $QPID_WORK/results ${ARGS}
Deleted: store/branches/java/M2/java/bdbstore/etc/scripts/BDB.sh
===================================================================
--- store/branches/java/M2/java/bdbstore/etc/scripts/BDB.sh 2007-09-21 11:13:43 UTC (rev 935)
+++ store/branches/java/M2/java/bdbstore/etc/scripts/BDB.sh 2007-09-21 11:18:17 UTC (rev 936)
@@ -1,12 +0,0 @@
-#!/bin/bash
-
-# Parse arguements taking all - prefixed args as JAVA_OPTS
-for arg in "$@"; do
- if [[ $arg == -java:* ]]; then
- JAVA_OPTS="${JAVA_OPTS}-`echo $arg|cut -d ':' -f 2` "
- else
- ARGS="${ARGS}$arg "
- fi
-done
-
-java -Xms256m -Dlog4j.configuration=perftests.log4j -Xmx256m -Dbadger.level=warn -Damqj.test.logging.level=warn -Damqj.logging.level=warn ${JAVA_OPTS} -cp qpid-bdbstore-1.0-incubating-M2-SNAPSHOT-all-test-deps.jar org.apache.qpid.ping.PingDurableClient -o $QPID_WORK/results ${ARGS}
Copied: store/branches/java/M2/java/bdbstore/etc/scripts/bdbbackuptest.sh (from rev 931, store/branches/java/M2.0.1/java/bdbstore/etc/scripts/bdbbackuptest.sh)
===================================================================
--- store/branches/java/M2/java/bdbstore/etc/scripts/bdbbackuptest.sh (rev 0)
+++ store/branches/java/M2/java/bdbstore/etc/scripts/bdbbackuptest.sh 2007-09-21 11:18:17 UTC (rev 936)
@@ -0,0 +1,12 @@
+#!/bin/bash
+
+# Parse arguements taking all - prefixed args as JAVA_OPTS
+for arg in "$@"; do
+ if [[ $arg == -java:* ]]; then
+ JAVA_OPTS="${JAVA_OPTS}-`echo $arg|cut -d ':' -f 2` "
+ else
+ ARGS="${ARGS}$arg "
+ fi
+done
+
+java -Xms256m -Dlog4j.configuration=perftests.log4j -Xmx256m -Dbadger.level=warn -Damqj.test.logging.level=warn -Damqj.logging.level=warn ${JAVA_OPTS} -cp qpid-bdbstore-1.0-incubating-M2-SNAPSHOT-all-test-deps.jar org.apache.qpid.server.store.berkeleydb.testclient.BackupTestClient -o $QPID_WORK/results numMessagesToAction=55 ${ARGS}
Copied: store/branches/java/M2/java/bdbstore/etc/scripts/bdbtest.sh (from rev 931, store/branches/java/M2.0.1/java/bdbstore/etc/scripts/bdbtest.sh)
===================================================================
--- store/branches/java/M2/java/bdbstore/etc/scripts/bdbtest.sh (rev 0)
+++ store/branches/java/M2/java/bdbstore/etc/scripts/bdbtest.sh 2007-09-21 11:18:17 UTC (rev 936)
@@ -0,0 +1,12 @@
+#!/bin/bash
+
+# Parse arguements taking all - prefixed args as JAVA_OPTS
+for arg in "$@"; do
+ if [[ $arg == -java:* ]]; then
+ JAVA_OPTS="${JAVA_OPTS}-`echo $arg|cut -d ':' -f 2` "
+ else
+ ARGS="${ARGS}$arg "
+ fi
+done
+
+java -Xms256m -Dlog4j.configuration=perftests.log4j -Xmx256m -Dbadger.level=warn -Damqj.test.logging.level=warn -Damqj.logging.level=warn ${JAVA_OPTS} -cp qpid-bdbstore-1.0-incubating-M2-SNAPSHOT-all-test-deps.jar org.apache.qpid.ping.PingDurableClient -o $QPID_WORK/results ${ARGS}
16 years, 9 months
rhmessaging commits: r935 - store/branches/java/M2.
by rhmessaging-commits@lists.jboss.org
Author: rupertlssmith
Date: 2007-09-21 07:13:43 -0400 (Fri, 21 Sep 2007)
New Revision: 935
Modified:
store/branches/java/M2/
Log:
Recorded merge of revisions 932 via svnmerge from
https://svn.jboss.org/repos/rhmessaging/store/branches/java/M2.1
........
r932 | rupertlssmith | 2007-09-21 11:37:28 +0100 (Fri, 21 Sep 2007) | 1 line
Renamed M2.0.1 branch to M2.1 to bring it in line with Apache branch naming.
........
Property changes on: store/branches/java/M2
___________________________________________________________________
Name: svnmerge-integrated
- /store/branches/java/M2.1:1-888,933
+ /store/branches/java/M2.1:1-888,932-933
16 years, 9 months