[rhmessaging-commits] rhmessaging commits: r1477 - in store/trunk/cpp: lib/jrnl and 1 other directories.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Thu Dec 13 12:19:52 EST 2007


Author: kpvdr
Date: 2007-12-13 12:19:52 -0500 (Thu, 13 Dec 2007)
New Revision: 1477

Modified:
   store/trunk/cpp/lib/JournalImpl.cpp
   store/trunk/cpp/lib/JournalImpl.h
   store/trunk/cpp/lib/jrnl/jcntl.cpp
   store/trunk/cpp/lib/jrnl/jcntl.hpp
   store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp
   store/trunk/cpp/tests/jrnl/JournalSystemTests.hpp
Log:
Code tidy-up: separation of qpid dependencies from class jcntl by moving them to subclass JournalImpl.

Modified: store/trunk/cpp/lib/JournalImpl.cpp
===================================================================
--- store/trunk/cpp/lib/JournalImpl.cpp	2007-12-13 13:32:57 UTC (rev 1476)
+++ store/trunk/cpp/lib/JournalImpl.cpp	2007-12-13 17:19:52 UTC (rev 1477)
@@ -296,3 +296,62 @@
     return r;
 }
 
+void
+JournalImpl::aio_wr_callback(jcntl* journal, u_int32_t num_dtoks)
+{
+    JournalImpl* jip = static_cast<JournalImpl*>(journal);
+//kpvdr TODO -- this list needs to be mutexed...???
+    std::deque<rhm::journal::data_tok*> this_dtok_list(jip->_aio_wr_cmpl_dtok_list.begin(),
+            jip->_aio_wr_cmpl_dtok_list.end());
+	    
+    jip->_aio_wr_cmpl_dtok_list.clear();
+    for (u_int32_t i=0; i<num_dtoks; i++)
+    {
+        data_tok*& dtokp = this_dtok_list.front();
+		if (!journal->is_stopped() && dtokp->getSourceMessage())
+		{
+			switch (dtokp->wstate())
+			{
+ 				case data_tok::ENQ:
+     	         	dtokp->getSourceMessage()->enqueueComplete();
+ 					break;
+				case data_tok::DEQ:
+/* Don't need to signal until we have a way to ack completion of dequeue in AMQP
+     	        	dtokp->getSourceMessage()->dequeueComplete();
+		     		if ( dtokp->getSourceMessage()->isDequeueComplete()  ) // clear id after last dequeue
+		         		dtokp->getSourceMessage()->setPersistenceId(0);
+*/
+					break;
+				default:
+					;
+			}
+		}
+		dtokp->release();
+        this_dtok_list.pop_front();
+    }
+}
+
+void
+JournalImpl::aio_rd_callback(jcntl*  journal, u_int32_t num_dtoks)
+{
+    JournalImpl* jip = static_cast<JournalImpl*>(journal);
+//kpvdr TODO -- can we get rid of the copy???
+    std::deque<rhm::journal::data_tok*> this_dtok_list(jip->_aio_rd_cmpl_dtok_list.begin(),
+            jip->_aio_rd_cmpl_dtok_list.end());
+    jip->_aio_rd_cmpl_dtok_list.clear();
+    for (u_int32_t i=0; i<num_dtoks; i++)
+    {
+        data_tok*& dtokp = this_dtok_list.front();
+		if (!journal->is_stopped() && dtokp->getSourceMessage())
+		{
+        	if (dtokp->wstate() == data_tok::ENQ && dtokp->rstate() == data_tok::READ)
+         	{
+                // cct call the recovery manager. / lazyload.. 
+        	}
+		}
+	    dtokp->release();
+        this_dtok_list.pop_front();
+    }
+    
+}
+

Modified: store/trunk/cpp/lib/JournalImpl.h
===================================================================
--- store/trunk/cpp/lib/JournalImpl.h	2007-12-13 13:32:57 UTC (rev 1476)
+++ store/trunk/cpp/lib/JournalImpl.h	2007-12-13 17:19:52 UTC (rev 1477)
@@ -81,6 +81,9 @@
             journal::data_tok _dtok;
             bool _external;
 
