[rhmessaging-commits] rhmessaging commits: r1209 - in store/trunk/cpp: lib/jrnl and 1 other directories.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Fri Nov 2 09:30:43 EDT 2007


Author: kpvdr
Date: 2007-11-02 09:30:43 -0400 (Fri, 02 Nov 2007)
New Revision: 1209

Modified:
   store/trunk/cpp/lib/BdbMessageStore.cpp
   store/trunk/cpp/lib/BdbMessageStore.h
   store/trunk/cpp/lib/JournalImpl.cpp
   store/trunk/cpp/lib/JournalImpl.h
   store/trunk/cpp/lib/jrnl/jcntl.cpp
   store/trunk/cpp/lib/jrnl/jcntl.hpp
   store/trunk/cpp/lib/jrnl/jerrno.cpp
   store/trunk/cpp/lib/jrnl/jerrno.hpp
   store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp
   store/trunk/cpp/tests/jrnl/rtest
Log:
Created timer in JournalImpl that will keep calling get_events until the number of outstanding AIO events is 0.

Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp	2007-11-01 21:00:00 UTC (rev 1208)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp	2007-11-02 13:30:43 UTC (rev 1209)
@@ -44,6 +44,7 @@
 static const u_int8_t MESSAGE_MESSAGE = 1;
 static const u_int8_t BASIC_MESSAGE = 2;
 bool BdbMessageStore::useAsync;
+qpid::sys::Duration BdbMessageStore::defJournalTimeout(1000); // 1000ns = 1us (timeout value is in nanoseconds) -- NOTE(review): this comment previously said 1ms; confirm whether 1ms (1000000) was intended
 
 unsigned int TxnCtxt::count = 0;
 
