[rhmessaging-commits] rhmessaging commits: r1349 - in store/trunk/cpp: lib and 2 other directories.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Wed Nov 21 09:07:18 EST 2007


Author: kpvdr
Date: 2007-11-21 09:07:18 -0500 (Wed, 21 Nov 2007)
New Revision: 1349

Modified:
   store/trunk/cpp/docs/
   store/trunk/cpp/lib/BdbMessageStore.cpp
   store/trunk/cpp/lib/JournalImpl.cpp
   store/trunk/cpp/lib/JournalImpl.h
   store/trunk/cpp/lib/jrnl/jcntl.cpp
   store/trunk/cpp/lib/jrnl/jcntl.hpp
   store/trunk/cpp/lib/jrnl/rcvdat.hpp
   store/trunk/cpp/tests/jrnl/
   store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp
Log:
Fix to correctly set the messageId after a resore based on the highest RID found in the journal.



Property changes on: store/trunk/cpp/docs
___________________________________________________________________
Name: svn:ignore
   - Makefile.in

   + Makefile.in
Makefile
html
man
latex


Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp	2007-11-21 04:11:54 UTC (rev 1348)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp	2007-11-21 14:07:18 UTC (rev 1349)
@@ -384,8 +384,10 @@
 	
 	          try
 	          {
-                  jQueue->recover(prepared, key.id); // start recovery
+                  u_int64_t highestRid = 0;
+                  jQueue->recover(prepared, highestRid, key.id); // start recovery
                   recoverMessages(txn, registry, queue, prepared, messages); 
+                  messageIdSequence.reset(highestRid + 1);
 				  jQueue->recover_complete(); // start journal.
 	          } catch (const journal::jexception& e) {
                  THROW_STORE_EXCEPTION(std::string("Queue ") + queueName + ": recoverQueues() failed: " + e.what());
@@ -463,7 +465,6 @@
 {
 
     size_t preambleLength = sizeof(u_int32_t)/*header size*/;
-    u_int64_t maxMessageId(1);
  
     JournalImpl* jc = static_cast<JournalImpl*>(queue->getExternalQueueStore());
 	DataTokenImpl dtokp;
@@ -519,10 +520,6 @@
                          queue->recover(msg);
                     }
     
-                    if (dtokp.rid() > maxMessageId) {
-                        maxMessageId = dtokp.rid();
-                    }
-
     				dtokp.reset();
 	    			dtokp.set_wstate(DataTokenImpl::ENQ);
 					
@@ -549,7 +546,6 @@
 		THROW_STORE_EXCEPTION(std::string("Queue ") + queue->getName() +
                 ": recoverMessages() failed: " + e.what());
 	}
