rhmessaging commits: r2191 - in store/branches/mrg-1.0/cpp: tests and 1 other directories.
by rhmessaging-commits@lists.jboss.org
Author: kpvdr
Date: 2008-07-14 08:43:10 -0400 (Mon, 14 Jul 2008)
New Revision: 2191
Modified:
store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp
store/branches/mrg-1.0/cpp/lib/BdbMessageStore.h
store/branches/mrg-1.0/cpp/tests/OrderingTest.cpp
store/branches/mrg-1.0/cpp/tests/SimpleTest.cpp
store/branches/mrg-1.0/cpp/tests/TransactionalTest.cpp
store/branches/mrg-1.0/cpp/tests/TwoPhaseCommitTest.cpp
store/branches/mrg-1.0/cpp/tests/python_tests/flow_to_disk.py
Log:
Backport of trunk r.2170: BDB cleanup - phase 1. Removed all sync tests, refactored tests (slightly) to improve layout without need for sync/async; removed async and force params from BdbMessageStore::init(...). Next phase will clean up the BdbMessageStore class itself.
Modified: store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp 2008-07-14 11:51:10 UTC (rev 2190)
+++ store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp 2008-07-14 12:43:10 UTC (rev 2191)
@@ -97,7 +97,7 @@
}
}
-bool BdbMessageStore::init(const std::string& dir, const bool async, const bool force, u_int16_t jfiles, u_int32_t jfileSizePgs, uint32_t wCachePageSize)
+bool BdbMessageStore::init(const std::string& dir, u_int16_t jfiles, u_int32_t jfileSizePgs, uint32_t wCachePageSize)
{
if (isInit) return true;
@@ -125,7 +125,8 @@
wcache_num_pages = defTotWCacheSize / wcache_pgsize_sblks;
}
- useAsync = async;
+ // TODO: remove
+ useAsync = true;
if (dir.size()>0) storeDir = dir;
string bdbdir = storeDir + "/rhm/dat/";
@@ -165,11 +166,14 @@
txn.abort();
throw;
}
+
+ // TODO: remove
+ bool force = false;
ret = mode(useAsync, force);
if (!ret) return false;
isInit = true;
- QPID_LOG(info, "BdbMessageStore module initialized: dir=" << dir << "; async=" << (async?"T":"F") << "; force=" << (force?"T":"F") << "; jfiles=" << jfiles << "; jfileSizePgs=" << jfileSizePgs << "; wCachePageSize=" << wCachePageSize);
+ QPID_LOG(info, "BdbMessageStore module initialized: dir=" << dir << "; jfiles=" << jfiles << "; jfileSizePgs=" << jfileSizePgs << "; wCachePageSize=" << wCachePageSize);
return true;
}
@@ -242,7 +246,7 @@
}
}
- return init(opts->storeDir, opts->storeAsync, opts->storeForce, numJrnlFiles, jrnlFsizePgs, jrnlWrCachePageSize);
+ return init(opts->storeDir, numJrnlFiles, jrnlFsizePgs, jrnlWrCachePageSize);
}
// true is async
@@ -1598,8 +1602,6 @@
BdbMessageStore::Options::Options(const std::string& name) :
qpid::Options(name),
- storeAsync(true),
- storeForce(false),
numJrnlFiles(8),
jrnlFsizePgs(24),
wCachePageSize(JRNL_WMGR_DEF_PAGE_SIZE * JRNL_DBLK_SIZE * JRNL_SBLK_SIZE / 1024)
Modified: store/branches/mrg-1.0/cpp/lib/BdbMessageStore.h
===================================================================
--- store/branches/mrg-1.0/cpp/lib/BdbMessageStore.h 2008-07-14 11:51:10 UTC (rev 2190)
+++ store/branches/mrg-1.0/cpp/lib/BdbMessageStore.h 2008-07-14 12:43:10 UTC (rev 2191)
@@ -68,8 +68,6 @@
typedef boost::ptr_list<PreparedTransaction> txn_list;
// Default store settings
- static const bool defUseAsync = false;
- static const bool defForceStoreConversion = false;
static const u_int16_t defNumJrnlFiles = 8; // TODO: make configurable
static const u_int32_t defJrnlFileSizePgs = 24; // TODO: make configurable
static const u_int32_t defWCachePageSize = JRNL_WMGR_DEF_PAGE_SIZE * JRNL_DBLK_SIZE * JRNL_SBLK_SIZE / 1024; // TODO: make configurable
@@ -157,7 +155,7 @@
static inline bool usingJrnl() {return useAsync;}
string getJrnlBaseDir();
inline void checkInit() {
- if (!isInit) init("/var", defUseAsync, defForceStoreConversion, defNumJrnlFiles, defJrnlFileSizePgs, defWCachePageSize); isInit = true;
+ if (!isInit) init("/var", defNumJrnlFiles, defJrnlFileSizePgs, defWCachePageSize); isInit = true;
}
public:
@@ -177,7 +175,7 @@
BdbMessageStore(const char* envpath = 0);
virtual ~BdbMessageStore();
bool init(const qpid::Options* options);
- bool init(const std::string& dir, const bool async, const bool force, u_int16_t jfiles, u_int32_t jfileSizePgs, uint32_t wCachePageSize);
+ bool init(const std::string& dir, u_int16_t jfiles, u_int32_t jfileSizePgs, uint32_t wCachePageSize);
void initManagement (qpid::broker::Broker* broker);
void truncate();
Modified: store/branches/mrg-1.0/cpp/tests/OrderingTest.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/tests/OrderingTest.cpp 2008-07-14 11:51:10 UTC (rev 2190)
+++ store/branches/mrg-1.0/cpp/tests/OrderingTest.cpp 2008-07-14 12:43:10 UTC (rev 2191)
@@ -53,10 +53,10 @@
std::queue<Uuid> ids;
int counter = 1;
-void setup(bool async)
+void setup()
{
store = std::auto_ptr<BdbMessageStore>(new BdbMessageStore());
- store->init(test_dir, async, true, 4, 1, 8);
+ store->init(test_dir, 4, 1, 8);
store->truncate();
queue = Queue::shared_ptr(new Queue(name, 0, store.get(), 0));
@@ -88,13 +88,13 @@
}
}
-void restart(bool async)
+void restart()
{
queue.reset();
store.reset();
store = std::auto_ptr<BdbMessageStore>(new BdbMessageStore());
- store->init(test_dir, async, false, 4, 1, 8);
+ store->init(test_dir, 4, 1, 8);
ExchangeRegistry exchanges;
LinkRegistry links(0);
DtxManager mgr;
@@ -114,57 +114,33 @@
BOOST_CHECK_EQUAL((size_t) 0, ids.size());
}
-void testBasic(bool async = false)
+
+// === Test suite ===
+
+QPID_AUTO_TEST_CASE(Basic)
{
- setup(async);
+ std::cout << test_filename << ".Basic: " << std::flush;
+ setup();
//push on 10 messages
for (int i = 0; i < 10; i++) push();
- restart(async);
+ restart();
check();
+ std::cout << "ok" << std::endl;
}
-void testCycle(bool async = false)
+QPID_AUTO_TEST_CASE(Cycle)
{
- setup(async);
+ std::cout << test_filename << ".Cycle: " << std::flush;
+ setup();
//push on 10 messages:
for (int i = 0; i < 10; i++) push();
//pop 5:
for (int i = 0; i < 5; i++) pop();
//push on another 5:
for (int i = 0; i < 5; i++) push();
- restart(async);
+ restart();
check();
-}
-
-
-// === Test suite ===
-
-QPID_AUTO_TEST_CASE(BasicSync)
-{
- std::cout << test_filename << ".BasicSync: " << std::flush;
- testBasic(false);
std::cout << "ok" << std::endl;
}
-QPID_AUTO_TEST_CASE(BasicAsync)
-{
- std::cout << test_filename << ".BasicAsync: " << std::flush;
- testBasic(true);
- std::cout << "ok" << std::endl;
-}
-
-QPID_AUTO_TEST_CASE(CycleSync)
-{
- std::cout << test_filename << ".CycleSync: " << std::flush;
- testCycle(false);
- std::cout << "ok" << std::endl;
-}
-
-QPID_AUTO_TEST_CASE(CycleAsync)
-{
- std::cout << test_filename << ".CycleAsync: " << std::flush;
- testCycle(true);
- std::cout << "ok" << std::endl;
-}
-
QPID_AUTO_TEST_SUITE_END()
Modified: store/branches/mrg-1.0/cpp/tests/SimpleTest.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/tests/SimpleTest.cpp 2008-07-14 11:51:10 UTC (rev 2190)
+++ store/branches/mrg-1.0/cpp/tests/SimpleTest.cpp 2008-07-14 12:43:10 UTC (rev 2191)
@@ -82,10 +82,60 @@
recover(store, queues, exchanges, links);
}
-void testCreateDelete(bool async)
+void bindAndUnbind(const string& exchangeName, const string& queueName,
+ const string& key, const FieldTable& args)
{
+ {
+ BdbMessageStore store;
+ store.init(test_dir, 4, 1, 8);
+ store.truncate();//make sure it is empty to begin with
+ Exchange::shared_ptr exchange(new DirectExchange(exchangeName, true, args));
+ Queue::shared_ptr queue(new Queue(queueName, 0, &store, 0));
+ store.create(*exchange, qpid::framing::FieldTable());
+ store.create(*queue, qpid::framing::FieldTable());
+ BOOST_REQUIRE(exchange->bind(queue, key, &args));
+ store.bind(*exchange, *queue, key, args);
+ }//db will be closed
+ {
+ BdbMessageStore store;
+ store.init(test_dir, 4, 1, 8);
+ ExchangeRegistry exchanges;
+ QueueRegistry queues;
+ LinkRegistry links(0);
+
+ recover(store, queues, exchanges, links);
+
+ Exchange::shared_ptr exchange = exchanges.get(exchangeName);
+ Queue::shared_ptr queue = queues.find(queueName);
+ //check it is bound by unbinding
+ BOOST_REQUIRE(exchange->unbind(queue, key, &args));
+ store.unbind(*exchange, *queue, key, args);
+ }
+ {
+ BdbMessageStore store;
+ store.init(test_dir, 4, 1, 8);
+ ExchangeRegistry exchanges;
+ QueueRegistry queues;
+ LinkRegistry links(0);
+
+ recover(store, queues, exchanges, links);
+
+ Exchange::shared_ptr exchange = exchanges.get(exchangeName);
+ Queue::shared_ptr queue = queues.find(queueName);
+ //make sure it is no longer bound
+ BOOST_REQUIRE(!exchange->unbind(queue, key, &args));
+ }
+}
+
+
+// === Test suite ===
+
+QPID_AUTO_TEST_CASE(CreateDelete)
+{
+ cout << test_filename << ".CreateDelete: " << flush;
+
BdbMessageStore store;
- store.init(test_dir, async, true, 4, 1, 8);
+ store.init(test_dir, 4, 1, 8);
store.truncate();//make sure it is empty to begin with
string name("CreateDeleteQueue");
Queue queue(name, 0, &store, 0);
@@ -95,26 +145,33 @@
store.destroy(queue);
// TODO - check dir is deleted
+ cout << "ok" << endl;
}
-void testEmptyRecover(bool async)
+QPID_AUTO_TEST_CASE(EmptyRecover)
{
+ cout << test_filename << ".EmptyRecover: " << flush;
+
BdbMessageStore store;
- store.init(test_dir, async, true, 4, 1, 8);
+ store.init(test_dir, 4, 1, 8);
store.truncate();//make sure it is empty to begin with
QueueRegistry registry;
registry.setStore (&store);
recover(store, registry);
//nothing to assert, just testing it doesn't blow up
+
+ cout << "ok" << endl;
}
-void testQueueCreate(bool async)
+QPID_AUTO_TEST_CASE(QueueCreate)
{
+ cout << test_filename << ".QueueCreate: " << flush;
+
uint64_t id(0);
string name("MyDurableQueue");
{
BdbMessageStore store;
- store.init(test_dir, async, true, 4, 1, 8);
+ store.init(test_dir, 4, 1, 8);
store.truncate();//make sure it is empty to begin with
Queue queue(name, 0, &store, 0);
store.create(queue, qpid::framing::FieldTable());
@@ -123,7 +180,7 @@
}//db will be closed
{
BdbMessageStore store;
- store.init(test_dir, async, false, 4, 1, 8);
+ store.init(test_dir, 4, 1, 8);
QueueRegistry registry;
registry.setStore (&store);
recover(store, registry);
@@ -131,15 +188,19 @@
BOOST_REQUIRE(queue.get());
BOOST_CHECK_EQUAL(id, queue->getPersistenceId());
}
+
+ cout << "ok" << endl;
}
-void testQueueCreateWithSettings(bool async)
+QPID_AUTO_TEST_CASE(QueueCreateWithSettings)
{
+ cout << test_filename << ".QueueCreateWithSettings: " << flush;
+
QueuePolicy policy(101, 202);
string name("MyDurableQueue");
{
BdbMessageStore store;
- store.init(test_dir, async, true, 4, 1, 8);
+ store.init(test_dir, 4, 1, 8);
store.truncate();//make sure it is empty to begin with
Queue queue(name, 0, &store, 0);
FieldTable settings;
@@ -149,7 +210,7 @@
}//db will be closed
{
BdbMessageStore store;
- store.init(test_dir, async, false, 4, 1, 8);
+ store.init(test_dir, 4, 1, 8);
QueueRegistry registry;
registry.setStore (&store);
recover(store, registry);
@@ -159,14 +220,18 @@
BOOST_CHECK_EQUAL(policy.getMaxCount(), queue->getPolicy()->getMaxCount());
BOOST_CHECK_EQUAL(policy.getMaxSize(), queue->getPolicy()->getMaxSize());
}
+
+ cout << "ok" << endl;
}
-void testQueueDestroy(bool async)
+QPID_AUTO_TEST_CASE(QueueDestroy)
{
+ cout << test_filename << ".QueueDestroy: " << flush;
+
string name("MyDurableQueue");
{
BdbMessageStore store;
- store.init(test_dir, async, true, 4, 1, 8);
+ store.init(test_dir, 4, 1, 8);
store.truncate();//make sure it is empty to begin with
Queue queue(name, 0, &store, 0);
store.create(queue, qpid::framing::FieldTable());
@@ -174,16 +239,20 @@
}//db will be closed
{
BdbMessageStore store;
- store.init(test_dir, async, false, 4, 1, 8);
+ store.init(test_dir, 4, 1, 8);
QueueRegistry registry;
registry.setStore (&store);
recover(store, registry);
BOOST_REQUIRE(!registry.find(name));
}
+
+ cout << "ok" << endl;
}
-void testEnqueue(bool async)
+QPID_AUTO_TEST_CASE(Enqueue)
{
+ cout << test_filename << ".Enqueue: " << flush;
+
//TODO: this is largely copy & paste'd from MessageTest in
//qpid tree. ideally need some helper routines for reducing
//this to a simpler less duplicated form
@@ -196,7 +265,7 @@
string data2("hijklmn");
{
BdbMessageStore store;
- store.init(test_dir, async, true, 4, 1, 8);
+ store.init(test_dir, 4, 1, 8);
store.truncate();//make sure it is empty to begin with
Queue::shared_ptr queue(new Queue(name, 0, &store, 0));
FieldTable settings;
@@ -215,7 +284,7 @@
}//db will be closed
{
BdbMessageStore store;
- store.init(test_dir, async, false, 4, 1, 8);
+ store.init(test_dir, 4, 1, 8);
QueueRegistry registry;
registry.setStore (&store);
recover(store, registry);
@@ -240,10 +309,14 @@
BOOST_CHECK_EQUAL(data1.size() + data2.size(), contentBody->getData().size());
BOOST_CHECK_EQUAL(data1 + data2, contentBody->getData());
}
+
+ cout << "ok" << endl;
}
-void testDequeue(bool async)
+QPID_AUTO_TEST_CASE(Dequeue)
{
+ cout << test_filename << ".Dequeue: " << flush;
+
//TODO: reduce the duplication in these tests
string name("MyDurableQueue");
{
@@ -252,7 +325,7 @@
Uuid messageId(true);
string data("abcdefg");
BdbMessageStore store;
- store.init(test_dir, async, true, 4, 1, 8);
+ store.init(test_dir, 4, 1, 8);
store.truncate();//make sure it is empty to begin with
Queue::shared_ptr queue(new Queue(name, 0, &store, 0));
FieldTable settings;
@@ -267,7 +340,7 @@
}//db will be closed
{
BdbMessageStore store;
- store.init(test_dir, async, false, 4, 1, 8);
+ store.init(test_dir, 4, 1, 8);
QueueRegistry registry;
registry.setStore (&store);
recover(store, registry);
@@ -275,10 +348,14 @@
BOOST_REQUIRE(queue);
BOOST_CHECK_EQUAL((u_int32_t) 0, queue->getMessageCount());
}
+
+ cout << "ok" << endl;
}
-void testStaging(bool async)
+QPID_AUTO_TEST_CASE(Staging)
{
+ cout << test_filename << ".Staging: " << flush;
+
const string name("MyDurableQueue");
const string exchange("MyExchange");
const string routingKey("MyRoutingKey");
@@ -287,7 +364,7 @@
const string data2("ABCDEFGHIJKLMNOPQRSTUVWXYZ");
{
BdbMessageStore store;
- store.init(test_dir, async, true, 4, 1, 8);
+ store.init(test_dir, 4, 1, 8);
store.truncate();//make sure it is empty to begin with
//create & stage a message
@@ -329,7 +406,7 @@
{
//recover
BdbMessageStore store;
- store.init(test_dir, async, false, 4, 1, 8);
+ store.init(test_dir, 4, 1, 8);
QueueRegistry registry;
registry.setStore (&store);
ExchangeRegistry exchanges;
@@ -372,12 +449,16 @@
//dequeue
queue->dequeue(0, msg);
}
+
+ cout << "ok" << endl;
}
-void testDestroyStagedMessage(bool async)
+QPID_AUTO_TEST_CASE(DestroyStagedMessage)
{
+ cout << test_filename << ".DestroyStagedMessage: " << flush;
+
BdbMessageStore store;
- store.init(test_dir, async, true, 4, 1, 8);
+ store.init(test_dir, 4, 1, 8);
store.truncate();//make sure it is empty to begin with
const string data("abcdefg");
@@ -396,12 +477,16 @@
BOOST_FAIL("store.loadContent() did not throw StoreException as expected.");
} catch (StoreException& e) {
}
+
+ cout << "ok" << endl;
}
-void testDestroyEnqueuedMessage(bool async)
+QPID_AUTO_TEST_CASE(DestroyEnqueuedMessage)
{
+ cout << test_filename << ".DestroyEnqueuedMessage: " << flush;
+
BdbMessageStore store;
- store.init(test_dir, async, true, 4, 1, 8);
+ store.init(test_dir, 4, 1, 8);
store.truncate();//make sure it is empty to begin with
const string data("abcdefg");
@@ -422,10 +507,14 @@
store.dequeue(0, pmsg, queue);
store.destroy(queue);
+
+ cout << "ok" << endl;
}
-void testExchangeCreateAndDestroy(bool async)
+QPID_AUTO_TEST_CASE(ExchangeCreateAndDestroy)
{
+ cout << test_filename << ".ExchangeCreateAndDestroy: " << flush;
+
uint64_t id(0);
string name("MyDurableExchange");
string type("direct");
@@ -433,7 +522,7 @@
args.setString("a", "A");
{
BdbMessageStore store;
- store.init(test_dir, async, true, 4, 1, 8);
+ store.init(test_dir, 4, 1, 8);
store.truncate();//make sure it is empty to begin with
ExchangeRegistry registry;
Exchange::shared_ptr exchange = registry.declare(name, type, true, args).first;
@@ -443,7 +532,7 @@
}//db will be closed
{
BdbMessageStore store;
- store.init(test_dir, async, false, 4, 1, 8);
+ store.init(test_dir, 4, 1, 8);
ExchangeRegistry registry;
recover(store, registry);
@@ -457,7 +546,7 @@
}
{
BdbMessageStore store;
- store.init(test_dir, async, false, 4, 1, 8);
+ store.init(test_dir, 4, 1, 8);
ExchangeRegistry registry;
recover(store, registry);
@@ -469,68 +558,35 @@
BOOST_CHECK_EQUAL((framing::ReplyCode) 404, e.code);
}
}
+
+ cout << "ok" << endl;
}
-void bindAndUnbind(const string& exchangeName, const string& queueName,
- const string& key, const FieldTable& args, bool async)
+QPID_AUTO_TEST_CASE(ExchangeBindAndUnbind)
{
- {
- BdbMessageStore store;
- store.init(test_dir, async, true, 4, 1, 8);
- store.truncate();//make sure it is empty to begin with
- Exchange::shared_ptr exchange(new DirectExchange(exchangeName, true, args));
- Queue::shared_ptr queue(new Queue(queueName, 0, &store, 0));
- store.create(*exchange, qpid::framing::FieldTable());
- store.create(*queue, qpid::framing::FieldTable());
- BOOST_REQUIRE(exchange->bind(queue, key, &args));
- store.bind(*exchange, *queue, key, args);
- }//db will be closed
- {
- BdbMessageStore store;
- store.init(test_dir, async, false, 4, 1, 8);
- ExchangeRegistry exchanges;
- QueueRegistry queues;
- LinkRegistry links(0);
+ cout << test_filename << ".ExchangeBindAndUnbind: " << flush;
- recover(store, queues, exchanges, links);
+ bindAndUnbind("MyDurableExchange", "MyDurableQueue", "my-routing-key", FieldTable());
- Exchange::shared_ptr exchange = exchanges.get(exchangeName);
- Queue::shared_ptr queue = queues.find(queueName);
- //check it is bound by unbinding
- BOOST_REQUIRE(exchange->unbind(queue, key, &args));
- store.unbind(*exchange, *queue, key, args);
- }
- {
- BdbMessageStore store;
- store.init(test_dir, async, false, 4, 1, 8);
- ExchangeRegistry exchanges;
- QueueRegistry queues;
- LinkRegistry links(0);
-
- recover(store, queues, exchanges, links);
-
- Exchange::shared_ptr exchange = exchanges.get(exchangeName);
- Queue::shared_ptr queue = queues.find(queueName);
- //make sure it is no longer bound
- BOOST_REQUIRE(!exchange->unbind(queue, key, &args));
- }
+ cout << "ok" << endl;
}
-void testExchangeBindAndUnbind(bool async)
+QPID_AUTO_TEST_CASE(ExchangeBindAndUnbindWithArgs)
{
- bindAndUnbind("MyDurableExchange", "MyDurableQueue", "my-routing-key", FieldTable(), async);
-}
+ cout << test_filename << ".ExchangeBindAndUnbindWithArgs: " << flush;
-void testExchangeBindAndUnbindWithArgs(bool async)
-{
FieldTable args;
args.setString("a", "A");
args.setString("b", "B");
- bindAndUnbind("MyDurableExchange", "MyDurableQueue", "my-routing-key", args, async);
+ bindAndUnbind("MyDurableExchange", "MyDurableQueue", "my-routing-key", args);
+
+ cout << "ok" << endl;
}
-void testExchangeImplicitUnbind(bool async)
+QPID_AUTO_TEST_CASE(ExchangeImplicitUnbind)
{
+ cout << test_filename << ".ExchangeImplicitUnbind: " << flush;
+
string exchangeName("MyDurableExchange");
string queueName1("MyDurableQueue1");
string queueName2("MyDurableQueue2");
@@ -538,7 +594,7 @@
FieldTable args;
{
BdbMessageStore store;
- store.init(test_dir, async, true, 4, 1, 8);
+ store.init(test_dir, 4, 1, 8);
store.truncate();//make sure it is empty to begin with
Exchange::shared_ptr exchange(new DirectExchange(exchangeName, true, args));
Queue::shared_ptr queue1(new Queue(queueName1, 0, &store, 0));
@@ -553,7 +609,7 @@
}//db will be closed
{
BdbMessageStore store;
- store.init(test_dir, async, false, 4, 1, 8);
+ store.init(test_dir, 4, 1, 8);
ExchangeRegistry exchanges;
QueueRegistry queues;
LinkRegistry links(0);
@@ -570,7 +626,7 @@
}
{
BdbMessageStore store;
- store.init(test_dir, async, false, 4, 1, 8);
+ store.init(test_dir, 4, 1, 8);
ExchangeRegistry exchanges;
QueueRegistry queues;
LinkRegistry links(0);
@@ -587,205 +643,8 @@
Queue::shared_ptr queue = queues.find(queueName2);
store.destroy(*queue);
}
-}
-
-// === Test suite ===
-
-QPID_AUTO_TEST_CASE(CreateDeleteSync)
-{
- cout << test_filename << ".CreateDeleteSync: " << flush;
- testCreateDelete(false);
cout << "ok" << endl;
}
-QPID_AUTO_TEST_CASE(CreateDeleteAsync)
-{
- cout << test_filename << ".CreateDeleteAsync: " << flush;
- testCreateDelete(true);
- cout << "ok" << endl;
-}
-
-QPID_AUTO_TEST_CASE(EmptyRecoverSync)
-{
- cout << test_filename << ".EmptyRecoverSync: " << flush;
- testEmptyRecover(false);
- cout << "ok" << endl;
-}
-
-QPID_AUTO_TEST_CASE(EmptyRecoverAsync)
-{
- cout << test_filename << ".EmptyRecoverAsync: " << flush;
- testEmptyRecover(true);
- cout << "ok" << endl;
-}
-
-QPID_AUTO_TEST_CASE(QueueCreateSync)
-{
- cout << test_filename << ".QueueCreateSync: " << flush;
- testQueueCreate(false);
- cout << "ok" << endl;
-}
-
-QPID_AUTO_TEST_CASE(QueueCreateAsync)
-{
- cout << test_filename << ".QueueCreateAsync: " << flush;
- testQueueCreate(true);
- cout << "ok" << endl;
-}
-
-QPID_AUTO_TEST_CASE(QueueCreateWithSettingsSync)
-{
- cout << test_filename << ".QueueCreateWithSettingsSync: " << flush;
- testQueueCreateWithSettings(false);
- cout << "ok" << endl;
-}
-
-QPID_AUTO_TEST_CASE(QueueCreateWithSettingsAsync)
-{
- cout << test_filename << ".QueueCreateWithSettingsAsync: " << flush;
- testQueueCreateWithSettings(true);
- cout << "ok" << endl;
-}
-
-QPID_AUTO_TEST_CASE(QueueDestroySync)
-{
- cout << test_filename << ".QueueDestroySync: " << flush;
- testQueueDestroy(false);
- cout << "ok" << endl;
-}
-
-QPID_AUTO_TEST_CASE(QueueDestroyAsync)
-{
- cout << test_filename << ".QueueDestroyAsync: " << flush;
- testQueueDestroy(true);
- cout << "ok" << endl;
-}
-
-QPID_AUTO_TEST_CASE(EnqueueSync)
-{
- cout << test_filename << ".EnqueueSync: " << flush;
- testEnqueue(false);
- cout << "ok" << endl;
-}
-
-QPID_AUTO_TEST_CASE(EnqueueAsync)
-{
- cout << test_filename << ".EnqueueAsync: " << flush;
- testEnqueue(true);
- cout << "ok" << endl;
-}
-
-QPID_AUTO_TEST_CASE(DequeueSync)
-{
- cout << test_filename << ".DequeueSync: " << flush;
- testDequeue(false);
- cout << "ok" << endl;
-}
-
-QPID_AUTO_TEST_CASE(DequeueAsync)
-{
- cout << test_filename << ".DequeueAsync: " << flush;
- testDequeue(true);
- cout << "ok" << endl;
-}
-
-QPID_AUTO_TEST_CASE(StagingSync)
-{
- cout << test_filename << ".StagingSync: " << flush;
- testStaging(false);
- cout << "ok" << endl;
-}
-
-QPID_AUTO_TEST_CASE(StagingAsync)
-{
- cout << test_filename << ".StagingAsync: " << flush;
- testStaging(true);
- cout << "ok" << endl;
-}
-
-QPID_AUTO_TEST_CASE(DestroyStagedMessageSync)
-{
- cout << test_filename << ".DestroyStagedMessageSync: " << flush;
- testDestroyStagedMessage(false);
- cout << "ok" << endl;
-}
-
-QPID_AUTO_TEST_CASE(DestroyStagedMessageAsync)
-{
- cout << test_filename << ".DestroyStagedMessageAsync: " << flush;
- testDestroyStagedMessage(true);
- cout << "ok" << endl;
-}
-
-QPID_AUTO_TEST_CASE(DestroyEnqueuedMessageSync)
-{
- cout << test_filename << ".DestroyEnqueuedMessageSync: " << flush;
- testDestroyEnqueuedMessage(false);
- cout << "ok" << endl;
-}
-
-QPID_AUTO_TEST_CASE(DestroyEnqueuedMessageAsync)
-{
- cout << test_filename << ".DestroyEnqueuedMessageAsync: " << flush;
- testDestroyEnqueuedMessage(true);
- cout << "ok" << endl;
-}
-
-QPID_AUTO_TEST_CASE(ExchangeCreateAndDestroySync)
-{
- cout << test_filename << ".ExchangeCreateAndDestroySync: " << flush;
- testExchangeCreateAndDestroy(false);
- cout << "ok" << endl;
-}
-
-QPID_AUTO_TEST_CASE(ExchangeCreateAndDestroyAsync)
-{
- cout << test_filename << ".ExchangeCreateAndDestroyAsync: " << flush;
- testExchangeCreateAndDestroy(true);
- cout << "ok" << endl;
-}
-
-QPID_AUTO_TEST_CASE(ExchangeBindAndUnbindSync)
-{
- cout << test_filename << ".ExchangeBindAndUnbindSync: " << flush;
- testExchangeBindAndUnbind(false);
- cout << "ok" << endl;
-}
-
-QPID_AUTO_TEST_CASE(ExchangeBindAndUnbindAsync)
-{
- cout << test_filename << ".ExchangeBindAndUnbindAsync: " << flush;
- testExchangeBindAndUnbind(true);
- cout << "ok" << endl;
-}
-
-QPID_AUTO_TEST_CASE(ExchangeBindAndUnbindWithArgsSync)
-{
- cout << test_filename << ".ExchangeBindAndUnbindWithArgsSync: " << flush;
- testExchangeBindAndUnbindWithArgs(false);
- cout << "ok" << endl;
-}
-
-QPID_AUTO_TEST_CASE(ExchangeBindAndUnbindWithArgsAsync)
-{
- cout << test_filename << ".ExchangeBindAndUnbindWithArgsAsync: " << flush;
- testExchangeBindAndUnbindWithArgs(true);
- cout << "ok" << endl;
-}
-
-QPID_AUTO_TEST_CASE(ExchangeImplicitUnbindSync)
-{
- cout << test_filename << ".ExchangeImplicitUnbindSync: " << flush;
- testExchangeImplicitUnbind(false);
- cout << "ok" << endl;
-}
-
-QPID_AUTO_TEST_CASE(ExchangeImplicitUnbindAsync)
-{
- cout << test_filename << ".ExchangeImplicitUnbindAsync: " << flush;
- testExchangeImplicitUnbind(true);
- cout << "ok" << endl;
-}
-
QPID_AUTO_TEST_SUITE_END()
Modified: store/branches/mrg-1.0/cpp/tests/TransactionalTest.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/tests/TransactionalTest.cpp 2008-07-14 11:51:10 UTC (rev 2190)
+++ store/branches/mrg-1.0/cpp/tests/TransactionalTest.cpp 2008-07-14 12:43:10 UTC (rev 2191)
@@ -55,10 +55,10 @@
Queue::shared_ptr queueA;
Queue::shared_ptr queueB;
-void setup(bool async)
+void setup()
{
store = std::auto_ptr<BdbMessageStore>(new BdbMessageStore());
- store->init(test_dir, async, true, 4, 1, 8);
+ store->init(test_dir, 4, 1, 8);
store->truncate();
//create two queues:
@@ -75,14 +75,14 @@
queueA->deliver(msg);
}
-void restart(bool async)
+void restart()
{
queueA.reset();
queueB.reset();
store.reset();
store = std::auto_ptr<BdbMessageStore>(new BdbMessageStore());
- store->init(test_dir, async, false, 4, 1, 8);
+ store->init(test_dir, 4, 1, 8);
ExchangeRegistry exchanges;
LinkRegistry links(0);
DtxManager mgr;
@@ -117,9 +117,9 @@
BOOST_CHECK_EQUAL(messageId, msg->getProperties<MessageProperties>()->getMessageId());
}
-void swap(bool commit, bool async)
+void swap(bool commit)
{
- setup(async);
+ setup();
boost::intrusive_ptr<Message> msg = queueA->dequeue().payload;
BOOST_REQUIRE(msg);
@@ -133,39 +133,25 @@
store->abort(*txn);
}
- restart(async);
+ restart();
check(commit);
}
// === Test suite ===
-QPID_AUTO_TEST_CASE(CommitSync)
+QPID_AUTO_TEST_CASE(Commit)
{
- cout << test_filename << ".CommitSync: " << flush;
- swap(true, false);
+ cout << test_filename << ".Commit: " << flush;
+ swap(true);
cout << "ok" << endl;
}
-QPID_AUTO_TEST_CASE(CommitAsync)
+QPID_AUTO_TEST_CASE(Abort)
{
- cout << test_filename << ".CommitAsync: " << flush;
- swap(true, true);
+ cout << test_filename << ".Abort: " << flush;
+ swap(false);
cout << "ok" << endl;
}
-QPID_AUTO_TEST_CASE(AbortSync)
-{
- cout << test_filename << ".AbortSync: " << flush;
- swap(false, false);
- cout << "ok" << endl;
-}
-
-QPID_AUTO_TEST_CASE(AbortAsync)
-{
- cout << test_filename << ".AbortAsync: " << flush;
- swap(false, true);
- cout << "ok" << endl;
-}
-
QPID_AUTO_TEST_SUITE_END()
Modified: store/branches/mrg-1.0/cpp/tests/TwoPhaseCommitTest.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/tests/TwoPhaseCommitTest.cpp 2008-07-14 11:51:10 UTC (rev 2190)
+++ store/branches/mrg-1.0/cpp/tests/TwoPhaseCommitTest.cpp 2008-07-14 12:43:10 UTC (rev 2191)
@@ -134,7 +134,6 @@
boost::intrusive_ptr<Message> msg1;
boost::intrusive_ptr<Message> msg2;
boost::intrusive_ptr<Message> msg4;
- bool async;
void recoverPrepared(bool commit)
{
@@ -221,7 +220,7 @@
void setup()
{
store = std::auto_ptr<BdbMessageStore>(new BdbMessageStore());
- store->init(test_dir, async, true, 4, 1, 8);
+ store->init(test_dir, 4, 1, 8);
store->truncate();
//create two queues:
@@ -247,7 +246,7 @@
store.reset();
store = std::auto_ptr<BdbMessageStore>(new BdbMessageStore());
- store->init(test_dir, async, false, 4, 1, 8);
+ store->init(test_dir, 4, 1, 8);
ExchangeRegistry exchanges;
dtxmgr = std::auto_ptr<DtxManager>(new DtxManager);
dtxmgr->setStore (store.get());
@@ -295,78 +294,67 @@
public:
TwoPhaseCommitTest() : nameA("queueA"), nameB("queueB"), links(0) {}
- void testCommitSwap(bool a)
+ void testCommitSwap()
{
- async = a;
Swap swap(this, "SwapMessageId");
commit(swap);
}
- void testPrepareAndAbortSwap(bool a)
+ void testPrepareAndAbortSwap()
{
- async = a;
Swap swap(this, "SwapMessageId");
abort(swap, true);
}
- void testAbortNoPrepareSwap(bool a)
+ void testAbortNoPrepareSwap()
{
- async = a;
Swap swap(this, "SwapMessageId");
abort(swap, false);
}
- void testCommitEnqueue(bool a)
+ void testCommitEnqueue()
{
- async = a;
Enqueue enqueue(this);
commit(enqueue);
}
- void testPrepareAndAbortEnqueue(bool a)
+ void testPrepareAndAbortEnqueue()
{
- async = a;
Enqueue enqueue(this);
abort(enqueue, true);
}
- void testAbortNoPrepareEnqueue(bool a)
+ void testAbortNoPrepareEnqueue()
{
- async = a;
Enqueue enqueue(this);
abort(enqueue, false);
}
- void testCommitDequeue(bool a)
+ void testCommitDequeue()
{
- async = a;
Dequeue dequeue(this);
commit(dequeue);
}
- void testPrepareAndAbortDequeue(bool a)
+ void testPrepareAndAbortDequeue()
{
- async = a;
Dequeue dequeue(this);
abort(dequeue, true);
}
- void testAbortNoPrepareDequeue(bool a)
+ void testAbortNoPrepareDequeue()
{
- async = a;
Dequeue dequeue(this);
abort(dequeue, false);
}
- void testRecoverPreparedThenCommitted(bool a)
+ void testRecoverPreparedThenCommitted()
{
- async = a;
recoverPrepared(true);
}
- void testRecoverPreparedThenAborted(bool a)
+ void testRecoverPreparedThenAborted()
{
- async = a;
recoverPrepared(false);
}
};
@@ -375,158 +363,81 @@
// === Test suite ===
-QPID_AUTO_TEST_CASE(CommitSwapSync)
+QPID_AUTO_TEST_CASE(PrepareAndAbortSwap)
{
- cout << test_filename << ".CommitSwapSync: " << flush;
- tpct.testCommitSwap(false);
+ cout << test_filename << ".PrepareAndAbortSwap: " << flush;
+ tpct.testPrepareAndAbortSwap();
cout << "ok" << endl;
}
-QPID_AUTO_TEST_CASE(PrepareAndAbortSwapSync)
+QPID_AUTO_TEST_CASE(CommitEnqueue)
{
- cout << test_filename << ".PrepareAndAbortSwapSync: " << flush;
- tpct.testPrepareAndAbortSwap(false);
+ cout << test_filename << ".CommitEnqueue: " << flush;
+ tpct.testCommitEnqueue();
cout << "ok" << endl;
}
-QPID_AUTO_TEST_CASE(AbortNoPrepareSwapSync)
+QPID_AUTO_TEST_CASE(AbortNoPrepareEnqueue)
{
- cout << test_filename << ".AbortNoPrepareSwapSync: " << flush;
- tpct.testAbortNoPrepareSwap(false);
+ cout << test_filename << ".AbortNoPrepareEnqueue: " << flush;
+ tpct.testAbortNoPrepareEnqueue();
cout << "ok" << endl;
}
-QPID_AUTO_TEST_CASE(CommitEnqueueSync)
+QPID_AUTO_TEST_CASE(PrepareAndAbortDequeue)
{
- cout << test_filename << ".CommitEnqueueSync: " << flush;
- tpct.testCommitEnqueue(false);
+ cout << test_filename << ".PrepareAndAbortDequeue: " << flush;
+ tpct.testPrepareAndAbortDequeue();
cout << "ok" << endl;
}
-QPID_AUTO_TEST_CASE(PrepareAndAbortEnqueueSync)
+QPID_AUTO_TEST_CASE(RecoverPreparedThenCommitted)
{
- cout << test_filename << ".PrepareAndAbortEnqueueSync: " << flush;
- tpct.testPrepareAndAbortEnqueue(false);
+ cout << test_filename << ".RecoverPreparedThenCommitted: " << flush;
+ tpct.testRecoverPreparedThenCommitted();
cout << "ok" << endl;
}
-QPID_AUTO_TEST_CASE(AbortNoPrepareEnqueueSync)
+QPID_AUTO_TEST_CASE(CommitSwap)
{
- cout << test_filename << ".AbortNoPrepareEnqueueSync: " << flush;
- tpct.testAbortNoPrepareEnqueue(false);
+ cout << test_filename << ".CommitSwap: " << flush;
+ tpct.testCommitSwap();
cout << "ok" << endl;
}
-QPID_AUTO_TEST_CASE(CommitDequeueSync)
+QPID_AUTO_TEST_CASE(AbortNoPrepareSwap)
{
- cout << test_filename << ".CommitDequeueSync: " << flush;
- tpct.testCommitDequeue(false);
+ cout << test_filename << ".AbortNoPrepareSwap: " << flush;
+ tpct.testAbortNoPrepareSwap();
cout << "ok" << endl;
}
-QPID_AUTO_TEST_CASE(PrepareAndAbortDequeueSync)
+QPID_AUTO_TEST_CASE(PrepareAndAbortEnqueue)
{
- cout << test_filename << ".PrepareAndAbortDequeueSync: " << flush;
- tpct.testPrepareAndAbortDequeue(false);
+ cout << test_filename << ".PrepareAndAbortEnqueue: " << flush;
+ tpct.testPrepareAndAbortEnqueue();
cout << "ok" << endl;
}
-QPID_AUTO_TEST_CASE(AbortNoPrepareDequeueSync)
+QPID_AUTO_TEST_CASE(CommitDequeue)
{
- cout << test_filename << ".AbortNoPrepareDequeueSync: " << flush;
- tpct.testAbortNoPrepareDequeue(false);
+ cout << test_filename << ".CommitDequeue: " << flush;
+ tpct.testCommitDequeue();
cout << "ok" << endl;
}
-QPID_AUTO_TEST_CASE(RecoverPreparedThenCommittedSync)
+QPID_AUTO_TEST_CASE(AbortNoPrepareDequeue)
{
- cout << test_filename << ".RecoverPreparedThenCommittedSync: " << flush;
- tpct.testRecoverPreparedThenCommitted(false);
+ cout << test_filename << ".AbortNoPrepareDequeue: " << flush;
+ tpct.testAbortNoPrepareDequeue();
cout << "ok" << endl;
}
-QPID_AUTO_TEST_CASE(RecoverPreparedThenAbortedSync)
+QPID_AUTO_TEST_CASE(RecoverPreparedThenAborted)
{
- cout << test_filename << ".RecoverPreparedThenAbortedSync: " << flush;
- tpct.testRecoverPreparedThenAborted(false);
+ cout << test_filename << ".RecoverPreparedThenAborted: " << flush;
+ tpct.testRecoverPreparedThenAborted();
cout << "ok" << endl;
}
-QPID_AUTO_TEST_CASE(CommitSwapAsync)
-{
- cout << test_filename << ".CommitSwapAsync: " << flush;
- tpct.testCommitSwap(true);
- cout << "ok" << endl;
-}
-
-QPID_AUTO_TEST_CASE(PrepareAndAbortSwapAsync)
-{
- cout << test_filename << ".PrepareAndAbortSwapAsync: " << flush;
- tpct.testPrepareAndAbortSwap(true);
- cout << "ok" << endl;
-}
-
-QPID_AUTO_TEST_CASE(AbortNoPrepareSwapAsync)
-{
- cout << test_filename << ".AbortNoPrepareSwapAsync: " << flush;
- tpct.testAbortNoPrepareSwap(true);
- cout << "ok" << endl;
-}
-
-QPID_AUTO_TEST_CASE(CommitEnqueueAsync)
-{
- cout << test_filename << ".CommitEnqueueAsync: " << flush;
- tpct.testCommitEnqueue(true);
- cout << "ok" << endl;
-}
-
-QPID_AUTO_TEST_CASE(PrepareAndAbortEnqueueAsync)
-{
- cout << test_filename << ".PrepareAndAbortEnqueueAsync: " << flush;
- tpct.testPrepareAndAbortEnqueue(true);
- cout << "ok" << endl;
-}
-
-QPID_AUTO_TEST_CASE(AbortNoPrepareEnqueueAsync)
-{
- cout << test_filename << ".AbortNoPrepareEnqueueAsync: " << flush;
- tpct.testAbortNoPrepareEnqueue(true);
- cout << "ok" << endl;
-}
-
-QPID_AUTO_TEST_CASE(CommitDequeueAsync)
-{
- cout << test_filename << ".CommitDequeueAsync: " << flush;
- tpct.testCommitDequeue(true);
- cout << "ok" << endl;
-}
-
-QPID_AUTO_TEST_CASE(PrepareAndAbortDequeueAsync)
-{
- cout << test_filename << ".PrepareAndAbortDequeueAsync: " << flush;
- tpct.testPrepareAndAbortDequeue(true);
- cout << "ok" << endl;
-}
-
-QPID_AUTO_TEST_CASE(AbortNoPrepareDequeueAsync)
-{
- cout << test_filename << ".AbortNoPrepareDequeueAsync: " << flush;
- tpct.testAbortNoPrepareDequeue(true);
- cout << "ok" << endl;
-}
-
-QPID_AUTO_TEST_CASE(RecoverPreparedThenCommittedAsync)
-{
- cout << test_filename << ".RecoverPreparedThenCommittedAsync: " << flush;
- tpct.testRecoverPreparedThenCommitted(true);
- cout << "ok" << endl;
-}
-
-QPID_AUTO_TEST_CASE(RecoverPreparedThenAbortedAsync)
-{
- cout << test_filename << ".RecoverPreparedThenAbortedAsync: " << flush;
- tpct.testRecoverPreparedThenAborted(true);
- cout << "ok" << endl;
-}
-
QPID_AUTO_TEST_SUITE_END()
Modified: store/branches/mrg-1.0/cpp/tests/python_tests/flow_to_disk.py
===================================================================
--- store/branches/mrg-1.0/cpp/tests/python_tests/flow_to_disk.py 2008-07-14 11:51:10 UTC (rev 2190)
+++ store/branches/mrg-1.0/cpp/tests/python_tests/flow_to_disk.py 2008-07-14 12:43:10 UTC (rev 2191)
@@ -22,7 +22,7 @@
from qpid.datatypes import Message, RangedSet
from qpid.session import SessionException
-class AsyncFlowToDiskTests(TestBase010):
+class FlowToDiskTests(TestBase010):
"""Tests for async store flow-to-disk"""
def test_01_simple_max_count_transient(self):
17 years, 9 months
rhmessaging commits: r2190 - store/branches.
by rhmessaging-commits@lists.jboss.org
Author: kpvdr
Date: 2008-07-14 07:51:10 -0400 (Mon, 14 Jul 2008)
New Revision: 2190
Added:
store/branches/mrg-1.0/
Log:
Branch for MRG 1.0, taken from trunk r.2153.
Copied: store/branches/mrg-1.0 (from rev 2153, store/trunk)
17 years, 9 months
rhmessaging commits: r2189 - in store/trunk/cpp: tests and 1 other directory.
by rhmessaging-commits@lists.jboss.org
Author: kpvdr
Date: 2008-07-11 17:05:48 -0400 (Fri, 11 Jul 2008)
New Revision: 2189
Modified:
store/trunk/cpp/lib/BdbMessageStore.cpp
store/trunk/cpp/lib/BdbMessageStore.h
store/trunk/cpp/lib/TxnCtxt.h
store/trunk/cpp/tests/TransactionalTest.cpp
Log:
Fixed 1PC multi-queue atomic commits/aborts. Also fixed unnecessary prepared list writes that occur in local transactions that contain no durable messages.
Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp 2008-07-11 15:55:56 UTC (rev 2188)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp 2008-07-11 21:05:48 UTC (rev 2189)
@@ -1222,7 +1222,7 @@
}
}
-void BdbMessageStore::completed(TPCTxnCtxt& txn,
+void BdbMessageStore::completed(TxnCtxt& txn,
bool commit)
{
try {
@@ -1260,21 +1260,25 @@
void BdbMessageStore::prepare(qpid::broker::TPCTransactionContext& ctxt)
{
checkInit();
- TPCTxnCtxt* txn = dynamic_cast<TPCTxnCtxt*>(&ctxt);
+ TxnCtxt* txn = dynamic_cast<TxnCtxt*>(&ctxt);
if(!txn) throw InvalidTransactionContextException();
+ localPrepare(txn);
+}
+void BdbMessageStore::localPrepare(TxnCtxt* ctxt)
+{
try {
chkInitPreparedXidStore();
- txn->incrDtokRef();
- DataTokenImpl* dtokp = txn->getDtok();
+ ctxt->incrDtokRef();
+ DataTokenImpl* dtokp = ctxt->getDtok();
dtokp->set_external_rid(true);
dtokp->set_rid(messageIdSequence.next());
- preparedXidStorePtr->enqueue_txn_data_record(0, 0, 0, dtokp, txn->getXid(), false);
- txn->prepare(preparedXidStorePtr.get());
+ preparedXidStorePtr->enqueue_txn_data_record(0, 0, 0, dtokp, ctxt->getXid(), false);
+ ctxt->prepare(preparedXidStorePtr.get());
// make sure all the data is written to disk before returning
- txn->sync();
+ ctxt->sync();
} catch (const std::exception& e) {
- QPID_LOG(error, "Error preparing xid " << txn->getXid() << ": " << e.what());
+ QPID_LOG(error, "Error preparing xid " << ctxt->getXid() << ": " << e.what());
throw;
}
}
@@ -1283,22 +1287,22 @@
{
checkInit();
TxnCtxt* txn(check(&ctxt));
- if (txn->isTPC()) {
- completed(*dynamic_cast<TPCTxnCtxt*>(txn), true);
- } else {
- txn->complete(true);
+ if (!txn->isTPC()) {
+ if (txn->impactedQueuesEmpty()) return;
+ localPrepare(dynamic_cast<TxnCtxt*>(txn));
}
+ completed(*dynamic_cast<TxnCtxt*>(txn), true);
}
void BdbMessageStore::abort(TransactionContext& ctxt)
{
checkInit();
TxnCtxt* txn(check(&ctxt));
- if (txn->isTPC()) {
- completed(*dynamic_cast<TPCTxnCtxt*>(txn), false);
- } else {
- txn->complete(false);
+ if (!txn->isTPC()) {
+ if (txn->impactedQueuesEmpty()) return;
+ localPrepare(dynamic_cast<TxnCtxt*>(txn));
}
+ completed(*dynamic_cast<TxnCtxt*>(txn), false);
}
TxnCtxt* BdbMessageStore::check(TransactionContext* ctxt)
Modified: store/trunk/cpp/lib/BdbMessageStore.h
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.h 2008-07-11 15:55:56 UTC (rev 2188)
+++ store/trunk/cpp/lib/BdbMessageStore.h 2008-07-11 21:05:48 UTC (rev 2189)
@@ -169,7 +169,7 @@
bool create(Db& db,
IdSequence& seq,
const qpid::broker::Persistable& p);
- void completed(TPCTxnCtxt& txn,
+ void completed(TxnCtxt& txn,
bool commit);
void record2pcOp(Db& db,
TPCTxnCtxt& txn,
@@ -311,6 +311,8 @@
void prepare(qpid::broker::TPCTransactionContext& ctxt);
+ void localPrepare(TxnCtxt* ctxt);
+
void commit(qpid::broker::TransactionContext& ctxt);
void abort(qpid::broker::TransactionContext& ctxt);
Modified: store/trunk/cpp/lib/TxnCtxt.h
===================================================================
--- store/trunk/cpp/lib/TxnCtxt.h 2008-07-11 15:55:56 UTC (rev 2188)
+++ store/trunk/cpp/lib/TxnCtxt.h 2008-07-11 21:05:48 UTC (rev 2189)
@@ -59,6 +59,7 @@
IdSequence* loggedtx;
boost::intrusive_ptr<DataTokenImpl> dtokp;
AutoScopedLock globalHolder;
+ JournalImpl* preparedXidStorePtr;
/**
* local txn id, if non XA.
@@ -72,6 +73,8 @@
commitTxn(static_cast<JournalImpl*>(*i), commit);
}
impactedQueues.clear();
+ if (preparedXidStorePtr)
+ commitTxn(preparedXidStorePtr, commit);
}
void commitTxn(JournalImpl* jc, bool commit) {
@@ -95,7 +98,7 @@
public:
- TxnCtxt(IdSequence* _loggedtx=NULL) : loggedtx(_loggedtx), dtokp(new DataTokenImpl), txn(0) {
+ TxnCtxt(IdSequence* _loggedtx=NULL) : loggedtx(_loggedtx), dtokp(new DataTokenImpl), preparedXidStorePtr(0), txn(0) {
if (loggedtx) {
std::stringstream s;
s << "rhm-tid" << this;
@@ -125,6 +128,8 @@
for (ipqItr i = impactedQueues.begin(); i != impactedQueues.end(); i++) {
sync_jrnl(static_cast<JournalImpl*>(*i), firstloop, allWritten);
}
+ if (preparedXidStorePtr)
+ sync_jrnl(preparedXidStorePtr, firstloop, allWritten);
firstloop = false;
}
}
@@ -167,7 +172,9 @@
virtual const std::string& getXid() { return tid; }
void addXidRecord(qpid::broker::ExternalQueueStore* queue) { impactedQueues.insert(queue); }
+ inline void prepare(JournalImpl* _preparedXidStorePtr) { preparedXidStorePtr = _preparedXidStorePtr; }
void complete(bool commit) { completeTxn(commit); }
+ bool impactedQueuesEmpty() { return impactedQueues.empty(); }
DataTokenImpl* getDtok() { return dtokp.get(); }
void incrDtokRef() { dtokp->addRef(); }
void recoverDtok(const u_int64_t rid, const std::string xid) {
@@ -182,25 +189,9 @@
{
protected:
const std::string xid;
- JournalImpl* preparedXidStorePtr;
- virtual void completeTxn(bool commit) {
- TxnCtxt::completeTxn(commit);
- if (preparedXidStorePtr) commitTxn(preparedXidStorePtr, commit);
- }
public:
- TPCTxnCtxt(const std::string& _xid, IdSequence* _loggedtx) : TxnCtxt(_loggedtx), xid(_xid), preparedXidStorePtr(0) {}
- void sync() {
- TxnCtxt::sync();
- bool allWritten = false;
- if (preparedXidStorePtr) {
- while (!allWritten) {
- allWritten = true;
- sync_jrnl(preparedXidStorePtr, true, allWritten);
- }
- }
- }
- inline void prepare(JournalImpl* _preparedXidStorePtr) { preparedXidStorePtr = _preparedXidStorePtr; }
+ TPCTxnCtxt(const std::string& _xid, IdSequence* _loggedtx) : TxnCtxt(_loggedtx), xid(_xid) {}
inline virtual bool isTPC() { return true; }
inline virtual const std::string& getXid() { return xid; }
};
Modified: store/trunk/cpp/tests/TransactionalTest.cpp
===================================================================
--- store/trunk/cpp/tests/TransactionalTest.cpp 2008-07-11 15:55:56 UTC (rev 2188)
+++ store/trunk/cpp/tests/TransactionalTest.cpp 2008-07-11 21:05:48 UTC (rev 2189)
@@ -33,6 +33,7 @@
#include "qpid/broker/Queue.h"
#include "qpid/broker/RecoveryManagerImpl.h"
#include "qpid/framing/AMQHeaderBody.h"
+#include "qpid/log/Statement.h"
using namespace rhm::bdbstore;
using namespace qpid::broker;
@@ -55,6 +56,7 @@
// to end to simulate multi-queue txn complete failure.
while (impactedQueues.size() > num_queues_rem) impactedQueues.erase(impactedQueues.begin());
}
+ void resetPreparedXidStorePtr() { preparedXidStorePtr = 0; }
};
// Test store which has sepcial begin() which returns a TestTPCTxnCtxt, and a method to check for
@@ -68,6 +70,25 @@
// pass sequence number for c/a
return auto_ptr<TransactionContext>(new TestTxnCtxt(&messageIdSequence));
}
+ void commit(TransactionContext& ctxt, const bool complete_prepared_list) {
+ checkInit();
+ TxnCtxt* txn(check(&ctxt));
+ if (!txn->isTPC()) {
+ localPrepare(dynamic_cast<TxnCtxt*>(txn));
+ if (!complete_prepared_list) dynamic_cast<TestTxnCtxt*>(txn)->resetPreparedXidStorePtr();
+ }
+ completed(*dynamic_cast<TxnCtxt*>(txn), true);
+ }
+ void abort(TransactionContext& ctxt, const bool complete_prepared_list)
+ {
+ checkInit();
+ TxnCtxt* txn(check(&ctxt));
+ if (!txn->isTPC()) {
+ localPrepare(dynamic_cast<TxnCtxt*>(txn));
+ if (!complete_prepared_list) dynamic_cast<TestTxnCtxt*>(txn)->resetPreparedXidStorePtr();
+ }
+ completed(*dynamic_cast<TxnCtxt*>(txn), false);
+ }
};
// === Helper fns ===
@@ -178,10 +199,11 @@
checkMsg(y, 0);
}
-void testMultiQueueTxn(const unsigned num_queues_rem, const bool commit)
+void testMultiQueueTxn(const unsigned num_queues_rem, const bool complete_prepared_list, const bool commit)
{
setup<TestMessageStore>();
- std::auto_ptr<TransactionContext> txn(static_cast<TestMessageStore*>(store.get())->begin());
+ TestMessageStore* tmsp = static_cast<TestMessageStore*>(store.get());
+ std::auto_ptr<TransactionContext> txn(tmsp->begin());
//create two messages and enqueue them onto both queues:
boost::intrusive_ptr<Message> msgA = createMessage("MessageA", "exchange", "routing_key");
@@ -193,9 +215,9 @@
static_cast<TestTxnCtxt*>(txn.get())->setCompleteFailure(num_queues_rem);
if (commit)
- store->commit(*txn);
+ tmsp->commit(*txn, complete_prepared_list);
else
- store->abort(*txn);
+ tmsp->abort(*txn, complete_prepared_list);
restart<TestMessageStore>();
// Check outcome
@@ -225,47 +247,61 @@
swap(false);
cout << "ok" << endl;
}
-/*
+
+QPID_AUTO_TEST_CASE(MultiQueueCommit)
+{
+ cout << test_filename << ".MultiQueueCommit: " << flush;
+ testMultiQueueTxn(2, true, true);
+ cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(MultiQueueAbort)
+{
+ cout << test_filename << ".MultiQueueAbort: " << flush;
+ testMultiQueueTxn(2, true, true);
+ cout << "ok" << endl;
+}
+
QPID_AUTO_TEST_CASE(MultiQueueNoQueueCommitRecover)
{
cout << test_filename << ".MultiQueueNoQueueCommitRecover: " << flush;
- testMultiQueueTxn(0, true);
+ testMultiQueueTxn(0, false, true);
cout << "ok" << endl;
}
QPID_AUTO_TEST_CASE(MultiQueueNoQueueAbortRecover)
{
cout << test_filename << ".MultiQueueNoQueueAbortRecover: " << flush;
- testMultiQueueTxn(0, false);
+ testMultiQueueTxn(0, false, false);
cout << "ok" << endl;
}
QPID_AUTO_TEST_CASE(MultiQueueSomeQueueCommitRecover)
{
cout << test_filename << ".MultiQueueSomeQueueCommitRecover: " << flush;
- testMultiQueueTxn(1, true);
+ testMultiQueueTxn(1, false, true);
cout << "ok" << endl;
}
QPID_AUTO_TEST_CASE(MultiQueueSomeQueueAbortRecover)
{
cout << test_filename << ".MultiQueueSomeQueueAbortRecover: " << flush;
- testMultiQueueTxn(1, false);
+ testMultiQueueTxn(1, false, false);
cout << "ok" << endl;
}
QPID_AUTO_TEST_CASE(MultiQueueAllQueueCommitRecover)
{
cout << test_filename << ".MultiQueueAllQueueCommitRecover: " << flush;
- testMultiQueueTxn(2, true);
+ testMultiQueueTxn(2, false, true);
cout << "ok" << endl;
}
QPID_AUTO_TEST_CASE(MultiQueueAllQueueAbortRecover)
{
cout << test_filename << ".MultiQueueAllQueueAbortRecover: " << flush;
- testMultiQueueTxn(2, false);
+ testMultiQueueTxn(2, false, false);
cout << "ok" << endl;
}
-*/
+
QPID_AUTO_TEST_SUITE_END()
17 years, 9 months
rhmessaging commits: r2188 - store/trunk/cpp/tests.
by rhmessaging-commits@lists.jboss.org
Author: kpvdr
Date: 2008-07-11 11:55:56 -0400 (Fri, 11 Jul 2008)
New Revision: 2188
Modified:
store/trunk/cpp/tests/TwoPhaseCommitTest.cpp
Log:
Corrected signed/unsigned comparison (which does not show up on F9)
Modified: store/trunk/cpp/tests/TwoPhaseCommitTest.cpp
===================================================================
--- store/trunk/cpp/tests/TwoPhaseCommitTest.cpp 2008-07-11 15:28:35 UTC (rev 2187)
+++ store/trunk/cpp/tests/TwoPhaseCommitTest.cpp 2008-07-11 15:55:56 UTC (rev 2188)
@@ -153,9 +153,9 @@
test->checkMsg(test->queueA, 0);
test->checkMsg(test->queueB, 0);
// Check there are no remaining open txns in store
- BOOST_CHECK_EQUAL(0, sptr->getRemainingTxns(*(test->queueA)));
- BOOST_CHECK_EQUAL(0, sptr->getRemainingTxns(*(test->queueB)));
- BOOST_CHECK_EQUAL(0, sptr->getRemainingPreparedListTxns());
+ BOOST_CHECK_EQUAL(u_int32_t(0), sptr->getRemainingTxns(*(test->queueA)));
+ BOOST_CHECK_EQUAL(u_int32_t(0), sptr->getRemainingTxns(*(test->queueB)));
+ BOOST_CHECK_EQUAL(u_int32_t(0), sptr->getRemainingPreparedListTxns());
}
};
17 years, 9 months
rhmessaging commits: r2187 - in store/trunk/cpp: lib/jrnl and 1 other directories.
by rhmessaging-commits@lists.jboss.org
Author: kpvdr
Date: 2008-07-11 11:28:35 -0400 (Fri, 11 Jul 2008)
New Revision: 2187
Modified:
store/trunk/cpp/lib/BdbMessageStore.cpp
store/trunk/cpp/lib/BdbMessageStore.h
store/trunk/cpp/lib/JournalImpl.cpp
store/trunk/cpp/lib/JournalImpl.h
store/trunk/cpp/lib/StorePlugin.cpp
store/trunk/cpp/lib/TxnCtxt.h
store/trunk/cpp/lib/jrnl/deq_hdr.hpp
store/trunk/cpp/lib/jrnl/deq_rec.cpp
store/trunk/cpp/lib/jrnl/deq_rec.hpp
store/trunk/cpp/lib/jrnl/jcntl.cpp
store/trunk/cpp/lib/jrnl/jcntl.hpp
store/trunk/cpp/lib/jrnl/txn_map.cpp
store/trunk/cpp/lib/jrnl/txn_map.hpp
store/trunk/cpp/lib/jrnl/wmgr.cpp
store/trunk/cpp/lib/jrnl/wmgr.hpp
store/trunk/cpp/tests/TransactionalTest.cpp
store/trunk/cpp/tests/TwoPhaseCommitTest.cpp
Log:
Fixed 2PC multi-queue transaction atomicity problem. Any multi-queue 2PC commit/abort which is interrupted by failure will now be completed on recover for all queues which did not get processed. Local txns still have this problem, however. Some code tidy-up is also included. Removed old error messages from StorePlugin which are no longer possible owing to the removal of sync journal.
Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp 2008-07-09 01:53:25 UTC (rev 2186)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp 2008-07-11 15:28:35 UTC (rev 2187)
@@ -2,7 +2,7 @@
Copyright (C) 2007 Red Hat Software
This file is part of Red Hat Messaging.
-
+
Red Hat Messaging is free software; you can redistribute it and/or
modify it under the terms of the GNU Lesser General Public
License as published by the Free Software Foundation; either
@@ -26,6 +26,7 @@
#include "BindingDbt.h"
#include "BufferValue.h"
#include "IdPairDbt.h"
+#include "jrnl/txn_map.hpp"
#include "qpid/log/Statement.h"
#include "qpid/management/PackageMrgstore.h"
@@ -49,29 +50,37 @@
qpid::sys::Duration BdbMessageStore::defJournalFlushTimeout(500 * qpid::sys::TIME_MSEC); // 0.5s
qpid::sys::Mutex TxnCtxt::globalSerialiser;
-BdbMessageStore::BdbMessageStore(const char* envpath) : env(0),
- queueDb(&env, 0),
- configDb(&env, 0),
- exchangeDb(&env, 0),
- messageDb(&env, 0),
- mappingDb(&env, 0),
- bindingDb(&env, 0),
- generalDb(&env, 0),
- numJrnlFiles(defNumJrnlFiles),
- jrnlFsizePgs(defJrnlFileSizePgs),
- wcache_pgsize_sblks(JRNL_WMGR_DEF_PAGE_SIZE),
- wcache_num_pages(JRNL_WMGR_DEF_PAGES),
- highestRid(0),
- isInit(false),
- envPath(envpath),
- mgmtObject(0)
+BdbMessageStore::PreparedRecoverStruct::PreparedRecoverStruct(const u_int64_t _rid,
+ const bool _deq_flag,
+ const bool _commit_flag) :
+ rid(_rid),
+ deq_flag(_deq_flag),
+ commit_flag(_commit_flag)
+{}
+BdbMessageStore::BdbMessageStore(const char* envpath) :
+ env(0),
+ queueDb(&env, 0),
+ configDb(&env, 0),
+ exchangeDb(&env, 0),
+ messageDb(&env, 0),
+ mappingDb(&env, 0),
+ bindingDb(&env, 0),
+ generalDb(&env, 0),
+ numJrnlFiles(defNumJrnlFiles),
+ jrnlFsizePgs(defJrnlFileSizePgs),
+ wcache_pgsize_sblks(JRNL_WMGR_DEF_PAGE_SIZE),
+ wcache_num_pages(JRNL_WMGR_DEF_PAGES),
+ highestRid(0),
+ isInit(false),
+ envPath(envpath),
+ mgmtObject(0)
+
{}
-
+
void BdbMessageStore::initManagement (Broker* broker)
{
- if (broker != 0)
- {
+ if (broker != 0) {
ManagementAgent* agent = ManagementAgent::getAgent ();
if (agent != 0)
@@ -88,13 +97,16 @@
}
}
-bool BdbMessageStore::init(const std::string& dir, u_int16_t jfiles, u_int32_t jfileSizePgs, uint32_t wCachePageSize)
-{
+bool BdbMessageStore::init(const std::string& dir,
+ u_int16_t jfiles,
+ u_int32_t jfileSizePgs,
+ uint32_t wCachePageSize)
+{
if (isInit) return true;
numJrnlFiles = jfiles;
jrnlFsizePgs = jfileSizePgs;
-
+
// set wcache_pgsize_sblks and wcache_num_pages from wCachePageSize
wcache_pgsize_sblks = wCachePageSize * 1024 / JRNL_DBLK_SIZE / JRNL_SBLK_SIZE; // convert from KiB to number sblks
u_int32_t defTotWCacheSize = JRNL_WMGR_DEF_PAGE_SIZE * JRNL_WMGR_DEF_PAGES; // in sblks. Currently 2014 sblks (1 MiB).
@@ -115,7 +127,7 @@
// 1 MiB total cache
wcache_num_pages = defTotWCacheSize / wcache_pgsize_sblks;
}
-
+
if (dir.size()>0) storeDir = dir;
journal::jdir::create_dir(getBdbBaseDir());
@@ -133,12 +145,11 @@
TxnCtxt txn;
try {
-
txn.begin(env, false);
open(queueDb, txn.get(), "queues.db", false);
open(configDb, txn.get(), "config.db", false);
open(exchangeDb, txn.get(), "exchanges.db", false);
- open(messageDb, txn.get(), "messages.db", false);
+ open(messageDb, txn.get(), "messages.db", false);
open(mappingDb, txn.get(), "mappings.db", true);
open(bindingDb, txn.get(), "bindings.db", true);
open(generalDb, txn.get(), "general.db", false);
@@ -154,7 +165,7 @@
txn.abort();
throw;
}
-
+
isInit = true;
QPID_LOG(info, "BdbMessageStore module initialized: dir=" << dir << "; jfiles=" << jfiles << "; jfileSizePgs=" << jfileSizePgs << "; wCachePageSize=" << wCachePageSize);
return true;
@@ -162,25 +173,21 @@
void BdbMessageStore::chkInitPreparedXidStore()
{
- if (!preparedXidStorePtr->is_init())
- {
+ if (!preparedXidStorePtr->is_ready()) {
u_int32_t defTotWCacheSize = JRNL_WMGR_DEF_PAGE_SIZE * JRNL_WMGR_DEF_PAGES; // in sblks. Currently 2014 sblks (1 MiB).
preparedXidStorePtr->initialize(defXidStoreNumJrnlFiles, defXidStoreJrnlFileSizePgs * JRNL_RMGR_PAGE_SIZE, defTotWCacheSize / wcache_pgsize_sblks, defXidStoreWCachePageSize);
}
}
-bool BdbMessageStore::init(const qpid::Options* options)
+bool BdbMessageStore::init(const qpid::Options* options)
{
const Options* opts = static_cast<const Options*>(options);
-
+
u_int16_t numJrnlFiles = opts->numJrnlFiles;
- if (numJrnlFiles < JRNL_MIN_NUM_FILES)
- {
+ if (numJrnlFiles < JRNL_MIN_NUM_FILES) {
numJrnlFiles = JRNL_MIN_NUM_FILES;
QPID_LOG(warning, "parameter num-jfiles (" << opts->numJrnlFiles << ") below allowable minimum (" << numJrnlFiles << "); changing this parameter to minimum value.");
- }
- else if (numJrnlFiles > JRNL_MAX_NUM_FILES)
- {
+ } else if (numJrnlFiles > JRNL_MAX_NUM_FILES) {
numJrnlFiles = JRNL_MAX_NUM_FILES;
QPID_LOG(warning, "parameter num-jfiles (" << opts->numJrnlFiles << ") above allowable maximum (" << numJrnlFiles << "); changing this parameter to maximum value.");
}
@@ -188,17 +195,14 @@
u_int32_t jrnlFsizePgs = opts->jrnlFsizePgs;
u_int32_t jrnlMinFsizePgs = JRNL_MIN_FILE_SIZE / JRNL_RMGR_PAGE_SIZE;
u_int32_t jrnlMaxFsizePgs = JRNL_MAX_FILE_SIZE / JRNL_RMGR_PAGE_SIZE;
- if (jrnlFsizePgs < jrnlMinFsizePgs)
- {
+ if (jrnlFsizePgs < jrnlMinFsizePgs) {
jrnlFsizePgs = jrnlMinFsizePgs;
QPID_LOG(warning, "parameter jfile-size-pgs (" << opts->jrnlFsizePgs << ") below allowable minimum (" << jrnlFsizePgs << "); changing this parameter to minimum value.");
- }
- else if (jrnlFsizePgs > jrnlMaxFsizePgs)
- {
+ } else if (jrnlFsizePgs > jrnlMaxFsizePgs) {
jrnlFsizePgs = jrnlMaxFsizePgs;
QPID_LOG(warning, "parameter jfile-size-pgs (" << opts->jrnlFsizePgs << ") above allowable maximum (" << jrnlFsizePgs << "); changing this parameter to maximum value.");
}
-
+
u_int32_t jrnlWrCachePageSize = opts->wCachePageSize;
switch (jrnlWrCachePageSize)
{
@@ -213,14 +217,11 @@
break;
default:
u_int32_t oldJrnlWrCachePageSize = jrnlWrCachePageSize;
- if (oldJrnlWrCachePageSize == 0)
- {
+ if (oldJrnlWrCachePageSize == 0) {
// For zero value, use default
jrnlWrCachePageSize = JRNL_WMGR_DEF_PAGE_SIZE * JRNL_DBLK_SIZE * JRNL_SBLK_SIZE / 1024;
QPID_LOG(warning, "parameter wcache-page-size (" << oldJrnlWrCachePageSize << ") must be a power of 2 between 1 and 128; changing this parameter to default value (" << jrnlWrCachePageSize << ")");
- }
- else
- {
+ } else {
// For any positive value, use closest value
if (oldJrnlWrCachePageSize < 6)
jrnlWrCachePageSize = 4;
@@ -241,17 +242,19 @@
return init(opts->storeDir, numJrnlFiles, jrnlFsizePgs, jrnlWrCachePageSize);
}
-void BdbMessageStore::open(Db& db, DbTxn* txn, const char* file, bool dupKey)
+void BdbMessageStore::open(Db& db,
+ DbTxn* txn,
+ const char* file,
+ bool dupKey)
{
if(dupKey) db.set_flags(DB_DUPSORT);
db.open(txn, file, 0, DB_BTREE, DB_CREATE | DB_THREAD, 0);
dbs.push_back(&db);
}
-BdbMessageStore::~BdbMessageStore()
+BdbMessageStore::~BdbMessageStore()
{
try {
-
for (std::list<Db*>::iterator i = dbs.begin(); i != dbs.end(); i++) {
(*i)->close(0);
}
@@ -280,17 +283,18 @@
(*i)->truncate(txn, &count, 0);
}
- txn->commit(0);
- try{
+ txn->commit(0);
+ try {
journal::jdir::delete_dir(getJrnlBaseDir(),true);
journal::jdir::delete_dir(getPxidBaseDir(),true);
- }
+ }
catch (const journal::jexception& e) {
THROW_STORE_EXCEPTION(std::string("truncate() failed: ") + e.what() );
- }
+ }
}
-void BdbMessageStore::create(PersistableQueue& queue, const FieldTable& args)
+void BdbMessageStore::create(PersistableQueue& queue,
+ const FieldTable& args)
{
checkInit();
if (queue.getPersistenceId()) {
@@ -316,15 +320,15 @@
string("JournalData"), defJournalGetEventsTimeout,
defJournalFlushTimeout);
}
+
queue.setExternalQueueStore(dynamic_cast<ExternalQueueStore*>(jQueue));
- try {
+ try {
// init will create the deque's for the init...
jQueue->initialize(localFileCount, localFileSize * JRNL_RMGR_PAGE_SIZE, wcache_num_pages, wcache_pgsize_sblks);
} catch (const journal::jexception& e) {
THROW_STORE_EXCEPTION(std::string("Queue ") + queue.getName() +
": create() failed: " + e.what());
}
-
try {
if (!create(queueDb, queueIdSequence, queue)) {
THROW_STORE_EXCEPTION("Queue already exists: " + queue.getName());
@@ -340,21 +344,20 @@
destroy(queueDb, queue);
deleteBindingsForQueue(queue);
qpid::broker::ExternalQueueStore* eqs = queue.getExternalQueueStore();
- if (eqs)
- {
+ if (eqs) {
JournalImpl* jQueue = static_cast<JournalImpl*>(eqs);
- jQueue->delete_jrnl_files();
+ jQueue->delete_jrnl_files();
queue.setExternalQueueStore(0); // will delete the journal if exists
}
}
-void BdbMessageStore::create(const PersistableExchange& exchange, const FieldTable& /*args*/)
+void BdbMessageStore::create(const PersistableExchange& exchange,
+ const FieldTable& /*args*/)
{
checkInit();
if (exchange.getPersistenceId()) {
THROW_STORE_EXCEPTION("Exchange already created: " + exchange.getName());
}
-
try {
if (!create(exchangeDb, exchangeIdSequence, exchange)) {
THROW_STORE_EXCEPTION("Exchange already exists: " + exchange.getName());
@@ -379,7 +382,6 @@
if (general.getPersistenceId()) {
THROW_STORE_EXCEPTION("General configuration item already created");
}
-
try {
if (!create(generalDb, generalIdSequence, general)) {
THROW_STORE_EXCEPTION("General configuration already exists");
@@ -395,7 +397,9 @@
destroy(generalDb, general);
}
-bool BdbMessageStore::create(Db& db, IdSequence& seq, const Persistable& p)
+bool BdbMessageStore::create(Db& db,
+ IdSequence& seq,
+ const Persistable& p)
{
u_int64_t id (seq.next());
Dbt key(&id, sizeof(id));
@@ -417,11 +421,13 @@
}
-void BdbMessageStore::bind(const PersistableExchange& e, const PersistableQueue& q,
- const std::string& k, const FieldTable& a)
+void BdbMessageStore::bind(const PersistableExchange& e,
+ const PersistableQueue& q,
+ const std::string& k,
+ const FieldTable& a)
{
checkInit();
- IdDbt key(e.getPersistenceId());
+ IdDbt key(e.getPersistenceId());
BindingDbt value(e, q, k, a);
TxnCtxt txn;
txn.begin(env, true);
@@ -429,8 +435,10 @@
txn.commit();
}
-void BdbMessageStore::unbind(const PersistableExchange& e, const PersistableQueue& q,
- const std::string& k, const FieldTable&)
+void BdbMessageStore::unbind(const PersistableExchange& e,
+ const PersistableQueue& q,
+ const std::string& k,
+ const FieldTable&)
{
checkInit();
deleteBinding(e, q, k);
@@ -451,7 +459,7 @@
try {
//read all queues, calls recoversMessages
recoverQueues(txn, registry, queues, prepared, messages);
-
+
//recover exchange & bindings:
recoverExchanges(txn, registry, exchanges);
recoverBindings(txn, exchanges, queues);
@@ -467,35 +475,45 @@
}
//recover transactions:
- for (txn_list::iterator i = prepared.begin(); i != prepared.end(); i++) {
-
+ for (txn_list::iterator i = prepared.begin(); i != prepared.end(); i++) {
TPCTxnCtxt* tpcc = new TPCTxnCtxt(i->xid, &messageIdSequence);
+ std::auto_ptr<TPCTransactionContext> txn(tpcc);
+ tpcc->prepare(preparedXidStorePtr.get());
+
// Restore data token state in TxnCtxt
- xid_rid_map_citr citr = preparedMap.find(i->xid);
- if (citr == preparedMap.end()) THROW_STORE_EXCEPTION("XID not found in preparedMap");
- tpcc->recoverDtok(citr->second, i->xid);
- tpcc->addXidRecord(preparedXidStorePtr.get());
+ PreparedRecoverMapCitr citr = preparedXidStoreRecoverMap.find(i->xid);
+ if (citr == preparedXidStoreRecoverMap.end()) THROW_STORE_EXCEPTION("XID not found in preparedXidStoreRecoverMap");
+ tpcc->recoverDtok(citr->second.rid, i->xid);
- RecoverableTransaction::shared_ptr dtx = registry.recoverTransaction(i->xid, std::auto_ptr<TPCTransactionContext>(tpcc));
+ // If a record is found that is dequeued but not committed/aborted from preparedXidStore, then a complete() call
+ // was interrupted part way through committing/aborting the impacted queues. Complete this process.
+ bool incomplTxnFlag = citr->second.deq_flag;
+
+ RecoverableTransaction::shared_ptr dtx = registry.recoverTransaction(i->xid, txn);
if (i->enqueues.get()) {
for (LockedMappings::iterator j = i->enqueues->begin(); j != i->enqueues->end(); j++) {
tpcc->addXidRecord(queues[j->first]->getExternalQueueStore());
- dtx->enqueue(queues[j->first], messages[j->second]);
+ if (!incomplTxnFlag) dtx->enqueue(queues[j->first], messages[j->second]);
}
}
if (i->dequeues.get()) {
for (LockedMappings::iterator j = i->dequeues->begin(); j != i->dequeues->end(); j++) {
tpcc->addXidRecord(queues[j->first]->getExternalQueueStore());
- dtx->dequeue(queues[j->first], messages[j->second]);
+ if (!incomplTxnFlag) dtx->dequeue(queues[j->first], messages[j->second]);
}
}
+
+ if (incomplTxnFlag) tpcc->complete(citr->second.commit_flag);
}
registry.recoveryComplete();
}
-void BdbMessageStore::recoverQueues(TxnCtxt& txn, RecoveryManager& registry, queue_index& queue_index, txn_list&
- prepared, message_index& messages)
+void BdbMessageStore::recoverQueues(TxnCtxt& txn,
+ RecoveryManager& registry,
+ queue_index& queue_index,
+ txn_list& prepared,
+ message_index& messages)
{
Cursor queues;
queues.open(queueDb, txn.get());
@@ -511,7 +529,7 @@
RecoverableQueue::shared_ptr queue = registry.recoverQueue(buffer);
//set the persistenceId and update max as required
queue->setPersistenceId(key.id);
-
+
const char* queueName = queue->getName().c_str();
JournalImpl* jQueue = 0;
{
@@ -519,14 +537,14 @@
jQueue = new JournalImpl(queueName, getJrnlDir(queueName), string("JournalData"), defJournalGetEventsTimeout, defJournalFlushTimeout);
}
queue->setExternalQueueStore(dynamic_cast<ExternalQueueStore*>(jQueue));
-
+
try
{
u_int64_t thisHighestRid = 0;
jQueue->recover(numJrnlFiles, jrnlFsizePgs * JRNL_RMGR_PAGE_SIZE, wcache_num_pages, wcache_pgsize_sblks, &prepared, thisHighestRid, key.id); // start recovery
if (thisHighestRid > highestRid)
highestRid = thisHighestRid;
- recoverMessages(txn, registry, queue, prepared, messages);
+ recoverMessages(txn, registry, queue, prepared, messages);
jQueue->recover_complete(); // start journal.
} catch (const journal::jexception& e) {
THROW_STORE_EXCEPTION(std::string("Queue ") + queueName + ": recoverQueues() failed: " + e.what());
@@ -545,7 +563,9 @@
}
-void BdbMessageStore::recoverExchanges(TxnCtxt& txn, RecoveryManager& registry, exchange_index& index)
+void BdbMessageStore::recoverExchanges(TxnCtxt& txn,
+ RecoveryManager& registry,
+ exchange_index& index)
{
//TODO: this is a copy&paste from recoverQueues - refactor!
Cursor exchanges;
@@ -567,13 +587,15 @@
exchangeIdSequence.reset(maxExchangeId + 1);
}
-void BdbMessageStore::recoverBindings(TxnCtxt& txn, exchange_index& exchanges, queue_index& queues)
+void BdbMessageStore::recoverBindings(TxnCtxt& txn,
+ exchange_index& exchanges,
+ queue_index& queues)
{
Cursor bindings;
bindings.open(bindingDb, txn.get());
IdDbt key;
- Dbt value;
+ Dbt value;
while (bindings.next(key, value)) {
Buffer buffer(reinterpret_cast<char*>(value.get_data()), value.get_size());
if (buffer.available() < 8) {
@@ -583,7 +605,7 @@
uint64_t queueId = buffer.getLongLong();
string queueName;
string routingkey;
- FieldTable args;
+ FieldTable args;
buffer.getShortString(queueName);
buffer.getShortString(routingkey);
buffer.get(args);
@@ -600,7 +622,8 @@
}
}
-void BdbMessageStore::recoverGeneral(TxnCtxt& txn, RecoveryManager& registry)
+void BdbMessageStore::recoverGeneral(TxnCtxt& txn,
+ RecoveryManager& registry)
{
Cursor items;
items.open(generalDb, txn.get());
@@ -621,16 +644,18 @@
}
-void BdbMessageStore::recoverMessages(TxnCtxt& /*txn*/, qpid::broker::RecoveryManager& recovery,
- qpid::broker::RecoverableQueue::shared_ptr& queue, txn_list& locked, message_index& prepared)
+void BdbMessageStore::recoverMessages(TxnCtxt& /*txn*/,
+ qpid::broker::RecoveryManager& recovery,
+ qpid::broker::RecoverableQueue::shared_ptr& queue,
+ txn_list& prepared,
+ message_index& messages)
{
+ size_t preambleLength = sizeof(u_int32_t)/*header size*/;
- size_t preambleLength = sizeof(u_int32_t)/*header size*/;
-
JournalImpl* jc = static_cast<JournalImpl*>(queue->getExternalQueueStore());
DataTokenImpl dtokp;
size_t readSize = 0;
- unsigned msg_count=0;
+ unsigned msg_count = 0;
// TODO: This optimization to skip reading if there are no enqueued messages to read
// breaks the python system test in phase 6 with "Exception: Cannot write lock file"
@@ -643,58 +668,63 @@
bool transientFlag = false;
bool externalFlag = false;
+ dtokp.set_wstate(DataTokenImpl::ENQ);
- dtokp.set_wstate(DataTokenImpl::ENQ);
- // read the message from the Journal.
+ // Read the message from the Journal.
try {
-
- //std::cout << jc->dirname() <<"-queueName:" << queue->getName() << "-enq count:" << jc->get_enq_cnt() << std::endl;
-
unsigned aio_sleep_cnt = 0;
while (read) {
rhm::journal::iores res = jc->read_data_record(&dbuff, dbuffSize, &xidbuff, xidbuffSize, transientFlag, externalFlag, &dtokp);
readSize = dtokp.dsize();
-
+
switch (res)
{
- case rhm::journal::RHM_IORES_SUCCESS:{
- msg_count++;
- RecoverableMessage::shared_ptr msg;
- char* data = (char*)dbuff;
-
- unsigned headerSize;
- if (externalFlag){
- msg = getExternMessage(recovery, dtokp.rid(), headerSize); // large message external to jrnl
- } else {
- headerSize = Buffer(data, preambleLength).getLong();
- Buffer headerBuff(data+ preambleLength, headerSize); /// do we want read size or header size ????
- msg = recovery.recoverMessage(headerBuff);
- }
- msg->setPersistenceId(dtokp.rid());
-
- u_int32_t contentOffset = headerSize + preambleLength;
- u_int64_t contentSize = readSize - contentOffset;
- if (msg->loadContent(contentSize) && !externalFlag) {
- //now read the content
- Buffer contentBuff(data + contentOffset, contentSize);
- msg->decodeContent(contentBuff);
- }
+ case rhm::journal::RHM_IORES_SUCCESS: {
+ msg_count++;
+ RecoverableMessage::shared_ptr msg;
+ char* data = (char*)dbuff;
- if (PreparedTransaction::isLocked(locked, queue->getPersistenceId(), dtokp.rid()) ) {
- prepared[dtokp.rid()] = msg;
- } else {
- queue->recover(msg);
- }
-
- dtokp.reset();
- dtokp.set_wstate(DataTokenImpl::ENQ);
-
- if (xidbuff)
- ::free(xidbuff);
- else if (dbuff)
- ::free(dbuff);
- aio_sleep_cnt = 0;
- break;
+ unsigned headerSize;
+ if (externalFlag) {
+ msg = getExternMessage(recovery, dtokp.rid(), headerSize); // large message external to jrnl
+ } else {
+ headerSize = Buffer(data, preambleLength).getLong();
+ Buffer headerBuff(data+ preambleLength, headerSize); /// do we want read size or header size ????
+ msg = recovery.recoverMessage(headerBuff);
+ }
+ msg->setPersistenceId(dtokp.rid());
+
+ u_int32_t contentOffset = headerSize + preambleLength;
+ u_int64_t contentSize = readSize - contentOffset;
+ if (msg->loadContent(contentSize) && !externalFlag) {
+ //now read the content
+ Buffer contentBuff(data + contentOffset, contentSize);
+ msg->decodeContent(contentBuff);
+ }
+
+ if (PreparedTransaction::isLocked(prepared, queue->getPersistenceId(), dtokp.rid()) ) {
+ std::string xid((char*)xidbuff, xidbuffSize);
+ PreparedRecoverMapCitr citr = preparedXidStoreRecoverMap.find(xid);
+ if (xid.size() > 0 && citr == preparedXidStoreRecoverMap.end()) THROW_STORE_EXCEPTION("XID not found in preparedXidStoreRecoverMap");
+ if (xid.size() > 0 && citr->second.deq_flag) { // deq present in prepared list, this xid is part of incomplete txn commit/abort
+ if (citr->second.commit_flag)
+ queue->recover(msg);
+ } else {
+ messages[dtokp.rid()] = msg;
+ }
+ } else {
+ queue->recover(msg);
+ }
+
+ dtokp.reset();
+ dtokp.set_wstate(DataTokenImpl::ENQ);
+
+ if (xidbuff)
+ ::free(xidbuff);
+ else if (dbuff)
+ ::free(dbuff);
+ aio_sleep_cnt = 0;
+ break;
}
case rhm::journal::RHM_IORES_PAGE_AIOWAIT:
if (++aio_sleep_cnt > MAX_AIO_SLEEPS)
@@ -703,7 +733,7 @@
break;
case rhm::journal::RHM_IORES_EMPTY:
read = false;
- break; // done with all messages. ((add call in jrnl to test that _emap is empty.
+ break; // done with all messages. (add call in jrnl to test that _emap is empty.)
default:
assert( "Store Error: Unexpected msg state");
} // switch
@@ -714,33 +744,37 @@
}
}
-RecoverableMessage::shared_ptr BdbMessageStore::getExternMessage(qpid::broker::RecoveryManager& recovery,
- uint64_t messageId, unsigned& headerSize)
+RecoverableMessage::shared_ptr BdbMessageStore::getExternMessage(qpid::broker::RecoveryManager& recovery,
+ uint64_t messageId,
+ unsigned& headerSize)
{
Dbt key (&messageId, sizeof(messageId));
- size_t preamble_length = sizeof(u_int32_t)/*header size*/;
+ size_t preamble_length = sizeof(u_int32_t); /*header size*/
BufferValue value(preamble_length, 0);
value.buffer.record();
if (messageDb.get(0, &key, &value, 0) == DB_NOTFOUND) {
- THROW_STORE_EXCEPTION("Cannot load content. Message not known to store!");
+ THROW_STORE_EXCEPTION("Cannot load content. Message not known to store!");
}
//read header only to begin with
headerSize = value.buffer.getLong();
BufferValue header(headerSize, preamble_length);
if (messageDb.get(0, &key, &header, 0) == DB_NOTFOUND) {
- THROW_STORE_EXCEPTION("Cannot load content. Message not known to store!");
+ THROW_STORE_EXCEPTION("Cannot load content. Message not known to store!");
}
return recovery.recoverMessage(header.buffer);
-}
+}
-int BdbMessageStore::enqueueMessage(TxnCtxt& txn, IdDbt& msgId, RecoverableMessage::shared_ptr& msg,
- queue_index& index, txn_list& locked,
- message_index& prepared)
+int BdbMessageStore::enqueueMessage(TxnCtxt& txn,
+ IdDbt& msgId,
+ RecoverableMessage::shared_ptr& msg,
+ queue_index& index,
+ txn_list& prepared,
+ message_index& messages)
{
Cursor mappings;
mappings.open(mappingDb, txn.get());
@@ -752,10 +786,10 @@
if (index.find(value.id) == index.end()) {
QPID_LOG(warning, "Recovered message for queue that no longer exists");
mappings->del(0);
- } else {
+ } else {
RecoverableQueue::shared_ptr queue = index[value.id];
- if (PreparedTransaction::isLocked(locked, value.id, msgId.id)) {
- prepared[msgId.id] = msg;
+ if (PreparedTransaction::isLocked(prepared, value.id, msgId.id)) {
+ messages[msgId.id] = msg;
} else {
queue->recover(msg);
}
@@ -766,22 +800,69 @@
return count;
}
+void BdbMessageStore::recoverPreparedXidJournal()
+{
+ if (journal::jdir::exists(storeDir + "/rhm/pxid/prepared_xid.jinf")) {
+ u_int64_t thisHighestRid;
+ preparedXidStorePtr->recover(defXidStoreNumJrnlFiles, defXidStoreJrnlFileSizePgs * JRNL_RMGR_PAGE_SIZE,
+ JRNL_WMGR_DEF_PAGE_SIZE * JRNL_WMGR_DEF_PAGES / wcache_pgsize_sblks, defXidStoreWCachePageSize,
+ 0, thisHighestRid, 0);
+ if (thisHighestRid > highestRid)
+ highestRid = thisHighestRid;
+ preparedXidStorePtr->recover_complete(); // start journal.
+ }
+}
+
+void BdbMessageStore::getPreparedXidMap(PreparedRecoverMap& prepXidMap)
+{
+ if (preparedXidStorePtr.get()) {
+ if (!preparedXidStorePtr->is_ready())
+ recoverPreparedXidJournal();
+
+ // TODO: The journal will return a const txn_map and the txn_map will support
+ // const operations at some point. Using non-const txn_map this way is ugly...
+ journal::txn_map& tmap = preparedXidStorePtr->get_txn_map();
+ std::vector<std::string> xidList;
+ tmap.xid_list(xidList);
+ for (std::vector<std::string>::const_iterator i = xidList.begin(); i<xidList.end(); i++) {
+ journal::txn_data_list txnList = tmap.get_tdata_list(*i);
+ unsigned enqCnt = 0;
+ unsigned deqCnt = 0;
+ u_int64_t rid = 0;
+ bool commitFlag = false;
+ for (journal::tdl_itr j = txnList.begin(); j<txnList.end(); j++) {
+ if (j->_enq_flag) {
+ rid = j->_rid;
+ enqCnt++;
+ } else {
+ commitFlag = j->_commit_flag;
+ deqCnt++;
+ }
+ }
+ assert(enqCnt == 1);
+ assert(deqCnt <= 1);
+ prepXidMap.insert(PreparedRecoverMapPair(*i, PreparedRecoverStruct(rid, deqCnt == 1, commitFlag)));
+ }
+ }
+}
+
void BdbMessageStore::recoverXids(txn_list& txns)
{
- std::set<string> preparedXidSet;
- collectPreparedXids(preparedXidSet);
+ if (!preparedXidStorePtr->is_ready())
+ getPreparedXidMap(preparedXidStoreRecoverMap);
// Abort unprepaired xids and populate the locked maps
- for (std::set<string>::iterator i = preparedXidSet.begin(); i != preparedXidSet.end(); i++) {
+ for (PreparedRecoverMapCitr i = preparedXidStoreRecoverMap.begin(); i != preparedXidStoreRecoverMap.end(); i++) {
LockedMappings::shared_ptr enq_ptr;
enq_ptr.reset(new LockedMappings);
LockedMappings::shared_ptr deq_ptr;
deq_ptr.reset(new LockedMappings);
- txns.push_back(new PreparedTransaction(*i, enq_ptr, deq_ptr));
+ txns.push_back(new PreparedTransaction(i->first, enq_ptr, deq_ptr));
}
}
-void BdbMessageStore::readLockedMappings(Db& db, txn_lock_map& mappings)
+void BdbMessageStore::readLockedMappings(Db& db,
+ txn_lock_map& mappings)
{
Cursor c;
c.open(db, 0);
@@ -796,54 +877,10 @@
void BdbMessageStore::collectPreparedXids(std::set<string>& xids)
{
- if (journal::jdir::exists(storeDir + "/rhm/pxid/prepared_xid.jinf"))
- {
- u_int64_t thisHighestRid;
- preparedXidStorePtr->recover(defXidStoreNumJrnlFiles, defXidStoreJrnlFileSizePgs * JRNL_RMGR_PAGE_SIZE,
- JRNL_WMGR_DEF_PAGE_SIZE * JRNL_WMGR_DEF_PAGES / wcache_pgsize_sblks, defXidStoreWCachePageSize,
- 0, thisHighestRid, 0);
- if (thisHighestRid > highestRid)
- highestRid = thisHighestRid;
- try {
- void* dbuff = NULL; size_t dbuffSize = 0;
- void* xidbuff = NULL; size_t xidbuffSize = 0;
- bool transientFlag = false;
- bool externalFlag = false;
- DataTokenImpl dtokp;
- bool done = false;
- long aio_sleep_cnt = 0;
- while (!done) {
- dtokp.reset();
- dtokp.set_wstate(DataTokenImpl::ENQ);
- rhm::journal::iores res = preparedXidStorePtr->read_data_record(&dbuff, dbuffSize, &xidbuff, xidbuffSize, transientFlag, externalFlag, &dtokp);
- switch (res) {
- case rhm::journal::RHM_IORES_SUCCESS:
- if (xidbuffSize > 0) {
- xids.insert(std::string((const char*)xidbuff, xidbuffSize));
- preparedMap[std::string((const char*)xidbuff, xidbuffSize)] = dtokp.rid();
- ::free(xidbuff);
- } else {
- THROW_STORE_EXCEPTION("No XID found in BdbMessageStore::collectPreparedXids()");
- }
- aio_sleep_cnt = 0;
- break;
- case rhm::journal::RHM_IORES_PAGE_AIOWAIT:
- if (++aio_sleep_cnt > MAX_AIO_SLEEPS)
- THROW_STORE_EXCEPTION("Timeout waiting for AIO in BdbMessageStore::collectPreparedXids()");
- ::usleep(AIO_SLEEP_TIME);
- break;
- case rhm::journal::RHM_IORES_EMPTY:
- done = true;
- break;
- default:
- assert( "Store Error: Unexpected msg state");
- }
- }
- } catch (const journal::jexception& e) {
- THROW_STORE_EXCEPTION(std::string("Prepared XID journal: collectPreparedXids() failed: ") + e.what());
- }
-
- preparedXidStorePtr->recover_complete(); // start journal.
+ if (!preparedXidStorePtr->is_ready())
+ getPreparedXidMap(preparedXidStoreRecoverMap);
+ for (PreparedRecoverMapCitr i = preparedXidStoreRecoverMap.begin(); i != preparedXidStoreRecoverMap.end(); i++) {
+ xids.insert(i->first);
}
}
@@ -865,8 +902,9 @@
txn.abort();
throw;
}
- }
+ }
}
+
void BdbMessageStore::destroy(PersistableMessage& msg)
{
checkInit();
@@ -881,17 +919,19 @@
} catch (const DbException& e) {
txn.abort();
THROW_STORE_EXCEPTION_2("Error destroying message", e);
- }
+ }
}
}
-
-u_int64_t BdbMessageStore::getRecordSize(Db& db, Dbt& key)
+u_int64_t BdbMessageStore::getRecordSize(Db& db,
+ Dbt& key)
{
return getRecordSize(0, db, key);
}
-u_int64_t BdbMessageStore::getRecordSize(DbTxn* txn, Db& db, Dbt& key)
+u_int64_t BdbMessageStore::getRecordSize(DbTxn* txn,
+ Db& db,
+ Dbt& key)
{
Dbt peek;
peek.set_flags(DB_DBT_USERMEM);
@@ -907,7 +947,8 @@
return peek.get_size();
}
-void BdbMessageStore::appendContent(const intrusive_ptr<const PersistableMessage>& msg, const std::string& data)
+void BdbMessageStore::appendContent(const intrusive_ptr<const PersistableMessage>& msg,
+ const std::string& data)
{
checkInit();
u_int64_t messageId (msg->getPersistenceId());
@@ -933,28 +974,31 @@
}
} else {
THROW_STORE_EXCEPTION("Cannot append content. Message not known to store!");
- }
+ }
}
void BdbMessageStore::loadContent(const qpid::broker::PersistableQueue& queue,
- const intrusive_ptr<const PersistableMessage>& msg, std::string& data, u_int64_t offset, u_int32_t length)
+ const intrusive_ptr<const PersistableMessage>& msg,
+ std::string& data,
+ u_int64_t offset,
+ u_int32_t length)
{
checkInit();
- u_int64_t realOffset = offset + sizeof(u_int32_t)/*header length*/+ msg->encodedHeaderSize();
+ u_int64_t realOffset = offset + sizeof(u_int32_t)/*header length*/ + msg->encodedHeaderSize();
u_int64_t messageId (msg->getPersistenceId());
-
+
if (messageId != 0) {
try {
JournalImpl* jc = static_cast<JournalImpl*>(queue.getExternalQueueStore());
- if (jc && jc->is_enqueued(messageId) ){
- if (jc->loadMsgContent(messageId, data, realOffset, length)){
+ if (jc && jc->is_enqueued(messageId) ) {
+ if (jc->loadMsgContent(messageId, data, realOffset, length)) {
return;
}
}
} catch (const journal::jexception& e) {
THROW_STORE_EXCEPTION(std::string("Queue ") + queue.getName() +
": loadContent() failed: " + e.what());
- }
+ }
TxnCtxt txn;
txn.begin(env, true);
try {
@@ -965,11 +1009,11 @@
value.set_ulen(length);
value.set_doff(realOffset);
value.set_dlen(length);
- int status = messageDb.get(txn.get(), &key, &value, 0);
+ int status = messageDb.get(txn.get(), &key, &value, 0);
if (status == DB_NOTFOUND) {
txn.abort();
delete [] buffer;
- THROW_STORE_EXCEPTION("Cannot load content. Message not known to store!");
+ THROW_STORE_EXCEPTION("Cannot load content. Message not known to store!");
} else {
txn.commit();
data.assign(buffer, value.get_size());
@@ -990,11 +1034,11 @@
std::string qn = queue.getName();
try {
JournalImpl* jc = static_cast<JournalImpl*>(queue.getExternalQueueStore());
- if (jc){
+ if (jc) {
// TODO: check if this result should be used...
/*rhm::journal::iores res =*/ jc->flush();
}
- }catch (const journal::jexception& e) {
+ } catch (const journal::jexception& e) {
THROW_STORE_EXCEPTION(std::string("Queue ") + qn + ": flush() failed: " + e.what() );
}
}
@@ -1031,14 +1075,14 @@
if (ctxt) txn->addXidRecord(queue.getExternalQueueStore());
}
-void BdbMessageStore::store(const PersistableQueue* queue,
- TxnCtxt* txn, Dbt& messageId,
- const intrusive_ptr<PersistableMessage>& message,
+void BdbMessageStore::store(const PersistableQueue* queue,
+ TxnCtxt* txn, Dbt& messageId,
+ const intrusive_ptr<PersistableMessage>& message,
bool newId)
{
u_int32_t headerSize = message->encodedHeaderSize();
u_int64_t size = message->encodedSize() + sizeof(u_int32_t);
- char* buff= 0;
+ char* buff = 0;
if (!message->isContentReleased() )
{
buff = static_cast<char*>(::alloca(size)); // long + headers + content
@@ -1048,23 +1092,22 @@
}
try {
-
- if ( queue ) {
+ if (queue) {
boost::intrusive_ptr<DataTokenImpl> dtokp(new DataTokenImpl);
dtokp->addRef();
dtokp->setSourceMessage(message);
dtokp->set_external_rid(true);
dtokp->set_rid(message->getPersistenceId()); // set the messageID into the Journal header (record-id)
-
+
JournalImpl* jc = static_cast<JournalImpl*>(queue->getExternalQueueStore());
- if (txn->getXid().empty()){
- if (message->isContentReleased()){
+ if (txn->getXid().empty()) {
+ if (message->isContentReleased()) {
jc->enqueue_extern_data_record(size, dtokp.get(), false);
} else {
jc->enqueue_data_record(buff, size, size, dtokp.get(), false);
}
- }else {
- if (message->isContentReleased()){
+ } else {
+ if (message->isContentReleased()) {
jc->enqueue_extern_txn_data_record(size, dtokp.get(), txn->getXid(), false);
} else {
jc->enqueue_txn_data_record(buff, size, size, dtokp.get(), txn->getXid(), false);
@@ -1086,7 +1129,7 @@
void BdbMessageStore::dequeue(TransactionContext* ctxt,
const intrusive_ptr<PersistableMessage>& msg,
const PersistableQueue& queue)
-{
+{
checkInit();
u_int64_t queueId (queue.getPersistenceId());
u_int64_t messageId (msg->getPersistenceId());
@@ -1104,28 +1147,27 @@
} else {
txn = &implicit;
}
-
+
// add queue* to the txn map..
if (ctxt) txn->addXidRecord(queue.getExternalQueueStore());
- async_dequeue(ctxt, msg, queue);
-
+ async_dequeue(ctxt, msg, queue);
+
msg->dequeueComplete();
}
-void BdbMessageStore::async_dequeue(
- TransactionContext* ctxt,
- const intrusive_ptr<PersistableMessage>& msg,
- const PersistableQueue& queue)
+void BdbMessageStore::async_dequeue(TransactionContext* ctxt,
+ const intrusive_ptr<PersistableMessage>& msg,
+ const PersistableQueue& queue)
{
boost::intrusive_ptr<DataTokenImpl> ddtokp(new DataTokenImpl);
ddtokp->addRef();
ddtokp->setSourceMessage(msg);
ddtokp->set_external_rid(true);
- ddtokp->set_rid(messageIdSequence.next());
+ ddtokp->set_rid(messageIdSequence.next());
ddtokp->set_dequeue_rid(msg->getPersistenceId());
ddtokp->set_wstate(DataTokenImpl::ENQ);
string tid;
- if (ctxt){
+ if (ctxt) {
TxnCtxt* txn = check(ctxt);
tid = txn->getXid();
}
@@ -1136,7 +1178,7 @@
} else {
jc->dequeue_txn_data_record(ddtokp.get(), tid);
}
- } catch (const journal::jexception& e) {
+ } catch (const journal::jexception& e) {
THROW_STORE_EXCEPTION(std::string("Queue ") + queue.getName() + ": async_dequeue() failed: " + e.what());
}
}
@@ -1147,15 +1189,17 @@
return 0;
}
-bool BdbMessageStore::deleteIfUnused(DbTxn* txn, Dbt& messageId)
+bool BdbMessageStore::deleteIfUnused(DbTxn* txn,
+ Dbt& messageId)
{
Cursor cursor;
cursor.open(mappingDb, txn);
-
return deleteIfUnused(cursor, txn, messageId);
}
-bool BdbMessageStore::deleteIfUnused(Cursor& cursor, DbTxn* txn, Dbt& messageId)
+bool BdbMessageStore::deleteIfUnused(Cursor& cursor,
+ DbTxn* txn,
+ Dbt& messageId)
{
if (isUnused(cursor, messageId)) {
messageDb.del(txn, &messageId, 0);
@@ -1178,7 +1222,8 @@
}
}
-void BdbMessageStore::completed(TPCTxnCtxt& txn, bool commit)
+void BdbMessageStore::completed(TPCTxnCtxt& txn,
+ bool commit)
{
try {
// Nothing to do if not prepared
@@ -1188,7 +1233,7 @@
DataTokenImpl* dtokp = txn.getDtok();
dtokp->set_dequeue_rid(dtokp->rid());
dtokp->set_rid(messageIdSequence.next());
- preparedXidStorePtr->dequeue_txn_data_record(txn.getDtok(), txn.getXid());
+ preparedXidStorePtr->dequeue_txn_data_record(txn.getDtok(), txn.getXid(), commit);
}
txn.complete(commit);
} catch (const std::exception& e) {
@@ -1197,7 +1242,7 @@
}
}
-auto_ptr<TransactionContext> BdbMessageStore::begin()
+auto_ptr<TransactionContext> BdbMessageStore::begin()
{
checkInit();
// pass sequence number for c/a
@@ -1217,7 +1262,7 @@
checkInit();
TPCTxnCtxt* txn = dynamic_cast<TPCTxnCtxt*>(&ctxt);
if(!txn) throw InvalidTransactionContextException();
-
+
try {
chkInitPreparedXidStore();
txn->incrDtokRef();
@@ -1225,8 +1270,7 @@
dtokp->set_external_rid(true);
dtokp->set_rid(messageIdSequence.next());
preparedXidStorePtr->enqueue_txn_data_record(0, 0, 0, dtokp, txn->getXid(), false);
- txn->addXidRecord(preparedXidStorePtr.get());
-
+ txn->prepare(preparedXidStorePtr.get());
// make sure all the data is written to disk before returning
txn->sync();
} catch (const std::exception& e) {
@@ -1235,18 +1279,18 @@
}
}
-void BdbMessageStore::commit(TransactionContext& ctxt)
+void BdbMessageStore::commit(TransactionContext& ctxt)
{
checkInit();
TxnCtxt* txn(check(&ctxt));
if (txn->isTPC()) {
- completed(*dynamic_cast<TPCTxnCtxt*>(txn), true);
+ completed(*dynamic_cast<TPCTxnCtxt*>(txn), true);
} else {
txn->complete(true);
}
}
-void BdbMessageStore::abort(TransactionContext& ctxt)
+void BdbMessageStore::abort(TransactionContext& ctxt)
{
checkInit();
TxnCtxt* txn(check(&ctxt));
@@ -1264,7 +1308,10 @@
return txn;
}
-void BdbMessageStore::put(Db& db, DbTxn* txn, Dbt& key, Dbt& value)
+void BdbMessageStore::put(Db& db,
+ DbTxn* txn,
+ Dbt& key,
+ Dbt& value)
{
try {
int status = db.put(txn, &key, &value, DB_NODUPDATA);
@@ -1278,8 +1325,10 @@
}
}
-
-bool BdbMessageStore::deleteKeyValuePair(Db& db, DbTxn* txn, Dbt& key, Dbt& value)
+bool BdbMessageStore::deleteKeyValuePair(Db& db,
+ DbTxn* txn,
+ Dbt& key,
+ Dbt& value)
{
Cursor cursor;
cursor.open(db, txn);
@@ -1301,9 +1350,9 @@
try {
Cursor bindings;
bindings.open(bindingDb, txn.get());
-
+
IdDbt key;
- Dbt value;
+ Dbt value;
while (bindings.next(key, value)) {
Buffer buffer(reinterpret_cast<char*>(value.get_data()), value.get_size());
if (buffer.available() < 8) {
@@ -1322,16 +1371,18 @@
QPID_LOG(debug, "Deleted all bindings for " << queue.getName() << ":" << queue.getPersistenceId());
}
-void BdbMessageStore::deleteBinding(const PersistableExchange& exchange, const PersistableQueue& queue, const std::string& bkey)
+void BdbMessageStore::deleteBinding(const PersistableExchange& exchange,
+ const PersistableQueue& queue,
+ const std::string& bkey)
{
TxnCtxt txn;
txn.begin(env, true);
try {
Cursor bindings;
bindings.open(bindingDb, txn.get());
-
+
IdDbt key(exchange.getPersistenceId());
- Dbt value;
+ Dbt value;
for (int status = bindings->get(&key, &value, DB_SET); status == 0; status = bindings->get(&key, &value, DB_NEXT_DUP)) {
Buffer buffer(reinterpret_cast<char*>(value.get_data()), value.get_size());
@@ -1356,21 +1407,21 @@
txn.commit();
}
-string BdbMessageStore::getJrnlBaseDir()
+string BdbMessageStore::getJrnlBaseDir()
{
std::stringstream dir;
dir << storeDir << "/rhm/jrnl/" ;
return dir.str();
}
-string BdbMessageStore::getBdbBaseDir()
+string BdbMessageStore::getBdbBaseDir()
{
std::stringstream dir;
dir << storeDir << "/rhm/dat/" ;
return dir.str();
}
-string BdbMessageStore::getPxidBaseDir()
+string BdbMessageStore::getPxidBaseDir()
{
std::stringstream dir;
dir << storeDir << "/rhm/pxid/" ;
@@ -1389,19 +1440,19 @@
dir << std::setw(4);
dir << std::setfill('0');
u_int32_t count = 0;
- for (u_int32_t i=0; i < strlen(queueName); i++)
- count += queueName[i];
-
- dir << (count%20);
+ for (u_int32_t i = 0; i < strlen(queueName); i++) {
+ count += queueName[i];
+ }
+ dir << (count % 20);
dir << "/" << queueName << "/";
return dir.str();
}
BdbMessageStore::Options::Options(const std::string& name) :
- qpid::Options(name),
- numJrnlFiles(8),
- jrnlFsizePgs(24),
- wCachePageSize(JRNL_WMGR_DEF_PAGE_SIZE * JRNL_DBLK_SIZE * JRNL_SBLK_SIZE / 1024)
+ qpid::Options(name),
+ numJrnlFiles(8),
+ jrnlFsizePgs(24),
+ wCachePageSize(JRNL_WMGR_DEF_PAGE_SIZE * JRNL_DBLK_SIZE * JRNL_SBLK_SIZE / 1024)
{
addOptions()
("store-dir", qpid::optValue(storeDir, "DIR"),
Modified: store/trunk/cpp/lib/BdbMessageStore.h
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.h 2008-07-09 01:53:25 UTC (rev 2186)
+++ store/trunk/cpp/lib/BdbMessageStore.h 2008-07-11 15:28:35 UTC (rev 2187)
@@ -2,7 +2,7 @@
Copyright (C) 2007 Red Hat Software
This file is part of Red Hat Messaging.
-
+
Red Hat Messaging is free software; you can redistribute it and/or
modify it under the terms of the GNU Lesser General Public
License as published by the Free Software Foundation; either
@@ -53,16 +53,26 @@
*/
class BdbMessageStore : public qpid::broker::MessageStore, public qpid::management::Manageable
{
+ protected:
typedef std::map<u_int64_t, qpid::broker::RecoverableQueue::shared_ptr> queue_index;
typedef std::map<u_int64_t, qpid::broker::RecoverableExchange::shared_ptr> exchange_index;
typedef std::map<u_int64_t, qpid::broker::RecoverableMessage::shared_ptr> message_index;
-
+
typedef LockedMappings::map txn_lock_map;
typedef boost::ptr_list<PreparedTransaction> txn_list;
-
- typedef std::map<std::string, u_int64_t> xid_rid_map;
- typedef xid_rid_map::const_iterator xid_rid_map_citr;
+ // Structs for preparedXidStore recover state
+ struct PreparedRecoverStruct {
+ u_int64_t rid;
+ bool deq_flag;
+ bool commit_flag;
+ PreparedRecoverStruct(const u_int64_t _rid, const bool _deq_flag, const bool _commit_flag);
+ };
+ typedef PreparedRecoverStruct PreparedRecover;
+ typedef std::map<std::string, PreparedRecover> PreparedRecoverMap;
+ typedef PreparedRecoverMap::const_iterator PreparedRecoverMapCitr;
+ typedef std::pair<std::string, PreparedRecoverStruct> PreparedRecoverMapPair;
+
// Default store settings
static const u_int16_t defNumJrnlFiles = 8;
static const u_int32_t defJrnlFileSizePgs = 24;
@@ -80,7 +90,11 @@
Db mappingDb;
Db bindingDb;
Db generalDb;
+
+ // Pointer to prepared XID journal instance
boost::shared_ptr<JournalImpl> preparedXidStorePtr;
+ PreparedRecoverMap preparedXidStoreRecoverMap;
+
IdSequence queueIdSequence;
IdSequence exchangeIdSequence;
IdSequence generalIdSequence;
@@ -90,7 +104,6 @@
u_int32_t jrnlFsizePgs;
u_int32_t wcache_pgsize_sblks;
u_int16_t wcache_num_pages;
- xid_rid_map preparedMap;
u_int64_t highestRid;
bool isInit;
const char* envPath;
@@ -99,57 +112,99 @@
qpid::management::Store* mgmtObject;
qpid::sys::Mutex jrnlCreateLock;
- bool mode(const bool mode, const bool force);
- void recoverQueues(TxnCtxt& txn, qpid::broker::RecoveryManager& recovery, queue_index& index,
- txn_list& locked, message_index& messages);
- void recoverMessages(TxnCtxt& txn, qpid::broker::RecoveryManager& recovery, queue_index& index,
- txn_list& locked, message_index& prepared);
- void recoverMessages(TxnCtxt& txn, qpid::broker::RecoveryManager& recovery,
- qpid::broker::RecoverableQueue::shared_ptr& queue,
- txn_list& locked, message_index& prepared);
- qpid::broker::RecoverableMessage::shared_ptr getExternMessage(qpid::broker::RecoveryManager& recovery,
+ void recoverQueues(TxnCtxt& txn,
+ qpid::broker::RecoveryManager& recovery,
+ queue_index& index,
+ txn_list& locked,
+ message_index& messages);
+ void recoverMessages(TxnCtxt& txn,
+ qpid::broker::RecoveryManager& recovery,
+ queue_index& index,
+ txn_list& locked,
+ message_index& prepared);
+ void recoverMessages(TxnCtxt& txn,
+ qpid::broker::RecoveryManager& recovery,
+ qpid::broker::RecoverableQueue::shared_ptr& queue,
+ txn_list& locked,
+ message_index& prepared);
+ qpid::broker::RecoverableMessage::shared_ptr getExternMessage(qpid::broker::RecoveryManager& recovery,
uint64_t mId, unsigned& headerSize);
- void recoverExchanges(TxnCtxt& txn, qpid::broker::RecoveryManager& recovery, exchange_index& index);
- void recoverBindings(TxnCtxt& txn, exchange_index& exchanges, queue_index& queues);
- void recoverGeneral(TxnCtxt& txn, qpid::broker::RecoveryManager& recovery);
- int enqueueMessage(TxnCtxt& txn, IdDbt& msgId, qpid::broker::RecoverableMessage::shared_ptr& msg,
- queue_index& index, txn_list& locked, message_index& prepared);
+ void recoverExchanges(TxnCtxt& txn,
+ qpid::broker::RecoveryManager& recovery,
+ exchange_index& index);
+ void recoverBindings(TxnCtxt& txn,
+ exchange_index& exchanges,
+ queue_index& queues);
+ void recoverGeneral(TxnCtxt& txn,
+ qpid::broker::RecoveryManager& recovery);
+ int enqueueMessage(TxnCtxt& txn,
+ IdDbt& msgId,
+ qpid::broker::RecoverableMessage::shared_ptr& msg,
+ queue_index& index,
+ txn_list& locked,
+ message_index& prepared);
+ void recoverPreparedXidJournal();
+ void getPreparedXidMap(PreparedRecoverMap& prepXidMap);
void recoverXids(txn_list& txns);
- void readLockedMappings(Db& db, txn_lock_map& mappings);
+ void readLockedMappings(Db& db,
+ txn_lock_map& mappings);
TxnCtxt* check(qpid::broker::TransactionContext* ctxt);
- void store(const qpid::broker::PersistableQueue* queue, TxnCtxt* txn,
- Dbt& messageId,
+ void store(const qpid::broker::PersistableQueue* queue,
+ TxnCtxt* txn,
+ Dbt& messageId,
const boost::intrusive_ptr<qpid::broker::PersistableMessage>& message,
bool newId);
- void async_dequeue(qpid::broker::TransactionContext* ctxt,
- const boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg,
+ void async_dequeue(qpid::broker::TransactionContext* ctxt,
+ const boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg,
const qpid::broker::PersistableQueue& queue);
- bool deleteIfUnused(Cursor& cursor, DbTxn* txn, Dbt& messageId);
- bool deleteIfUnused(DbTxn* txn, Dbt& messageId);
- bool isUnused(Cursor& cursor, Dbt& messageId);
- void destroy(Db& db, const qpid::broker::Persistable& p);
- bool create(Db& db, IdSequence& seq, const qpid::broker::Persistable& p);
- void completed(TPCTxnCtxt& txn, bool commit);
- void record2pcOp(Db& db, TPCTxnCtxt& txn, u_int64_t messageId, u_int64_t queueId);
+ bool deleteIfUnused(Cursor& cursor,
+ DbTxn* txn,
+ Dbt& messageId);
+ bool deleteIfUnused(DbTxn* txn,
+ Dbt& messageId);
+ bool isUnused(Cursor& cursor,
+ Dbt& messageId);
+ void destroy(Db& db,
+ const qpid::broker::Persistable& p);
+ bool create(Db& db,
+ IdSequence& seq,
+ const qpid::broker::Persistable& p);
+ void completed(TPCTxnCtxt& txn,
+ bool commit);
+ void record2pcOp(Db& db,
+ TPCTxnCtxt& txn,
+ u_int64_t messageId,
+ u_int64_t queueId);
void deleteBindingsForQueue(const qpid::broker::PersistableQueue& queue);
- void deleteBinding(const qpid::broker::PersistableExchange& exchange,
- const qpid::broker::PersistableQueue& queue,
+ void deleteBinding(const qpid::broker::PersistableExchange& exchange,
+ const qpid::broker::PersistableQueue& queue,
const std::string& key);
- u_int64_t getRecordSize(Db& db, Dbt& key);
- u_int64_t getRecordSize(DbTxn* txn, Db& db, Dbt& key);
- void put(Db& db, DbTxn* txn, Dbt& key, Dbt& value);
- bool deleteKeyValuePair(Db& db, DbTxn* txn, Dbt& key, Dbt& value);
- void open(Db& db, DbTxn* txn, const char* file, bool dupKey);
-
-
+ u_int64_t getRecordSize(Db& db,
+ Dbt& key);
+ u_int64_t getRecordSize(DbTxn* txn,
+ Db& db,
+ Dbt& key);
+ void put(Db& db,
+ DbTxn* txn,
+ Dbt& key,
+ Dbt& value);
+ bool deleteKeyValuePair(Db& db,
+ DbTxn* txn,
+ Dbt& key,
+ Dbt& value);
+ void open(Db& db,
+ DbTxn* txn,
+ const char* file,
+ bool dupKey);
+
// journal functions
void createJrnlQueue(const qpid::broker::PersistableQueue& queue);
std::string getJrnlDir(const qpid::broker::PersistableQueue& queue); //for exmaple /var/rhm/ + queueDir/
std::string getJrnlDir(const char* queueName);
- std::string getJrnlBaseDir();
- std::string getBdbBaseDir();
- std::string getPxidBaseDir();
+ std::string getJrnlBaseDir();
+ std::string getBdbBaseDir();
+ std::string getPxidBaseDir();
inline void checkInit() {
if (!isInit) init("/var", defNumJrnlFiles, defJrnlFileSizePgs, defWCachePageSize); isInit = true;
}
@@ -183,66 +238,91 @@
typedef boost::shared_ptr<BdbMessageStore> shared_ptr;
BdbMessageStore(const char* envpath = 0);
+
virtual ~BdbMessageStore();
+
bool init(const qpid::Options* options);
- bool init(const std::string& dir, u_int16_t jfiles, u_int32_t jfileSizePgs, uint32_t wCachePageSize);
+
+ bool init(const std::string& dir,
+ u_int16_t jfiles,
+ u_int32_t jfileSizePgs,
+ uint32_t wCachePageSize);
+
void initManagement (qpid::broker::Broker* broker);
void truncate();
void create(qpid::broker::PersistableQueue& queue,
const qpid::framing::FieldTable& args);
+
void destroy(qpid::broker::PersistableQueue& queue);
void create(const qpid::broker::PersistableExchange& queue,
const qpid::framing::FieldTable& args);
+
void destroy(const qpid::broker::PersistableExchange& queue);
- void bind(const qpid::broker::PersistableExchange& exchange,
- const qpid::broker::PersistableQueue& queue,
- const std::string& key, const qpid::framing::FieldTable& args);
- void unbind(const qpid::broker::PersistableExchange& exchange,
- const qpid::broker::PersistableQueue& queue,
- const std::string& key, const qpid::framing::FieldTable& args);
+ void bind(const qpid::broker::PersistableExchange& exchange,
+ const qpid::broker::PersistableQueue& queue,
+ const std::string& key,
+ const qpid::framing::FieldTable& args);
+ void unbind(const qpid::broker::PersistableExchange& exchange,
+ const qpid::broker::PersistableQueue& queue,
+ const std::string& key,
+ const qpid::framing::FieldTable& args);
+
void create(const qpid::broker::PersistableConfig& config);
+
void destroy(const qpid::broker::PersistableConfig& config);
void recover(qpid::broker::RecoveryManager& queues);
void stage(const boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg);
+
void destroy(qpid::broker::PersistableMessage& msg);
- void appendContent(const boost::intrusive_ptr<const qpid::broker::PersistableMessage>& msg, const std::string& data);
+
+ void appendContent(const boost::intrusive_ptr<const qpid::broker::PersistableMessage>& msg,
+ const std::string& data);
+
void loadContent(const qpid::broker::PersistableQueue& queue,
- const boost::intrusive_ptr<const qpid::broker::PersistableMessage>& msg,
- std::string& data, u_int64_t offset, u_int32_t length);
+ const boost::intrusive_ptr<const qpid::broker::PersistableMessage>& msg,
+ std::string& data,
+ u_int64_t offset,
+ u_int32_t length);
- void enqueue(qpid::broker::TransactionContext* ctxt,
- const boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg,
+ void enqueue(qpid::broker::TransactionContext* ctxt,
+ const boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg,
const qpid::broker::PersistableQueue& queue);
- void dequeue(qpid::broker::TransactionContext* ctxt,
- const boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg,
+
+ void dequeue(qpid::broker::TransactionContext* ctxt,
+ const boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg,
const qpid::broker::PersistableQueue& queue);
+
void flush(const qpid::broker::PersistableQueue& queue);
u_int32_t outstandingQueueAIO(const qpid::broker::PersistableQueue& queue);
-
void collectPreparedXids(std::set<std::string>& xids);
std::auto_ptr<qpid::broker::TransactionContext> begin();
+
std::auto_ptr<qpid::broker::TPCTransactionContext> begin(const std::string& xid);
+
void prepare(qpid::broker::TPCTransactionContext& ctxt);
+
void commit(qpid::broker::TransactionContext& ctxt);
+
void abort(qpid::broker::TransactionContext& ctxt);
qpid::management::ManagementObject* GetManagementObject (void) const
{ return mgmtObject; }
- qpid::management::Manageable::status_t ManagementMethod (uint32_t, qpid::management::Args&)
- { return qpid::management::Manageable::STATUS_OK; }
-};
-}
-}
+ inline qpid::management::Manageable::status_t ManagementMethod (uint32_t, qpid::management::Args&)
+ { return qpid::management::Manageable::STATUS_OK; }
+}; // class BdbMessageStore
+} // namespace bdbstore
+} // namespace rhm
+
#endif
Modified: store/trunk/cpp/lib/JournalImpl.cpp
===================================================================
--- store/trunk/cpp/lib/JournalImpl.cpp 2008-07-09 01:53:25 UTC (rev 2186)
+++ store/trunk/cpp/lib/JournalImpl.cpp 2008-07-11 15:28:35 UTC (rev 2187)
@@ -166,8 +166,7 @@
if (prep_tx_list_ptr) {
// Create list of prepared xids
std::vector<std::string> prep_xid_list;
- for (bdbstore::PreparedTransaction::list::iterator i = prep_tx_list_ptr->begin();
- i != prep_tx_list_ptr->end(); i++) {
+ for (bdbstore::PreparedTransaction::list::iterator i = prep_tx_list_ptr->begin(); i != prep_tx_list_ptr->end(); i++) {
prep_xid_list.push_back(i->xid);
}
@@ -347,9 +346,9 @@
}
void
-JournalImpl::dequeue_data_record(data_tok* const dtokp)
+JournalImpl::dequeue_data_record(data_tok* const dtokp, const bool txn_coml_commit)
{
- handleIoResult(jcntl::dequeue_data_record(dtokp));
+ handleIoResult(jcntl::dequeue_data_record(dtokp, txn_coml_commit));
if (_mgmtObject != 0)
{
@@ -359,9 +358,9 @@
}
void
-JournalImpl::dequeue_txn_data_record(data_tok* const dtokp, const std::string& xid)
+JournalImpl::dequeue_txn_data_record(data_tok* const dtokp, const std::string& xid, const bool txn_coml_commit)
{
- handleIoResult(jcntl::dequeue_txn_data_record(dtokp, xid));
+ handleIoResult(jcntl::dequeue_txn_data_record(dtokp, xid, txn_coml_commit));
if (_mgmtObject != 0)
{
Modified: store/trunk/cpp/lib/JournalImpl.h
===================================================================
--- store/trunk/cpp/lib/JournalImpl.h 2008-07-09 01:53:25 UTC (rev 2186)
+++ store/trunk/cpp/lib/JournalImpl.h 2008-07-11 15:28:35 UTC (rev 2187)
@@ -156,9 +156,9 @@
void enqueue_extern_txn_data_record(const size_t tot_data_len, journal::data_tok* dtokp,
const std::string& xid, const bool transient = false);
- void dequeue_data_record(journal::data_tok* const dtokp);
+ void dequeue_data_record(journal::data_tok* const dtokp, const bool txn_coml_commit = false);
- void dequeue_txn_data_record(journal::data_tok* const dtokp, const std::string& xid);
+ void dequeue_txn_data_record(journal::data_tok* const dtokp, const std::string& xid, const bool txn_coml_commit = false);
void txn_abort(journal::data_tok* const dtokp, const std::string& xid);
Modified: store/trunk/cpp/lib/StorePlugin.cpp
===================================================================
--- store/trunk/cpp/lib/StorePlugin.cpp 2008-07-09 01:53:25 UTC (rev 2186)
+++ store/trunk/cpp/lib/StorePlugin.cpp 2008-07-11 15:28:35 UTC (rev 2187)
@@ -49,19 +49,11 @@
if (options.storeDir.empty ())
{
if (!dataDir.isEnabled ())
- throw Exception ("If --data-dir is blank or --no-data-dir is specified, "
- "--store-directory must be present.");
+ throw Exception ("If --data-dir is blank or --no-data-dir is specified, --store-dir must be present.");
options.storeDir = dataDir.getPath ();
}
-
- if (!store->init (&options))
- {
- throw Exception("Existing journal found in different bdb/async mode. "
- "Move or delete existing data files before changing modes, or use "
- "'--store-force yes' to discard existing data.");
- }
-
+ store->init(&options);
broker->setStore (store);
}
Modified: store/trunk/cpp/lib/TxnCtxt.h
===================================================================
--- store/trunk/cpp/lib/TxnCtxt.h 2008-07-09 01:53:25 UTC (rev 2186)
+++ store/trunk/cpp/lib/TxnCtxt.h 2008-07-11 15:28:35 UTC (rev 2187)
@@ -47,11 +47,11 @@
class TxnCtxt : public qpid::broker::TransactionContext
{
-protected:
-
+ protected:
static qpid::sys::Mutex globalSerialiser;
typedef std::set<qpid::broker::ExternalQueueStore*> ipqdef;
+ typedef ipqdef::iterator ipqItr;
typedef std::auto_ptr<qpid::sys::Mutex::ScopedLock> AutoScopedLock;
ipqdef impactedQueues; // list of Queues used in the txn
@@ -66,31 +66,34 @@
std::string tid;
DbTxn* txn;
- void completeTXN(bool commit) {
+ virtual void completeTxn(bool commit) {
sync();
- for (TxnCtxt::ipqdef::iterator i = impactedQueues.begin(); i != impactedQueues.end(); i++) {
- JournalImpl* jc = static_cast<JournalImpl*>(*i);
- if (jc && loggedtx) { /* if using journal */
- boost::intrusive_ptr<DataTokenImpl> dtokp(new DataTokenImpl);
- dtokp->addRef();
- dtokp->set_external_rid(true);
- dtokp->set_rid(loggedtx->next());
- try {
- if (commit) {
- jc->txn_commit(dtokp.get(), getXid());
- jc->flush(true);
- } else {
- jc->txn_abort(dtokp.get(), getXid());
- }
- } catch (const journal::jexception& e) {
- THROW_STORE_EXCEPTION(std::string("Error commit") + e.what());
+ for (ipqItr i = impactedQueues.begin(); i != impactedQueues.end(); i++) {
+ commitTxn(static_cast<JournalImpl*>(*i), commit);
+ }
+ impactedQueues.clear();
+ }
+
+ void commitTxn(JournalImpl* jc, bool commit) {
+ if (jc && loggedtx) { /* if using journal */
+ boost::intrusive_ptr<DataTokenImpl> dtokp(new DataTokenImpl);
+ dtokp->addRef();
+ dtokp->set_external_rid(true);
+ dtokp->set_rid(loggedtx->next());
+ try {
+ if (commit) {
+ jc->txn_commit(dtokp.get(), getXid());
+ jc->flush(true);
+ } else {
+ jc->txn_abort(dtokp.get(), getXid());
}
+ } catch (const journal::jexception& e) {
+ THROW_STORE_EXCEPTION(std::string("Error commit") + e.what());
}
}
- deleteXidRecord();
}
-public:
+ public:
TxnCtxt(IdSequence* _loggedtx=NULL) : loggedtx(_loggedtx), dtokp(new DataTokenImpl), txn(0) {
if (loggedtx) {
@@ -119,22 +122,25 @@
if (sleep_cnt > MAX_SYNC_SLEEPS) THROW_STORE_EXCEPTION(std::string("Error: timeout waiting for TxnCtxt::sync()"));
if (!firstloop) { ::usleep(SYNC_SLEEP_TIME); sleep_cnt++; } // move this into the get events call aiolib..
allWritten = true;
- for (TxnCtxt::ipqdef::iterator i = impactedQueues.begin(); i != impactedQueues.end(); i++) {
- JournalImpl* jc = static_cast<JournalImpl*>(*i);
- try {
- if (jc && !(jc->is_txn_synced(getXid()))) {
- if (firstloop) jc->flush();
- allWritten = false;
- jc->get_wr_events();
- }
- } catch (const journal::jexception& e) {
- THROW_STORE_EXCEPTION(std::string("Error sync") + e.what());
- }
+ for (ipqItr i = impactedQueues.begin(); i != impactedQueues.end(); i++) {
+ sync_jrnl(static_cast<JournalImpl*>(*i), firstloop, allWritten);
}
firstloop = false;
}
}
+ void sync_jrnl(JournalImpl* jc, bool firstloop, bool& allWritten) {
+ try {
+ if (jc && !(jc->is_txn_synced(getXid()))) {
+ if (firstloop) jc->flush();
+ allWritten = false;
+ jc->get_wr_events();
+ }
+ } catch (const journal::jexception& e) {
+ THROW_STORE_EXCEPTION(std::string("Error sync") + e.what());
+ }
+ }
+
void begin(DbEnv& env, bool sync = false) {
env.txn_begin(0, &txn, 0);
if (sync) globalHolder = AutoScopedLock(new qpid::sys::Mutex::ScopedLock(globalSerialiser));
@@ -160,9 +166,8 @@
virtual bool isTPC() { return false; }
virtual const std::string& getXid() { return tid; }
- void deleteXidRecord() { impactedQueues.clear(); }
void addXidRecord(qpid::broker::ExternalQueueStore* queue) { impactedQueues.insert(queue); }
- void complete(bool commit) { completeTXN(commit); }
+ void complete(bool commit) { completeTxn(commit); }
DataTokenImpl* getDtok() { return dtokp.get(); }
void incrDtokRef() { dtokp->addRef(); }
void recoverDtok(const u_int64_t rid, const std::string xid) {
@@ -175,11 +180,29 @@
class TPCTxnCtxt : public TxnCtxt, public qpid::broker::TPCTransactionContext
{
+ protected:
const std::string xid;
-public:
- TPCTxnCtxt(const std::string& _xid, IdSequence* _loggedtx) : TxnCtxt(_loggedtx), xid(_xid) {}
- virtual bool isTPC() { return true; }
- virtual const std::string& getXid() { return xid; }
+ JournalImpl* preparedXidStorePtr;
+ virtual void completeTxn(bool commit) {
+ TxnCtxt::completeTxn(commit);
+ if (preparedXidStorePtr) commitTxn(preparedXidStorePtr, commit);
+ }
+
+ public:
+ TPCTxnCtxt(const std::string& _xid, IdSequence* _loggedtx) : TxnCtxt(_loggedtx), xid(_xid), preparedXidStorePtr(0) {}
+ void sync() {
+ TxnCtxt::sync();
+ bool allWritten = false;
+ if (preparedXidStorePtr) {
+ while (!allWritten) {
+ allWritten = true;
+ sync_jrnl(preparedXidStorePtr, true, allWritten);
+ }
+ }
+ }
+ inline void prepare(JournalImpl* _preparedXidStorePtr) { preparedXidStorePtr = _preparedXidStorePtr; }
+ inline virtual bool isTPC() { return true; }
+ inline virtual const std::string& getXid() { return xid; }
};
}}
Modified: store/trunk/cpp/lib/jrnl/deq_hdr.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/deq_hdr.hpp 2008-07-09 01:53:25 UTC (rev 2186)
+++ store/trunk/cpp/lib/jrnl/deq_hdr.hpp 2008-07-11 15:28:35 UTC (rev 2187)
@@ -87,6 +87,7 @@
#if defined(JRNL_LITTLE_ENDIAN) && defined(JRNL_32_BIT)
u_int32_t _filler0; ///< Little-endian filler for 32-bit size_t
#endif
+ static const u_int16_t DEQ_HDR_TXNCMPLCOMMIT_MASK = 0x10;
/**
* \brief Default constructor, which sets all values to 0.
@@ -105,7 +106,8 @@
* \brief Convenience constructor which initializes values during construction.
*/
inline deq_hdr(const u_int32_t magic, const u_int8_t version, const u_int64_t rid,
- const u_int64_t deq_rid, const std::size_t xidsize, const bool owi):
+ const u_int64_t deq_rid, const std::size_t xidsize, const bool owi,
+ const bool txn_coml_commit = false):
rec_hdr(magic, version, rid, owi), _deq_rid(deq_rid),
#if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
_filler0(0),
@@ -114,8 +116,17 @@
#if defined(JRNL_LITTLE_ENDIAN) && defined(JRNL_32_BIT)
, _filler0(0)
#endif
- {}
+ { set_txn_coml_commit(txn_coml_commit); }
+
+ inline bool is_txn_coml_commit() const { return _uflag & DEQ_HDR_TXNCMPLCOMMIT_MASK; }
+
+ inline void set_txn_coml_commit(const bool commit)
+ {
+ _uflag = commit ? _uflag | DEQ_HDR_TXNCMPLCOMMIT_MASK :
+ _uflag & (~DEQ_HDR_TXNCMPLCOMMIT_MASK);
+ }
+
/**
* \brief Returns the size of the header in bytes.
*/
Modified: store/trunk/cpp/lib/jrnl/deq_rec.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/deq_rec.cpp 2008-07-09 01:53:25 UTC (rev 2186)
+++ store/trunk/cpp/lib/jrnl/deq_rec.cpp 2008-07-11 15:28:35 UTC (rev 2187)
@@ -52,8 +52,8 @@
{}
deq_rec::deq_rec(const u_int64_t rid, const u_int64_t drid, const void* const xidp,
- const std::size_t xidlen, const bool owi):
- _deq_hdr(RHM_JDAT_DEQ_MAGIC, RHM_JDAT_VERSION, rid, drid, xidlen, owi),
+ const std::size_t xidlen, const bool owi, const bool txn_coml_commit):
+ _deq_hdr(RHM_JDAT_DEQ_MAGIC, RHM_JDAT_VERSION, rid, drid, xidlen, owi, txn_coml_commit),
_xidp(xidp),
_buff(0),
_deq_tail(_deq_hdr)
@@ -68,6 +68,8 @@
deq_rec::reset()
{
_deq_hdr._rid = 0;
+ _deq_hdr.set_owi(false);
+ _deq_hdr.set_txn_coml_commit(false);
_deq_hdr._deq_rid = 0;
_deq_hdr._xidsize = 0;
_deq_tail._rid = 0;
@@ -77,10 +79,11 @@
void
deq_rec::reset(const u_int64_t rid, const u_int64_t drid, const void* const xidp,
- const std::size_t xidlen, const bool owi)
+ const std::size_t xidlen, const bool owi, const bool txn_coml_commit)
{
_deq_hdr._rid = rid;
_deq_hdr.set_owi(owi);
+ _deq_hdr.set_txn_coml_commit(txn_coml_commit);
_deq_hdr._deq_rid = drid;
_deq_hdr._xidsize = xidlen;
_deq_tail._rid = rid;
Modified: store/trunk/cpp/lib/jrnl/deq_rec.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/deq_rec.hpp 2008-07-09 01:53:25 UTC (rev 2186)
+++ store/trunk/cpp/lib/jrnl/deq_rec.hpp 2008-07-11 15:28:35 UTC (rev 2187)
@@ -67,20 +67,21 @@
deq_rec();
// constructor used for write operations, where xid already exists
deq_rec(const u_int64_t rid, const u_int64_t drid, const void* const xidp,
- const std::size_t xidlen, const bool owi);
+ const std::size_t xidlen, const bool owi, const bool txn_coml_commit);
virtual ~deq_rec();
// Prepare instance for use in reading data from journal
void reset();
// Prepare instance for use in writing data to journal
void reset(const u_int64_t rid, const u_int64_t drid, const void* const xidp,
- const std::size_t xidlen, const bool owi);
+ const std::size_t xidlen, const bool owi, const bool txn_coml_commit);
u_int32_t encode(void* wptr, u_int32_t rec_offs_dblks, u_int32_t max_size_dblks);
u_int32_t decode(rec_hdr& h, void* rptr, u_int32_t rec_offs_dblks,
u_int32_t max_size_dblks);
// Decode used for recover
bool rcv_decode(rec_hdr h, std::ifstream* ifsp, std::size_t& rec_offs);
+ inline bool is_txn_coml_commit() const { return _deq_hdr.is_txn_coml_commit(); }
inline u_int64_t rid() const { return _deq_hdr._rid; }
inline u_int64_t deq_rid() const { return _deq_hdr._deq_rid; }
std::size_t get_xid(void** const xidpp);
Modified: store/trunk/cpp/lib/jrnl/jcntl.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.cpp 2008-07-09 01:53:25 UTC (rev 2186)
+++ store/trunk/cpp/lib/jrnl/jcntl.cpp 2008-07-11 15:28:35 UTC (rev 2187)
@@ -338,25 +338,25 @@
}
iores
-jcntl::dequeue_data_record(data_tok* const dtokp)
+jcntl::dequeue_data_record(data_tok* const dtokp, const bool txn_coml_commit)
{
check_wstatus("dequeue_data");
{
slock s(&_wr_mutex);
iores r;
- while (handle_aio_wait(_wmgr.dequeue(dtokp, 0, 0), r, dtokp)) ;
+ while (handle_aio_wait(_wmgr.dequeue(dtokp, 0, 0, txn_coml_commit), r, dtokp)) ;
return r;
}
}
iores
-jcntl::dequeue_txn_data_record(data_tok* const dtokp, const std::string& xid)
+jcntl::dequeue_txn_data_record(data_tok* const dtokp, const std::string& xid, const bool txn_coml_commit)
{
check_wstatus("dequeue_data");
{
slock s(&_wr_mutex);
iores r;
- while (handle_aio_wait(_wmgr.dequeue(dtokp, xid.data(), xid.size()), r, dtokp)) ;
+ while (handle_aio_wait(_wmgr.dequeue(dtokp, xid.data(), xid.size(), txn_coml_commit), r, dtokp)) ;
return r;
}
}
@@ -711,7 +711,8 @@
dr.get_xid(&xidp);
assert(xidp != 0);
std::string xid((char*)xidp, dr.xid_size());
- _tmap.insert_txn_data(xid, txn_data(dr.rid(), dr.deq_rid(), fid, false));
+ _tmap.insert_txn_data(xid, txn_data(dr.rid(), dr.deq_rid(), fid, false,
+ dr.is_txn_coml_commit()));
_tmap.set_aio_compl(xid, dr.rid());
std::free(xidp);
}
Modified: store/trunk/cpp/lib/jrnl/jcntl.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.hpp 2008-07-09 01:53:25 UTC (rev 2186)
+++ store/trunk/cpp/lib/jrnl/jcntl.hpp 2008-07-11 15:28:35 UTC (rev 2187)
@@ -428,10 +428,13 @@
*
* \param dtokp Pointer to data_tok instance for this data, used to track state of data
* through journal.
+ * \param txn_coml_commit Only used for preparedXID journal. When used for dequeueing
+ * prepared XID list items, sets whether the complete() was called in commit or abort
+ * mode.
*
* \exception TODO
*/
- iores dequeue_data_record(data_tok* const dtokp);
+ iores dequeue_data_record(data_tok* const dtokp, const bool txn_coml_commit = false);
/**
* \brief Dequeues (marks as no longer needed) data record in journal.
@@ -446,10 +449,13 @@
* through journal.
* \param xid String containing xid. An empty string (i.e. length=0) will be considered
* non-transactional.
+ * \param txn_coml_commit Only used for preparedXID journal. When used for dequeueing
+ * prepared XID list items, sets whether the complete() was called in commit or abort
+ * mode.
*
* \exception TODO
*/
- iores dequeue_txn_data_record(data_tok* const dtokp, const std::string& xid);
+ iores dequeue_txn_data_record(data_tok* const dtokp, const std::string& xid, const bool txn_coml_commit = false);
/**
* \brief Abort the transaction for all records enqueued or dequeued with the matching xid.
@@ -574,8 +580,7 @@
* \return <b><i>true</i></b> if the jouranl is ready to read and write data;
* <b><i>false</i></b> otherwise.
*/
- inline bool is_ready() const { return _init_flag and not _stop_flag; }
- inline bool is_init() const { return _init_flag; }
+ inline bool is_ready() const { return _init_flag && !_stop_flag; }
inline bool is_read_only() const { return _readonly_flag; }
@@ -602,7 +607,8 @@
inline u_int32_t jfsize_sblks() const { return _jfsize_sblks; }
inline u_int32_t get_open_txn_cnt() const { return _tmap.size(); }
- void get_open_txn_list(std::vector<std::string>& xv) { _tmap.xid_list(xv); }
+ // TODO Make this a const, but txn_map must support const first.
+ inline txn_map& get_txn_map() { return _tmap; }
// Logging
virtual void log(log_level level, const std::string& log_stmt) const;
Modified: store/trunk/cpp/lib/jrnl/txn_map.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/txn_map.cpp 2008-07-09 01:53:25 UTC (rev 2186)
+++ store/trunk/cpp/lib/jrnl/txn_map.cpp 2008-07-11 15:28:35 UTC (rev 2187)
@@ -42,11 +42,12 @@
{
txn_data_struct::txn_data_struct(const u_int64_t rid, const u_int64_t drid, const u_int16_t fid,
- const bool enq_flag):
+ const bool enq_flag, const bool commit_flag):
_rid(rid),
_drid(drid),
_fid(fid),
_enq_flag(enq_flag),
+ _commit_flag(commit_flag),
_aio_compl(false)
{}
Modified: store/trunk/cpp/lib/jrnl/txn_map.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/txn_map.hpp 2008-07-09 01:53:25 UTC (rev 2186)
+++ store/trunk/cpp/lib/jrnl/txn_map.hpp 2008-07-11 15:28:35 UTC (rev 2187)
@@ -64,9 +64,10 @@
u_int64_t _drid; ///< Dequeue record id for this operation
u_int16_t _fid; ///< File id, to be used when transferring to emap on commit
bool _enq_flag; ///< If true, enq op, otherwise deq op
+ bool _commit_flag; ///< (2PC transactions) Records 2PC complete c/a mode
bool _aio_compl; ///< Initially false, set to true when record AIO returns
txn_data_struct(const u_int64_t rid, const u_int64_t drid, const u_int16_t fid,
- const bool enq_flag);
+ const bool enq_flag, const bool commit_flag = false);
};
typedef txn_data_struct txn_data;
typedef std::vector<txn_data> txn_data_list;
Modified: store/trunk/cpp/lib/jrnl/wmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wmgr.cpp 2008-07-09 01:53:25 UTC (rev 2186)
+++ store/trunk/cpp/lib/jrnl/wmgr.cpp 2008-07-11 15:28:35 UTC (rev 2187)
@@ -256,7 +256,7 @@
}
iores
-wmgr::dequeue(data_tok* dtokp, const void* const xid_ptr, const std::size_t xid_len)
+wmgr::dequeue(data_tok* dtokp, const void* const xid_ptr, const std::size_t xid_len, const bool txn_coml_commit)
{
if (xid_len)
assert(xid_ptr != 0);
@@ -284,7 +284,7 @@
const bool ext_rid = dtokp->external_rid();
u_int64_t rid = (ext_rid | cont) ? dtokp->rid() : _wrfc.get_incr_rid();
u_int64_t dequeue_rid = (ext_rid | cont) ? dtokp->dequeue_rid() : dtokp->rid();
- _deq_rec.reset(rid, dequeue_rid, xid_ptr, xid_len, _wrfc.owi());
+ _deq_rec.reset(rid, dequeue_rid, xid_ptr, xid_len, _wrfc.owi(), txn_coml_commit);
if (!cont)
{
if (!ext_rid)
Modified: store/trunk/cpp/lib/jrnl/wmgr.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wmgr.hpp 2008-07-09 01:53:25 UTC (rev 2186)
+++ store/trunk/cpp/lib/jrnl/wmgr.hpp 2008-07-11 15:28:35 UTC (rev 2187)
@@ -110,7 +110,8 @@
iores enqueue(const void* const data_buff, const std::size_t tot_data_len,
const std::size_t this_data_len, data_tok* dtokp, const void* const xid_ptr,
const std::size_t xid_len, const bool transient, const bool external);
- iores dequeue(data_tok* dtokp, const void* const xid_ptr, const std::size_t xid_len);
+ iores dequeue(data_tok* dtokp, const void* const xid_ptr, const std::size_t xid_len,
+ const bool txn_coml_commit);
iores abort(data_tok* dtokp, const void* const xid_ptr, const std::size_t xid_len);
iores commit(data_tok* dtokp, const void* const xid_ptr, const std::size_t xid_len);
iores flush();
Modified: store/trunk/cpp/tests/TransactionalTest.cpp
===================================================================
--- store/trunk/cpp/tests/TransactionalTest.cpp 2008-07-09 01:53:25 UTC (rev 2186)
+++ store/trunk/cpp/tests/TransactionalTest.cpp 2008-07-11 15:28:35 UTC (rev 2187)
@@ -30,9 +30,9 @@
#include "BdbMessageStore.h"
#include <iostream>
#include "MessageUtils.h"
-#include <qpid/framing/AMQHeaderBody.h>
-#include <qpid/broker/Queue.h>
-#include <qpid/broker/RecoveryManagerImpl.h>
+#include "qpid/broker/Queue.h"
+#include "qpid/broker/RecoveryManagerImpl.h"
+#include "qpid/framing/AMQHeaderBody.h"
using namespace rhm::bdbstore;
using namespace qpid::broker;
@@ -45,99 +45,171 @@
const char* tdp = getenv("TMPDIR");
const string test_dir(tdp && strlen(tdp) > 0 ? tdp : "/tmp/TransactionalTest");
+// Test txn context which has special setCompleteFailure() method which prevents entire "txn complete" process from hapenning
+class TestTxnCtxt : public TxnCtxt
+{
+ public:
+ TestTxnCtxt(IdSequence* _loggedtx) : TxnCtxt(_loggedtx) {}
+ void setCompleteFailure(const unsigned num_queues_rem) {
+ // Remove queue members from back of impactedQueues until queues_rem reamin.
+ // to end to simulate multi-queue txn complete failure.
+ while (impactedQueues.size() > num_queues_rem) impactedQueues.erase(impactedQueues.begin());
+ }
+};
+
+// Test store which has sepcial begin() which returns a TestTPCTxnCtxt, and a method to check for
+// reamining open transactions
+class TestMessageStore: public BdbMessageStore
+{
+ public:
+ TestMessageStore(const char* envpath = 0) : BdbMessageStore(envpath) {}
+ std::auto_ptr<qpid::broker::TransactionContext> begin() {
+ checkInit();
+ // pass sequence number for c/a
+ return auto_ptr<TransactionContext>(new TestTxnCtxt(&messageIdSequence));
+ }
+};
+
// === Helper fns ===
const string nameA("queueA");
const string nameB("queueB");
-const Uuid messageId(true);
+//const Uuid messageId(true);
std::auto_ptr<BdbMessageStore> store;
-QueueRegistry queues;
+std::auto_ptr<QueueRegistry> queues;
Queue::shared_ptr queueA;
Queue::shared_ptr queueB;
+template <class T>
void setup()
{
- store = std::auto_ptr<BdbMessageStore>(new BdbMessageStore());
+ store = std::auto_ptr<T>(new T());
store->init(test_dir, 4, 1, 8);
store->truncate();
//create two queues:
+ FieldTable settings;
queueA = Queue::shared_ptr(new Queue(nameA, 0, store.get(), 0));
- FieldTable settings;
queueA->create(settings);
queueB = Queue::shared_ptr(new Queue(nameB, 0, store.get(), 0));
queueB->create(settings);
-
- //create message and enqueue it onto first queue:
- boost::intrusive_ptr<Message> msg = MessageUtils::createMessage("exchange", "routing_key", messageId, 0);
- msg->getProperties<DeliveryProperties>()->setDeliveryMode(PERSISTENT);
-
- queueA->deliver(msg);
}
+template <class T>
void restart()
{
queueA.reset();
queueB.reset();
+ queues.reset();
store.reset();
- store = std::auto_ptr<BdbMessageStore>(new BdbMessageStore());
+ store = std::auto_ptr<T>(new T());
store->init(test_dir, 4, 1, 8);
+ queues = std::auto_ptr<QueueRegistry>(new QueueRegistry);
ExchangeRegistry exchanges;
LinkRegistry links(0);
DtxManager mgr;
mgr.setStore (store.get());
- RecoveryManagerImpl recovery(queues, exchanges, links, mgr, 0);
+ RecoveryManagerImpl recovery(*queues, exchanges, links, mgr, 0);
store->recover(recovery);
- queueA = queues.find(nameA);
- queueB = queues.find(nameB);
+ queueA = queues->find(nameA);
+ queueB = queues->find(nameB);
}
-void check(bool swapped)
+boost::intrusive_ptr<Message> createMessage(const string& id, const string& exchange="exchange", const string& key="routing_key")
+{
+ boost::intrusive_ptr<Message> msg = MessageUtils::createMessage(exchange, key);
+ msg->getProperties<MessageProperties>()->setCorrelationId(id);
+ msg->getProperties<DeliveryProperties>()->setDeliveryMode(PERSISTENT);
+ return msg;
+}
+
+void checkMsg(Queue::shared_ptr& queue, u_int32_t size, const string& msgid = "<none>")
{
+ BOOST_REQUIRE(queue);
+ BOOST_CHECK_EQUAL(size, queue->getMessageCount());
+ if (size > 0) {
+ boost::intrusive_ptr<Message> msg = queue->dequeue().payload;
+ BOOST_REQUIRE(msg);
+ BOOST_CHECK_EQUAL(msgid, msg->getProperties<MessageProperties>()->getCorrelationId());
+ }
+}
+
+void swap(bool commit)
+{
+ setup<BdbMessageStore>();
+
+ //create message and enqueue it onto first queue:
+ boost::intrusive_ptr<Message> msgA = createMessage("Message", "exchange", "routing_key");
+ queueA->deliver(msgA);
+
+ boost::intrusive_ptr<Message> msgB = queueA->dequeue().payload;
+ BOOST_REQUIRE(msgB);
+ //move the message from one queue to the other as a transaction
+ std::auto_ptr<TransactionContext> txn = store->begin();
+ queueB->enqueue(txn.get(), msgB);//note: need to enqueue it first to avoid message being deleted
+ queueA->dequeue(txn.get(), msgB);
+ if (commit) {
+ store->commit(*txn);
+ } else {
+ store->abort(*txn);
+ }
+
+ restart<BdbMessageStore>();
+
+ // Check outcome
BOOST_REQUIRE(queueA);
BOOST_REQUIRE(queueB);
- Queue::shared_ptr x;//the other queue
+ Queue::shared_ptr x;//the queue from which the message was swapped
Queue::shared_ptr y;//the queue on which the message is expected to be
- if (swapped) {
+ if (commit) {
x = queueA;
y = queueB;
} else {
x = queueB;
y = queueA;
}
-
- BOOST_CHECK_EQUAL((u_int32_t) 0, x->getMessageCount());
- BOOST_CHECK_EQUAL((u_int32_t) 1, y->getMessageCount());
- boost::intrusive_ptr<Message> msg = y->dequeue().payload;
- BOOST_REQUIRE(msg);
- BOOST_CHECK_EQUAL(messageId, msg->getProperties<MessageProperties>()->getMessageId());
+
+ checkMsg(x, 0);
+ checkMsg(y, 1, "Message");
+ checkMsg(y, 0);
}
-void swap(bool commit)
+void testMultiQueueTxn(const unsigned num_queues_rem, const bool commit)
{
- setup();
+ setup<TestMessageStore>();
+ std::auto_ptr<TransactionContext> txn(static_cast<TestMessageStore*>(store.get())->begin());
- boost::intrusive_ptr<Message> msg = queueA->dequeue().payload;
- BOOST_REQUIRE(msg);
- //move the message from one queue to the other as a transaction
- std::auto_ptr<TransactionContext> txn = store->begin();
- queueB->enqueue(txn.get(), msg);//note: need to enqueue it first to avoid message being deleted
- queueA->dequeue(txn.get(), msg);
- if (commit) {
- store->commit(*txn);
- } else {
+ //create two messages and enqueue them onto both queues:
+ boost::intrusive_ptr<Message> msgA = createMessage("MessageA", "exchange", "routing_key");
+ queueA->enqueue(txn.get(), msgA);
+ queueB->enqueue(txn.get(), msgA);
+ boost::intrusive_ptr<Message> msgB = createMessage("MessageB", "exchange", "routing_key");
+ queueA->enqueue(txn.get(), msgB);
+ queueB->enqueue(txn.get(), msgB);
+
+ static_cast<TestTxnCtxt*>(txn.get())->setCompleteFailure(num_queues_rem);
+ if (commit)
+ store->commit(*txn);
+ else
store->abort(*txn);
+ restart<TestMessageStore>();
+
+ // Check outcome
+ if (commit)
+ {
+ checkMsg(queueA, 2, "MessageA");
+ checkMsg(queueB, 2, "MessageA");
+ checkMsg(queueA, 1, "MessageB");
+ checkMsg(queueB, 1, "MessageB");
}
-
- restart();
- check(commit);
+ checkMsg(queueA, 0);
+ checkMsg(queueB, 0);
}
-
// === Test suite ===
QPID_AUTO_TEST_CASE(Commit)
@@ -153,5 +225,47 @@
swap(false);
cout << "ok" << endl;
}
+/*
+QPID_AUTO_TEST_CASE(MultiQueueNoQueueCommitRecover)
+{
+ cout << test_filename << ".MultiQueueNoQueueCommitRecover: " << flush;
+ testMultiQueueTxn(0, true);
+ cout << "ok" << endl;
+}
+QPID_AUTO_TEST_CASE(MultiQueueNoQueueAbortRecover)
+{
+ cout << test_filename << ".MultiQueueNoQueueAbortRecover: " << flush;
+ testMultiQueueTxn(0, false);
+ cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(MultiQueueSomeQueueCommitRecover)
+{
+ cout << test_filename << ".MultiQueueSomeQueueCommitRecover: " << flush;
+ testMultiQueueTxn(1, true);
+ cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(MultiQueueSomeQueueAbortRecover)
+{
+ cout << test_filename << ".MultiQueueSomeQueueAbortRecover: " << flush;
+ testMultiQueueTxn(1, false);
+ cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(MultiQueueAllQueueCommitRecover)
+{
+ cout << test_filename << ".MultiQueueAllQueueCommitRecover: " << flush;
+ testMultiQueueTxn(2, true);
+ cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(MultiQueueAllQueueAbortRecover)
+{
+ cout << test_filename << ".MultiQueueAllQueueAbortRecover: " << flush;
+ testMultiQueueTxn(2, false);
+ cout << "ok" << endl;
+}
+*/
QPID_AUTO_TEST_SUITE_END()
Modified: store/trunk/cpp/tests/TwoPhaseCommitTest.cpp
===================================================================
--- store/trunk/cpp/tests/TwoPhaseCommitTest.cpp 2008-07-09 01:53:25 UTC (rev 2186)
+++ store/trunk/cpp/tests/TwoPhaseCommitTest.cpp 2008-07-11 15:28:35 UTC (rev 2187)
@@ -30,9 +30,11 @@
#include "BdbMessageStore.h"
#include <iostream>
#include "MessageUtils.h"
-#include <qpid/framing/AMQHeaderBody.h>
-#include <qpid/broker/Queue.h>
-#include <qpid/broker/RecoveryManagerImpl.h>
+#include "qpid/broker/Queue.h"
+#include "qpid/broker/RecoveryManagerImpl.h"
+#include "qpid/framing/AMQHeaderBody.h"
+#include "qpid/log/Statement.h"
+#include "TxnCtxt.h"
using namespace rhm::bdbstore;
using namespace qpid::broker;
@@ -67,8 +69,8 @@
public:
Swap(TwoPhaseCommitTest* const test_, const string& messageId_): test(test_), messageId(messageId_) {}
void init(){ msg = test->deliver(messageId, test->queueA); }
- void run(TPCTransactionContext* txn) { test->swap(txn); }
- void check(bool committed) { test->swapCheck(committed, messageId); }
+ void run(TPCTransactionContext* txn) { test->swap(txn, test->queueA, test->queueB); }
+ void check(bool committed) { test->swapCheck(committed, messageId, test->queueA, test->queueB); }
};
class Enqueue : public Strategy
@@ -81,17 +83,17 @@
Enqueue(TwoPhaseCommitTest* const test_): test(test_) {}
void init() {}
void run(TPCTransactionContext* txn) {
- msg1 = test->enqueue(txn, "Enqueue1");
- msg2 = test->enqueue(txn, "Enqueue2");
- msg3 = test->enqueue(txn, "Enqueue3");
+ msg1 = test->enqueue(txn, "Enqueue1", test->queueA);
+ msg2 = test->enqueue(txn, "Enqueue2", test->queueA);
+ msg3 = test->enqueue(txn, "Enqueue3", test->queueA);
}
void check(bool committed) {
if (committed) {
- test->checkA(3, "Enqueue1");
- test->checkA(2, "Enqueue2");
- test->checkA(1, "Enqueue3");
+ test->checkMsg(test->queueA, 3, "Enqueue1");
+ test->checkMsg(test->queueA, 2, "Enqueue2");
+ test->checkMsg(test->queueA, 1, "Enqueue3");
}
- test->checkA(0);
+ test->checkMsg(test->queueA, 0);
}
};
@@ -109,26 +111,94 @@
msg3 = test->deliver("Dequeue3", test->queueA);
}
void run(TPCTransactionContext* txn) {
- test->dequeue(txn);
- test->dequeue(txn);
- test->dequeue(txn);
+ test->dequeue(txn, test->queueA);
+ test->dequeue(txn, test->queueA);
+ test->dequeue(txn, test->queueA);
}
void check(bool committed) {
if (!committed) {
- test->checkA(3, "Dequeue1");
- test->checkA(2, "Dequeue2");
- test->checkA(1, "Dequeue3");
+ test->checkMsg(test->queueA, 3, "Dequeue1");
+ test->checkMsg(test->queueA, 2, "Dequeue2");
+ test->checkMsg(test->queueA, 1, "Dequeue3");
}
- test->checkA(0);
+ test->checkMsg(test->queueA, 0);
}
};
+ class MultiQueueTxn : public Strategy
+ {
+ TwoPhaseCommitTest* const test;
+ boost::intrusive_ptr<Message> msg1;
+ boost::intrusive_ptr<Message> msg2;
+ std::set<Queue::shared_ptr> queueset;
+ public:
+ MultiQueueTxn(TwoPhaseCommitTest* const test_): test(test_) {}
+ virtual void init() {}
+ virtual void run(TPCTransactionContext* txn) {
+ queueset.insert(test->queueA);
+ queueset.insert(test->queueB);
+ msg1 = test->enqueue(txn, "Message1", queueset);
+ msg2 = test->enqueue(txn, "Message2", queueset);
+ queueset.clear();
+ }
+ virtual void check(bool committed) {
+ TestMessageStore* sptr = static_cast<TestMessageStore*>(test->store.get());
+ if (committed)
+ {
+ test->checkMsg(test->queueA, 2, "Message1");
+ test->checkMsg(test->queueB, 2, "Message1");
+ test->checkMsg(test->queueA, 1, "Message2");
+ test->checkMsg(test->queueB, 1, "Message2");
+ }
+ test->checkMsg(test->queueA, 0);
+ test->checkMsg(test->queueB, 0);
+ // Check there are no remaining open txns in store
+ BOOST_CHECK_EQUAL(0, sptr->getRemainingTxns(*(test->queueA)));
+ BOOST_CHECK_EQUAL(0, sptr->getRemainingTxns(*(test->queueB)));
+ BOOST_CHECK_EQUAL(0, sptr->getRemainingPreparedListTxns());
+ }
+ };
+
+ // Test txn context which has special setCompleteFailure() method which prevents entire "txn complete" process from hapenning
+ class TestTPCTxnCtxt : public TPCTxnCtxt
+ {
+ public:
+ TestTPCTxnCtxt(const std::string& _xid, IdSequence* _loggedtx) : TPCTxnCtxt(_xid, _loggedtx) {}
+ void setCompleteFailure(const unsigned num_queues_rem, const bool complete_prepared_list) {
+ // Remove queue members from back of impactedQueues until queues_rem reamin.
+ // to end to simulate multi-queue txn complete failure.
+ while (impactedQueues.size() > num_queues_rem) impactedQueues.erase(impactedQueues.begin());
+ // If prepared list is not to be committed, set pointer to 0
+ if (!complete_prepared_list) preparedXidStorePtr = 0;
+ }
+ };
+
+ // Test store which has sepcial begin() which returns a TestTPCTxnCtxt, and a method to check for
+ // reamining open transactions
+ class TestMessageStore: public BdbMessageStore
+ {
+ public:
+ TestMessageStore(const char* envpath = 0) : BdbMessageStore(envpath) {}
+ std::auto_ptr<qpid::broker::TPCTransactionContext> begin(const std::string& xid) {
+ checkInit();
+ IdSequence* jtx = &messageIdSequence;
+ // pass sequence number for c/a
+ return auto_ptr<TPCTransactionContext>(new TestTPCTxnCtxt(xid, jtx));
+ }
+ u_int32_t getRemainingTxns(const PersistableQueue& queue) {
+ return static_cast<JournalImpl*>(queue.getExternalQueueStore())->get_open_txn_cnt();
+ }
+ u_int32_t getRemainingPreparedListTxns() {
+ return preparedXidStorePtr->get_open_txn_cnt();
+ }
+ };
+
const string nameA;
const string nameB;
std::auto_ptr<BdbMessageStore> store;
std::auto_ptr<DtxManager> dtxmgr;
- QueueRegistry queues;
- LinkRegistry links;
+ std::auto_ptr<QueueRegistry> queues;
+ std::auto_ptr<LinkRegistry> links;
Queue::shared_ptr queueA;
Queue::shared_ptr queueB;
boost::intrusive_ptr<Message> msg1;
@@ -137,14 +207,14 @@
void recoverPrepared(bool commit)
{
- setup();
+ setup<BdbMessageStore>();
Swap swap(this, "RecoverPrepared");
swap.init();
std::auto_ptr<TPCTransactionContext> txn(store->begin("my-xid"));
swap.run(txn.get());
store->prepare(*txn);
- restart();
+ restart<BdbMessageStore>();
//check that the message is not available from either queue
BOOST_CHECK_EQUAL((u_int32_t) 0, queueA->getMessageCount());
@@ -158,58 +228,89 @@
}
swap.check(commit);
- restart();
+ restart<BdbMessageStore>();
swap.check(commit);
}
+
+ void testMultiQueueTxn(const unsigned num_queues_rem, const bool complete_prepared_list, const bool commit)
+ {
+ setup<TestMessageStore>();
+ MultiQueueTxn mqtTest(this);
+ mqtTest.init();
+ std::auto_ptr<TPCTransactionContext> txn(static_cast<TestMessageStore*>(store.get())->begin("my-xid"));
+ mqtTest.run(txn.get());
+ store->prepare(*txn);
+ // As the commits and aborts should happen through DtxManager, and it is too complex to
+ // pass all these test params through, we bypass DtxManager and use the store directly.
+ // This will prevent the queues from seeing committed txns, however. To test the success
+ // or failure of
+ static_cast<TestTPCTxnCtxt*>(txn.get())->setCompleteFailure(num_queues_rem, complete_prepared_list);
+ if (commit)
+ store->commit(*txn);
+ else
+ store->abort(*txn);
+ restart<TestMessageStore>();
+ mqtTest.check(commit);
+ }
+
void commit(Strategy& strategy)
{
- setup();
+ setup<BdbMessageStore>();
strategy.init();
std::auto_ptr<TPCTransactionContext> txn(store->begin("my-xid"));
strategy.run(txn.get());
store->prepare(*txn);
- store->commit(*txn);
- restart();
+ store->commit(*txn);
+ restart<BdbMessageStore>();
strategy.check(true);
}
void abort(Strategy& strategy, bool prepare)
{
- setup();
+ setup<BdbMessageStore>();
strategy.init();
std::auto_ptr<TPCTransactionContext> txn(store->begin("my-xid"));
strategy.run(txn.get());
if (prepare) store->prepare(*txn);
store->abort(*txn);
- restart();
+ restart<BdbMessageStore>();
strategy.check(false);
}
- void swap(TPCTransactionContext* txn)
+ void swap(TPCTransactionContext* txn, Queue::shared_ptr& from, Queue::shared_ptr& to)
{
- msg1 = queueA->dequeue().payload;//just dequeues in memory
+ msg1 = from->dequeue().payload;//just dequeues in memory
//move the message from one queue to the other as part of a
//distributed transaction
- queueB->enqueue(txn, msg1);//note: need to enqueue it first to avoid message being deleted
- queueA->dequeue(txn, msg1);
+ to->enqueue(txn, msg1);//note: need to enqueue it first to avoid message being deleted
+ from->dequeue(txn, msg1);
}
- void dequeue(TPCTransactionContext* txn)
+ void dequeue(TPCTransactionContext* txn, Queue::shared_ptr& queue)
{
- msg2 = queueA->dequeue().payload;//just dequeues in memory
- queueA->dequeue(txn, msg2);
+ msg2 = queue->dequeue().payload;//just dequeues in memory
+ queue->dequeue(txn, msg2);
}
- boost::intrusive_ptr<Message> enqueue(TPCTransactionContext* txn, const string& msgid)
+ boost::intrusive_ptr<Message> enqueue(TPCTransactionContext* txn, const string& msgid, Queue::shared_ptr& queue)
{
- boost::intrusive_ptr<Message> msg = createMessage(msgid);
- queueA->enqueue(txn, msg);
+ boost::intrusive_ptr<Message> msg = createMessage(msgid);
+ queue->enqueue(txn, msg);
return msg;
}
+ boost::intrusive_ptr<Message> enqueue(TPCTransactionContext* txn, const string& msgid, std::set<Queue::shared_ptr>& queueset)
+ {
+ boost::intrusive_ptr<Message> msg = createMessage(msgid);
+ for (std::set<Queue::shared_ptr>::iterator i = queueset.begin(); i != queueset.end(); i++) {
+ (*i)->enqueue(txn, msg);
+ }
+ return msg;
+ }
+
boost::intrusive_ptr<Message> deliver(const string& msgid, Queue::shared_ptr& queue)
{
msg4 = createMessage(msgid);
@@ -217,9 +318,10 @@
return msg4;
}
+ template <class T>
void setup()
{
- store = std::auto_ptr<BdbMessageStore>(new BdbMessageStore());
+ store = std::auto_ptr<T>(new T());
store->init(test_dir, 4, 1, 8);
store->truncate();
@@ -239,60 +341,63 @@
return msg;
}
+ template <class T>
void restart()
{
queueA.reset();
queueB.reset();
store.reset();
+ queues.reset();
+ links.reset();
- store = std::auto_ptr<BdbMessageStore>(new BdbMessageStore());
+ store = std::auto_ptr<T>(new T());
store->init(test_dir, 4, 1, 8);
ExchangeRegistry exchanges;
+ queues = std::auto_ptr<QueueRegistry>(new QueueRegistry);
+ links = std::auto_ptr<LinkRegistry>(new LinkRegistry(0));
dtxmgr = std::auto_ptr<DtxManager>(new DtxManager);
dtxmgr->setStore (store.get());
- RecoveryManagerImpl recovery(queues, exchanges, links, *dtxmgr, 0);
+ RecoveryManagerImpl recovery(*queues, exchanges, *links, *dtxmgr, 0);
store->recover(recovery);
- queueA = queues.find(nameA);
- queueB = queues.find(nameB);
+ queueA = queues->find(nameA);
+ queueB = queues->find(nameB);
}
- void swapCheck(bool swapped, const string& msgid)
+ void checkMsg(Queue::shared_ptr& queue, u_int32_t size, const string& msgid = "<none>")
{
- BOOST_REQUIRE(queueA);
- BOOST_REQUIRE(queueB);
+ BOOST_REQUIRE(queue);
+ BOOST_CHECK_EQUAL(size, queue->getMessageCount());
+ if (size > 0) {
+ boost::intrusive_ptr<Message> msg = queue->dequeue().payload;
+ BOOST_REQUIRE(msg);
+ BOOST_CHECK_EQUAL(msgid, msg->getProperties<MessageProperties>()->getCorrelationId());
+ }
+ }
+
+ void swapCheck(bool swapped, const string& msgid, Queue::shared_ptr& from, Queue::shared_ptr& to)
+ {
+ BOOST_REQUIRE(from);
+ BOOST_REQUIRE(to);
- Queue::shared_ptr x;//the other queue
- Queue::shared_ptr y;//the queue on which the message is expected to be
+ Queue::shared_ptr x; //the queue from which the message was swapped
+ Queue::shared_ptr y; //the queue on which the message is expected to be
if (swapped) {
- x = queueA;
- y = queueB;
+ x = from;
+ y = to;
} else {
- x = queueB;
- y = queueA;
+ x = to;
+ y = from;
}
- BOOST_CHECK_EQUAL((u_int32_t) 0, x->getMessageCount());
- BOOST_CHECK_EQUAL((u_int32_t) 1, y->getMessageCount());
- boost::intrusive_ptr<Message> msg = y->dequeue().payload;
- BOOST_REQUIRE(msg);
- BOOST_CHECK_EQUAL(msgid, msg->getProperties<MessageProperties>()->getCorrelationId());
+ checkMsg(x, 0);
+ checkMsg(y, 1, msgid);
+ checkMsg(y, 0);
}
- void checkA(u_int32_t size, const string& msgid = "<none>")
- {
- BOOST_REQUIRE(queueA);
- BOOST_CHECK_EQUAL(size, queueA->getMessageCount());
- if (size > 0) {
- boost::intrusive_ptr<Message> msg = queueA->dequeue().payload;
- BOOST_REQUIRE(msg);
- BOOST_CHECK_EQUAL(msgid, msg->getProperties<MessageProperties>()->getCorrelationId());
- }
- }
-
public:
- TwoPhaseCommitTest() : nameA("queueA"), nameB("queueB"), links(0) {}
+ TwoPhaseCommitTest() : nameA("queueA"), nameB("queueB") {}
void testCommitEnqueue()
{
@@ -357,6 +462,46 @@
{
recoverPrepared(false);
}
+
+ void testMultiQueueCommit()
+ {
+ testMultiQueueTxn(2, true, true);
+ }
+
+ void testMultiQueueAbort()
+ {
+ testMultiQueueTxn(2, true, false);
+ }
+
+ void testMultiQueueNoQueueCommitRecover()
+ {
+ testMultiQueueTxn(0, false, true);
+ }
+
+ void testMultiQueueNoQueueAbortRecover()
+ {
+ testMultiQueueTxn(0, false, false);
+ }
+
+ void testMultiQueueSomeQueueCommitRecover()
+ {
+ testMultiQueueTxn(1, false, true);
+ }
+
+ void testMultiQueueSomeQueueAbortRecover()
+ {
+ testMultiQueueTxn(1, false, false);
+ }
+
+ void testMultiQueueAllQueueCommitRecover()
+ {
+ testMultiQueueTxn(2, false, true);
+ }
+
+ void testMultiQueueAllQueueAbortRecover()
+ {
+ testMultiQueueTxn(2, false, false);
+ }
};
TwoPhaseCommitTest tpct;
@@ -440,4 +585,60 @@
cout << "ok" << endl;
}
+QPID_AUTO_TEST_CASE(MultiQueueCommit)
+{
+ cout << test_filename << ".MultiQueueCommit: " << flush;
+ tpct.testMultiQueueCommit();
+ cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(MultiQueueAbort)
+{
+ cout << test_filename << ".MultiQueueAbort: " << flush;
+ tpct.testMultiQueueAbort();
+ cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(MultiQueueNoQueueCommitRecover)
+{
+ cout << test_filename << ".MultiQueueNoQueueCommitRecover: " << flush;
+ tpct.testMultiQueueNoQueueCommitRecover();
+ cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(MultiQueueNoQueueAbortRecover)
+{
+ cout << test_filename << ".MultiQueueNoQueueAbortRecover: " << flush;
+ tpct.testMultiQueueNoQueueAbortRecover();
+ cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(MultiQueueSomeQueueCommitRecover)
+{
+ cout << test_filename << ".MultiQueueSomeQueueCommitRecover: " << flush;
+ tpct.testMultiQueueSomeQueueCommitRecover();
+ cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(MultiQueueSomeQueueAbortRecover)
+{
+ cout << test_filename << ".MultiQueueSomeQueueAbortRecover: " << flush;
+ tpct.testMultiQueueSomeQueueAbortRecover();
+ cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(MultiQueueAllQueueCommitRecover)
+{
+ cout << test_filename << ".MultiQueueAllQueueCommitRecover: " << flush;
+ tpct.testMultiQueueAllQueueCommitRecover();
+ cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(MultiQueueAllQueueAbortRecover)
+{
+ cout << test_filename << ".MultiQueueAllQueueAbortRecover: " << flush;
+ tpct.testMultiQueueAllQueueAbortRecover();
+ cout << "ok" << endl;
+}
+
QPID_AUTO_TEST_SUITE_END()
17 years, 9 months
rhmessaging commits: r2186 - store/trunk/cpp.
by rhmessaging-commits@lists.jboss.org
Author: cctrieloff
Date: 2008-07-08 21:53:25 -0400 (Tue, 08 Jul 2008)
New Revision: 2186
Modified:
store/trunk/cpp/README
Log:
svn test1
Modified: store/trunk/cpp/README
===================================================================
--- store/trunk/cpp/README 2008-07-09 01:36:05 UTC (rev 2185)
+++ store/trunk/cpp/README 2008-07-09 01:53:25 UTC (rev 2186)
@@ -28,6 +28,6 @@
On a Debian-based system, install prerequisites via:
apt-get install libdb4.3++-dev
-On Debian, I need to get this file,
+On Debian, I need to get this file:
/usr/include/db_cxx.h
17 years, 9 months
rhmessaging commits: r2185 - store/trunk/cpp.
by rhmessaging-commits@lists.jboss.org
Author: cctrieloff
Date: 2008-07-08 21:36:05 -0400 (Tue, 08 Jul 2008)
New Revision: 2185
Modified:
store/trunk/cpp/README
Log:
minior update to test commit
Modified: store/trunk/cpp/README
===================================================================
--- store/trunk/cpp/README 2008-07-08 21:55:05 UTC (rev 2184)
+++ store/trunk/cpp/README 2008-07-09 01:36:05 UTC (rev 2185)
@@ -1,4 +1,4 @@
-== MessageStore plugin for qpid c++ broker using Berkeley DB ==
+== MessageStore plugin for qpid c++ broker ==
Pre-requisites:
- qpid broker headers and libraries (see below)
17 years, 9 months
rhmessaging commits: r2184 - in store/trunk/cpp/lib: gen/qpid/management and 1 other directory.
by rhmessaging-commits@lists.jboss.org
Author: tedross
Date: 2008-07-08 17:55:05 -0400 (Tue, 08 Jul 2008)
New Revision: 2184
Modified:
store/trunk/cpp/lib/BdbMessageStore.cpp
store/trunk/cpp/lib/BdbMessageStore.h
store/trunk/cpp/lib/JournalImpl.cpp
store/trunk/cpp/lib/JournalImpl.h
store/trunk/cpp/lib/gen/qpid/management/Journal.cpp
store/trunk/cpp/lib/gen/qpid/management/Journal.h
store/trunk/cpp/lib/gen/qpid/management/PackageMrgstore.cpp
store/trunk/cpp/lib/gen/qpid/management/PackageMrgstore.h
store/trunk/cpp/lib/gen/qpid/management/Store.cpp
store/trunk/cpp/lib/gen/qpid/management/Store.h
Log:
Removed boost::shared_ptr from management-agent interface
Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp 2008-07-08 19:04:25 UTC (rev 2183)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp 2008-07-08 21:55:05 UTC (rev 2184)
@@ -63,7 +63,8 @@
wcache_num_pages(JRNL_WMGR_DEF_PAGES),
highestRid(0),
isInit(false),
- envPath(envpath)
+ envPath(envpath),
+ mgmtObject(0)
{}
@@ -71,12 +72,12 @@
{
if (broker != 0)
{
- ManagementAgent::shared_ptr agent = ManagementAgent::getAgent ();
+ ManagementAgent* agent = ManagementAgent::getAgent ();
- if (agent.get () != 0)
+ if (agent != 0)
{
qpid::management::PackageMrgstore packageInitializer(agent);
- mgmtObject = qpid::management::Store::shared_ptr(new qpid::management::Store (agent.get(), this, broker));
+ mgmtObject = new qpid::management::Store (agent, this, broker);
mgmtObject->set_location(storeDir);
mgmtObject->set_defaultInitialFileCount(numJrnlFiles);
@@ -265,7 +266,7 @@
QPID_LOG(error, "Unknown error in BdbMessageStore::~BdbMessageStore()");
}
- if (mgmtObject.get() != 0)
+ if (mgmtObject != 0)
mgmtObject->resourceDestroy();
}
Modified: store/trunk/cpp/lib/BdbMessageStore.h
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.h 2008-07-08 19:04:25 UTC (rev 2183)
+++ store/trunk/cpp/lib/BdbMessageStore.h 2008-07-08 21:55:05 UTC (rev 2184)
@@ -96,7 +96,7 @@
const char* envPath;
static qpid::sys::Duration defJournalGetEventsTimeout;
static qpid::sys::Duration defJournalFlushTimeout;
- qpid::management::Store::shared_ptr mgmtObject;
+ qpid::management::Store* mgmtObject;
qpid::sys::Mutex jrnlCreateLock;
bool mode(const bool mode, const bool force);
@@ -236,7 +236,7 @@
void commit(qpid::broker::TransactionContext& ctxt);
void abort(qpid::broker::TransactionContext& ctxt);
- qpid::management::ManagementObject::shared_ptr GetManagementObject (void) const
+ qpid::management::ManagementObject* GetManagementObject (void) const
{ return mgmtObject; }
qpid::management::Manageable::status_t ManagementMethod (uint32_t, qpid::management::Args&)
Modified: store/trunk/cpp/lib/JournalImpl.cpp
===================================================================
--- store/trunk/cpp/lib/JournalImpl.cpp 2008-07-08 19:04:25 UTC (rev 2183)
+++ store/trunk/cpp/lib/JournalImpl.cpp 2008-07-08 21:55:05 UTC (rev 2184)
@@ -27,7 +27,7 @@
#include "jrnl/jexception.hpp"
#include "jrnl/slock.hpp"
#include "qpid/log/Statement.h"
-#include "qpid/management/ManagementAgent.h"
+#include "qpid/agent/ManagementAgent.h"
#include "qpid/management/ArgsJournalExpand.h"
#include "qpid/management/ArgsJournalReconfigure.h"
#include "qpid/sys/Monitor.h"
@@ -60,7 +60,8 @@
_datap(0),
_dlen(0),
_dtok(),
- _external(false)
+ _external(false),
+ _mgmtObject(0)
{
::pthread_mutex_init(&_getf_mutex, 0);
getEventsFireEventsPtr = new GetEventsFireEvent(this, getEventsTimeout);
@@ -72,12 +73,11 @@
journalTimerPtr->start();
journalTimerPtr->add(inactivityFireEventPtr);
- ManagementAgent::shared_ptr agent = ManagementAgent::getAgent ();
+ ManagementAgent* agent = ManagementAgent::getAgent ();
- if (agent.get () != 0)
+ if (agent != 0)
{
- _mgmtObject = qpid::management::Journal::shared_ptr
- (new qpid::management::Journal(agent.get(), (qpid::management::Manageable*) this));
+ _mgmtObject = new qpid::management::Journal(agent, (qpid::management::Manageable*) this);
_mgmtObject->set_name(journalId);
_mgmtObject->set_directory(journalDirectory);
@@ -111,7 +111,7 @@
journalTimerPtr = 0;
}
- if (_mgmtObject.get() != 0)
+ if (_mgmtObject != 0)
_mgmtObject->resourceDestroy();
::pthread_mutex_destroy(&_getf_mutex);
@@ -135,7 +135,7 @@
wr_cb);
log(LOG_DEBUG, "Initialization complete");
- if (_mgmtObject.get() != 0)
+ if (_mgmtObject != 0)
{
_mgmtObject->set_initialFileCount(_num_jfiles);
_mgmtObject->set_dataFileSize(_jfsize_sblks * JRNL_SBLK_SIZE * JRNL_DBLK_SIZE);
@@ -205,7 +205,7 @@
oss2 << "; journal now read-only.";
log(LOG_DEBUG, oss2.str());
- if (_mgmtObject.get() != 0)
+ if (_mgmtObject != 0)
{
_mgmtObject->set_initialFileCount(_num_jfiles);
_mgmtObject->set_dataFileSize(_jfsize_sblks * JRNL_SBLK_SIZE * JRNL_DBLK_SIZE);
@@ -300,7 +300,7 @@
{
handleIoResult(jcntl::enqueue_data_record(data_buff, tot_data_len, this_data_len, dtokp, transient));
- if (_mgmtObject.get() != 0)
+ if (_mgmtObject != 0)
{
_mgmtObject->inc_recordEnqueues();
_mgmtObject->inc_recordDepth();
@@ -313,7 +313,7 @@
{
handleIoResult(jcntl::enqueue_extern_data_record(tot_data_len, dtokp, transient));
- if (_mgmtObject.get() != 0)
+ if (_mgmtObject != 0)
{
_mgmtObject->inc_recordEnqueues();
_mgmtObject->inc_recordDepth();
@@ -326,7 +326,7 @@
{
handleIoResult(jcntl::enqueue_txn_data_record(data_buff, tot_data_len, this_data_len, dtokp, xid, transient));
- if (_mgmtObject.get() != 0)
+ if (_mgmtObject != 0)
{
_mgmtObject->inc_recordEnqueues();
_mgmtObject->inc_recordDepth();
@@ -339,7 +339,7 @@
{
handleIoResult(jcntl::enqueue_extern_txn_data_record(tot_data_len, dtokp, xid, transient));
- if (_mgmtObject.get() != 0)
+ if (_mgmtObject != 0)
{
_mgmtObject->inc_recordEnqueues();
_mgmtObject->inc_recordDepth();
@@ -351,7 +351,7 @@
{
handleIoResult(jcntl::dequeue_data_record(dtokp));
- if (_mgmtObject.get() != 0)
+ if (_mgmtObject != 0)
{
_mgmtObject->inc_recordDequeues();
_mgmtObject->dec_recordDepth();
@@ -363,7 +363,7 @@
{
handleIoResult(jcntl::dequeue_txn_data_record(dtokp, xid));
- if (_mgmtObject.get() != 0)
+ if (_mgmtObject != 0)
{
_mgmtObject->inc_recordDequeues();
_mgmtObject->dec_recordDepth();
Modified: store/trunk/cpp/lib/JournalImpl.h
===================================================================
--- store/trunk/cpp/lib/JournalImpl.h 2008-07-08 19:04:25 UTC (rev 2183)
+++ store/trunk/cpp/lib/JournalImpl.h 2008-07-08 21:55:05 UTC (rev 2184)
@@ -87,7 +87,7 @@
size_t _dlen;
journal::data_tok _dtok;
bool _external;
- qpid::management::Journal::shared_ptr _mgmtObject;
+ qpid::management::Journal* _mgmtObject;
public:
JournalImpl(const std::string& journalId,
@@ -177,7 +177,7 @@
void getEventsFire();
void flushFire();
- qpid::management::ManagementObject::shared_ptr GetManagementObject (void) const
+ qpid::management::ManagementObject* GetManagementObject (void) const
{ return _mgmtObject; }
qpid::management::Manageable::status_t ManagementMethod (uint32_t,
@@ -200,10 +200,10 @@
// Management instrumentation callbacks overridden from jcntl
inline void instr_incr_outstanding_aio_cnt() {
- if (_mgmtObject.get() != 0) _mgmtObject->inc_outstandingAIOs();
+ if (_mgmtObject != 0) _mgmtObject->inc_outstandingAIOs();
}
inline void instr_decr_outstanding_aio_cnt() {
- if (_mgmtObject.get() != 0) _mgmtObject->dec_outstandingAIOs();
+ if (_mgmtObject != 0) _mgmtObject->dec_outstandingAIOs();
}
}; // class JournalImpl
Modified: store/trunk/cpp/lib/gen/qpid/management/Journal.cpp
===================================================================
--- store/trunk/cpp/lib/gen/qpid/management/Journal.cpp 2008-07-08 19:04:25 UTC (rev 2183)
+++ store/trunk/cpp/lib/gen/qpid/management/Journal.cpp 2008-07-08 21:55:05 UTC (rev 2184)
@@ -24,7 +24,7 @@
#include "qpid/log/Statement.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/management/Manageable.h"
-#include "qpid/management/ManagementAgent.h"
+#include "qpid/agent/ManagementAgent.h"
#include "Journal.h"
#include "qpid/management/ArgsJournalExpand.h"
Modified: store/trunk/cpp/lib/gen/qpid/management/Journal.h
===================================================================
--- store/trunk/cpp/lib/gen/qpid/management/Journal.h 2008-07-08 19:04:25 UTC (rev 2183)
+++ store/trunk/cpp/lib/gen/qpid/management/Journal.h 2008-07-08 21:55:05 UTC (rev 2184)
@@ -118,7 +118,6 @@
public:
friend class PackageMrgstore;
- typedef boost::shared_ptr<Journal> shared_ptr;
Journal (ManagementAgent* agent,
Manageable* coreObject);
Modified: store/trunk/cpp/lib/gen/qpid/management/PackageMrgstore.cpp
===================================================================
--- store/trunk/cpp/lib/gen/qpid/management/PackageMrgstore.cpp 2008-07-08 19:04:25 UTC (rev 2183)
+++ store/trunk/cpp/lib/gen/qpid/management/PackageMrgstore.cpp 2008-07-08 21:55:05 UTC (rev 2184)
@@ -28,7 +28,7 @@
using namespace qpid::management;
-PackageMrgstore::PackageMrgstore (ManagementAgent::shared_ptr agent)
+PackageMrgstore::PackageMrgstore (ManagementAgent* agent)
{
agent->RegisterClass (Store::packageName, Store::className, Store::md5Sum, Store::writeSchema);
agent->RegisterClass (Journal::packageName, Journal::className, Journal::md5Sum, Journal::writeSchema);
Modified: store/trunk/cpp/lib/gen/qpid/management/PackageMrgstore.h
===================================================================
--- store/trunk/cpp/lib/gen/qpid/management/PackageMrgstore.h 2008-07-08 19:04:25 UTC (rev 2183)
+++ store/trunk/cpp/lib/gen/qpid/management/PackageMrgstore.h 2008-07-08 21:55:05 UTC (rev 2184)
@@ -24,7 +24,7 @@
// This source file was created by a code generator.
// Please do not edit.
-#include "qpid/management/ManagementAgent.h"
+#include "qpid/agent/ManagementAgent.h"
namespace qpid {
namespace management {
@@ -32,7 +32,7 @@
class PackageMrgstore
{
public:
- PackageMrgstore (ManagementAgent::shared_ptr agent);
+ PackageMrgstore (ManagementAgent* agent);
~PackageMrgstore () {}
};
Modified: store/trunk/cpp/lib/gen/qpid/management/Store.cpp
===================================================================
--- store/trunk/cpp/lib/gen/qpid/management/Store.cpp 2008-07-08 19:04:25 UTC (rev 2183)
+++ store/trunk/cpp/lib/gen/qpid/management/Store.cpp 2008-07-08 21:55:05 UTC (rev 2184)
@@ -24,7 +24,7 @@
#include "qpid/log/Statement.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/management/Manageable.h"
-#include "qpid/management/ManagementAgent.h"
+#include "qpid/agent/ManagementAgent.h"
#include "Store.h"
Modified: store/trunk/cpp/lib/gen/qpid/management/Store.h
===================================================================
--- store/trunk/cpp/lib/gen/qpid/management/Store.h 2008-07-08 19:04:25 UTC (rev 2183)
+++ store/trunk/cpp/lib/gen/qpid/management/Store.h 2008-07-08 21:55:05 UTC (rev 2184)
@@ -64,7 +64,6 @@
public:
friend class PackageMrgstore;
- typedef boost::shared_ptr<Store> shared_ptr;
Store (ManagementAgent* agent,
Manageable* coreObject, Manageable* _parent);
17 years, 9 months
rhmessaging commits: r2183 - store/trunk/cpp/lib.
by rhmessaging-commits@lists.jboss.org
Author: aconway
Date: 2008-07-08 15:04:25 -0400 (Tue, 08 Jul 2008)
New Revision: 2183
Modified:
store/trunk/cpp/lib/StorePlugin.cpp
Log:
Revert un-necessary Plugin complications. Better solution for plugin extension points coming up...
Modified: store/trunk/cpp/lib/StorePlugin.cpp
===================================================================
--- store/trunk/cpp/lib/StorePlugin.cpp 2008-07-08 18:21:48 UTC (rev 2182)
+++ store/trunk/cpp/lib/StorePlugin.cpp 2008-07-08 19:04:25 UTC (rev 2183)
@@ -32,42 +32,46 @@
namespace broker {
using namespace std;
-using rhm::bdbstore::BdbMessageStore;
-struct StorePlugin : public PluginT<Broker> {
- BdbMessageStore* store;
- StorePlugin(BdbMessageStore* s) : store(s) {}
+struct StorePlugin : public Plugin {
- void initializeT(Broker& broker) {
- store->initManagement(&broker);
- }
-};
-
-struct StorePluginFactory : public Plugin::FactoryT<Broker> {
rhm::bdbstore::BdbMessageStore::Options options;
+ MessageStore *store;
Options* getOptions() { return &options; }
- boost::shared_ptr<Plugin> createT(Broker& broker) {
- std::auto_ptr<BdbMessageStore> store(new BdbMessageStore());
- if (options.storeDir.empty ()) {
- DataDir& dataDir = broker.getDataDir ();
+ void earlyInitialize (Plugin::Target& target)
+ {
+ Broker* broker = dynamic_cast<Broker*>(&target);
+ store = new rhm::bdbstore::BdbMessageStore ();
+ DataDir& dataDir = broker->getDataDir ();
+
+ if (options.storeDir.empty ())
+ {
if (!dataDir.isEnabled ())
throw Exception ("If --data-dir is blank or --no-data-dir is specified, "
"--store-directory must be present.");
+
options.storeDir = dataDir.getPath ();
}
- if (!store->init (&options)) {
+
+ if (!store->init (&options))
+ {
throw Exception("Existing journal found in different bdb/async mode. "
"Move or delete existing data files before changing modes, or use "
"'--store-force yes' to discard existing data.");
}
- BdbMessageStore* storep=store.release();
- broker.setStore(storep);
- return make_shared_ptr(new StorePlugin(storep));
+
+ broker->setStore (store);
}
+
+ void initialize(Plugin::Target& target)
+ {
+ Broker* broker = dynamic_cast<Broker*>(&target);
+ ((rhm::bdbstore::BdbMessageStore*) store)->initManagement (broker);
+ }
};
-static StorePluginFactory instance; // Static initialization.
+static StorePlugin instance; // Static initialization.
}} // namespace qpid::broker
17 years, 9 months
rhmessaging commits: r2182 - in mgmt/trunk/mint: sql and 1 other directory.
by rhmessaging-commits@lists.jboss.org
Author: nunofsantos
Date: 2008-07-08 14:21:48 -0400 (Tue, 08 Jul 2008)
New Revision: 2182
Modified:
mgmt/trunk/mint/python/mint/__init__.py
mgmt/trunk/mint/python/mint/schema.py
mgmt/trunk/mint/python/mint/schemaparser.py
mgmt/trunk/mint/sql/schema.sql
Log:
BZ 454362: Mint broker port datatype is too small - forcing next size up to accomodate unsigned value
Modified: mgmt/trunk/mint/python/mint/__init__.py
===================================================================
--- mgmt/trunk/mint/python/mint/__init__.py 2008-07-08 16:16:08 UTC (rev 2181)
+++ mgmt/trunk/mint/python/mint/__init__.py 2008-07-08 18:21:48 UTC (rev 2182)
@@ -40,7 +40,7 @@
name = StringCol(length=1000, default=None, unique=True, notNone=True)
host = StringCol(length=1000, default=None, notNone=True)
- port = SmallIntCol(default=None, notNone=True)
+ port = IntCol(default=None, notNone=True)
broker = ForeignKey("Broker", cascade="null", default=None)
groups = SQLRelatedJoin("BrokerGroup",
intermediateTable="broker_group_mapping",
Modified: mgmt/trunk/mint/python/mint/schema.py
===================================================================
--- mgmt/trunk/mint/python/mint/schema.py 2008-07-08 16:16:08 UTC (rev 2181)
+++ mgmt/trunk/mint/python/mint/schema.py 2008-07-08 18:21:48 UTC (rev 2182)
@@ -47,7 +47,7 @@
statsCurr = ForeignKey('BrokerStats', cascade='null', default=None)
statsPrev = ForeignKey('BrokerStats', cascade='null', default=None)
system = ForeignKey('System', cascade='null', default=None)
- port = SmallIntCol(default=None)
+ port = IntCol(default=None)
workerThreads = SmallIntCol(default=None)
maxConns = SmallIntCol(default=None)
connBacklog = SmallIntCol(default=None)
@@ -195,7 +195,7 @@
durable = BoolCol(default=None)
autoDelete = BoolCol(default=None)
exclusive = BoolCol(default=None)
- arguments = StringCol(length=4000, default=None)
+ arguments = StringCol(default=None)
classInfos = dict() # brokerId => classInfo
@@ -235,14 +235,10 @@
enqueueTxnCommits = BigIntCol(default=None)
enqueueTxnRejects = BigIntCol(default=None)
enqueueTxnCount = IntCol(default=None)
- enqueueTxnCountLow = IntCol(default=None)
- enqueueTxnCountHigh = IntCol(default=None)
dequeueTxnStarts = BigIntCol(default=None)
dequeueTxnCommits = BigIntCol(default=None)
dequeueTxnRejects = BigIntCol(default=None)
dequeueTxnCount = IntCol(default=None)
- dequeueTxnCountLow = IntCol(default=None)
- dequeueTxnCountHigh = IntCol(default=None)
consumerCount = IntCol(default=None)
consumerCountLow = IntCol(default=None)
consumerCountHigh = IntCol(default=None)
@@ -322,7 +318,7 @@
exchange = ForeignKey('Exchange', cascade='null', default=None)
queue = ForeignKey('Queue', cascade='null', default=None)
bindingKey = StringCol(length=1000, default=None)
- arguments = StringCol(length=4000, default=None)
+ arguments = StringCol(default=None)
classInfos = dict() # brokerId => classInfo
@@ -404,7 +400,7 @@
statsPrev = ForeignKey('LinkStats', cascade='null', default=None)
vhost = ForeignKey('Vhost', cascade='null', default=None)
host = StringCol(length=1000, default=None)
- port = SmallIntCol(default=None)
+ port = IntCol(default=None)
useSsl = BoolCol(default=None)
durable = BoolCol(default=None)
Modified: mgmt/trunk/mint/python/mint/schemaparser.py
===================================================================
--- mgmt/trunk/mint/python/mint/schemaparser.py 2008-07-08 16:16:08 UTC (rev 2181)
+++ mgmt/trunk/mint/python/mint/schemaparser.py 2008-07-08 18:21:48 UTC (rev 2182)
@@ -35,7 +35,7 @@
self.dataTypesMap["absTime"] = self.dataTypesMap["deltaTime"] = "BigIntCol"
self.dataTypesMap["bool"] = "BoolCol"
self.dataTypesMap["sstr"] = self.dataTypesMap["lstr"] = "StringCol"
- self.dataTypesMap["ftable"] = "StringCol"
+ self.dataTypesMap["map"] = "StringCol"
def attrNameFromDbColumn(self, name, removeSuffix=""):
return self.style.dbColumnToPythonAttr(name.replace(removeSuffix, ""))
@@ -48,6 +48,10 @@
params += ", default=None"
if attribName == "id":
attribName = "id_"
+ # special case for "port" attrib, needs to be a 2-byte unsigned
+ # but uint16 converts to a signed int (SmallIntCol), so forcing the next size up (IntCol)
+ if (attribName == "port" and attribType == self.dataTypesMap["uint16"]):
+ attribType = self.dataTypesMap["uint32"]
self.pythonOutput += " %s = %s(%s)\n" % (attribName, attribType, params)
def generateTimestampAttrib(self, col):
Modified: mgmt/trunk/mint/sql/schema.sql
===================================================================
--- mgmt/trunk/mint/sql/schema.sql 2008-07-08 16:16:08 UTC (rev 2181)
+++ mgmt/trunk/mint/sql/schema.sql 2008-07-08 18:21:48 UTC (rev 2182)
@@ -24,7 +24,7 @@
id SERIAL PRIMARY KEY,
name VARCHAR(1000) NOT NULL UNIQUE,
host VARCHAR(1000) NOT NULL,
- port SMALLINT NOT NULL,
+ port INT NOT NULL,
broker_id INT,
cluster_id INT,
profile_id INT
@@ -85,7 +85,7 @@
exchange_id INT,
queue_id INT,
binding_key VARCHAR(1000),
- arguments VARCHAR(4000)
+ arguments TEXT
);
CREATE TABLE binding_stats (
@@ -134,7 +134,7 @@
stats_curr_id INT,
stats_prev_id INT,
system_id INT,
- port SMALLINT,
+ port INT,
worker_threads SMALLINT,
max_conns SMALLINT,
conn_backlog SMALLINT,
@@ -279,7 +279,7 @@
stats_prev_id INT,
vhost_id INT,
host VARCHAR(1000),
- port SMALLINT,
+ port INT,
use_ssl BOOL,
durable BOOL
);
@@ -307,7 +307,7 @@
durable BOOL,
auto_delete BOOL,
exclusive BOOL,
- arguments VARCHAR(4000)
+ arguments TEXT
);
CREATE TABLE queue_stats (
@@ -333,14 +333,10 @@
enqueue_txn_commits BIGINT,
enqueue_txn_rejects BIGINT,
enqueue_txn_count INT,
- enqueue_txn_count_low INT,
- enqueue_txn_count_high INT,
dequeue_txn_starts BIGINT,
dequeue_txn_commits BIGINT,
dequeue_txn_rejects BIGINT,
dequeue_txn_count INT,
- dequeue_txn_count_low INT,
- dequeue_txn_count_high INT,
consumer_count INT,
consumer_count_low INT,
consumer_count_high INT,
17 years, 9 months