[rhmessaging-commits] rhmessaging commits: r1563 - in store/trunk/cpp: tests and 1 other directory.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Mon Jan 14 10:48:35 EST 2008


Author: tedross
Date: 2008-01-14 10:48:34 -0500 (Mon, 14 Jan 2008)
New Revision: 1563

Added:
   store/trunk/cpp/lib/StorePlugin.cpp
Modified:
   store/trunk/cpp/lib/BdbMessageStore.cpp
   store/trunk/cpp/lib/BdbMessageStore.h
   store/trunk/cpp/lib/Makefile.am
   store/trunk/cpp/lib/TxnCtxt.h
   store/trunk/cpp/tests/OrderingTest.cpp
   store/trunk/cpp/tests/SimpleTest.cpp
   store/trunk/cpp/tests/TransactionalTest.cpp
   store/trunk/cpp/tests/TwoPhaseCommitTest.cpp
   store/trunk/cpp/tests/system_test.sh
Log:
Modified the module interface between Bdbstore and Qpid to use Qpid's plug-in module interface.


Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp	2008-01-14 14:07:57 UTC (rev 1562)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp	2008-01-14 15:48:34 UTC (rev 1563)
@@ -50,8 +50,8 @@
 qpid::sys::Duration BdbMessageStore::defJournalGetEventsTimeout(1000000); // 1ms
 qpid::sys::Duration BdbMessageStore::defJournalFlushTimeout(1000000000); // 1 sec
 unsigned int TxnCtxt::count = 0;
+qpid::sys::Mutex TxnCtxt::globalSerialiser;
 
-
 BdbMessageStore::BdbMessageStore(const char* envpath) : env(0), 
                                                         queueDb(&env, 0), 
                                                         configDb(&env, 0), 
@@ -68,8 +68,6 @@
                                                         envPath(envpath)
 
 {
-
-
 }
 
 bool BdbMessageStore::init(const std::string& dir, const bool async, const bool force, u_int16_t jfiles, u_int32_t jfileSizePgs) 
@@ -122,7 +120,7 @@
 
 bool BdbMessageStore::init(const qpid::Options* options) 
 {
-    const qpid::broker::Broker::Options* opts = static_cast<const qpid::broker::Broker::Options*>(options);
+    const Options* opts = static_cast<const Options*>(options);
     
     u_int16_t numJrnlFiles = opts->numJrnlFiles;
     if (numJrnlFiles < JRNL_MIN_NUM_FILES)
@@ -1296,16 +1294,6 @@
     return txn;
 }
 
-extern "C" MessageStore* create()
-{
-    return new BdbMessageStore();
-}
-
-extern "C" void destroy(MessageStore* store)
-{
-    delete store;
-}
-
 void BdbMessageStore::put(Db& db, DbTxn* txn, Dbt& key, Dbt& value)
 {
     try {
@@ -1371,4 +1359,26 @@
     return dir.str();
 }
 
+BdbMessageStore::Options::Options(const std::string& name) : // Sets the defaults below, then declares one option per setting.
+    qpid::Options(name),
+    storeDir("/var"),       // default for "store-directory"
+    storeAsync(false),      // default for "store-async"
+    storeForce(false),      // default for "store-force"
+    numJrnlFiles(8),        // default for "num-jfiles"; init() compares it against JRNL_MIN_NUM_FILES
+    jrnlFsizePgs(24)        // default for "jfile-size-pgs"; unit is read pages (64kiB each, per the help text)
+{
+    addOptions()            // each entry: option name, optValue(member, label), help text
+        ("store-directory", qpid::optValue(storeDir,"DIR"),   // label presumably shown in usage output -- confirm in qpid/Options.h
+         "Store directory location for persistence.")
+        ("store-async", qpid::optValue(storeAsync,"yes|no"),
+         "Use async persistence storage - if store supports it, enables AIO O_DIRECT.")
+        ("store-force", qpid::optValue(storeForce,"yes|no"),
+         "Force changing modes of store, will delete all existing data if mode is changed. Be SURE you want to do this!")
+        ("num-jfiles", qpid::optValue(numJrnlFiles, "N"),
+         "Number of files in persistence journal")
+        ("jfile-size-pgs", qpid::optValue(jrnlFsizePgs, "N"),
+         "Size of each journal file in multiples of read pages (1 read page = 64kiB)")
+        ;
+}
 
+

Modified: store/trunk/cpp/lib/BdbMessageStore.h
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.h	2008-01-14 14:07:57 UTC (rev 1562)
+++ store/trunk/cpp/lib/BdbMessageStore.h	2008-01-14 15:48:34 UTC (rev 1563)
@@ -141,6 +141,16 @@
             }
 
         public:
