[rhmessaging-commits] rhmessaging commits: r3124 - in store/trunk/cpp: tests and 1 other directories.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Thu Feb 19 14:02:08 EST 2009


Author: kpvdr
Date: 2009-02-19 14:02:07 -0500 (Thu, 19 Feb 2009)
New Revision: 3124

Added:
   store/trunk/cpp/lib/TxnCtxt.cpp
Modified:
   store/trunk/cpp/lib/Makefile.am
   store/trunk/cpp/lib/MessageStoreImpl.cpp
   store/trunk/cpp/lib/TxnCtxt.h
   store/trunk/cpp/tests/SimpleTest.cpp
   store/trunk/cpp/tests/jrnl/jtt/jfile_chk.py
Log:
Fix for BZ486418 - "qpidd+store The extra xids encountered after qpidd recovery from journal". Some minor format improvements to the python journal analysis tool.

Modified: store/trunk/cpp/lib/Makefile.am
===================================================================
--- store/trunk/cpp/lib/Makefile.am	2009-02-19 18:57:30 UTC (rev 3123)
+++ store/trunk/cpp/lib/Makefile.am	2009-02-19 19:02:07 UTC (rev 3124)
@@ -49,6 +49,7 @@
   MessageStoreImpl.cpp          \
   PreparedTransaction.cpp       \
   StringDbt.cpp                 \
+  TxnCtxt.cpp                   \
   BindingDbt.h                  \
   BufferValue.h                 \
   Cursor.h                      \

Modified: store/trunk/cpp/lib/MessageStoreImpl.cpp
===================================================================
--- store/trunk/cpp/lib/MessageStoreImpl.cpp	2009-02-19 18:57:30 UTC (rev 3123)
+++ store/trunk/cpp/lib/MessageStoreImpl.cpp	2009-02-19 19:02:07 UTC (rev 3124)
@@ -29,6 +29,7 @@
 #include "jrnl/txn_map.hpp"
 #include "qpid/log/Statement.h"
 #include "qmf/com/redhat/rhm/store/Package.h"
+#include "StoreException.h"
 
 #define MAX_AIO_SLEEPS 1000 // ~1 second
 #define AIO_SLEEP_TIME 1000 // 1 milisecond