-    messageIdSequence.reset(maxMessageId + 1);
 }
 
 RecoverableMessage::shared_ptr  BdbMessageStore::getExternMessage(qpid::broker::RecoveryManager& recovery, 
@@ -939,6 +935,7 @@
 
             bool written = false;
             unsigned aio_sleep_cnt = 0;
+            unsigned busy_sleep_cnt = 0;
             while (!written)
             {
 	            JournalImpl* jc = static_cast<JournalImpl*>(queue->getExternalQueueStore());
@@ -962,13 +959,19 @@
                         if (dtokp.get()->wstate() >= DataTokenImpl::ENQ_SUBM)
                             written = true;
                         aio_sleep_cnt = 0;
+                        busy_sleep_cnt = 0;
                         break;
                     case rhm::journal::RHM_IORES_AIO_WAIT:
                         if (++aio_sleep_cnt > MAX_AIO_SLEEPS)
-                            THROW_STORE_EXCEPTION("Timeout waiting for AIO");
+                            THROW_STORE_EXCEPTION("Timeout waiting for AIO: RHM_IORES_AIO_WAIT");
                         usleep(AIO_SLEEP_TIME); // TODO move sleep to wait for IO in get events
                         jc->get_wr_events();
                         break;
+                   case rhm::journal::RHM_IORES_BUSY:
+                        if (++busy_sleep_cnt > MAX_AIO_SLEEPS)
+                            THROW_STORE_EXCEPTION("Timeout waiting for mutex: RHM_IORES_BUSY");
+                        usleep(AIO_SLEEP_TIME); // TODO add sleep time to get events call as option
+                        break;
                     case rhm::journal::RHM_IORES_FULL:
 // Temporary error msg till exception handler core problem solved...
 std::cerr << "Error storing message -- Journal full on queue \"" << queue->getName() << "\"" << std::endl << std::flush;
@@ -1095,13 +1098,13 @@
                  break;
              case rhm::journal::RHM_IORES_AIO_WAIT:
                  if (++aio_sleep_cnt > MAX_AIO_SLEEPS)
-                     THROW_STORE_EXCEPTION("Timeout waiting for AIO");
+                     THROW_STORE_EXCEPTION("Timeout waiting for AIO: RHM_IORES_AIO_WAIT");
                  usleep(AIO_SLEEP_TIME); // TODO add sleep time to get events call as option
                  jc->get_wr_events();
                  break;
              case rhm::journal::RHM_IORES_BUSY:
                  if (++busy_sleep_cnt > MAX_AIO_SLEEPS)
-                     THROW_STORE_EXCEPTION("Timeout waiting for AIO");
+                     THROW_STORE_EXCEPTION("Timeout waiting for mutex: RHM_IORES_BUSY");
                  usleep(AIO_SLEEP_TIME); // TODO add sleep time to get events call as option
                  break;
              default:

Modified: store/trunk/cpp/lib/JournalImpl.cpp
===================================================================
--- store/trunk/cpp/lib/JournalImpl.cpp	2007-11-21 04:11:54 UTC (rev 1348)
+++ store/trunk/cpp/lib/JournalImpl.cpp	2007-11-21 14:07:18 UTC (rev 1349)
@@ -71,8 +71,8 @@
 void
 JournalImpl::recover(std::deque<data_tok*>* rd_dtokl, const aio_cb rd_cb,
         std::deque<data_tok*>* wr_dtokl, const aio_cb wr_cb,
-        boost::ptr_list<bdbstore::PreparedTransaction>& prep_tx_list, u_int64_t queue_id)
-        throw (jexception)
+        boost::ptr_list<bdbstore::PreparedTransaction>& prep_tx_list, u_int64_t& highest_rid,
+        u_int64_t queue_id) throw (jexception)
 {
     // Create list of prepared xids
     std::vector<std::string> prep_xid_list;
@@ -81,7 +81,7 @@
         prep_xid_list.push_back(i->xid);
     }
 
-    jcntl::recover(rd_dtokl, rd_cb, wr_dtokl, wr_cb, prep_xid_list);
+    jcntl::recover(rd_dtokl, rd_cb, wr_dtokl, wr_cb, prep_xid_list, highest_rid);
         
     // Populate PreparedTransaction lists from _tmap
     for (bdbstore::PreparedTransaction::list::iterator i = prep_tx_list.begin();

Modified: store/trunk/cpp/lib/JournalImpl.h
===================================================================
--- store/trunk/cpp/lib/JournalImpl.h	2007-11-21 04:11:54 UTC (rev 1348)
+++ store/trunk/cpp/lib/JournalImpl.h	2007-11-21 14:07:18 UTC (rev 1349)
@@ -92,13 +92,13 @@
             void recover(std::deque<journal::data_tok*>* rd_dtokl, const journal::aio_cb rd_cb,
 			        std::deque<journal::data_tok*>* wr_dtokl, const journal::aio_cb wr_cb,
                     boost::ptr_list<bdbstore::PreparedTransaction>& prep_tx_list,
-                    u_int64_t queue_id) throw (journal::jexception);
+                    u_int64_t& highest_rid, u_int64_t queue_id) throw (journal::jexception);
 
             void recover(boost::ptr_list<bdbstore::PreparedTransaction>& prep_tx_list,
-                    u_int64_t queue_id) throw (journal::jexception)
+                    u_int64_t& highest_rid, u_int64_t queue_id) throw (journal::jexception)
             {
                 recover(&_aio_rd_cmpl_dtok_list, &aio_rd_callback, &_aio_wr_cmpl_dtok_list,
-                    &aio_wr_callback, prep_tx_list, queue_id);
+                    &aio_wr_callback, prep_tx_list, highest_rid, queue_id);
             }
             
             // Temporary fn to read and save last msg read from journal so it can be assigned

