[rhmessaging-commits] rhmessaging commits: r2192 - in store/branches/mrg-1.0/cpp/lib: gen/qpid/management and 1 other directory.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Mon Jul 14 09:05:07 EDT 2008


Author: kpvdr
Date: 2008-07-14 09:05:07 -0400 (Mon, 14 Jul 2008)
New Revision: 2192

Modified:
   store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp
   store/branches/mrg-1.0/cpp/lib/BdbMessageStore.h
   store/branches/mrg-1.0/cpp/lib/gen/qpid/management/Store.cpp
   store/branches/mrg-1.0/cpp/lib/gen/qpid/management/Store.h
Log:
Backport of trunk r.2171: Cleaned up BdbMessageStore, removed unneeded BDB sync storage.

Modified: store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp	2008-07-14 12:43:10 UTC (rev 2191)
+++ store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp	2008-07-14 13:05:07 UTC (rev 2192)
@@ -51,7 +51,6 @@
 
 static const u_int8_t MESSAGE_MESSAGE = 1;
 static const u_int8_t BASIC_MESSAGE = 2;
-bool BdbMessageStore::useAsync;
 qpid::sys::Duration BdbMessageStore::defJournalGetEventsTimeout(10 * qpid::sys::TIME_MSEC); // 10ms
 qpid::sys::Duration BdbMessageStore::defJournalFlushTimeout(500 * qpid::sys::TIME_MSEC); // 0.5s
 qpid::sys::Mutex TxnCtxt::globalSerialiser;
@@ -88,7 +87,6 @@
             mgmtObject = qpid::management::Store::shared_ptr(new qpid::management::Store (this, broker));
 
             mgmtObject->set_location(storeDir);
-            mgmtObject->set_async(useAsync);
             mgmtObject->set_defaultInitialFileCount(numJrnlFiles);
             mgmtObject->set_defaultDataFileSize(jrnlFsizePgs);
 
@@ -125,15 +123,12 @@
             wcache_num_pages = defTotWCacheSize / wcache_pgsize_sblks;
     }
     
-    // TODO: remove
-    useAsync = true;
     if (dir.size()>0) storeDir = dir;
 
     string bdbdir = storeDir + "/rhm/dat/";
     journal::jdir::create_dir(bdbdir);
 
 
-    bool ret = false;
     try {
         env.open(bdbdir.c_str(), DB_THREAD | DB_CREATE | DB_INIT_TXN | DB_INIT_LOCK | DB_INIT_LOG | DB_INIT_MPOOL | DB_USE_ENVIRON, 0);
     } catch (const DbException& e) {
@@ -167,11 +162,6 @@
         throw;
     }
     
-    // TODO: remove
-    bool force = false;
-    ret = mode(useAsync, force);
-    if (!ret) return false;
-
     isInit = true;
     QPID_LOG(info, "BdbMessageStore module initialized: dir=" << dir << "; jfiles=" << jfiles << "; jfileSizePgs=" << jfileSizePgs << "; wCachePageSize=" << wCachePageSize);
     return true;
@@ -249,54 +239,6 @@
     return init(opts->storeDir, numJrnlFiles, jrnlFsizePgs, jrnlWrCachePageSize);
 }
 
