Author: cctrieloff
Date: 2007-09-14 14:45:18 -0400 (Fri, 14 Sep 2007)
New Revision: 924
Modified:
store/trunk/cpp/lib/BdbMessageStore.cpp
store/trunk/cpp/lib/BdbMessageStore.h
Log:
more async journal integration - default still set to sync
Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp 2007-09-14 18:21:12 UTC (rev 923)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp 2007-09-14 18:45:18 UTC (rev 924)
@@ -44,6 +44,9 @@
static const u_int8_t MESSAGE_MESSAGE = 1;
static const u_int8_t BASIC_MESSAGE = 2;
+// cct delete this !!!!
+// bool hack = false;
+
BdbMessageStore::BdbMessageStore(const char* envpath) : env(0),
queueDb(&env, 0),
exchangeDb(&env, 0),
@@ -109,7 +112,18 @@
(*i)->truncate(txn, &count, 0);
}
- txn->commit(0);
+ txn->commit(0);
+
+ if (usingJrnl())
+ {
+ try{
+ journal::jdir::delete_dir(getJrnlBaseDir(),true);
+ }
+ catch ( journal::jexception& e) {
+ std::string str;
+ THROW_STORE_EXCEPTION("Truncate clean up failed: " +e.to_string(str) );
+ }
+ }
}
void BdbMessageStore::create(PersistableQueue& queue)
@@ -149,12 +163,11 @@
{
destroy(queueDb, queue);
qpid::broker::ExternalQueueStore* eqs = queue.getExternalQueueStore();
- journal::jcntl* jQueue = static_cast<journal::jcntl*>(eqs);
- if (jQueue)
+ if (eqs)
{
+ journal::jcntl* jQueue = static_cast<journal::jcntl*>(eqs);
jQueue->delete_jrnl();
- delete(jQueue);
- queue.setExternalQueueStore(NULL);
+ queue.setExternalQueueStore(NULL); // will delete the journal if exists
}
}
@@ -292,6 +305,25 @@
RecoverableQueue::shared_ptr queue = registry.recoverQueue(buffer);
//set the persistenceId and update max as required
queue->setPersistenceId(key.id);
+
+ const char* queueName = queue->getName().c_str();
+
+ if (usingJrnl())
+ {
+
+ journal::jcntl* jQueue = new journal::jcntl(queueName, getJrnlDir(queueName), string("JournalData"));
+ queue->setExternalQueueStore(dynamic_cast<ExternalQueueStore*>(jQueue));
+
+ try
+ {
+ jQueue->recover();
+ } catch (journal::jexception& e) {
+ std::string s;
+ THROW_STORE_EXCEPTION(e.to_string(s) + queueName);
+ }
+ }
+
+
index[key.id] = queue;
maxQueueId = max(key.id, maxQueueId);
}
@@ -500,7 +532,7 @@
try {
Dbt key (&messageId, sizeof(messageId));
messageId = messageIdSequence.next();
- store(NULL, txn.get(), key, msg);
+ store(NULL, txn.get(), key, msg, true);
msg.setPersistenceId(messageId);
txn.commit();
} catch (std::exception& e) {
@@ -618,17 +650,19 @@
}
try {
+
+ bool newId = false;
if (messageId == 0) {
messageId = messageIdSequence.next();
- store(&queue, txn->get(), key, msg);
msg.setPersistenceId(messageId);
- }
- if (!usingJrnl()) {
- msg.enqueueComplete(); // set enqueued for ack
- }
-
- if (!usingJrnl())
+ newId = true;
+ }
+ store(&queue, txn->get(), key, msg, newId);
+
+ if (/*!hack || */ !usingJrnl()){
+ msg.enqueueComplete(); // set enqueued for ack
put(mappingDb, txn->get(), key, value);
+ }
// cct if using Journal do we need to wait for IO to complete before calling this???
// set enqueue complete on callback msg.enqueueComplete();
@@ -643,7 +677,10 @@
}
}
-void BdbMessageStore::store(const PersistableQueue* queue, DbTxn* txn, Dbt& messageId, PersistableMessage& message)
+void BdbMessageStore::store(const PersistableQueue* queue,
+ DbTxn* txn, Dbt& messageId,
+ PersistableMessage& message,
+ bool newId)
{
u_int32_t headerSize = message.encodedHeaderSize();
u_int64_t size = message.encodedSize() + sizeof(u_int32_t);
@@ -652,11 +689,13 @@
buffer.putLong(headerSize);
message.encode(buffer);
//buffer.flip();
-
+
+
try {
- if (queue && usingJrnl()){
-
+ if (/*hack &&*/ queue && usingJrnl()){
+
+ if (queue){
// cct TODO -- delete this in the callback...
journal::data_tok* dtokp = new journal::data_tok();
dtokp->setSourceMessage (&message);
@@ -667,11 +706,14 @@
while (!written)
{
journal::jcntl* jc = static_cast<journal::jcntl*>(queue->getExternalQueueStore());
- rhm::journal::iores eres = jc->enqueue_data(buff, size, dtokp);
+ char text[4];
+ strcpy(text,"123");
+
+ rhm::journal::iores eres = jc->enqueue_data(&text, 3, dtokp);
switch (eres)
{
case rhm::journal::RHM_IORES_SUCCESS:
- if (dtokp->wstate() >= rhm::journal::data_tok::ENQ_SUBM)
+ if (dtokp->wstate() >= rhm::journal::data_tok::ENQ_SUBM)
written = true;
break;
case rhm::journal::RHM_IORES_AIO_WAIT:
@@ -688,13 +730,20 @@
assert( "Unexpected msg state");
}
}
+ }
+
} else {
/// cct message db
- Dbt data(buff,size);
- messageDb.put(txn, &messageId, &data, DB_NOOVERWRITE);
+ if (newId){ // only store in Bd if first time message is stored
+ Dbt data(buff,size);
+ messageDb.put(txn, &messageId, &data, DB_NOOVERWRITE);
+ }
}
-
- } catch (DbException& e) {
+ }catch ( journal::jexception& e) {
+ std::string str;
+ std::cout << "----" << e.to_string(str) << std::endl;
+ THROW_STORE_EXCEPTION("Truncate clean up failed: " +e.to_string(str) );
+ }catch (DbException& e) {
THROW_STORE_EXCEPTION_2("Error storing message", e);
}
}
@@ -930,17 +979,32 @@
}
}
+string BdbMessageStore::getJrnlBaseDir()
+{
+ std::stringstream dir;
+ dir << "/var/rhm/" ;
+ return dir.str();
+}
+
string BdbMessageStore::getJrnlDir(const qpid::broker::PersistableQueue& queue) //for example /var/rhm/ + queueDir/
{
+ return getJrnlDir(queue.getName().c_str());
+}
+
+string BdbMessageStore::getJrnlDir(const char* queueName) //for example /var/rhm/ + queueDir/
+{
std::stringstream dir;
- dir << "/var/rhm/" ;
+ dir << getJrnlBaseDir();
dir << std::setw(4);
dir << std::setfill('0');
- dir << (atol(queue.getName().c_str())%20);
- dir << "/" << queue.getName() << "/";
+// const char* str = queueName; //queue.getName().c_str();
+ u_int32_t count = 0;
+ for (u_int32_t i=0; i < strlen(queueName); i++)
+ count += queueName[i];
+
+ dir << (count%20);
+ dir << "/" << queueName << "/";
return dir.str();
}
-
-
Modified: store/trunk/cpp/lib/BdbMessageStore.h
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.h 2007-09-14 18:21:12 UTC (rev 923)
+++ store/trunk/cpp/lib/BdbMessageStore.h 2007-09-14 18:45:18 UTC (rev 924)
@@ -85,7 +85,10 @@
void readXids(Db& db, std::set<string>& xids);
void readLockedMappings(Db& db, txn_lock_map& mappings);
TxnCtxt* check(qpid::broker::TransactionContext* ctxt);
- void store(const qpid::broker::PersistableQueue* queue, DbTxn* txn, Dbt& messageId, qpid::broker::PersistableMessage& message);
+ void store(const qpid::broker::PersistableQueue* queue, DbTxn* txn,
+ Dbt& messageId,
+ qpid::broker::PersistableMessage& message,
+ bool newId);
void enqueue(DbTxn* txn, Dbt& messageId, Dbt& queueId);
bool dequeue(DbTxn* txn, Dbt& messageId, Dbt& queueId);
bool deleteIfUnused(Cursor& cursor, DbTxn* txn, Dbt& messageId);
@@ -104,7 +107,9 @@
// journal functions
void createJrnlQueue(const qpid::broker::PersistableQueue& queue);
string getJrnlDir(const qpid::broker::PersistableQueue& queue); //for example /var/rhm/ + queueDir/
+ string getJrnlDir(const char* queueName);
inline bool usingJrnl() {return false;} // make configurable
+ string getJrnlBaseDir();
public:
BdbMessageStore(const char* envpath = 0);