Author: kpvdr
Date: 2008-03-28 16:39:30 -0400 (Fri, 28 Mar 2008)
New Revision: 1811
Modified:
store/trunk/cpp/lib/BdbMessageStore.cpp
store/trunk/cpp/lib/BdbMessageStore.h
store/trunk/cpp/lib/JournalImpl.cpp
store/trunk/cpp/lib/JournalImpl.h
Log:
Bugfix for BZ439499 - C++ broker fails JMS TCK when started in daemon mode. Changed the static
Timer instance into a static Timer pointer, which is created if required
during JournalImpl construction. Also changed the remaining messages sent to cout in
BdbMessageStore to use QPID_LOG instead.
Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp 2008-03-28 19:47:15 UTC (rev 1810)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp 2008-03-28 20:39:30 UTC (rev 1811)
@@ -26,6 +26,7 @@
#include <qpid/broker/Message.h>
#include <qpid/framing/Buffer.h>
#include <qpid/log/Statement.h>
+#include <qpid/sys/Mutex.h>
#include <algorithm>
#include <iomanip>
#include <sstream>
@@ -127,12 +128,12 @@
if (numJrnlFiles < JRNL_MIN_NUM_FILES)
{
numJrnlFiles = JRNL_MIN_NUM_FILES;
- std::cout << "WARNING: parameter num-jfiles (" <<
opts->numJrnlFiles << ") below allowable minimum (" <<
numJrnlFiles << "); changing this parameter to minimum value." <<
std::endl;
+ QPID_LOG(warning, "parameter num-jfiles (" <<
opts->numJrnlFiles << ") below allowable minimum (" <<
numJrnlFiles << "); changing this parameter to minimum value.");
}
else if (numJrnlFiles > 64)
{
numJrnlFiles = 64;
- std::cout << "WARNING: parameter num-jfiles (" <<
opts->numJrnlFiles << ") above allowable maximum (" <<
numJrnlFiles << "); changing this parameter to maximum value." <<
std::endl;
+ QPID_LOG(warning, "parameter num-jfiles (" <<
opts->numJrnlFiles << ") above allowable maximum (" <<
numJrnlFiles << "); changing this parameter to maximum value.");
}
u_int32_t jrnlFsizePgs = opts->jrnlFsizePgs;
@@ -140,12 +141,12 @@
if (jrnlFsizePgs < jrnlMinFsizePgs)
{
jrnlFsizePgs = jrnlMinFsizePgs;
- std::cout << "WARNING: parameter jfile-size-pgs (" <<
opts->jrnlFsizePgs << ") below allowable minimum (" <<
jrnlFsizePgs << "); changing this parameter to minimum value." <<
std::endl;
+ QPID_LOG(warning, "parameter jfile-size-pgs (" <<
opts->jrnlFsizePgs << ") below allowable minimum (" <<
jrnlFsizePgs << "); changing this parameter to minimum value.");
}
else if (jrnlFsizePgs > 1024) // (pgs) = 64MiB max file size
{
jrnlFsizePgs = 1024;
- std::cout << "WARNING: parameter jfile-size-pgs (" <<
opts->jrnlFsizePgs << ") above allowable maximum (" <<
jrnlFsizePgs << "); changing this parameter to maximum value." <<
std::endl;
+ QPID_LOG(warning, "parameter jfile-size-pgs (" <<
opts->jrnlFsizePgs << ") above allowable maximum (" <<
jrnlFsizePgs << "); changing this parameter to maximum value.");
}
return init(opts->storeDir, opts->storeAsync, opts->storeForce,
numJrnlFiles, jrnlFsizePgs);
@@ -246,7 +247,11 @@
THROW_STORE_EXCEPTION("Queue already created: " + queue.getName());
}
if (usingJrnl()) {
- JournalImpl* jQueue = new JournalImpl(queue.getName(), getJrnlDir(queue),
string("JournalData"), numJrnlFiles, jrnlFsizePgs * JRNL_RMGR_PAGE_SIZE,
defJournalGetEventsTimeout, defJournalFlushTimeout);
+ JournalImpl* jQueue = 0;
+ {
+ qpid::sys::Mutex::ScopedLock sl(jrnlCreateLock);
+ jQueue = new JournalImpl(queue.getName(), getJrnlDir(queue),
string("JournalData"), numJrnlFiles, jrnlFsizePgs * JRNL_RMGR_PAGE_SIZE,
defJournalGetEventsTimeout, defJournalFlushTimeout);
+ }
queue.setExternalQueueStore(dynamic_cast<ExternalQueueStore*>(jQueue));
try {
// init will create the deque's for the init...
@@ -414,7 +419,11 @@
if (usingJrnl())
{
const char* queueName = queue->getName().c_str();
- JournalImpl* jQueue = new JournalImpl(queueName, getJrnlDir(queueName),
string("JournalData"), numJrnlFiles, jrnlFsizePgs * JRNL_RMGR_PAGE_SIZE,
defJournalGetEventsTimeout, defJournalFlushTimeout);
+ JournalImpl* jQueue = 0;
+ {
+ qpid::sys::Mutex::ScopedLock sl(jrnlCreateLock);
+ jQueue = new JournalImpl(queueName, getJrnlDir(queueName),
string("JournalData"), numJrnlFiles, jrnlFsizePgs * JRNL_RMGR_PAGE_SIZE,
defJournalGetEventsTimeout, defJournalFlushTimeout);
+ }
queue->setExternalQueueStore(dynamic_cast<ExternalQueueStore*>(jQueue));
try
@@ -474,7 +483,7 @@
while (bindings.next(key, value)) {
Buffer buffer(reinterpret_cast<char*>(value.get_data()),
value.get_size());
if (buffer.available() < 8) {
- std::cout << "Not enough data for binding: " <<
buffer.available() << std::endl;
+ QPID_LOG(error, "Not enough data for binding: " <<
buffer.available());
THROW_STORE_EXCEPTION("Not enough data for binding");
}
uint64_t queueId = buffer.getLongLong();
@@ -1209,7 +1218,7 @@
txn.complete(commit);
} catch (const std::exception& e) {
- std::cout << "Error completing xid " << txn.getXid()
<< ": " << e.what() << std::endl;
+ QPID_LOG(error, "Error completing xid " << txn.getXid() <<
": " << e.what());
txn.abort();
throw;
}
Modified: store/trunk/cpp/lib/BdbMessageStore.h
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.h 2008-03-28 19:47:15 UTC (rev 1810)
+++ store/trunk/cpp/lib/BdbMessageStore.h 2008-03-28 20:39:30 UTC (rev 1811)
@@ -87,6 +87,7 @@
const char* envPath;
static qpid::sys::Duration defJournalGetEventsTimeout;
static qpid::sys::Duration defJournalFlushTimeout;
+ qpid::sys::Mutex jrnlCreateLock;
bool mode(const bool mode, const bool force);
void recoverQueues(TxnCtxt& txn, qpid::broker::RecoveryManager&
recovery, queue_index& index,
Modified: store/trunk/cpp/lib/JournalImpl.cpp
===================================================================
--- store/trunk/cpp/lib/JournalImpl.cpp 2008-03-28 19:47:15 UTC (rev 1810)
+++ store/trunk/cpp/lib/JournalImpl.cpp 2008-03-28 20:39:30 UTC (rev 1811)
@@ -32,7 +32,7 @@
using namespace rhm::bdbstore;
using namespace rhm::journal;
-qpid::broker::Timer JournalImpl::journalTimer;
+qpid::broker::Timer* JournalImpl::journalTimerPtr = 0;
void InactivityFireEvent::fire() { if (parent) parent->flushFire(); }
void GetEventsFireEvent::fire() {
@@ -60,8 +60,11 @@
::pthread_mutex_init(&_getf_mutex, 0);
getEventsFireEventsPtr = new GetEventsFireEvent(this, getEventsTimeout);
inactivityFireEventPtr = new InactivityFireEvent(this, flushTimeout);
- journalTimer.start();
- journalTimer.add(inactivityFireEventPtr);
+ if (journalTimerPtr == 0)
+ journalTimerPtr = new qpid::broker::Timer;
+ assert (journalTimerPtr != 0);
+ journalTimerPtr->start();
+ journalTimerPtr->add(inactivityFireEventPtr);
}
JournalImpl::~JournalImpl()
@@ -269,7 +272,8 @@
}
}
inactivityFireEventPtr->reset();
- journalTimer.add(inactivityFireEventPtr);
+ assert(journalTimerPtr != 0);
+ journalTimerPtr->add(inactivityFireEventPtr);
}
void
Modified: store/trunk/cpp/lib/JournalImpl.h
===================================================================
--- store/trunk/cpp/lib/JournalImpl.h 2008-03-28 19:47:15 UTC (rev 1810)
+++ store/trunk/cpp/lib/JournalImpl.h 2008-03-28 20:39:30 UTC (rev 1811)
@@ -66,7 +66,7 @@
class JournalImpl : public qpid::broker::ExternalQueueStore, public
journal::jcntl
{
private:
- static qpid::broker::Timer journalTimer;
+ static qpid::broker::Timer* journalTimerPtr;
bool getEventsTimerSetFlag;
boost::intrusive_ptr<qpid::broker::TimerTask> getEventsFireEventsPtr;
@@ -146,7 +146,8 @@
inline void setGetEventTimer()
{
getEventsFireEventsPtr->addRef();
- journalTimer.add(getEventsFireEventsPtr);
+ assert(journalTimerPtr != 0);
+ journalTimerPtr->add(getEventsFireEventsPtr);
getEventsTimerSetFlag = true;
}
void handleIoResult(const journal::iores r);