+            struct Options : public qpid::Options {   // Store settings; defaults are assigned in the constructor.
+                Options(const std::string& name="Store Options");
+                string clusterName;       // No option is registered for this field by the constructor in this change.
+                string storeDir;          // "store-directory" option, default "/var"
+                bool storeAsync;          // "store-async" option, default false
+                bool storeForce;          // "store-force" option, default false
+                uint16_t numJrnlFiles;    // "num-jfiles" option, default 8
+                uint32_t jrnlFsizePgs;    // "jfile-size-pgs" option, default 24 (read pages of 64kiB)
+            };
+
             BdbMessageStore(const char* envpath = 0);
             virtual ~BdbMessageStore();
 			bool init(const qpid::Options* options);
@@ -190,9 +200,6 @@
             void abort(qpid::broker::TransactionContext& ctxt);
         };
     }
-
-    extern "C" qpid::broker::MessageStore* create();
-    extern "C" void destroy(qpid::broker::MessageStore* store);
 }
 
 #endif

Modified: store/trunk/cpp/lib/Makefile.am
===================================================================
--- store/trunk/cpp/lib/Makefile.am	2008-01-14 14:07:57 UTC (rev 1562)
+++ store/trunk/cpp/lib/Makefile.am	2008-01-14 15:48:34 UTC (rev 1563)
@@ -14,6 +14,7 @@
   $(LIBTOOL_VERSION_INFO_ARG)
 
 libbdbstore_la_SOURCES =			\
+  StorePlugin.cpp				\
   BdbMessageStore.cpp				\
   BindingDbt.cpp				\
   BufferValue.cpp				\
@@ -85,7 +86,10 @@
   jrnl/txn_map.hpp			\
   jrnl/txn_rec.hpp			\
   jrnl/wmgr.hpp			\
-  jrnl/wrfc.hpp
+  jrnl/wrfc.hpp			\
+  gen/Journal.cpp		\
+  gen/Journal.h			\
+  gen/ArgsJournalExpand.h
 
 
 

Added: store/trunk/cpp/lib/StorePlugin.cpp
===================================================================
--- store/trunk/cpp/lib/StorePlugin.cpp	                        (rev 0)
+++ store/trunk/cpp/lib/StorePlugin.cpp	2008-01-14 15:48:34 UTC (rev 1563)
@@ -0,0 +1,54 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "qpid/broker/Broker.h"
+#include "qpid/Plugin.h"
+#include "qpid/Options.h"
+#include "BdbMessageStore.h"
+
+
+namespace qpid {
+namespace broker {
+
+using namespace std;
+
+// Qpid plug-in that creates the BDB message store and hands it to the broker.
+struct StorePlugin : public Plugin {
+    rhm::bdbstore::BdbMessageStore::Options options;
+
+    Options* getOptions() { return &options; }
+
+    // Creates the store from 'options'; throws Exception when init() returns false.
+    void earlyInitialize (Plugin::Target& target)
+    {
+        Broker* broker = dynamic_cast<Broker*>(&target);
+        if (!broker) return; // Target is not a Broker: nothing to attach a store to.
+        MessageStore *store = new rhm::bdbstore::BdbMessageStore ();
+        if (!store->init (&options))
+        {
+            throw Exception("Existing Journal in different mode, backup/move existing data "
+                            "before changing modes. Or use '--store-force yes' to blow existing data away.");
+        }
+        broker->setStore (store);
+    }
+
+    void initialize(Plugin::Target&) {}
+};
+
+static StorePlugin instance; // Static initialization.
+    
+}} // namespace qpid::broker

