Author: kpvdr
Date: 2009-02-19 14:02:07 -0500 (Thu, 19 Feb 2009)
New Revision: 3124
Added:
store/trunk/cpp/lib/TxnCtxt.cpp
Modified:
store/trunk/cpp/lib/Makefile.am
store/trunk/cpp/lib/MessageStoreImpl.cpp
store/trunk/cpp/lib/TxnCtxt.h
store/trunk/cpp/tests/SimpleTest.cpp
store/trunk/cpp/tests/jrnl/jtt/jfile_chk.py
Log:
Fix for BZ486418 - "qpidd+store The extra xids encountered after qpidd recovery from
journal". Some minor format improvements to the python journal analysis tool.
Modified: store/trunk/cpp/lib/Makefile.am
===================================================================
--- store/trunk/cpp/lib/Makefile.am 2009-02-19 18:57:30 UTC (rev 3123)
+++ store/trunk/cpp/lib/Makefile.am 2009-02-19 19:02:07 UTC (rev 3124)
@@ -49,6 +49,7 @@
MessageStoreImpl.cpp \
PreparedTransaction.cpp \
StringDbt.cpp \
+ TxnCtxt.cpp \
BindingDbt.h \
BufferValue.h \
Cursor.h \
Modified: store/trunk/cpp/lib/MessageStoreImpl.cpp
===================================================================
--- store/trunk/cpp/lib/MessageStoreImpl.cpp 2009-02-19 18:57:30 UTC (rev 3123)
+++ store/trunk/cpp/lib/MessageStoreImpl.cpp 2009-02-19 19:02:07 UTC (rev 3124)
@@ -29,6 +29,7 @@
#include "jrnl/txn_map.hpp"
#include "qpid/log/Statement.h"
#include "qmf/com/redhat/rhm/store/Package.h"
+#include "StoreException.h"
#define MAX_AIO_SLEEPS 1000 // ~1 second
#define AIO_SLEEP_TIME 1000 // 1 milisecond
Added: store/trunk/cpp/lib/TxnCtxt.cpp
===================================================================
--- store/trunk/cpp/lib/TxnCtxt.cpp (rev 0)
+++ store/trunk/cpp/lib/TxnCtxt.cpp 2009-02-19 19:02:07 UTC (rev 3124)
@@ -0,0 +1,162 @@
+#include "TxnCtxt.h"
+
+#include <sstream>
+#include <unistd.h> // ::usleep()
+
+#include "jrnl/jexception.hpp"
+#include "StoreException.h"
+
+namespace mrg {
+namespace msgstore {
+
+void TxnCtxt::completeTxn(bool commit) {
+ sync();
+ for (ipqItr i = impactedQueues.begin(); i != impactedQueues.end(); i++) {
+ commitTxn(static_cast<JournalImpl*>(*i), commit);
+ }
+ impactedQueues.clear();
+ if (preparedXidStorePtr)
+ commitTxn(preparedXidStorePtr, commit);
+}
+
+void TxnCtxt::commitTxn(JournalImpl* jc, bool commit) {
+ if (jc && loggedtx) { /* if using journal */
+ boost::intrusive_ptr<DataTokenImpl> dtokp(new DataTokenImpl);
+ dtokp->addRef();
+ dtokp->set_external_rid(true);
+ dtokp->set_rid(loggedtx->next());
+ try {
+ if (commit) {
+ jc->txn_commit(dtokp.get(), getXid());
+ sync();
+ } else {
+ jc->txn_abort(dtokp.get(), getXid());
+ }
+ } catch (const journal::jexception& e) {
+ THROW_STORE_EXCEPTION(std::string("Error commit") + e.what());
+ }
+ }
+}
+
+//static
+uuid_t TxnCtxt::uuid;
+
+// static
+IdSequence TxnCtxt::uuidSeq;
+
+// static
+bool TxnCtxt::staticInit = TxnCtxt::setUuid();
+
+// static
+bool TxnCtxt::setUuid() {
+ ::uuid_generate(uuid);
+ return true;
+}
+
+TxnCtxt::TxnCtxt(IdSequence* _loggedtx) : loggedtx(_loggedtx), dtokp(new DataTokenImpl),
preparedXidStorePtr(0), txn(0) {
+ if (loggedtx) {
+// // Human-readable tid: 53 bytes
+// // uuit_t is a char[16]
+// tid.reserve(53);
+// u_int64_t* u1 = (u_int64_t*)uuid;
+// u_int64_t* u2 = (u_int64_t*)(uuid + sizeof(u_int64_t));
+// std::stringstream s;
+// s << "tid:" << std::hex <<
std::setfill('0') << std::setw(16) << uuidSeq.next() <<
":" << std::setw(16) << *u1 << std::setw(16) << *u2;
+// tid.assign(s.str());
+
+ // Binary tid: 24 bytes
+ tid.reserve(24);
+ u_int64_t c = uuidSeq.next();
+ tid.append((char*)&c, sizeof(c));
+ tid.append((char*)&uuid, sizeof(uuid));
+ }
+}
+
+TxnCtxt::TxnCtxt(std::string _tid, IdSequence* _loggedtx) : loggedtx(_loggedtx),
dtokp(new DataTokenImpl), preparedXidStorePtr(0), tid(_tid), txn(0) {}
+
+TxnCtxt::~TxnCtxt() { if(txn) abort(); }
+
+#define MAX_SYNC_SLEEPS 5000 // ~1 second
+#define SYNC_SLEEP_TIME 200 // 0.2 ms
+
+void TxnCtxt::sync() {
+ bool allWritten = false;
+ bool firstloop = true;
+ long sleep_cnt = 0L;
+ while (loggedtx && !allWritten) {
+ if (sleep_cnt > MAX_SYNC_SLEEPS)
THROW_STORE_EXCEPTION(std::string("Error: timeout waiting for
TxnCtxt::sync()"));
+ if (!firstloop) {
+ ::usleep(SYNC_SLEEP_TIME);
+ sleep_cnt++;
+ } // move this into the get events call aiolib..
+ allWritten = true;
+ for (ipqItr i = impactedQueues.begin(); i != impactedQueues.end(); i++) {
+ sync_jrnl(static_cast<JournalImpl*>(*i), firstloop, allWritten);
+ }
+ if (preparedXidStorePtr)
+ sync_jrnl(preparedXidStorePtr, firstloop, allWritten);
+ firstloop = false;
+ }
+}
+
+void TxnCtxt::sync_jrnl(JournalImpl* jc, bool firstloop, bool& allWritten) {
+ try {
+ if (jc && !(jc->is_txn_synced(getXid()))) {
+ if (firstloop)
+ jc->flush();
+ allWritten = false;
+ jc->get_wr_events();
+ }
+ } catch (const journal::jexception& e) {
+ THROW_STORE_EXCEPTION(std::string("Error sync") + e.what());
+ }
+}
+
+void TxnCtxt::begin(DbEnv& env, bool sync) {
+ env.txn_begin(0, &txn, 0);
+ if (sync)
+ globalHolder = AutoScopedLock(new
qpid::sys::Mutex::ScopedLock(globalSerialiser));
+}
+
+void TxnCtxt::commit() {
+ if (txn) {
+ txn->commit(0);
+ txn = 0;
+ globalHolder.reset();
+ }
+}
+
+void TxnCtxt::abort(){
+ if (txn) {
+ txn->abort();
+ txn = 0;
+ globalHolder.reset();
+ }
+}
+
+DbTxn* TxnCtxt::get() { return txn; }
+
+bool TxnCtxt::isTPC() { return false; }
+
+const std::string& TxnCtxt::getXid() { return tid; }
+
+void TxnCtxt::addXidRecord(qpid::broker::ExternalQueueStore* queue) {
impactedQueues.insert(queue); }
+
+void TxnCtxt::complete(bool commit) { completeTxn(commit); }
+
+bool TxnCtxt::impactedQueuesEmpty() { return impactedQueues.empty(); }
+
+DataTokenImpl* TxnCtxt::getDtok() { return dtokp.get(); }
+
+void TxnCtxt::incrDtokRef() { dtokp->addRef(); }
+
+void TxnCtxt::recoverDtok(const u_int64_t rid, const std::string xid) {
+ dtokp->set_rid(rid);
+ dtokp->set_wstate(DataTokenImpl::ENQ);
+ dtokp->set_xid(xid);
+ dtokp->set_external_rid(true);
+}
+
+TPCTxnCtxt::TPCTxnCtxt(const std::string& _xid, IdSequence* _loggedtx) :
TxnCtxt(_loggedtx), xid(_xid) {}
+
+}}
Modified: store/trunk/cpp/lib/TxnCtxt.h
===================================================================
--- store/trunk/cpp/lib/TxnCtxt.h 2009-02-19 18:57:30 UTC (rev 3123)
+++ store/trunk/cpp/lib/TxnCtxt.h 2009-02-19 19:02:07 UTC (rev 3124)
@@ -24,23 +24,19 @@
#ifndef _TxnCtxt_
#define _TxnCtxt_
-#include <boost/format.hpp>
#include <boost/intrusive_ptr.hpp>
#include <db-inc.h>
#include <memory>
#include <set>
-#include <sstream>
#include <string>
-#include <unistd.h> // ::usleep()
-
+
#include "DataTokenImpl.h"
#include "IdSequence.h"
#include "JournalImpl.h"
-#include "jrnl/jexception.hpp"
#include "qpid/broker/PersistableQueue.h"
#include "qpid/broker/TransactionalStore.h"
#include "qpid/sys/Mutex.h"
-#include "StoreException.h"
+#include "qpid/sys/uuid.h"
namespace mrg {
namespace msgstore {
@@ -50,6 +46,11 @@
protected:
static qpid::sys::Mutex globalSerialiser;
+ static uuid_t uuid;
+ static IdSequence uuidSeq;
+ static bool staticInit;
+ static bool setUuid();
+
typedef std::set<qpid::broker::ExternalQueueStore*> ipqdef;
typedef ipqdef::iterator ipqItr;
typedef std::auto_ptr<qpid::sys::Mutex::ScopedLock> AutoScopedLock;
@@ -67,133 +68,45 @@
std::string tid;
DbTxn* txn;
- virtual void completeTxn(bool commit) {
- sync();
- for (ipqItr i = impactedQueues.begin(); i != impactedQueues.end(); i++) {
- commitTxn(static_cast<JournalImpl*>(*i), commit);
- }
- impactedQueues.clear();
- if (preparedXidStorePtr)
- commitTxn(preparedXidStorePtr, commit);
- }
+ virtual void completeTxn(bool commit);
+ void commitTxn(JournalImpl* jc, bool commit);
- void commitTxn(JournalImpl* jc, bool commit) {
- if (jc && loggedtx) { /* if using journal */
- boost::intrusive_ptr<DataTokenImpl> dtokp(new DataTokenImpl);
- dtokp->addRef();
- dtokp->set_external_rid(true);
- dtokp->set_rid(loggedtx->next());
- try {
- if (commit) {
- jc->txn_commit(dtokp.get(), getXid());
- sync();
- } else {
- jc->txn_abort(dtokp.get(), getXid());
- }
- } catch (const journal::jexception& e) {
- THROW_STORE_EXCEPTION(std::string("Error commit") + e.what());
- }
- }
- }
-
public:
+ TxnCtxt(IdSequence* _loggedtx=NULL);
+ TxnCtxt(std::string _tid, IdSequence* _loggedtx);
+ virtual ~TxnCtxt();
- TxnCtxt(IdSequence* _loggedtx=NULL) : loggedtx(_loggedtx), dtokp(new DataTokenImpl),
preparedXidStorePtr(0), txn(0) {
- if (loggedtx) {
- std::stringstream s;
- s << "rhm-tid" << this;
- tid.assign(s.str());
- }
- }
-
- TxnCtxt(std::string _tid, IdSequence* _loggedtx) : loggedtx(_loggedtx), dtokp(new
DataTokenImpl), preparedXidStorePtr(0), tid(_tid), txn(0) {}
-
/**
* Call to make sure all the data for this txn is written to safe store
*
*@return if the data sucessfully synced.
*/
+ void sync();
+ void sync_jrnl(JournalImpl* jc, bool firstloop, bool& allWritten);
+ void begin(DbEnv& env, bool sync = false);
+ void commit();
+ void abort();
+ DbTxn* get();
+ virtual bool isTPC();
+ virtual const std::string& getXid();
- virtual ~TxnCtxt() { if(txn) abort(); }
-
-#define MAX_SYNC_SLEEPS 5000 // ~1 second
-#define SYNC_SLEEP_TIME 200 // 0.2 ms
-
- void sync() {
- bool allWritten = false;
- bool firstloop = true;
- long sleep_cnt = 0L;
- while (loggedtx && !allWritten) {
- if (sleep_cnt > MAX_SYNC_SLEEPS)
THROW_STORE_EXCEPTION(std::string("Error: timeout waiting for
TxnCtxt::sync()"));
- if (!firstloop) { ::usleep(SYNC_SLEEP_TIME); sleep_cnt++; } // move this into
the get events call aiolib..
- allWritten = true;
- for (ipqItr i = impactedQueues.begin(); i != impactedQueues.end(); i++) {
- sync_jrnl(static_cast<JournalImpl*>(*i), firstloop, allWritten);
- }
- if (preparedXidStorePtr)
- sync_jrnl(preparedXidStorePtr, firstloop, allWritten);
- firstloop = false;
- }
- }
-
- void sync_jrnl(JournalImpl* jc, bool firstloop, bool& allWritten) {
- try {
- if (jc && !(jc->is_txn_synced(getXid()))) {
- if (firstloop) jc->flush();
- allWritten = false;
- jc->get_wr_events();
- }
- } catch (const journal::jexception& e) {
- THROW_STORE_EXCEPTION(std::string("Error sync") + e.what());
- }
- }
-
- void begin(DbEnv& env, bool sync = false) {
- env.txn_begin(0, &txn, 0);
- if (sync) globalHolder = AutoScopedLock(new
qpid::sys::Mutex::ScopedLock(globalSerialiser));
- }
-
- void commit() {
- if (txn) {
- txn->commit(0);
- txn = 0;
- globalHolder.reset();
- }
- }
-
- void abort(){
- if (txn) {
- txn->abort();
- txn = 0;
- globalHolder.reset();
- }
- }
-
- DbTxn* get() { return txn; }
- virtual bool isTPC() { return false; }
- virtual const std::string& getXid() { return tid; }
-
- void addXidRecord(qpid::broker::ExternalQueueStore* queue) {
impactedQueues.insert(queue); }
+ void addXidRecord(qpid::broker::ExternalQueueStore* queue);
inline void prepare(JournalImpl* _preparedXidStorePtr) { preparedXidStorePtr =
_preparedXidStorePtr; }
- void complete(bool commit) { completeTxn(commit); }
- bool impactedQueuesEmpty() { return impactedQueues.empty(); }
- DataTokenImpl* getDtok() { return dtokp.get(); }
- void incrDtokRef() { dtokp->addRef(); }
- void recoverDtok(const u_int64_t rid, const std::string xid) {
- dtokp->set_rid(rid);
- dtokp->set_wstate(DataTokenImpl::ENQ);
- dtokp->set_xid(xid);
- dtokp->set_external_rid(true);
- }
+ void complete(bool commit);
+ bool impactedQueuesEmpty();
+ DataTokenImpl* getDtok();
+ void incrDtokRef();
+ void recoverDtok(const u_int64_t rid, const std::string xid);
};
+
class TPCTxnCtxt : public TxnCtxt, public qpid::broker::TPCTransactionContext
{
protected:
const std::string xid;
public:
- TPCTxnCtxt(const std::string& _xid, IdSequence* _loggedtx) : TxnCtxt(_loggedtx),
xid(_xid) {}
+ TPCTxnCtxt(const std::string& _xid, IdSequence* _loggedtx);
inline virtual bool isTPC() { return true; }
inline virtual const std::string& getXid() { return xid; }
};
Modified: store/trunk/cpp/tests/SimpleTest.cpp
===================================================================
--- store/trunk/cpp/tests/SimpleTest.cpp 2009-02-19 18:57:30 UTC (rev 3123)
+++ store/trunk/cpp/tests/SimpleTest.cpp 2009-02-19 19:02:07 UTC (rev 3124)
@@ -26,6 +26,7 @@
#include "MessageStoreImpl.h"
#include <iostream>
#include "MessageUtils.h"
+#include "StoreException.h"
#include <qpid/broker/Queue.h>
#include <qpid/broker/RecoveryManagerImpl.h>
#include <qpid/framing/AMQHeaderBody.h>
Modified: store/trunk/cpp/tests/jrnl/jtt/jfile_chk.py
===================================================================
--- store/trunk/cpp/tests/jrnl/jtt/jfile_chk.py 2009-02-19 18:57:30 UTC (rev 3123)
+++ store/trunk/cpp/tests/jrnl/jtt/jfile_chk.py 2009-02-19 19:02:07 UTC (rev 3124)
@@ -48,6 +48,10 @@
transient_mask = 0x10
extern_mask = 0x20
+printchars =
'0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ!"#$%&\'()*+,-./:;<=>?@[\\]^_`{|}~
'
+
+
+
#== global functions ===========================================================
def load(f, klass):
@@ -78,7 +82,7 @@
return f.tell() >= jfsize
def isprintable(s):
- return s.strip(string.printable) == ''
+ return s.strip(printchars) == ''
def print_xid(xidsize, xid):
if xid == None:
@@ -107,6 +111,8 @@
def hex_split_str(s, split_size = 50):
if len(s) <= split_size:
return hex_str(s, 0, len(s))
+ if len(s) > split_size + 25:
+ return hex_str(s, 0, 10) + ' ... ' + hex_str(s, 55, 65) + ' ... '
+ hex_str(s, len(s)-10, len(s))
return hex_str(s, 0, 10) + ' ... ' + hex_str(s, len(s)-10, len(s))
def hex_str(s, b, e):
@@ -118,11 +124,10 @@
o += '\\%02x' % ord(s[i])
return o
-def split_str(s):
- if len(s) > 25:
- return s[:12] + ' ... ' + s[-10:]
- else:
+def split_str(s, split_size = 50):
+ if len(s) < split_size:
return s
+ return s[:25] + ' ... ' + s[-25:]
def inv_str(s):
si = ''
@@ -587,7 +592,7 @@
if len(mismatched_rids) > 0:
warn = ' (WARNING: transactional dequeues not found
in enqueue map; rids=%s)' % mismatched_rids
else:
- warn = ' (WARNING: xid %s not found in transaction
map)' % hdr.xid
+ warn = ' (WARNING: %s not found in transaction map)'
% print_xid(len(hdr.xid), hdr.xid)
if not self.qflag: print ' > %s%s' % (hdr, warn)
if not stop:
stop = (self.last_file and hdr.check()) or hdr.empty() or
self.fhdr.empty()
@@ -808,7 +813,7 @@
print
print 'Remaining transactions: '
for t in self.tmap:
- print "xid=%s:" % t
+ print_xid(len(t), t)
for r in self.tmap[t]:
print " fid=%d %s" % (r[0], r[1])
print " Total: %d records for xid %s" % (len(self.tmap[t]),
t)