[rhmessaging-commits] rhmessaging commits: r1114 - store/trunk/cpp/lib.
rhmessaging-commits at lists.jboss.org
rhmessaging-commits at lists.jboss.org
Thu Oct 18 21:34:08 EDT 2007
Author: cctrieloff
Date: 2007-10-18 21:34:08 -0400 (Thu, 18 Oct 2007)
New Revision: 1114
Modified:
store/trunk/cpp/lib/BdbMessageStore.cpp
store/trunk/cpp/lib/BdbMessageStore.h
Log:
- Add flush() for async (journal-backed) queues
- Get the store directory and async setting from the broker configuration via init()
Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp 2007-10-18 17:19:49 UTC (rev 1113)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp 2007-10-19 01:34:08 UTC (rev 1114)
@@ -42,6 +42,7 @@
static const u_int8_t MESSAGE_MESSAGE = 1;
static const u_int8_t BASIC_MESSAGE = 2;
+bool BdbMessageStore::useAsync;
unsigned int TxnCtxt::count = 0;
@@ -54,12 +55,23 @@
bindingDb(&env, 0),
enqueueXidDb(&env, 0),
dequeueXidDb(&env, 0),
- prepareXidDb(&env, 0)
-
+ prepareXidDb(&env, 0),
+ isInit(false),
+ envPath(envpath)
+
{
+
+
+}
+
+void BdbMessageStore::init(const std::string& dir, const bool async)
+{
+ useAsync = async;
+ if (dir.size()>0) storeDir = dir;
+
TxnCtxt txn;
try {
- env.open(envpath, DB_THREAD | DB_CREATE | DB_INIT_TXN | DB_INIT_LOCK | DB_INIT_LOG | DB_INIT_MPOOL | DB_USE_ENVIRON, 0);
+ env.open(envPath, DB_THREAD | DB_CREATE | DB_INIT_TXN | DB_INIT_LOCK | DB_INIT_LOG | DB_INIT_MPOOL | DB_USE_ENVIRON, 0);
txn.begin(env);
open(queueDb, txn.get(), "queues.db", false);
@@ -78,11 +90,13 @@
txn.abort();
throw e;
}
+
+ isInit = true;
}
void BdbMessageStore::open(Db& db, DbTxn* txn, const char* file, bool dupKey)
{
- if(dupKey) db.set_flags(DB_DUPSORT);
+ if(dupKey) db.set_flags(DB_DUPSORT);
db.open(txn, file, 0, DB_BTREE, DB_CREATE | DB_THREAD, 0);
dbs.push_back(&db);
}
@@ -103,6 +117,7 @@
void BdbMessageStore::truncate()
{
+ checkInit();
DbTxn* txn;
env.txn_begin(0, &txn, 0);
u_int32_t count;
@@ -125,6 +140,7 @@
void BdbMessageStore::create(PersistableQueue& queue)
{
+ checkInit();
if (queue.getPersistenceId()) {
THROW_STORE_EXCEPTION("Queue already created: " + queue.getName());
}
@@ -151,6 +167,7 @@
void BdbMessageStore::destroy(PersistableQueue& queue)
{
+ checkInit();
destroy(queueDb, queue);
qpid::broker::ExternalQueueStore* eqs = queue.getExternalQueueStore();
if (eqs)
@@ -164,6 +181,7 @@
void BdbMessageStore::create(const PersistableExchange& exchange)
{
+ checkInit();
if (exchange.getPersistenceId()) {
THROW_STORE_EXCEPTION("Exchange already created: " + exchange.getName());
}
@@ -179,6 +197,7 @@
void BdbMessageStore::destroy(const PersistableExchange& exchange)
{
+ checkInit();
destroy(exchangeDb, exchange);
//need to also delete bindings
IdDbt key(exchange.getPersistenceId());
@@ -210,6 +229,7 @@
void BdbMessageStore::bind(const PersistableExchange& e, const PersistableQueue& q,
const std::string& k, const FieldTable& a)
{
+ checkInit();
IdDbt key(e.getPersistenceId());
BindingDbt value(e, q, k, a);
TxnCtxt txn;
@@ -221,6 +241,7 @@
void BdbMessageStore::unbind(const PersistableExchange& e, const PersistableQueue& q,
const std::string& k, const FieldTable& a)
{
+ checkInit();
IdDbt key(e.getPersistenceId());
BindingDbt value(e, q, k, a);
@@ -237,6 +258,7 @@
void BdbMessageStore::recover(RecoveryManager& registry)
{
+ checkInit();
txn_list prepared;
recoverXids(prepared);
@@ -623,6 +645,7 @@
void BdbMessageStore::stage(PersistableMessage& msg)
{
+ checkInit();
TxnCtxt txn;
txn.begin(env);
@@ -642,6 +665,7 @@
}
void BdbMessageStore::destroy(PersistableMessage& msg)
{
+ checkInit();
u_int64_t messageId (msg.getPersistenceId());
if (messageId) {
Dbt key (&messageId, sizeof(messageId));
@@ -675,6 +699,7 @@
void BdbMessageStore::appendContent(const PersistableMessage& msg, const std::string& data)
{
+ checkInit();
u_int64_t messageId (msg.getPersistenceId());
if (messageId != 0) {
try {
@@ -702,6 +727,7 @@
}
void BdbMessageStore::loadContent(const PersistableMessage& msg, std::string& data, u_int64_t offset, u_int32_t length)
{
+ checkInit();
u_int64_t realOffset = offset + sizeof(u_int32_t)/*header length*/+ msg.encodedHeaderSize();
u_int64_t messageId (msg.getPersistenceId());
if (messageId != 0) {
@@ -729,8 +755,21 @@
}
}
+void BdbMessageStore::flush(const qpid::broker::PersistableQueue& queue)
+{
+ checkInit();
+ try {
+ JournalImpl* jc = static_cast<JournalImpl*>(queue.getExternalQueueStore());
+ if (jc) jc->flush();
+ }catch ( journal::jexception& e) {
+ std::string str;
+ THROW_STORE_EXCEPTION("Flush failed: " + e.to_string(str) );
+ }
+}
+
void BdbMessageStore::enqueue(TransactionContext* ctxt, PersistableMessage& msg, const PersistableQueue& queue)
{
+ checkInit();
u_int64_t queueId (queue.getPersistenceId());
u_int64_t messageId (msg.getPersistenceId());
if (queueId == 0) {
@@ -846,7 +885,7 @@
}
}catch ( journal::jexception& e) {
std::string str;
- std::cout << "-------------" << e << std::endl;
+// std::cout << "-------------" << e << std::endl;
if (dtokp) delete dtokp;
THROW_STORE_EXCEPTION("Enqueue failed: " +e.to_string(str) );
}catch (DbException& e) {
@@ -856,6 +895,7 @@
void BdbMessageStore::dequeue(TransactionContext* ctxt, PersistableMessage& msg, const PersistableQueue& queue)
{
+ checkInit();
u_int64_t queueId (queue.getPersistenceId());
u_int64_t messageId (msg.getPersistenceId());
if (messageId == 0) {
@@ -974,6 +1014,7 @@
u_int32_t BdbMessageStore::outstandingQueueAIO(const qpid::broker::PersistableQueue& /*queue*/)
{
+ checkInit();
return 0;
}
@@ -1034,6 +1075,7 @@
auto_ptr<TransactionContext> BdbMessageStore::begin()
{
+ checkInit();
// pass sequence number for c/a when using jrnl
TxnCtxt* txn(new TxnCtxt(&messageIdSequence ));
txn->begin(env);
@@ -1042,6 +1084,7 @@
std::auto_ptr<qpid::broker::TPCTransactionContext> BdbMessageStore::begin(const std::string& xid)
{
+ checkInit();
IdSequence* jtx = NULL;
if (usingJrnl()) jtx = &messageIdSequence;
@@ -1053,6 +1096,7 @@
void BdbMessageStore::prepare(qpid::broker::TPCTransactionContext& ctxt)
{
+ checkInit();
TPCTxnCtxt* txn = dynamic_cast<TPCTxnCtxt*>(&ctxt);
if(!txn) throw InvalidTransactionContextException();
@@ -1075,7 +1119,7 @@
void BdbMessageStore::commit(TransactionContext& ctxt)
{
-std::cout << " commit1" << std::flush;
+ checkInit();
TxnCtxt* txn(check(&ctxt));
if (txn->isTPC()) {
completed(*dynamic_cast<TPCTxnCtxt*>(txn), enqueueXidDb, dequeueXidDb, true);
@@ -1086,6 +1130,7 @@
void BdbMessageStore::abort(TransactionContext& ctxt)
{
+ checkInit();
TxnCtxt* txn(check(&ctxt));
if (txn->isTPC()) {
completed(*dynamic_cast<TPCTxnCtxt*>(txn), dequeueXidDb, enqueueXidDb, false);
@@ -1152,7 +1197,7 @@
string BdbMessageStore::getJrnlBaseDir()
{
std::stringstream dir;
- dir << "/var/rhm/" ;
+ dir << storeDir<< "/rhm/" ;
return dir.str();
}
@@ -1167,7 +1212,6 @@
dir << getJrnlBaseDir();
dir << std::setw(4);
dir << std::setfill('0');
-// const char* str = queueName; //queue.getName().c_str();
u_int32_t count = 0;
for (u_int32_t i=0; i < strlen(queueName); i++)
count += queueName[i];
Modified: store/trunk/cpp/lib/BdbMessageStore.h
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.h 2007-10-18 17:19:49 UTC (rev 1113)
+++ store/trunk/cpp/lib/BdbMessageStore.h 2007-10-19 01:34:08 UTC (rev 1114)
@@ -71,6 +71,10 @@
IdSequence queueIdSequence;
IdSequence exchangeIdSequence;
IdSequence messageIdSequence;
+ static bool useAsync;
+ std::string storeDir;
+ bool isInit;
+ const char* envPath;
void recoverQueues(TxnCtxt& txn, qpid::broker::RecoveryManager& recovery, queue_index& index,
txn_list& locked, message_index& messages);
@@ -113,12 +117,13 @@
void createJrnlQueue(const qpid::broker::PersistableQueue& queue);
string getJrnlDir(const qpid::broker::PersistableQueue& queue); //for example /var/rhm/ + queueDir/
string getJrnlDir(const char* queueName);
- static inline bool usingJrnl() {return false;} // make configurable
+ static inline bool usingJrnl() {return useAsync;}
string getJrnlBaseDir();
+ inline void checkInit() { if (!isInit) init("/var",false); isInit = true;}
-
public:
BdbMessageStore(const char* envpath = 0);
+ void init(const std::string& dir, const bool async);
~BdbMessageStore();
void truncate();
@@ -149,6 +154,7 @@
void dequeue(qpid::broker::TransactionContext* ctxt,
qpid::broker::PersistableMessage& msg,
const qpid::broker::PersistableQueue& queue);
+ void flush(const qpid::broker::PersistableQueue& queue);
u_int32_t outstandingQueueAIO(const qpid::broker::PersistableQueue& queue);
More information about the rhmessaging-commits mailing list