-// true is async
-bool BdbMessageStore::mode(const bool async, const bool force)
-{
-
-    u_int32_t id (1); // key one in config is mode
-    Dbt key(&id, sizeof(id));
-    size_t preamble_length = sizeof(u_int32_t);
-    BufferValue value(preamble_length, 0);
-    u_int32_t avalue = async ? 1 : 2;
-    value.buffer.putLong( avalue );
-    bool same = false;
-    bool hasMode = false;
-    
-    {	
-        Cursor config;
-        config.open(configDb, 0);
-        IdDbt rkey;
-        BufferValue rvalue(preamble_length, 0);
-        rvalue.buffer.record();
-
-        while (config.next(rkey, rvalue)) {
-            if (rkey.id == 1)
-            {
-                hasMode = true;
-                u_int32_t valueL = rvalue.buffer.getLong();
-                if (avalue == valueL){
-                    same = true;
-                }else {
-                    break;
-                }
-            }
-        }
-    }
-    if (same) return true;
-    if (!same && !force && hasMode) return false; 
-    if (!same && force && hasMode) {
-        truncate();
-    }
-	
-    int status = configDb.put(0, &key, &value, DB_NOOVERWRITE | DB_AUTO_COMMIT );
-    if (status == DB_KEYEXIST) {
-        return false;
-    } else {
-        return true;
-    }
-    return false;
-}
-
 void BdbMessageStore::open(Db& db, DbTxn* txn, const char* file, bool dupKey)
 {
     if(dupKey) db.set_flags(DB_DUPSORT);
@@ -346,36 +288,34 @@
     if (queue.getPersistenceId()) {
         THROW_STORE_EXCEPTION("Queue already created: " + queue.getName());
     }
-    if (usingJrnl()) {
-        JournalImpl* jQueue = 0;
-        FieldTable::ValuePtr value;
+    JournalImpl* jQueue = 0;
+    FieldTable::ValuePtr value;
 
-        uint16_t localFileCount = numJrnlFiles;
-        uint32_t localFileSize  = jrnlFsizePgs;
+    uint16_t localFileCount = numJrnlFiles;
+    uint32_t localFileSize  = jrnlFsizePgs;
 
-        value = args.get ("qpid.file_count");
-        if (value.get() != 0 && !value->empty() && value->convertsTo<int>())
-            localFileCount = (uint16_t) value->get<int>();
+    value = args.get ("qpid.file_count");
+    if (value.get() != 0 && !value->empty() && value->convertsTo<int>())
+        localFileCount = (uint16_t) value->get<int>();
 
-        value = args.get ("qpid.file_size");
-        if (value.get() != 0 && !value->empty() && value->convertsTo<int>())
-            localFileSize = (uint32_t) value->get<int>();
+    value = args.get ("qpid.file_size");
+    if (value.get() != 0 && !value->empty() && value->convertsTo<int>())
+        localFileSize = (uint32_t) value->get<int>();
 
-        {
-            qpid::sys::Mutex::ScopedLock sl(jrnlCreateLock);
-            jQueue = new JournalImpl(queue.getName(), getJrnlDir(queue),
-                                     string("JournalData"), defJournalGetEventsTimeout,
-                                     defJournalFlushTimeout);
-        }
-        queue.setExternalQueueStore(dynamic_cast<ExternalQueueStore*>(jQueue));
-        try	{
-            // init will create the deque's for the init...
-            jQueue->initialize(localFileCount, localFileSize * JRNL_RMGR_PAGE_SIZE, wcache_num_pages, wcache_pgsize_sblks);
-        } catch (const journal::jexception& e) {
-            THROW_STORE_EXCEPTION(std::string("Queue ") + queue.getName() +
-                                  ": create() failed: " + e.what());
-        }
+    {
+        qpid::sys::Mutex::ScopedLock sl(jrnlCreateLock);
+        jQueue = new JournalImpl(queue.getName(), getJrnlDir(queue),
+                                 string("JournalData"), defJournalGetEventsTimeout,
+                                 defJournalFlushTimeout);
     }
+    queue.setExternalQueueStore(dynamic_cast<ExternalQueueStore*>(jQueue));
+    try	{
+        // init will create the deque's for the init...
+        jQueue->initialize(localFileCount, localFileSize * JRNL_RMGR_PAGE_SIZE, wcache_num_pages, wcache_pgsize_sblks);
+    } catch (const journal::jexception& e) {
+        THROW_STORE_EXCEPTION(std::string("Queue ") + queue.getName() +
+                              ": create() failed: " + e.what());
+    }
 
     try {
         if (!create(queueDb, queueIdSequence, queue)) {
@@ -557,38 +497,32 @@
         //set the persistenceId and update max as required
         queue->setPersistenceId(key.id);
 	
-        if (usingJrnl())
+        const char* queueName = queue->getName().c_str();
+        JournalImpl* jQueue = 0;
         {
-            const char* queueName = queue->getName().c_str();
-            JournalImpl* jQueue = 0;
-            {
-                qpid::sys::Mutex::ScopedLock sl(jrnlCreateLock);
-                jQueue = new JournalImpl(queueName, getJrnlDir(queueName), string("JournalData"), defJournalGetEventsTimeout, defJournalFlushTimeout);
-            }
-            queue->setExternalQueueStore(dynamic_cast<ExternalQueueStore*>(jQueue));
+            qpid::sys::Mutex::ScopedLock sl(jrnlCreateLock);
+            jQueue = new JournalImpl(queueName, getJrnlDir(queueName), string("JournalData"), defJournalGetEventsTimeout, defJournalFlushTimeout);
+        }
+        queue->setExternalQueueStore(dynamic_cast<ExternalQueueStore*>(jQueue));
 	
-            try
-            {
-                u_int64_t thisHighestRid = 0;
-                jQueue->recover(numJrnlFiles, jrnlFsizePgs * JRNL_RMGR_PAGE_SIZE, wcache_num_pages, wcache_pgsize_sblks, prepared, thisHighestRid, key.id); // start recovery
-                if (thisHighestRid > highestRid)
-                    highestRid = thisHighestRid;
-                recoverMessages(txn, registry, queue, prepared, messages); 
-                jQueue->recover_complete(); // start journal.
-            } catch (const journal::jexception& e) {
-                THROW_STORE_EXCEPTION(std::string("Queue ") + queueName + ": recoverQueues() failed: " + e.what());
-            }
-            //read all messages: done on a per queue basis if using Journal
+        try
+        {
+            u_int64_t thisHighestRid = 0;
+            jQueue->recover(numJrnlFiles, jrnlFsizePgs * JRNL_RMGR_PAGE_SIZE, wcache_num_pages, wcache_pgsize_sblks, prepared, thisHighestRid, key.id); // start recovery
+            if (thisHighestRid > highestRid)
+                highestRid = thisHighestRid;
+            recoverMessages(txn, registry, queue, prepared, messages); 
+            jQueue->recover_complete(); // start journal.
+        } catch (const journal::jexception& e) {
+            THROW_STORE_EXCEPTION(std::string("Queue ") + queueName + ": recoverQueues() failed: " + e.what());
         }
+        //read all messages: done on a per queue basis if using Journal
 
         queue_index[key.id] = queue;
         maxQueueId = max(key.id, maxQueueId);
     }
     messageIdSequence.reset(highestRid + 1);
     queueIdSequence.reset(maxQueueId + 1);
-	
-    if (!usingJrnl()) //read all messages: 
-        recoverMessages(txn, registry, queue_index, prepared, messages);
 }
 
 
@@ -667,7 +601,6 @@
     generalIdSequence.reset(maxGeneralId + 1);
 }
 
-// async IO version.
 void BdbMessageStore::recoverMessages(TxnCtxt& /*txn*/, qpid::broker::RecoveryManager& recovery, 
                                       qpid::broker::RecoverableQueue::shared_ptr& queue, txn_list& locked, message_index& prepared)
 {
@@ -784,69 +717,7 @@
 } 
 
 
-// bdb version
-void BdbMessageStore::recoverMessages(TxnCtxt&, RecoveryManager& recovery, queue_index& index,
-                                      txn_list& locked, message_index& prepared)
-{
-    //have to create a new txn here, and commit in batches to avoid
-    //problems with large message databases
-    TxnCtxt txn;
-    txn.begin(env);
 
-    Cursor messages;
-    messages.open(messageDb, txn.get());
-
-    IdDbt key;
-    size_t preamble_length = sizeof(u_int32_t)/*header size*/;
-    u_int64_t maxMessageId(1);
-
-    BufferValue value(preamble_length, 0);
-    value.buffer.record();
-    uint count(0);
-    while (messages.next(key, value)) {
-        if (++count % 1000 == 0) {
-            QPID_LOG(debug, "Recovering " << count << "th message...");
-            //reset cursor && txn:
-            messages.close();
-            txn.commit();
-            txn.begin(env);
-            messages.open(messageDb, txn.get());
-            messages->get(&key, &value, DB_SET);
-        }
-        //read header only to begin with
-        u_int32_t headerSize = value.buffer.getLong();
-        value.buffer.restore();
-
-        BufferValue header(headerSize, preamble_length);
-        messages.current(key, header);
-        
-        RecoverableMessage::shared_ptr msg = recovery.recoverMessage(header.buffer);
-        msg->setPersistenceId(key.id);
-
-        u_int32_t contentOffset = headerSize + preamble_length;
-        u_int64_t contentSize = getRecordSize(txn.get(), messageDb, key) - contentOffset;
-        if (msg->loadContent(contentSize)) {
-            //now read the content
-            BufferValue content(contentSize, contentOffset);
-            messages.current(key, content);
-            msg->decodeContent(content.buffer);
-        }
-
-        //find all the queues into which this message has been enqueued
-        if (enqueueMessage(txn, key, msg, index, locked, prepared) == 0) {
-            //message not referenced anywhere - can delete
-            messages->del(0);
-        } else {
-            if (key.id > maxMessageId) {
-                maxMessageId = key.id;
-            }
-        }
-    }
-    messages.close();
-    txn.commit();
-    messageIdSequence.reset(maxMessageId + 1);
-}
-
 int BdbMessageStore::enqueueMessage(TxnCtxt& txn, IdDbt& msgId, RecoverableMessage::shared_ptr& msg, 
                                     queue_index& index, txn_list& locked, 
                                     message_index& prepared)
@@ -881,34 +752,12 @@
     collectPreparedXids(prepared);
 
     //when using the async journal, it will abort unprepaired xids and populate the locked maps
-    if (!usingJrnl()){
-        txn_lock_map enqueues;
-        txn_lock_map dequeues;
-        std::set<string> known;
-    	readXids(enqueueXidDb, known);
-    	readXids(dequeueXidDb, known);
-
-    	//abort all known but unprepared xids:
-    	for (std::set<string>::iterator i = known.begin(); i != known.end(); i++) {
-            if (prepared.find(*i) == prepared.end()) {
-            	TPCTxnCtxt txn(*i, NULL);
-                completed(txn, dequeueXidDb, enqueueXidDb, false);
-            }
-    	}
-        readLockedMappings(enqueueXidDb, enqueues);
-        readLockedMappings(dequeueXidDb, dequeues);
-        for (std::set<string>::iterator i = prepared.begin(); i != prepared.end(); i++) {
-            txns.push_back(new PreparedTransaction(*i, enqueues[*i], dequeues[*i]));
-        }
-    } else {
-        for (std::set<string>::iterator i = prepared.begin(); i != prepared.end(); i++) {
-            LockedMappings::shared_ptr enq_ptr;
-            enq_ptr.reset(new LockedMappings);
-            LockedMappings::shared_ptr deq_ptr;
-            deq_ptr.reset(new LockedMappings);
-            txns.push_back(new PreparedTransaction(*i, enq_ptr, deq_ptr));
-        }
-        
+    for (std::set<string>::iterator i = prepared.begin(); i != prepared.end(); i++) {
+        LockedMappings::shared_ptr enq_ptr;
+        enq_ptr.reset(new LockedMappings);
+        LockedMappings::shared_ptr deq_ptr;
+        deq_ptr.reset(new LockedMappings);
+        txns.push_back(new PreparedTransaction(*i, enq_ptr, deq_ptr));
     }
 }
 
@@ -1081,7 +930,6 @@
 
 void BdbMessageStore::flush(const qpid::broker::PersistableQueue& queue)
 {
-    if (!usingJrnl()) return;
     if (queue.getExternalQueueStore() == 0) return;
     checkInit();
     std::string qn = queue.getName();
@@ -1106,7 +954,6 @@
         THROW_STORE_EXCEPTION("Queue not created: " + queue.getName());
     }
     Dbt key (&messageId, sizeof(messageId));
-    Dbt value (&queueId, sizeof(queueId));
 
     TxnCtxt implicit;
     TxnCtxt* txn = 0;
@@ -1114,11 +961,10 @@
         txn = check(ctxt);
     } else {
         txn = &implicit;
-        txn->begin(env, !usingJrnl());
+        txn->begin(env);
     }
 
     try {
-    
         bool newId = false;
         if (messageId == 0) {
             messageId = messageIdSequence.next();
@@ -1127,20 +973,9 @@
         }
         store(&queue, txn, key, msg, newId);
 
-        if (usingJrnl()){
-            // add queue* to the txn map..
-            if (ctxt) txn->addXidRecord(queue.getExternalQueueStore());
-        }else{
-       	    msg->enqueueComplete();  // set enqueued for ack
-            put(mappingDb, txn->get(), key, value);
+        // add queue* to the txn map..
+        if (ctxt) txn->addXidRecord(queue.getExternalQueueStore());
         
-            // cct if using Journal do we need to wait for IO to complete before calling thus???
-            // set enqueue comple on callback msg.enqueueComplete();
-            if (txn->isTPC()) {
-                record2pcOp(enqueueXidDb, dynamic_cast<TPCTxnCtxt&>(*txn), messageId, queueId);
-            }
-        }
-        
         if (!ctxt) txn->commit();
     } catch (const std::exception& e) {
         if (!ctxt) txn->abort();
@@ -1166,7 +1001,7 @@
 
     try {
 
-        if ( queue && usingJrnl()) {
+        if ( queue ) {
             boost::intrusive_ptr<DataTokenImpl> dtokp(new DataTokenImpl);
             dtokp->addRef();
 	        dtokp->setSourceMessage(message);
@@ -1197,8 +1032,6 @@
     } catch (const journal::jexception& e) {
         THROW_STORE_EXCEPTION(std::string("Queue ") + queue->getName() + ": store() failed: " +
                               e.what());
-    } catch (const DbException& e) {
-        THROW_STORE_EXCEPTION_2("Error storing message", e);
     }
 }
 
@@ -1221,37 +1054,21 @@
         txn = check(ctxt);
     } else {
         txn = &implicit;
-        txn->begin(env, !usingJrnl());
+        txn->begin(env);
     }
     
     try {
         
-        if (usingJrnl()){
-            // add queue* to the txn map..
-            if (ctxt) txn->addXidRecord(queue.getExternalQueueStore());
-            async_dequeue(ctxt, msg, queue); 
+        // add queue* to the txn map..
+        if (ctxt) txn->addXidRecord(queue.getExternalQueueStore());
+        async_dequeue(ctxt, msg, queue); 
 			
-            msg->dequeueComplete();
-            // 		    if ( msg->isDequeueComplete()  ) // clear id after last dequeue
-            // 		         msg->setPersistenceId(0);
+        msg->dequeueComplete();
+        // 		    if ( msg->isDequeueComplete()  ) // clear id after last dequeue
+        // 		         msg->setPersistenceId(0);
 			
-        } else if (txn->isTPC()) {
-            //if this is part of a 2pc transaction, then only record the dequeue now,
-            //it will be applied on commit
-            record2pcOp(dequeueXidDb, dynamic_cast<TPCTxnCtxt&>(*txn), messageId, queueId);
-        } else {
-            Dbt key (&messageId, sizeof(messageId));
-            Dbt value (&queueId, sizeof(queueId));
-            if (dequeue(txn->get(), key, value)) {
-                msg->setPersistenceId(0);//clear id as we have now removed the message from the store
-                msg->dequeueComplete(); // set dequeued for ack
-            }
-        }
         if (!ctxt) txn->commit();
         
-    } catch (const DbException& e) {
-        if (!ctxt) txn->abort();
-        THROW_STORE_EXCEPTION_2("Error dequeing message", e);
     } catch (const std::exception& e) {
         if (!ctxt) txn->abort();
         throw;
@@ -1287,49 +1104,6 @@
     }
 }
 