@@ -148,7 +149,7 @@
         THROW_STORE_EXCEPTION("Queue already created: " + queue.getName());
     }
     if (usingJrnl()) {
-        JournalImpl* jQueue = new JournalImpl(queue.getName(), getJrnlDir(queue), string("JournalData"));
+        JournalImpl* jQueue = new JournalImpl(queue.getName(), getJrnlDir(queue), string("JournalData"), defJournalTimeout);
         queue.setExternalQueueStore(dynamic_cast<ExternalQueueStore*>(jQueue));
 		try	{
 	     	// init will create the deque's for the init...
@@ -327,7 +328,7 @@
         if (usingJrnl())
         {
 	          const char* queueName = queue->getName().c_str();
-              JournalImpl* jQueue = new JournalImpl(queueName, getJrnlDir(queueName), string("JournalData"));
+              JournalImpl* jQueue = new JournalImpl(queueName, getJrnlDir(queueName), string("JournalData"), defJournalTimeout);
               queue->setExternalQueueStore(dynamic_cast<ExternalQueueStore*>(jQueue));
 	
 	          try
@@ -756,13 +757,14 @@
 
 void BdbMessageStore::flush(const qpid::broker::PersistableQueue& queue)
 {
+    if (!usingJrnl()) return;
 	checkInit();
     try {
 		JournalImpl* jc = static_cast<JournalImpl*>(queue.getExternalQueueStore());
 		if (jc){
 		    jc->flush();
-			::usleep(10000);  /////////////// hack ----------- FIX!!
-			jc->get_wr_events();
+// 			::usleep(10000);  /////////////// hack ----------- FIX!!
+// 			jc->get_wr_events();
 		}
     }catch ( journal::jexception& e) {
        std::string str;

Modified: store/trunk/cpp/lib/BdbMessageStore.h
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.h	2007-11-01 21:00:00 UTC (rev 1208)
+++ store/trunk/cpp/lib/BdbMessageStore.h	2007-11-02 13:30:43 UTC (rev 1209)
@@ -34,6 +34,7 @@
 #include "TxnCtxt.h"
 #include <qpid/broker/MessageStore.h>
 #include <qpid/sys/Monitor.h>
+#include <qpid/sys/Time.h>
 #include <map>
 #include <set>
 #include <iostream>
@@ -75,6 +76,7 @@
 			std::string storeDir;
 			bool isInit;
 			const char* envPath;
+            static qpid::sys::Duration defJournalTimeout;
 
             void recoverQueues(TxnCtxt& txn, qpid::broker::RecoveryManager& recovery, queue_index& index,
 	    			 txn_list& locked, message_index& messages);

Modified: store/trunk/cpp/lib/JournalImpl.cpp
===================================================================
--- store/trunk/cpp/lib/JournalImpl.cpp	2007-11-01 21:00:00 UTC (rev 1208)
+++ store/trunk/cpp/lib/JournalImpl.cpp	2007-11-02 13:30:43 UTC (rev 1209)
@@ -23,17 +23,29 @@
 
 #include "JournalImpl.h"
 #include "jrnl/jerrno.hpp"
+#include <qpid/sys/Monitor.h>
 
 using namespace rhm::bdbstore;
+using namespace rhm::journal;
 
+qpid::broker::TimerA JournalImpl::journalGetEventsTimer;
+
+
 JournalImpl::JournalImpl(const std::string& journalId,
                          const std::string& journalDirectory,
-                         const std::string& journalBaseFilename):
-                         jcntl(journalId, journalDirectory, journalBaseFilename)
-{}
+                         const std::string& journalBaseFilename,
+                         const qpid::sys::Duration timeout):
+                         jcntl(journalId, journalDirectory, journalBaseFilename),
+                         timerSetFlag(false)
+{
+    fireEventPtr = new FireEvent(this, timeout); // timer task that calls back into this->fire(); timeout is passed on to TimerTaskA
+    journalGetEventsTimer.start(); // NOTE(review): the timer is a static member, so start() is called once per JournalImpl constructed -- confirm start() tolerates repeated calls
+}
 
 JournalImpl::~JournalImpl()
-{}
+{
+    //fireEventPtr->cancel(); // NOTE(review): left disabled; FireEvent keeps a raw JournalImpl* and calls parent->fire(), so a task still queued on the timer when this object is destroyed would call through a dangling pointer -- confirm and re-enable
+}
 
 void
 JournalImpl::recover(std::deque<journal::data_tok*>* rd_dtokl, const journal::aio_cb rd_cb,
@@ -70,3 +82,36 @@
         }
     }
 }
+
+void
+JournalImpl::flush() throw (journal::jexception) // Calls jcntl::flush(), then arms the get-events timer if AIO events are outstanding.
+{
+    jcntl::flush();
+    handleEventTimer((const iores)0); // the argument is only passed through and returned; the result is discarded here
+}
+
+void
+JournalImpl::fire() // TimerTask callback: collects write AIO events and re-queues the timer task while any remain outstanding.
+{
+    if (_wmgr.get_aio_evt_rem()) { // only call get_wr_events() when events are outstanding
+        jcntl::get_wr_events();
+    }
+    timerSetFlag = false; // this firing is finished; cleared before deciding whether to re-queue
+    if (_wmgr.get_aio_evt_rem()) { // still outstanding after the call above
+        intrusive_ptr_add_ref(fireEventPtr.get()); // extra reference for the queued task; FireEvent::fire() calls unref() after each run
+        journalGetEventsTimer.add(fireEventPtr);
+        timerSetFlag = true;
+    }
+}
+
+const iores
+JournalImpl::handleEventTimer(const iores res) // Queues the timer task if AIO events are outstanding and it is not already queued; returns res unchanged.
+{
+    if (_wmgr.get_aio_evt_rem() && !timerSetFlag) {
+        intrusive_ptr_add_ref(fireEventPtr.get()); // pairs with the unref() in FireEvent::fire()
+        journalGetEventsTimer.add(fireEventPtr);
+        timerSetFlag = true; // blocks further queuing from here until fire() clears the flag
+    }
+    return res;
+}
+

Modified: store/trunk/cpp/lib/JournalImpl.h
===================================================================
--- store/trunk/cpp/lib/JournalImpl.h	2007-11-01 21:00:00 UTC (rev 1208)
+++ store/trunk/cpp/lib/JournalImpl.h	2007-11-02 13:30:43 UTC (rev 1209)
@@ -24,21 +24,35 @@
 #ifndef _JournalImpl_
 #define _JournalImpl_
 
+#include <set>
 #include "jrnl/jcntl.hpp"
 #include "jrnl/data_tok.hpp"
 #include "PreparedTransaction.h"
+#include <qpid/broker/Timer.h>
+#include <qpid/sys/Time.h>
 #include <boost/ptr_container/ptr_list.hpp>
+#include <boost/intrusive_ptr.hpp>
 
 namespace rhm {
     namespace bdbstore {
 
-        class JournalImpl : public journal::jcntl
-        {
+        class FireEvent;
+
+        class JournalImpl : public journal::jcntl 
+        {            
+        private:
+            static qpid::broker::TimerA journalGetEventsTimer;
+            bool timerSetFlag;
+            qpid::broker::TimerTaskA::intrusive_ptr fireEventPtr;
+            
+
         public:
             JournalImpl(const std::string& journalId,
                         const std::string& journalDirectory,
-                        const std::string& journalBaseFilename);
+                        const std::string& journalBaseFilename,
+                        const qpid::sys::Duration timeout);
             ~JournalImpl();
+
             void recover(std::deque<journal::data_tok*>* rd_dtokl, const journal::aio_cb rd_cb,
 			        std::deque<journal::data_tok*>* wr_dtokl, const journal::aio_cb wr_cb,
                     boost::ptr_list<bdbstore::PreparedTransaction>& prep_tx_list,
@@ -50,8 +64,30 @@
                 recover(&_aio_rd_cmpl_dtok_list, &aio_rd_callback, &_aio_wr_cmpl_dtok_list,
                     &aio_wr_callback, prep_tx_list, queue_id);
             }
+
+            void flush() throw (journal::jexception);
+
+            // TimerTask callback
+            void fire();
+        
+        private:
+            const journal::iores handleEventTimer(const journal::iores res);
         };
+        
+        class FireEvent : public virtual qpid::broker::TimerTaskA // Timer task that forwards each firing to the JournalImpl it was constructed with.
+        {
+            JournalImpl*    parent; // raw back-pointer, never deleted here; set to 0 by cancel()
+
+        public:
+	        FireEvent(JournalImpl* p, const qpid::sys::Duration timeout):
+                qpid::broker::TimerTaskA(timeout), parent(p) {}
+            virtual ~FireEvent() {}
+            inline void fire() { if (parent) parent->fire(); unref(); } // unref() runs on every firing, including after cancel() has cleared parent
+		    inline void cancel() { cancelled=true; parent=0; }
+        };
+
+      
+
         } // namespace bdbstore
     } // namespace rhm
 

Modified: store/trunk/cpp/lib/jrnl/jcntl.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.cpp	2007-11-01 21:00:00 UTC (rev 1208)
+++ store/trunk/cpp/lib/jrnl/jcntl.cpp	2007-11-02 13:30:43 UTC (rev 1209)
@@ -64,7 +64,9 @@
     _wrfc(),
     _rmgr(this, _emap, _tmap, _rrfc),
     _wmgr(this, _emap, _tmap, _wrfc)
-{}
+{
+    pthread_mutex_init(&_mutex, NULL);
+}
 
 jcntl::~jcntl()
 {
@@ -78,6 +80,7 @@
                 ::delete _datafh[i];
         ::delete[] _datafh;
     }
+    pthread_mutex_destroy(&_mutex);
 }
 
 void
@@ -263,7 +266,30 @@
 const u_int32_t
 jcntl::get_wr_events() throw (jexception)
 {
-    return _wmgr.get_events(pmgr::UNUSED);
+    // Non-blocking: when another caller already holds _mutex, skip this round and return 0.
+    // pthread_mutex_trylock() reports failure through its return value, not through errno.
+    const int ret = pthread_mutex_trylock(&_mutex);
+    if (ret == EBUSY)
+        return 0; // already locked, return immediately
+    if (ret != 0)
+    {
+        std::stringstream ss;
+        ss << "pthread_mutex_trylock() returned " << ret << " (" << strerror(ret) << ")";
+        throw jexception(jerrno::JERR__PTHREAD, ss.str(), "jcntl", "get_wr_events");
+    }
+    u_int32_t res = 0;
+    try
+    {
+        res = _wmgr.get_events(pmgr::UNUSED);
+    }
+    catch (...)
+    {
+        // Release the lock before propagating; otherwise every later call would see EBUSY.
+        pthread_mutex_unlock(&_mutex);
+        throw;
+    }
+    pthread_mutex_unlock(&_mutex);
+    return res;
 }
 
 const u_int32_t

Modified: store/trunk/cpp/lib/jrnl/jcntl.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.hpp	2007-11-01 21:00:00 UTC (rev 1208)
+++ store/trunk/cpp/lib/jrnl/jcntl.hpp	2007-11-02 13:30:43 UTC (rev 1209)
@@ -48,6 +48,7 @@
 #include <jrnl/rmgr.hpp>
 #include <jrnl/wmgr.hpp>
 #include <jrnl/wrfc.hpp>
+#include <pthread.h>
 #include <qpid/broker/PersistableQueue.h>
 
 namespace rhm
@@ -138,6 +139,7 @@
         rmgr _rmgr;                 ///< Read page manager which manages AIO
         wmgr _wmgr;                 ///< Write page manager which manages AIO
         rcvdat _rcvdat;             ///< Recovery data used for recovery
+        pthread_mutex_t _mutex;     ///< Mutex for thread safety
 
         std::deque<rhm::journal::data_tok*> _aio_rd_cmpl_dtok_list; ///< Internally mamanged deque
         std::deque<rhm::journal::data_tok*> _aio_wr_cmpl_dtok_list; ///< Internally mamanged deque
@@ -218,7 +220,7 @@
         * \param wr_dtokl deque for storing data tokens retruning from enqueue and dequeue (write)
         *     AIO operations.
         * \param wr_cb Function pointer to callback function for write operations. May be NULL.
-        * \param prep_tx_list
+        * \param prep_txn_list
         *
         * \exception TODO
         */

Modified: store/trunk/cpp/lib/jrnl/jerrno.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jerrno.cpp	2007-11-01 21:00:00 UTC (rev 1208)
+++ store/trunk/cpp/lib/jrnl/jerrno.cpp	2007-11-02 13:30:43 UTC (rev 1209)
@@ -50,6 +50,7 @@
 const u_int32_t jerrno::JERR__AIO               = 0x0103;
 const u_int32_t jerrno::JERR__FILEIO            = 0x0104;
 const u_int32_t jerrno::JERR__RTCLOCK           = 0x0105;
+const u_int32_t jerrno::JERR__PTHREAD           = 0x0106;
 
 // class jcntl
 const u_int32_t jerrno::JERR_JCNTL_STOPPED      = 0x0200;
@@ -127,6 +128,7 @@
     _err_map[JERR__AIO] = std::string("JERR__AIO: AIO error.");
     _err_map[JERR__FILEIO] = std::string("JERR__FILEIO: File read or write failure.");
     _err_map[JERR__RTCLOCK] = std::string("JERR__RTCLOCK: Reading real-time clock failed.");
+    _err_map[JERR__PTHREAD] = std::string("JERR__PTHREAD: pthread failure.");
 
     // class jcntl
     _err_map[JERR_JCNTL_STOPPED] = std::string("JERR_JCNTL_STOPPED: Operation on stopped journal.");

Modified: store/trunk/cpp/lib/jrnl/jerrno.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jerrno.hpp	2007-11-01 21:00:00 UTC (rev 1208)
+++ store/trunk/cpp/lib/jrnl/jerrno.hpp	2007-11-02 13:30:43 UTC (rev 1209)
@@ -67,6 +67,7 @@
         static const u_int32_t JERR__AIO;               ///< AIO failure
         static const u_int32_t JERR__FILEIO;            ///< File read or write failure
         static const u_int32_t JERR__RTCLOCK;           ///< Reading real-time clock failed
+        static const u_int32_t JERR__PTHREAD;           ///< pthread failure
 
         // class jcntl
         static const u_int32_t JERR_JCNTL_STOPPED;      ///< Operation on stopped journal

Modified: store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp	2007-11-01 21:00:00 UTC (rev 1208)
+++ store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp	2007-11-02 13:30:43 UTC (rev 1209)
@@ -107,7 +107,7 @@
         {
             rhm::journal::jcntl jc(test_name, "jdata", test_name);
         }
-        catch (rhm::journal::jexception& e)
+        catch (const rhm::journal::jexception& e)
         {
             std::stringstream ss;
             ss << e;
@@ -122,7 +122,7 @@
             CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp != NULL);
             delete jcp;
         }
