Author: kpvdr
Date: 2012-12-05 11:32:41 -0500 (Wed, 05 Dec 2012)
New Revision: 4522
Modified:
store/trunk/cpp/lib/MessageStoreImpl.cpp
store/trunk/cpp/lib/MessageStoreImpl.h
store/trunk/cpp/lib/StorePlugin.cpp
store/trunk/cpp/tests/OrderingTest.cpp
store/trunk/cpp/tests/SimpleTest.cpp
store/trunk/cpp/tests/TransactionalTest.cpp
store/trunk/cpp/tests/TwoPhaseCommitTest.cpp
Log:
QPID-4490 / BZ873674 - Durable messages with TTL do not expire after broker recovery (c++ store)
Modified: store/trunk/cpp/lib/MessageStoreImpl.cpp
===================================================================
--- store/trunk/cpp/lib/MessageStoreImpl.cpp 2012-11-30 19:18:38 UTC (rev 4521)
+++ store/trunk/cpp/lib/MessageStoreImpl.cpp 2012-12-05 16:32:41 UTC (rev 4522)
@@ -59,7 +59,7 @@
tpc_flag(_tpc_flag)
{}
-MessageStoreImpl::MessageStoreImpl(qpid::sys::Timer& timer_, const char* envpath) :
+MessageStoreImpl::MessageStoreImpl(qpid::broker::Broker* broker_, const char* envpath) :
numJrnlFiles(0),
autoJrnlExpand(false),
autoJrnlExpandMaxFiles(0),
@@ -74,7 +74,7 @@
highestRid(0),
isInit(false),
envPath(envpath),
- timer(timer_),
+ broker(broker_),
agent(0)
{}
@@ -219,7 +219,7 @@
autoJrnlExpandMaxFiles = p;
}
-void MessageStoreImpl::initManagement (qpid::broker::Broker* broker)
+void MessageStoreImpl::initManagement ()
{
if (broker != 0) {
agent = broker->getManagementAgent();
@@ -364,7 +364,7 @@
// NOTE: during normal initialization, agent == 0 because the store is
initialized before the management infrastructure.
// However during a truncated initialization in a cluster, agent != 0. We
always pass 0 as the agent for the
// TplStore to keep things consistent in a cluster. See
https://bugzilla.redhat.com/show_bug.cgi?id=681026
- tplStorePtr.reset(new TplJournalImpl(timer, "TplStore",
getTplBaseDir(), "tpl", defJournalGetEventsTimeout, defJournalFlushTimeout,
0));
+ tplStorePtr.reset(new TplJournalImpl(broker->getTimer(),
"TplStore", getTplBaseDir(), "tpl", defJournalGetEventsTimeout,
defJournalFlushTimeout, 0));
isInit = true;
} catch (const DbException& e) {
if (e.get_errno() == DB_VERSION_MISMATCH)
@@ -515,7 +515,7 @@
return;
}
- jQueue = new JournalImpl(timer, queue.getName(), getJrnlDir(queue),
std::string("JournalData"),
+ jQueue = new JournalImpl(broker->getTimer(), queue.getName(), getJrnlDir(queue),
std::string("JournalData"),
defJournalGetEventsTimeout, defJournalFlushTimeout, agent,
boost::bind(&MessageStoreImpl::journalDeleted, this,
_1));
{
@@ -801,7 +801,7 @@
QPID_LOG(error, "Cannot recover empty (null) queue name - ignoring and
attempting to continue.");
break;
}
- jQueue = new JournalImpl(timer, queueName, getJrnlHashDir(queueName),
std::string("JournalData"),
+ jQueue = new JournalImpl(broker->getTimer(), queueName,
getJrnlHashDir(queueName), std::string("JournalData"),
defJournalGetEventsTimeout, defJournalFlushTimeout,
agent,
boost::bind(&MessageStoreImpl::journalDeleted, this,
_1));
{
@@ -981,6 +981,8 @@
// At some future point if delivery attempts are stored, then this call
would
// become optional depending on that information.
msg->setRedelivered();
+ // Reset the TTL for the recovered message
+ msg->computeExpiration(broker->getExpiryPolicy());
u_int32_t contentOffset = headerSize + preambleLength;
u_int64_t contentSize = readSize - contentOffset;
Modified: store/trunk/cpp/lib/MessageStoreImpl.h
===================================================================
--- store/trunk/cpp/lib/MessageStoreImpl.h 2012-11-30 19:18:38 UTC (rev 4521)
+++ store/trunk/cpp/lib/MessageStoreImpl.h 2012-12-05 16:32:41 UTC (rev 4522)
@@ -152,7 +152,7 @@
u_int64_t highestRid;
bool isInit;
const char* envPath;
- qpid::sys::Timer& timer;
+ qpid::broker::Broker* broker;
qmf::com::redhat::rhm::store::Store::shared_ptr mgmtObject;
qpid::management::ManagementAgent* agent;
@@ -275,7 +275,7 @@
public:
typedef boost::shared_ptr<MessageStoreImpl> shared_ptr;
- MessageStoreImpl(qpid::sys::Timer& timer, const char* envpath = 0);
+ MessageStoreImpl(qpid::broker::Broker* broker, const char* envpath = 0);
virtual ~MessageStoreImpl();
@@ -294,7 +294,7 @@
void truncateInit(const bool saveStoreContent = false);
- void initManagement (qpid::broker::Broker* broker);
+ void initManagement ();
void finalize();
Modified: store/trunk/cpp/lib/StorePlugin.cpp
===================================================================
--- store/trunk/cpp/lib/StorePlugin.cpp 2012-11-30 19:18:38 UTC (rev 4521)
+++ store/trunk/cpp/lib/StorePlugin.cpp 2012-12-05 16:32:41 UTC (rev 4522)
@@ -46,7 +46,7 @@
{
Broker* broker = dynamic_cast<Broker*>(&target);
if (!broker) return;
- store.reset(new MessageStoreImpl(broker->getTimer()));
+ store.reset(new MessageStoreImpl(broker));
DataDir& dataDir = broker->getDataDir ();
if (options.storeDir.empty ())
{
@@ -71,7 +71,7 @@
QPID_LOG(info, "Disabling management instrumentation for the store in a
cluster.");
} else {
QPID_LOG(info, "Enabling management instrumentation for the
store.");
- store->initManagement(broker);
+ store->initManagement();
}
}
Modified: store/trunk/cpp/tests/OrderingTest.cpp
===================================================================
--- store/trunk/cpp/tests/OrderingTest.cpp 2012-11-30 19:18:38 UTC (rev 4521)
+++ store/trunk/cpp/tests/OrderingTest.cpp 2012-12-05 16:32:41 UTC (rev 4522)
@@ -32,8 +32,8 @@
#include "qpid/log/Logger.h"
#include "qpid/sys/Timer.h"
-qpid::sys::Timer timer;
-qpid::broker::ProtocolRegistry pr;
+qpid::broker::Broker::Options opts;
+qpid::broker::Broker br(opts);
#define SET_LOG_LEVEL(level) \
qpid::log::Options opts(""); \
@@ -84,7 +84,7 @@
void setup()
{
- store = std::auto_ptr<MessageStoreImpl>(new MessageStoreImpl(timer));
+ store = std::auto_ptr<MessageStoreImpl>(new MessageStoreImpl(&br));
store->init(test_dir, 4, 1, true); // truncate store
queue = Queue::shared_ptr(new Queue(name, 0, store.get(), 0));
@@ -112,14 +112,14 @@
queue.reset();
store.reset();
- store = std::auto_ptr<MessageStoreImpl>(new MessageStoreImpl(timer));
+ store = std::auto_ptr<MessageStoreImpl>(new MessageStoreImpl(&br));
store->init(test_dir, 4, 1);
ExchangeRegistry exchanges;
LinkRegistry links;
sys::Timer t;
DtxManager mgr(t);
mgr.setStore (store.get());
- RecoveryManagerImpl recoveryMgr(queues, exchanges, links, mgr, pr);
+ RecoveryManagerImpl recoveryMgr(queues, exchanges, links, mgr,
br.getProtocolRegistry());
store->recover(recoveryMgr);
queue = queues.find(name);
Modified: store/trunk/cpp/tests/SimpleTest.cpp
===================================================================
--- store/trunk/cpp/tests/SimpleTest.cpp 2012-11-30 19:18:38 UTC (rev 4521)
+++ store/trunk/cpp/tests/SimpleTest.cpp 2012-12-05 16:32:41 UTC (rev 4522)
@@ -36,8 +36,8 @@
#include "qpid/log/Logger.h"
#include "qpid/sys/Timer.h"
-qpid::sys::Timer timer;
-qpid::broker::ProtocolRegistry pr;
+qpid::broker::Broker::Options opts;
+qpid::broker::Broker br(opts);
#define SET_LOG_LEVEL(level) \
qpid::log::Options opts(""); \
@@ -76,7 +76,7 @@
sys::Timer t;
DtxManager mgr(t);
mgr.setStore (&store);
- RecoveryManagerImpl recovery(queues, exchanges, links, mgr, pr);
+ RecoveryManagerImpl recovery(queues, exchanges, links, mgr,
br.getProtocolRegistry());
store.recover(recovery);
}
@@ -98,7 +98,7 @@
const string& key, const FieldTable& args)
{
{
- MessageStoreImpl store(timer);
+ MessageStoreImpl store(&br);
store.init(test_dir, 4, 1, true); // truncate store
Exchange::shared_ptr exchange(new DirectExchange(exchangeName, true, args));
Queue::shared_ptr queue(new Queue(queueName, 0, &store, 0));
@@ -108,7 +108,7 @@
store.bind(*exchange, *queue, key, args);
}//db will be closed
{
- MessageStoreImpl store(timer);
+ MessageStoreImpl store(&br);
store.init(test_dir, 4, 1);
ExchangeRegistry exchanges;
QueueRegistry queues;
@@ -127,7 +127,7 @@
store.unbind(*exchange, *queue, key, args);
}
{
- MessageStoreImpl store(timer);
+ MessageStoreImpl store(&br);
store.init(test_dir, 4, 1);
ExchangeRegistry exchanges;
QueueRegistry queues;
@@ -154,7 +154,7 @@
SET_LOG_LEVEL("error+"); // This only needs to be set once.
cout << test_filename << ".CreateDelete: " << flush;
- MessageStoreImpl store(timer);
+ MessageStoreImpl store(&br);
store.init(test_dir, 4, 1, true); // truncate store
string name("CreateDeleteQueue");
Queue queue(name, 0, &store, 0);
@@ -170,7 +170,7 @@
QPID_AUTO_TEST_CASE(EmptyRecover)
{
cout << test_filename << ".EmptyRecover: " << flush;
- MessageStoreImpl store(timer);
+ MessageStoreImpl store(&br);
store.init(test_dir, 4, 1, true); // truncate store
QueueRegistry registry;
registry.setStore (&store);
@@ -187,7 +187,7 @@
uint64_t id(0);
string name("MyDurableQueue");
{
- MessageStoreImpl store(timer);
+ MessageStoreImpl store(&br);
store.init(test_dir, 4, 1, true); // truncate store
Queue queue(name, 0, &store, 0);
store.create(queue, qpid::framing::FieldTable());
@@ -195,7 +195,7 @@
id = queue.getPersistenceId();
}//db will be closed
{
- MessageStoreImpl store(timer);
+ MessageStoreImpl store(&br);
store.init(test_dir, 4, 1);
QueueRegistry registry;
registry.setStore (&store);
@@ -219,14 +219,14 @@
settings.populate(arguments, settings.storeSettings);
string name("MyDurableQueue");
{
- MessageStoreImpl store(timer);
+ MessageStoreImpl store(&br);
store.init(test_dir, 4, 1, true); // truncate store
Queue queue(name, settings, &store, 0);
queue.create();
BOOST_REQUIRE(queue.getPersistenceId());
}//db will be closed
{
- MessageStoreImpl store(timer);
+ MessageStoreImpl store(&br);
store.init(test_dir, 4, 1);
QueueRegistry registry;
registry.setStore (&store);
@@ -248,14 +248,14 @@
string name("MyDurableQueue");
{
- MessageStoreImpl store(timer);
+ MessageStoreImpl store(&br);
store.init(test_dir, 4, 1, true); // truncate store
Queue queue(name, 0, &store, 0);
store.create(queue, qpid::framing::FieldTable());
store.destroy(queue);
}//db will be closed
{
- MessageStoreImpl store(timer);
+ MessageStoreImpl store(&br);
store.init(test_dir, 4, 1);
QueueRegistry registry;
registry.setStore (&store);
@@ -281,7 +281,7 @@
string data1("abcdefg");
string data2("hijklmn");
{
- MessageStoreImpl store(timer);
+ MessageStoreImpl store(&br);
store.init(test_dir, 4, 1, true); // truncate store
Queue::shared_ptr queue(new Queue(name, 0, &store, 0));
queue->create();
@@ -295,7 +295,7 @@
queue->deliver(msg);
}//db will be closed
{
- MessageStoreImpl store(timer);
+ MessageStoreImpl store(&br);
store.init(test_dir, 4, 1);
QueueRegistry registry;
registry.setStore (&store);
@@ -333,7 +333,7 @@
string routingKey("MyRoutingKey");
Uuid messageId(true);
string data("abcdefg");
- MessageStoreImpl store(timer);
+ MessageStoreImpl store(&br);
store.init(test_dir, 4, 1, true); // truncate store
Queue::shared_ptr queue(new Queue(name, 0, &store, 0));
queue->create();
@@ -348,7 +348,7 @@
queue->dequeue(0, cursor);
}//db will be closed
{
- MessageStoreImpl store(timer);
+ MessageStoreImpl store(&br);
store.init(test_dir, 4, 1);
QueueRegistry registry;
registry.setStore (&store);
@@ -371,7 +371,7 @@
FieldTable args;
args.setString("a", "A");
{
- MessageStoreImpl store(timer);
+ MessageStoreImpl store(&br);
store.init(test_dir, 4, 1, true); // truncate store
ExchangeRegistry registry;
Exchange::shared_ptr exchange = registry.declare(name, type, true, args).first;
@@ -380,7 +380,7 @@
BOOST_REQUIRE(id);
}//db will be closed
{
- MessageStoreImpl store(timer);
+ MessageStoreImpl store(&br);
store.init(test_dir, 4, 1);
ExchangeRegistry registry;
@@ -394,7 +394,7 @@
store.destroy(*exchange);
}
{
- MessageStoreImpl store(timer);
+ MessageStoreImpl store(&br);
store.init(test_dir, 4, 1);
ExchangeRegistry registry;
@@ -442,7 +442,7 @@
string key("my-routing-key");
FieldTable args;
{
- MessageStoreImpl store(timer);
+ MessageStoreImpl store(&br);
store.init(test_dir, 4, 1, true); // truncate store
Exchange::shared_ptr exchange(new DirectExchange(exchangeName, true, args));
Queue::shared_ptr queue1(new Queue(queueName1, 0, &store, 0));
@@ -456,7 +456,7 @@
store.destroy(*queue1);
}//db will be closed
{
- MessageStoreImpl store(timer);
+ MessageStoreImpl store(&br);
store.init(test_dir, 4, 1);
ExchangeRegistry exchanges;
QueueRegistry queues;
@@ -473,7 +473,7 @@
store.destroy(*exchange);
}
{
- MessageStoreImpl store(timer);
+ MessageStoreImpl store(&br);
store.init(test_dir, 4, 1);
ExchangeRegistry exchanges;
QueueRegistry queues;
Modified: store/trunk/cpp/tests/TransactionalTest.cpp
===================================================================
--- store/trunk/cpp/tests/TransactionalTest.cpp 2012-11-30 19:18:38 UTC (rev 4521)
+++ store/trunk/cpp/tests/TransactionalTest.cpp 2012-12-05 16:32:41 UTC (rev 4522)
@@ -34,8 +34,8 @@
#include "qpid/log/Logger.h"
#include "qpid/sys/Timer.h"
-qpid::sys::Timer timer;
-qpid::broker::ProtocolRegistry pr;
+qpid::broker::Broker::Options opts;
+qpid::broker::Broker br(opts);
#define SET_LOG_LEVEL(level) \
qpid::log::Options opts(""); \
@@ -73,7 +73,7 @@
class TestMessageStore: public MessageStoreImpl
{
public:
- TestMessageStore(qpid::sys::Timer& timer, const char* envpath = 0) :
MessageStoreImpl(timer, envpath) {}
+ TestMessageStore(qpid::broker::Broker* br, const char* envpath = 0) :
MessageStoreImpl(br, envpath) {}
std::auto_ptr<qpid::broker::TransactionContext> begin() {
checkInit();
// pass sequence number for c/a
@@ -113,7 +113,7 @@
template <class T>
void setup()
{
- store = std::auto_ptr<T>(new T(timer));
+ store = std::auto_ptr<T>(new T(&br));
store->init(test_dir, 4, 1, true); // truncate store
//create two queues:
@@ -131,7 +131,7 @@
queues.reset();
store.reset();
- store = std::auto_ptr<T>(new T(timer));
+ store = std::auto_ptr<T>(new T(&br));
store->init(test_dir, 4, 1);
queues = std::auto_ptr<QueueRegistry>(new QueueRegistry);
ExchangeRegistry exchanges;
@@ -139,7 +139,7 @@
sys::Timer t;
DtxManager mgr(t);
mgr.setStore (store.get());
- RecoveryManagerImpl recovery(*queues, exchanges, links, mgr, pr);
+ RecoveryManagerImpl recovery(*queues, exchanges, links, mgr,
br.getProtocolRegistry());
store->recover(recovery);
queueA = queues->find(nameA);
Modified: store/trunk/cpp/tests/TwoPhaseCommitTest.cpp
===================================================================
--- store/trunk/cpp/tests/TwoPhaseCommitTest.cpp 2012-11-30 19:18:38 UTC (rev 4521)
+++ store/trunk/cpp/tests/TwoPhaseCommitTest.cpp 2012-12-05 16:32:41 UTC (rev 4522)
@@ -34,8 +34,8 @@
#include "qpid/log/Logger.h"
#include "qpid/sys/Timer.h"
-qpid::sys::Timer timer;
-qpid::broker::ProtocolRegistry pr;
+qpid::broker::Broker::Options opts;
+qpid::broker::Broker br(opts);
#define SET_LOG_LEVEL(level) \
qpid::log::Options opts(""); \
@@ -186,7 +186,7 @@
class TestMessageStore: public MessageStoreImpl
{
public:
- TestMessageStore(qpid::sys::Timer& timer, const char* envpath = 0) :
MessageStoreImpl(timer, envpath) {}
+ TestMessageStore(qpid::broker::Broker* br, const char* envpath = 0) :
MessageStoreImpl(br, envpath) {}
std::auto_ptr<qpid::broker::TPCTransactionContext> begin(const
std::string& xid) {
checkInit();
IdSequence* jtx = &messageIdSequence;
@@ -352,7 +352,7 @@
template <class T>
void setup()
{
- store = std::auto_ptr<T>(new T(timer));
+ store = std::auto_ptr<T>(new T(&br));
store->init(test_dir, 4, 1, true); // truncate store
//create two queues:
@@ -377,7 +377,7 @@
queues.reset();
links.reset();
- store = std::auto_ptr<T>(new T(timer));
+ store = std::auto_ptr<T>(new T(&br));
store->init(test_dir, 4, 1);
sys::Timer t;
ExchangeRegistry exchanges;
@@ -385,7 +385,7 @@
links = std::auto_ptr<LinkRegistry>(new LinkRegistry);
dtxmgr = std::auto_ptr<DtxManager>(new DtxManager(t));
dtxmgr->setStore (store.get());
- RecoveryManagerImpl recovery(*queues, exchanges, *links, *dtxmgr, pr);
+ RecoveryManagerImpl recovery(*queues, exchanges, *links, *dtxmgr,
br.getProtocolRegistry());
store->recover(recovery);
queueA = queues->find(nameA);