[rhmessaging-commits] rhmessaging commits: r1187 - in store/trunk/cpp: lib/jrnl and 1 other directories.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Tue Oct 30 16:55:56 EDT 2007


Author: kpvdr
Date: 2007-10-30 16:55:56 -0400 (Tue, 30 Oct 2007)
New Revision: 1187

Modified:
   store/trunk/cpp/lib/BdbMessageStore.cpp
   store/trunk/cpp/lib/JournalImpl.cpp
   store/trunk/cpp/lib/TxnCtxt.h
   store/trunk/cpp/lib/jrnl/jcntl.cpp
   store/trunk/cpp/lib/jrnl/rmgr.cpp
   store/trunk/cpp/lib/jrnl/txn_map.cpp
   store/trunk/cpp/lib/jrnl/txn_map.hpp
   store/trunk/cpp/tests/persistence.py
   store/trunk/cpp/tests/system_test.sh
Log:
Added journal and bdb modes to system tests, both now pass.

Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp	2007-10-30 20:54:29 UTC (rev 1186)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp	2007-10-30 20:55:56 UTC (rev 1187)
@@ -335,7 +335,8 @@
                   jQueue->recover(prepared, key.id); // start recovery
                   recoverMessages(txn, registry, queue, prepared, messages); 
 				  jQueue->recover_complete(); // start journal.
-	          } catch (journal::jexception& e) {
+	          } catch (const journal::jexception& e) {
+std::cout << e << std::flush;
 	              std::string s;
                   THROW_STORE_EXCEPTION(e.to_string(s) + queueName);
               }
@@ -349,7 +350,6 @@
 	
 	if (!usingJrnl()) //read all messages: 
         recoverMessages(txn, registry, queue_index, prepared, messages);
-
 }
 
 

Modified: store/trunk/cpp/lib/JournalImpl.cpp
===================================================================
--- store/trunk/cpp/lib/JournalImpl.cpp	2007-10-30 20:54:29 UTC (rev 1186)
+++ store/trunk/cpp/lib/JournalImpl.cpp	2007-10-30 20:55:56 UTC (rev 1187)
@@ -22,6 +22,7 @@
 */
 
 #include "JournalImpl.h"
+#include "jrnl/jerrno.hpp"
 
 using namespace rhm::bdbstore;
 
