Author: aconway
Date: 2010-06-28 14:18:31 -0400 (Mon, 28 Jun 2010)
New Revision: 4053
Modified:
store/trunk/cpp/lib/JournalImpl.cpp
store/trunk/cpp/lib/JournalImpl.h
store/trunk/cpp/lib/MessageStoreImpl.cpp
store/trunk/cpp/lib/MessageStoreImpl.h
store/trunk/cpp/lib/StorePlugin.cpp
store/trunk/cpp/tests/OrderingTest.cpp
store/trunk/cpp/tests/SimpleTest.cpp
store/trunk/cpp/tests/TransactionalTest.cpp
store/trunk/cpp/tests/TwoPhaseCommitTest.cpp
Log:
Bug 607748 - Crash on exit in store cluster tests.
This is an order-of-static-destructors problem.
This is an order-of-static-destructors problem. Fixed by having the
store use the broker's Timer. This ensures orderly shut down as the
brokers destructor will destroy the store first and then the timer.
Modified: store/trunk/cpp/lib/JournalImpl.cpp
===================================================================
--- store/trunk/cpp/lib/JournalImpl.cpp 2010-06-25 15:13:09 UTC (rev 4052)
+++ store/trunk/cpp/lib/JournalImpl.cpp 2010-06-28 18:18:31 UTC (rev 4053)
@@ -33,6 +33,7 @@
#include "qmf/com/redhat/rhm/store/EventFull.h"
#include "qmf/com/redhat/rhm/store/EventRecovered.h"
#include "qpid/sys/Monitor.h"
+#include "qpid/sys/Timer.h"
#include "StoreException.h"
using namespace mrg::msgstore;
@@ -40,15 +41,12 @@
using qpid::management::ManagementAgent;
namespace _qmf = qmf::com::redhat::rhm::store;
-qpid::sys::Mutex JournalImpl::_static_lock;
-qpid::sys::Timer* JournalImpl::journalTimerPtr = 0;
-u_int32_t JournalImpl::cnt = 0;
-
void InactivityFireEvent::fire() { qpid::sys::Mutex::ScopedLock sl(_ife_lock); if
(_parent) _parent->flushFire(); }
void GetEventsFireEvent::fire() { qpid::sys::Mutex::ScopedLock sl(_gefe_lock); if
(_parent) _parent->getEventsFire(); }
-JournalImpl::JournalImpl(const std::string& journalId,
+JournalImpl::JournalImpl(qpid::sys::Timer& timer_,
+ const std::string& journalId,
const std::string& journalDirectory,
const std::string& journalBaseFilename,
const qpid::sys::Duration getEventsTimeout,
@@ -56,6 +54,7 @@
qpid::management::ManagementAgent* a,
DeleteCallback onDelete):
jcntl(journalId, journalDirectory, journalBaseFilename),
+ timer(timer_),
getEventsTimerSetFlag(false),
lastReadRid(0),
writeActivityFlag(false),
@@ -72,13 +71,8 @@
getEventsFireEventsPtr = new GetEventsFireEvent(this, getEventsTimeout);
inactivityFireEventPtr = new InactivityFireEvent(this, flushTimeout);
{
- qpid::sys::Mutex::ScopedLock sl(_static_lock);
- if (journalTimerPtr == 0)
- journalTimerPtr = new qpid::sys::Timer;
- assert (journalTimerPtr != 0);
- cnt++;
- journalTimerPtr->start();
- journalTimerPtr->add(inactivityFireEventPtr);
+ timer.start();
+ timer.add(inactivityFireEventPtr);
}
if (_agent != 0)
@@ -119,15 +113,6 @@
inactivityFireEventPtr->cancel();
free_read_buffers();
- {
- qpid::sys::Mutex::ScopedLock sl(_static_lock);
- if (journalTimerPtr && --cnt == 0)
- {
- delete journalTimerPtr;
- journalTimerPtr = 0;
- }
- }
-
if (_mgmtObject != 0) {
_mgmtObject->resourceDestroy();
_mgmtObject = 0;
@@ -557,9 +542,7 @@
}
inactivityFireEventPtr->setupNextFire();
{
- qpid::sys::Mutex::ScopedLock sl(_static_lock);
- assert(journalTimerPtr != 0);
- journalTimerPtr->add(inactivityFireEventPtr);
+ timer.add(inactivityFireEventPtr);
}
}
Modified: store/trunk/cpp/lib/JournalImpl.h
===================================================================
--- store/trunk/cpp/lib/JournalImpl.h 2010-06-25 15:13:09 UTC (rev 4052)
+++ store/trunk/cpp/lib/JournalImpl.h 2010-06-28 18:18:31 UTC (rev 4053)
@@ -37,6 +37,10 @@
#include "qpid/management/Manageable.h"
#include "qmf/com/redhat/rhm/store/Journal.h"
+namespace qpid { namespace sys {
+class Timer;
+}}
+
namespace mrg {
namespace msgstore {
@@ -75,9 +79,9 @@
private:
static qpid::sys::Mutex _static_lock;
- static qpid::sys::Timer* journalTimerPtr;
static u_int32_t cnt;
+ qpid::sys::Timer& timer;
bool getEventsTimerSetFlag;
boost::intrusive_ptr<qpid::sys::TimerTask> getEventsFireEventsPtr;
qpid::sys::Mutex _getf_lock;
@@ -102,7 +106,8 @@
public:
- JournalImpl(const std::string& journalId,
+ JournalImpl(qpid::sys::Timer& timer,
+ const std::string& journalId,
const std::string& journalDirectory,
const std::string& journalBaseFilename,
const qpid::sys::Duration getEventsTimeout,
@@ -219,9 +224,8 @@
inline void setGetEventTimer()
{
- assert(journalTimerPtr != 0);
getEventsFireEventsPtr->setupNextFire();
- journalTimerPtr->add(getEventsFireEventsPtr);
+ timer.add(getEventsFireEventsPtr);
getEventsTimerSetFlag = true;
}
void handleIoResult(const mrg::journal::iores r);
@@ -239,13 +243,14 @@
class TplJournalImpl : public JournalImpl
{
public:
- TplJournalImpl(const std::string& journalId,
+ TplJournalImpl(qpid::sys::Timer& timer,
+ const std::string& journalId,
const std::string& journalDirectory,
const std::string& journalBaseFilename,
const qpid::sys::Duration getEventsTimeout,
const qpid::sys::Duration flushTimeout,
qpid::management::ManagementAgent* agent) :
- JournalImpl(journalId, journalDirectory, journalBaseFilename, getEventsTimeout,
flushTimeout, agent)
+ JournalImpl(timer, journalId, journalDirectory, journalBaseFilename,
getEventsTimeout, flushTimeout, agent)
{}
~TplJournalImpl() {}
Modified: store/trunk/cpp/lib/MessageStoreImpl.cpp
===================================================================
--- store/trunk/cpp/lib/MessageStoreImpl.cpp 2010-06-25 15:13:09 UTC (rev 4052)
+++ store/trunk/cpp/lib/MessageStoreImpl.cpp 2010-06-28 18:18:31 UTC (rev 4053)
@@ -62,7 +62,7 @@
tpc_flag(_tpc_flag)
{}
-MessageStoreImpl::MessageStoreImpl(const char* envpath) :
+MessageStoreImpl::MessageStoreImpl(qpid::sys::Timer& timer_, const char* envpath) :
numJrnlFiles(0),
autoJrnlExpand(false),
autoJrnlExpandMaxFiles(0),
@@ -77,6 +77,7 @@
highestRid(0),
isInit(false),
envPath(envpath),
+ timer(timer_),
mgmtObject(0),
agent(0)
{}
@@ -339,7 +340,7 @@
open(mappingDb, txn.get(), "mappings.db", true);
open(bindingDb, txn.get(), "bindings.db", true);
open(generalDb, txn.get(), "general.db", false);
- tplStorePtr.reset(new TplJournalImpl("TplStore", getTplBaseDir(),
"tpl", defJournalGetEventsTimeout, defJournalFlushTimeout, agent));
+ tplStorePtr.reset(new TplJournalImpl(timer, "TplStore",
getTplBaseDir(), "tpl", defJournalGetEventsTimeout, defJournalFlushTimeout,
agent));
txn.commit();
} catch (const journal::jexception& e) {
QPID_LOG(error, "Journal Exception occurred while initializing store: "
<< e);
@@ -479,7 +480,7 @@
return;
}
- jQueue = new JournalImpl(queue.getName(), getJrnlDir(queue),
std::string("JournalData"),
+ jQueue = new JournalImpl(timer, queue.getName(), getJrnlDir(queue),
std::string("JournalData"),
defJournalGetEventsTimeout, defJournalFlushTimeout, agent,
boost::bind(&MessageStoreImpl::journalDeleted, this,
_1));
{
@@ -763,7 +764,7 @@
QPID_LOG(error, "Cannot recover empty (null) queue name - ignoring and
attempting to continue.");
break;
}
- jQueue = new JournalImpl(queueName, getJrnlHashDir(queueName),
std::string("JournalData"),
+ jQueue = new JournalImpl(timer, queueName, getJrnlHashDir(queueName),
std::string("JournalData"),
defJournalGetEventsTimeout, defJournalFlushTimeout,
agent,
boost::bind(&MessageStoreImpl::journalDeleted, this,
_1));
{
Modified: store/trunk/cpp/lib/MessageStoreImpl.h
===================================================================
--- store/trunk/cpp/lib/MessageStoreImpl.h 2010-06-25 15:13:09 UTC (rev 4052)
+++ store/trunk/cpp/lib/MessageStoreImpl.h 2010-06-28 18:18:31 UTC (rev 4053)
@@ -45,6 +45,10 @@
#define DB_BUFFER_SMALL ENOMEM
#endif
+namespace qpid { namespace sys {
+class Timer;
+}}
+
namespace mrg {
namespace msgstore {
@@ -147,6 +151,7 @@
u_int64_t highestRid;
bool isInit;
const char* envPath;
+ qpid::sys::Timer& timer;
qmf::com::redhat::rhm::store::Store* mgmtObject;
qpid::management::ManagementAgent* agent;
@@ -266,7 +271,7 @@
public:
typedef boost::shared_ptr<MessageStoreImpl> shared_ptr;
- MessageStoreImpl(const char* envpath = 0);
+ MessageStoreImpl(qpid::sys::Timer& timer, const char* envpath = 0);
virtual ~MessageStoreImpl();
Modified: store/trunk/cpp/lib/StorePlugin.cpp
===================================================================
--- store/trunk/cpp/lib/StorePlugin.cpp 2010-06-25 15:13:09 UTC (rev 4052)
+++ store/trunk/cpp/lib/StorePlugin.cpp 2010-06-28 18:18:31 UTC (rev 4053)
@@ -43,7 +43,7 @@
{
Broker* broker = dynamic_cast<Broker*>(&target);
if (!broker) return;
- boost::shared_ptr<qpid::broker::MessageStore> store(new
mrg::msgstore::MessageStoreImpl ());
+ boost::shared_ptr<qpid::broker::MessageStore> store(new
mrg::msgstore::MessageStoreImpl (broker->getTimer()));
DataDir& dataDir = broker->getDataDir ();
if (options.storeDir.empty ())
{
Modified: store/trunk/cpp/tests/OrderingTest.cpp
===================================================================
--- store/trunk/cpp/tests/OrderingTest.cpp 2010-06-25 15:13:09 UTC (rev 4052)
+++ store/trunk/cpp/tests/OrderingTest.cpp 2010-06-28 18:18:31 UTC (rev 4053)
@@ -30,7 +30,10 @@
#include <qpid/broker/RecoveryManagerImpl.h>
#include <qpid/framing/AMQHeaderBody.h>
#include "qpid/log/Logger.h"
+#include "qpid/sys/Timer.h"
+qpid::sys::Timer timer;
+
#define SET_LOG_LEVEL(level) \
qpid::log::Options opts(""); \
opts.selectors.clear(); \
@@ -59,7 +62,7 @@
void setup()
{
- store = std::auto_ptr<MessageStoreImpl>(new MessageStoreImpl());
+ store = std::auto_ptr<MessageStoreImpl>(new MessageStoreImpl(timer));
store->init(test_dir, 4, 1, true); // truncate store
queue = Queue::shared_ptr(new Queue(name, 0, store.get(), 0));
@@ -98,7 +101,7 @@
queue.reset();
store.reset();
- store = std::auto_ptr<MessageStoreImpl>(new MessageStoreImpl());
+ store = std::auto_ptr<MessageStoreImpl>(new MessageStoreImpl(timer));
store->init(test_dir, 4, 1);
ExchangeRegistry exchanges;
LinkRegistry links;
Modified: store/trunk/cpp/tests/SimpleTest.cpp
===================================================================
--- store/trunk/cpp/tests/SimpleTest.cpp 2010-06-25 15:13:09 UTC (rev 4052)
+++ store/trunk/cpp/tests/SimpleTest.cpp 2010-06-28 18:18:31 UTC (rev 4053)
@@ -32,7 +32,10 @@
#include <qpid/framing/AMQHeaderBody.h>
#include <qpid/framing/FieldTable.h>
#include "qpid/log/Logger.h"
+#include "qpid/sys/Timer.h"
+qpid::sys::Timer timer;
+
#define SET_LOG_LEVEL(level) \
qpid::log::Options opts(""); \
opts.selectors.clear(); \
@@ -92,7 +95,7 @@
const string& key, const FieldTable& args)
{
{
- MessageStoreImpl store;
+ MessageStoreImpl store(timer);
store.init(test_dir, 4, 1, true); // truncate store
Exchange::shared_ptr exchange(new DirectExchange(exchangeName, true, args));
Queue::shared_ptr queue(new Queue(queueName, 0, &store, 0));
@@ -102,7 +105,7 @@
store.bind(*exchange, *queue, key, args);
}//db will be closed
{
- MessageStoreImpl store;
+ MessageStoreImpl store(timer);
store.init(test_dir, 4, 1);
ExchangeRegistry exchanges;
QueueRegistry queues;
@@ -121,7 +124,7 @@
store.unbind(*exchange, *queue, key, args);
}
{
- MessageStoreImpl store;
+ MessageStoreImpl store(timer);
store.init(test_dir, 4, 1);
ExchangeRegistry exchanges;
QueueRegistry queues;
@@ -148,7 +151,7 @@
SET_LOG_LEVEL("error+"); // This only needs to be set once.
cout << test_filename << ".CreateDelete: " << flush;
- MessageStoreImpl store;
+ MessageStoreImpl store(timer);
store.init(test_dir, 4, 1, true); // truncate store
string name("CreateDeleteQueue");
Queue queue(name, 0, &store, 0);
@@ -164,7 +167,7 @@
QPID_AUTO_TEST_CASE(EmptyRecover)
{
cout << test_filename << ".EmptyRecover: " << flush;
- MessageStoreImpl store;
+ MessageStoreImpl store(timer);
store.init(test_dir, 4, 1, true); // truncate store
QueueRegistry registry;
registry.setStore (&store);
@@ -181,7 +184,7 @@
uint64_t id(0);
string name("MyDurableQueue");
{
- MessageStoreImpl store;
+ MessageStoreImpl store(timer);
store.init(test_dir, 4, 1, true); // truncate store
Queue queue(name, 0, &store, 0);
store.create(queue, qpid::framing::FieldTable());
@@ -189,7 +192,7 @@
id = queue.getPersistenceId();
}//db will be closed
{
- MessageStoreImpl store;
+ MessageStoreImpl store(timer);
store.init(test_dir, 4, 1);
QueueRegistry registry;
registry.setStore (&store);
@@ -209,7 +212,7 @@
std::auto_ptr<QueuePolicy> policy( QueuePolicy::createQueuePolicy(101, 202));
string name("MyDurableQueue");
{
- MessageStoreImpl store;
+ MessageStoreImpl store(timer);
store.init(test_dir, 4, 1, true); // truncate store
Queue queue(name, 0, &store, 0);
FieldTable settings;
@@ -218,7 +221,7 @@
BOOST_REQUIRE(queue.getPersistenceId());
}//db will be closed
{
- MessageStoreImpl store;
+ MessageStoreImpl store(timer);
store.init(test_dir, 4, 1);
QueueRegistry registry;
registry.setStore (&store);
@@ -239,14 +242,14 @@
string name("MyDurableQueue");
{
- MessageStoreImpl store;
+ MessageStoreImpl store(timer);
store.init(test_dir, 4, 1, true); // truncate store
Queue queue(name, 0, &store, 0);
store.create(queue, qpid::framing::FieldTable());
store.destroy(queue);
}//db will be closed
{
- MessageStoreImpl store;
+ MessageStoreImpl store(timer);
store.init(test_dir, 4, 1);
QueueRegistry registry;
registry.setStore (&store);
@@ -272,7 +275,7 @@
string data1("abcdefg");
string data2("hijklmn");
{
- MessageStoreImpl store;
+ MessageStoreImpl store(timer);
store.init(test_dir, 4, 1, true); // truncate store
Queue::shared_ptr queue(new Queue(name, 0, &store, 0));
FieldTable settings;
@@ -290,7 +293,7 @@
queue->enqueue(0, msg);
}//db will be closed
{
- MessageStoreImpl store;
+ MessageStoreImpl store(timer);
store.init(test_dir, 4, 1);
QueueRegistry registry;
registry.setStore (&store);
@@ -331,7 +334,7 @@
string routingKey("MyRoutingKey");
Uuid messageId(true);
string data("abcdefg");
- MessageStoreImpl store;
+ MessageStoreImpl store(timer);
store.init(test_dir, 4, 1, true); // truncate store
Queue::shared_ptr queue(new Queue(name, 0, &store, 0));
FieldTable settings;
@@ -347,7 +350,7 @@
queue->dequeue(0, qm);
}//db will be closed
{
- MessageStoreImpl store;
+ MessageStoreImpl store(timer);
store.init(test_dir, 4, 1);
QueueRegistry registry;
registry.setStore (&store);
@@ -370,7 +373,7 @@
FieldTable args;
args.setString("a", "A");
{
- MessageStoreImpl store;
+ MessageStoreImpl store(timer);
store.init(test_dir, 4, 1, true); // truncate store
ExchangeRegistry registry;
Exchange::shared_ptr exchange = registry.declare(name, type, true, args).first;
@@ -379,7 +382,7 @@
BOOST_REQUIRE(id);
}//db will be closed
{
- MessageStoreImpl store;
+ MessageStoreImpl store(timer);
store.init(test_dir, 4, 1);
ExchangeRegistry registry;
@@ -393,7 +396,7 @@
store.destroy(*exchange);
}
{
- MessageStoreImpl store;
+ MessageStoreImpl store(timer);
store.init(test_dir, 4, 1);
ExchangeRegistry registry;
@@ -441,7 +444,7 @@
string key("my-routing-key");
FieldTable args;
{
- MessageStoreImpl store;
+ MessageStoreImpl store(timer);
store.init(test_dir, 4, 1, true); // truncate store
Exchange::shared_ptr exchange(new DirectExchange(exchangeName, true, args));
Queue::shared_ptr queue1(new Queue(queueName1, 0, &store, 0));
@@ -455,7 +458,7 @@
store.destroy(*queue1);
}//db will be closed
{
- MessageStoreImpl store;
+ MessageStoreImpl store(timer);
store.init(test_dir, 4, 1);
ExchangeRegistry exchanges;
QueueRegistry queues;
@@ -472,7 +475,7 @@
store.destroy(*exchange);
}
{
- MessageStoreImpl store;
+ MessageStoreImpl store(timer);
store.init(test_dir, 4, 1);
ExchangeRegistry exchanges;
QueueRegistry queues;
Modified: store/trunk/cpp/tests/TransactionalTest.cpp
===================================================================
--- store/trunk/cpp/tests/TransactionalTest.cpp 2010-06-25 15:13:09 UTC (rev 4052)
+++ store/trunk/cpp/tests/TransactionalTest.cpp 2010-06-28 18:18:31 UTC (rev 4053)
@@ -32,7 +32,10 @@
#include "qpid/framing/AMQHeaderBody.h"
#include "qpid/log/Statement.h"
#include "qpid/log/Logger.h"
+#include "qpid/sys/Timer.h"
+qpid::sys::Timer timer;
+
#define SET_LOG_LEVEL(level) \
qpid::log::Options opts(""); \
opts.selectors.clear(); \
@@ -69,7 +72,7 @@
class TestMessageStore: public MessageStoreImpl
{
public:
- TestMessageStore(const char* envpath = 0) : MessageStoreImpl(envpath) {}
+ TestMessageStore(qpid::sys::Timer& timer, const char* envpath = 0) :
MessageStoreImpl(timer, envpath) {}
std::auto_ptr<qpid::broker::TransactionContext> begin() {
checkInit();
// pass sequence number for c/a
@@ -109,7 +112,7 @@
template <class T>
void setup()
{
- store = std::auto_ptr<T>(new T());
+ store = std::auto_ptr<T>(new T(timer));
store->init(test_dir, 4, 1, true); // truncate store
//create two queues:
@@ -128,7 +131,7 @@
queues.reset();
store.reset();
- store = std::auto_ptr<T>(new T());
+ store = std::auto_ptr<T>(new T(timer));
store->init(test_dir, 4, 1);
queues = std::auto_ptr<QueueRegistry>(new QueueRegistry);
ExchangeRegistry exchanges;
Modified: store/trunk/cpp/tests/TwoPhaseCommitTest.cpp
===================================================================
--- store/trunk/cpp/tests/TwoPhaseCommitTest.cpp 2010-06-25 15:13:09 UTC (rev 4052)
+++ store/trunk/cpp/tests/TwoPhaseCommitTest.cpp 2010-06-28 18:18:31 UTC (rev 4053)
@@ -32,7 +32,10 @@
#include "qpid/log/Statement.h"
#include "TxnCtxt.h"
#include "qpid/log/Logger.h"
+#include "qpid/sys/Timer.h"
+qpid::sys::Timer timer;
+
#define SET_LOG_LEVEL(level) \
qpid::log::Options opts(""); \
opts.selectors.clear(); \
@@ -182,7 +185,7 @@
class TestMessageStore: public MessageStoreImpl
{
public:
- TestMessageStore(const char* envpath = 0) : MessageStoreImpl(envpath) {}
+ TestMessageStore(qpid::sys::Timer& timer, const char* envpath = 0) :
MessageStoreImpl(timer, envpath) {}
std::auto_ptr<qpid::broker::TPCTransactionContext> begin(const
std::string& xid) {
checkInit();
IdSequence* jtx = &messageIdSequence;
@@ -325,7 +328,7 @@
template <class T>
void setup()
{
- store = std::auto_ptr<T>(new T());
+ store = std::auto_ptr<T>(new T(timer));
store->init(test_dir, 4, 1, true); // truncate store
//create two queues:
@@ -353,7 +356,7 @@
queues.reset();
links.reset();
- store = std::auto_ptr<T>(new T());
+ store = std::auto_ptr<T>(new T(timer));
store->init(test_dir, 4, 1);
sys::Timer t;
ExchangeRegistry exchanges;