[rhmessaging-commits] rhmessaging commits: r2037 - in store/trunk/cpp: tests and 1 other directory.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Mon May 12 13:23:00 EDT 2008


Author: tedross
Date: 2008-05-12 13:23:00 -0400 (Mon, 12 May 2008)
New Revision: 2037

Modified:
   store/trunk/cpp/lib/BdbMessageStore.cpp
   store/trunk/cpp/lib/BdbMessageStore.h
   store/trunk/cpp/tests/OrderingTest.cpp
   store/trunk/cpp/tests/SimpleTest.cpp
   store/trunk/cpp/tests/TransactionalTest.cpp
   store/trunk/cpp/tests/TwoPhaseCommitTest.cpp
Log:
Added store support for general configuration items (used for qpid federation)

Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp	2008-05-12 16:32:14 UTC (rev 2036)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp	2008-05-12 17:23:00 UTC (rev 2037)
@@ -64,6 +64,7 @@
                                                         messageDb(&env, 0), 
                                                         mappingDb(&env, 0), 
                                                         bindingDb(&env, 0), 
+                                                        generalDb(&env, 0),
                                                         enqueueXidDb(&env, 0), 
                                                         dequeueXidDb(&env, 0), 
                                                         prepareXidDb(&env, 0),
@@ -149,6 +150,7 @@
         open(messageDb, txn.get(), "messages.db", false);       
         open(mappingDb, txn.get(), "mappings.db", true);
         open(bindingDb, txn.get(), "bindings.db", true);
+        open(generalDb, txn.get(), "general.db",  false);
         open(enqueueXidDb, txn.get(), "enqueue_xid.db", true);
         open(dequeueXidDb, txn.get(), "dequeue_xid.db", true);
         open(prepareXidDb, txn.get(), "prepare_xid.db", false);
@@ -416,6 +418,28 @@
     bindingDb.del(0, &key, DB_AUTO_COMMIT);
 }
 
+void BdbMessageStore::create(const PersistableConfig& general) // Stores a general configuration item via the shared create(Db&, IdSequence&, ...) helper.
+{
+    checkInit();
+    if (general.getPersistenceId()) { // a non-zero persistence id is treated as "this item was already created"
+        THROW_STORE_EXCEPTION("General configuration item already created");
+    }
+
+    try {
+        if (!create(generalDb, generalIdSequence, general)) { // a false result from the helper is reported as a duplicate item
+            THROW_STORE_EXCEPTION("General configuration already exists");
+        }
+    } catch (const DbException& e) { // database errors are reported through THROW_STORE_EXCEPTION_2 with the message below
+        THROW_STORE_EXCEPTION_2("Error creating general configuration", e);
+    }
+}
+
+void BdbMessageStore::destroy(const PersistableConfig& general) // Hands a general configuration item to the generic destroy helper for generalDb.
+{
+    checkInit();
+    destroy(generalDb, general); // NOTE(review): unlike create(), no DbException handling here -- confirm the destroy(Db&, ...) helper covers it
+}
+
 bool BdbMessageStore::create(Db& db, IdSequence& seq, const Persistable& p)
 {
     u_int64_t id (seq.next());
@@ -477,6 +501,9 @@
         recoverExchanges(txn, registry, exchanges);
         recoverBindings(txn, exchanges, queues);
 
+        //recover general-purpose configuration
+        recoverGeneral(txn, registry);
+
         txn.commit();
 
     } catch (const DbException& e) {
@@ -613,6 +640,26 @@
     }
 }
 
+void BdbMessageStore::recoverGeneral(TxnCtxt& txn, RecoveryManager& registry) // Passes every record in generalDb to the recovery manager and resets generalIdSequence.
+{
+    Cursor items;
+    items.open(generalDb, txn.get());
+
+    u_int64_t maxGeneralId(1); // largest id seen so far; starting at 1 means the sequence is reset to at least 2
+    IdDbt key;
+    Dbt value;
+    //visit each key/value record the cursor yields
+    while (items.next(key, value)) {
+        Buffer buffer(reinterpret_cast<char*>(value.get_data()), value.get_size());
+        //hand the raw record bytes to the recovery manager, which returns the recovered config object
+        RecoverableConfig::shared_ptr config = registry.recoverConfig(buffer);
+        //tag the recovered object with the id it was stored under and track the largest id seen
+        config->setPersistenceId(key.id);
+        maxGeneralId = max(key.id, maxGeneralId);
+    }
+    generalIdSequence.reset(maxGeneralId + 1); // NOTE(review): presumably makes newly issued ids start above every recovered id -- confirm IdSequence::reset semantics
+}
+
 // async IO version.
 void BdbMessageStore::recoverMessages(TxnCtxt& /*txn*/, qpid::broker::RecoveryManager& recovery, 
                                       qpid::broker::RecoverableQueue::shared_ptr& queue, txn_list& locked, message_index& prepared)