@@ -52,14 +53,20 @@
     // Populate PreparedTransaction lists from _tmap
     for (bdbstore::PreparedTransaction::list::iterator i = prep_tx_list.begin();
             i != prep_tx_list.end(); i++) {
-        journal::txn_data_list tdl = _tmap.get_tdata_list(i->xid);
-        assert(tdl.size()); // should never be empty
-        for (journal::tdl_itr tdl_itr = tdl.begin(); tdl_itr < tdl.end(); tdl_itr++) {
-            if (tdl_itr->_enq_flag) { // enqueue op
-                i->enqueues->add(queue_id, tdl_itr->_rid);
-            } else { // dequeue op
-                i->dequeues->add(queue_id, tdl_itr->_drid);
+        try {
+            journal::txn_data_list tdl = _tmap.get_tdata_list(i->xid);
+            assert(tdl.size()); // should never be empty
+            for (journal::tdl_itr tdl_itr = tdl.begin(); tdl_itr < tdl.end(); tdl_itr++) {
+                if (tdl_itr->_enq_flag) { // enqueue op
+                    i->enqueues->add(queue_id, tdl_itr->_rid);
+                } else { // dequeue op
+                    i->dequeues->add(queue_id, tdl_itr->_drid);
+                }
             }
         }
+        catch (const journal::jexception& e) {
+            if (e.err_code() != journal::jerrno::JERR_MAP_NOTFOUND)
+                throw e;
+        }
     }
 }

Modified: store/trunk/cpp/lib/TxnCtxt.h
===================================================================
--- store/trunk/cpp/lib/TxnCtxt.h	2007-10-30 20:54:29 UTC (rev 1186)
+++ store/trunk/cpp/lib/TxnCtxt.h	2007-10-30 20:55:56 UTC (rev 1187)
@@ -90,7 +90,7 @@
 public:
 	
     TxnCtxt(IdSequence* _loggedtx=NULL) : loggedtx(_loggedtx), txn(0)  {
-		if (loggedtx) tid = "rhm-tid" + getCount();
+		if (loggedtx){ tid.assign( "rhm-tid"); tid+=getCount(); }
 	}
 	
 	/**

Modified: store/trunk/cpp/lib/jrnl/jcntl.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.cpp	2007-10-30 20:54:29 UTC (rev 1186)
+++ store/trunk/cpp/lib/jrnl/jcntl.cpp	2007-10-30 20:55:56 UTC (rev 1187)
@@ -199,7 +199,7 @@
         const bool transient) throw (jexception)
 {
     check_wstatus("enqueue_tx_data_record");
-    return _wmgr.enqueue(data_buff, tot_data_len, this_data_len, dtokp, xid.c_str(), xid.size(),
+    return _wmgr.enqueue(data_buff, tot_data_len, this_data_len, dtokp, xid.data(), xid.size(),
             transient);
 }
 
@@ -237,21 +237,21 @@
 jcntl::dequeue_txn_data_record(data_tok* const dtokp, const std::string& xid) throw (jexception)
 {
     check_wstatus("dequeue_data");
-    return _wmgr.dequeue(dtokp, xid.c_str(), xid.size());
+    return _wmgr.dequeue(dtokp, xid.data(), xid.size());
 }
 
 const iores
 jcntl::txn_abort(data_tok* const dtokp, const std::string& xid) throw (jexception)
 {
     check_wstatus("txn_abort");
-    return _wmgr.abort(dtokp, xid.c_str(), xid.size());
+    return _wmgr.abort(dtokp, xid.data(), xid.size());
 }
 
 const iores
 jcntl::txn_commit(data_tok* const dtokp, const std::string& xid) throw (jexception)
 {
     check_wstatus("txn_commit");
-    return _wmgr.commit(dtokp, xid.c_str(), xid.size());
+    return _wmgr.commit(dtokp, xid.data(), xid.size());
 }
 
 const bool

Modified: store/trunk/cpp/lib/jrnl/rmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rmgr.cpp	2007-10-30 20:54:29 UTC (rev 1186)
+++ store/trunk/cpp/lib/jrnl/rmgr.cpp	2007-10-30 20:55:56 UTC (rev 1187)
@@ -215,7 +215,7 @@
 rmgr::read(void** const datapp, size_t& dsize, void** const xidpp, size_t& xidsize, bool& transient,
         data_tok* dtokp) throw (jexception)
 {
-//std::cout << " rmgr::read()" << std::flush;
+//std::cout << " rmgr::read() ro=" << (_jc->is_read_only()?"T":"F") << std::flush;
 
     iores res = pre_read_check(dtokp);
     if (res != RHM_IORES_SUCCESS)
@@ -281,34 +281,44 @@
                 bool is_enq = false;
                 try
                 {
-//std::cout << " ?" << _hdr._rid << std::flush;
+//std::cout << " rid=" << _hdr._rid << std::flush;
                     fid = _emap.get_fid(_hdr._rid);
-//std::cout << "-ok" << std::flush;
+//std::cout << ":ok" << std::flush;
                     is_enq = true;
                 }
                 catch (jexception& e)
                 {
+                    // Block read for transactionally locked record (only when not recovering)
                     if (e.err_code() == jerrno::JERR_MAP_LOCKED && !_jc->is_read_only())
                         throw e;
+//if (e.err_code() == jerrno::JERR_MAP_LOCKED) std::cout << ":locked" << std::flush;
 						
-					// Ok, not in emap, now search tmap for recover
+					// (Recover mode only) Ok, not in emap - now search tmap, if present then read
 					if (_jc->is_read_only())
 					{
 						std::vector<std::string> xid_list;
 						_tmap.xid_list(xid_list);
-						for (std::vector<std::string>::iterator itr = xid_list.begin(); itr != xid_list.end() && !is_enq; itr++)
+						for (std::vector<std::string>::iterator itr = xid_list.begin();
+                                itr != xid_list.end() && !is_enq; itr++)
 						{
+//std::cout << "[xid=" << *itr << ":" << std::flush;
 							txn_data_list tx_list = _tmap.get_tdata_list(*itr);
-							for (tdl_itr ditr = tx_list.begin(); ditr != tx_list.end() && !is_enq; ditr++)
+							for (tdl_itr ditr = tx_list.begin(); ditr != tx_list.end() && !is_enq;
+                                    ditr++)
 							{
-								if (ditr->_rid == _hdr._rid)
-									is_enq = true;
+//std::cout << (ditr->_enq_flag?"enq":"deq") << ",rid=" << ditr->_rid << std::flush;
+//if (!ditr->_enq_flag) std::cout << ",drid=" << ditr->_drid << std::flush;
+                                if (ditr->_enq_flag)
+                                    is_enq = ditr->_rid == _hdr._rid;
+                                else
+                                    is_enq = ditr->_drid == _hdr._rid;
 							}
+//std::cout << "]" << std::flush;
 						}
-					}
-//std::cout << "-nf" << std::flush;
+                    }
                 }
 #endif
+//std::cout << (is_enq?":enq":":not-enq") << std::flush;
                 if (is_enq) // ok, this record is enqueued, check it, then read it...
                 {
 //std::cout << "e" << std::flush;

Modified: store/trunk/cpp/lib/jrnl/txn_map.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/txn_map.cpp	2007-10-30 20:54:29 UTC (rev 1186)
+++ store/trunk/cpp/lib/jrnl/txn_map.cpp	2007-10-30 20:55:56 UTC (rev 1187)
@@ -93,7 +93,7 @@
     {
         std::stringstream ss;
         ss << std::hex << "xid=\"" << xid << "\"";
-        throw jexception(jerrno::JERR_MAP_NOTFOUND, ss.str(), "txn_map", "txn_data_list");
+        throw jexception(jerrno::JERR_MAP_NOTFOUND, ss.str(), "txn_map", "get_tdata_list");
     }
     return itr->second;
 }

Modified: store/trunk/cpp/lib/jrnl/txn_map.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/txn_map.hpp	2007-10-30 20:54:29 UTC (rev 1186)
+++ store/trunk/cpp/lib/jrnl/txn_map.hpp	2007-10-30 20:55:56 UTC (rev 1187)
@@ -58,7 +58,8 @@
         u_int16_t _fid;     ///< File id, to be used when transferring to emap on commit
         bool _enq_flag;     ///< If true, enq op, otherwise deq op
         bool _aio_compl;    ///< Initially false, set to true when AIO returns
-        txn_data_struct(const u_int64_t rid, const u_int64_t drid, const u_int16_t fid, const bool enq_flag);
+        txn_data_struct(const u_int64_t rid, const u_int64_t drid, const u_int16_t fid,
+                const bool enq_flag);
     };
     typedef txn_data_struct txn_data;
     typedef std::vector<txn_data> txn_data_list;

Modified: store/trunk/cpp/tests/persistence.py
===================================================================
--- store/trunk/cpp/tests/persistence.py	2007-10-30 20:54:29 UTC (rev 1186)
+++ store/trunk/cpp/tests/persistence.py	2007-10-30 20:55:56 UTC (rev 1187)
@@ -251,7 +251,7 @@
 
 
     def xid(self, txid, branchqual = ''):
-        return pack('LBB', 0, len(txid), len(branchqual)) + txid + branchqual
+        return pack('!LBB', 0, len(txid), len(branchqual)) + txid + branchqual
         
     def txswap(self, src, dest, tx):
         self.assertEqual(self.XA_OK, self.channel.dtx_demarcation_start(xid=tx).status)        

Modified: store/trunk/cpp/tests/system_test.sh
===================================================================
--- store/trunk/cpp/tests/system_test.sh	2007-10-30 20:54:29 UTC (rev 1186)
+++ store/trunk/cpp/tests/system_test.sh	2007-10-30 20:55:56 UTC (rev 1187)
@@ -33,6 +33,7 @@
     exit
 fi
 
+WORKING_DIR=/var/rhm
 QPIDD=$QPID_DIR/cpp/src/qpidd
      
 if test "$VERBOSE" = yes; then
@@ -58,19 +59,36 @@
 
 fail=0
 
-for p in `seq 1 6`; do
-    log="$abs_srcdir/vg-log.$p"
-    $vg $QPIDD -s $LIBBDBSTORE >> "$abs_srcdir/qpid.log" 2> $log & pid=$!
-    sleep 5
+let sync=0
+JRNLFLAGS=''
+while ((sync <= 1)); do
+    echo
+    echo '*** FIXME: Journal cannot be started when previous mode was BDB and database exists.'
+    rm -rf $WORKING_DIR/*
+    if ((sync == 1)); then
+        JRNLFLAGS='--store-async yes'
+        mode='jrnl'
+        echo 'Journal (AIO) persistence...'
+    else
+        mode='bdb'
+        echo 'BDB persistence...'
+    fi
+    for p in `seq 1 6`; do
+        log="$abs_srcdir/vg-log.$mode.$p"
+        echo "$vg $QPIDD -s $LIBBDBSTORE $JRNLFLAGS"
+        $vg $QPIDD -s $LIBBDBSTORE $JRNLFLAGS >> "$abs_srcdir/qpid.log" 2> $log & pid=$!
+        sleep 5
 
-    echo phase $p...
-    pwd
-    python "$abs_srcdir/persistence.py" -s "$xml_spec" -p $p -r 3 || fail=1
-    sleep 1
-    kill -SIGINT $pid || { echo process already died; cat qpid.log ; fail=1; }
-    wait $pid #temporarily disable failure until broker can shutdown cleanly|| fail=1
+        echo phase $p...
+        pwd
+        python "$abs_srcdir/persistence.py" -s "$xml_spec" -p $p -r 3 || fail=1
+        sleep 1
+        kill -SIGINT $pid || { echo process already died; cat qpid.log ; fail=1; }
+        wait $pid #temporarily disable failure until broker can shutdown cleanly|| fail=1
 
-    vg_check $log || fail=1
+        vg_check $log || fail=1
+    done
+    ((sync++))
 done
 pid=0
 




More information about the rhmessaging-commits mailing list