-        catch (rhm::journal::jexception& e)
+        catch (const rhm::journal::jexception& e)
         {
             std::stringstream ss;
             ss << e;
@@ -139,7 +139,7 @@
             rhm::journal::jcntl jc(test_name, "jdata", test_name);
             jc.initialize();
         }
-        catch (rhm::journal::jexception& e)
+        catch (const rhm::journal::jexception& e)
         {
             std::stringstream ss;
             ss << e;
@@ -155,7 +155,7 @@
             jcp->initialize();
             delete jcp;
         }
-        catch (rhm::journal::jexception& e)
+        catch (const rhm::journal::jexception& e)
         {
             if (jcp)
                 delete jcp;
@@ -186,7 +186,7 @@
                 jc.recover_complete();
             }
         }
-        catch (rhm::journal::jexception& e)
+        catch (const rhm::journal::jexception& e)
         {
             std::stringstream ss;
             ss << e;
@@ -219,7 +219,7 @@
                 delete jcp;
             }
         }
-        catch (rhm::journal::jexception& e)
+        catch (const rhm::journal::jexception& e)
         {
             if (jcp)
                 delete jcp;
@@ -240,7 +240,7 @@
             for (int m=0; m<NUM_MSGS; m++)
                 enq_msg(&jc, create_msg(msg, m, MSG_SIZE));
         }
