[rhmessaging-commits] rhmessaging commits: r957 - in store/trunk/cpp: lib/jrnl and 1 other directories.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Mon Oct 1 10:46:01 EDT 2007


Author: kpvdr
Date: 2007-10-01 10:46:01 -0400 (Mon, 01 Oct 2007)
New Revision: 957

Modified:
   store/trunk/cpp/lib/BdbMessageStore.cpp
   store/trunk/cpp/lib/jrnl/jcntl.cpp
   store/trunk/cpp/lib/jrnl/jcntl.hpp
   store/trunk/cpp/lib/jrnl/rcvdat.hpp
   store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp
Log:
Added PreparedTransactionList to jcntl::recover() fns

Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp	2007-10-01 11:51:31 UTC (rev 956)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp	2007-10-01 14:46:01 UTC (rev 957)
@@ -304,7 +304,7 @@
 	
 	          try
 	          {
-	              jQueue->recover(); // start recovery
+                  jQueue->recover(prepared); // start recovery
                   recoverMessages(txn, registry, queue, prepared, messages); 
 				  jQueue->recover_complete(); // start journal.
 	          } catch (journal::jexception& e) {

Modified: store/trunk/cpp/lib/jrnl/jcntl.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.cpp	2007-10-01 11:51:31 UTC (rev 956)
+++ store/trunk/cpp/lib/jrnl/jcntl.cpp	2007-10-01 14:46:01 UTC (rev 957)
@@ -95,9 +95,7 @@
     }
 
     _datafh = ::new lfh*[JRNL_NUM_FILES];