Modified: store/trunk/cpp/lib/jrnl/jcntl.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.cpp	2007-11-21 04:11:54 UTC (rev 1348)
+++ store/trunk/cpp/lib/jrnl/jcntl.cpp	2007-11-21 14:07:18 UTC (rev 1349)
@@ -127,7 +127,8 @@
 
 void
 jcntl::recover(std::deque<data_tok*>* rdtoklp, const aio_cb rd_cb, std::deque<data_tok*>* wdtoklp,
-        const aio_cb wr_cb, const std::vector<std::string>& prep_txn_list) throw (jexception)
+        const aio_cb wr_cb, const std::vector<std::string>& prep_txn_list, u_int64_t& highest_rid)
+        throw (jexception)
 {
     // Verify journal dir and journal files
     _jdir.verify_dir();
@@ -135,6 +136,9 @@
     _emap.clear();
     _tmap.clear();
     rcvr_janalyze(_rcvdat, prep_txn_list);
+    highest_rid = _rcvdat._h_rid;
+    if (_rcvdat._full)
+        throw jexception(jerrno::JERR_JCNTL_RECOVERJFULL, "jcntl", "recover_complete");
 
 // Debug info, but may be useful to print with a flag
 //_rcvdat.print();
@@ -174,7 +178,7 @@
 jcntl::recover_complete() throw (jexception)
 {
     if (!_readonly_flag)
-        throw jexception(jerrno::JERR_JCNTL_NOTRECOVERED, "jcntl", "recovered");
+        throw jexception(jerrno::JERR_JCNTL_NOTRECOVERED, "jcntl", "recover_complete");
     for (u_int16_t i=0; i<JRNL_NUM_FILES; i++)
         _datafh[i]->reset(&_rcvdat);
     _wrfc.initialize(JRNL_NUM_FILES, (nlfh**)_datafh, &_rcvdat);
@@ -408,6 +412,12 @@
         u_int16_t fid = rd._ffid;
         std::ifstream ifs;
         while (rcvr_get_next_record(fid, &ifs, rd));
+        
+        // Check for journal full condition
+        u_int16_t next_wr_fid = (rd._lfid + 1) % JRNL_NUM_FILES;
+        if (rd._ffid == next_wr_fid && rd._enq_cnt_list[next_wr_fid])
+            rd._full = true;
+        
 		std::vector<std::string> xid_list;
 		_tmap.xid_list(xid_list);
 		for (std::vector<std::string>::iterator itr = xid_list.begin(); itr != xid_list.end();
@@ -492,8 +502,9 @@
                 {
                     try
                     {
-                        _emap.get_remove_fid(dr.deq_rid());
-                        rd._enq_cnt_list[fid]--;
+                        u_int16_t enq_fid = _emap.get_remove_fid(dr.deq_rid());
+//std::cout << enq_fid;
+                        rd._enq_cnt_list[enq_fid]--;
                     }
                     catch (const jexception& e)
                     {
@@ -533,7 +544,7 @@
                             throw e;
                     }
                     if (itr->_enq_flag)
-                        rd._enq_cnt_list[fid]--;
+                        rd._enq_cnt_list[itr->_fid]--;
                 }
                 free(xidp);
                 if (rd._h_rid < h._rid)
@@ -579,7 +590,7 @@
             }
         break;
         case 0:
-//std::cout << " 0";
+//std::cout << " 0 ";
             rd._lfid = fid;
             rd._eo = ifsp->tellg();
             return false;
@@ -668,8 +679,8 @@
 					;
 			}
 		}
+		intrusive_ptr_release(dtokp);
         this_dtok_list.pop_front();
-		intrusive_ptr_release(dtokp);
     }
 }
 
@@ -691,8 +702,8 @@
                 // cct call the recovery manager. / lazyload.. 
         	}
 		}
+	    intrusive_ptr_release( dtokp);
         this_dtok_list.pop_front();
-	    intrusive_ptr_release( dtokp);
     }
     
 }

Modified: store/trunk/cpp/lib/jrnl/jcntl.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.hpp	2007-11-21 04:11:54 UTC (rev 1348)
+++ store/trunk/cpp/lib/jrnl/jcntl.hpp	2007-11-21 14:07:18 UTC (rev 1349)
@@ -221,12 +221,13 @@
         *     AIO operations.
         * \param wr_cb Function pointer to callback function for write operations. May be NULL.
         * \param prep_txn_list