-        catch (rhm::journal::jexception& e)
+        catch (const rhm::journal::jexception& e)
         {
             std::stringstream ss;
             ss << e;
@@ -258,7 +258,7 @@
                 enq_msg(jcp, create_msg(msg, m, MSG_SIZE));
             delete jcp;
         }
-        catch (rhm::journal::jexception& e)
+        catch (const rhm::journal::jexception& e)
         {
             if (jcp)
                 delete jcp;
@@ -281,7 +281,7 @@
                 enq_txn_msg(&jc, create_msg(msg, m, MSG_SIZE), xid);
             txn_commit(&jc, xid);
         }
-        catch (rhm::journal::jexception& e)
+        catch (const rhm::journal::jexception& e)
         {
             std::stringstream ss;
             ss << e;
@@ -300,7 +300,7 @@
             txn_commit(jcp, xid);
             delete jcp;
         }
-        catch (rhm::journal::jexception& e)
+        catch (const rhm::journal::jexception& e)
         {
             if (jcp)
                 delete jcp;
@@ -335,7 +335,7 @@
                 }
             }
         }
-        catch (rhm::journal::jexception& e)
+        catch (const rhm::journal::jexception& e)
         {
             std::stringstream ss;
             ss << e;
@@ -370,7 +370,7 @@
                 delete jcp;
             }
         }
