Author: tedross
Date: 2008-01-14 10:48:34 -0500 (Mon, 14 Jan 2008)
New Revision: 1563
Added:
store/trunk/cpp/lib/StorePlugin.cpp
Modified:
store/trunk/cpp/lib/BdbMessageStore.cpp
store/trunk/cpp/lib/BdbMessageStore.h
store/trunk/cpp/lib/Makefile.am
store/trunk/cpp/lib/TxnCtxt.h
store/trunk/cpp/tests/OrderingTest.cpp
store/trunk/cpp/tests/SimpleTest.cpp
store/trunk/cpp/tests/TransactionalTest.cpp
store/trunk/cpp/tests/TwoPhaseCommitTest.cpp
store/trunk/cpp/tests/system_test.sh
Log:
Modified the module interface between Bdbstore and Qpid to use Qpid's plug-in module
interface.
Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp 2008-01-14 14:07:57 UTC (rev 1562)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp 2008-01-14 15:48:34 UTC (rev 1563)
@@ -50,8 +50,8 @@
qpid::sys::Duration BdbMessageStore::defJournalGetEventsTimeout(1000000); // 1ms
qpid::sys::Duration BdbMessageStore::defJournalFlushTimeout(1000000000); // 1 sec
unsigned int TxnCtxt::count = 0;
+qpid::sys::Mutex TxnCtxt::globalSerialiser;
-
BdbMessageStore::BdbMessageStore(const char* envpath) : env(0),
queueDb(&env, 0),
configDb(&env, 0),
@@ -68,8 +68,6 @@
envPath(envpath)
{
-
-
}
bool BdbMessageStore::init(const std::string& dir, const bool async, const bool
force, u_int16_t jfiles, u_int32_t jfileSizePgs)
@@ -122,7 +120,7 @@
bool BdbMessageStore::init(const qpid::Options* options)
{
- const qpid::broker::Broker::Options* opts = static_cast<const
qpid::broker::Broker::Options*>(options);
+ const Options* opts = static_cast<const Options*>(options);
u_int16_t numJrnlFiles = opts->numJrnlFiles;
if (numJrnlFiles < JRNL_MIN_NUM_FILES)
@@ -1296,16 +1294,6 @@
return txn;
}
-extern "C" MessageStore* create()
-{
- return new BdbMessageStore();
-}
-
-extern "C" void destroy(MessageStore* store)
-{
- delete store;
-}
-
void BdbMessageStore::put(Db& db, DbTxn* txn, Dbt& key, Dbt& value)
{
try {
@@ -1371,4 +1359,26 @@
return dir.str();
}
+/**
+ * Constructs the store option group.
+ *
+ * Sets the built-in defaults (store directory "/var", async off, force off,
+ * 8 journal files, 24 read pages per journal file) and then registers each
+ * setting with addOptions() under its option name.
+ *
+ * @param name passed through unchanged to the qpid::Options base class.
+ */
+BdbMessageStore::Options::Options(const std::string& name) :
+    qpid::Options(name),
+    storeDir("/var"),
+    storeAsync(false),
+    storeForce(false),
+    numJrnlFiles(8),
+    jrnlFsizePgs(24)
+{
+    // Each help string is kept on one physical line: a string literal may not
+    // contain a raw line break.
+    addOptions()
+        ("store-directory", qpid::optValue(storeDir,"DIR"),
+         "Store directory location for persistence.")
+        ("store-async", qpid::optValue(storeAsync,"yes|no"),
+         "Use async persistence storage - if store supports it, enables AIO O_DIRECT.")
+        ("store-force", qpid::optValue(storeForce,"yes|no"),
+         "Force changing modes of store, will delete all existing data if mode is changed. Be SURE you want to do this!")
+        ("num-jfiles", qpid::optValue(numJrnlFiles, "N"),
+         "Number of files in persistence journal")
+        ("jfile-size-pgs", qpid::optValue(jrnlFsizePgs, "N"),
+         "Size of each journal file in multiples of read pages (1 read page = 64kiB)")
+        ;
+}
+
Modified: store/trunk/cpp/lib/BdbMessageStore.h
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.h 2008-01-14 14:07:57 UTC (rev 1562)
+++ store/trunk/cpp/lib/BdbMessageStore.h 2008-01-14 15:48:34 UTC (rev 1563)
@@ -141,6 +141,16 @@
}
public:
+ // Configuration settings for the BDB message store, derived from qpid::Options.
+ // The constructor (BdbMessageStore.cpp) assigns the defaults noted below and
+ // registers every field except clusterName under the option name shown.
+ struct Options : public qpid::Options {
+ Options(const std::string& name="Store Options");
+ // NOTE(review): clusterName is neither given an explicit initial value nor
+ // registered as an option by the constructor in this revision - confirm it is needed.
+ string clusterName;
+ // "store-directory": store directory location for persistence; default "/var".
+ string storeDir;
+ // "store-async": use async persistence storage; default false.
+ bool storeAsync;
+ // "store-force": force changing store modes (the help text warns that existing
+ // data is deleted if the mode is changed); default false.
+ bool storeForce;
+ // "num-jfiles": number of files in the persistence journal; default 8.
+ uint16_t numJrnlFiles;
+ // "jfile-size-pgs": size of each journal file in read pages (help text: 1 read
+ // page = 64kiB); default 24.
+ uint32_t jrnlFsizePgs;
+ };
+
BdbMessageStore(const char* envpath = 0);
virtual ~BdbMessageStore();
bool init(const qpid::Options* options);
@@ -190,9 +200,6 @@
void abort(qpid::broker::TransactionContext& ctxt);
};
}
-
- extern "C" qpid::broker::MessageStore* create();
- extern "C" void destroy(qpid::broker::MessageStore* store);
}
#endif
Modified: store/trunk/cpp/lib/Makefile.am
===================================================================
--- store/trunk/cpp/lib/Makefile.am 2008-01-14 14:07:57 UTC (rev 1562)
+++ store/trunk/cpp/lib/Makefile.am 2008-01-14 15:48:34 UTC (rev 1563)
@@ -14,6 +14,7 @@
$(LIBTOOL_VERSION_INFO_ARG)
libbdbstore_la_SOURCES = \
+ StorePlugin.cpp \
BdbMessageStore.cpp \
BindingDbt.cpp \
BufferValue.cpp \
@@ -85,7 +86,10 @@
jrnl/txn_map.hpp \
jrnl/txn_rec.hpp \
jrnl/wmgr.hpp \
- jrnl/wrfc.hpp
+ jrnl/wrfc.hpp \
+ gen/Journal.cpp \
+ gen/Journal.h \
+ gen/ArgsJournalExpand.h
Added: store/trunk/cpp/lib/StorePlugin.cpp
===================================================================
--- store/trunk/cpp/lib/StorePlugin.cpp (rev 0)
+++ store/trunk/cpp/lib/StorePlugin.cpp 2008-01-14 15:48:34 UTC (rev 1563)
@@ -0,0 +1,54 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "qpid/broker/Broker.h"
+#include "qpid/Plugin.h"
+#include "qpid/Options.h"
+#include "BdbMessageStore.h"
+
+
+namespace qpid {
+namespace broker {
+
+using namespace std;
+
+/**
+ * Broker plug-in that installs a BdbMessageStore as the broker's message store.
+ *
+ * The store's options are exposed through getOptions(); the store itself is
+ * created, initialised and attached to the broker in earlyInitialize().
+ */
+struct StorePlugin : public Plugin {
+
+    /** Store configuration passed to BdbMessageStore::init(). */
+    rhm::bdbstore::BdbMessageStore::Options options;
+
+    /** Returns the option set owned by this plug-in instance. */
+    Options* getOptions() { return &options; }
+
+    /**
+     * Creates a BdbMessageStore, initialises it from `options` and hands it to
+     * the broker via setStore().
+     *
+     * A target that is not a Broker is ignored.
+     *
+     * @param target plug-in target; only acted on when it is a Broker.
+     * @throws Exception if BdbMessageStore::init() returns false.
+     */
+    void earlyInitialize (Plugin::Target& target)
+    {
+        Broker* broker = dynamic_cast<Broker*>(&target);
+        // dynamic_cast yields null for any target that is not a Broker; there
+        // is nothing to attach a store to, so do not create one.
+        if (!broker) return;
+
+        MessageStore *store = new rhm::bdbstore::BdbMessageStore ();
+
+        if (!store->init (&options))
+        {
+            // The store was never handed to the broker, so release it here
+            // before reporting the failure.
+            delete store;
+            throw Exception("Existing Journal in different mode, backup/move existing data "
+                            "before changing modes. Or use '--store-force yes' to blow existing data away.");
+        }
+
+        broker->setStore (store);
+    }
+
+    /** Nothing to do at this stage; all work happens in earlyInitialize(). */
+    void initialize(Plugin::Target&) {}
+};
+
+static StorePlugin instance; // Static initialization.
+
+}} // namespace qpid::broker
Modified: store/trunk/cpp/lib/TxnCtxt.h
===================================================================
--- store/trunk/cpp/lib/TxnCtxt.h 2008-01-14 14:07:57 UTC (rev 1562)
+++ store/trunk/cpp/lib/TxnCtxt.h 2008-01-14 15:48:34 UTC (rev 1563)
@@ -163,8 +163,6 @@
}
};
-qpid::sys::Mutex TxnCtxt::globalSerialiser;
-
}}
#endif
Modified: store/trunk/cpp/tests/OrderingTest.cpp
===================================================================
--- store/trunk/cpp/tests/OrderingTest.cpp 2008-01-14 14:07:57 UTC (rev 1562)
+++ store/trunk/cpp/tests/OrderingTest.cpp 2008-01-14 15:48:34 UTC (rev 1563)
@@ -150,7 +150,8 @@
store = std::auto_ptr<BdbMessageStore>(new BdbMessageStore());
store->init(TESTDIR, async, false, 4, 1);
ExchangeRegistry exchanges;
- DtxManager mgr(store.get());
+ DtxManager mgr;
+ mgr.setStore (store.get());
RecoveryManagerImpl recoveryMgr(queues, exchanges, mgr, 0);
store->recover(recoveryMgr);
Modified: store/trunk/cpp/tests/SimpleTest.cpp
===================================================================
--- store/trunk/cpp/tests/SimpleTest.cpp 2008-01-14 14:07:57 UTC (rev 1562)
+++ store/trunk/cpp/tests/SimpleTest.cpp 2008-01-14 15:48:34 UTC (rev 1563)
@@ -105,7 +105,8 @@
void recover(BdbMessageStore& store, QueueRegistry& queues,
ExchangeRegistry& exchanges)
{
- DtxManager mgr(&store);
+ DtxManager mgr;
+ mgr.setStore (&store);
RecoveryManagerImpl recovery(queues, exchanges, mgr, 0);
store.recover(recovery);
}
@@ -120,7 +121,8 @@
BdbMessageStore store;
store.init(TESTDIR, async, false, 4, 1);
store.truncate();//make sure it is empty to begin with
- QueueRegistry registry(&store);
+ QueueRegistry registry;
+ registry.setStore (&store);
recover(store, registry);
//nothing to assert, just testing it doesn't blow up
}
@@ -162,7 +164,8 @@
{
BdbMessageStore store;
store.init(TESTDIR, async, false, 4, 1);
- QueueRegistry registry(&store);
+ QueueRegistry registry;
+ registry.setStore (&store);
recover(store, registry);
Queue::shared_ptr queue = registry.find(name);
CPPUNIT_ASSERT(queue.get());
@@ -190,7 +193,8 @@
{
BdbMessageStore store;
store.init(TESTDIR, async, false, 4, 1);
- QueueRegistry registry(&store);
+ QueueRegistry registry;
+ registry.setStore (&store);
recover(store, registry);
Queue::shared_ptr queue = registry.find(name);
CPPUNIT_ASSERT(queue);
@@ -216,7 +220,8 @@
{
BdbMessageStore store;
store.init(TESTDIR, async, false, 4, 1);
- QueueRegistry registry(&store);
+ QueueRegistry registry;
+ registry.setStore (&store);
recover(store, registry);
CPPUNIT_ASSERT(!registry.find(name));
}
@@ -258,7 +263,8 @@
{
BdbMessageStore store;
store.init(TESTDIR, async, false, 4, 1);
- QueueRegistry registry(&store);
+ QueueRegistry registry;
+ registry.setStore (&store);
recover(store, registry);
Queue::shared_ptr queue = registry.find(name);
CPPUNIT_ASSERT(queue);
@@ -311,7 +317,8 @@
{
BdbMessageStore store;
store.init(TESTDIR, async, false, 4, 1);
- QueueRegistry registry(&store);
+ QueueRegistry registry;
+ registry.setStore (&store);
recover(store, registry);
Queue::shared_ptr queue = registry.find(name);
CPPUNIT_ASSERT(queue);
@@ -379,9 +386,11 @@
//recover
BdbMessageStore store;
store.init(TESTDIR, async, false, 4, 1);
- QueueRegistry registry(&store);
+ QueueRegistry registry;
+ registry.setStore (&store);
ExchangeRegistry exchanges;
- DtxManager dtx(&store);
+ DtxManager dtx;
+ dtx.setStore (&store);
RecoveryManagerImpl recovery(registry, exchanges, dtx, 10);
store.recover(recovery);
Modified: store/trunk/cpp/tests/TransactionalTest.cpp
===================================================================
--- store/trunk/cpp/tests/TransactionalTest.cpp 2008-01-14 14:07:57 UTC (rev 1562)
+++ store/trunk/cpp/tests/TransactionalTest.cpp 2008-01-14 15:48:34 UTC (rev 1563)
@@ -123,7 +123,8 @@
store = std::auto_ptr<BdbMessageStore>(new BdbMessageStore());
store->init(TESTDIR, async, false, 4, 1);
ExchangeRegistry exchanges;
- DtxManager mgr(store.get());
+ DtxManager mgr;
+ mgr.setStore (store.get());
RecoveryManagerImpl recovery(queues, exchanges, mgr, 0);
store->recover(recovery);
Modified: store/trunk/cpp/tests/TwoPhaseCommitTest.cpp
===================================================================
--- store/trunk/cpp/tests/TwoPhaseCommitTest.cpp 2008-01-14 14:07:57 UTC (rev 1562)
+++ store/trunk/cpp/tests/TwoPhaseCommitTest.cpp 2008-01-14 15:48:34 UTC (rev 1563)
@@ -365,7 +365,8 @@
store = std::auto_ptr<BdbMessageStore>(new BdbMessageStore());
store->init(TESTDIR, async, false, 4, 1);
ExchangeRegistry exchanges;
- dtxmgr = std::auto_ptr<DtxManager>(new DtxManager(store.get()));
+ dtxmgr = std::auto_ptr<DtxManager>(new DtxManager);
+ dtxmgr->setStore (store.get());
RecoveryManagerImpl recovery(queues, exchanges, *dtxmgr, 0);
store->recover(recovery);
Modified: store/trunk/cpp/tests/system_test.sh
===================================================================
--- store/trunk/cpp/tests/system_test.sh 2008-01-14 14:07:57 UTC (rev 1562)
+++ store/trunk/cpp/tests/system_test.sh 2008-01-14 15:48:34 UTC (rev 1563)
@@ -74,8 +74,8 @@
fi
for p in `seq 1 6`; do
log="$abs_srcdir/vg-log.$mode.$p"
- #echo "$vg $QPIDD -s $LIBBDBSTORE $JRNLFLAGS"
- $vg $QPIDD -s $LIBBDBSTORE $JRNLFLAGS >> "$abs_srcdir/qpid.log"
2> $log & pid=$!
+ #echo "$vg $QPIDD --load $LIBBDBSTORE $JRNLFLAGS"
+ $vg $QPIDD --load $LIBBDBSTORE $JRNLFLAGS >>
"$abs_srcdir/qpid.log" 2> $log & pid=$!
sleep 5
echo phase $p...