+            std::deque<rhm::journal::data_tok*> _aio_rd_cmpl_dtok_list; ///< Internally mamanged deque
+            std::deque<rhm::journal::data_tok*> _aio_wr_cmpl_dtok_list; ///< Internally mamanged deque
+
         public:
             JournalImpl(const std::string& journalId,
                         const std::string& journalDirectory,
@@ -91,14 +94,18 @@
                         const qpid::sys::Duration flushTimeout);
             virtual ~JournalImpl();
 
+            inline void initialize() {
+	            jcntl::initialize(&_aio_rd_cmpl_dtok_list, &aio_rd_callback, &_aio_wr_cmpl_dtok_list,
+                        &aio_wr_callback );
+	        } 
+
             void recover(std::deque<journal::data_tok*>* rd_dtokl, const journal::aio_cb rd_cb,
 			        std::deque<journal::data_tok*>* wr_dtokl, const journal::aio_cb wr_cb,
                     boost::ptr_list<bdbstore::PreparedTransaction>& prep_tx_list,
                     u_int64_t& highest_rid, u_int64_t queue_id);
 
-            void recover(boost::ptr_list<bdbstore::PreparedTransaction>& prep_tx_list,
-                    u_int64_t& highest_rid, u_int64_t queue_id)
-            {
+            inline void recover(boost::ptr_list<bdbstore::PreparedTransaction>& prep_tx_list,
+                    u_int64_t& highest_rid, u_int64_t queue_id) {
                 recover(&_aio_rd_cmpl_dtok_list, &aio_rd_callback, &_aio_wr_cmpl_dtok_list,
                     &aio_wr_callback, prep_tx_list, highest_rid, queue_id);
             }
@@ -144,6 +151,8 @@
 
         private:
             const journal::iores handleInactivityTimer(const journal::iores r);
+            static void  aio_wr_callback(jcntl* journal, u_int32_t num_dtoks);
+            static void aio_rd_callback(jcntl* journal, u_int32_t num_dtoks);
         }; // class JournalImpl
 
     } // namespace bdbstore

Modified: store/trunk/cpp/lib/jrnl/jcntl.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.cpp	2007-12-13 13:32:57 UTC (rev 1476)
+++ store/trunk/cpp/lib/jrnl/jcntl.cpp	2007-12-13 17:19:52 UTC (rev 1477)
@@ -41,7 +41,6 @@
 #include <jrnl/jerrno.hpp>
 #include <jrnl/jinf.hpp>
 #include <sstream>
-#include <qpid/broker/PersistableMessage.h>
 #include <unistd.h>
 
 namespace rhm
@@ -738,64 +737,5 @@
     }
 }
 
-
-void
-jcntl::aio_wr_callback(jcntl*  journal, u_int32_t num_dtoks)
-{
-//kpvdr TODO -- this list needs to be mutexed...???
-    std::deque<rhm::journal::data_tok*> this_dtok_list(journal->_aio_wr_cmpl_dtok_list.begin(),
-            journal->_aio_wr_cmpl_dtok_list.end());
-	    
-    journal->_aio_wr_cmpl_dtok_list.clear();
-    for (u_int32_t i=0; i<num_dtoks; i++)
-    {
-        data_tok*& dtokp = this_dtok_list.front();
-		if (!journal->is_stopped() && dtokp->getSourceMessage())
-		{
-			switch (dtokp->wstate())
-			{
- 				case data_tok::ENQ:
-     	         	dtokp->getSourceMessage()->enqueueComplete();
- 					break;
-				case data_tok::DEQ:
-/* Don't need to signal until we have a way to ack completion of dequeue in AMQP
-     	        	dtokp->getSourceMessage()->dequeueComplete();
-		     		if ( dtokp->getSourceMessage()->isDequeueComplete()  ) // clear id after last dequeue
-		         		dtokp->getSourceMessage()->setPersistenceId(0);
-*/
-					break;
-				default:
-					;
-			}
-		}
-		dtokp->release();
-        this_dtok_list.pop_front();
-    }
-}
-
-void
-jcntl::aio_rd_callback(jcntl*  journal, u_int32_t num_dtoks)
-{
-
-//kpvdr TODO -- can we get rid of the copy???
-    std::deque<rhm::journal::data_tok*> this_dtok_list(journal->_aio_rd_cmpl_dtok_list.begin(),
-            journal->_aio_rd_cmpl_dtok_list.end());
-    journal->_aio_rd_cmpl_dtok_list.clear();
-    for (u_int32_t i=0; i<num_dtoks; i++)
-    {
-        data_tok*& dtokp = this_dtok_list.front();
-		if (!journal->is_stopped() && dtokp->getSourceMessage())
-		{
-        	if (dtokp->wstate() == data_tok::ENQ && dtokp->rstate() == data_tok::READ)
-         	{
-                // cct call the recovery manager. / lazyload.. 
-        	}
-		}
-	    dtokp->release();
-        this_dtok_list.pop_front();
-    }
-    
-}
-
 } // namespace journal
 } // namespace rhm

