rhmessaging commits: r2181 - mgmt/trunk/cumin/python/cumin.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2008-07-08 12:16:08 -0400 (Tue, 08 Jul 2008)
New Revision: 2181
Modified:
mgmt/trunk/cumin/python/cumin/test.py
Log:
Use a temp dir for the default qpidd process output
Modified: mgmt/trunk/cumin/python/cumin/test.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/test.py 2008-07-08 15:52:17 UTC (rev 2180)
+++ mgmt/trunk/cumin/python/cumin/test.py 2008-07-08 16:16:08 UTC (rev 2181)
@@ -5,6 +5,7 @@
from threading import Thread
from popen2 import Popen4
from shutil import copyfileobj
+from tempfile import mkdtemp
import qpid, quirk, wooly
from cumin import Cumin
@@ -18,22 +19,23 @@
self.path = path
self.port = port
- self.key = "cumin-test-qpidd-%i" % port
-
+ self.temp_dir = mkdtemp()
+ self.data_dir = os.path.join(self.temp_dir, "data")
+
self.command = (path,
"--port", str(port),
"--auth", "no",
"--trace",
- "--data-dir", self.key)
+ "--data-dir", self.data_dir)
- self.log = "%s.log" % self.key
+ self.output_path = os.path.join(self.temp_dir, "output")
self.setDaemon(True)
def run(self):
- os.makedirs(self.key)
+ print "Saving qpidd output to %s" % self.temp_dir
- out = open(self.log, "w")
+ out = open(self.output_path, "w")
try:
pop = Popen4(self.command)
@@ -47,7 +49,7 @@
kpid, stat = os.waitpid(pop.pid, os.WNOHANG)
if kpid == 0:
- print "Test qpidd not killed"
+ print "Test qpidd wouldn't die"
class TestEnvironment(object):
def __init__(self, app, broker_host, broker_port, spec_path):
17 years, 9 months
rhmessaging commits: r2180 - in mgmt/trunk/cumin: python/cumin and 1 other directory.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2008-07-08 11:52:17 -0400 (Tue, 08 Jul 2008)
New Revision: 2180
Modified:
mgmt/trunk/cumin/bin/cumin-test
mgmt/trunk/cumin/python/cumin/test.py
Log:
Add more process handling for the default test broker; pass in a data dir; use the default path, with sbin dirs added, to look up the qpidd executable
Modified: mgmt/trunk/cumin/bin/cumin-test
===================================================================
--- mgmt/trunk/cumin/bin/cumin-test 2008-07-08 00:43:06 UTC (rev 2179)
+++ mgmt/trunk/cumin/bin/cumin-test 2008-07-08 15:52:17 UTC (rev 2180)
@@ -21,6 +21,10 @@
sys.exit(1)
+ usr_sbin = "/usr/sbin"
+ if usr_sbin not in sys.path:
+ sys.path.append(usr_sbin)
+
app.init()
if config.broker:
@@ -30,7 +34,7 @@
# supports it
host, port = "localhost", randint(16384, 32767)
- broker = TestBroker("/usr/sbin/qpidd", port)
+ broker = TestBroker("qpidd", port)
broker.start()
env = TestEnvironment(app, host, port, config.spec)
Modified: mgmt/trunk/cumin/python/cumin/test.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/test.py 2008-07-08 00:43:06 UTC (rev 2179)
+++ mgmt/trunk/cumin/python/cumin/test.py 2008-07-08 15:52:17 UTC (rev 2180)
@@ -1,4 +1,4 @@
-import sys, os
+import sys, os, signal
from mint import *
from traceback import print_exc, extract_tb
from datetime import datetime
@@ -17,24 +17,38 @@
self.path = path
self.port = port
- self.command = (path, "--port", str(port),
+
+ self.key = "cumin-test-qpidd-%i" % port
+
+ self.command = (path,
+ "--port", str(port),
"--auth", "no",
- "-t",
- "--no-data-dir")
- self.output = "qpidd-test.log"
+ "--trace",
+ "--data-dir", self.key)
+
+ self.log = "%s.log" % self.key
+
self.setDaemon(True)
def run(self):
- out = open(self.output, "w")
+ os.makedirs(self.key)
+ out = open(self.log, "w")
+
try:
pop = Popen4(self.command)
copyfileobj(pop.fromchild, out)
exit_code = pop.wait()
print exit_code
finally:
- print "hmmm!"
+ if pop:
+ os.kill(pop.pid, signal.SIGKILL)
+ kpid, stat = os.waitpid(pop.pid, os.WNOHANG)
+
+ if kpid == 0:
+ print "Test qpidd not killed"
+
class TestEnvironment(object):
def __init__(self, app, broker_host, broker_port, spec_path):
self.app = app
@@ -53,7 +67,7 @@
def init(self):
self.broker_conn.open()
-
+
session = quirk.Session(self.broker_conn, "test")
session.open()
17 years, 9 months
rhmessaging commits: r2179 - in mgmt/trunk/cumin: python/cumin and 1 other directory.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2008-07-07 20:43:06 -0400 (Mon, 07 Jul 2008)
New Revision: 2179
Modified:
mgmt/trunk/cumin/bin/cumin-test
mgmt/trunk/cumin/python/cumin/model.py
mgmt/trunk/cumin/python/cumin/test.py
mgmt/trunk/cumin/python/cumin/util.py
Log:
If no broker is given, try to spin up a broker on a random port
Modified: mgmt/trunk/cumin/bin/cumin-test
===================================================================
--- mgmt/trunk/cumin/bin/cumin-test 2008-07-07 14:17:52 UTC (rev 2178)
+++ mgmt/trunk/cumin/bin/cumin-test 2008-07-08 00:43:06 UTC (rev 2179)
@@ -1,7 +1,7 @@
#!/usr/bin/python
import sys, os
-from time import time
+from time import time, sleep
from cumin import *
from cumin.test import *
@@ -23,8 +23,16 @@
app.init()
- host, port = parse_broker_addr(config.broker)
+ if config.broker:
+ host, port = parse_broker_addr(config.broker)
+ else:
+ # XXX change this to 49152..65535 when the underlying datatype
+ # supports it
+ host, port = "localhost", randint(16384, 32767)
+ broker = TestBroker("/usr/sbin/qpidd", port)
+ broker.start()
+
env = TestEnvironment(app, host, port, config.spec)
env.init();
@@ -38,8 +46,8 @@
def main():
config = CuminConfig()
- summ = ("ADDR", "Register new test broker at ADDR")
- config.add_param("broker", str, "localhost:5672", summ)
+ summ = ("ADDR", "Use existing broker at ADDR")
+ config.add_param("broker", str, None, summ)
config.init()
Modified: mgmt/trunk/cumin/python/cumin/model.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/model.py 2008-07-07 14:17:52 UTC (rev 2178)
+++ mgmt/trunk/cumin/python/cumin/model.py 2008-07-08 00:43:06 UTC (rev 2179)
@@ -420,6 +420,7 @@
return object
except Exception, e:
+ log.exception("Action failed")
completion(e.message or "failed")
class Edit(CuminAction):
Modified: mgmt/trunk/cumin/python/cumin/test.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/test.py 2008-07-07 14:17:52 UTC (rev 2178)
+++ mgmt/trunk/cumin/python/cumin/test.py 2008-07-08 00:43:06 UTC (rev 2179)
@@ -2,12 +2,39 @@
from mint import *
from traceback import print_exc, extract_tb
from datetime import datetime
+from threading import Thread
+from popen2 import Popen4
+from shutil import copyfileobj
import qpid, quirk, wooly
from cumin import Cumin
from util import *
import time
+class TestBroker(Thread):
+ def __init__(self, path, port):
+ super(TestBroker, self).__init__()
+
+ self.path = path
+ self.port = port
+ self.command = (path, "--port", str(port),
+ "--auth", "no",
+ "-t",
+ "--no-data-dir")
+ self.output = "qpidd-test.log"
+ self.setDaemon(True)
+
+ def run(self):
+ out = open(self.output, "w")
+
+ try:
+ pop = Popen4(self.command)
+ copyfileobj(pop.fromchild, out)
+ exit_code = pop.wait()
+ print exit_code
+ finally:
+ print "hmmm!"
+
class TestEnvironment(object):
def __init__(self, app, broker_host, broker_port, spec_path):
self.app = app
@@ -65,7 +92,7 @@
class TestSession(object):
def __init__(self, env):
self.env = env
- self.id = datetime.now().strftime("test-%H-%M-%S")
+ self.id = short_id()
self.stack = list()
self.passed = list()
Modified: mgmt/trunk/cumin/python/cumin/util.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/util.py 2008-07-07 14:17:52 UTC (rev 2178)
+++ mgmt/trunk/cumin/python/cumin/util.py 2008-07-08 00:43:06 UTC (rev 2179)
@@ -1,7 +1,12 @@
+import sys
from time import mktime
from ConfigParser import SafeConfigParser
from logging import getLogger
+from random import randint
+def short_id():
+ return "%08x" % randint(0, sys.maxint)
+
def sorted_by(seq, attr="name"):
return sorted(seq, cmp, lambda x: getattr(x, attr))
17 years, 9 months
rhmessaging commits: r2178 - store/trunk/cpp/tests.
by rhmessaging-commits@lists.jboss.org
Author: kpvdr
Date: 2008-07-07 10:17:52 -0400 (Mon, 07 Jul 2008)
New Revision: 2178
Modified:
store/trunk/cpp/tests/system_test.sh
Log:
Fixed script so that system test will fail if broker fails to start
Modified: store/trunk/cpp/tests/system_test.sh
===================================================================
--- store/trunk/cpp/tests/system_test.sh 2008-07-05 20:14:45 UTC (rev 2177)
+++ store/trunk/cpp/tests/system_test.sh 2008-07-07 14:17:52 UTC (rev 2178)
@@ -47,13 +47,13 @@
BROKER_OPTS="--no-module-dir --load-module=$LIBBDBSTORE --data-dir=$TMPDIR --auth=no --wcache-page-size 16"
run_tests() {
for p in `seq 1 8`; do
- $abs_srcdir/start_broker "$@" ${BROKER_OPTS} || return 1
+ $abs_srcdir/start_broker "$@" ${BROKER_OPTS} || { echo "FAIL broker start"; return 1; }
python "$abs_srcdir/persistence.py" -s "$xml_spec" -b localhost:`cat qpidd.port` -p $p -r 3 || fail=1;
$abs_srcdir/stop_broker
done
}
echo 'Journal (AIO) persistence...'
-run_tests
+run_tests || fail=1
exit $fail
17 years, 9 months
rhmessaging commits: r2177 - in store/trunk/cpp: lib/jrnl and 1 other directories.
by rhmessaging-commits@lists.jboss.org
Author: kpvdr
Date: 2008-07-05 16:14:45 -0400 (Sat, 05 Jul 2008)
New Revision: 2177
Modified:
store/trunk/cpp/lib/BdbMessageStore.cpp
store/trunk/cpp/lib/BdbMessageStore.h
store/trunk/cpp/lib/TxnCtxt.h
store/trunk/cpp/lib/jrnl/data_tok.cpp
store/trunk/cpp/lib/jrnl/data_tok.hpp
store/trunk/cpp/lib/jrnl/rrfc.cpp
store/trunk/cpp/lib/jrnl/rrfc.hpp
store/trunk/cpp/lib/jrnl/txn_map.cpp
store/trunk/cpp/lib/jrnl/wrfc.cpp
store/trunk/cpp/lib/jrnl/wrfc.hpp
store/trunk/cpp/tests/TwoPhaseCommitTest.cpp
store/trunk/cpp/tests/persistence.py
Log:
Fixed problems with transaction recover: DataToken in TxnCtxt was not being restored; also highest rid found during restore was not taking account of the new preparedXid instance which shares the messageIdSequence. Other minor bugfixes and tidy-ups.
Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp 2008-07-03 20:46:46 UTC (rev 2176)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp 2008-07-05 20:14:45 UTC (rev 2177)
@@ -29,6 +29,9 @@
#include "qpid/log/Statement.h"
#include "qpid/management/PackageMrgstore.h"
+#define MAX_AIO_SLEEPS 1000 // ~1 second
+#define AIO_SLEEP_TIME 1000 // 1 milisecond
+
using namespace rhm::bdbstore;
using namespace qpid::broker;
using boost::static_pointer_cast;
@@ -58,6 +61,7 @@
jrnlFsizePgs(defJrnlFileSizePgs),
wcache_pgsize_sblks(JRNL_WMGR_DEF_PAGE_SIZE),
wcache_num_pages(JRNL_WMGR_DEF_PAGES),
+ highestRid(0),
isInit(false),
envPath(envpath)
@@ -460,10 +464,18 @@
txn.abort();
THROW_STORE_EXCEPTION_2("Error on recovery", e);
}
+
//recover transactions:
for (txn_list::iterator i = prepared.begin(); i != prepared.end(); i++) {
TPCTxnCtxt* tpcc = new TPCTxnCtxt(i->xid, &messageIdSequence);
+
+ // Restore data token state in TxnCtxt
+ xid_rid_map_citr citr = preparedMap.find(i->xid);
+ if (citr == preparedMap.end()) THROW_STORE_EXCEPTION("XID not found in preparedMap");
+ tpcc->recoverDtok(citr->second, i->xid);
+ tpcc->addXidRecord(preparedXidStorePtr.get());
+
RecoverableTransaction::shared_ptr dtx = registry.recoverTransaction(i->xid, std::auto_ptr<TPCTransactionContext>(tpcc));
if (i->enqueues.get()) {
for (LockedMappings::iterator j = i->enqueues->begin(); j != i->enqueues->end(); j++) {
@@ -492,7 +504,6 @@
IdDbt key;
Dbt value;
//read all queues
- u_int64_t highestRid = 0;
while (queues.next(key, value)) {
Buffer buffer(reinterpret_cast<char*>(value.get_data()), value.get_size());
//create a Queue instance
@@ -524,7 +535,11 @@
queue_index[key.id] = queue;
maxQueueId = max(key.id, maxQueueId);
}
+
+ // NOTE: highestRid is set by both recoverQueues() and collectPreparedXids() as
+ // the messageIdSequence is used for both queue journals and the preparedXid journal.
messageIdSequence.reset(highestRid + 1);
+
queueIdSequence.reset(maxQueueId + 1);
}
@@ -605,9 +620,6 @@
}
-#define MAX_AIO_SLEEPS 1000 // ~1 second
-#define AIO_SLEEP_TIME 1000 // 1 milisecond
-
void BdbMessageStore::recoverMessages(TxnCtxt& /*txn*/, qpid::broker::RecoveryManager& recovery,
qpid::broker::RecoverableQueue::shared_ptr& queue, txn_list& locked, message_index& prepared)
{
@@ -768,19 +780,6 @@
}
}
-void BdbMessageStore::readXids(Db& db, std::set<string>& xids)
-{
- Cursor c;
- c.open(db, 0);
-
- Dbt key;
- Dbt ignore;
- while (c.next(key, ignore)) {
- std::string xid(reinterpret_cast<char*>(key.get_data()), key.get_size());
- xids.insert(xid);
- }
-}
-
void BdbMessageStore::readLockedMappings(Db& db, txn_lock_map& mappings)
{
Cursor c;
@@ -798,15 +797,50 @@
{
if (journal::jdir::exists(storeDir + "/rhm/pxid/prepared_xid.jinf"))
{
- u_int64_t highest_rid;
+ u_int64_t thisHighestRid;
preparedXidStorePtr->recover(defXidStoreNumJrnlFiles, defXidStoreJrnlFileSizePgs * JRNL_RMGR_PAGE_SIZE,
JRNL_WMGR_DEF_PAGE_SIZE * JRNL_WMGR_DEF_PAGES / wcache_pgsize_sblks, defXidStoreWCachePageSize,
- 0, highest_rid, 0);
-
- std::vector<std::string> xv;
- preparedXidStorePtr->get_open_txn_list(xv);
- for (std::vector<std::string>::const_iterator itr = xv.begin(); itr < xv.end(); itr++)
- xids.insert(*itr);
+ 0, thisHighestRid, 0);
+ if (thisHighestRid > highestRid)
+ highestRid = thisHighestRid;
+ try {
+ void* dbuff = NULL; size_t dbuffSize = 0;
+ void* xidbuff = NULL; size_t xidbuffSize = 0;
+ bool transientFlag = false;
+ bool externalFlag = false;
+ DataTokenImpl dtokp;
+ bool done = false;
+ long aio_sleep_cnt = 0;
+ while (!done) {
+ dtokp.reset();
+ dtokp.set_wstate(DataTokenImpl::ENQ);
+ rhm::journal::iores res = preparedXidStorePtr->read_data_record(&dbuff, dbuffSize, &xidbuff, xidbuffSize, transientFlag, externalFlag, &dtokp);
+ switch (res) {
+ case rhm::journal::RHM_IORES_SUCCESS:
+ if (xidbuffSize > 0) {
+ xids.insert(std::string((const char*)xidbuff, xidbuffSize));
+ preparedMap[std::string((const char*)xidbuff, xidbuffSize)] = dtokp.rid();
+ ::free(xidbuff);
+ } else {
+ THROW_STORE_EXCEPTION("No XID found in BdbMessageStore::collectPreparedXids()");
+ }
+ aio_sleep_cnt = 0;
+ break;
+ case rhm::journal::RHM_IORES_PAGE_AIOWAIT:
+ if (++aio_sleep_cnt > MAX_AIO_SLEEPS)
+ THROW_STORE_EXCEPTION("Timeout waiting for AIO in BdbMessageStore::collectPreparedXids()");
+ ::usleep(AIO_SLEEP_TIME);
+ break;
+ case rhm::journal::RHM_IORES_EMPTY:
+ done = true;
+ break;
+ default:
+ assert( "Store Error: Unexpected msg state");
+ }
+ }
+ } catch (const journal::jexception& e) {
+ THROW_STORE_EXCEPTION(std::string("Prepared XID journal: collectPreparedXids() failed: ") + e.what());
+ }
preparedXidStorePtr->recover_complete(); // start journal.
}
@@ -1148,9 +1182,13 @@
try {
// Nothing to do if not prepared
chkInitPreparedXidStore();
- if (txn.getDtok().is_enqueued())
- preparedXidStorePtr->dequeue_txn_data_record(&txn.getDtok(), txn.getXid());
-
+ if (txn.getDtok()->is_enqueued()) {
+ txn.incrDtokRef();
+ DataTokenImpl* dtokp = txn.getDtok();
+ dtokp->set_dequeue_rid(dtokp->rid());
+ dtokp->set_rid(messageIdSequence.next());
+ preparedXidStorePtr->dequeue_txn_data_record(txn.getDtok(), txn.getXid());
+ }
txn.complete(commit);
} catch (const std::exception& e) {
QPID_LOG(error, "Error completing xid " << txn.getXid() << ": " << e.what());
@@ -1162,19 +1200,15 @@
{
checkInit();
// pass sequence number for c/a
- TxnCtxt* txn(new TxnCtxt(&messageIdSequence ));
- return auto_ptr<TransactionContext>(txn);
+ return auto_ptr<TransactionContext>(new TxnCtxt(&messageIdSequence));
}
std::auto_ptr<qpid::broker::TPCTransactionContext> BdbMessageStore::begin(const std::string& xid)
{
checkInit();
- IdSequence* jtx = NULL;
- jtx = &messageIdSequence;
-
+ IdSequence* jtx = &messageIdSequence;
// pass sequence number for c/a
- TPCTxnCtxt* txn(new TPCTxnCtxt(xid, jtx));
- return auto_ptr<TPCTransactionContext>(txn);
+ return auto_ptr<TPCTransactionContext>(new TPCTxnCtxt(xid, jtx));
}
void BdbMessageStore::prepare(qpid::broker::TPCTransactionContext& ctxt)
@@ -1185,7 +1219,11 @@
try {
chkInitPreparedXidStore();
- preparedXidStorePtr->enqueue_txn_data_record(0, 0, 0, &txn->getDtok(), txn->getXid(), false);
+ txn->incrDtokRef();
+ DataTokenImpl* dtokp = txn->getDtok();
+ dtokp->set_external_rid(true);
+ dtokp->set_rid(messageIdSequence.next());
+ preparedXidStorePtr->enqueue_txn_data_record(0, 0, 0, dtokp, txn->getXid(), false);
txn->addXidRecord(preparedXidStorePtr.get());
// make sure all the data is written to disk before returning
Modified: store/trunk/cpp/lib/BdbMessageStore.h
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.h 2008-07-03 20:46:46 UTC (rev 2176)
+++ store/trunk/cpp/lib/BdbMessageStore.h 2008-07-05 20:14:45 UTC (rev 2177)
@@ -25,29 +25,19 @@
#define _BdbMessageStore_
#include <string>
+
#include "db-inc.h"
-//#include "BufferValue.h"
#include "Cursor.h"
#include "IdDbt.h"
-//#include "IdSequence.h"
+#include "IdSequence.h"
+#include "JournalImpl.h"
+#include "jrnl/jcfg.hpp"
#include "PreparedTransaction.h"
-//#include "StoreException.h"
-#include "TxnCtxt.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/MessageStore.h"
#include "qpid/management/Manageable.h"
-//#include <qpid/sys/Monitor.h>
-//#include <qpid/sys/Time.h>
-//#include <map>
-//#include <set>
-//#include <iostream>
-//#include <boost/format.hpp>
-//#include <boost/intrusive_ptr.hpp>
-//#include <boost/ptr_container/ptr_list.hpp>
#include "qpid/management/Store.h"
-#include "jrnl/jcfg.hpp"
-#include "JournalImpl.h"
-#include "IdSequence.h"
+#include "TxnCtxt.h"
// Assume DB_VERSION_MAJOR == 4
#if (DB_VERSION_MINOR == 2)
@@ -57,7 +47,6 @@
namespace rhm {
namespace bdbstore {
-using std::string;
/**
* An implementation of the MessageStore interface based on Berkeley DB
@@ -70,6 +59,9 @@
typedef LockedMappings::map txn_lock_map;
typedef boost::ptr_list<PreparedTransaction> txn_list;
+
+ typedef std::map<std::string, u_int64_t> xid_rid_map;
+ typedef xid_rid_map::const_iterator xid_rid_map_citr;
// Default store settings
static const u_int16_t defNumJrnlFiles = 8;
@@ -98,6 +90,8 @@
u_int32_t jrnlFsizePgs;
u_int32_t wcache_pgsize_sblks;
u_int16_t wcache_num_pages;
+ xid_rid_map preparedMap;
+ u_int64_t highestRid;
bool isInit;
const char* envPath;
static qpid::sys::Duration defJournalGetEventsTimeout;
@@ -121,7 +115,6 @@
int enqueueMessage(TxnCtxt& txn, IdDbt& msgId, qpid::broker::RecoverableMessage::shared_ptr& msg,
queue_index& index, txn_list& locked, message_index& prepared);
void recoverXids(txn_list& txns);
- void readXids(Db& db, std::set<string>& xids);
void readLockedMappings(Db& db, txn_lock_map& mappings);
TxnCtxt* check(qpid::broker::TransactionContext* ctxt);
void store(const qpid::broker::PersistableQueue* queue, TxnCtxt* txn,
@@ -152,21 +145,34 @@
// journal functions
void createJrnlQueue(const qpid::broker::PersistableQueue& queue);
- string getJrnlDir(const qpid::broker::PersistableQueue& queue); //for exmaple /var/rhm/ + queueDir/
- string getJrnlDir(const char* queueName);
- string getJrnlBaseDir();
- string getBdbBaseDir();
- string getPxidBaseDir();
+ std::string getJrnlDir(const qpid::broker::PersistableQueue& queue); //for exmaple /var/rhm/ + queueDir/
+ std::string getJrnlDir(const char* queueName);
+ std::string getJrnlBaseDir();
+ std::string getBdbBaseDir();
+ std::string getPxidBaseDir();
inline void checkInit() {
if (!isInit) init("/var", defNumJrnlFiles, defJrnlFileSizePgs, defWCachePageSize); isInit = true;
}
void chkInitPreparedXidStore();
+ // debug aid for printing XIDs that may contain non-printable chars
+ static std::string xid2str(const std::string xid) {
+ std::ostringstream oss;
+ oss << std::hex << std::setfill('0');
+ for (unsigned i=0; i<xid.size(); i++) {
+ if (isprint(xid[i]))
+ oss << xid[i];
+ else
+ oss << "/" << std::setw(2) << (int)((char)xid[i]);
+ }
+ return oss.str();
+ }
+
public:
struct Options : public qpid::Options {
Options(const std::string& name="Store Options");
- string clusterName;
- string storeDir;
+ std::string clusterName;
+ std::string storeDir;
bool storeAsync;
bool storeForce;
uint16_t numJrnlFiles;
@@ -222,7 +228,7 @@
u_int32_t outstandingQueueAIO(const qpid::broker::PersistableQueue& queue);
- void collectPreparedXids(std::set<string>& xids);
+ void collectPreparedXids(std::set<std::string>& xids);
std::auto_ptr<qpid::broker::TransactionContext> begin();
std::auto_ptr<qpid::broker::TPCTransactionContext> begin(const std::string& xid);
Modified: store/trunk/cpp/lib/TxnCtxt.h
===================================================================
--- store/trunk/cpp/lib/TxnCtxt.h 2008-07-03 20:46:46 UTC (rev 2176)
+++ store/trunk/cpp/lib/TxnCtxt.h 2008-07-05 20:14:45 UTC (rev 2177)
@@ -57,7 +57,7 @@
ipqdef impactedQueues; // list of Queues used in the txn
mutable qpid::sys::Mutex Lock;
IdSequence* loggedtx;
- DataTokenImpl dtok;
+ boost::intrusive_ptr<DataTokenImpl> dtokp;
AutoScopedLock globalHolder;
/**
@@ -83,7 +83,6 @@
jc->txn_abort(dtokp.get(), getXid());
}
} catch (const journal::jexception& e) {
- //std::cout << "Error commit" << e << std::endl;
THROW_STORE_EXCEPTION(std::string("Error commit") + e.what());
}
}
@@ -93,7 +92,7 @@
public:
- TxnCtxt(IdSequence* _loggedtx=NULL) : loggedtx(_loggedtx), txn(0) {
+ TxnCtxt(IdSequence* _loggedtx=NULL) : loggedtx(_loggedtx), dtokp(new DataTokenImpl), txn(0) {
if (loggedtx) {
std::stringstream s;
s << "rhm-tid" << this;
@@ -129,7 +128,6 @@
jc->get_wr_events();
}
} catch (const journal::jexception& e) {
- //std::cout << " TxnCtxt: Error sync 3" << e << std::flush;
THROW_STORE_EXCEPTION(std::string("Error sync") + e.what());
}
}
@@ -165,7 +163,14 @@
void deleteXidRecord() { impactedQueues.clear(); }
void addXidRecord(qpid::broker::ExternalQueueStore* queue) { impactedQueues.insert(queue); }
void complete(bool commit) { completeTXN(commit); }
- DataTokenImpl& getDtok() { return dtok; }
+ DataTokenImpl* getDtok() { return dtokp.get(); }
+ void incrDtokRef() { dtokp->addRef(); }
+ void recoverDtok(const u_int64_t rid, const std::string xid) {
+ dtokp->set_rid(rid);
+ dtokp->set_wstate(DataTokenImpl::ENQ);
+ dtokp->set_xid(xid);
+ dtokp->set_external_rid(true);
+ }
};
class TPCTxnCtxt : public TxnCtxt, public qpid::broker::TPCTransactionContext
Modified: store/trunk/cpp/lib/jrnl/data_tok.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/data_tok.cpp 2008-07-03 20:46:46 UTC (rev 2176)
+++ store/trunk/cpp/lib/jrnl/data_tok.cpp 2008-07-05 20:14:45 UTC (rev 2177)
@@ -30,6 +30,7 @@
#include "jrnl/data_tok.hpp"
+#include <iomanip>
#include "jrnl/jerrno.hpp"
#include "jrnl/jexception.hpp"
#include <sstream>
@@ -50,6 +51,7 @@
_dblks_written(0),
_dblks_read(0),
_pg_cnt(0),
+ _fid(0),
_rid(0),
_xid(),
_dequeue_rid(0),
@@ -163,9 +165,31 @@
_dblks_written = 0;
_dblks_read = 0;
_pg_cnt = 0;
+ _fid = 0;
_rid = 0;
_xid.clear();
}
+// debug aid
+std::string
+data_tok::status_str() const
+{
+ std::ostringstream oss;
+ oss << std::hex << std::setfill('0');
+ oss << "dtok id=0x" << _cnt << "; ws=" << wstate_str() << "; rs=" << rstate_str();
+ oss << "; fid=0x" << _fid << "; rid=0x" << _rid << "; xid=";
+ for (unsigned i=0; i<_xid.size(); i++)
+ {
+ if (isprint(_xid[i]))
+ oss << _xid[i];
+ else
+ oss << "/" << std::setw(2) << (int)((char)_xid[i]);
+ }
+ oss << "; drid=0x" << _dequeue_rid << " extrid=" << (_external_rid?"T":"F");
+ oss << "; ds=0x" << _dsize << "; dw=0x" << _dblks_written << "; dr=0x" << _dblks_read;
+ oss << " pc=0x" << _pg_cnt;
+ return oss.str();
+}
+
} // namespace journal
} // namespace rhm
Modified: store/trunk/cpp/lib/jrnl/data_tok.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/data_tok.hpp 2008-07-03 20:46:46 UTC (rev 2176)
+++ store/trunk/cpp/lib/jrnl/data_tok.hpp 2008-07-05 20:14:45 UTC (rev 2177)
@@ -160,6 +160,9 @@
{ _xid.assign((const char*)xidp, xid_len); }
void reset();
+
+ // debug aid
+ std::string status_str() const;
};
} // namespace journal
Modified: store/trunk/cpp/lib/jrnl/rrfc.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rrfc.cpp 2008-07-03 20:46:46 UTC (rev 2176)
+++ store/trunk/cpp/lib/jrnl/rrfc.cpp 2008-07-05 20:14:45 UTC (rev 2177)
@@ -97,7 +97,7 @@
return _fh_arr[pg_index];
}
-const std::string
+std::string
rrfc::status_str() const
{
std::ostringstream oss;
Modified: store/trunk/cpp/lib/jrnl/rrfc.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rrfc.hpp 2008-07-03 20:46:46 UTC (rev 2176)
+++ store/trunk/cpp/lib/jrnl/rrfc.hpp 2008-07-05 20:14:45 UTC (rev 2177)
@@ -136,7 +136,7 @@
{ return _curr_fh->wr_aio_outstanding_dblks() > 0; }
// Debug aid
- const std::string status_str() const;
+ std::string status_str() const;
}; // class rrfc
} // namespace journal
Modified: store/trunk/cpp/lib/jrnl/txn_map.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/txn_map.cpp 2008-07-03 20:46:46 UTC (rev 2176)
+++ store/trunk/cpp/lib/jrnl/txn_map.cpp 2008-07-05 20:14:45 UTC (rev 2177)
@@ -91,11 +91,8 @@
const txn_data_list
txn_map::get_tdata_list(const std::string& xid)
{
- xmap_itr itr;
- {
- slock s(&_mutex);
- itr = _map.find(xid);
- }
+ slock s(&_mutex);
+ xmap_itr itr = _map.find(xid);
if (itr == _map.end()) // not found in map
{
std::ostringstream oss;
@@ -127,11 +124,8 @@
bool
txn_map::in_map(const std::string& xid)
{
- xmap_itr itr;
- {
- slock s(&_mutex);
- itr = _map.find(xid);
- }
+ slock s(&_mutex);
+ xmap_itr itr= _map.find(xid);
if (itr == _map.end()) // not found in map
return false;
return true;
@@ -140,11 +134,8 @@
u_int32_t
txn_map::get_rid_count(const std::string& xid)
{
- xmap_itr itr;
- {
- slock s(&_mutex);
- itr = _map.find(xid);
- }
+ slock s(&_mutex);
+ xmap_itr itr = _map.find(xid);
if (itr == _map.end()) // not found in map
{
std::ostringstream oss;
Modified: store/trunk/cpp/lib/jrnl/wrfc.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wrfc.cpp 2008-07-03 20:46:46 UTC (rev 2176)
+++ store/trunk/cpp/lib/jrnl/wrfc.cpp 2008-07-05 20:14:45 UTC (rev 2177)
@@ -176,7 +176,7 @@
return findex != _fh_index && in_use;
}
-const std::string
+std::string
wrfc::status_str() const
{
std::ostringstream oss;
Modified: store/trunk/cpp/lib/jrnl/wrfc.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wrfc.hpp 2008-07-03 20:46:46 UTC (rev 2176)
+++ store/trunk/cpp/lib/jrnl/wrfc.hpp 2008-07-05 20:14:45 UTC (rev 2177)
@@ -128,7 +128,7 @@
bool enq_threshold(const u_int32_t enq_dsize_dblks) const;
// Debug aid
- const std::string status_str() const;
+ std::string status_str() const;
}; // class wrfc
} // namespace journal
Modified: store/trunk/cpp/tests/TwoPhaseCommitTest.cpp
===================================================================
--- store/trunk/cpp/tests/TwoPhaseCommitTest.cpp 2008-07-03 20:46:46 UTC (rev 2176)
+++ store/trunk/cpp/tests/TwoPhaseCommitTest.cpp 2008-07-05 20:14:45 UTC (rev 2177)
@@ -160,9 +160,6 @@
swap.check(commit);
restart();
swap.check(commit);
-
- // this test leaves xids in the store
- store->truncate();
}
void commit(Strategy& strategy)
Modified: store/trunk/cpp/tests/persistence.py
===================================================================
--- store/trunk/cpp/tests/persistence.py 2008-07-03 20:46:46 UTC (rev 2176)
+++ store/trunk/cpp/tests/persistence.py 2008-07-05 20:14:45 UTC (rev 2177)
@@ -262,6 +262,19 @@
session = self.session
session.synchronous = False
+ # check xids from phase 6 are gone
+ txc = self.xid('c')
+ txd = self.xid('d')
+
+ xids = session.dtx_recover().in_doubt
+ ids = [x.global_id for x in xids] #TODO: come up with nicer way to test these
+
+ if txc.global_id in ids:
+ self.fail("Xid still present : %s" % (txc))
+ if txd.global_id in ids:
+ self.fail("Xid still present : %s" % (txc))
+ self.assertEqual(0, len(xids))
+
#test deletion of queue after publish
#create queue
session.queue_declare(queue = "q", auto_delete=True, durable=True)
17 years, 10 months
rhmessaging commits: r2175 - store/trunk/cpp/lib/gen/qpid/management.
by rhmessaging-commits@lists.jboss.org
Author: tedross
Date: 2008-07-03 15:50:57 -0400 (Thu, 03 Jul 2008)
New Revision: 2175
Modified:
store/trunk/cpp/lib/gen/qpid/management/Journal.cpp
store/trunk/cpp/lib/gen/qpid/management/Store.cpp
store/trunk/cpp/lib/gen/qpid/management/Store.h
Log:
Regenerated management components
Modified: store/trunk/cpp/lib/gen/qpid/management/Journal.cpp
===================================================================
--- store/trunk/cpp/lib/gen/qpid/management/Journal.cpp 2008-07-03 18:47:51 UTC (rev 2174)
+++ store/trunk/cpp/lib/gen/qpid/management/Journal.cpp 2008-07-03 19:50:57 UTC (rev 2175)
@@ -79,7 +79,7 @@
for (int idx = 0; idx < maxThreads; idx++)
if (perThreadStatsArray[idx] != 0)
delete perThreadStatsArray[idx];
- delete perThreadStatsArray;
+ delete[] perThreadStatsArray;
}
Modified: store/trunk/cpp/lib/gen/qpid/management/Store.cpp
===================================================================
--- store/trunk/cpp/lib/gen/qpid/management/Store.cpp 2008-07-03 18:47:51 UTC (rev 2174)
+++ store/trunk/cpp/lib/gen/qpid/management/Store.cpp 2008-07-03 19:50:57 UTC (rev 2175)
@@ -36,7 +36,7 @@
string Store::packageName = string ("mrgstore");
string Store::className = string ("store");
uint8_t Store::md5Sum[16] =
- {0x91,0xcf,0xc4,0xa7,0x9b,0x4a,0x2a,0x88,0x32,0x6f,0xef,0xec,0x82,0xd7,0x12,0x6a};
+ {0x63,0xc5,0x1a,0x81,0x18,0x8a,0x8d,0x9b,0x3e,0x96,0xf7,0x6d,0x3b,0xd0,0x51,0x14};
Store::Store (ManagementAgent* _agent, Manageable* _core, Manageable* _parent) :
ManagementObject(_agent, _core)
@@ -75,7 +75,7 @@
buf.putShortString (packageName); // Package Name
buf.putShortString (className); // Class Name
buf.putBin128 (md5Sum); // Schema Hash
- buf.putShort (5); // Config Element Count
+ buf.putShort (4); // Config Element Count
buf.putShort (0); // Inst Element Count
buf.putShort (0); // Method Count
buf.putShort (0); // Event Count
@@ -97,14 +97,6 @@
buf.put (ft);
ft = FieldTable ();
- ft.setString (NAME, "async");
- ft.setInt (TYPE, TYPE_BOOL);
- ft.setInt (ACCESS, ACCESS_RO);
- ft.setInt (INDEX, 0);
- ft.setString (DESC, "Asynchronous IO");
- buf.put (ft);
-
- ft = FieldTable ();
ft.setString (NAME, "defaultInitialFileCount");
ft.setInt (TYPE, TYPE_U16);
ft.setInt (ACCESS, ACCESS_RO);
@@ -141,7 +133,6 @@
writeTimestamps (buf);
buf.putLongLong (brokerRef);
buf.putShortString (location);
- buf.putOctet (async?1:0);
buf.putShort (defaultInitialFileCount);
buf.putLong (defaultDataFileSize);
Modified: store/trunk/cpp/lib/gen/qpid/management/Store.h
===================================================================
--- store/trunk/cpp/lib/gen/qpid/management/Store.h 2008-07-03 18:47:51 UTC (rev 2174)
+++ store/trunk/cpp/lib/gen/qpid/management/Store.h 2008-07-03 19:50:57 UTC (rev 2175)
@@ -42,7 +42,6 @@
// Properties
uint64_t brokerRef;
std::string location;
- uint8_t async;
uint16_t defaultInitialFileCount;
uint32_t defaultDataFileSize;
@@ -90,11 +89,6 @@
location = val;
configChanged = true;
}
- inline void set_async (uint8_t val){
- sys::Mutex::ScopedLock mutex(accessLock);
- async = val;
- configChanged = true;
- }
inline void set_defaultInitialFileCount (uint16_t val){
sys::Mutex::ScopedLock mutex(accessLock);
defaultInitialFileCount = val;
17 years, 10 months
rhmessaging commits: r2174 - store/trunk/specs.
by rhmessaging-commits@lists.jboss.org
Author: kpvdr
Date: 2008-07-03 14:47:51 -0400 (Thu, 03 Jul 2008)
New Revision: 2174
Modified:
store/trunk/specs/management-schema.xml
Log:
Leftover from removal of sync option in store: removed async property from management of store class
Modified: store/trunk/specs/management-schema.xml
===================================================================
--- store/trunk/specs/management-schema.xml 2008-07-02 20:54:27 UTC (rev 2173)
+++ store/trunk/specs/management-schema.xml 2008-07-03 18:47:51 UTC (rev 2174)
@@ -6,7 +6,6 @@
<class name="Store">
<property name="brokerRef" type="objId" references="qpid.Broker" access="RO" index="y" parentRef="y"/>
<property name="location" type="sstr" access="RO" desc="Logical directory on disk"/>
- <property name="async" type="bool" access="RO" desc="Asynchronous IO"/>
<property name="defaultInitialFileCount" type="uint16" access="RO" unit="file" desc="Default number of files initially allocated to each journal"/>
<property name="defaultDataFileSize" type="uint32" access="RO" unit="RdPg" desc="Default size of each journal data file"/>
</class>
17 years, 10 months
rhmessaging commits: r2173 - in store/trunk/cpp: lib/jrnl and 3 other directories.
by rhmessaging-commits@lists.jboss.org
Author: kpvdr
Date: 2008-07-02 16:54:27 -0400 (Wed, 02 Jul 2008)
New Revision: 2173
Modified:
store/trunk/cpp/lib/BdbMessageStore.cpp
store/trunk/cpp/lib/BdbMessageStore.h
store/trunk/cpp/lib/JournalImpl.cpp
store/trunk/cpp/lib/JournalImpl.h
store/trunk/cpp/lib/TxnCtxt.h
store/trunk/cpp/lib/jrnl/jcntl.cpp
store/trunk/cpp/lib/jrnl/jcntl.hpp
store/trunk/cpp/lib/jrnl/wmgr.cpp
store/trunk/cpp/tests/TwoPhaseCommitTest.cpp
store/trunk/cpp/tests/jrnl/_st_basic.cpp
store/trunk/cpp/tests/jrnl/_st_helper_fns.hpp
store/trunk/cpp/tests/jrnl/_st_read.cpp
store/trunk/cpp/tests/jrnl/jtt/jrnl_instance.cpp
Log:
Moved prepared XID list from BDB to a journal instance in BdbMessageStore
Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp 2008-06-30 19:03:29 UTC (rev 2172)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp 2008-07-02 20:54:27 UTC (rev 2173)
@@ -22,20 +22,11 @@
*/
#include "BdbMessageStore.h"
-#include <qpid/broker/RecoveryManager.h>
-#include <qpid/broker/Message.h>
-#include <qpid/framing/Buffer.h>
-#include <qpid/log/Statement.h>
-#include <qpid/sys/Mutex.h>
-#include <algorithm>
-#include <iomanip>
-#include <sstream>
+
#include "BindingDbt.h"
+#include "BufferValue.h"
#include "IdPairDbt.h"
-#include "StringDbt.h"
-#include "JournalImpl.h"
-#include "DataTokenImpl.h"
-#include "qpid/management/ManagementAgent.h"
+#include "qpid/log/Statement.h"
#include "qpid/management/PackageMrgstore.h"
using namespace rhm::bdbstore;
@@ -63,9 +54,6 @@
mappingDb(&env, 0),
bindingDb(&env, 0),
generalDb(&env, 0),
- enqueueXidDb(&env, 0),
- dequeueXidDb(&env, 0),
- prepareXidDb(&env, 0),
numJrnlFiles(defNumJrnlFiles),
jrnlFsizePgs(defJrnlFileSizePgs),
wcache_pgsize_sblks(JRNL_WMGR_DEF_PAGE_SIZE),
@@ -125,12 +113,11 @@
if (dir.size()>0) storeDir = dir;
- string bdbdir = storeDir + "/rhm/dat/";
- journal::jdir::create_dir(bdbdir);
+ journal::jdir::create_dir(getBdbBaseDir());
+ journal::jdir::create_dir(getPxidBaseDir());
-
try {
- env.open(bdbdir.c_str(), DB_THREAD | DB_CREATE | DB_INIT_TXN | DB_INIT_LOCK | DB_INIT_LOG | DB_INIT_MPOOL | DB_USE_ENVIRON, 0);
+ env.open(getBdbBaseDir().c_str(), DB_THREAD | DB_CREATE | DB_INIT_TXN | DB_INIT_LOCK | DB_INIT_LOG | DB_INIT_MPOOL | DB_USE_ENVIRON, 0);
} catch (const DbException& e) {
if (e.get_errno() == DB_VERSION_MISMATCH)
THROW_STORE_EXCEPTION_2("Database environment mismatch: This version of bd4 does not match that which created the store database. "
@@ -150,10 +137,11 @@
open(mappingDb, txn.get(), "mappings.db", true);
open(bindingDb, txn.get(), "bindings.db", true);
open(generalDb, txn.get(), "general.db", false);
- open(enqueueXidDb, txn.get(), "enqueue_xid.db", true);
- open(dequeueXidDb, txn.get(), "dequeue_xid.db", true);
- open(prepareXidDb, txn.get(), "prepare_xid.db", false);
+ preparedXidStorePtr.reset(new JournalImpl("PreparedXidStore", getPxidBaseDir(), "prepared_xid", defJournalGetEventsTimeout, defJournalFlushTimeout));
txn.commit();
+ } catch (const journal::jexception& e) {
+ txn.abort();
+ THROW_STORE_EXCEPTION_2("Error opening preparedXidStore instance", e.what());
} catch (const DbException& e) {
txn.abort();
THROW_STORE_EXCEPTION_2("Error opening databases", e);
@@ -167,6 +155,15 @@
return true;
}
+void BdbMessageStore::chkInitPreparedXidStore()
+{
+ if (!preparedXidStorePtr->is_init())
+ {
+ u_int32_t defTotWCacheSize = JRNL_WMGR_DEF_PAGE_SIZE * JRNL_WMGR_DEF_PAGES; // in sblks. Currently 2014 sblks (1 MiB).
+ preparedXidStorePtr->initialize(defXidStoreNumJrnlFiles, defXidStoreJrnlFileSizePgs * JRNL_RMGR_PAGE_SIZE, defTotWCacheSize / wcache_pgsize_sblks, defXidStoreWCachePageSize);
+ }
+}
+
bool BdbMessageStore::init(const qpid::Options* options)
{
const Options* opts = static_cast<const Options*>(options);
@@ -253,11 +250,16 @@
for (std::list<Db*>::iterator i = dbs.begin(); i != dbs.end(); i++) {
(*i)->close(0);
}
+ if (preparedXidStorePtr->is_ready()) preparedXidStorePtr->stop(true);
} catch (const DbException& e) {
- QPID_LOG(error, "Error closing databases: " << e.what());
+ QPID_LOG(error, "Error closing BDB databases: " << e.what());
+ } catch (const journal::jexception& e) {
+ QPID_LOG(error, "Error: " << e.what());
} catch (const std::exception& e) {
- QPID_LOG(error, e.what());
- } catch (...) {}
+ QPID_LOG(error, "Error: " << e.what());
+ } catch (...) {
+ QPID_LOG(error, "Unknown error in BdbMessageStore::~BdbMessageStore()");
+ }
if (mgmtObject.get() != 0)
mgmtObject->resourceDestroy();
@@ -276,6 +278,7 @@
txn->commit(0);
try{
journal::jdir::delete_dir(getJrnlBaseDir(),true);
+ journal::jdir::delete_dir(getPxidBaseDir(),true);
}
catch (const journal::jexception& e) {
THROW_STORE_EXCEPTION(std::string("truncate() failed: ") + e.what() );
@@ -508,7 +511,7 @@
try
{
u_int64_t thisHighestRid = 0;
- jQueue->recover(numJrnlFiles, jrnlFsizePgs * JRNL_RMGR_PAGE_SIZE, wcache_num_pages, wcache_pgsize_sblks, prepared, thisHighestRid, key.id); // start recovery
+ jQueue->recover(numJrnlFiles, jrnlFsizePgs * JRNL_RMGR_PAGE_SIZE, wcache_num_pages, wcache_pgsize_sblks, &prepared, thisHighestRid, key.id); // start recovery
if (thisHighestRid > highestRid)
highestRid = thisHighestRid;
recoverMessages(txn, registry, queue, prepared, messages);
@@ -601,6 +604,10 @@
generalIdSequence.reset(maxGeneralId + 1);
}
+
+#define MAX_AIO_SLEEPS 1000 // ~1 second
+#define AIO_SLEEP_TIME 1000 // 1 milisecond
+
void BdbMessageStore::recoverMessages(TxnCtxt& /*txn*/, qpid::broker::RecoveryManager& recovery,
qpid::broker::RecoverableQueue::shared_ptr& queue, txn_list& locked, message_index& prepared)
{
@@ -678,7 +685,7 @@
}
case rhm::journal::RHM_IORES_PAGE_AIOWAIT:
if (++aio_sleep_cnt > MAX_AIO_SLEEPS)
- THROW_STORE_EXCEPTION("Timeout waiting for AIO");
+ THROW_STORE_EXCEPTION("Timeout waiting for AIO in BdbMessageStore::recoverMessages()");
::usleep(AIO_SLEEP_TIME);
break;
case rhm::journal::RHM_IORES_EMPTY:
@@ -748,11 +755,11 @@
void BdbMessageStore::recoverXids(txn_list& txns)
{
- std::set<string> prepared;
- collectPreparedXids(prepared);
+ std::set<string> preparedXidSet;
+ collectPreparedXids(preparedXidSet);
- //when using the async journal, it will abort unprepaired xids and populate the locked maps
- for (std::set<string>::iterator i = prepared.begin(); i != prepared.end(); i++) {
+ // Abort unprepaired xids and populate the locked maps
+ for (std::set<string>::iterator i = preparedXidSet.begin(); i != preparedXidSet.end(); i++) {
LockedMappings::shared_ptr enq_ptr;
enq_ptr.reset(new LockedMappings);
LockedMappings::shared_ptr deq_ptr;
@@ -789,7 +796,20 @@
void BdbMessageStore::collectPreparedXids(std::set<string>& xids)
{
- readXids(prepareXidDb, xids);
+ if (journal::jdir::exists(storeDir + "/rhm/pxid/prepared_xid.jinf"))
+ {
+ u_int64_t highest_rid;
+ preparedXidStorePtr->recover(defXidStoreNumJrnlFiles, defXidStoreJrnlFileSizePgs * JRNL_RMGR_PAGE_SIZE,
+ JRNL_WMGR_DEF_PAGE_SIZE * JRNL_WMGR_DEF_PAGES / wcache_pgsize_sblks, defXidStoreWCachePageSize,
+ 0, highest_rid, 0);
+
+ std::vector<std::string> xv;
+ preparedXidStorePtr->get_open_txn_list(xv);
+ for (std::vector<std::string>::const_iterator itr = xv.begin(); itr < xv.end(); itr++)
+ xids.insert(*itr);
+
+ preparedXidStorePtr->recover_complete(); // start journal.
+ }
}
void BdbMessageStore::stage(const intrusive_ptr<PersistableMessage>& msg)
@@ -962,26 +982,18 @@
txn = check(ctxt);
} else {
txn = &implicit;
- txn->begin(env);
}
- try {
- bool newId = false;
- if (messageId == 0) {
- messageId = messageIdSequence.next();
- msg->setPersistenceId(messageId);
- newId = true;
- }
- store(&queue, txn, key, msg, newId);
+ bool newId = false;
+ if (messageId == 0) {
+ messageId = messageIdSequence.next();
+ msg->setPersistenceId(messageId);
+ newId = true;
+ }
+ store(&queue, txn, key, msg, newId);
- // add queue* to the txn map..
- if (ctxt) txn->addXidRecord(queue.getExternalQueueStore());
-
- if (!ctxt) txn->commit();
- } catch (const std::exception& e) {
- if (!ctxt) txn->abort();
- throw;
- }
+ // add queue* to the txn map..
+ if (ctxt) txn->addXidRecord(queue.getExternalQueueStore());
}
void BdbMessageStore::store(const PersistableQueue* queue,
@@ -1056,25 +1068,13 @@
txn = check(ctxt);
} else {
txn = &implicit;
- txn->begin(env);
}
- try {
-
- // add queue* to the txn map..
- if (ctxt) txn->addXidRecord(queue.getExternalQueueStore());
- async_dequeue(ctxt, msg, queue);
+ // add queue* to the txn map..
+ if (ctxt) txn->addXidRecord(queue.getExternalQueueStore());
+ async_dequeue(ctxt, msg, queue);
- msg->dequeueComplete();
- // if ( msg->isDequeueComplete() ) // clear id after last dequeue
- // msg->setPersistenceId(0);
-
- if (!ctxt) txn->commit();
-
- } catch (const std::exception& e) {
- if (!ctxt) txn->abort();
- throw;
- }
+ msg->dequeueComplete();
}
void BdbMessageStore::async_dequeue(
@@ -1145,17 +1145,15 @@
void BdbMessageStore::completed(TPCTxnCtxt& txn, bool commit)
{
- if (!txn.get()) txn.begin(env);
-
try {
+ // Nothing to do if not prepared
+ chkInitPreparedXidStore();
+ if (txn.getDtok().is_enqueued())
+ preparedXidStorePtr->dequeue_txn_data_record(&txn.getDtok(), txn.getXid());
- StringDbt key(txn.getXid());
- prepareXidDb.del(txn.get(), &key, 0);
-
txn.complete(commit);
} catch (const std::exception& e) {
QPID_LOG(error, "Error completing xid " << txn.getXid() << ": " << e.what());
- txn.abort();
throw;
}
}
@@ -1163,9 +1161,8 @@
auto_ptr<TransactionContext> BdbMessageStore::begin()
{
checkInit();
- // pass sequence number for c/a when using jrnl
+ // pass sequence number for c/a
TxnCtxt* txn(new TxnCtxt(&messageIdSequence ));
- txn->begin(env);
return auto_ptr<TransactionContext>(txn);
}
@@ -1175,9 +1172,8 @@
IdSequence* jtx = NULL;
jtx = &messageIdSequence;
- // pass sequence number for c/a when using jrnl
+ // pass sequence number for c/a
TPCTxnCtxt* txn(new TPCTxnCtxt(xid, jtx));
- txn->begin(env);
return auto_ptr<TPCTransactionContext>(txn);
}
@@ -1188,18 +1184,14 @@
if(!txn) throw InvalidTransactionContextException();
try {
- u_int8_t dummy(1);
- string xid(txn->getXid());
- Dbt key ((void*) xid.data(), xid.length());
- Dbt value(&dummy, sizeof(dummy));
+ chkInitPreparedXidStore();
+ preparedXidStorePtr->enqueue_txn_data_record(0, 0, 0, &txn->getDtok(), txn->getXid(), false);
+ txn->addXidRecord(preparedXidStorePtr.get());
// make sure all the data is written to disk before returning
txn->sync();
- prepareXidDb.put(txn->get(), &key, &value, 0);
-
- txn->commit();
} catch (const std::exception& e) {
- txn->abort();
+ QPID_LOG(error, "Error preparing xid " << txn->getXid() << ": " << e.what());
throw;
}
}
@@ -1211,7 +1203,7 @@
if (txn->isTPC()) {
completed(*dynamic_cast<TPCTxnCtxt*>(txn), true);
} else {
- txn->commit();
+ txn->complete(true);
}
}
@@ -1222,7 +1214,7 @@
if (txn->isTPC()) {
completed(*dynamic_cast<TPCTxnCtxt*>(txn), false);
} else {
- txn->abort();
+ txn->complete(false);
}
}
@@ -1328,10 +1320,24 @@
string BdbMessageStore::getJrnlBaseDir()
{
std::stringstream dir;
- dir << storeDir<< "/rhm/jrnl/" ;
+ dir << storeDir << "/rhm/jrnl/" ;
return dir.str();
}
+string BdbMessageStore::getBdbBaseDir()
+{
+ std::stringstream dir;
+ dir << storeDir << "/rhm/dat/" ;
+ return dir.str();
+}
+
+string BdbMessageStore::getPxidBaseDir()
+{
+ std::stringstream dir;
+ dir << storeDir << "/rhm/pxid/" ;
+ return dir.str();
+}
+
string BdbMessageStore::getJrnlDir(const qpid::broker::PersistableQueue& queue) //for exmaple /var/rhm/ + queueDir/
{
return getJrnlDir(queue.getName().c_str());
@@ -1372,5 +1378,3 @@
"Lower values decrease latency at the expense of throughput.")
;
}
-
-
Modified: store/trunk/cpp/lib/BdbMessageStore.h
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.h 2008-06-30 19:03:29 UTC (rev 2172)
+++ store/trunk/cpp/lib/BdbMessageStore.h 2008-07-02 20:54:27 UTC (rev 2173)
@@ -24,26 +24,30 @@
#ifndef _BdbMessageStore_
#define _BdbMessageStore_
+#include <string>
#include "db-inc.h"
-#include "BufferValue.h"
+//#include "BufferValue.h"
#include "Cursor.h"
#include "IdDbt.h"
-#include "IdSequence.h"
+//#include "IdSequence.h"
#include "PreparedTransaction.h"
-#include "StoreException.h"
+//#include "StoreException.h"
#include "TxnCtxt.h"
-#include <qpid/broker/Broker.h>
-#include <qpid/broker/MessageStore.h>
-#include <qpid/management/Manageable.h>
-#include <qpid/sys/Monitor.h>
-#include <qpid/sys/Time.h>
-#include <map>
-#include <set>
-#include <iostream>
-#include <boost/format.hpp>
-#include <boost/intrusive_ptr.hpp>
-#include <boost/ptr_container/ptr_list.hpp>
+#include "qpid/broker/Broker.h"
+#include "qpid/broker/MessageStore.h"
+#include "qpid/management/Manageable.h"
+//#include <qpid/sys/Monitor.h>
+//#include <qpid/sys/Time.h>
+//#include <map>
+//#include <set>
+//#include <iostream>
+//#include <boost/format.hpp>
+//#include <boost/intrusive_ptr.hpp>
+//#include <boost/ptr_container/ptr_list.hpp>
#include "qpid/management/Store.h"
+#include "jrnl/jcfg.hpp"
+#include "JournalImpl.h"
+#include "IdSequence.h"
// Assume DB_VERSION_MAJOR == 4
#if (DB_VERSION_MINOR == 2)
@@ -68,9 +72,12 @@
typedef boost::ptr_list<PreparedTransaction> txn_list;
// Default store settings
- static const u_int16_t defNumJrnlFiles = 8; // TODO: make configurable
- static const u_int32_t defJrnlFileSizePgs = 24; // TODO: make configurable
- static const u_int32_t defWCachePageSize = JRNL_WMGR_DEF_PAGE_SIZE * JRNL_DBLK_SIZE * JRNL_SBLK_SIZE / 1024; // TODO: make configurable
+ static const u_int16_t defNumJrnlFiles = 8;
+ static const u_int32_t defJrnlFileSizePgs = 24;
+ static const u_int32_t defWCachePageSize = JRNL_WMGR_DEF_PAGE_SIZE * JRNL_DBLK_SIZE * JRNL_SBLK_SIZE / 1024;
+ static const u_int16_t defXidStoreNumJrnlFiles = 8;
+ static const u_int32_t defXidStoreJrnlFileSizePgs = 24;
+ static const u_int32_t defXidStoreWCachePageSize = JRNL_WMGR_DEF_PAGE_SIZE * JRNL_DBLK_SIZE * JRNL_SBLK_SIZE / 1024;
std::list<Db*> dbs;
DbEnv env;
@@ -81,9 +88,7 @@
Db mappingDb;
Db bindingDb;
Db generalDb;
- Db enqueueXidDb;
- Db dequeueXidDb;
- Db prepareXidDb;
+ boost::shared_ptr<JournalImpl> preparedXidStorePtr;
IdSequence queueIdSequence;
IdSequence exchangeIdSequence;
IdSequence generalIdSequence;
@@ -150,9 +155,12 @@
string getJrnlDir(const qpid::broker::PersistableQueue& queue); //for exmaple /var/rhm/ + queueDir/
string getJrnlDir(const char* queueName);
string getJrnlBaseDir();
+ string getBdbBaseDir();
+ string getPxidBaseDir();
inline void checkInit() {
if (!isInit) init("/var", defNumJrnlFiles, defJrnlFileSizePgs, defWCachePageSize); isInit = true;
}
+ void chkInitPreparedXidStore();
public:
struct Options : public qpid::Options {
Modified: store/trunk/cpp/lib/JournalImpl.cpp
===================================================================
--- store/trunk/cpp/lib/JournalImpl.cpp 2008-06-30 19:03:29 UTC (rev 2172)
+++ store/trunk/cpp/lib/JournalImpl.cpp 2008-07-02 20:54:27 UTC (rev 2173)
@@ -152,7 +152,7 @@
const u_int32_t wcache_pgsize_sblks,
const journal::rd_aio_cb rd_cb,
const journal::wr_aio_cb wr_cb,
- boost::ptr_list<bdbstore::PreparedTransaction>& prep_tx_list,
+ boost::ptr_list<bdbstore::PreparedTransaction>* prep_tx_list_ptr,
u_int64_t& highest_rid,
u_int64_t queue_id)
{
@@ -162,34 +162,42 @@
oss1 << " wcache_pgsize_sblks=" << wcache_pgsize_sblks;
oss1 << " wcache_num_pages=" << wcache_num_pages;
log(LOG_DEBUG, oss1.str());
- // Create list of prepared xids
- std::vector<std::string> prep_xid_list;
- for (bdbstore::PreparedTransaction::list::iterator i = prep_tx_list.begin();
- i != prep_tx_list.end(); i++) {
- prep_xid_list.push_back(i->xid);
+
+ if (prep_tx_list_ptr) {
+ // Create list of prepared xids
+ std::vector<std::string> prep_xid_list;
+ for (bdbstore::PreparedTransaction::list::iterator i = prep_tx_list_ptr->begin();
+ i != prep_tx_list_ptr->end(); i++) {
+ prep_xid_list.push_back(i->xid);
+ }
+
+ jcntl::recover(num_jfiles, jfsize_sblks, wcache_num_pages, wcache_pgsize_sblks, rd_cb, wr_cb,
+ &prep_xid_list, highest_rid);
+ } else {
+ jcntl::recover(num_jfiles, jfsize_sblks, wcache_num_pages, wcache_pgsize_sblks, rd_cb, wr_cb,
+ 0, highest_rid);
}
-
- jcntl::recover(num_jfiles, jfsize_sblks, wcache_num_pages, wcache_pgsize_sblks, rd_cb, wr_cb,
- prep_xid_list, highest_rid);
// Populate PreparedTransaction lists from _tmap
- for (bdbstore::PreparedTransaction::list::iterator i = prep_tx_list.begin();
- i != prep_tx_list.end(); i++) {
- try {
- txn_data_list tdl = _tmap.get_tdata_list(i->xid);
- assert(tdl.size()); // should never be empty
- for (tdl_itr tdl_itr = tdl.begin(); tdl_itr < tdl.end(); tdl_itr++) {
- if (tdl_itr->_enq_flag) { // enqueue op
- i->enqueues->add(queue_id, tdl_itr->_rid);
- } else { // dequeue op
- i->dequeues->add(queue_id, tdl_itr->_drid);
+ if (prep_tx_list_ptr)
+ {
+ for (bdbstore::PreparedTransaction::list::iterator i = prep_tx_list_ptr->begin(); i != prep_tx_list_ptr->end(); i++) {
+ try {
+ txn_data_list tdl = _tmap.get_tdata_list(i->xid);
+ assert(tdl.size()); // should never be empty
+ for (tdl_itr tdl_itr = tdl.begin(); tdl_itr < tdl.end(); tdl_itr++) {
+ if (tdl_itr->_enq_flag) { // enqueue op
+ i->enqueues->add(queue_id, tdl_itr->_rid);
+ } else { // dequeue op
+ i->dequeues->add(queue_id, tdl_itr->_drid);
+ }
}
}
+ catch (const jexception& e) {
+ if (e.err_code() != jerrno::JERR_MAP_NOTFOUND)
+ throw;
+ }
}
- catch (const jexception& e) {
- if (e.err_code() != jerrno::JERR_MAP_NOTFOUND)
- throw;
- }
}
std::ostringstream oss2;
oss2 << "Recover phase I complete; highest rid found = 0x" << std::hex << highest_rid;
Modified: store/trunk/cpp/lib/JournalImpl.h
===================================================================
--- store/trunk/cpp/lib/JournalImpl.h 2008-06-30 19:03:29 UTC (rev 2172)
+++ store/trunk/cpp/lib/JournalImpl.h 2008-07-02 20:54:27 UTC (rev 2173)
@@ -119,7 +119,7 @@
const u_int32_t wcache_pgsize_sblks,
const journal::rd_aio_cb rd_cb,
const journal::wr_aio_cb wr_cb,
- boost::ptr_list<bdbstore::PreparedTransaction>& prep_tx_list,
+ boost::ptr_list<bdbstore::PreparedTransaction>* prep_tx_list_ptr,
u_int64_t& highest_rid,
u_int64_t queue_id);
@@ -127,11 +127,11 @@
const u_int32_t jfsize_sblks,
const u_int16_t wcache_num_pages,
const u_int32_t wcache_pgsize_sblks,
- boost::ptr_list<bdbstore::PreparedTransaction>& prep_tx_list,
+ boost::ptr_list<bdbstore::PreparedTransaction>* prep_tx_list_ptr,
u_int64_t& highest_rid,
u_int64_t queue_id) {
recover(num_jfiles, jfsize_sblks, wcache_num_pages, wcache_pgsize_sblks, 0,
- &aio_wr_callback, prep_tx_list, highest_rid, queue_id);
+ &aio_wr_callback, prep_tx_list_ptr, highest_rid, queue_id);
}
void recover_complete();
Modified: store/trunk/cpp/lib/TxnCtxt.h
===================================================================
--- store/trunk/cpp/lib/TxnCtxt.h 2008-06-30 19:03:29 UTC (rev 2172)
+++ store/trunk/cpp/lib/TxnCtxt.h 2008-07-02 20:54:27 UTC (rev 2173)
@@ -2,7 +2,7 @@
Copyright (C) 2007 Red Hat Software
This file is part of Red Hat Messaging.
-
+
Red Hat Messaging is free software; you can redistribute it and/or
modify it under the terms of the GNU Lesser General Public
License as published by the Free Software Foundation; either
@@ -24,38 +24,40 @@
#ifndef _TxnCtxt_
#define _TxnCtxt_
-#include "db-inc.h"
-#include <qpid/broker/MessageStore.h>
-#include <qpid/sys/Mutex.h>
-#include <boost/shared_ptr.hpp>
-#include <sstream>
-#include <memory>
-#include <vector>
-#include "JournalImpl.h"
-#include "DataTokenImpl.h"
#include <boost/format.hpp>
#include <boost/intrusive_ptr.hpp>
-#include <jrnl/jexception.hpp>
+#include <db-inc.h>
+#include <memory>
+#include <set>
+#include <sstream>
+#include <string>
+#include <unistd.h> // ::usleep()
+#include "DataTokenImpl.h"
+#include "IdSequence.h"
+#include "JournalImpl.h"
+#include "jrnl/jexception.hpp"
+#include "qpid/broker/PersistableQueue.h"
+#include "qpid/broker/TransactionalStore.h"
+#include "qpid/sys/Mutex.h"
+#include "StoreException.h"
+
namespace rhm{
namespace bdbstore{
-// find a better place to put these
-#define MAX_AIO_SLEEPS 1000
-#define AIO_SLEEP_TIME 1000
-
-
class TxnCtxt : public qpid::broker::TransactionContext
{
protected:
+
+ static qpid::sys::Mutex globalSerialiser;
+
typedef std::set<qpid::broker::ExternalQueueStore*> ipqdef;
typedef std::auto_ptr<qpid::sys::Mutex::ScopedLock> AutoScopedLock;
- static qpid::sys::Mutex globalSerialiser;
-
ipqdef impactedQueues; // list of Queues used in the txn
mutable qpid::sys::Mutex Lock;
IdSequence* loggedtx;
+ DataTokenImpl dtok;
AutoScopedLock globalHolder;
/**
@@ -63,67 +65,70 @@
*/
std::string tid;
DbTxn* txn;
-
- void completeTXN(bool commit){
+
+ void completeTXN(bool commit) {
sync();
- for (TxnCtxt::ipqdef::iterator i = impactedQueues.begin(); i != impactedQueues.end(); i++) {
+ for (TxnCtxt::ipqdef::iterator i = impactedQueues.begin(); i != impactedQueues.end(); i++) {
JournalImpl* jc = static_cast<JournalImpl*>(*i);
if (jc && loggedtx) { /* if using journal */
boost::intrusive_ptr<DataTokenImpl> dtokp(new DataTokenImpl);
dtokp->addRef();
dtokp->set_external_rid(true);
dtokp->set_rid(loggedtx->next());
- try{
+ try {
if (commit) {
jc->txn_commit(dtokp.get(), getXid());
jc->flush(true);
} else {
jc->txn_abort(dtokp.get(), getXid());
}
- } catch (const journal::jexception& e) {
+ } catch (const journal::jexception& e) {
//std::cout << "Error commit" << e << std::endl;
THROW_STORE_EXCEPTION(std::string("Error commit") + e.what());
}
-
}
- }
+ }
deleteXidRecord();
}
-
+
public:
-
- TxnCtxt(IdSequence* _loggedtx=NULL) : loggedtx(_loggedtx), txn(0) {
- if (loggedtx){
- std::stringstream s;
- s << "rhm-tid" << this;
- tid.assign(s.str());
- }
+
+ TxnCtxt(IdSequence* _loggedtx=NULL) : loggedtx(_loggedtx), txn(0) {
+ if (loggedtx) {
+ std::stringstream s;
+ s << "rhm-tid" << this;
+ tid.assign(s.str());
+ }
}
-
+
/**
* Call to make sure all the data for this txn is written to safe store
*
*@return if the data sucessfully synced.
- */
- void sync(){
+ */
+
+ virtual ~TxnCtxt() { if(txn) abort(); }
+
+#define MAX_SYNC_SLEEPS 1000 // ~1 second
+#define SYNC_SLEEP_TIME 1000 // 1 milisecond
+
+ void sync() {
bool allWritten = false;
bool firstloop = true;
- while (loggedtx && !allWritten){
- if (!firstloop) ::usleep(AIO_SLEEP_TIME); // move this into the get events call aiolib..
+ long sleep_cnt = 0L;
+ while (loggedtx && !allWritten) {
+ if (sleep_cnt > MAX_SYNC_SLEEPS) THROW_STORE_EXCEPTION(std::string("Error: timeout waiting for TxnCtxt::sync()"));
+ if (!firstloop) { ::usleep(SYNC_SLEEP_TIME); sleep_cnt++; } // move this into the get events call aiolib..
allWritten = true;
- for (TxnCtxt::ipqdef::iterator i = impactedQueues.begin(); i != impactedQueues.end(); i++) {
+ for (TxnCtxt::ipqdef::iterator i = impactedQueues.begin(); i != impactedQueues.end(); i++) {
JournalImpl* jc = static_cast<JournalImpl*>(*i);
-
- try
- {
- if (jc && !(jc->is_txn_synced(getXid())))
- {
- if (firstloop)
- jc->flush();
- allWritten = false;
- jc->get_wr_events();
- }
- } catch (const journal::jexception& e) {
+ try {
+ if (jc && !(jc->is_txn_synced(getXid()))) {
+ if (firstloop) jc->flush();
+ allWritten = false;
+ jc->get_wr_events();
+ }
+ } catch (const journal::jexception& e) {
//std::cout << " TxnCtxt: Error sync 3" << e << std::flush;
THROW_STORE_EXCEPTION(std::string("Error sync") + e.what());
}
@@ -131,34 +136,36 @@
firstloop = false;
}
}
-
- virtual ~TxnCtxt() { if(txn) abort(); }
- void begin(DbEnv& env, bool sync = false){
- env.txn_begin(0, &txn, 0);
- if (sync) globalHolder = AutoScopedLock(new qpid::sys::Mutex::ScopedLock(globalSerialiser));
+
+ void begin(DbEnv& env, bool sync = false) {
+ env.txn_begin(0, &txn, 0);
+ if (sync) globalHolder = AutoScopedLock(new qpid::sys::Mutex::ScopedLock(globalSerialiser));
}
- void commit(){
- txn->commit(0);
- txn = 0;
- completeTXN(true);
- globalHolder.reset();
+
+ void commit() {
+ if (txn) {
+ txn->commit(0);
+ txn = 0;
+ globalHolder.reset();
+ }
}
- void abort(){
+
+ void abort(){
if (txn) {
- txn->abort();
- txn = 0;
- completeTXN(false);
- globalHolder.reset();
- }
+ txn->abort();
+ txn = 0;
+ globalHolder.reset();
+ }
}
- DbTxn* get(){ return txn; }
+
+ DbTxn* get() { return txn; }
virtual bool isTPC() { return false; }
virtual const std::string& getXid() { return tid; }
- void deleteXidRecord(){ impactedQueues.clear(); }
- void addXidRecord(qpid::broker::ExternalQueueStore* queue) {
- impactedQueues.insert(queue); }
-
+ void deleteXidRecord() { impactedQueues.clear(); }
+ void addXidRecord(qpid::broker::ExternalQueueStore* queue) { impactedQueues.insert(queue); }
+ void complete(bool commit) { completeTXN(commit); }
+ DataTokenImpl& getDtok() { return dtok; }
};
class TPCTxnCtxt : public TxnCtxt, public qpid::broker::TPCTransactionContext
@@ -168,12 +175,6 @@
TPCTxnCtxt(const std::string& _xid, IdSequence* _loggedtx) : TxnCtxt(_loggedtx), xid(_xid) {}
virtual bool isTPC() { return true; }
virtual const std::string& getXid() { return xid; }
- // commit the BDB abort, abort commit the jnrl
- void commit(){ txn->commit(0); txn = 0; globalHolder.reset(); }
- void abort(){ txn->abort(); txn = 0; globalHolder.reset(); }
- void complete(bool commit){
- txn->commit(0); completeTXN(commit); txn = 0;
- }
};
}}
Modified: store/trunk/cpp/lib/jrnl/jcntl.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.cpp 2008-06-30 19:03:29 UTC (rev 2172)
+++ store/trunk/cpp/lib/jrnl/jcntl.cpp 2008-07-02 20:54:27 UTC (rev 2173)
@@ -155,7 +155,7 @@
jcntl::recover(const u_int16_t num_jfiles, const u_int32_t jfsize_sblks,
const u_int16_t wcache_num_pages, const u_int32_t wcache_pgsize_sblks,
const rd_aio_cb rd_cb, const wr_aio_cb wr_cb,
- const std::vector<std::string>& prep_txn_list, u_int64_t& highest_rid)
+ const std::vector<std::string>* prep_txn_list_ptr, u_int64_t& highest_rid)
{
_init_flag = false;
_stop_flag = false;
@@ -187,7 +187,7 @@
_jdir.verify_dir();
_rcvdat.reset(_num_jfiles);
- rcvr_janalyze(_rcvdat, prep_txn_list);
+ rcvr_janalyze(_rcvdat, prep_txn_list_ptr);
highest_rid = _rcvdat._h_rid;
if (_rcvdat._full)
throw jexception(jerrno::JERR_JCNTL_RECOVERJFULL, "jcntl", "recover");
@@ -574,7 +574,7 @@
}
void
-jcntl::rcvr_janalyze(rcvdat& rd, const std::vector<std::string>& prep_txn_list)
+jcntl::rcvr_janalyze(rcvdat& rd, const std::vector<std::string>* prep_txn_list_ptr)
{
jinf ji(_jdir.dirname() + "/" + _base_filename + "." + JRNL_INFO_EXTENSION, true);
@@ -633,16 +633,16 @@
if (rd._ffid == next_wr_fid && rd._enq_cnt_list[next_wr_fid])
rd._full = true;
- // Remove all transactions not in prep_txn_list
- std::vector<std::string> xid_list;
- _tmap.xid_list(xid_list);
- for (std::vector<std::string>::iterator itr = xid_list.begin(); itr != xid_list.end();
- itr++)
+ if (!rd._empty && prep_txn_list_ptr)
{
- std::vector<std::string>::const_iterator pitr = std::find(prep_txn_list.begin(),
- prep_txn_list.end(), *itr);
- if (pitr == prep_txn_list.end())
- _tmap.get_remove_tdata_list(*itr);
+ std::vector<std::string> xid_list;
+ _tmap.xid_list(xid_list);
+ for (std::vector<std::string>::iterator itr = xid_list.begin(); itr != xid_list.end(); itr++)
+ {
+ std::vector<std::string>::const_iterator pitr = std::find(prep_txn_list_ptr->begin(), prep_txn_list_ptr->end(), *itr);
+ if (pitr == prep_txn_list_ptr->end())
+ _tmap.get_remove_tdata_list(*itr);
+ }
}
}
}
Modified: store/trunk/cpp/lib/jrnl/jcntl.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.hpp 2008-06-30 19:03:29 UTC (rev 2172)
+++ store/trunk/cpp/lib/jrnl/jcntl.hpp 2008-07-02 20:54:27 UTC (rev 2173)
@@ -210,7 +210,7 @@
* \param wcache_pgsize_sblks The size in sblks of each write cache page.
* \param rd_cb Function pointer to callback function for read operations. May be 0 (NULL).
* \param wr_cb Function pointer to callback function for write operations. May be 0 (NULL).
- * \param prep_txn_list
+ * \param prep_txn_list_ptr
* \param highest_rid Returns the highest rid found in the journal during recover
*
* \exception TODO
@@ -218,7 +218,7 @@
void recover(const u_int16_t num_jfiles, const u_int32_t jfsize_sblks,
const u_int16_t wcache_num_pages, const u_int32_t wcache_pgsize_sblks,
const rd_aio_cb rd_cb, const wr_aio_cb wr_cb,
- const std::vector<std::string>& prep_txn_list, u_int64_t& highest_rid);
+ const std::vector<std::string>* prep_txn_list_ptr, u_int64_t& highest_rid);
/**
* \brief Notification to the journal that recovery is complete and that normal operation
@@ -575,6 +575,7 @@
* <b><i>false</i></b> otherwise.
*/
inline bool is_ready() const { return _init_flag and not _stop_flag; }
+ inline bool is_init() const { return _init_flag; }
inline bool is_read_only() const { return _readonly_flag; }
@@ -599,6 +600,9 @@
inline u_int16_t num_jfiles() const { return _num_jfiles; }
inline u_int32_t jfsize_sblks() const { return _jfsize_sblks; }
+
+ inline u_int32_t get_open_txn_cnt() const { return _tmap.size(); }
+ void get_open_txn_list(std::vector<std::string>& xv) { _tmap.xid_list(xv); }
// Logging
virtual void log(log_level level, const std::string& log_stmt) const;
@@ -644,7 +648,7 @@
/**
* \brief Analyze journal for recovery.
*/
- void rcvr_janalyze(rcvdat& rd, const std::vector<std::string>& prep_txn_list);
+ void rcvr_janalyze(rcvdat& rd, const std::vector<std::string>* prep_txn_list_ptr);
bool rcvr_get_next_record(u_int16_t& fid, std::ifstream* ifsp, bool& lowi, rcvdat& rd);
Modified: store/trunk/cpp/lib/jrnl/wmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wmgr.cpp 2008-06-30 19:03:29 UTC (rev 2172)
+++ store/trunk/cpp/lib/jrnl/wmgr.cpp 2008-07-02 20:54:27 UTC (rev 2173)
@@ -967,8 +967,8 @@
else
{
std::ostringstream oss;
- oss << "op=" << _op_str[op] << " index=" << _pg_index << " state=";
- oss << _page_cb_arr[_pg_index].state_str();
+ oss << "jrnl=" << _jc->id() << " op=" << _op_str[op];
+ oss << " index=" << _pg_index << " pg_state=" << _page_cb_arr[_pg_index].state_str();
throw jexception(jerrno::JERR_WMGR_BADPGSTATE, oss.str(), "wmgr", "pre_write_check");
}
}
@@ -988,8 +988,8 @@
if (!dtokp->is_writable())
{
std::ostringstream oss;
- oss << "op=" << _op_str[op] << " dtok_id=" << dtokp->id();
- oss << " dtok_state=" << dtokp->wstate_str();
+ oss << "jrnl=" << _jc->id() << " op=" << _op_str[op];
+ oss << " dtok_id=" << dtokp->id() << " dtok_state=" << dtokp->wstate_str();
throw jexception(jerrno::JERR_WMGR_BADDTOKSTATE, oss.str(), "wmgr",
"pre_write_check");
}
@@ -999,8 +999,8 @@
if (!dtokp->is_dequeueable())
{
std::ostringstream oss;
- oss << "op=" << _op_str[op] << " dtok_id=" << dtokp->id();
- oss << " dtok_state=" << dtokp->wstate_str();
+ oss << "jrnl=" << _jc->id() << " op=" << _op_str[op];
+ oss << " dtok_id=" << dtokp->id() << " dtok_state=" << dtokp->wstate_str();
throw jexception(jerrno::JERR_WMGR_BADDTOKSTATE, oss.str(), "wmgr",
"pre_write_check");
}
Modified: store/trunk/cpp/tests/TwoPhaseCommitTest.cpp
===================================================================
--- store/trunk/cpp/tests/TwoPhaseCommitTest.cpp 2008-06-30 19:03:29 UTC (rev 2172)
+++ store/trunk/cpp/tests/TwoPhaseCommitTest.cpp 2008-07-02 20:54:27 UTC (rev 2173)
@@ -160,6 +160,9 @@
swap.check(commit);
restart();
swap.check(commit);
+
+ // this test leaves xids in the store
+ store->truncate();
}
void commit(Strategy& strategy)
@@ -294,52 +297,46 @@
public:
TwoPhaseCommitTest() : nameA("queueA"), nameB("queueB"), links(0) {}
- void testCommitSwap()
+ void testCommitEnqueue()
{
- Swap swap(this, "SwapMessageId");
- commit(swap);
+ Enqueue enqueue(this);
+ commit(enqueue);
}
- void testPrepareAndAbortSwap()
+ void testCommitDequeue()
{
- Swap swap(this, "SwapMessageId");
- abort(swap, true);
+ Dequeue dequeue(this);
+ commit(dequeue);
}
- void testAbortNoPrepareSwap()
+ void testCommitSwap()
{
Swap swap(this, "SwapMessageId");
- abort(swap, false);
+ commit(swap);
}
- void testCommitEnqueue()
- {
- Enqueue enqueue(this);
- commit(enqueue);
- }
-
void testPrepareAndAbortEnqueue()
{
Enqueue enqueue(this);
abort(enqueue, true);
}
- void testAbortNoPrepareEnqueue()
+ void testPrepareAndAbortDequeue()
{
- Enqueue enqueue(this);
- abort(enqueue, false);
+ Dequeue dequeue(this);
+ abort(dequeue, true);
}
- void testCommitDequeue()
+ void testPrepareAndAbortSwap()
{
- Dequeue dequeue(this);
- commit(dequeue);
+ Swap swap(this, "SwapMessageId");
+ abort(swap, true);
}
- void testPrepareAndAbortDequeue()
+ void testAbortNoPrepareEnqueue()
{
- Dequeue dequeue(this);
- abort(dequeue, true);
+ Enqueue enqueue(this);
+ abort(enqueue, false);
}
void testAbortNoPrepareDequeue()
@@ -348,6 +345,12 @@
abort(dequeue, false);
}
+ void testAbortNoPrepareSwap()
+ {
+ Swap swap(this, "SwapMessageId");
+ abort(swap, false);
+ }
+
void testRecoverPreparedThenCommitted()
{
recoverPrepared(true);
@@ -363,73 +366,73 @@
// === Test suite ===
-QPID_AUTO_TEST_CASE(PrepareAndAbortSwap)
+QPID_AUTO_TEST_CASE(CommitEnqueue)
{
- cout << test_filename << ".PrepareAndAbortSwap: " << flush;
- tpct.testPrepareAndAbortSwap();
+ cout << test_filename << ".CommitEnqueue: " << flush;
+ tpct.testCommitEnqueue();
cout << "ok" << endl;
}
-QPID_AUTO_TEST_CASE(CommitEnqueue)
+QPID_AUTO_TEST_CASE(CommitDequeue)
{
- cout << test_filename << ".CommitEnqueue: " << flush;
- tpct.testCommitEnqueue();
+ cout << test_filename << ".CommitDequeue: " << flush;
+ tpct.testCommitDequeue();
cout << "ok" << endl;
}
-QPID_AUTO_TEST_CASE(AbortNoPrepareEnqueue)
+QPID_AUTO_TEST_CASE(CommitSwap)
{
- cout << test_filename << ".AbortNoPrepareEnqueue: " << flush;
- tpct.testAbortNoPrepareEnqueue();
+ cout << test_filename << ".CommitSwap: " << flush;
+ tpct.testCommitSwap();
cout << "ok" << endl;
}
-QPID_AUTO_TEST_CASE(PrepareAndAbortDequeue)
+QPID_AUTO_TEST_CASE(PrepareAndAbortEnqueue)
{
- cout << test_filename << ".PrepareAndAbortDequeue: " << flush;
- tpct.testPrepareAndAbortDequeue();
+ cout << test_filename << ".PrepareAndAbortEnqueue: " << flush;
+ tpct.testPrepareAndAbortEnqueue();
cout << "ok" << endl;
}
-QPID_AUTO_TEST_CASE(RecoverPreparedThenCommitted)
+QPID_AUTO_TEST_CASE(PrepareAndAbortDequeue)
{
- cout << test_filename << ".RecoverPreparedThenCommitted: " << flush;
- tpct.testRecoverPreparedThenCommitted();
+ cout << test_filename << ".PrepareAndAbortDequeue: " << flush;
+ tpct.testPrepareAndAbortDequeue();
cout << "ok" << endl;
}
-QPID_AUTO_TEST_CASE(CommitSwap)
+QPID_AUTO_TEST_CASE(PrepareAndAbortSwap)
{
- cout << test_filename << ".CommitSwap: " << flush;
- tpct.testCommitSwap();
+ cout << test_filename << ".PrepareAndAbortSwap: " << flush;
+ tpct.testPrepareAndAbortSwap();
cout << "ok" << endl;
}
-QPID_AUTO_TEST_CASE(AbortNoPrepareSwap)
+QPID_AUTO_TEST_CASE(AbortNoPrepareEnqueue)
{
- cout << test_filename << ".AbortNoPrepareSwap: " << flush;
- tpct.testAbortNoPrepareSwap();
+ cout << test_filename << ".AbortNoPrepareEnqueue: " << flush;
+ tpct.testAbortNoPrepareEnqueue();
cout << "ok" << endl;
}
-QPID_AUTO_TEST_CASE(PrepareAndAbortEnqueue)
+QPID_AUTO_TEST_CASE(AbortNoPrepareDequeue)
{
- cout << test_filename << ".PrepareAndAbortEnqueue: " << flush;
- tpct.testPrepareAndAbortEnqueue();
+ cout << test_filename << ".AbortNoPrepareDequeue: " << flush;
+ tpct.testAbortNoPrepareDequeue();
cout << "ok" << endl;
}
-QPID_AUTO_TEST_CASE(CommitDequeue)
+QPID_AUTO_TEST_CASE(AbortNoPrepareSwap)
{
- cout << test_filename << ".CommitDequeue: " << flush;
- tpct.testCommitDequeue();
+ cout << test_filename << ".AbortNoPrepareSwap: " << flush;
+ tpct.testAbortNoPrepareSwap();
cout << "ok" << endl;
}
-QPID_AUTO_TEST_CASE(AbortNoPrepareDequeue)
+QPID_AUTO_TEST_CASE(RecoverPreparedThenCommitted)
{
- cout << test_filename << ".AbortNoPrepareDequeue: " << flush;
- tpct.testAbortNoPrepareDequeue();
+ cout << test_filename << ".RecoverPreparedThenCommitted: " << flush;
+ tpct.testRecoverPreparedThenCommitted();
cout << "ok" << endl;
}
Modified: store/trunk/cpp/tests/jrnl/_st_basic.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/_st_basic.cpp 2008-06-30 19:03:29 UTC (rev 2172)
+++ store/trunk/cpp/tests/jrnl/_st_basic.cpp 2008-07-02 20:54:27 UTC (rev 2173)
@@ -144,25 +144,23 @@
BOOST_CHECK_EQUAL(jc.is_read_only(), false);
}
{
- vector<string> txn_list;
u_int64_t hrid;
test_jrnl jc(test_name, test_dir, test_name);
BOOST_CHECK_EQUAL(jc.is_ready(), false);
BOOST_CHECK_EQUAL(jc.is_read_only(), false);
- jc.recover(NUM_TEST_JFILES, TEST_JFSIZE_SBLKS, txn_list, hrid);
+ jc.recover(NUM_TEST_JFILES, TEST_JFSIZE_SBLKS, 0, hrid);
BOOST_CHECK_EQUAL(jc.is_ready(), true);
BOOST_CHECK_EQUAL(jc.is_read_only(), true);
BOOST_CHECK_EQUAL(hrid, u_int64_t(0));
}
{
- vector<string> txn_list;
u_int64_t hrid;
test_jrnl jc(test_name, test_dir, test_name);
BOOST_CHECK_EQUAL(jc.is_ready(), false);
BOOST_CHECK_EQUAL(jc.is_read_only(), false);
- jc.recover(NUM_TEST_JFILES, TEST_JFSIZE_SBLKS, txn_list, hrid);
+ jc.recover(NUM_TEST_JFILES, TEST_JFSIZE_SBLKS, 0, hrid);
BOOST_CHECK_EQUAL(jc.is_ready(), true);
BOOST_CHECK_EQUAL(jc.is_read_only(), true);
BOOST_CHECK_EQUAL(hrid, u_int64_t(0));
@@ -189,11 +187,10 @@
enq_msg(jc, m, create_msg(msg, m, MSG_SIZE), false);
}
{
- vector<string> txn_list;
u_int64_t hrid;
test_jrnl jc(test_name, test_dir, test_name);
- jc.recover(NUM_TEST_JFILES, TEST_JFSIZE_SBLKS, txn_list, hrid);
+ jc.recover(NUM_TEST_JFILES, TEST_JFSIZE_SBLKS, 0, hrid);
BOOST_CHECK_EQUAL(hrid, u_int64_t(NUM_MSGS - 1));
jc.recover_complete();
for (int m=0; m<NUM_MSGS; m++)
@@ -210,7 +207,6 @@
try
{
string msg;
- vector<string> txn_list;
u_int64_t hrid;
for (int m=0; m<2*NUM_MSGS; m+=2)
@@ -221,7 +217,7 @@
jc.initialize(NUM_TEST_JFILES, TEST_JFSIZE_SBLKS); // First time only
else
{
- jc.recover(NUM_TEST_JFILES, TEST_JFSIZE_SBLKS, txn_list, hrid);
+ jc.recover(NUM_TEST_JFILES, TEST_JFSIZE_SBLKS, 0, hrid);
BOOST_CHECK_EQUAL(hrid, u_int64_t(m - 1));
jc.recover_complete();
}
@@ -229,7 +225,7 @@
}
{
test_jrnl jc(test_name, test_dir, test_name);
- jc.recover(NUM_TEST_JFILES, TEST_JFSIZE_SBLKS, txn_list, hrid);
+ jc.recover(NUM_TEST_JFILES, TEST_JFSIZE_SBLKS, 0, hrid);
BOOST_CHECK_EQUAL(hrid, u_int64_t(m));
jc.recover_complete();
deq_msg(jc, m);
@@ -265,11 +261,10 @@
}
{
string msg;
- vector<string> txn_list;
u_int64_t hrid;
test_jrnl jc(test_name, test_dir, test_name);
- jc.recover(NUM_TEST_JFILES, TEST_JFSIZE_SBLKS, txn_list, hrid);
+ jc.recover(NUM_TEST_JFILES, TEST_JFSIZE_SBLKS, 0, hrid);
// Recover non-transient msgs
for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
{
Modified: store/trunk/cpp/tests/jrnl/_st_helper_fns.hpp
===================================================================
--- store/trunk/cpp/tests/jrnl/_st_helper_fns.hpp 2008-06-30 19:03:29 UTC (rev 2172)
+++ store/trunk/cpp/tests/jrnl/_st_helper_fns.hpp 2008-07-02 20:54:27 UTC (rev 2173)
@@ -70,7 +70,7 @@
void initialize(const u_int16_t num_jfiles, const u_int32_t jfsize_sblks)
{ jcntl::initialize(num_jfiles, jfsize_sblks, JRNL_WMGR_DEF_PAGES, JRNL_WMGR_DEF_PAGE_SIZE,
0, &aio_wr_callback); }
- void recover(const u_int16_t num_jfiles, const u_int32_t jfsize_sblks, vector<string>& txn_list,
+ void recover(const u_int16_t num_jfiles, const u_int32_t jfsize_sblks, vector<string>* txn_list,
u_int64_t& highest_rid)
{ jcntl::recover(num_jfiles, jfsize_sblks, JRNL_WMGR_DEF_PAGES, JRNL_WMGR_DEF_PAGE_SIZE, 0,
&aio_wr_callback, txn_list, highest_rid); }
Modified: store/trunk/cpp/tests/jrnl/_st_read.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/_st_read.cpp 2008-06-30 19:03:29 UTC (rev 2172)
+++ store/trunk/cpp/tests/jrnl/_st_read.cpp 2008-07-02 20:54:27 UTC (rev 2173)
@@ -144,7 +144,6 @@
}
{
string msg;
- vector<string> txn_list;
u_int64_t hrid;
string rmsg;
string xid;
@@ -152,7 +151,7 @@
bool externalFlag;
test_jrnl jc(test_name, test_dir, test_name);
- jc.recover(NUM_TEST_JFILES, TEST_JFSIZE_SBLKS, txn_list, hrid);
+ jc.recover(NUM_TEST_JFILES, TEST_JFSIZE_SBLKS, 0, hrid);
BOOST_CHECK_EQUAL(hrid, u_int64_t(NUM_MSGS - 1));
jc.recover_complete();
for (int m=0; m<NUM_MSGS; m++)
@@ -187,7 +186,6 @@
}
{
string msg;
- vector<string> txn_list;
u_int64_t hrid;
string rmsg;
string xid;
@@ -195,7 +193,7 @@
bool externalFlag;
test_jrnl jc(test_name, test_dir, test_name);
- jc.recover(NUM_TEST_JFILES, TEST_JFSIZE_SBLKS, txn_list, hrid);
+ jc.recover(NUM_TEST_JFILES, TEST_JFSIZE_SBLKS, 0, hrid);
BOOST_CHECK_EQUAL(hrid, u_int64_t(NUM_MSGS - 1));
for (int m=0; m<NUM_MSGS; m++)
{
@@ -209,7 +207,6 @@
}
{
string msg;
- vector<string> txn_list;
u_int64_t hrid;
string rmsg;
string xid;
@@ -217,7 +214,7 @@
bool externalFlag;
test_jrnl jc(test_name, test_dir, test_name);
- jc.recover(NUM_TEST_JFILES, TEST_JFSIZE_SBLKS, txn_list, hrid);
+ jc.recover(NUM_TEST_JFILES, TEST_JFSIZE_SBLKS, 0, hrid);
BOOST_CHECK_EQUAL(hrid, u_int64_t(NUM_MSGS - 1));
for (int m=0; m<NUM_MSGS; m++)
{
Modified: store/trunk/cpp/tests/jrnl/jtt/jrnl_instance.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/jtt/jrnl_instance.cpp 2008-06-30 19:03:29 UTC (rev 2172)
+++ store/trunk/cpp/tests/jrnl/jtt/jrnl_instance.cpp 2008-07-02 20:54:27 UTC (rev 2173)
@@ -110,11 +110,10 @@
{
try
{
- std::vector<std::string> prep_txn_list;
u_int64_t highest_rid;
recover(_jpp->num_jfiles(), _jpp->jfsize_sblks(), _jpp->wcache_num_pages(),
_jpp->wcache_pgsize_sblks(), aio_rd_callback, aio_wr_callback,
- prep_txn_list, highest_rid);
+ 0, highest_rid);
recover_complete();
}
catch (const rhm::journal::jexception& e)
17 years, 10 months