-    // NOTE: We NULL the pointer array prior to setting the pointers because exceptions
-    // can be thrown during pointer initialization, and the clean() fn that will be
-    // called after an exception will attempt to free any non-null pointer.
+    // NULL the pointer array first because new() can throw exceptions
     ::memset(_datafh, 0, sizeof(lfh*) * JRNL_NUM_FILES);
     for (u_int16_t i=0; i<JRNL_NUM_FILES; i++)
     {
@@ -123,20 +121,20 @@
 
 void
 jcntl::recover(std::deque<data_tok*>* rd_dtokl, const aio_cb rd_cb, std::deque<data_tok*>* wr_dtokl,
-        const aio_cb wr_cb) throw (jexception)
+        const aio_cb wr_cb, boost::ptr_list<bdbstore::PreparedTransaction>& prep_tx_list)
+        throw (jexception)
 {
+    // Create list of prepared xids
+    std::set<std::string> prep_xid_list;
+    for (bdbstore::PreparedTransaction::list::iterator i = prep_tx_list.begin();
+            i != prep_tx_list.end(); i++);
+//        prep_xid_list.insert(i->??);
+    
     // Verify journal dir and journal files
     _jdir.verify_dir();
     _rcvdat.reset();
     _emap.clear();
     rcvr_janalyze(_rcvdat);
-//std::cout << "em=" << _emap.size() << "; rd=" << rd._ffid << ":0x" << std::hex << rd._fro << std::dec << "-" << rd._lfid << ":0x" << std::hex << rd._eo << std::dec << " ";
-//std::vector<u_int64_t> kv;
-//std::vector<u_int64_t>::iterator ki;
-//_emap.rid_list(kv);
-//std::cout << " rids=[";
-//for (ki=kv.begin(); ki<kv.end(); ki++) {if (ki!=kv.begin()) std::cout << ", "; std::cout << *ki;}
-//std::cout << "]" << std::flush;
 
     if (_datafh)
     {
@@ -147,9 +145,7 @@
     }
 
     _datafh = ::new lfh*[JRNL_NUM_FILES];
-    // NOTE: We NULL the pointer array prior to setting the pointers because exceptions
-    // can be thrown during pointer initialization, and the clean() fn that will be
-    // called after an exception will attempt to free any non-null pointer.
+    // NULL the pointer array first because new() can throw exceptions
     ::memset(_datafh, 0, sizeof(lfh*) * JRNL_NUM_FILES);
     for (u_int16_t i=0; i<JRNL_NUM_FILES; i++)
     {
@@ -162,7 +158,6 @@
     // NOTE: The write RFC must initialize first. This sets all the file handle object
     // (lfh) counters and pointers for both read and write, since write activity
     // constrains read activity (i.e. one can't read what has not yet been written).
-//    _wrfc.initialize(JRNL_NUM_FILES, (nlfh**)_datafh, _rcvdat._lfid, _rcvdat._h_rid + 1);
     _wrfc.initialize(JRNL_NUM_FILES, (nlfh**)_datafh, &_rcvdat);
     _rrfc.initialize(JRNL_NUM_FILES, (nlfh**)_datafh, _rcvdat._ffid);
     _rmgr.initialize(rd_dtokl, rd_cb);

Modified: store/trunk/cpp/lib/jrnl/jcntl.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.hpp	2007-10-01 11:51:31 UTC (rev 956)
+++ store/trunk/cpp/lib/jrnl/jcntl.hpp	2007-10-01 14:46:01 UTC (rev 957)
@@ -49,6 +49,8 @@
 #include <jrnl/wmgr.hpp>
 #include <jrnl/wrfc.hpp>
 #include <qpid/broker/PersistableQueue.h>
+#include <PreparedTransaction.h>
+#include <boost/ptr_container/ptr_list.hpp>
 
 namespace rhm
 {
@@ -221,7 +223,8 @@
         * \exception TODO
         */
         void recover(std::deque<data_tok*>* rd_dtokl, const aio_cb rd_cb,
-			std::deque<data_tok*>* wr_dtokl, const aio_cb wr_cb) throw (jexception);
+			std::deque<data_tok*>* wr_dtokl, const aio_cb wr_cb,
+            boost::ptr_list<bdbstore::PreparedTransaction>& prep_tx_list) throw (jexception);
 
         /**
         * \brief Recover using internal default callbacks and data_tok lists.
@@ -230,10 +233,11 @@
         *
         * \exception TODO
         */
-        void recover() throw (jexception)
+        void recover(boost::ptr_list<bdbstore::PreparedTransaction>& prep_tx_list)
+                throw (jexception)
         {
             recover(&_aio_rd_cmpl_dtok_list, &aio_rd_callback, &_aio_wr_cmpl_dtok_list,
-                    &aio_wr_callback );
+                    &aio_wr_callback, prep_tx_list);
         }
 
         /**
@@ -379,8 +383,6 @@
         /**
         * \brief Discard (skip) next record to be read without reading or retrieving it.
         *
-        * \param dtokp Pointer to data token which contains the details of the enqueue operation.
-        *
         * \exception TODO
         */
         const iores discard_data_record() throw (jexception);

Modified: store/trunk/cpp/lib/jrnl/rcvdat.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rcvdat.hpp	2007-10-01 11:51:31 UTC (rev 956)
+++ store/trunk/cpp/lib/jrnl/rcvdat.hpp	2007-10-01 14:46:01 UTC (rev 957)
@@ -32,6 +32,7 @@
 #ifndef rhm_journal_rcvdat_hpp
 #define rhm_journal_rcvdat_hpp
 
+#include <map>
 #include <vector>
 
 namespace rhm
@@ -41,6 +42,10 @@
 
         struct rcvdat
         {
+            typedef std::vector<u_int64_t> rid_list;
+            typedef std::pair<rid_list, rid_list> enq_deq_rid_list;
+            typedef std::map<std::string, enq_deq_rid_list> enq_deq_map;
+            
             bool _empty;        ///< Journal data files empty
             u_int16_t _ffid;    ///< First file id
             size_t _fro;        ///< First record offset in ffid
@@ -48,6 +53,8 @@
             size_t _eo;         ///< End offset (first byte past last record)
             u_int64_t _h_rid;   ///< Highest rid found
             std::vector<u_int32_t> _enq_cnt_list; ///< Number enqueued records found for each file
+            enq_deq_map _edm;   ///< Map of enqueue and dequeue rids for each xid
+
             rcvdat():
                     _empty(true),
                     _ffid(0),
@@ -55,7 +62,8 @@
                     _lfid(0),
                     _eo(0),
                     _h_rid(0),
-                    _enq_cnt_list(JRNL_NUM_FILES)
+                    _enq_cnt_list(JRNL_NUM_FILES),
+                    _edm()
             {}
             void reset()
             {
@@ -66,6 +74,7 @@
                 _eo=0;
                 _h_rid=0;
                 _enq_cnt_list.clear();
+                _edm.clear();
             }
         };
 }

Modified: store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp	2007-10-01 11:51:31 UTC (rev 956)
+++ store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp	2007-10-01 14:46:01 UTC (rev 957)
@@ -34,6 +34,9 @@
 #include "msg_consumer.hpp"
 #include "jtest.hpp"
 
+#include <PreparedTransaction.h>
+#include <boost/ptr_container/ptr_list.hpp>
+
 #define MAX_MSG_SIZE 127
 #define NUM_MSGS 5
 #define MAX_AIO_SLEEPS 500
@@ -155,6 +158,7 @@
 
     void EmptyRecoverTest()
     {
+        boost::ptr_list<rhm::bdbstore::PreparedTransaction> txn_list;
         //Stack
         char* test_name = "EmptyRecoverTest_Stack";
         try
@@ -165,11 +169,11 @@
             }
             {
                 rhm::journal::jcntl jc(test_name, "jdata", test_name);
-                jc.recover();
+                jc.recover(txn_list);
             }
             {
                 rhm::journal::jcntl jc(test_name, "jdata", test_name);
-                jc.recover();
+                jc.recover(txn_list);
                 jc.recover_complete();
             }
         }