Modified: store/trunk/cpp/lib/jrnl/jcntl.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.hpp	2007-12-13 13:32:57 UTC (rev 1476)
+++ store/trunk/cpp/lib/jrnl/jcntl.hpp	2007-12-13 17:19:52 UTC (rev 1477)
@@ -49,7 +49,6 @@
 #include <jrnl/slock.hpp>
 #include <jrnl/wmgr.hpp>
 #include <jrnl/wrfc.hpp>
-#include <qpid/broker/PersistableQueue.h>
 
 namespace rhm
 {
@@ -143,9 +142,6 @@
         rcvdat _rcvdat;             ///< Recovery data used for recovery
         pthread_mutex_t _mutex;     ///< Mutex for thread safety
 
-        std::deque<rhm::journal::data_tok*> _aio_rd_cmpl_dtok_list; ///< Internally mamanged deque
-        std::deque<rhm::journal::data_tok*> _aio_wr_cmpl_dtok_list; ///< Internally mamanged deque
-
     public:
          /**
          * \brief Journal constructor.
@@ -193,19 +189,6 @@
 			std::deque<data_tok*>* wdtoklp, const aio_cb wr_cb);
 
         /**
-        * \brief Initialize using internal default callbacks and data_tok lists.
-        *
-        * TODO: Move to JournalImpl later
-        *
-        * \exception TODO
-        */
-        void initialize()
-        {
-	        initialize(&_aio_rd_cmpl_dtok_list, &aio_rd_callback, &_aio_wr_cmpl_dtok_list,
-                    &aio_wr_callback );
-	    } 
-
-        /**
         * /brief Initialize journal by recovering state from previously written journal.
         *
         * Initialize journal by recovering state from previously written journal. The journal files
@@ -235,19 +218,6 @@
             const std::vector<std::string>& prep_txn_list, u_int64_t& highest_rid);
 
         /**
-        * \brief Recover using internal default callbacks and data_tok lists.
-        *
-        * TODO: Move to JournalImpl later
-        *
-        * \exception TODO
-        */
-        void recover(const std::vector<std::string>& prep_txn_list, u_int64_t& highest_rid)
-        {
-            recover(&_aio_rd_cmpl_dtok_list, &aio_rd_callback, &_aio_wr_cmpl_dtok_list,
-                    &aio_wr_callback, prep_txn_list, highest_rid);
-        }
-
-        /**
         * \brief Notification to the journal that recovery is complete and that normal operation
         *     may resume.
         *
@@ -656,16 +626,6 @@
                 std::streampos& read_pos);
         
         void check_journal_alignment(const u_int16_t fid, std::streampos& rec_offset);
-
-	    /**
-	    * Intenal callback write
-	    */
-	    static void  aio_wr_callback(jcntl* journal, u_int32_t num_dtoks);
-
-	    /**
-	    * Intenal callback write
-	    */
-	    static void aio_rd_callback(jcntl* journal, u_int32_t num_dtoks);
     };
 
 } // namespace journal

