[rhmessaging-commits] rhmessaging commits: r1495 - store/trunk/cpp/lib.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Fri Dec 14 14:41:15 EST 2007


Author: kpvdr
Date: 2007-12-14 14:41:15 -0500 (Fri, 14 Dec 2007)
New Revision: 1495

Modified:
   store/trunk/cpp/lib/BdbMessageStore.cpp
   store/trunk/cpp/lib/JournalImpl.cpp
   store/trunk/cpp/lib/JournalImpl.h
   store/trunk/cpp/lib/StoreException.h
Log:
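Fix for BZ423981, but needs testing. Added a write monitor (writeLock) to JournalImpl: the enqueue/dequeue/txn_abort/txn_commit overrides now hold it, wait on it when the journal reports RHM_IORES_AIO_WAIT, and return void instead of an iores, so the retry loops in BdbMessageStore are commented out.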

Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp	2007-12-14 16:27:22 UTC (rev 1494)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp	2007-12-14 19:41:15 UTC (rev 1495)
@@ -33,7 +33,6 @@
 #include "StringDbt.h"
 #include "JournalImpl.h"
 #include "DataTokenImpl.h"
-#include <boost/intrusive_ptr.hpp>
 
 using namespace rhm::bdbstore;
 using namespace qpid::broker;
@@ -996,60 +995,75 @@
 	        dtokp->setSourceMessage(message);
             dtokp->set_external_rid(true);
 	        dtokp->set_rid(message->getPersistenceId()); // set the messageID into the Journal header (record-id)
-
-            bool written = false;
-            unsigned aio_sleep_cnt = 0;
-            unsigned busy_sleep_cnt = 0;
-            while (!written)
-            {
-                JournalImpl* jc = static_cast<JournalImpl*>(queue->getExternalQueueStore());
-                rhm::journal::iores eres;
-                if (txn->getXid().empty()){
-                    if (message->isContentReleased()){
-                        eres = jc->enqueue_extern_data_record(size, dtokp.get(), false);
-                    }else {
-                        eres = jc->enqueue_data_record(buff, size, size, dtokp.get(), false);
-                    }
-                }else {
-                    if (message->isContentReleased()){
-                        eres = jc->enqueue_extern_txn_data_record(size, dtokp.get(), txn->getXid(), false);
-                    } else {
-                        eres = jc->enqueue_txn_data_record(buff, size, size, dtokp.get(), txn->getXid(), false);
-                    }
+            
+            JournalImpl* jc = static_cast<JournalImpl*>(queue->getExternalQueueStore());
+            if (txn->getXid().empty()){
+                if (message->isContentReleased()){
+                    jc->enqueue_extern_data_record(size, dtokp.get(), false);
+                } else {
+                    jc->enqueue_data_record(buff, size, size, dtokp.get(), false);
                 }
-                switch (eres)
-                {
-                  case rhm::journal::RHM_IORES_SUCCESS:
-                    //std::cout << "." << std::flush;
-                    if (dtokp.get()->wstate() >= DataTokenImpl::ENQ_SUBM)
-                        written = true;
-                    aio_sleep_cnt = 0;
-                    busy_sleep_cnt = 0;
-                    break;
-                  case rhm::journal::RHM_IORES_AIO_WAIT:
-                    //std::cout << "w" << std::flush;
-                    if (++aio_sleep_cnt > MAX_AIO_SLEEPS)
-                        THROW_STORE_EXCEPTION("Timeout waiting for AIO: RHM_IORES_AIO_WAIT");
-                    usleep(AIO_SLEEP_TIME); // TODO move sleep to wait for IO in get events
-                    jc->get_wr_events();
-                    break;
-                  case rhm::journal::RHM_IORES_BUSY:
-                    //std::cout << "b" << std::flush;
-                    if (++busy_sleep_cnt > MAX_AIO_SLEEPS)
-                        THROW_STORE_EXCEPTION("Timeout waiting for mutex: RHM_IORES_BUSY");
-                    usleep(AIO_SLEEP_TIME); // TODO add sleep time to get events call as option
-                    break;
-                  case rhm::journal::RHM_IORES_ENQCAPTHRESH:
-                    std::cerr << "Enqueue: Error storing record -- Enqueue capacity threshold exceeded on queue \"" << queue->getName() << "\"." << std::endl << std::flush;
-                    THROW_STORE_FULL_EXCEPTION("Enqueue: Error storing record -- Enqueue capacity threshold exceeded on queue \"" + queue->getName() + "\".");
-                  case rhm::journal::RHM_IORES_FULL:
-                    std::cerr << "Enqueue: Error storing record -- Journal full on queue \"" << queue->getName() << "\"." << std::endl << std::flush;
-                    THROW_STORE_FULL_EXCEPTION("Enqueue: Error storing record -- Journal full on queue \"" + queue->getName() + "\".");
-                    break;
-                  default:
-                    assert("Store Error: Unexpected msg state");
+            }else {
+                if (message->isContentReleased()){
+                    jc->enqueue_extern_txn_data_record(size, dtokp.get(), txn->getXid(), false);
+                } else {
+                    jc->enqueue_txn_data_record(buff, size, size, dtokp.get(), txn->getXid(), false);
                 }
             }
+
+//             bool written = false;
+//             unsigned aio_sleep_cnt = 0;
+//             unsigned busy_sleep_cnt = 0;
+//             while (!written)
+//             {
+//                 JournalImpl* jc = static_cast<JournalImpl*>(queue->getExternalQueueStore());
+//                 rhm::journal::iores eres;
+//                 if (txn->getXid().empty()){
+//                     if (message->isContentReleased()){
+//                         eres = jc->enqueue_extern_data_record(size, dtokp.get(), false);
+//                     }else {
+//                         eres = jc->enqueue_data_record(buff, size, size, dtokp.get(), false);
+//                     }
+//                 }else {
+//                     if (message->isContentReleased()){
+//                         eres = jc->enqueue_extern_txn_data_record(size, dtokp.get(), txn->getXid(), false);
+//                     } else {
+//                         eres = jc->enqueue_txn_data_record(buff, size, size, dtokp.get(), txn->getXid(), false);
+//                     }
+//                 }
+//                 switch (eres)
+//                 {
+//                   case rhm::journal::RHM_IORES_SUCCESS:
+//                     //std::cout << "." << std::flush;
+//                     if (dtokp.get()->wstate() >= DataTokenImpl::ENQ_SUBM)
+//                         written = true;
+//                     aio_sleep_cnt = 0;
+//                     busy_sleep_cnt = 0;
+//                     break;
+//                   case rhm::journal::RHM_IORES_AIO_WAIT:
+//                     //std::cout << "w" << std::flush;
+//                     if (++aio_sleep_cnt > MAX_AIO_SLEEPS)
+//                         THROW_STORE_EXCEPTION("Timeout waiting for AIO: RHM_IORES_AIO_WAIT");
+//                     usleep(AIO_SLEEP_TIME); // TODO move sleep to wait for IO in get events
+//                     jc->get_wr_events();
+//                     break;
+//                   case rhm::journal::RHM_IORES_BUSY:
+//                     //std::cout << "b" << std::flush;
+//                     if (++busy_sleep_cnt > MAX_AIO_SLEEPS)
+//                         THROW_STORE_EXCEPTION("Timeout waiting for mutex: RHM_IORES_BUSY");
+//                     usleep(AIO_SLEEP_TIME); // TODO add sleep time to get events call as option
+//                     break;
+//                   case rhm::journal::RHM_IORES_ENQCAPTHRESH:
+//                     std::cerr << "Enqueue: Error storing record -- Enqueue capacity threshold exceeded on queue \"" << queue->getName() << "\"." << std::endl << std::flush;
+//                     THROW_STORE_FULL_EXCEPTION("Enqueue: Error storing record -- Enqueue capacity threshold exceeded on queue \"" + queue->getName() + "\".");
+//                   case rhm::journal::RHM_IORES_FULL:
+//                     std::cerr << "Enqueue: Error storing record -- Journal full on queue \"" << queue->getName() << "\"." << std::endl << std::flush;
+//                     THROW_STORE_FULL_EXCEPTION("Enqueue: Error storing record -- Journal full on queue \"" + queue->getName() + "\".");
+//                     break;
+//                   default:
+//                     assert("Store Error: Unexpected msg state");
+//                 }
+//             }
 	 
         } else {
             /// cct message db
@@ -1127,7 +1141,6 @@
     intrusive_ptr<PersistableMessage>& msg,
     const PersistableQueue& queue)
 {
-    bool written = false;
     boost::intrusive_ptr<DataTokenImpl> ddtokp(new DataTokenImpl);
     ddtokp->addRef();
     ddtokp->setSourceMessage(msg);
@@ -1135,56 +1148,66 @@
     ddtokp->set_rid(messageIdSequence.next()); 
     ddtokp->set_dequeue_rid(msg->getPersistenceId());
     ddtokp->set_wstate(DataTokenImpl::ENQ);
-    JournalImpl* jc = static_cast<JournalImpl*>(queue.getExternalQueueStore());
     string tid;
     if (ctxt){
         TxnCtxt* txn = check(ctxt);
         tid = txn->getXid();
     }
-
-    unsigned aio_sleep_cnt = 0;
-    unsigned busy_sleep_cnt = 0;
-    while (!written)
-    {
-        rhm::journal::iores dres;
-        try {
-            if (tid.empty()){
-                dres = jc->dequeue_data_record(ddtokp.get());
-            } else {
-                dres = jc->dequeue_txn_data_record(ddtokp.get(), tid);
-            }
-        } catch (const journal::jexception& e) { 
-            THROW_STORE_EXCEPTION(std::string("Queue ") + queue.getName() + ": async_dequeue() failed: " + e.what());
+    try {
+        JournalImpl* jc = static_cast<JournalImpl*>(queue.getExternalQueueStore());
+        if (tid.empty()) {
+            jc->dequeue_data_record(ddtokp.get());
+        } else {
+            jc->dequeue_txn_data_record(ddtokp.get(), tid);
         }
-        switch (dres)
-        {
-          case rhm::journal::RHM_IORES_SUCCESS:
-            //std::cout << "." << std::flush;
-            aio_sleep_cnt = 0;
-            busy_sleep_cnt = 0;
-            written = true;
-            break;
-          case rhm::journal::RHM_IORES_AIO_WAIT:
-            //std::cout << "w" << std::flush;
-            if (++aio_sleep_cnt > MAX_AIO_SLEEPS)
-                THROW_STORE_EXCEPTION("Timeout waiting for AIO: RHM_IORES_AIO_WAIT");
-            usleep(AIO_SLEEP_TIME); // TODO add sleep time to get events call as option
-            jc->get_wr_events();
-            break;
-          case rhm::journal::RHM_IORES_BUSY:
-            //std::cout << "b" << std::flush;
-            if (++busy_sleep_cnt > MAX_AIO_SLEEPS)
-                THROW_STORE_EXCEPTION("Timeout waiting for mutex: RHM_IORES_BUSY");
-            usleep(AIO_SLEEP_TIME); // TODO add sleep time to get events call as option
-            break;
-         case rhm::journal::RHM_IORES_FULL:
-            std::cerr << "Dequeue: Error storing record -- Journal full on queue \"" << queue.getName() << "\"." << std::endl << std::flush;
-            THROW_STORE_FULL_EXCEPTION("Dequeue: Error storing record -- Journal full on queue \"" + queue.getName() + "\".");
-            break;
-          default:
-            assert("Store Error: Unexpected msg state");
-        }
+    } catch (const journal::jexception& e) { 
+        THROW_STORE_EXCEPTION(std::string("Queue ") + queue.getName() + ": async_dequeue() failed: " + e.what());
     }
+
+//     bool written = false;
+//     unsigned aio_sleep_cnt = 0;
+//     unsigned busy_sleep_cnt = 0;
+//     while (!written)
+//     {
+//         rhm::journal::iores dres;
+//         try {
+//             if (tid.empty()){
+//                 dres = jc->dequeue_data_record(ddtokp.get());
+//             } else {
+//                 dres = jc->dequeue_txn_data_record(ddtokp.get(), tid);
+//             }
+//         } catch (const journal::jexception& e) { 
+//             THROW_STORE_EXCEPTION(std::string("Queue ") + queue.getName() + ": async_dequeue() failed: " + e.what());
+//         }
+//         switch (dres)
+//         {
+//           case rhm::journal::RHM_IORES_SUCCESS:
+//             //std::cout << "." << std::flush;
+//             aio_sleep_cnt = 0;
+//             busy_sleep_cnt = 0;
+//             written = true;
+//             break;
+//           case rhm::journal::RHM_IORES_AIO_WAIT:
+//             //std::cout << "w" << std::flush;
+//             if (++aio_sleep_cnt > MAX_AIO_SLEEPS)
+//                 THROW_STORE_EXCEPTION("Timeout waiting for AIO: RHM_IORES_AIO_WAIT");
+//             usleep(AIO_SLEEP_TIME); // TODO add sleep time to get events call as option
+//             jc->get_wr_events();
+//             break;
+//           case rhm::journal::RHM_IORES_BUSY:
+//             //std::cout << "b" << std::flush;
+//             if (++busy_sleep_cnt > MAX_AIO_SLEEPS)
+//                 THROW_STORE_EXCEPTION("Timeout waiting for mutex: RHM_IORES_BUSY");
+//             usleep(AIO_SLEEP_TIME); // TODO add sleep time to get events call as option
+//             break;
+//          case rhm::journal::RHM_IORES_FULL:
+//             std::cerr << "Dequeue: Error storing record -- Journal full on queue \"" << queue.getName() << "\"." << std::endl << std::flush;
+//             THROW_STORE_FULL_EXCEPTION("Dequeue: Error storing record -- Journal full on queue \"" + queue.getName() + "\".");
+//             break;
+//           default:
+//             assert("Store Error: Unexpected msg state");
+//         }
+//     }
 }
 
 bool BdbMessageStore::dequeue(DbTxn* txn, Dbt& messageId, Dbt& queueId)

Modified: store/trunk/cpp/lib/JournalImpl.cpp
===================================================================
--- store/trunk/cpp/lib/JournalImpl.cpp	2007-12-14 16:27:22 UTC (rev 1494)
+++ store/trunk/cpp/lib/JournalImpl.cpp	2007-12-14 19:41:15 UTC (rev 1495)
@@ -25,6 +25,7 @@
 
 #include "jrnl/jerrno.hpp"
 #include "jrnl/jexception.hpp"
+#include "StoreException.h"
 #include <qpid/sys/Monitor.h>
 
 using namespace rhm::bdbstore;
@@ -49,6 +50,7 @@
                          getEventsTimerSetFlag(false),
                          writeActivityFlag(false),
                          flushTriggeredFlag(true),
+                         aioWait(false),
                          _xidp(0),
                          _datap(0),
                          _dlen(0),
@@ -168,69 +170,67 @@
     return true;
 }
 
-const iores
+void
 JournalImpl::enqueue_data_record(const void* const data_buff, const size_t tot_data_len,
         const size_t this_data_len, data_tok* dtokp, const bool transient)
 {
-//std::cout << " " << _jid << ":E" << std::flush;
-    return handleInactivityTimer(jcntl::enqueue_data_record(data_buff, tot_data_len,
-            this_data_len, dtokp, transient));
+    qpid::sys::ScopedLock<qpid::sys::Monitor> s(writeLock);
+    while(handleIoResult(jcntl::enqueue_data_record(data_buff, tot_data_len, this_data_len, dtokp,
+            transient)));
 }
 
-const iores
+void
 JournalImpl::enqueue_extern_data_record(const size_t tot_data_len, data_tok* dtokp,
         const bool transient)
 {
-//std::cout << " " << _jid << ":E-ext" << std::flush;
-     return handleInactivityTimer(jcntl::enqueue_extern_data_record(tot_data_len, dtokp,
-             transient));
+    qpid::sys::ScopedLock<qpid::sys::Monitor> s(writeLock);
+    while(handleIoResult(jcntl::enqueue_extern_data_record(tot_data_len, dtokp, transient)));
 }
 
-const iores
+void
 JournalImpl::enqueue_txn_data_record(const void* const data_buff, const size_t tot_data_len,
-        const size_t this_data_len, data_tok* dtokp, const std::string& xid,
-        const bool transient)
+        const size_t this_data_len, data_tok* dtokp, const std::string& xid, const bool transient)
 {
-//std::cout << " " << _jid << ":E-tx" << std::flush;
-    return handleInactivityTimer(jcntl::enqueue_txn_data_record(data_buff, tot_data_len,
-            this_data_len, dtokp, xid, transient));
+    qpid::sys::ScopedLock<qpid::sys::Monitor> s(writeLock);
+    while(handleIoResult(jcntl::enqueue_txn_data_record(data_buff, tot_data_len, this_data_len,
+            dtokp, xid, transient)));
 }
 
-const iores
+void
 JournalImpl::enqueue_extern_txn_data_record(const size_t tot_data_len, data_tok* dtokp,
         const std::string& xid, const bool transient)
 {
-//std::cout << " " << _jid << ":E-tx-ext" << std::flush;
-     return handleInactivityTimer(jcntl::enqueue_extern_txn_data_record(tot_data_len, dtokp, xid,
-             transient));
+    qpid::sys::ScopedLock<qpid::sys::Monitor> s(writeLock);
+    while(handleIoResult(jcntl::enqueue_extern_txn_data_record(tot_data_len, dtokp, xid,
+            transient)));
 }
 
-const iores
+void
 JournalImpl::dequeue_data_record(data_tok* const dtokp)
 {
-//std::cout << " " << _jid << ":D" << std::flush;
-    return handleInactivityTimer(jcntl::dequeue_data_record(dtokp));
+    qpid::sys::ScopedLock<qpid::sys::Monitor> s(writeLock);
+    while(handleIoResult(jcntl::dequeue_data_record(dtokp)));
 }
 
-const iores
+void
 JournalImpl::dequeue_txn_data_record(data_tok* const dtokp, const std::string& xid)
 {
-//std::cout << " " << _jid << ":D-tx" << std::flush;
-    return handleInactivityTimer(jcntl::dequeue_txn_data_record(dtokp, xid));
+    qpid::sys::ScopedLock<qpid::sys::Monitor> s(writeLock);
+    while(handleIoResult(jcntl::dequeue_txn_data_record(dtokp, xid)));
 }
 
-const iores
+void
 JournalImpl::txn_abort(data_tok* const dtokp, const std::string& xid)
 {
-//std::cout << " " << _jid << ":A-tx" << std::flush;
-    return handleInactivityTimer(jcntl::txn_abort(dtokp, xid));
+    qpid::sys::ScopedLock<qpid::sys::Monitor> s(writeLock);
+    while(handleIoResult(jcntl::txn_abort(dtokp, xid)));
 }
 
-const iores
+void
 JournalImpl::txn_commit(data_tok* const dtokp, const std::string& xid)
 {
-//std::cout << " " << _jid << ":C-tx" << std::flush;
-    return handleInactivityTimer(jcntl::txn_commit(dtokp, xid));
+    qpid::sys::ScopedLock<qpid::sys::Monitor> s(writeLock);
+    while(handleIoResult(jcntl::txn_commit(dtokp, xid)));
 }
 
 void
@@ -289,11 +289,42 @@
     journalTimer.add(inactivityFireEventPtr);
 }
 
-const iores
-JournalImpl::handleInactivityTimer(const iores r)
+const bool
+JournalImpl::handleIoResult(const iores r)
 {
     writeActivityFlag = true;
-    return r;
+    switch (r)
+    {
+        case rhm::journal::RHM_IORES_SUCCESS:
+            return false;
+        case rhm::journal::RHM_IORES_AIO_WAIT:
+std::cout << "W " << std::flush;
+            aioWait = true;
+            writeLock.wait();
+            usleep(1000);
+            return true;
+        case rhm::journal::RHM_IORES_ENQCAPTHRESH:
+            {
+                std::ostringstream oss;
+                oss << "Enqueue capacity threshold exceeded on queue \"" << _jid << "\".";
+                std::cerr << oss.str() << std::endl << std::flush;
+                THROW_STORE_FULL_EXCEPTION(oss.str());
+            }
+        case rhm::journal::RHM_IORES_FULL:
+            {
+                std::ostringstream oss;
+                oss << "Journal full on queue \"" << _jid << "\".";
+                std::cerr << oss.str() << std::endl << std::flush;
+                THROW_STORE_FULL_EXCEPTION(oss.str());
+            }
+        default:
+            {
+                std::ostringstream oss;
+                oss << "Unexpected I/O response (" << rhm::journal::iores_str(r) << ") on queue " << _jid << "\".";
+                std::cerr << oss.str() << std::endl << std::flush;
+                THROW_STORE_FULL_EXCEPTION(oss.str());
+            }
+    }
 }
 
 void
@@ -329,6 +360,7 @@
 		dtokp->release();
         this_dtok_list.pop_front();
     }
+    jip->notifyWriteMonitor();
 }
 
 void

Modified: store/trunk/cpp/lib/JournalImpl.h
===================================================================
--- store/trunk/cpp/lib/JournalImpl.h	2007-12-14 16:27:22 UTC (rev 1494)
+++ store/trunk/cpp/lib/JournalImpl.h	2007-12-14 19:41:15 UTC (rev 1495)
@@ -74,6 +74,9 @@
             bool writeActivityFlag;
             bool flushTriggeredFlag;
             qpid::intrusive_ptr<qpid::broker::TimerTask> inactivityFireEventPtr;
+
+            qpid::sys::Monitor writeLock;
+            bool aioWait;
             
             // temp local vars for loadMsgContent below
             void* _xidp;
@@ -118,29 +121,28 @@
                     size_t length);
 
             // Overrides for write inactivity timer
-            const journal::iores enqueue_data_record(const void* const data_buff,
-                    const size_t tot_data_len, const size_t this_data_len, journal::data_tok* dtokp,
+            void enqueue_data_record(const void* const data_buff, const size_t tot_data_len,
+                    const size_t this_data_len, journal::data_tok* dtokp,
                     const bool transient = false);
 
-            const journal::iores enqueue_extern_data_record(const size_t tot_data_len,
-                    journal::data_tok* dtokp, const bool transient = false);
+            void enqueue_extern_data_record(const size_t tot_data_len, journal::data_tok* dtokp,
+                    const bool transient = false);
 
-            const journal::iores enqueue_txn_data_record(const void* const data_buff,
-                    const size_t tot_data_len, const size_t this_data_len, journal::data_tok* dtokp,
+            void enqueue_txn_data_record(const void* const data_buff, const size_t tot_data_len,
+                    const size_t this_data_len, journal::data_tok* dtokp, const std::string& xid,
+                    const bool transient = false);
+
+            void enqueue_extern_txn_data_record(const size_t tot_data_len, journal::data_tok* dtokp,
                     const std::string& xid, const bool transient = false);
 
-            const journal::iores enqueue_extern_txn_data_record(const size_t tot_data_len,
-                    journal::data_tok* dtokp, const std::string& xid, const bool transient = false);
+            void dequeue_data_record(journal::data_tok* const dtokp);
 
-            const journal::iores dequeue_data_record(journal::data_tok* const dtokp);
+            void dequeue_txn_data_record(journal::data_tok* const dtokp, const std::string& xid);
 
-            const journal::iores dequeue_txn_data_record(journal::data_tok* const dtokp,
-                    const std::string& xid);
+            void txn_abort(journal::data_tok* const dtokp, const std::string& xid);
 
-            const journal::iores txn_abort(journal::data_tok* const dtokp, const std::string& xid);
+            void txn_commit(journal::data_tok* const dtokp, const std::string& xid);
 
-            const journal::iores txn_commit(journal::data_tok* const dtokp, const std::string& xid);
-
             void stop(bool block_till_aio_cmpl = false);
 
             // Overrides for get_events timer
@@ -149,9 +151,12 @@
             // TimerTask callback
             void getEventsFire();
             void flushFire();
+            
+            // Notify write monitor
+            inline void notifyWriteMonitor() { if (aioWait) { aioWait = false; writeLock.notify(); } }
 
         private:
-            const journal::iores handleInactivityTimer(const journal::iores r);
+            const bool handleIoResult(const journal::iores r);
             static void  aio_wr_callback(jcntl* journal, u_int32_t num_dtoks);
             static void aio_rd_callback(jcntl* journal, u_int32_t num_dtoks);
         }; // class JournalImpl

Modified: store/trunk/cpp/lib/StoreException.h
===================================================================
--- store/trunk/cpp/lib/StoreException.h	2007-12-14 16:27:22 UTC (rev 1494)
+++ store/trunk/cpp/lib/StoreException.h	2007-12-14 19:41:15 UTC (rev 1495)
@@ -24,6 +24,9 @@
 #ifndef _StoreException_
 #define _StoreException_
 
+#include "IdDbt.h"
+#include <boost/format.hpp>
+
 namespace rhm{
 namespace bdbstore{
 




More information about the rhmessaging-commits mailing list