[rhmessaging-commits] rhmessaging commits: r1118 - in store/trunk/cpp/lib: jrnl and 1 other directory.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Fri Oct 19 11:37:16 EDT 2007


Author: cctrieloff
Date: 2007-10-19 11:37:15 -0400 (Fri, 19 Oct 2007)
New Revision: 1118

Modified:
   store/trunk/cpp/lib/BdbMessageStore.cpp
   store/trunk/cpp/lib/TxnCtxt.h
   store/trunk/cpp/lib/jrnl/data_tok.cpp
   store/trunk/cpp/lib/jrnl/data_tok.hpp
   store/trunk/cpp/lib/jrnl/jcntl.cpp
Log:
async mem clean up with intrusive_ptr

Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp	2007-10-19 15:16:19 UTC (rev 1117)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp	2007-10-19 15:37:15 UTC (rev 1118)
@@ -30,6 +30,7 @@
 #include "BindingDbt.h"
 #include "IdPairDbt.h"
 #include "StringDbt.h"
+#include <boost/intrusive_ptr.hpp>
 
 using namespace rhm::bdbstore;
 using namespace qpid::broker;
@@ -829,49 +830,39 @@
     Buffer buffer(buff,size);
     buffer.putLong(headerSize);
     message.encode(buffer);
-    //buffer.flip();
 