Added: store/trunk/cpp/lib/TxnCtxt.cpp
===================================================================
--- store/trunk/cpp/lib/TxnCtxt.cpp	                        (rev 0)
+++ store/trunk/cpp/lib/TxnCtxt.cpp	2009-02-19 19:02:07 UTC (rev 3124)
@@ -0,0 +1,162 @@
+#include "TxnCtxt.h"
+
+#include <sstream>
+#include <unistd.h> // ::usleep()
+
+#include "jrnl/jexception.hpp"
+#include "StoreException.h"
+
+namespace mrg {
+namespace msgstore {
+
+void TxnCtxt::completeTxn(bool commit) {
+    sync();
+    for (ipqItr i = impactedQueues.begin(); i != impactedQueues.end(); i++) {
+        commitTxn(static_cast<JournalImpl*>(*i), commit);
+    }
+    impactedQueues.clear();
+    if (preparedXidStorePtr)
+        commitTxn(preparedXidStorePtr, commit);
+}
+
+void TxnCtxt::commitTxn(JournalImpl* jc, bool commit) {
+    if (jc && loggedtx) { /* if using journal */
+        boost::intrusive_ptr<DataTokenImpl> dtokp(new DataTokenImpl);
+        dtokp->addRef();
+        dtokp->set_external_rid(true);
+        dtokp->set_rid(loggedtx->next());
+        try {
+            if (commit) {
+                jc->txn_commit(dtokp.get(), getXid());
+                sync();
+            } else {
+                jc->txn_abort(dtokp.get(), getXid());
+            }
+        } catch (const journal::jexception& e) {
+            THROW_STORE_EXCEPTION(std::string("Error commit") + e.what());
+        }
+    }
+}
+
+//static
+uuid_t TxnCtxt::uuid;
+
+// static
+IdSequence TxnCtxt::uuidSeq;
+
+// static
+bool TxnCtxt::staticInit = TxnCtxt::setUuid();
+
+// static
+bool TxnCtxt::setUuid() {
+    ::uuid_generate(uuid);
+    return true;
+}
+
+TxnCtxt::TxnCtxt(IdSequence* _loggedtx) : loggedtx(_loggedtx), dtokp(new DataTokenImpl), preparedXidStorePtr(0), txn(0) {
+    if (loggedtx) {   
+//         // Human-readable tid: 53 bytes
+//         // uuit_t is a char[16]
+//         tid.reserve(53);
+//         u_int64_t* u1 = (u_int64_t*)uuid;
+//         u_int64_t* u2 = (u_int64_t*)(uuid + sizeof(u_int64_t));
+//         std::stringstream s;
+//         s << "tid:" << std::hex << std::setfill('0') << std::setw(16) << uuidSeq.next() << ":" << std::setw(16) << *u1 << std::setw(16) << *u2;
+//         tid.assign(s.str());
+
+        // Binary tid: 24 bytes
+        tid.reserve(24);
+        u_int64_t c = uuidSeq.next();
+        tid.append((char*)&c, sizeof(c));
+        tid.append((char*)&uuid, sizeof(uuid));
+    }
+}
+
+TxnCtxt::TxnCtxt(std::string _tid, IdSequence* _loggedtx) : loggedtx(_loggedtx), dtokp(new DataTokenImpl), preparedXidStorePtr(0), tid(_tid), txn(0) {}
+
+TxnCtxt::~TxnCtxt() { if(txn) abort(); }
+
+#define MAX_SYNC_SLEEPS 5000 // ~1 second
+#define SYNC_SLEEP_TIME 200 // 0.2 ms
+
+void TxnCtxt::sync() {
+    bool allWritten = false;
+    bool firstloop = true;
+    long sleep_cnt = 0L;
+    while (loggedtx && !allWritten) {
+        if (sleep_cnt > MAX_SYNC_SLEEPS) THROW_STORE_EXCEPTION(std::string("Error: timeout waiting for TxnCtxt::sync()"));
+        if (!firstloop) {
+            ::usleep(SYNC_SLEEP_TIME);
+            sleep_cnt++;
+        } // move this into the get events call aiolib..
+        allWritten = true;
+        for (ipqItr i = impactedQueues.begin(); i != impactedQueues.end(); i++) {
+            sync_jrnl(static_cast<JournalImpl*>(*i), firstloop, allWritten);
+        }
+        if (preparedXidStorePtr)
+            sync_jrnl(preparedXidStorePtr, firstloop, allWritten);
+        firstloop = false;
+    }
+}
+
+void TxnCtxt::sync_jrnl(JournalImpl* jc, bool firstloop, bool& allWritten) {
+    try {
+        if (jc && !(jc->is_txn_synced(getXid()))) {
+            if (firstloop)
+                jc->flush();
+            allWritten = false;
+            jc->get_wr_events();
+        }
+    } catch (const journal::jexception& e) {
+        THROW_STORE_EXCEPTION(std::string("Error sync") + e.what());
+    }
+}
+
+void TxnCtxt::begin(DbEnv& env, bool sync) {
+    env.txn_begin(0, &txn, 0);
+    if (sync)
+        globalHolder = AutoScopedLock(new qpid::sys::Mutex::ScopedLock(globalSerialiser));
+}
+
+void TxnCtxt::commit() {
+    if (txn) {
+        txn->commit(0);
+        txn = 0;
+        globalHolder.reset();
+    }
+}
+
+void TxnCtxt::abort(){
+    if (txn) {
+        txn->abort();
+        txn = 0;
+        globalHolder.reset();
+    }
+}
+
+DbTxn* TxnCtxt::get() { return txn; }
+
+bool TxnCtxt::isTPC() { return false; }
+
+const std::string& TxnCtxt::getXid() { return tid; }
+
+void TxnCtxt::addXidRecord(qpid::broker::ExternalQueueStore* queue) { impactedQueues.insert(queue); }
+
+void TxnCtxt::complete(bool commit) { completeTxn(commit); }
+
+bool TxnCtxt::impactedQueuesEmpty() { return impactedQueues.empty(); }
+
+DataTokenImpl* TxnCtxt::getDtok() { return dtokp.get(); }
+
+void TxnCtxt::incrDtokRef() { dtokp->addRef(); }
+
+void TxnCtxt::recoverDtok(const u_int64_t rid, const std::string xid) {
+    dtokp->set_rid(rid);
+    dtokp->set_wstate(DataTokenImpl::ENQ);
+    dtokp->set_xid(xid);
+    dtokp->set_external_rid(true);
+}
+
+TPCTxnCtxt::TPCTxnCtxt(const std::string& _xid, IdSequence* _loggedtx) : TxnCtxt(_loggedtx), xid(_xid) {}
+
+}}