-bool BdbMessageStore::dequeue(DbTxn* txn, Dbt& messageId, Dbt& queueId)
-{
-    //First look up the message, this gets a lock on that table in
-    //case we need to delete it (avoiding deadlocks with enqueue where
-    //the locking order is messageDb then mappingDb)
-    Cursor msgCursor;
-    msgCursor.open(messageDb, txn);
-
-    try {
-        Dbt peek;
-        peek.set_flags(DB_DBT_USERMEM | DB_DBT_PARTIAL);
-        peek.set_ulen(0);
-        int status = msgCursor->get(&messageId, &peek, DB_SET | DB_RMW);
-        if (status == DB_NOTFOUND ) {
-            THROW_STORE_EXCEPTION("Can't find record for message");
-        } else if (status != 0 && status != DB_BUFFER_SMALL) {
-            string e = "Dequeue failed (while seeking message) with unexpected status = ";
-            e += DbEnv::strerror(status);
-            THROW_STORE_EXCEPTION(e);
-        }
-    } catch (DbMemoryException& expected) {
-    }
-
-    Cursor cursor;
-    cursor.open(mappingDb, txn);
-
-    int status = cursor->get(&messageId, &queueId, DB_GET_BOTH | DB_RMW);
-    if (status == 0) {
-        cursor->del(0);
-    } else if (status == DB_NOTFOUND ) {
-        THROW_STORE_EXCEPTION("Can't find record mapping message to queue");
-    } else {
-        THROW_STORE_EXCEPTION("Dequeue failed with status = " + status);
-    }
-
-    if (isUnused(cursor, messageId)) {
-        msgCursor->del(0);
-        return true;
-    } else {
-        return false;
-    }
-}
-
 u_int32_t BdbMessageStore::outstandingQueueAIO(const qpid::broker::PersistableQueue& /*queue*/)
 {
     checkInit();
@@ -1367,29 +1141,13 @@
     }
 }
 
