[rhmessaging-commits] rhmessaging commits: r4053 - in store/trunk/cpp: tests and 1 other directory.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Mon Jun 28 14:18:31 EDT 2010


Author: aconway
Date: 2010-06-28 14:18:31 -0400 (Mon, 28 Jun 2010)
New Revision: 4053

Modified:
   store/trunk/cpp/lib/JournalImpl.cpp
   store/trunk/cpp/lib/JournalImpl.h
   store/trunk/cpp/lib/MessageStoreImpl.cpp
   store/trunk/cpp/lib/MessageStoreImpl.h
   store/trunk/cpp/lib/StorePlugin.cpp
   store/trunk/cpp/tests/OrderingTest.cpp
   store/trunk/cpp/tests/SimpleTest.cpp
   store/trunk/cpp/tests/TransactionalTest.cpp
   store/trunk/cpp/tests/TwoPhaseCommitTest.cpp
Log:
Bug 607748 - Crash on exit in store cluster tests.

This is an order-of-static-destructors problem. Fixed by having the
store use the broker's Timer. This ensures orderly shutdown, as the
broker's destructor will destroy the store first and then the timer.


Modified: store/trunk/cpp/lib/JournalImpl.cpp
===================================================================
--- store/trunk/cpp/lib/JournalImpl.cpp	2010-06-25 15:13:09 UTC (rev 4052)
+++ store/trunk/cpp/lib/JournalImpl.cpp	2010-06-28 18:18:31 UTC (rev 4053)
@@ -33,6 +33,7 @@
 #include "qmf/com/redhat/rhm/store/EventFull.h"
 #include "qmf/com/redhat/rhm/store/EventRecovered.h"
 #include "qpid/sys/Monitor.h"
+#include "qpid/sys/Timer.h"
 #include "StoreException.h"
 
 using namespace mrg::msgstore;
@@ -40,15 +41,12 @@
 using qpid::management::ManagementAgent;
 namespace _qmf = qmf::com::redhat::rhm::store;
 
-qpid::sys::Mutex JournalImpl::_static_lock;
-qpid::sys::Timer* JournalImpl::journalTimerPtr = 0;
-u_int32_t JournalImpl::cnt = 0;
-
 void InactivityFireEvent::fire() { qpid::sys::Mutex::ScopedLock sl(_ife_lock); if (_parent) _parent->flushFire(); }
 
 void GetEventsFireEvent::fire() { qpid::sys::Mutex::ScopedLock sl(_gefe_lock); if (_parent) _parent->getEventsFire(); }
 
-JournalImpl::JournalImpl(const std::string& journalId,
+JournalImpl::JournalImpl(qpid::sys::Timer& timer_,
+                         const std::string& journalId,
                          const std::string& journalDirectory,
                          const std::string& journalBaseFilename,
                          const qpid::sys::Duration getEventsTimeout,
@@ -56,6 +54,7 @@
                          qpid::management::ManagementAgent* a,
                          DeleteCallback onDelete):
                          jcntl(journalId, journalDirectory, journalBaseFilename),
+                         timer(timer_),
                          getEventsTimerSetFlag(false),
                          lastReadRid(0),
                          writeActivityFlag(false),
@@ -72,13 +71,8 @@
     getEventsFireEventsPtr = new GetEventsFireEvent(this, getEventsTimeout);
     inactivityFireEventPtr = new InactivityFireEvent(this, flushTimeout);
     {
-        qpid::sys::Mutex::ScopedLock sl(_static_lock);
-        if (journalTimerPtr == 0)
-            journalTimerPtr = new qpid::sys::Timer;
-        assert (journalTimerPtr != 0);
-        cnt++;
-        journalTimerPtr->start();
-        journalTimerPtr->add(inactivityFireEventPtr);
+        timer.start();
+        timer.add(inactivityFireEventPtr);
     }
 
     if (_agent != 0)
@@ -119,15 +113,6 @@
     inactivityFireEventPtr->cancel();
     free_read_buffers();
 
-    {
-        qpid::sys::Mutex::ScopedLock sl(_static_lock);
-        if (journalTimerPtr && --cnt == 0)
-        {
-            delete journalTimerPtr;
-            journalTimerPtr = 0;
-        }
-    }
-
     if (_mgmtObject != 0) {
         _mgmtObject->resourceDestroy();
         _mgmtObject = 0;
@@ -557,9 +542,7 @@
     }
     inactivityFireEventPtr->setupNextFire();
     {
-        qpid::sys::Mutex::ScopedLock sl(_static_lock);
-        assert(journalTimerPtr != 0);
-        journalTimerPtr->add(inactivityFireEventPtr);
+        timer.add(inactivityFireEventPtr);
     }
 }
 