-   
-	DataTokenImpl* dtokp = NULL; 
     try {
 
      if ( queue && usingJrnl()){
-        dtokp = new DataTokenImpl;
- 	    // deleted this in the callback...
+        boost::intrusive_ptr<DataTokenImpl> dtokp(new DataTokenImpl);
+		dtokp->ref();
 	    dtokp->setSourceMessage (&message);
 	    dtokp->set_rid(message.getPersistenceId()); // set the messageID into the Journal header (record-id)
 
-//            unsigned aio_sleep_cnt = 0;
             bool written = false;
             while (!written)
             {
 	            JournalImpl* jc = static_cast<JournalImpl*>(queue->getExternalQueueStore());
 				rhm::journal::iores eres;
 				if (txn->getXid().empty()){
-					eres = jc->enqueue_data_record(buff, size, size, dtokp, false);
+					eres = jc->enqueue_data_record(buff, size, size, dtokp.get(), false);
 				}else {
-					eres = jc->enqueue_txn_data_record(buff, size, size, dtokp, txn->getXid(), false);
+					eres = jc->enqueue_txn_data_record(buff, size, size, dtokp.get(), txn->getXid(), false);
 				}
                 switch (eres)
                 {
                     case rhm::journal::RHM_IORES_SUCCESS:
-                        if (dtokp->wstate() >= DataTokenImpl::ENQ_SUBM)
+                        if (dtokp.get()->wstate() >= DataTokenImpl::ENQ_SUBM)
                             written = true;
                         break;
                     case rhm::journal::RHM_IORES_AIO_WAIT:
-/*                        if (++aio_sleep_cnt >= MAX_AIO_SLEEPS){
-						    delete dtokp;
-			                THROW_STORE_EXCEPTION("Error storing message -- AIO timeout for: " + queue->getName());
-						}*/
                         usleep(AIO_SLEEP_TIME); // TODO move sleep to wait for IO in get events
                         jc->get_wr_events();
                         break;
                     case rhm::journal::RHM_IORES_FULL:
-					    delete dtokp;
         		        THROW_STORE_FULL_EXCEPTION("Error storing message -- Journal full :" + queue->getName());
                         break;
                     default:
-					    delete dtokp;
                         assert( "Store Error: Unexpected msg state");
                 }
             }
@@ -886,7 +877,6 @@
     }catch ( journal::jexception& e) {
        std::string str;
 //	   std::cout << "-------------" << e << std::endl;
-	   if (dtokp) delete dtokp;
        THROW_STORE_EXCEPTION("Enqueue failed: " +e.to_string(str) );
     }catch (DbException& e) {
         THROW_STORE_EXCEPTION_2("Error storing message", e);
@@ -948,7 +938,8 @@
 {
 //	unsigned aio_sleep_cnt = 0;
     bool written = false;
-	DataTokenImpl* ddtokp =  new DataTokenImpl;
+    boost::intrusive_ptr<DataTokenImpl> ddtokp(new DataTokenImpl);
+	ddtokp->ref();
  	ddtokp->setSourceMessage (&msg);
 	ddtokp->set_rid(messageIdSequence.next()); 
 	ddtokp->set_dequeue_rid(msg.getPersistenceId());
@@ -965,13 +956,12 @@
          rhm::journal::iores dres;
          try {
 		      if (tid.empty()){
-			  	  dres = jc->dequeue_data_record(ddtokp);
+			  	  dres = jc->dequeue_data_record(ddtokp.get());
 			  } else {
-			  	  dres = jc->dequeue_txn_data_record(ddtokp, tid);
+			  	  dres = jc->dequeue_txn_data_record(ddtokp.get(), tid);
 			  }
          } catch (rhm::journal::jexception& e) { 
 		      std::string str;
-			  //delete ddtokp;
 			  THROW_STORE_EXCEPTION("Error dequeuing message" + e.to_string(str));
 	     }
          switch (dres)
@@ -981,15 +971,10 @@
                  written = true;
                  break;
              case rhm::journal::RHM_IORES_AIO_WAIT:
-/*                 if (++aio_sleep_cnt >= MAX_AIO_SLEEPS){
-				     delete ddtokp;
-			         THROW_STORE_EXCEPTION("Error dequeuing message -- AIO timeout for: " + queue.getName());
-			     } */
                  usleep(AIO_SLEEP_TIME); // TODO add sleep time to get events call as option
                  jc->get_wr_events();
                  break;
              default:
-			     delete ddtokp;
                  assert( "Store Error: Unexpected msg state");
          }
     }

Modified: store/trunk/cpp/lib/TxnCtxt.h
===================================================================
--- store/trunk/cpp/lib/TxnCtxt.h	2007-10-19 15:16:19 UTC (rev 1117)
+++ store/trunk/cpp/lib/TxnCtxt.h	2007-10-19 15:37:15 UTC (rev 1118)
@@ -32,6 +32,7 @@
 #include "JournalImpl.h"
 #include "DataTokenImpl.h"
 #include <boost/format.hpp>
+#include <boost/intrusive_ptr.hpp>
 
 namespace rhm{
 namespace bdbstore{
@@ -64,19 +65,19 @@
 		for (TxnCtxt::ipqdef::iterator i = impactedQueues.begin(); i != impactedQueues.end(); i++) { 
 	   		JournalImpl* jc = static_cast<JournalImpl*>(*i);
 			if (jc && loggedtx) { /* if using journal */
-                DataTokenImpl* dtokp = new DataTokenImpl;
+        		boost::intrusive_ptr<DataTokenImpl> dtokp(new DataTokenImpl);
+				dtokp->ref();
 				dtokp->set_rid(loggedtx->next());
 				try{
 					if (commit)
-						jc->txn_commit(dtokp, getXid());
+						jc->txn_commit(dtokp.get(), getXid());
 					else
 					{
-						jc->txn_abort(dtokp, getXid());
+						jc->txn_abort(dtokp.get(), getXid());
                 	}
 				} catch (rhm::journal::jexception& e) { 
 		      		std::string str;
 //std::cout << "Error commit" << e << std::endl;
-			  		delete dtokp;
 			  		THROW_STORE_EXCEPTION("Error commit" + e.to_string(str));
 	     		}
 			

Modified: store/trunk/cpp/lib/jrnl/data_tok.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/data_tok.cpp	2007-10-19 15:16:19 UTC (rev 1117)
+++ store/trunk/cpp/lib/jrnl/data_tok.cpp	2007-10-19 15:37:15 UTC (rev 1118)
@@ -33,6 +33,7 @@
 #include <jrnl/data_tok.hpp>
 
 #include <sstream>
+#include <iostream>
 #include <jrnl/jerrno.hpp>
 
 namespace rhm
@@ -40,11 +41,25 @@
 namespace journal
 {
 
+void intrusive_ptr_add_ref(data_tok* tok)
+{
+    tok->ref();
+}
+
+void intrusive_ptr_release(data_tok* tok)
+{
+    tok->unref();
+    if (tok->refcnt() == 0)
+        delete tok;
+}
+
+
 // Static members
 
 u_int64_t data_tok::_cnt = 0;
 
 data_tok::data_tok():
+	_ref_cnt(0),
     _wstate(NONE),
     _rstate(UNREAD),
     _dsize(0),

Modified: store/trunk/cpp/lib/jrnl/data_tok.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/data_tok.hpp	2007-10-19 15:16:19 UTC (rev 1117)
+++ store/trunk/cpp/lib/jrnl/data_tok.hpp	2007-10-19 15:37:15 UTC (rev 1118)
@@ -100,6 +100,7 @@
         };
 
     private:
+		size_t      _ref_cnt;       ///< Ref count for auto cleanup
         pthread_mutex_t _mutex;
         static u_int64_t _cnt;
         u_int64_t   _icnt;
@@ -118,6 +119,9 @@
         data_tok();
         ~data_tok();
 
+        inline size_t refcnt(void) { return _ref_cnt;}
+	    inline void ref(void) { _ref_cnt++; }
+        inline void unref(void) { _ref_cnt--; }
         inline qpid::broker::PersistableMessage* getSourceMessage(){return _sourceMsg;}
         inline void setSourceMessage(qpid::broker::PersistableMessage* msg) {_sourceMsg = msg;}
 
@@ -163,6 +167,10 @@
 
         void reset();
     };
+	
+	void intrusive_ptr_add_ref(data_tok* r);
+    void intrusive_ptr_release(data_tok* r);
+	
 
 } // namespace journal
 } // namespace rhm

Modified: store/trunk/cpp/lib/jrnl/jcntl.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.cpp	2007-10-19 15:16:19 UTC (rev 1117)
+++ store/trunk/cpp/lib/jrnl/jcntl.cpp	2007-10-19 15:37:15 UTC (rev 1118)
@@ -604,7 +604,7 @@
 			}
 		}
         this_dtok_list.pop_front();
-		delete dtokp;
+		intrusive_ptr_release(dtokp);
     }
 }
 
@@ -628,7 +628,7 @@
         	}
 		}
         this_dtok_list.pop_front();
-	delete dtokp;
+	    intrusive_ptr_release( dtokp);
     }
     
 }




More information about the rhmessaging-commits mailing list