-void BdbMessageStore::completed(TPCTxnCtxt& txn, Db& discard, Db& apply, bool commit)
+void BdbMessageStore::completed(TPCTxnCtxt& txn, bool commit)
 {
-    if (!txn.get()) txn.begin(env, !usingJrnl());
+    if (!txn.get()) txn.begin(env);
 
     try {
 
         StringDbt key(txn.getXid());
-        if (!usingJrnl()){
-            //scroll through all records matching xid in apply and dequeue
-            //using the message and queue id encoded in each value
-            Cursor c;
-            c.open(apply, txn.get());
-            IdPairDbt value;
-            
-            for (int status = c->get(&key, &value, DB_SET); status == 0; status = c->get(&key, &value, DB_NEXT_DUP)) {
-            	dequeue(txn.get(), value.message, value.queue);
-            }
-            c.close();
-            
-            //delete all records matching xid
-            discard.del(txn.get(), &key, 0);
-            apply.del(txn.get(), &key, 0);
-        }
         prepareXidDb.del(txn.get(), &key, 0);
 
         txn.complete(commit);
@@ -1405,7 +1163,7 @@
     checkInit();
     // pass sequence number for c/a when using jrnl
     TxnCtxt* txn(new TxnCtxt(&messageIdSequence ));
-    txn->begin(env, !usingJrnl());
+    txn->begin(env);
     return auto_ptr<TransactionContext>(txn);
 }
 
@@ -1413,11 +1171,11 @@
 {
     checkInit();
     IdSequence* jtx = NULL;
-    if (usingJrnl()) jtx = &messageIdSequence;
+    jtx = &messageIdSequence;
 
     // pass sequence number for c/a when using jrnl
     TPCTxnCtxt* txn(new TPCTxnCtxt(xid, jtx));
-    txn->begin(env, !usingJrnl());
+    txn->begin(env);
     return auto_ptr<TPCTransactionContext>(txn);
 }
 
@@ -1449,7 +1207,7 @@
     checkInit();
     TxnCtxt* txn(check(&ctxt));
     if (txn->isTPC()) {
-        completed(*dynamic_cast<TPCTxnCtxt*>(txn), enqueueXidDb, dequeueXidDb, true);        
+        completed(*dynamic_cast<TPCTxnCtxt*>(txn), true);        
     } else {
         txn->commit();
     }
@@ -1460,7 +1218,7 @@
     checkInit();
     TxnCtxt* txn(check(&ctxt));
     if (txn->isTPC()) {
-        completed(*dynamic_cast<TPCTxnCtxt*>(txn), dequeueXidDb, enqueueXidDb, false);
+        completed(*dynamic_cast<TPCTxnCtxt*>(txn), false);
     } else {
         txn->abort();
     }
@@ -1487,14 +1245,6 @@
     }
 }
 