Modified: store/trunk/cpp/lib/TxnCtxt.h
===================================================================
--- store/trunk/cpp/lib/TxnCtxt.h	2008-01-14 14:07:57 UTC (rev 1562)
+++ store/trunk/cpp/lib/TxnCtxt.h	2008-01-14 15:48:34 UTC (rev 1563)
@@ -163,8 +163,6 @@
     } 
 };
 
-qpid::sys::Mutex TxnCtxt::globalSerialiser;
-
 }}
 
 #endif

Modified: store/trunk/cpp/tests/OrderingTest.cpp
===================================================================
--- store/trunk/cpp/tests/OrderingTest.cpp	2008-01-14 14:07:57 UTC (rev 1562)
+++ store/trunk/cpp/tests/OrderingTest.cpp	2008-01-14 15:48:34 UTC (rev 1563)
@@ -150,7 +150,8 @@
         store = std::auto_ptr<BdbMessageStore>(new BdbMessageStore());
         store->init(TESTDIR, async, false, 4, 1);
         ExchangeRegistry exchanges;
-        DtxManager mgr(store.get());
+        DtxManager mgr;
+        mgr.setStore (store.get());
         RecoveryManagerImpl recoveryMgr(queues, exchanges, mgr, 0);
         store->recover(recoveryMgr);
 

Modified: store/trunk/cpp/tests/SimpleTest.cpp
===================================================================
--- store/trunk/cpp/tests/SimpleTest.cpp	2008-01-14 14:07:57 UTC (rev 1562)
+++ store/trunk/cpp/tests/SimpleTest.cpp	2008-01-14 15:48:34 UTC (rev 1563)
@@ -105,7 +105,8 @@
 
     void recover(BdbMessageStore& store, QueueRegistry& queues, ExchangeRegistry& exchanges)
     {
-        DtxManager mgr(&store);
+        DtxManager mgr;
+        mgr.setStore (&store);
         RecoveryManagerImpl recovery(queues, exchanges, mgr, 0);
         store.recover(recovery);
     }
@@ -120,7 +121,8 @@
         BdbMessageStore store;
         store.init(TESTDIR, async, false, 4, 1);
         store.truncate();//make sure it is empty to begin with
-        QueueRegistry registry(&store);
+        QueueRegistry registry;
+        registry.setStore (&store);
         recover(store, registry);
         //nothing to assert, just testing it doesn't blow up
     }
