[rhmessaging-commits] rhmessaging commits: r2037 - in store/trunk/cpp: tests and 1 other directory.
rhmessaging-commits at lists.jboss.org
rhmessaging-commits at lists.jboss.org
Mon May 12 13:23:00 EDT 2008
Author: tedross
Date: 2008-05-12 13:23:00 -0400 (Mon, 12 May 2008)
New Revision: 2037
Modified:
store/trunk/cpp/lib/BdbMessageStore.cpp
store/trunk/cpp/lib/BdbMessageStore.h
store/trunk/cpp/tests/OrderingTest.cpp
store/trunk/cpp/tests/SimpleTest.cpp
store/trunk/cpp/tests/TransactionalTest.cpp
store/trunk/cpp/tests/TwoPhaseCommitTest.cpp
Log:
Added store support for general configuration items (used for qpid federation)
Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp 2008-05-12 16:32:14 UTC (rev 2036)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp 2008-05-12 17:23:00 UTC (rev 2037)
@@ -64,6 +64,7 @@
messageDb(&env, 0),
mappingDb(&env, 0),
bindingDb(&env, 0),
+ generalDb(&env, 0),
enqueueXidDb(&env, 0),
dequeueXidDb(&env, 0),
prepareXidDb(&env, 0),
@@ -149,6 +150,7 @@
open(messageDb, txn.get(), "messages.db", false);
open(mappingDb, txn.get(), "mappings.db", true);
open(bindingDb, txn.get(), "bindings.db", true);
+ open(generalDb, txn.get(), "general.db", false);
open(enqueueXidDb, txn.get(), "enqueue_xid.db", true);
open(dequeueXidDb, txn.get(), "dequeue_xid.db", true);
open(prepareXidDb, txn.get(), "prepare_xid.db", false);
@@ -416,6 +418,28 @@
bindingDb.del(0, &key, DB_AUTO_COMMIT);
}
+void BdbMessageStore::create(const PersistableConfig& general)
+{
+ checkInit();
+ if (general.getPersistenceId()) {
+ THROW_STORE_EXCEPTION("General configuration item already created");
+ }
+
+ try {
+ if (!create(generalDb, generalIdSequence, general)) {
+ THROW_STORE_EXCEPTION("General configuration already exists");
+ }
+ } catch (const DbException& e) {
+ THROW_STORE_EXCEPTION_2("Error creating general configuration", e);
+ }
+}
+
+void BdbMessageStore::destroy(const PersistableConfig& general)
+{
+ checkInit();
+ destroy(generalDb, general);
+}
+
bool BdbMessageStore::create(Db& db, IdSequence& seq, const Persistable& p)
{
u_int64_t id (seq.next());
@@ -477,6 +501,9 @@
recoverExchanges(txn, registry, exchanges);
recoverBindings(txn, exchanges, queues);
+ //recover general-purpose configuration
+ recoverGeneral(txn, registry);
+
txn.commit();
} catch (const DbException& e) {
@@ -613,6 +640,26 @@
}
}
+void BdbMessageStore::recoverGeneral(TxnCtxt& txn, RecoveryManager& registry)
+{
+ Cursor items;
+ items.open(generalDb, txn.get());
+
+ u_int64_t maxGeneralId(1);
+ IdDbt key;
+ Dbt value;
+ //read all items
+ while (items.next(key, value)) {
+ Buffer buffer(reinterpret_cast<char*>(value.get_data()), value.get_size());
+ //create instance
+ RecoverableConfig::shared_ptr config = registry.recoverConfig(buffer);
+ //set the persistenceId and update max as required
+ config->setPersistenceId(key.id);
+ maxGeneralId = max(key.id, maxGeneralId);
+ }
+ generalIdSequence.reset(maxGeneralId + 1);
+}
+
// async IO version.
void BdbMessageStore::recoverMessages(TxnCtxt& /*txn*/, qpid::broker::RecoveryManager& recovery,
qpid::broker::RecoverableQueue::shared_ptr& queue, txn_list& locked, message_index& prepared)
Modified: store/trunk/cpp/lib/BdbMessageStore.h
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.h 2008-05-12 16:32:14 UTC (rev 2036)
+++ store/trunk/cpp/lib/BdbMessageStore.h 2008-05-12 17:23:00 UTC (rev 2037)
@@ -82,11 +82,13 @@
Db messageDb;
Db mappingDb;
Db bindingDb;
+ Db generalDb;
Db enqueueXidDb;
Db dequeueXidDb;
Db prepareXidDb;
IdSequence queueIdSequence;
IdSequence exchangeIdSequence;
+ IdSequence generalIdSequence;
IdSequence messageIdSequence;
static bool useAsync;
std::string storeDir;
@@ -113,6 +115,7 @@
uint64_t mId, unsigned& headerSize);
void recoverExchanges(TxnCtxt& txn, qpid::broker::RecoveryManager& recovery, exchange_index& index);
void recoverBindings(TxnCtxt& txn, exchange_index& exchanges, queue_index& queues);
+ void recoverGeneral(TxnCtxt& txn, qpid::broker::RecoveryManager& recovery);
int enqueueMessage(TxnCtxt& txn, IdDbt& msgId, qpid::broker::RecoverableMessage::shared_ptr& msg,
queue_index& index, txn_list& locked, message_index& prepared);
void recoverXids(txn_list& txns);
@@ -194,6 +197,9 @@
const qpid::broker::PersistableQueue& queue,
const std::string& key, const qpid::framing::FieldTable& args);
+ void create(const qpid::broker::PersistableConfig& config);
+ void destroy(const qpid::broker::PersistableConfig& config);
+
void recover(qpid::broker::RecoveryManager& queues);
void stage(boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg);
Modified: store/trunk/cpp/tests/OrderingTest.cpp
===================================================================
--- store/trunk/cpp/tests/OrderingTest.cpp 2008-05-12 16:32:14 UTC (rev 2036)
+++ store/trunk/cpp/tests/OrderingTest.cpp 2008-05-12 17:23:00 UTC (rev 2037)
@@ -96,9 +96,10 @@
store = std::auto_ptr<BdbMessageStore>(new BdbMessageStore());
store->init(TESTDIR, async, false, 4, 1, 8);
ExchangeRegistry exchanges;
+ LinkRegistry links(0);
DtxManager mgr;
mgr.setStore (store.get());
- RecoveryManagerImpl recoveryMgr(queues, exchanges, mgr, 0);
+ RecoveryManagerImpl recoveryMgr(queues, exchanges, links, mgr, 0);
store->recover(recoveryMgr);
queue = queues.find(name);
Modified: store/trunk/cpp/tests/SimpleTest.cpp
===================================================================
--- store/trunk/cpp/tests/SimpleTest.cpp 2008-05-12 16:32:14 UTC (rev 2036)
+++ store/trunk/cpp/tests/SimpleTest.cpp 2008-05-12 17:23:00 UTC (rev 2037)
@@ -60,24 +60,26 @@
}
};
-void recover(BdbMessageStore& store, QueueRegistry& queues, ExchangeRegistry& exchanges)
+void recover(BdbMessageStore& store, QueueRegistry& queues, ExchangeRegistry& exchanges, LinkRegistry& links)
{
DtxManager mgr;
mgr.setStore (&store);
- RecoveryManagerImpl recovery(queues, exchanges, mgr, 0);
+ RecoveryManagerImpl recovery(queues, exchanges, links, mgr, 0);
store.recover(recovery);
}
void recover(BdbMessageStore& store, ExchangeRegistry& exchanges)
{
QueueRegistry queues;
- recover(store, queues, exchanges);
+ LinkRegistry links(0);
+ recover(store, queues, exchanges, links);
}
void recover(BdbMessageStore& store, QueueRegistry& queues)
{
ExchangeRegistry exchanges;
- recover(store, queues, exchanges);
+ LinkRegistry links(0);
+ recover(store, queues, exchanges, links);
}
void testCreateDelete(bool async)
@@ -331,9 +333,10 @@
QueueRegistry registry;
registry.setStore (&store);
ExchangeRegistry exchanges;
+ LinkRegistry links(0);
DtxManager dtx;
dtx.setStore (&store);
- RecoveryManagerImpl recovery(registry, exchanges, dtx, 10);
+ RecoveryManagerImpl recovery(registry, exchanges, links, dtx, 10);
store.recover(recovery);
//get message instance from queue
@@ -487,8 +490,9 @@
store.init(TESTDIR, async, false, 4, 1, 8);
ExchangeRegistry exchanges;
QueueRegistry queues;
+ LinkRegistry links(0);
- recover(store, queues, exchanges);
+ recover(store, queues, exchanges, links);
Exchange::shared_ptr exchange = exchanges.get(exchangeName);
Queue::shared_ptr queue = queues.find(queueName);
@@ -501,8 +505,9 @@
store.init(TESTDIR, async, false, 4, 1, 8);
ExchangeRegistry exchanges;
QueueRegistry queues;
+ LinkRegistry links(0);
- recover(store, queues, exchanges);
+ recover(store, queues, exchanges, links);
Exchange::shared_ptr exchange = exchanges.get(exchangeName);
Queue::shared_ptr queue = queues.find(queueName);
@@ -551,9 +556,10 @@
store.init(TESTDIR, async, false, 4, 1, 8);
ExchangeRegistry exchanges;
QueueRegistry queues;
+ LinkRegistry links(0);
//ensure recovery works ok:
- recover(store, queues, exchanges);
+ recover(store, queues, exchanges, links);
Exchange::shared_ptr exchange = exchanges.get(exchangeName);
BOOST_REQUIRE(!queues.find(queueName1).get());
@@ -567,9 +573,10 @@
store.init(TESTDIR, async, false, 4, 1, 8);
ExchangeRegistry exchanges;
QueueRegistry queues;
+ LinkRegistry links(0);
//ensure recovery works ok:
- recover(store, queues, exchanges);
+ recover(store, queues, exchanges, links);
try {
Exchange::shared_ptr exchange = exchanges.get(exchangeName);
Modified: store/trunk/cpp/tests/TransactionalTest.cpp
===================================================================
--- store/trunk/cpp/tests/TransactionalTest.cpp 2008-05-12 16:32:14 UTC (rev 2036)
+++ store/trunk/cpp/tests/TransactionalTest.cpp 2008-05-12 17:23:00 UTC (rev 2037)
@@ -84,9 +84,10 @@
store = std::auto_ptr<BdbMessageStore>(new BdbMessageStore());
store->init(TESTDIR, async, false, 4, 1, 8);
ExchangeRegistry exchanges;
+ LinkRegistry links(0);
DtxManager mgr;
mgr.setStore (store.get());
- RecoveryManagerImpl recovery(queues, exchanges, mgr, 0);
+ RecoveryManagerImpl recovery(queues, exchanges, links, mgr, 0);
store->recover(recovery);
queueA = queues.find(nameA);
Modified: store/trunk/cpp/tests/TwoPhaseCommitTest.cpp
===================================================================
--- store/trunk/cpp/tests/TwoPhaseCommitTest.cpp 2008-05-12 16:32:14 UTC (rev 2036)
+++ store/trunk/cpp/tests/TwoPhaseCommitTest.cpp 2008-05-12 17:23:00 UTC (rev 2037)
@@ -128,6 +128,7 @@
std::auto_ptr<BdbMessageStore> store;
std::auto_ptr<DtxManager> dtxmgr;
QueueRegistry queues;
+ LinkRegistry links;
Queue::shared_ptr queueA;
Queue::shared_ptr queueB;
Message::shared_ptr msg1;
@@ -250,7 +251,7 @@
ExchangeRegistry exchanges;
dtxmgr = std::auto_ptr<DtxManager>(new DtxManager);
dtxmgr->setStore (store.get());
- RecoveryManagerImpl recovery(queues, exchanges, *dtxmgr, 0);
+ RecoveryManagerImpl recovery(queues, exchanges, links, *dtxmgr, 0);
store->recover(recovery);
queueA = queues.find(nameA);
@@ -292,7 +293,7 @@
}
public:
- TwoPhaseCommitTest() : nameA("queueA"), nameB("queueB") {}
+ TwoPhaseCommitTest() : nameA("queueA"), nameB("queueB"), links(0) {}
void testCommitSwap(bool a)
{
More information about the rhmessaging-commits
mailing list