-void BdbMessageStore::record2pcOp(Db& db, TPCTxnCtxt& txn, u_int64_t messageId, u_int64_t queueId)
-{
-    StringDbt key(txn.getXid());
-    IdPairDbt value(queueId, messageId);
-    put(db, txn.get(), key, value);
-}
-
-
  
 bool BdbMessageStore::deleteKeyValuePair(Db& db, DbTxn* txn, Dbt& key, Dbt& value)
 {
@@ -1607,7 +1357,7 @@
     wCachePageSize(JRNL_WMGR_DEF_PAGE_SIZE * JRNL_DBLK_SIZE * JRNL_SBLK_SIZE / 1024)
 {
     addOptions()
-        ("store-directory", qpid::optValue(storeDir, "DIR"),
+        ("store-dir", qpid::optValue(storeDir, "DIR"),
          "Store directory location for persistence (instead of using --data-dir value). "
          "Must be supplied if --no-data-dir is also used.")
         ("num-jfiles", qpid::optValue(numJrnlFiles, "N"),

Modified: store/branches/mrg-1.0/cpp/lib/BdbMessageStore.h
===================================================================
--- store/branches/mrg-1.0/cpp/lib/BdbMessageStore.h	2008-07-14 12:43:10 UTC (rev 2191)
+++ store/branches/mrg-1.0/cpp/lib/BdbMessageStore.h	2008-07-14 13:05:07 UTC (rev 2192)
@@ -88,7 +88,6 @@
             IdSequence exchangeIdSequence;
             IdSequence generalIdSequence;
             IdSequence messageIdSequence;
-			static bool useAsync;
 			std::string storeDir;
             u_int16_t numJrnlFiles;
             u_int32_t jrnlFsizePgs;
@@ -124,8 +123,6 @@
 	    	        Dbt& messageId, 
 			        boost::intrusive_ptr<qpid::broker::PersistableMessage>& message,
 			        bool newId);
-            void enqueue(DbTxn* txn, Dbt& messageId, Dbt& queueId);
-            bool dequeue(DbTxn* txn, Dbt& messageId, Dbt& queueId);
             void async_dequeue(qpid::broker::TransactionContext* ctxt, 
 	    	        boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg, 
 			        const qpid::broker::PersistableQueue& queue);
@@ -134,7 +131,7 @@
             bool isUnused(Cursor& cursor, Dbt& messageId);
             void destroy(Db& db, const qpid::broker::Persistable& p);
             bool create(Db& db, IdSequence& seq, const qpid::broker::Persistable& p);
-            void completed(TPCTxnCtxt& txn, Db& discard, Db& apply, bool commit);
+            void completed(TPCTxnCtxt& txn, bool commit);
             void record2pcOp(Db& db, TPCTxnCtxt& txn, u_int64_t messageId, u_int64_t queueId);
             void deleteBindingsForQueue(const qpid::broker::PersistableQueue& queue);
             void deleteBinding(const qpid::broker::PersistableExchange& exchange, 
@@ -152,7 +149,6 @@
 	  	  	void createJrnlQueue(const qpid::broker::PersistableQueue& queue);
 	  	  	string getJrnlDir(const qpid::broker::PersistableQueue& queue); //for exmaple /var/rhm/ + queueDir/
 	  	  	string getJrnlDir(const char* queueName);
-	  	  	static inline bool usingJrnl() {return useAsync;} 
 	  	  	string getJrnlBaseDir(); 
 			inline void checkInit() {
                 if (!isInit) init("/var", defNumJrnlFiles, defJrnlFileSizePgs, defWCachePageSize); isInit = true;

Modified: store/branches/mrg-1.0/cpp/lib/gen/qpid/management/Store.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/gen/qpid/management/Store.cpp	2008-07-14 12:43:10 UTC (rev 2191)
+++ store/branches/mrg-1.0/cpp/lib/gen/qpid/management/Store.cpp	2008-07-14 13:05:07 UTC (rev 2192)
@@ -35,7 +35,7 @@
 string  Store::packageName  = string ("mrgstore");
 string  Store::className    = string ("store");
 uint8_t Store::md5Sum[16]   =
-    {0x91,0xcf,0xc4,0xa7,0x9b,0x4a,0x2a,0x88,0x32,0x6f,0xef,0xec,0x82,0xd7,0x12,0x6a};
+    {0x63,0xc5,0x1a,0x81,0x18,0x8a,0x8d,0x9b,0x3e,0x96,0xf7,0x6d,0x3b,0xd0,0x51,0x14};
 
 Store::Store (Manageable* _core, Manageable* _parent) :
     ManagementObject(_core)
@@ -71,7 +71,7 @@
     buf.putShortString (packageName); // Package Name
     buf.putShortString (className);   // Class Name
     buf.putBin128      (md5Sum);      // Schema Hash
-    buf.putShort       (5); // Config Element Count
+    buf.putShort       (4); // Config Element Count
     buf.putShort       (0); // Inst Element Count
     buf.putShort       (0); // Method Count
     buf.putShort       (0); // Event Count
@@ -93,14 +93,6 @@
     buf.put (ft);
 
     ft = FieldTable ();
-    ft.setString (NAME,   "async");
-    ft.setInt    (TYPE,   TYPE_BOOL);
-    ft.setInt    (ACCESS, ACCESS_RO);
-    ft.setInt    (INDEX,  0);
-    ft.setString (DESC,   "Asynchronous IO");
-    buf.put (ft);
-
-    ft = FieldTable ();
     ft.setString (NAME,   "defaultInitialFileCount");
     ft.setInt    (TYPE,   TYPE_U16);
     ft.setInt    (ACCESS, ACCESS_RO);
@@ -135,7 +127,6 @@
     writeTimestamps (buf);
     buf.putLongLong (brokerRef);
     buf.putShortString (location);
-    buf.putOctet (async?1:0);
     buf.putShort (defaultInitialFileCount);
     buf.putLong (defaultDataFileSize);
 

Modified: store/branches/mrg-1.0/cpp/lib/gen/qpid/management/Store.h
===================================================================
--- store/branches/mrg-1.0/cpp/lib/gen/qpid/management/Store.h	2008-07-14 12:43:10 UTC (rev 2191)
+++ store/branches/mrg-1.0/cpp/lib/gen/qpid/management/Store.h	2008-07-14 13:05:07 UTC (rev 2192)
@@ -43,7 +43,6 @@
     // Properties
     uint64_t brokerRef;
     std::string location;
-    uint8_t async;
     uint16_t defaultInitialFileCount;
     uint32_t defaultDataFileSize;
 
@@ -89,11 +88,6 @@
         location = val;
         configChanged = true;
     }
-    inline void set_async (uint8_t val){
-        sys::Mutex::ScopedLock mutex(accessLock);
-        async = val;
-        configChanged = true;
-    }
     inline void set_defaultInitialFileCount (uint16_t val){
         sys::Mutex::ScopedLock mutex(accessLock);
         defaultInitialFileCount = val;




More information about the rhmessaging-commits mailing list