-        catch (rhm::journal::jexception& e)
+        catch (const rhm::journal::jexception& e)
         {
             if (jcp)
                 delete jcp;
@@ -408,7 +408,7 @@
                 }
             }
         }
-        catch (rhm::journal::jexception& e)
+        catch (const rhm::journal::jexception& e)
         {
             std::stringstream ss;
             ss << e;
@@ -444,7 +444,7 @@
                 delete jcp;
             }
         }
-        catch (rhm::journal::jexception& e)
+        catch (const rhm::journal::jexception& e)
         {
             if (jcp)
                 delete jcp;
@@ -487,7 +487,7 @@
                 }
             }
         }
-        catch (rhm::journal::jexception& e)
+        catch (const rhm::journal::jexception& e)
         {
             std::stringstream ss;
             ss << e;
@@ -529,7 +529,7 @@
                 delete jcp;
             }
         }
-        catch (rhm::journal::jexception& e)
+        catch (const rhm::journal::jexception& e)
         {
             if (jcp)
                 delete jcp;
@@ -575,7 +575,7 @@
                 }
             }
         }
-        catch (rhm::journal::jexception& e)
+        catch (const rhm::journal::jexception& e)
         {
             std::stringstream ss;
             ss << e;
@@ -618,7 +618,7 @@
                 delete jcp;
             }
         }