+        * \param highest_rid Returns the highest rid found in the journal during recover
         *
         * \exception TODO
         */
         void recover(std::deque<data_tok*>* rdtoklp, const aio_cb rd_cb,
 			std::deque<data_tok*>* wdtoklp, const aio_cb wr_cb,
-            const std::vector<std::string>& prep_txn_list) throw (jexception);
+            const std::vector<std::string>& prep_txn_list, u_int64_t& highest_rid) throw (jexception);
 
         /**
         * \brief Recover using internal default callbacks and data_tok lists.
@@ -235,10 +236,11 @@
         *
         * \exception TODO
         */
-        void recover(const std::vector<std::string>& prep_txn_list) throw (jexception)
+        void recover(const std::vector<std::string>& prep_txn_list, u_int64_t& highest_rid)
+                throw (jexception)
         {
             recover(&_aio_rd_cmpl_dtok_list, &aio_rd_callback, &_aio_wr_cmpl_dtok_list,
-                    &aio_wr_callback, prep_txn_list);
+                    &aio_wr_callback, prep_txn_list, highest_rid);
         }
 
         /**

Modified: store/trunk/cpp/lib/jrnl/rcvdat.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rcvdat.hpp	2007-11-21 04:11:54 UTC (rev 1348)
+++ store/trunk/cpp/lib/jrnl/rcvdat.hpp	2007-11-21 14:07:18 UTC (rev 1349)
@@ -48,6 +48,7 @@
             u_int16_t _lfid;    ///< Last file id
             size_t _eo;         ///< End offset (first byte past last record)
             u_int64_t _h_rid;   ///< Highest rid found
+            bool _full;         ///< Journal is full
             std::vector<u_int32_t> _enq_cnt_list; ///< Number enqueued records found for each file
 
             rcvdat():
@@ -57,6 +58,7 @@
                     _lfid(0),
                     _eo(0),
                     _h_rid(0),
+                    _full(false),
                     _enq_cnt_list(JRNL_NUM_FILES, 0)
             {}
 
@@ -68,6 +70,7 @@
                 _lfid=0;
                 _eo=0;
                 _h_rid=0;
+                _full = false;
                 for (unsigned f=0; f<_enq_cnt_list.size(); f++)
                     _enq_cnt_list[f] = 0;
             }
@@ -84,6 +87,7 @@
                 std::cout << "  End offset (_eo) = 0x" << std::hex << _eo << std::dec << " ("  <<
                         (_eo/JRNL_DBLK_SIZE) << " dblks)" << std::endl;
                 std::cout << "  Highest rid (_h_rid) = " << _h_rid << std::endl;
+                std::cout << "  Journal full (_full) = " << (_full ? "TRUE" : "FALSE") << std::endl;
                 std::cout << "  Enqueued records (txn & non-txn):" << std::endl;
                 for (unsigned i=0; i<_enq_cnt_list.size(); i++)
                     std::cout << "    File " << i << ": " << _enq_cnt_list[i] << std::endl;


Property changes on: store/trunk/cpp/tests/jrnl
___________________________________________________________________
Name: svn:ignore
   - .deps
.libs
Makefile
Makefile.in
jtest
unit_test_jerrno
unit_test_jexception
unit_test_jinf

   + .deps
.libs
Makefile
Makefile.in
jtest
unit_test_enq_map
unit_test_jerrno
unit_test_jexception
unit_test_jdir
unit_test_file_hdr
unit_test_jinf
unit_test_txn_map


Modified: store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp	2007-11-21 04:11:54 UTC (rev 1348)
+++ store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp	2007-11-21 14:07:18 UTC (rev 1349)
@@ -81,6 +81,7 @@
     try
     {
         vector<string> txn_list;
+        u_int64_t highest_rid;
         char* test_name = "EmptyRecoverTest";
         {
             rhm::journal::jcntl jc(test_name, "jdata", test_name);
@@ -88,11 +89,11 @@
         }
         {
             rhm::journal::jcntl jc(test_name, "jdata", test_name);
-            jc.recover(txn_list);
+            jc.recover(txn_list, highest_rid);
         }
         {
             rhm::journal::jcntl jc(test_name, "jdata", test_name);
-            jc.recover(txn_list);
+            jc.recover(txn_list, highest_rid);
             jc.recover_complete();
         }
     }
@@ -135,6 +136,7 @@
 JournalSystemTests::RecoverReadTest()
 {
     vector<string> txn_list;
+    u_int64_t highest_rid;
     try
     {
         // Non-txn
@@ -147,7 +149,7 @@
         }
         {
             rhm::journal::jcntl jc(test_name, "jdata", test_name);
-            jc.recover(txn_list);
+            jc.recover(txn_list, highest_rid);
             for (int m=0; m<NUM_MSGS; m++)
             {
                 read_msg(&jc);
@@ -170,7 +172,7 @@
         }
         {
             rhm::journal::jcntl jc(test_name, "jdata", test_name);
-            jc.recover(txn_list);
+            jc.recover(txn_list, highest_rid);
             for (int m=0; m<NUM_MSGS; m++)
             {
                 read_msg(&jc);
@@ -192,6 +194,7 @@
 JournalSystemTests::RecoveredReadTest()
 {
     vector<string> txn_list;
+    u_int64_t highest_rid;
     try
     {
         // Non-txn
@@ -204,7 +207,7 @@
         }
         {
             rhm::journal::jcntl jc(test_name, "jdata", test_name);
-            jc.recover(txn_list);
+            jc.recover(txn_list, highest_rid);
             for (int m=0; m<NUM_MSGS; m++)
             {
                 read_msg(&jc);
@@ -235,7 +238,7 @@
         }
         {
             rhm::journal::jcntl jc(test_name, "jdata", test_name);
-            jc.recover(txn_list);
+            jc.recover(txn_list, highest_rid);
             for (int m=0; m<NUM_MSGS; m++)
             {
                 read_msg(&jc);
@@ -265,6 +268,7 @@
 JournalSystemTests::RecoveredDequeueTest()
 {
     vector<string> txn_list;
+    u_int64_t highest_rid;
     try
     {
         // Non-txn
@@ -277,7 +281,7 @@
         }
         {
             rhm::journal::jcntl jc(test_name, "jdata", test_name);
-            jc.recover(txn_list);
+            jc.recover(txn_list, highest_rid);
             for (int m=0; m<NUM_MSGS; m++)
             {
                 read_msg(&jc);
@@ -310,7 +314,7 @@
         }
         {
             rhm::journal::jcntl jc(test_name, "jdata", test_name);
-            jc.recover(txn_list);
+            jc.recover(txn_list, highest_rid);
             for (int m=0; m<NUM_MSGS; m++)
             {
                 read_msg(&jc);
@@ -342,6 +346,7 @@
 JournalSystemTests::HeaderFlagsTest()
 {
     vector<string> txn_list;
+    u_int64_t highest_rid;
     try
     {
         // Non-txn
@@ -364,7 +369,7 @@
         }
         {
             rhm::journal::jcntl jc(test_name, "jdata", test_name);
-            jc.recover(txn_list);
+            jc.recover(txn_list, highest_rid);
             // Recover non-transient msgs
             for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
             {
@@ -436,7 +441,7 @@
         }
         {
             rhm::journal::jcntl jc(test_name, "jdata", test_name);
-            jc.recover(txn_list);
+            jc.recover(txn_list, highest_rid);
             // Recover non-transient msgs
             for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
             {
@@ -497,6 +502,7 @@
 JournalSystemTests::ComplexRecoveryTest1()
 {
     vector<string> txn_list;
+    u_int64_t highest_rid;
     try
     {
         // Non-txn
@@ -523,7 +529,7 @@
         }
         {
             rhm::journal::jcntl jc(test_name, "jdata", test_name);
-            jc.recover(txn_list);
+            jc.recover(txn_list, highest_rid);
 
             // Check that only last n readable (as before)
             for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
@@ -587,7 +593,7 @@
         }
         {
             rhm::journal::jcntl jc(test_name, "jdata", test_name);
-            jc.recover(txn_list);
+            jc.recover(txn_list, highest_rid);
 
             // Check that only last n readable (as before)
             for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)




More information about the rhmessaging-commits mailing list