Modified: store/trunk/cpp/lib/BdbMessageStore.h
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.h	2008-05-12 16:32:14 UTC (rev 2036)
+++ store/trunk/cpp/lib/BdbMessageStore.h	2008-05-12 17:23:00 UTC (rev 2037)
@@ -82,11 +82,13 @@
             Db messageDb;
             Db mappingDb;
             Db bindingDb;
+            Db generalDb;
             Db enqueueXidDb;
             Db dequeueXidDb;
             Db prepareXidDb;
             IdSequence queueIdSequence;
             IdSequence exchangeIdSequence;
+            IdSequence generalIdSequence;
             IdSequence messageIdSequence;
 			static bool useAsync;
 			std::string storeDir;
@@ -113,6 +115,7 @@
 			        uint64_t mId, unsigned& headerSize);
             void recoverExchanges(TxnCtxt& txn, qpid::broker::RecoveryManager& recovery, exchange_index& index);
             void recoverBindings(TxnCtxt& txn, exchange_index& exchanges, queue_index& queues);
+            void recoverGeneral(TxnCtxt& txn, qpid::broker::RecoveryManager& recovery);
             int enqueueMessage(TxnCtxt& txn, IdDbt& msgId, qpid::broker::RecoverableMessage::shared_ptr& msg, 
                     queue_index& index, txn_list& locked, message_index& prepared);
             void recoverXids(txn_list& txns);
@@ -194,6 +197,9 @@
                     const qpid::broker::PersistableQueue& queue, 
                     const std::string& key, const qpid::framing::FieldTable& args);
 
+            void create(const qpid::broker::PersistableConfig& config);
+            void destroy(const qpid::broker::PersistableConfig& config);
+
             void recover(qpid::broker::RecoveryManager& queues);
 
             void stage(boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg);

Modified: store/trunk/cpp/tests/OrderingTest.cpp
===================================================================
--- store/trunk/cpp/tests/OrderingTest.cpp	2008-05-12 16:32:14 UTC (rev 2036)
+++ store/trunk/cpp/tests/OrderingTest.cpp	2008-05-12 17:23:00 UTC (rev 2037)
@@ -96,9 +96,10 @@
     store = std::auto_ptr<BdbMessageStore>(new BdbMessageStore());
     store->init(TESTDIR, async, false, 4, 1, 8);
     ExchangeRegistry exchanges;
+    LinkRegistry links(0);
     DtxManager mgr;
     mgr.setStore (store.get());
-    RecoveryManagerImpl recoveryMgr(queues, exchanges, mgr, 0);
+    RecoveryManagerImpl recoveryMgr(queues, exchanges, links, mgr, 0);
     store->recover(recoveryMgr);
 
     queue = queues.find(name);

Modified: store/trunk/cpp/tests/SimpleTest.cpp
===================================================================
--- store/trunk/cpp/tests/SimpleTest.cpp	2008-05-12 16:32:14 UTC (rev 2036)
+++ store/trunk/cpp/tests/SimpleTest.cpp	2008-05-12 17:23:00 UTC (rev 2037)
@@ -60,24 +60,26 @@
     }
 };
 
-void recover(BdbMessageStore& store, QueueRegistry& queues, ExchangeRegistry& exchanges)
+void recover(BdbMessageStore& store, QueueRegistry& queues, ExchangeRegistry& exchanges, LinkRegistry& links)
 {
     DtxManager mgr;
     mgr.setStore (&store);
-    RecoveryManagerImpl recovery(queues, exchanges, mgr, 0);
+    RecoveryManagerImpl recovery(queues, exchanges, links, mgr, 0);
     store.recover(recovery);
 }
 
 void recover(BdbMessageStore& store, ExchangeRegistry& exchanges)
 {
     QueueRegistry queues;
-    recover(store, queues, exchanges);
+    LinkRegistry links(0);
+    recover(store, queues, exchanges, links);
 }
 
 void recover(BdbMessageStore& store, QueueRegistry& queues)
 {
     ExchangeRegistry exchanges;
-    recover(store, queues, exchanges);
+    LinkRegistry links(0);
+    recover(store, queues, exchanges, links);
 }
 
 void testCreateDelete(bool async)
@@ -331,9 +333,10 @@
         QueueRegistry registry;
         registry.setStore (&store);
         ExchangeRegistry exchanges;