Modified: store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp	2007-12-13 13:32:57 UTC (rev 1476)
+++ store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp	2007-12-13 17:19:52 UTC (rev 1477)
@@ -30,9 +30,6 @@
 */
 
 #include "JournalSystemTests.hpp"
-//#include "msg_producer.hpp"
-//#include "msg_consumer.hpp"
-#include <vector>
 
 #define NUM_MSGS 5
 #define MAX_AIO_SLEEPS 500
@@ -67,7 +64,7 @@
     {
         char* test_name = "InitializationTest";
         rhm::journal::jcntl jc(test_name, "jdata", test_name, NUM_JFILES, JFSIZE_SBLKS);
-        jc.initialize();
+        jrnl_init(&jc);
     }
     catch (const rhm::journal::jexception& e)
     {
@@ -87,15 +84,15 @@
         char* test_name = "EmptyRecoverTest";
         {
             rhm::journal::jcntl jc(test_name, "jdata", test_name, NUM_JFILES, JFSIZE_SBLKS);
-            jc.initialize();
+            jrnl_init(&jc);
         }
         {
             rhm::journal::jcntl jc(test_name, "jdata", test_name, NUM_JFILES, JFSIZE_SBLKS);
-            jc.recover(txn_list, highest_rid);
+            jrnl_recover(&jc, txn_list, highest_rid);
         }
         {
             rhm::journal::jcntl jc(test_name, "jdata", test_name, NUM_JFILES, JFSIZE_SBLKS);
-            jc.recover(txn_list, highest_rid);
+            jrnl_recover(&jc, txn_list, highest_rid);
             jc.recover_complete();
         }
     }