-        catch (rhm::journal::jexception& e)
+        catch (const rhm::journal::jexception& e)
         {
             if (jcp)
                 delete jcp;
@@ -663,7 +663,7 @@
                     deq_msg(&jc, m);
             }
         }
-        catch (rhm::journal::jexception& e)
+        catch (const rhm::journal::jexception& e)
         {
             std::stringstream ss;
             ss << e;
@@ -707,7 +707,7 @@
                 delete jcp;
             }
         }
-        catch (rhm::journal::jexception& e)
+        catch (const rhm::journal::jexception& e)
         {
             if (jcp)
                 delete jcp;
@@ -755,7 +755,7 @@
                     deq_msg(&jc, m);
             }
         }
-        catch (rhm::journal::jexception& e)
+        catch (const rhm::journal::jexception& e)
         {
             std::stringstream ss;
             ss << e;
@@ -800,7 +800,7 @@
                 delete jcp;
             }
         }
-        catch (rhm::journal::jexception& e)
+        catch (const rhm::journal::jexception& e)
         {
             if (jcp)
                 delete jcp;
@@ -871,7 +871,7 @@
                     deq_msg(&jc, m);
             }
         }
-        catch (rhm::journal::jexception& e)
+        catch (const rhm::journal::jexception& e)
         {
             std::stringstream ss;
             ss << e;
@@ -941,7 +941,7 @@
                 delete jcp;
             }
         }
-        catch (rhm::journal::jexception& e)
+        catch (const rhm::journal::jexception& e)
         {
             if (jcp)
                 delete jcp;
@@ -1019,7 +1019,7 @@
                     deq_msg(&jc, m);
             }
         }
-        catch (rhm::journal::jexception& e)
+        catch (const rhm::journal::jexception& e)
         {
             std::stringstream ss;
             ss << e;
@@ -1095,7 +1095,7 @@
                 delete jcp;
             }
         }
-        catch (rhm::journal::jexception& e)
+        catch (const rhm::journal::jexception& e)
         {
             if (jcp)
                 delete jcp;

Modified: store/trunk/cpp/tests/jrnl/rtest
===================================================================
--- store/trunk/cpp/tests/jrnl/rtest	2007-11-01 21:00:00 UTC (rev 1208)
+++ store/trunk/cpp/tests/jrnl/rtest	2007-11-02 13:30:43 UTC (rev 1209)
@@ -58,8 +58,8 @@
 RM_DIR="${RM} -rf"
 TEST_PROG="./jtest"
 CHK_PROG="./janalyze.py"
-#VALGRIND="valgrind -q --track-fds=yes --leak-check=full --leak-resolution=high --show-reachable=yes"
-VALGRIND="valgrind -q --track-fds=yes --leak-check=full --leak-resolution=high --show-reachable=yes --suppressions=/usr/lib/valgrind/glibc-2.5.supp"
+VALGRIND="valgrind -q --track-fds=yes --leak-check=full --leak-resolution=high --show-reachable=yes"
+#VALGRIND="valgrind -q --track-fds=yes --leak-check=full --leak-resolution=high --show-reachable=yes --suppressions=/usr/lib/valgrind/glibc-2.5.supp"
 MAKE="make -f Makefile.rtest"
 
 




More information about the rhmessaging-commits mailing list