Modified: store/trunk/cpp/lib/JournalImpl.h
===================================================================
--- store/trunk/cpp/lib/JournalImpl.h	2010-06-25 15:13:09 UTC (rev 4052)
+++ store/trunk/cpp/lib/JournalImpl.h	2010-06-28 18:18:31 UTC (rev 4053)
@@ -37,6 +37,10 @@
 #include "qpid/management/Manageable.h"
 #include "qmf/com/redhat/rhm/store/Journal.h"
 
+namespace qpid { namespace sys {
+class Timer;
+}}
+
 namespace mrg {
 namespace msgstore {
 
@@ -75,9 +79,9 @@
     
   private:
     static qpid::sys::Mutex _static_lock;
-    static qpid::sys::Timer* journalTimerPtr;
     static u_int32_t cnt;
 
+    qpid::sys::Timer& timer;
     bool getEventsTimerSetFlag;
     boost::intrusive_ptr<qpid::sys::TimerTask> getEventsFireEventsPtr;
     qpid::sys::Mutex _getf_lock;
@@ -102,7 +106,8 @@
     
   public:
     
-    JournalImpl(const std::string& journalId,
+    JournalImpl(qpid::sys::Timer& timer,
+                const std::string& journalId,
                 const std::string& journalDirectory,
                 const std::string& journalBaseFilename,
                 const qpid::sys::Duration getEventsTimeout,
@@ -219,9 +224,8 @@
 
     inline void setGetEventTimer()
     {
-        assert(journalTimerPtr != 0);
         getEventsFireEventsPtr->setupNextFire();
-        journalTimerPtr->add(getEventsFireEventsPtr);
+        timer.add(getEventsFireEventsPtr);
         getEventsTimerSetFlag = true;
     }
     void handleIoResult(const mrg::journal::iores r);
@@ -239,13 +243,14 @@
 class TplJournalImpl : public JournalImpl
 {
   public:
-    TplJournalImpl(const std::string& journalId,
+    TplJournalImpl(qpid::sys::Timer& timer,
+                   const std::string& journalId,
                    const std::string& journalDirectory,
                    const std::string& journalBaseFilename,
                    const qpid::sys::Duration getEventsTimeout,
                    const qpid::sys::Duration flushTimeout,
                    qpid::management::ManagementAgent* agent) :
-        JournalImpl(journalId, journalDirectory, journalBaseFilename, getEventsTimeout, flushTimeout, agent)
+        JournalImpl(timer, journalId, journalDirectory, journalBaseFilename, getEventsTimeout, flushTimeout, agent)
     {}
 
     ~TplJournalImpl() {}

Modified: store/trunk/cpp/lib/MessageStoreImpl.cpp
===================================================================
--- store/trunk/cpp/lib/MessageStoreImpl.cpp	2010-06-25 15:13:09 UTC (rev 4052)
+++ store/trunk/cpp/lib/MessageStoreImpl.cpp	2010-06-28 18:18:31 UTC (rev 4053)
@@ -62,7 +62,7 @@
                                                               tpc_flag(_tpc_flag)
 {}
 
-MessageStoreImpl::MessageStoreImpl(const char* envpath) :
+MessageStoreImpl::MessageStoreImpl(qpid::sys::Timer& timer_, const char* envpath) :
                                  numJrnlFiles(0),
                                  autoJrnlExpand(false),
                                  autoJrnlExpandMaxFiles(0),
@@ -77,6 +77,7 @@
                                  highestRid(0),
                                  isInit(false),
                                  envPath(envpath),
+                                 timer(timer_),
                                  mgmtObject(0),
                                  agent(0)
 {}
@@ -339,7 +340,7 @@
         open(mappingDb, txn.get(), "mappings.db", true);
         open(bindingDb, txn.get(), "bindings.db", true);
         open(generalDb, txn.get(), "general.db",  false);
-        tplStorePtr.reset(new TplJournalImpl("TplStore", getTplBaseDir(), "tpl", defJournalGetEventsTimeout, defJournalFlushTimeout, agent));
+        tplStorePtr.reset(new TplJournalImpl(timer, "TplStore", getTplBaseDir(), "tpl", defJournalGetEventsTimeout, defJournalFlushTimeout, agent));
         txn.commit();
     } catch (const journal::jexception& e) {
         QPID_LOG(error, "Journal Exception occurred while initializing store: " << e);
@@ -479,7 +480,7 @@
         return;
     }
 
-    jQueue = new JournalImpl(queue.getName(), getJrnlDir(queue),  std::string("JournalData"),
+    jQueue = new JournalImpl(timer, queue.getName(), getJrnlDir(queue),  std::string("JournalData"),
                              defJournalGetEventsTimeout, defJournalFlushTimeout, agent,
                              boost::bind(&MessageStoreImpl::journalDeleted, this, _1));
     {
@@ -763,7 +764,7 @@
             QPID_LOG(error, "Cannot recover empty (null) queue name - ignoring and attempting to continue.");
             break;
         }
-        jQueue = new JournalImpl(queueName, getJrnlHashDir(queueName), std::string("JournalData"),
+        jQueue = new JournalImpl(timer, queueName, getJrnlHashDir(queueName), std::string("JournalData"),
                                  defJournalGetEventsTimeout, defJournalFlushTimeout, agent,
                                  boost::bind(&MessageStoreImpl::journalDeleted, this, _1));
         {

Modified: store/trunk/cpp/lib/MessageStoreImpl.h
===================================================================
--- store/trunk/cpp/lib/MessageStoreImpl.h	2010-06-25 15:13:09 UTC (rev 4052)
+++ store/trunk/cpp/lib/MessageStoreImpl.h	2010-06-28 18:18:31 UTC (rev 4053)
@@ -45,6 +45,10 @@
 #define DB_BUFFER_SMALL ENOMEM
 #endif
 
+namespace qpid { namespace sys {
+class Timer;
+}}
+
 namespace mrg {
 namespace msgstore {
 
@@ -147,6 +151,7 @@
     u_int64_t highestRid;
     bool isInit;
     const char* envPath;
+    qpid::sys::Timer& timer;
 
     qmf::com::redhat::rhm::store::Store* mgmtObject;
     qpid::management::ManagementAgent* agent;
@@ -266,7 +271,7 @@
   public:
     typedef boost::shared_ptr<MessageStoreImpl> shared_ptr;
 
-    MessageStoreImpl(const char* envpath = 0);
+    MessageStoreImpl(qpid::sys::Timer& timer, const char* envpath = 0);
 
     virtual ~MessageStoreImpl();
 

Modified: store/trunk/cpp/lib/StorePlugin.cpp
===================================================================
--- store/trunk/cpp/lib/StorePlugin.cpp	2010-06-25 15:13:09 UTC (rev 4052)
+++ store/trunk/cpp/lib/StorePlugin.cpp	2010-06-28 18:18:31 UTC (rev 4053)
@@ -43,7 +43,7 @@
     {
         Broker* broker = dynamic_cast<Broker*>(&target);
         if (!broker) return;
-        boost::shared_ptr<qpid::broker::MessageStore> store(new mrg::msgstore::MessageStoreImpl ());
+        boost::shared_ptr<qpid::broker::MessageStore> store(new mrg::msgstore::MessageStoreImpl (broker->getTimer()));
         DataDir& dataDir = broker->getDataDir ();
         if (options.storeDir.empty ())
         {

Modified: store/trunk/cpp/tests/OrderingTest.cpp
===================================================================
--- store/trunk/cpp/tests/OrderingTest.cpp	2010-06-25 15:13:09 UTC (rev 4052)
+++ store/trunk/cpp/tests/OrderingTest.cpp	2010-06-28 18:18:31 UTC (rev 4053)
@@ -30,7 +30,10 @@
 #include <qpid/broker/RecoveryManagerImpl.h>
 #include <qpid/framing/AMQHeaderBody.h>
 #include "qpid/log/Logger.h"
+#include "qpid/sys/Timer.h"
 
+qpid::sys::Timer timer;
+
 #define SET_LOG_LEVEL(level) \
     qpid::log::Options opts(""); \
     opts.selectors.clear(); \
@@ -59,7 +62,7 @@
 
 void setup()
 {
-    store = std::auto_ptr<MessageStoreImpl>(new MessageStoreImpl());
+    store = std::auto_ptr<MessageStoreImpl>(new MessageStoreImpl(timer));
     store->init(test_dir, 4, 1, true); // truncate store
 
     queue = Queue::shared_ptr(new Queue(name, 0, store.get(), 0));
@@ -98,7 +101,7 @@
     queue.reset();
     store.reset();
 
-    store = std::auto_ptr<MessageStoreImpl>(new MessageStoreImpl());
+    store = std::auto_ptr<MessageStoreImpl>(new MessageStoreImpl(timer));
     store->init(test_dir, 4, 1);
     ExchangeRegistry exchanges;
     LinkRegistry links;

Modified: store/trunk/cpp/tests/SimpleTest.cpp
===================================================================
--- store/trunk/cpp/tests/SimpleTest.cpp	2010-06-25 15:13:09 UTC (rev 4052)
+++ store/trunk/cpp/tests/SimpleTest.cpp	2010-06-28 18:18:31 UTC (rev 4053)
@@ -32,7 +32,10 @@
 #include <qpid/framing/AMQHeaderBody.h>
 #include <qpid/framing/FieldTable.h>
 #include "qpid/log/Logger.h"
+#include "qpid/sys/Timer.h"
 
+qpid::sys::Timer timer;
+
 #define SET_LOG_LEVEL(level) \
     qpid::log::Options opts(""); \
     opts.selectors.clear(); \
@@ -92,7 +95,7 @@
                    const string& key, const FieldTable& args)
 {
     {
-        MessageStoreImpl store;
+        MessageStoreImpl store(timer);
         store.init(test_dir, 4, 1, true); // truncate store
         Exchange::shared_ptr exchange(new DirectExchange(exchangeName, true, args));
         Queue::shared_ptr queue(new Queue(queueName, 0, &store, 0));
@@ -102,7 +105,7 @@
         store.bind(*exchange, *queue, key, args);
     }//db will be closed
     {
-        MessageStoreImpl store;
+        MessageStoreImpl store(timer);
         store.init(test_dir, 4, 1);
         ExchangeRegistry exchanges;
         QueueRegistry queues;
@@ -121,7 +124,7 @@
         store.unbind(*exchange, *queue, key, args);
     }
     {
-        MessageStoreImpl store;
+        MessageStoreImpl store(timer);
         store.init(test_dir, 4, 1);
         ExchangeRegistry exchanges;
         QueueRegistry queues;
@@ -148,7 +151,7 @@
     SET_LOG_LEVEL("error+"); // This only needs to be set once.
 
     cout << test_filename << ".CreateDelete: " << flush;
-    MessageStoreImpl store;
+    MessageStoreImpl store(timer);
     store.init(test_dir, 4, 1, true); // truncate store
     string name("CreateDeleteQueue");
     Queue queue(name, 0, &store, 0);
@@ -164,7 +167,7 @@
 QPID_AUTO_TEST_CASE(EmptyRecover)
 {
     cout << test_filename << ".EmptyRecover: " << flush;
-    MessageStoreImpl store;
+    MessageStoreImpl store(timer);
     store.init(test_dir, 4, 1, true); // truncate store
     QueueRegistry registry;
     registry.setStore (&store);
@@ -181,7 +184,7 @@
     uint64_t id(0);
     string name("MyDurableQueue");
     {
-        MessageStoreImpl store;
+        MessageStoreImpl store(timer);
         store.init(test_dir, 4, 1, true); // truncate store
         Queue queue(name, 0, &store, 0);
         store.create(queue, qpid::framing::FieldTable());
@@ -189,7 +192,7 @@
         id = queue.getPersistenceId();
     }//db will be closed
     {
-        MessageStoreImpl store;
+        MessageStoreImpl store(timer);
         store.init(test_dir, 4, 1);
         QueueRegistry registry;
         registry.setStore (&store);
@@ -209,7 +212,7 @@
     std::auto_ptr<QueuePolicy> policy( QueuePolicy::createQueuePolicy(101, 202));
     string name("MyDurableQueue");
     {
-        MessageStoreImpl store;
+        MessageStoreImpl store(timer);
         store.init(test_dir, 4, 1, true); // truncate store
         Queue queue(name, 0, &store, 0);
         FieldTable settings;
@@ -218,7 +221,7 @@
         BOOST_REQUIRE(queue.getPersistenceId());
     }//db will be closed
     {
-        MessageStoreImpl store;
+        MessageStoreImpl store(timer);
         store.init(test_dir, 4, 1);
         QueueRegistry registry;
         registry.setStore (&store);
@@ -239,14 +242,14 @@
 
     string name("MyDurableQueue");
     {
-        MessageStoreImpl store;
+        MessageStoreImpl store(timer);
         store.init(test_dir, 4, 1, true); // truncate store
         Queue queue(name, 0, &store, 0);
         store.create(queue, qpid::framing::FieldTable());
         store.destroy(queue);
     }//db will be closed
     {
-        MessageStoreImpl store;
+        MessageStoreImpl store(timer);
         store.init(test_dir, 4, 1);
         QueueRegistry registry;
         registry.setStore (&store);
@@ -272,7 +275,7 @@
     string data1("abcdefg");
     string data2("hijklmn");
     {
-        MessageStoreImpl store;
+        MessageStoreImpl store(timer);
         store.init(test_dir, 4, 1, true); // truncate store
         Queue::shared_ptr queue(new Queue(name, 0, &store, 0));
         FieldTable settings;
@@ -290,7 +293,7 @@
         queue->enqueue(0, msg);
     }//db will be closed
     {
-        MessageStoreImpl store;
+        MessageStoreImpl store(timer);
         store.init(test_dir, 4, 1);
         QueueRegistry registry;
         registry.setStore (&store);
@@ -331,7 +334,7 @@
         string routingKey("MyRoutingKey");
         Uuid messageId(true);
         string data("abcdefg");
-        MessageStoreImpl store;
+        MessageStoreImpl store(timer);
         store.init(test_dir, 4, 1, true); // truncate store
         Queue::shared_ptr queue(new Queue(name, 0, &store, 0));
         FieldTable settings;
@@ -347,7 +350,7 @@
         queue->dequeue(0, qm);
     }//db will be closed
     {
-        MessageStoreImpl store;
+        MessageStoreImpl store(timer);
         store.init(test_dir, 4, 1);
         QueueRegistry registry;
         registry.setStore (&store);
@@ -370,7 +373,7 @@
     FieldTable args;
     args.setString("a", "A");
     {
-        MessageStoreImpl store;
+        MessageStoreImpl store(timer);
         store.init(test_dir, 4, 1, true); // truncate store
         ExchangeRegistry registry;
         Exchange::shared_ptr exchange = registry.declare(name, type, true, args).first;
@@ -379,7 +382,7 @@
         BOOST_REQUIRE(id);
     }//db will be closed
     {
-        MessageStoreImpl store;
+        MessageStoreImpl store(timer);
         store.init(test_dir, 4, 1);
         ExchangeRegistry registry;
 
@@ -393,7 +396,7 @@
         store.destroy(*exchange);
     }
     {
-        MessageStoreImpl store;
+        MessageStoreImpl store(timer);
         store.init(test_dir, 4, 1);
         ExchangeRegistry registry;
 
@@ -441,7 +444,7 @@
     string key("my-routing-key");
     FieldTable args;
     {
-        MessageStoreImpl store;
+        MessageStoreImpl store(timer);
         store.init(test_dir, 4, 1, true); // truncate store
         Exchange::shared_ptr exchange(new DirectExchange(exchangeName, true, args));
         Queue::shared_ptr queue1(new Queue(queueName1, 0, &store, 0));
@@ -455,7 +458,7 @@
         store.destroy(*queue1);
     }//db will be closed
     {
-        MessageStoreImpl store;
+        MessageStoreImpl store(timer);
         store.init(test_dir, 4, 1);
         ExchangeRegistry exchanges;
         QueueRegistry queues;
@@ -472,7 +475,7 @@
         store.destroy(*exchange);
     }
     {
-        MessageStoreImpl store;
+        MessageStoreImpl store(timer);
         store.init(test_dir, 4, 1);
         ExchangeRegistry exchanges;
         QueueRegistry queues;

Modified: store/trunk/cpp/tests/TransactionalTest.cpp
===================================================================
--- store/trunk/cpp/tests/TransactionalTest.cpp	2010-06-25 15:13:09 UTC (rev 4052)
+++ store/trunk/cpp/tests/TransactionalTest.cpp	2010-06-28 18:18:31 UTC (rev 4053)
@@ -32,7 +32,10 @@
 #include "qpid/framing/AMQHeaderBody.h"
 #include "qpid/log/Statement.h"
 #include "qpid/log/Logger.h"
+#include "qpid/sys/Timer.h"
 
+qpid::sys::Timer timer;
+
 #define SET_LOG_LEVEL(level) \
     qpid::log::Options opts(""); \
     opts.selectors.clear(); \
@@ -69,7 +72,7 @@
 class TestMessageStore: public MessageStoreImpl
 {
   public:
-    TestMessageStore(const char* envpath = 0) : MessageStoreImpl(envpath) {}
+    TestMessageStore(qpid::sys::Timer& timer, const char* envpath = 0) : MessageStoreImpl(timer, envpath) {}
     std::auto_ptr<qpid::broker::TransactionContext> begin() {
         checkInit();
         // pass sequence number for c/a
@@ -109,7 +112,7 @@
 template <class T>
 void setup()
 {
-    store = std::auto_ptr<T>(new T());
+    store = std::auto_ptr<T>(new T(timer));
     store->init(test_dir, 4, 1, true); // truncate store
 
     //create two queues:
@@ -128,7 +131,7 @@
     queues.reset();
     store.reset();
 
-    store = std::auto_ptr<T>(new T());
+    store = std::auto_ptr<T>(new T(timer));
     store->init(test_dir, 4, 1);
     queues = std::auto_ptr<QueueRegistry>(new QueueRegistry);
     ExchangeRegistry exchanges;

Modified: store/trunk/cpp/tests/TwoPhaseCommitTest.cpp
===================================================================
--- store/trunk/cpp/tests/TwoPhaseCommitTest.cpp	2010-06-25 15:13:09 UTC (rev 4052)
+++ store/trunk/cpp/tests/TwoPhaseCommitTest.cpp	2010-06-28 18:18:31 UTC (rev 4053)
@@ -32,7 +32,10 @@
 #include "qpid/log/Statement.h"
 #include "TxnCtxt.h"
 #include "qpid/log/Logger.h"
+#include "qpid/sys/Timer.h"
 
+qpid::sys::Timer timer;
+
 #define SET_LOG_LEVEL(level) \
     qpid::log::Options opts(""); \
     opts.selectors.clear(); \
@@ -182,7 +185,7 @@
     class TestMessageStore: public MessageStoreImpl
     {
       public:
-        TestMessageStore(const char* envpath = 0) : MessageStoreImpl(envpath) {}
+        TestMessageStore(qpid::sys::Timer& timer, const char* envpath = 0) : MessageStoreImpl(timer, envpath) {}
         std::auto_ptr<qpid::broker::TPCTransactionContext> begin(const std::string& xid) {
             checkInit();
             IdSequence* jtx = &messageIdSequence;
@@ -325,7 +328,7 @@
     template <class T>
     void setup()
     {
-        store = std::auto_ptr<T>(new T());
+        store = std::auto_ptr<T>(new T(timer));
         store->init(test_dir, 4, 1, true); // truncate store
 
         //create two queues:
@@ -353,7 +356,7 @@
         queues.reset();
         links.reset();
 
-        store = std::auto_ptr<T>(new T());
+        store = std::auto_ptr<T>(new T(timer));
         store->init(test_dir, 4, 1);
         sys::Timer t;
         ExchangeRegistry exchanges;



More information about the rhmessaging-commits mailing list