@@ -114,7 +111,7 @@
     {
         char* test_name = "EnqueueTest";
         rhm::journal::jcntl jc(test_name, "jdata", test_name, NUM_JFILES, JFSIZE_SBLKS);
-        jc.initialize();
+        jrnl_init(&jc);
 
         // Non-txn
         for (int m=0; m<NUM_MSGS; m++)
@@ -145,13 +142,13 @@
         char* test_name = "RecoverReadTest";
         {
             rhm::journal::jcntl jc(test_name, "jdata", test_name, NUM_JFILES, JFSIZE_SBLKS);
-            jc.initialize();
+            jrnl_init(&jc);
             for (int m=0; m<NUM_MSGS; m++)
                 enq_msg(&jc, create_msg(msg, m, MSG_SIZE), false);
         }
         {
             rhm::journal::jcntl jc(test_name, "jdata", test_name, NUM_JFILES, JFSIZE_SBLKS);
-            jc.recover(txn_list, highest_rid);
+            jrnl_recover(&jc, txn_list, highest_rid);
             for (int m=0; m<NUM_MSGS; m++)
             {
                 read_msg(&jc);
@@ -165,7 +162,7 @@
         test_name = "TxnRecoverReadTest";
         {
             rhm::journal::jcntl jc(test_name, "jdata", test_name, NUM_JFILES, JFSIZE_SBLKS);
-            jc.initialize();
+            jrnl_init(&jc);
             create_xid(xid, 1, XID_SIZE);
             txn_list.push_back(xid);
             for (int m=0; m<NUM_MSGS; m++)
@@ -174,7 +171,7 @@
         }
         {
             rhm::journal::jcntl jc(test_name, "jdata", test_name, NUM_JFILES, JFSIZE_SBLKS);
-            jc.recover(txn_list, highest_rid);
+            jrnl_recover(&jc, txn_list, highest_rid);
             for (int m=0; m<NUM_MSGS; m++)
             {
                 read_msg(&jc);
@@ -203,13 +200,13 @@
         char* test_name = "RecoveredReadTest";
         {
             rhm::journal::jcntl jc(test_name, "jdata", test_name, NUM_JFILES, JFSIZE_SBLKS);
-            jc.initialize();
+            jrnl_init(&jc);
             for (int m=0; m<NUM_MSGS; m++)
                 enq_msg(&jc, create_msg(msg, m, MSG_SIZE), false);
         }
         {
             rhm::journal::jcntl jc(test_name, "jdata", test_name, NUM_JFILES, JFSIZE_SBLKS);
-            jc.recover(txn_list, highest_rid);
+            jrnl_recover(&jc, txn_list, highest_rid);
             for (int m=0; m<NUM_MSGS; m++)
             {
                 read_msg(&jc);
@@ -231,7 +228,7 @@
         test_name = "TxnRecoveredReadTest";
         {
             rhm::journal::jcntl jc(test_name, "jdata", test_name, NUM_JFILES, JFSIZE_SBLKS);
-            jc.initialize();
+            jrnl_init(&jc);
             create_xid(xid, 2, XID_SIZE);
             txn_list.push_back(xid);
             for (int m=0; m<NUM_MSGS; m++)
@@ -240,7 +237,7 @@
         }
         {
             rhm::journal::jcntl jc(test_name, "jdata", test_name, NUM_JFILES, JFSIZE_SBLKS);
-            jc.recover(txn_list, highest_rid);
+            jrnl_recover(&jc, txn_list, highest_rid);
             for (int m=0; m<NUM_MSGS; m++)
             {
                 read_msg(&jc);
@@ -277,13 +274,13 @@
         char* test_name = "RecoveredDequeueTest";
         {
             rhm::journal::jcntl jc(test_name, "jdata", test_name, NUM_JFILES, JFSIZE_SBLKS);
-            jc.initialize();
+            jrnl_init(&jc);
             for (int m=0; m<NUM_MSGS; m++)
                 enq_msg(&jc, create_msg(msg, m, MSG_SIZE), false);
         }
         {
             rhm::journal::jcntl jc(test_name, "jdata", test_name, NUM_JFILES, JFSIZE_SBLKS);
-            jc.recover(txn_list, highest_rid);
+            jrnl_recover(&jc, txn_list, highest_rid);
             for (int m=0; m<NUM_MSGS; m++)
             {
                 read_msg(&jc);
@@ -307,7 +304,7 @@
         test_name = "TxnRecoveredDequeueTest";
         {
             rhm::journal::jcntl jc(test_name, "jdata", test_name, NUM_JFILES, JFSIZE_SBLKS);
-            jc.initialize();
+            jrnl_init(&jc);
             create_xid(xid, 3, XID_SIZE);
             txn_list.push_back(xid);
             for (int m=0; m<NUM_MSGS; m++)
@@ -316,7 +313,7 @@
         }
         {
             rhm::journal::jcntl jc(test_name, "jdata", test_name, NUM_JFILES, JFSIZE_SBLKS);
-            jc.recover(txn_list, highest_rid);
+            jrnl_recover(&jc, txn_list, highest_rid);
             for (int m=0; m<NUM_MSGS; m++)
             {
                 read_msg(&jc);
@@ -355,7 +352,7 @@
         char* test_name = "FlagsRecoverdTest";
         {
             rhm::journal::jcntl jc(test_name, "jdata", test_name, NUM_JFILES, JFSIZE_SBLKS);
-            jc.initialize();
+            jrnl_init(&jc);
             // Transient msgs - should not recover
             for (int m=0; m<NUM_MSGS; m++)
                 enq_msg(&jc, create_msg(msg, m, MSG_SIZE), true);
@@ -371,7 +368,7 @@
         }
         {
             rhm::journal::jcntl jc(test_name, "jdata", test_name, NUM_JFILES, JFSIZE_SBLKS);
-            jc.recover(txn_list, highest_rid);
+            jrnl_recover(&jc, txn_list, highest_rid);
             // Recover non-transient msgs
             for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
             {
@@ -424,7 +421,7 @@
         test_name = "TxnFlagsRecoverdTest";
         {
             rhm::journal::jcntl jc(test_name, "jdata", test_name, NUM_JFILES, JFSIZE_SBLKS);
-            jc.initialize();
+            jrnl_init(&jc);
             create_xid(xid, 4, XID_SIZE);
             txn_list.push_back(xid);
             // Transient msgs - should not recover
@@ -443,7 +440,7 @@
         }
         {
             rhm::journal::jcntl jc(test_name, "jdata", test_name, NUM_JFILES, JFSIZE_SBLKS);
-            jc.recover(txn_list, highest_rid);
+            jrnl_recover(&jc, txn_list, highest_rid);
             // Recover non-transient msgs
             for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
             {
@@ -511,7 +508,7 @@
         char* test_name = "ComplexRecoveryTest1";
         {
             rhm::journal::jcntl jc(test_name, "jdata", test_name, NUM_JFILES, JFSIZE_SBLKS);
-            jc.initialize();
+            jrnl_init(&jc);
             
             // Enqueue 2n, then dequeue first n msgs; check that only last n readable
             // rids: 0 to NUM_MSGS*2 - 1
@@ -531,7 +528,7 @@
         }
         {
             rhm::journal::jcntl jc(test_name, "jdata", test_name, NUM_JFILES, JFSIZE_SBLKS);
-            jc.recover(txn_list, highest_rid);
+            jrnl_recover(&jc, txn_list, highest_rid);
 
             // Check that only last n readable (as before)
             for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
@@ -568,7 +565,7 @@
         test_name = "TxnComplexRecoveryTest1";
         {
             rhm::journal::jcntl jc(test_name, "jdata", test_name, NUM_JFILES, JFSIZE_SBLKS);
-            jc.initialize();
+            jrnl_init(&jc);
             
             // Enqueue 2n, then dequeue first n msgs; check that only last n readable
             // rids: 0 to NUM_MSGS - 1
@@ -595,7 +592,7 @@
         }
         {
             rhm::journal::jcntl jc(test_name, "jdata", test_name, NUM_JFILES, JFSIZE_SBLKS);
-            jc.recover(txn_list, highest_rid);
+            jrnl_recover(&jc, txn_list, highest_rid);
 
             // Check that only last n readable (as before)
             for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
@@ -648,6 +645,19 @@
 // === Private helper functions ===
 
 void
+JournalSystemTests::jrnl_init(rhm::journal::jcntl* jc)
+{
+    jc->initialize(&aioRdCmplList, NULL, &aioWrCmplList, NULL);
+}
+
+void
+JournalSystemTests::jrnl_recover(rhm::journal::jcntl* jc, vector<string> txn_list,
+        u_int64_t& highest_rid)
+{
+    jc->recover(&aioRdCmplList, NULL, &aioWrCmplList, NULL, txn_list, highest_rid);
+}
+
+void
 JournalSystemTests::enq_msg(rhm::journal::jcntl* jc, const string msg, const bool transient)
 {
     rhm::journal::data_tok* dtp = new rhm::journal::data_tok;

Modified: store/trunk/cpp/tests/jrnl/JournalSystemTests.hpp
===================================================================
--- store/trunk/cpp/tests/jrnl/JournalSystemTests.hpp	2007-12-13 13:32:57 UTC (rev 1476)
+++ store/trunk/cpp/tests/jrnl/JournalSystemTests.hpp	2007-12-13 17:19:52 UTC (rev 1477)
@@ -32,6 +32,7 @@
 
 #include "../test_plugin.h"
 #include <jrnl/jcntl.hpp>
+#include <vector>
 
 class JournalSystemTests : public CppUnit::TestCase  
 {
@@ -55,6 +56,8 @@
     size_t xidsize;
     bool transientFlag;
     bool externalFlag;
+    std::deque<rhm::journal::data_tok*> aioRdCmplList;
+    std::deque<rhm::journal::data_tok*> aioWrCmplList;
 
 public:
     void InstantiationTest();
@@ -68,6 +71,9 @@
     void ComplexRecoveryTest1();
 
 private:
+    void jrnl_init(rhm::journal::jcntl* jc);
+    void jrnl_recover(rhm::journal::jcntl* jc, std::vector<std::string> txn_list,
+            u_int64_t& highest_rid);
     void enq_msg(rhm::journal::jcntl* jc, const std::string msg, const bool transient);
     void enq_extern_msg(rhm::journal::jcntl* jc, const bool transient);
     void enq_txn_msg(rhm::journal::jcntl* jc, const std::string msg, const std::string xid,




More information about the rhmessaging-commits mailing list