@@ -162,7 +164,8 @@
         {
             BdbMessageStore store;
             store.init(TESTDIR, async, false, 4, 1);
-            QueueRegistry registry(&store);
+            QueueRegistry registry;
+            registry.setStore (&store);
             recover(store, registry);
             Queue::shared_ptr queue = registry.find(name);
             CPPUNIT_ASSERT(queue.get());
@@ -190,7 +193,8 @@
         {
             BdbMessageStore store;
             store.init(TESTDIR, async, false, 4, 1);
-            QueueRegistry registry(&store);
+            QueueRegistry registry;
+            registry.setStore (&store);
             recover(store, registry);
             Queue::shared_ptr queue = registry.find(name);
             CPPUNIT_ASSERT(queue);
@@ -216,7 +220,8 @@
         {
             BdbMessageStore store;
             store.init(TESTDIR, async, false, 4, 1);
-            QueueRegistry registry(&store);
+            QueueRegistry registry;
+            registry.setStore (&store);
             recover(store, registry);
             CPPUNIT_ASSERT(!registry.find(name));
         }
@@ -258,7 +263,8 @@
         {
             BdbMessageStore store;
             store.init(TESTDIR, async, false, 4, 1);
-            QueueRegistry registry(&store);
+            QueueRegistry registry;
+            registry.setStore (&store);
             recover(store, registry);
             Queue::shared_ptr queue = registry.find(name);
             CPPUNIT_ASSERT(queue);
@@ -311,7 +317,8 @@
         {
             BdbMessageStore store;
             store.init(TESTDIR, async, false, 4, 1);
-            QueueRegistry registry(&store);
+            QueueRegistry registry;
+            registry.setStore (&store);
             recover(store, registry);
             Queue::shared_ptr queue = registry.find(name);
             CPPUNIT_ASSERT(queue);
@@ -379,9 +386,11 @@
             //recover
             BdbMessageStore store;
             store.init(TESTDIR, async, false, 4, 1);
-            QueueRegistry registry(&store);
+            QueueRegistry registry;
+            registry.setStore (&store);
             ExchangeRegistry exchanges;
-            DtxManager dtx(&store);
+            DtxManager dtx;
+            dtx.setStore (&store);
             RecoveryManagerImpl recovery(registry, exchanges, dtx, 10);
             store.recover(recovery);
 

Modified: store/trunk/cpp/tests/TransactionalTest.cpp
===================================================================
--- store/trunk/cpp/tests/TransactionalTest.cpp	2008-01-14 14:07:57 UTC (rev 1562)
+++ store/trunk/cpp/tests/TransactionalTest.cpp	2008-01-14 15:48:34 UTC (rev 1563)
@@ -123,7 +123,8 @@
         store = std::auto_ptr<BdbMessageStore>(new BdbMessageStore());
         store->init(TESTDIR, async, false, 4, 1);
         ExchangeRegistry exchanges;
-        DtxManager mgr(store.get());
+        DtxManager mgr;
+        mgr.setStore (store.get());
         RecoveryManagerImpl recovery(queues, exchanges, mgr, 0);
         store->recover(recovery);
  

Modified: store/trunk/cpp/tests/TwoPhaseCommitTest.cpp
===================================================================
--- store/trunk/cpp/tests/TwoPhaseCommitTest.cpp	2008-01-14 14:07:57 UTC (rev 1562)
+++ store/trunk/cpp/tests/TwoPhaseCommitTest.cpp	2008-01-14 15:48:34 UTC (rev 1563)
@@ -365,7 +365,8 @@
         store = std::auto_ptr<BdbMessageStore>(new BdbMessageStore());
         store->init(TESTDIR, async, false, 4, 1);
         ExchangeRegistry exchanges;
-        dtxmgr = std::auto_ptr<DtxManager>(new DtxManager(store.get()));
+        dtxmgr = std::auto_ptr<DtxManager>(new DtxManager);
+        dtxmgr->setStore (store.get());
         RecoveryManagerImpl recovery(queues, exchanges, *dtxmgr, 0);
         store->recover(recovery);
 

Modified: store/trunk/cpp/tests/system_test.sh
===================================================================
--- store/trunk/cpp/tests/system_test.sh	2008-01-14 14:07:57 UTC (rev 1562)
+++ store/trunk/cpp/tests/system_test.sh	2008-01-14 15:48:34 UTC (rev 1563)
@@ -74,8 +74,8 @@
     fi
     for p in `seq 1 6`; do
         log="$abs_srcdir/vg-log.$mode.$p"
-        #echo "$vg $QPIDD -s $LIBBDBSTORE $JRNLFLAGS"
-        $vg $QPIDD -s $LIBBDBSTORE $JRNLFLAGS >> "$abs_srcdir/qpid.log" 2> $log & pid=$!
+        #echo "$vg $QPIDD --load $LIBBDBSTORE $JRNLFLAGS"
+        $vg $QPIDD --load $LIBBDBSTORE $JRNLFLAGS >> "$abs_srcdir/qpid.log" 2> $log & pid=$!
         sleep 5
 
         echo phase $p...




More information about the rhmessaging-commits mailing list