Modified: store/trunk/cpp/lib/TxnCtxt.h
===================================================================
--- store/trunk/cpp/lib/TxnCtxt.h	2009-02-19 18:57:30 UTC (rev 3123)
+++ store/trunk/cpp/lib/TxnCtxt.h	2009-02-19 19:02:07 UTC (rev 3124)
@@ -24,23 +24,19 @@
 #ifndef _TxnCtxt_
 #define _TxnCtxt_
 
-#include <boost/format.hpp>
 #include <boost/intrusive_ptr.hpp>
 #include <db-inc.h>
 #include <memory>
 #include <set>
-#include <sstream>
 #include <string>
-#include <unistd.h> // ::usleep()
-
+ 
 #include "DataTokenImpl.h"
 #include "IdSequence.h"
 #include "JournalImpl.h"
-#include "jrnl/jexception.hpp"
 #include "qpid/broker/PersistableQueue.h"
 #include "qpid/broker/TransactionalStore.h"
 #include "qpid/sys/Mutex.h"
-#include "StoreException.h"
+#include "qpid/sys/uuid.h"
 
 namespace mrg {
 namespace msgstore {
@@ -50,6 +46,11 @@
   protected:
     static qpid::sys::Mutex globalSerialiser;
 
+    static uuid_t uuid;
+    static IdSequence uuidSeq;
+    static bool staticInit;
+    static bool setUuid();
+
     typedef std::set<qpid::broker::ExternalQueueStore*> ipqdef;
     typedef ipqdef::iterator ipqItr;
     typedef std::auto_ptr<qpid::sys::Mutex::ScopedLock> AutoScopedLock;
@@ -67,133 +68,45 @@
     std::string tid;
     DbTxn* txn;
 
-    virtual void completeTxn(bool commit) {
-        sync();
-        for (ipqItr i = impactedQueues.begin(); i != impactedQueues.end(); i++) {
-            commitTxn(static_cast<JournalImpl*>(*i), commit);
-        }
-        impactedQueues.clear();
-        if (preparedXidStorePtr)
-            commitTxn(preparedXidStorePtr, commit);
-    }
+    virtual void completeTxn(bool commit);
+    void commitTxn(JournalImpl* jc, bool commit);
 
-    void commitTxn(JournalImpl* jc, bool commit) {
-        if (jc && loggedtx) { /* if using journal */
-            boost::intrusive_ptr<DataTokenImpl> dtokp(new DataTokenImpl);
-            dtokp->addRef();
-            dtokp->set_external_rid(true);
-            dtokp->set_rid(loggedtx->next());
-            try {
-                if (commit) {
-                    jc->txn_commit(dtokp.get(), getXid());
-                    sync();
-                } else {
-                    jc->txn_abort(dtokp.get(), getXid());
-                }
-            } catch (const journal::jexception& e) {
-                THROW_STORE_EXCEPTION(std::string("Error commit") + e.what());
-            }
-        }
-    }
-
   public:
+    TxnCtxt(IdSequence* _loggedtx=NULL);
+    TxnCtxt(std::string _tid, IdSequence* _loggedtx);
+    virtual ~TxnCtxt();
 
-    TxnCtxt(IdSequence* _loggedtx=NULL) : loggedtx(_loggedtx), dtokp(new DataTokenImpl), preparedXidStorePtr(0), txn(0) {
-        if (loggedtx) {
-            std::stringstream s;
-            s << "rhm-tid" << this;
-            tid.assign(s.str());
-        }
-    }
-
-    TxnCtxt(std::string _tid, IdSequence* _loggedtx) : loggedtx(_loggedtx), dtokp(new DataTokenImpl), preparedXidStorePtr(0), tid(_tid), txn(0) {}
-
     /**
      * Call to make sure all the data for this txn is written to safe store
      *
      *@return if the data sucessfully synced.
      */
+    void sync();
+    void sync_jrnl(JournalImpl* jc, bool firstloop, bool& allWritten);
+    void begin(DbEnv& env, bool sync = false);
+    void commit();
+    void abort();
+    DbTxn* get();
+    virtual bool isTPC();
+    virtual const std::string& getXid();
 
-    virtual ~TxnCtxt() { if(txn) abort(); }
-
-#define MAX_SYNC_SLEEPS 5000 // ~1 second
-#define SYNC_SLEEP_TIME 200 // 0.2 ms
-
-    void sync() {
-        bool allWritten = false;
-        bool firstloop = true;
-        long sleep_cnt = 0L;
-        while (loggedtx && !allWritten) {
-            if (sleep_cnt > MAX_SYNC_SLEEPS) THROW_STORE_EXCEPTION(std::string("Error: timeout waiting for TxnCtxt::sync()"));
-            if (!firstloop) { ::usleep(SYNC_SLEEP_TIME); sleep_cnt++; } // move this into the get events call aiolib..
-            allWritten = true;
-            for (ipqItr i = impactedQueues.begin(); i != impactedQueues.end(); i++) {
-                sync_jrnl(static_cast<JournalImpl*>(*i), firstloop, allWritten);
-            }
-            if (preparedXidStorePtr)
-                sync_jrnl(preparedXidStorePtr, firstloop, allWritten);
-            firstloop = false;
-        }
-    }
-
-    void sync_jrnl(JournalImpl* jc, bool firstloop, bool& allWritten) {
-        try {
-            if (jc && !(jc->is_txn_synced(getXid()))) {
-                if (firstloop) jc->flush();
-                allWritten = false;
-                jc->get_wr_events();
-            }
-        } catch (const journal::jexception& e) {
-            THROW_STORE_EXCEPTION(std::string("Error sync") + e.what());
-        }
-    }
-
-    void begin(DbEnv& env, bool sync = false) {
-        env.txn_begin(0, &txn, 0);
-        if (sync) globalHolder = AutoScopedLock(new qpid::sys::Mutex::ScopedLock(globalSerialiser));
-    }
-
-    void commit() {
-        if (txn) {
-            txn->commit(0);
-            txn = 0;
-            globalHolder.reset();
-        }
-    }
-
-    void abort(){
-        if (txn) {
-            txn->abort();
-            txn = 0;
-            globalHolder.reset();
-        }
-    }
-
-    DbTxn* get() { return txn; }
-    virtual bool isTPC() { return false; }
-    virtual const std::string& getXid() { return tid; }
-
-    void addXidRecord(qpid::broker::ExternalQueueStore* queue) { impactedQueues.insert(queue); }
+    void addXidRecord(qpid::broker::ExternalQueueStore* queue);
     inline void prepare(JournalImpl* _preparedXidStorePtr) { preparedXidStorePtr = _preparedXidStorePtr; }
-    void complete(bool commit) { completeTxn(commit); }
-    bool impactedQueuesEmpty() { return impactedQueues.empty(); }
-    DataTokenImpl* getDtok() { return dtokp.get(); }
-    void incrDtokRef() { dtokp->addRef(); }
-    void recoverDtok(const u_int64_t rid, const std::string xid) {
-        dtokp->set_rid(rid);
-        dtokp->set_wstate(DataTokenImpl::ENQ);
-        dtokp->set_xid(xid);
-        dtokp->set_external_rid(true);
-    }
+    void complete(bool commit);
+    bool impactedQueuesEmpty();
+    DataTokenImpl* getDtok();
+    void incrDtokRef();
+    void recoverDtok(const u_int64_t rid, const std::string xid);
 };
 
+
 class TPCTxnCtxt : public TxnCtxt, public qpid::broker::TPCTransactionContext
 {
   protected:
     const std::string xid;
 
   public:
-    TPCTxnCtxt(const std::string& _xid, IdSequence* _loggedtx) : TxnCtxt(_loggedtx), xid(_xid) {}
+    TPCTxnCtxt(const std::string& _xid, IdSequence* _loggedtx);
     inline virtual bool isTPC() { return true; }
     inline virtual const std::string& getXid() { return xid; }
 };

Modified: store/trunk/cpp/tests/SimpleTest.cpp
===================================================================
--- store/trunk/cpp/tests/SimpleTest.cpp	2009-02-19 18:57:30 UTC (rev 3123)
+++ store/trunk/cpp/tests/SimpleTest.cpp	2009-02-19 19:02:07 UTC (rev 3124)
@@ -26,6 +26,7 @@
 #include "MessageStoreImpl.h"
 #include <iostream>
 #include "MessageUtils.h"
+#include "StoreException.h"
 #include <qpid/broker/Queue.h>
 #include <qpid/broker/RecoveryManagerImpl.h>
 #include <qpid/framing/AMQHeaderBody.h>

Modified: store/trunk/cpp/tests/jrnl/jtt/jfile_chk.py
===================================================================
--- store/trunk/cpp/tests/jrnl/jtt/jfile_chk.py	2009-02-19 18:57:30 UTC (rev 3123)
+++ store/trunk/cpp/tests/jrnl/jtt/jfile_chk.py	2009-02-19 19:02:07 UTC (rev 3124)
@@ -48,6 +48,10 @@
 transient_mask = 0x10
 extern_mask    = 0x20
 
+printchars = '0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ!"#$%&\'()*+,-./:;<=>?@[\\]^_`{|}~ '
+
+
+
 #== global functions ===========================================================
 
 def load(f, klass):
@@ -78,7 +82,7 @@
     return f.tell() >= jfsize
 
 def isprintable(s):
-    return s.strip(string.printable) == ''
+    return s.strip(printchars) == ''
 
 def print_xid(xidsize, xid):
     if xid == None:
@@ -107,6 +111,8 @@
 def hex_split_str(s, split_size = 50):
     if len(s) <= split_size:
         return hex_str(s, 0, len(s))
+    if len(s) > split_size + 25:
+        return hex_str(s, 0, 10) + ' ... ' + hex_str(s, 55, 65) + ' ... ' + hex_str(s, len(s)-10, len(s))
     return hex_str(s, 0, 10) + ' ... ' + hex_str(s, len(s)-10, len(s))
 
 def hex_str(s, b, e):
@@ -118,11 +124,10 @@
             o += '\\%02x' % ord(s[i])
     return o
 
-def split_str(s):
-    if len(s) > 25:
-        return s[:12] + ' ... ' + s[-10:]
-    else:
+def split_str(s, split_size = 50):
+    if len(s) < split_size:
         return s
+    return s[:25] + ' ... ' + s[-25:]
 
 def inv_str(s):
     si = ''
@@ -587,7 +592,7 @@
                             if len(mismatched_rids) > 0:
                                 warn = ' (WARNING: transactional dequeues not found in enqueue map; rids=%s)' % mismatched_rids
                         else:
-                            warn = ' (WARNING: xid %s not found in transaction map)' % hdr.xid
+                            warn = ' (WARNING: %s not found in transaction map)' % print_xid(len(hdr.xid), hdr.xid)
                 if not self.qflag: print ' > %s%s' % (hdr, warn)
                 if not stop:
                     stop = (self.last_file and hdr.check()) or hdr.empty() or self.fhdr.empty()
@@ -808,7 +813,7 @@
                 print
                 print 'Remaining transactions: '
                 for t in self.tmap:
-                    print "xid=%s:" % t
+                    print_xid(len(t), t)
                     for r in self.tmap[t]:
                         print "  fid=%d %s" % (r[0], r[1])
                     print " Total: %d records for xid %s" % (len(self.tmap[t]), t)




More information about the rhmessaging-commits mailing list