@@ -194,14 +198,14 @@
             {
                 rhm::journal::jcntl* jcp = new rhm::journal::jcntl(test_name, "jdata", test_name);
                 CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp != NULL);
-                jcp->recover();
+                jcp->recover(txn_list);
                 delete jcp;
                 jcp = NULL;
             }
             {
                 rhm::journal::jcntl* jcp = new rhm::journal::jcntl(test_name, "jdata", test_name);
                 CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp != NULL);
-                jcp->recover();
+                jcp->recover(txn_list);
                 jcp->recover_complete();
                 delete jcp;
             }
@@ -257,6 +261,7 @@
 
     void RecoverReadTest()
     {
+        boost::ptr_list<rhm::bdbstore::PreparedTransaction> txn_list;
         //Stack
         char* test_name = "RecoverReadTest_Stack";
         try
@@ -269,7 +274,7 @@
             }
             {
                 rhm::journal::jcntl jc(test_name, "jdata", test_name);
-                jc.recover();
+                jc.recover(txn_list);
                 for (int m=0; m<NUM_MSGS; m++)
                     CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
                             create_msg(msg, m).compare(read_msg(&jc)) == 0);
@@ -298,7 +303,7 @@
             {
                 rhm::journal::jcntl* jcp = new rhm::journal::jcntl(test_name, "jdata", test_name);
                 CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp != NULL);
-                jcp->recover();
+                jcp->recover(txn_list);
                 for (int m=0; m<NUM_MSGS; m++)
                     CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
                             create_msg(msg, m).compare(read_msg(jcp)) == 0);
@@ -317,6 +322,7 @@
 
     void RecoveredReadTest()
     {
+        boost::ptr_list<rhm::bdbstore::PreparedTransaction> txn_list;
         //Stack
         char* test_name = "RecoveredReadTest_Stack";
         try
@@ -329,7 +335,7 @@
             }
             {
                 rhm::journal::jcntl jc(test_name, "jdata", test_name);
-                jc.recover();
+                jc.recover(txn_list);
                 for (int m=0; m<NUM_MSGS; m++)
                     CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
                             create_msg(msg, m).compare(read_msg(&jc)) == 0);
@@ -362,7 +368,7 @@
             {
                 rhm::journal::jcntl* jcp = new rhm::journal::jcntl(test_name, "jdata", test_name);
                 CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp != NULL);
-                jcp->recover();
+                jcp->recover(txn_list);
                 for (int m=0; m<NUM_MSGS; m++)
                     CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
                             create_msg(msg, m).compare(read_msg(jcp)) == 0);
@@ -385,6 +391,7 @@
 
     void RecoveredDequeueTest()
     {
+        boost::ptr_list<rhm::bdbstore::PreparedTransaction> txn_list;
         //Stack
         char* test_name = "RecoveredDequeueTest_Stack";
         try
@@ -397,7 +404,7 @@
             }
             {
                 rhm::journal::jcntl jc(test_name, "jdata", test_name);
-                jc.recover();
+                jc.recover(txn_list);
                 for (int m=0; m<NUM_MSGS; m++)
                     CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
                             create_msg(msg, m).compare(read_msg(&jc)) == 0);
@@ -432,7 +439,7 @@
             {
                 rhm::journal::jcntl* jcp = new rhm::journal::jcntl(test_name, "jdata", test_name);
                 CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp != NULL);
-                jcp->recover();
+                jcp->recover(txn_list);
                 for (int m=0; m<NUM_MSGS; m++)
                     CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
                             create_msg(msg, m).compare(read_msg(jcp)) == 0);
@@ -457,6 +464,7 @@
 
     void ComplexRecoveryTest1()
     {
+        boost::ptr_list<rhm::bdbstore::PreparedTransaction> txn_list;
         //Stack
         char* test_name = "ComplexRecoveryTest1_Stack";
         try
@@ -477,7 +485,7 @@
             }
             {
                 rhm::journal::jcntl jc(test_name, "jdata", test_name);
-                jc.recover();
+                jc.recover(txn_list);
                 for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
                     CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
                             create_msg(msg, m).compare(read_msg(&jc)) == 0);
@@ -530,7 +538,7 @@
             {
                 rhm::journal::jcntl* jcp = new rhm::journal::jcntl(test_name, "jdata", test_name);
                 CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp != NULL);
-                jcp->recover();
+                jcp->recover(txn_list);
                 for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
                     CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
                             create_msg(msg, m).compare(read_msg(jcp)) == 0);




More information about the rhmessaging-commits mailing list