+        LinkRegistry links(0);
         DtxManager dtx;
         dtx.setStore (&store);
-        RecoveryManagerImpl recovery(registry, exchanges, dtx, 10);
+        RecoveryManagerImpl recovery(registry, exchanges, links, dtx, 10);
         store.recover(recovery);
 
         //get message instance from queue
@@ -487,8 +490,9 @@
         store.init(TESTDIR, async, false, 4, 1, 8);
         ExchangeRegistry exchanges;
         QueueRegistry queues;
+        LinkRegistry links(0);
 
-        recover(store, queues, exchanges);
+        recover(store, queues, exchanges, links);
 
         Exchange::shared_ptr exchange = exchanges.get(exchangeName);
         Queue::shared_ptr queue = queues.find(queueName);
@@ -501,8 +505,9 @@
         store.init(TESTDIR, async, false, 4, 1, 8);
         ExchangeRegistry exchanges;
         QueueRegistry queues;
+        LinkRegistry links(0);
 
-        recover(store, queues, exchanges);
+        recover(store, queues, exchanges, links);
 
         Exchange::shared_ptr exchange = exchanges.get(exchangeName);
         Queue::shared_ptr queue = queues.find(queueName);
@@ -551,9 +556,10 @@
         store.init(TESTDIR, async, false, 4, 1, 8);
         ExchangeRegistry exchanges;
         QueueRegistry queues;
+        LinkRegistry links(0);
 
         //ensure recovery works ok:
-        recover(store, queues, exchanges);
+        recover(store, queues, exchanges, links);
 
         Exchange::shared_ptr exchange = exchanges.get(exchangeName);
         BOOST_REQUIRE(!queues.find(queueName1).get());
@@ -567,9 +573,10 @@
         store.init(TESTDIR, async, false, 4, 1, 8);
         ExchangeRegistry exchanges;
         QueueRegistry queues;
+        LinkRegistry links(0);
 
         //ensure recovery works ok:            
-        recover(store, queues, exchanges);
+        recover(store, queues, exchanges, links);
 
         try {
             Exchange::shared_ptr exchange = exchanges.get(exchangeName);

Modified: store/trunk/cpp/tests/TransactionalTest.cpp
===================================================================
--- store/trunk/cpp/tests/TransactionalTest.cpp	2008-05-12 16:32:14 UTC (rev 2036)
+++ store/trunk/cpp/tests/TransactionalTest.cpp	2008-05-12 17:23:00 UTC (rev 2037)
@@ -84,9 +84,10 @@
     store = std::auto_ptr<BdbMessageStore>(new BdbMessageStore());
     store->init(TESTDIR, async, false, 4, 1, 8);
     ExchangeRegistry exchanges;
+    LinkRegistry links(0);
     DtxManager mgr;
     mgr.setStore (store.get());
-    RecoveryManagerImpl recovery(queues, exchanges, mgr, 0);
+    RecoveryManagerImpl recovery(queues, exchanges, links, mgr, 0);
     store->recover(recovery);
 
     queueA = queues.find(nameA);

Modified: store/trunk/cpp/tests/TwoPhaseCommitTest.cpp
===================================================================
--- store/trunk/cpp/tests/TwoPhaseCommitTest.cpp	2008-05-12 16:32:14 UTC (rev 2036)
+++ store/trunk/cpp/tests/TwoPhaseCommitTest.cpp	2008-05-12 17:23:00 UTC (rev 2037)
@@ -128,6 +128,7 @@
     std::auto_ptr<BdbMessageStore> store;
     std::auto_ptr<DtxManager> dtxmgr;
     QueueRegistry queues;
+    LinkRegistry links;
     Queue::shared_ptr queueA;
     Queue::shared_ptr queueB;
     Message::shared_ptr msg1;
@@ -250,7 +251,7 @@
         ExchangeRegistry exchanges;
         dtxmgr = std::auto_ptr<DtxManager>(new DtxManager);
         dtxmgr->setStore (store.get());
-        RecoveryManagerImpl recovery(queues, exchanges, *dtxmgr, 0);
+        RecoveryManagerImpl recovery(queues, exchanges, links, *dtxmgr, 0);
         store->recover(recovery);
 
         queueA = queues.find(nameA);
@@ -292,7 +293,7 @@
     }
 
 public:
-    TwoPhaseCommitTest() : nameA("queueA"), nameB("queueB") {}
+    TwoPhaseCommitTest() : nameA("queueA"), nameB("queueB"), links(0) {}
 
     void testCommitSwap(bool a)
     {




More information about the rhmessaging-commits mailing list