[rhmessaging-commits] rhmessaging commits: r1118 - in store/trunk/cpp/lib: jrnl and 1 other directory.
rhmessaging-commits at lists.jboss.org
rhmessaging-commits at lists.jboss.org
Fri Oct 19 11:37:16 EDT 2007
Author: cctrieloff
Date: 2007-10-19 11:37:15 -0400 (Fri, 19 Oct 2007)
New Revision: 1118
Modified:
store/trunk/cpp/lib/BdbMessageStore.cpp
store/trunk/cpp/lib/TxnCtxt.h
store/trunk/cpp/lib/jrnl/data_tok.cpp
store/trunk/cpp/lib/jrnl/data_tok.hpp
store/trunk/cpp/lib/jrnl/jcntl.cpp
Log:
async mem clean up with intrusive_ptr
Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp 2007-10-19 15:16:19 UTC (rev 1117)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp 2007-10-19 15:37:15 UTC (rev 1118)
@@ -30,6 +30,7 @@
#include "BindingDbt.h"
#include "IdPairDbt.h"
#include "StringDbt.h"
+#include <boost/intrusive_ptr.hpp>
using namespace rhm::bdbstore;
using namespace qpid::broker;
@@ -829,49 +830,39 @@
Buffer buffer(buff,size);
buffer.putLong(headerSize);
message.encode(buffer);
- //buffer.flip();
-
- DataTokenImpl* dtokp = NULL;
try {
if ( queue && usingJrnl()){
- dtokp = new DataTokenImpl;
- // deleted this in the callback...
+ boost::intrusive_ptr<DataTokenImpl> dtokp(new DataTokenImpl);
+ dtokp->ref();
dtokp->setSourceMessage (&message);
dtokp->set_rid(message.getPersistenceId()); // set the messageID into the Journal header (record-id)
-// unsigned aio_sleep_cnt = 0;
bool written = false;
while (!written)
{
JournalImpl* jc = static_cast<JournalImpl*>(queue->getExternalQueueStore());
rhm::journal::iores eres;
if (txn->getXid().empty()){
- eres = jc->enqueue_data_record(buff, size, size, dtokp, false);
+ eres = jc->enqueue_data_record(buff, size, size, dtokp.get(), false);
}else {
- eres = jc->enqueue_txn_data_record(buff, size, size, dtokp, txn->getXid(), false);
+ eres = jc->enqueue_txn_data_record(buff, size, size, dtokp.get(), txn->getXid(), false);
}
switch (eres)
{
case rhm::journal::RHM_IORES_SUCCESS:
- if (dtokp->wstate() >= DataTokenImpl::ENQ_SUBM)
+ if (dtokp.get()->wstate() >= DataTokenImpl::ENQ_SUBM)
written = true;
break;
case rhm::journal::RHM_IORES_AIO_WAIT:
-/* if (++aio_sleep_cnt >= MAX_AIO_SLEEPS){
- delete dtokp;
- THROW_STORE_EXCEPTION("Error storing message -- AIO timeout for: " + queue->getName());
- }*/
usleep(AIO_SLEEP_TIME); // TODO move sleep to wait for IO in get events
jc->get_wr_events();
break;
case rhm::journal::RHM_IORES_FULL:
- delete dtokp;
THROW_STORE_FULL_EXCEPTION("Error storing message -- Journal full :" + queue->getName());
break;
default:
- delete dtokp;
assert( "Store Error: Unexpected msg state");
}
}
@@ -886,7 +877,6 @@
}catch ( journal::jexception& e) {
std::string str;
// std::cout << "-------------" << e << std::endl;
- if (dtokp) delete dtokp;
THROW_STORE_EXCEPTION("Enqueue failed: " +e.to_string(str) );
}catch (DbException& e) {
THROW_STORE_EXCEPTION_2("Error storing message", e);
@@ -948,7 +938,8 @@
{
// unsigned aio_sleep_cnt = 0;
bool written = false;
- DataTokenImpl* ddtokp = new DataTokenImpl;
+ boost::intrusive_ptr<DataTokenImpl> ddtokp(new DataTokenImpl);
+ ddtokp->ref();
ddtokp->setSourceMessage (&msg);
ddtokp->set_rid(messageIdSequence.next());
ddtokp->set_dequeue_rid(msg.getPersistenceId());
@@ -965,13 +956,12 @@
rhm::journal::iores dres;
try {
if (tid.empty()){
- dres = jc->dequeue_data_record(ddtokp);
+ dres = jc->dequeue_data_record(ddtokp.get());
} else {
- dres = jc->dequeue_txn_data_record(ddtokp, tid);
+ dres = jc->dequeue_txn_data_record(ddtokp.get(), tid);
}
} catch (rhm::journal::jexception& e) {
std::string str;
- //delete ddtokp;
THROW_STORE_EXCEPTION("Error dequeuing message" + e.to_string(str));
}
switch (dres)
@@ -981,15 +971,10 @@
written = true;
break;
case rhm::journal::RHM_IORES_AIO_WAIT:
-/* if (++aio_sleep_cnt >= MAX_AIO_SLEEPS){
- delete ddtokp;
- THROW_STORE_EXCEPTION("Error dequeuing message -- AIO timeout for: " + queue.getName());
- } */
usleep(AIO_SLEEP_TIME); // TODO add sleep time to get events call as option
jc->get_wr_events();
break;
default:
- delete ddtokp;
assert( "Store Error: Unexpected msg state");
}
}
Modified: store/trunk/cpp/lib/TxnCtxt.h
===================================================================
--- store/trunk/cpp/lib/TxnCtxt.h 2007-10-19 15:16:19 UTC (rev 1117)
+++ store/trunk/cpp/lib/TxnCtxt.h 2007-10-19 15:37:15 UTC (rev 1118)
@@ -32,6 +32,7 @@
#include "JournalImpl.h"
#include "DataTokenImpl.h"
#include <boost/format.hpp>
+#include <boost/intrusive_ptr.hpp>
namespace rhm{
namespace bdbstore{
@@ -64,19 +65,19 @@
for (TxnCtxt::ipqdef::iterator i = impactedQueues.begin(); i != impactedQueues.end(); i++) {
JournalImpl* jc = static_cast<JournalImpl*>(*i);
if (jc && loggedtx) { /* if using journal */
- DataTokenImpl* dtokp = new DataTokenImpl;
+ boost::intrusive_ptr<DataTokenImpl> dtokp(new DataTokenImpl);
+ dtokp->ref();
dtokp->set_rid(loggedtx->next());
try{
if (commit)
- jc->txn_commit(dtokp, getXid());
+ jc->txn_commit(dtokp.get(), getXid());
else
{
- jc->txn_abort(dtokp, getXid());
+ jc->txn_abort(dtokp.get(), getXid());
}
} catch (rhm::journal::jexception& e) {
std::string str;
//std::cout << "Error commit" << e << std::endl;
- delete dtokp;
THROW_STORE_EXCEPTION("Error commit" + e.to_string(str));
}
Modified: store/trunk/cpp/lib/jrnl/data_tok.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/data_tok.cpp 2007-10-19 15:16:19 UTC (rev 1117)
+++ store/trunk/cpp/lib/jrnl/data_tok.cpp 2007-10-19 15:37:15 UTC (rev 1118)
@@ -33,6 +33,7 @@
#include <jrnl/data_tok.hpp>
#include <sstream>
+#include <iostream>
#include <jrnl/jerrno.hpp>
namespace rhm
@@ -40,11 +41,25 @@
namespace journal
{
+void intrusive_ptr_add_ref(data_tok* tok)
+{
+ tok->ref();
+}
+
+void intrusive_ptr_release(data_tok* tok)
+{
+ tok->unref();
+ if (tok->refcnt() == 0)
+ delete tok;
+}
+
+
// Static members
u_int64_t data_tok::_cnt = 0;
data_tok::data_tok():
+ _ref_cnt(0),
_wstate(NONE),
_rstate(UNREAD),
_dsize(0),
Modified: store/trunk/cpp/lib/jrnl/data_tok.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/data_tok.hpp 2007-10-19 15:16:19 UTC (rev 1117)
+++ store/trunk/cpp/lib/jrnl/data_tok.hpp 2007-10-19 15:37:15 UTC (rev 1118)
@@ -100,6 +100,7 @@
};
private:
+ size_t _ref_cnt; ///< Ref count for auto cleanup
pthread_mutex_t _mutex;
static u_int64_t _cnt;
u_int64_t _icnt;
@@ -118,6 +119,9 @@
data_tok();
~data_tok();
+ inline size_t refcnt(void) { return _ref_cnt;}
+ inline void ref(void) { _ref_cnt++; }
+ inline void unref(void) { _ref_cnt--; }
inline qpid::broker::PersistableMessage* getSourceMessage(){return _sourceMsg;}
inline void setSourceMessage(qpid::broker::PersistableMessage* msg) {_sourceMsg = msg;}
@@ -163,6 +167,10 @@
void reset();
};
+
+ void intrusive_ptr_add_ref(data_tok* r);
+ void intrusive_ptr_release(data_tok* r);
+
} // namespace journal
} // namespace rhm
Modified: store/trunk/cpp/lib/jrnl/jcntl.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.cpp 2007-10-19 15:16:19 UTC (rev 1117)
+++ store/trunk/cpp/lib/jrnl/jcntl.cpp 2007-10-19 15:37:15 UTC (rev 1118)
@@ -604,7 +604,7 @@
}
}
this_dtok_list.pop_front();
- delete dtokp;
+ intrusive_ptr_release(dtokp);
}
}
@@ -628,7 +628,7 @@
}
}
this_dtok_list.pop_front();
- delete dtokp;
+ intrusive_ptr_release( dtokp);
}
}
More information about the rhmessaging-commits
mailing list