rhmessaging commits: r2172 - in store/trunk/cpp/lib: gen/qpid/management and 1 other directory.
by rhmessaging-commits@lists.jboss.org
Author: tedross
Date: 2008-06-30 15:03:29 -0400 (Mon, 30 Jun 2008)
New Revision: 2172
Modified:
store/trunk/cpp/lib/BdbMessageStore.cpp
store/trunk/cpp/lib/JournalImpl.cpp
store/trunk/cpp/lib/gen/qpid/management/Journal.cpp
store/trunk/cpp/lib/gen/qpid/management/Journal.h
store/trunk/cpp/lib/gen/qpid/management/Store.cpp
store/trunk/cpp/lib/gen/qpid/management/Store.h
Log:
Updates for management agent API changes
Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp 2008-06-30 15:27:47 UTC (rev 2171)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp 2008-06-30 19:03:29 UTC (rev 2172)
@@ -84,7 +84,7 @@
if (agent.get () != 0)
{
qpid::management::PackageMrgstore packageInitializer(agent);
- mgmtObject = qpid::management::Store::shared_ptr(new qpid::management::Store (this, broker));
+ mgmtObject = qpid::management::Store::shared_ptr(new qpid::management::Store (agent.get(), this, broker));
mgmtObject->set_location(storeDir);
mgmtObject->set_defaultInitialFileCount(numJrnlFiles);
Modified: store/trunk/cpp/lib/JournalImpl.cpp
===================================================================
--- store/trunk/cpp/lib/JournalImpl.cpp 2008-06-30 15:27:47 UTC (rev 2171)
+++ store/trunk/cpp/lib/JournalImpl.cpp 2008-06-30 19:03:29 UTC (rev 2172)
@@ -77,7 +77,7 @@
if (agent.get () != 0)
{
_mgmtObject = qpid::management::Journal::shared_ptr
- (new qpid::management::Journal((qpid::management::Manageable*) this));
+ (new qpid::management::Journal(agent.get(), (qpid::management::Manageable*) this));
_mgmtObject->set_name(journalId);
_mgmtObject->set_directory(journalDirectory);
@@ -202,7 +202,7 @@
_mgmtObject->set_initialFileCount(_num_jfiles);
_mgmtObject->set_dataFileSize(_jfsize_sblks * JRNL_SBLK_SIZE * JRNL_DBLK_SIZE);
_mgmtObject->set_currentFileCount(_num_jfiles);
- _mgmtObject->set_recordDepth(_emap.size());
+ _mgmtObject->inc_recordDepth(_emap.size());
_mgmtObject->set_writePageSize(wcache_pgsize_sblks * JRNL_SBLK_SIZE * JRNL_DBLK_SIZE);
_mgmtObject->set_writePages(wcache_num_pages);
}
Modified: store/trunk/cpp/lib/gen/qpid/management/Journal.cpp
===================================================================
--- store/trunk/cpp/lib/gen/qpid/management/Journal.cpp 2008-06-30 15:27:47 UTC (rev 2171)
+++ store/trunk/cpp/lib/gen/qpid/management/Journal.cpp 2008-06-30 19:03:29 UTC (rev 2172)
@@ -24,6 +24,7 @@
#include "qpid/log/Statement.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/management/Manageable.h"
+#include "qpid/management/ManagementAgent.h"
#include "Journal.h"
#include "qpid/management/ArgsJournalExpand.h"
@@ -38,9 +39,8 @@
uint8_t Journal::md5Sum[16] =
{0x0,0xdf,0x9a,0xf7,0x4,0x98,0x29,0x54,0xde,0x42,0xc5,0xf5,0xf5,0x13,0xab,0xa5};
-Journal::Journal (Manageable* _core) :
- ManagementObject(_core)
-
+Journal::Journal (ManagementAgent* _agent, Manageable* _core) :
+ ManagementObject(_agent, _core)
{
initialFileCount = 0;
@@ -49,8 +49,6 @@
recordDepth = 0;
recordDepthHigh = 0;
recordDepthLow = 0;
- recordEnqueues = 0;
- recordDequeues = 0;
outstandingAIOs = 0;
outstandingAIOsHigh = 0;
outstandingAIOsLow = 0;
@@ -60,10 +58,6 @@
availableFileCount = 0;
availableFileCountHigh = 0;
availableFileCountLow = 0;
- writeWaitFailures = 0;
- writeBusyFailures = 0;
- readRecordCount = 0;
- readBusyFailures = 0;
writePageCacheDepth = 0;
writePageCacheDepthHigh = 0;
writePageCacheDepthLow = 0;
@@ -71,10 +65,24 @@
readPageCacheDepthHigh = 0;
readPageCacheDepthLow = 0;
+
+ maxThreads = agent->getMaxThreads();
+ perThreadStatsArray = new struct PerThreadStats*[maxThreads];
+ for (int idx = 0; idx < maxThreads; idx++)
+ perThreadStatsArray[idx] = 0;
+
}
-Journal::~Journal () {}
+Journal::~Journal ()
+{
+ for (int idx = 0; idx < maxThreads; idx++)
+ if (perThreadStatsArray[idx] != 0)
+ delete perThreadStatsArray[idx];
+ delete perThreadStatsArray;
+
+}
+
namespace {
const string NAME("name");
const string TYPE("type");
@@ -382,6 +390,31 @@
}
+
+void Journal::aggregatePerThreadStats(struct PerThreadStats* totals)
+{
+ totals->recordEnqueues = 0;
+ totals->recordDequeues = 0;
+ totals->writeWaitFailures = 0;
+ totals->writeBusyFailures = 0;
+ totals->readRecordCount = 0;
+ totals->readBusyFailures = 0;
+
+ for (int idx = 0; idx < maxThreads; idx++) {
+ struct PerThreadStats* threadStats = perThreadStatsArray[idx];
+ if (threadStats != 0) {
+ totals->recordEnqueues += threadStats->recordEnqueues;
+ totals->recordDequeues += threadStats->recordDequeues;
+ totals->writeWaitFailures += threadStats->writeWaitFailures;
+ totals->writeBusyFailures += threadStats->writeBusyFailures;
+ totals->readRecordCount += threadStats->readRecordCount;
+ totals->readBusyFailures += threadStats->readBusyFailures;
+
+ }
+ }
+}
+
+
void Journal::writeProperties (Buffer& buf)
{
sys::Mutex::ScopedLock mutex(accessLock);
@@ -405,6 +438,10 @@
instChanged = false;
+ struct PerThreadStats totals;
+ aggregatePerThreadStats(&totals);
+
+
if (!skipHeaders)
writeTimestamps (buf);
buf.putShort (initialFileCount);
@@ -413,8 +450,8 @@
buf.putLong (recordDepth);
buf.putLong (recordDepthHigh);
buf.putLong (recordDepthLow);
- buf.putLongLong (recordEnqueues);
- buf.putLongLong (recordDequeues);
+ buf.putLongLong (totals.recordEnqueues);
+ buf.putLongLong (totals.recordDequeues);
buf.putLong (outstandingAIOs);
buf.putLong (outstandingAIOsHigh);
buf.putLong (outstandingAIOsLow);
@@ -424,10 +461,10 @@
buf.putLong (availableFileCount);
buf.putLong (availableFileCountHigh);
buf.putLong (availableFileCountLow);
- buf.putLongLong (writeWaitFailures);
- buf.putLongLong (writeBusyFailures);
- buf.putLongLong (readRecordCount);
- buf.putLongLong (readBusyFailures);
+ buf.putLongLong (totals.writeWaitFailures);
+ buf.putLongLong (totals.writeBusyFailures);
+ buf.putLongLong (totals.readRecordCount);
+ buf.putLongLong (totals.readBusyFailures);
buf.putLong (writePageCacheDepth);
buf.putLong (writePageCacheDepthHigh);
buf.putLong (writePageCacheDepthLow);
@@ -450,6 +487,7 @@
readPageCacheDepthHigh = readPageCacheDepth;
readPageCacheDepthLow = readPageCacheDepth;
+
}
void Journal::doMethod (string methodName, Buffer& inBuf, Buffer& outBuf)
Modified: store/trunk/cpp/lib/gen/qpid/management/Journal.h
===================================================================
--- store/trunk/cpp/lib/gen/qpid/management/Journal.h 2008-06-30 15:27:47 UTC (rev 2171)
+++ store/trunk/cpp/lib/gen/qpid/management/Journal.h 2008-06-30 19:03:29 UTC (rev 2172)
@@ -27,7 +27,6 @@
#include "qpid/management/ManagementObject.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/framing/Uuid.h"
-#include "qpid/sys/AtomicCount.h"
namespace qpid {
namespace management {
@@ -57,8 +56,6 @@
uint32_t recordDepth;
uint32_t recordDepthHigh;
uint32_t recordDepthLow;
- uint64_t recordEnqueues;
- uint64_t recordDequeues;
uint32_t outstandingAIOs;
uint32_t outstandingAIOsHigh;
uint32_t outstandingAIOsLow;
@@ -68,10 +65,6 @@
uint32_t availableFileCount;
uint32_t availableFileCountHigh;
uint32_t availableFileCountLow;
- uint64_t writeWaitFailures;
- uint64_t writeBusyFailures;
- uint64_t readRecordCount;
- uint64_t readBusyFailures;
uint32_t writePageCacheDepth;
uint32_t writePageCacheDepthHigh;
uint32_t writePageCacheDepthLow;
@@ -79,6 +72,39 @@
uint32_t readPageCacheDepthHigh;
uint32_t readPageCacheDepthLow;
+
+ // Per-Thread Statistics
+ struct PerThreadStats {
+ uint64_t recordEnqueues;
+ uint64_t recordDequeues;
+ uint64_t writeWaitFailures;
+ uint64_t writeBusyFailures;
+ uint64_t readRecordCount;
+ uint64_t readBusyFailures;
+
+ };
+
+ struct PerThreadStats** perThreadStatsArray;
+
+ inline struct PerThreadStats* getThreadStats() {
+ int index = getThreadIndex();
+ struct PerThreadStats* threadStats = perThreadStatsArray[index];
+ if (threadStats == 0) {
+ threadStats = new(PerThreadStats);
+ perThreadStatsArray[index] = threadStats;
+ threadStats->recordEnqueues = 0;
+ threadStats->recordDequeues = 0;
+ threadStats->writeWaitFailures = 0;
+ threadStats->writeBusyFailures = 0;
+ threadStats->readRecordCount = 0;
+ threadStats->readBusyFailures = 0;
+
+ }
+ return threadStats;
+ }
+
+ void aggregatePerThreadStats(struct PerThreadStats*);
+
// Private Methods
static void writeSchema (qpid::framing::Buffer& buf);
void writeProperties (qpid::framing::Buffer& buf);
@@ -87,15 +113,15 @@
void doMethod (std::string methodName,
qpid::framing::Buffer& inBuf,
qpid::framing::Buffer& outBuf);
- writeSchemaCall_t getWriteSchemaCall (void) { return writeSchema; }
+ writeSchemaCall_t getWriteSchemaCall(void) { return writeSchema; }
-
public:
friend class PackageMrgstore;
typedef boost::shared_ptr<Journal> shared_ptr;
- Journal (Manageable* coreObject);
+ Journal (ManagementAgent* agent,
+ Manageable* coreObject);
~Journal (void);
void setReference(uint64_t objectId) { queueRef = objectId; }
@@ -165,281 +191,137 @@
instChanged = true;
}
inline void inc_recordDepth (uint32_t by = 1){
- if (by == 1)
- ++recordDepth;
- else
- recordDepth += by;
+ sys::Mutex::ScopedLock mutex(accessLock);
+ recordDepth += by;
if (recordDepthHigh < recordDepth)
recordDepthHigh = recordDepth;
instChanged = true;
}
inline void dec_recordDepth (uint32_t by = 1){
- if (by == 1)
- recordDepth--;
- else
- recordDepth -= by;
+ sys::Mutex::ScopedLock mutex(accessLock);
+ recordDepth -= by;
if (recordDepthLow > recordDepth)
recordDepthLow = recordDepth;
instChanged = true;
}
- inline void set_recordDepth (uint32_t val){
- sys::Mutex::ScopedLock mutex(accessLock);
- recordDepth = val;
- if (recordDepthLow > val)
- recordDepthLow = val;
- if (recordDepthHigh < val)
- recordDepthHigh = val;
- instChanged = true;
- }
inline void inc_recordEnqueues (uint64_t by = 1){
- if (by == 1)
- ++recordEnqueues;
- else
- recordEnqueues += by;
+ getThreadStats()->recordEnqueues += by;
instChanged = true;
}
inline void dec_recordEnqueues (uint64_t by = 1){
- if (by == 1)
- recordEnqueues--;
- else
- recordEnqueues -= by;
+ getThreadStats()->recordEnqueues -= by;
instChanged = true;
}
- inline void set_recordEnqueues (uint64_t val){
- sys::Mutex::ScopedLock mutex(accessLock);
- recordEnqueues = val;
- instChanged = true;
- }
inline void inc_recordDequeues (uint64_t by = 1){
- if (by == 1)
- ++recordDequeues;
- else
- recordDequeues += by;
+ getThreadStats()->recordDequeues += by;
instChanged = true;
}
inline void dec_recordDequeues (uint64_t by = 1){
- if (by == 1)
- recordDequeues--;
- else
- recordDequeues -= by;
+ getThreadStats()->recordDequeues -= by;
instChanged = true;
}
- inline void set_recordDequeues (uint64_t val){
- sys::Mutex::ScopedLock mutex(accessLock);
- recordDequeues = val;
- instChanged = true;
- }
inline void inc_outstandingAIOs (uint32_t by = 1){
- if (by == 1)
- ++outstandingAIOs;
- else
- outstandingAIOs += by;
+ sys::Mutex::ScopedLock mutex(accessLock);
+ outstandingAIOs += by;
if (outstandingAIOsHigh < outstandingAIOs)
outstandingAIOsHigh = outstandingAIOs;
instChanged = true;
}
inline void dec_outstandingAIOs (uint32_t by = 1){
- if (by == 1)
- outstandingAIOs--;
- else
- outstandingAIOs -= by;
+ sys::Mutex::ScopedLock mutex(accessLock);
+ outstandingAIOs -= by;
if (outstandingAIOsLow > outstandingAIOs)
outstandingAIOsLow = outstandingAIOs;
instChanged = true;
}
- inline void set_outstandingAIOs (uint32_t val){
- sys::Mutex::ScopedLock mutex(accessLock);
- outstandingAIOs = val;
- if (outstandingAIOsLow > val)
- outstandingAIOsLow = val;
- if (outstandingAIOsHigh < val)
- outstandingAIOsHigh = val;
- instChanged = true;
- }
inline void inc_freeFileCount (uint32_t by = 1){
- if (by == 1)
- ++freeFileCount;
- else
- freeFileCount += by;
+ sys::Mutex::ScopedLock mutex(accessLock);
+ freeFileCount += by;
if (freeFileCountHigh < freeFileCount)
freeFileCountHigh = freeFileCount;
instChanged = true;
}
inline void dec_freeFileCount (uint32_t by = 1){
- if (by == 1)
- freeFileCount--;
- else
- freeFileCount -= by;
+ sys::Mutex::ScopedLock mutex(accessLock);
+ freeFileCount -= by;
if (freeFileCountLow > freeFileCount)
freeFileCountLow = freeFileCount;
instChanged = true;
}
- inline void set_freeFileCount (uint32_t val){
- sys::Mutex::ScopedLock mutex(accessLock);
- freeFileCount = val;
- if (freeFileCountLow > val)
- freeFileCountLow = val;
- if (freeFileCountHigh < val)
- freeFileCountHigh = val;
- instChanged = true;
- }
inline void inc_availableFileCount (uint32_t by = 1){
- if (by == 1)
- ++availableFileCount;
- else
- availableFileCount += by;
+ sys::Mutex::ScopedLock mutex(accessLock);
+ availableFileCount += by;
if (availableFileCountHigh < availableFileCount)
availableFileCountHigh = availableFileCount;
instChanged = true;
}
inline void dec_availableFileCount (uint32_t by = 1){
- if (by == 1)
- availableFileCount--;
- else
- availableFileCount -= by;
+ sys::Mutex::ScopedLock mutex(accessLock);
+ availableFileCount -= by;
if (availableFileCountLow > availableFileCount)
availableFileCountLow = availableFileCount;
instChanged = true;
}
- inline void set_availableFileCount (uint32_t val){
- sys::Mutex::ScopedLock mutex(accessLock);
- availableFileCount = val;
- if (availableFileCountLow > val)
- availableFileCountLow = val;
- if (availableFileCountHigh < val)
- availableFileCountHigh = val;
- instChanged = true;
- }
inline void inc_writeWaitFailures (uint64_t by = 1){
- if (by == 1)
- ++writeWaitFailures;
- else
- writeWaitFailures += by;
+ getThreadStats()->writeWaitFailures += by;
instChanged = true;
}
inline void dec_writeWaitFailures (uint64_t by = 1){
- if (by == 1)
- writeWaitFailures--;
- else
- writeWaitFailures -= by;
+ getThreadStats()->writeWaitFailures -= by;
instChanged = true;
}
- inline void set_writeWaitFailures (uint64_t val){
- sys::Mutex::ScopedLock mutex(accessLock);
- writeWaitFailures = val;
- instChanged = true;
- }
inline void inc_writeBusyFailures (uint64_t by = 1){
- if (by == 1)
- ++writeBusyFailures;
- else
- writeBusyFailures += by;
+ getThreadStats()->writeBusyFailures += by;
instChanged = true;
}
inline void dec_writeBusyFailures (uint64_t by = 1){
- if (by == 1)
- writeBusyFailures--;
- else
- writeBusyFailures -= by;
+ getThreadStats()->writeBusyFailures -= by;
instChanged = true;
}
- inline void set_writeBusyFailures (uint64_t val){
- sys::Mutex::ScopedLock mutex(accessLock);
- writeBusyFailures = val;
- instChanged = true;
- }
inline void inc_readRecordCount (uint64_t by = 1){
- if (by == 1)
- ++readRecordCount;
- else
- readRecordCount += by;
+ getThreadStats()->readRecordCount += by;
instChanged = true;
}
inline void dec_readRecordCount (uint64_t by = 1){
- if (by == 1)
- readRecordCount--;
- else
- readRecordCount -= by;
+ getThreadStats()->readRecordCount -= by;
instChanged = true;
}
- inline void set_readRecordCount (uint64_t val){
- sys::Mutex::ScopedLock mutex(accessLock);
- readRecordCount = val;
- instChanged = true;
- }
inline void inc_readBusyFailures (uint64_t by = 1){
- if (by == 1)
- ++readBusyFailures;
- else
- readBusyFailures += by;
+ getThreadStats()->readBusyFailures += by;
instChanged = true;
}
inline void dec_readBusyFailures (uint64_t by = 1){
- if (by == 1)
- readBusyFailures--;
- else
- readBusyFailures -= by;
+ getThreadStats()->readBusyFailures -= by;
instChanged = true;
}
- inline void set_readBusyFailures (uint64_t val){
- sys::Mutex::ScopedLock mutex(accessLock);
- readBusyFailures = val;
- instChanged = true;
- }
inline void inc_writePageCacheDepth (uint32_t by = 1){
- if (by == 1)
- ++writePageCacheDepth;
- else
- writePageCacheDepth += by;
+ sys::Mutex::ScopedLock mutex(accessLock);
+ writePageCacheDepth += by;
if (writePageCacheDepthHigh < writePageCacheDepth)
writePageCacheDepthHigh = writePageCacheDepth;
instChanged = true;
}
inline void dec_writePageCacheDepth (uint32_t by = 1){
- if (by == 1)
- writePageCacheDepth--;
- else
- writePageCacheDepth -= by;
+ sys::Mutex::ScopedLock mutex(accessLock);
+ writePageCacheDepth -= by;
if (writePageCacheDepthLow > writePageCacheDepth)
writePageCacheDepthLow = writePageCacheDepth;
instChanged = true;
}
- inline void set_writePageCacheDepth (uint32_t val){
- sys::Mutex::ScopedLock mutex(accessLock);
- writePageCacheDepth = val;
- if (writePageCacheDepthLow > val)
- writePageCacheDepthLow = val;
- if (writePageCacheDepthHigh < val)
- writePageCacheDepthHigh = val;
- instChanged = true;
- }
inline void inc_readPageCacheDepth (uint32_t by = 1){
- if (by == 1)
- ++readPageCacheDepth;
- else
- readPageCacheDepth += by;
+ sys::Mutex::ScopedLock mutex(accessLock);
+ readPageCacheDepth += by;
if (readPageCacheDepthHigh < readPageCacheDepth)
readPageCacheDepthHigh = readPageCacheDepth;
instChanged = true;
}
inline void dec_readPageCacheDepth (uint32_t by = 1){
- if (by == 1)
- readPageCacheDepth--;
- else
- readPageCacheDepth -= by;
+ sys::Mutex::ScopedLock mutex(accessLock);
+ readPageCacheDepth -= by;
if (readPageCacheDepthLow > readPageCacheDepth)
readPageCacheDepthLow = readPageCacheDepth;
instChanged = true;
}
- inline void set_readPageCacheDepth (uint32_t val){
- sys::Mutex::ScopedLock mutex(accessLock);
- readPageCacheDepth = val;
- if (readPageCacheDepthLow > val)
- readPageCacheDepthLow = val;
- if (readPageCacheDepthHigh < val)
- readPageCacheDepthHigh = val;
- instChanged = true;
- }
};
Modified: store/trunk/cpp/lib/gen/qpid/management/Store.cpp
===================================================================
--- store/trunk/cpp/lib/gen/qpid/management/Store.cpp 2008-06-30 15:27:47 UTC (rev 2171)
+++ store/trunk/cpp/lib/gen/qpid/management/Store.cpp 2008-06-30 19:03:29 UTC (rev 2172)
@@ -24,6 +24,7 @@
#include "qpid/log/Statement.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/management/Manageable.h"
+#include "qpid/management/ManagementAgent.h"
#include "Store.h"
@@ -35,18 +36,21 @@
string Store::packageName = string ("mrgstore");
string Store::className = string ("store");
uint8_t Store::md5Sum[16] =
- {0x63,0xc5,0x1a,0x81,0x18,0x8a,0x8d,0x9b,0x3e,0x96,0xf7,0x6d,0x3b,0xd0,0x51,0x14};
+ {0x91,0xcf,0xc4,0xa7,0x9b,0x4a,0x2a,0x88,0x32,0x6f,0xef,0xec,0x82,0xd7,0x12,0x6a};
-Store::Store (Manageable* _core, Manageable* _parent) :
- ManagementObject(_core)
-
+Store::Store (ManagementAgent* _agent, Manageable* _core, Manageable* _parent) :
+ ManagementObject(_agent, _core)
{
brokerRef = _parent->GetManagementObject ()->getObjectId ();
+
}
-Store::~Store () {}
+Store::~Store ()
+{
+}
+
namespace {
const string NAME("name");
const string TYPE("type");
@@ -71,7 +75,7 @@
buf.putShortString (packageName); // Package Name
buf.putShortString (className); // Class Name
buf.putBin128 (md5Sum); // Schema Hash
- buf.putShort (4); // Config Element Count
+ buf.putShort (5); // Config Element Count
buf.putShort (0); // Inst Element Count
buf.putShort (0); // Method Count
buf.putShort (0); // Event Count
@@ -93,6 +97,14 @@
buf.put (ft);
ft = FieldTable ();
+ ft.setString (NAME, "async");
+ ft.setInt (TYPE, TYPE_BOOL);
+ ft.setInt (ACCESS, ACCESS_RO);
+ ft.setInt (INDEX, 0);
+ ft.setString (DESC, "Asynchronous IO");
+ buf.put (ft);
+
+ ft = FieldTable ();
ft.setString (NAME, "defaultInitialFileCount");
ft.setInt (TYPE, TYPE_U16);
ft.setInt (ACCESS, ACCESS_RO);
@@ -119,6 +131,8 @@
}
+
+
void Store::writeProperties (Buffer& buf)
{
sys::Mutex::ScopedLock mutex(accessLock);
@@ -127,6 +141,7 @@
writeTimestamps (buf);
buf.putLongLong (brokerRef);
buf.putShortString (location);
+ buf.putOctet (async?1:0);
buf.putShort (defaultInitialFileCount);
buf.putLong (defaultDataFileSize);
@@ -138,12 +153,14 @@
instChanged = false;
+
if (!skipHeaders)
writeTimestamps (buf);
// Maintenance of hi-lo statistics
+
}
void Store::doMethod (string, Buffer&, Buffer& outBuf)
Modified: store/trunk/cpp/lib/gen/qpid/management/Store.h
===================================================================
--- store/trunk/cpp/lib/gen/qpid/management/Store.h 2008-06-30 15:27:47 UTC (rev 2171)
+++ store/trunk/cpp/lib/gen/qpid/management/Store.h 2008-06-30 19:03:29 UTC (rev 2172)
@@ -27,7 +27,6 @@
#include "qpid/management/ManagementObject.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/framing/Uuid.h"
-#include "qpid/sys/AtomicCount.h"
namespace qpid {
namespace management {
@@ -43,11 +42,13 @@
// Properties
uint64_t brokerRef;
std::string location;
+ uint8_t async;
uint16_t defaultInitialFileCount;
uint32_t defaultDataFileSize;
// Statistics
+
// Private Methods
static void writeSchema (qpid::framing::Buffer& buf);
void writeProperties (qpid::framing::Buffer& buf);
@@ -56,9 +57,9 @@
void doMethod (std::string methodName,
qpid::framing::Buffer& inBuf,
qpid::framing::Buffer& outBuf);
- writeSchemaCall_t getWriteSchemaCall (void) { return writeSchema; }
+ writeSchemaCall_t getWriteSchemaCall(void) { return writeSchema; }
- // Stub for getInstChanged. There are no inst elements
+ // Stub for getInstChanged. There are no statistics in this class.
bool getInstChanged (void) { return false; }
public:
@@ -66,7 +67,8 @@
friend class PackageMrgstore;
typedef boost::shared_ptr<Store> shared_ptr;
- Store (Manageable* coreObject, Manageable* _parent);
+ Store (ManagementAgent* agent,
+ Manageable* coreObject, Manageable* _parent);
~Store (void);
@@ -88,6 +90,11 @@
location = val;
configChanged = true;
}
+ inline void set_async (uint8_t val){
+ sys::Mutex::ScopedLock mutex(accessLock);
+ async = val;
+ configChanged = true;
+ }
inline void set_defaultInitialFileCount (uint16_t val){
sys::Mutex::ScopedLock mutex(accessLock);
defaultInitialFileCount = val;
16 years, 5 months
rhmessaging commits: r2171 - in store/trunk/cpp/lib: gen/qpid/management and 1 other directory.
by rhmessaging-commits@lists.jboss.org
Author: kpvdr
Date: 2008-06-30 11:27:47 -0400 (Mon, 30 Jun 2008)
New Revision: 2171
Modified:
store/trunk/cpp/lib/BdbMessageStore.cpp
store/trunk/cpp/lib/BdbMessageStore.h
store/trunk/cpp/lib/gen/qpid/management/Store.cpp
store/trunk/cpp/lib/gen/qpid/management/Store.h
Log:
Cleaned up BdbMessageStore, removed unneeded BDB sync storage
Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp 2008-06-30 13:04:03 UTC (rev 2170)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp 2008-06-30 15:27:47 UTC (rev 2171)
@@ -51,7 +51,6 @@
static const u_int8_t MESSAGE_MESSAGE = 1;
static const u_int8_t BASIC_MESSAGE = 2;
-bool BdbMessageStore::useAsync;
qpid::sys::Duration BdbMessageStore::defJournalGetEventsTimeout(10 * qpid::sys::TIME_MSEC); // 10ms
qpid::sys::Duration BdbMessageStore::defJournalFlushTimeout(500 * qpid::sys::TIME_MSEC); // 0.5s
qpid::sys::Mutex TxnCtxt::globalSerialiser;
@@ -88,7 +87,6 @@
mgmtObject = qpid::management::Store::shared_ptr(new qpid::management::Store (this, broker));
mgmtObject->set_location(storeDir);
- mgmtObject->set_async(useAsync);
mgmtObject->set_defaultInitialFileCount(numJrnlFiles);
mgmtObject->set_defaultDataFileSize(jrnlFsizePgs);
@@ -125,15 +123,12 @@
wcache_num_pages = defTotWCacheSize / wcache_pgsize_sblks;
}
- // TODO: remove
- useAsync = true;
if (dir.size()>0) storeDir = dir;
string bdbdir = storeDir + "/rhm/dat/";
journal::jdir::create_dir(bdbdir);
- bool ret = false;
try {
env.open(bdbdir.c_str(), DB_THREAD | DB_CREATE | DB_INIT_TXN | DB_INIT_LOCK | DB_INIT_LOG | DB_INIT_MPOOL | DB_USE_ENVIRON, 0);
} catch (const DbException& e) {
@@ -167,11 +162,6 @@
throw;
}
- // TODO: remove
- bool force = false;
- ret = mode(useAsync, force);
- if (!ret) return false;
-
isInit = true;
QPID_LOG(info, "BdbMessageStore module initialized: dir=" << dir << "; jfiles=" << jfiles << "; jfileSizePgs=" << jfileSizePgs << "; wCachePageSize=" << wCachePageSize);
return true;
@@ -249,54 +239,6 @@
return init(opts->storeDir, numJrnlFiles, jrnlFsizePgs, jrnlWrCachePageSize);
}
-// true is async
-bool BdbMessageStore::mode(const bool async, const bool force)
-{
-
- u_int32_t id (1); // key one in config is mode
- Dbt key(&id, sizeof(id));
- size_t preamble_length = sizeof(u_int32_t);
- BufferValue value(preamble_length, 0);
- u_int32_t avalue = async ? 1 : 2;
- value.buffer.putLong( avalue );
- bool same = false;
- bool hasMode = false;
-
- {
- Cursor config;
- config.open(configDb, 0);
- IdDbt rkey;
- BufferValue rvalue(preamble_length, 0);
- rvalue.buffer.record();
-
- while (config.next(rkey, rvalue)) {
- if (rkey.id == 1)
- {
- hasMode = true;
- u_int32_t valueL = rvalue.buffer.getLong();
- if (avalue == valueL){
- same = true;
- }else {
- break;
- }
- }
- }
- }
- if (same) return true;
- if (!same && !force && hasMode) return false;
- if (!same && force && hasMode) {
- truncate();
- }
-
- int status = configDb.put(0, &key, &value, DB_NOOVERWRITE | DB_AUTO_COMMIT );
- if (status == DB_KEYEXIST) {
- return false;
- } else {
- return true;
- }
- return false;
-}
-
void BdbMessageStore::open(Db& db, DbTxn* txn, const char* file, bool dupKey)
{
if(dupKey) db.set_flags(DB_DUPSORT);
@@ -346,36 +288,34 @@
if (queue.getPersistenceId()) {
THROW_STORE_EXCEPTION("Queue already created: " + queue.getName());
}
- if (usingJrnl()) {
- JournalImpl* jQueue = 0;
- FieldTable::ValuePtr value;
+ JournalImpl* jQueue = 0;
+ FieldTable::ValuePtr value;
- uint16_t localFileCount = numJrnlFiles;
- uint32_t localFileSize = jrnlFsizePgs;
+ uint16_t localFileCount = numJrnlFiles;
+ uint32_t localFileSize = jrnlFsizePgs;
- value = args.get ("qpid.file_count");
- if (value.get() != 0 && !value->empty() && value->convertsTo<int>())
- localFileCount = (uint16_t) value->get<int>();
+ value = args.get ("qpid.file_count");
+ if (value.get() != 0 && !value->empty() && value->convertsTo<int>())
+ localFileCount = (uint16_t) value->get<int>();
- value = args.get ("qpid.file_size");
- if (value.get() != 0 && !value->empty() && value->convertsTo<int>())
- localFileSize = (uint32_t) value->get<int>();
+ value = args.get ("qpid.file_size");
+ if (value.get() != 0 && !value->empty() && value->convertsTo<int>())
+ localFileSize = (uint32_t) value->get<int>();
- {
- qpid::sys::Mutex::ScopedLock sl(jrnlCreateLock);
- jQueue = new JournalImpl(queue.getName(), getJrnlDir(queue),
- string("JournalData"), defJournalGetEventsTimeout,
- defJournalFlushTimeout);
- }
- queue.setExternalQueueStore(dynamic_cast<ExternalQueueStore*>(jQueue));
- try {
- // init will create the deque's for the init...
- jQueue->initialize(localFileCount, localFileSize * JRNL_RMGR_PAGE_SIZE, wcache_num_pages, wcache_pgsize_sblks);
- } catch (const journal::jexception& e) {
- THROW_STORE_EXCEPTION(std::string("Queue ") + queue.getName() +
- ": create() failed: " + e.what());
- }
+ {
+ qpid::sys::Mutex::ScopedLock sl(jrnlCreateLock);
+ jQueue = new JournalImpl(queue.getName(), getJrnlDir(queue),
+ string("JournalData"), defJournalGetEventsTimeout,
+ defJournalFlushTimeout);
}
+ queue.setExternalQueueStore(dynamic_cast<ExternalQueueStore*>(jQueue));
+ try {
+ // init will create the deque's for the init...
+ jQueue->initialize(localFileCount, localFileSize * JRNL_RMGR_PAGE_SIZE, wcache_num_pages, wcache_pgsize_sblks);
+ } catch (const journal::jexception& e) {
+ THROW_STORE_EXCEPTION(std::string("Queue ") + queue.getName() +
+ ": create() failed: " + e.what());
+ }
try {
if (!create(queueDb, queueIdSequence, queue)) {
@@ -557,38 +497,32 @@
//set the persistenceId and update max as required
queue->setPersistenceId(key.id);
- if (usingJrnl())
+ const char* queueName = queue->getName().c_str();
+ JournalImpl* jQueue = 0;
{
- const char* queueName = queue->getName().c_str();
- JournalImpl* jQueue = 0;
- {
- qpid::sys::Mutex::ScopedLock sl(jrnlCreateLock);
- jQueue = new JournalImpl(queueName, getJrnlDir(queueName), string("JournalData"), defJournalGetEventsTimeout, defJournalFlushTimeout);
- }
- queue->setExternalQueueStore(dynamic_cast<ExternalQueueStore*>(jQueue));
+ qpid::sys::Mutex::ScopedLock sl(jrnlCreateLock);
+ jQueue = new JournalImpl(queueName, getJrnlDir(queueName), string("JournalData"), defJournalGetEventsTimeout, defJournalFlushTimeout);
+ }
+ queue->setExternalQueueStore(dynamic_cast<ExternalQueueStore*>(jQueue));
- try
- {
- u_int64_t thisHighestRid = 0;
- jQueue->recover(numJrnlFiles, jrnlFsizePgs * JRNL_RMGR_PAGE_SIZE, wcache_num_pages, wcache_pgsize_sblks, prepared, thisHighestRid, key.id); // start recovery
- if (thisHighestRid > highestRid)
- highestRid = thisHighestRid;
- recoverMessages(txn, registry, queue, prepared, messages);
- jQueue->recover_complete(); // start journal.
- } catch (const journal::jexception& e) {
- THROW_STORE_EXCEPTION(std::string("Queue ") + queueName + ": recoverQueues() failed: " + e.what());
- }
- //read all messages: done on a per queue basis if using Journal
+ try
+ {
+ u_int64_t thisHighestRid = 0;
+ jQueue->recover(numJrnlFiles, jrnlFsizePgs * JRNL_RMGR_PAGE_SIZE, wcache_num_pages, wcache_pgsize_sblks, prepared, thisHighestRid, key.id); // start recovery
+ if (thisHighestRid > highestRid)
+ highestRid = thisHighestRid;
+ recoverMessages(txn, registry, queue, prepared, messages);
+ jQueue->recover_complete(); // start journal.
+ } catch (const journal::jexception& e) {
+ THROW_STORE_EXCEPTION(std::string("Queue ") + queueName + ": recoverQueues() failed: " + e.what());
}
+ //read all messages: done on a per queue basis if using Journal
queue_index[key.id] = queue;
maxQueueId = max(key.id, maxQueueId);
}
messageIdSequence.reset(highestRid + 1);
queueIdSequence.reset(maxQueueId + 1);
-
- if (!usingJrnl()) //read all messages:
- recoverMessages(txn, registry, queue_index, prepared, messages);
}
@@ -667,7 +601,6 @@
generalIdSequence.reset(maxGeneralId + 1);
}
-// async IO version.
void BdbMessageStore::recoverMessages(TxnCtxt& /*txn*/, qpid::broker::RecoveryManager& recovery,
qpid::broker::RecoverableQueue::shared_ptr& queue, txn_list& locked, message_index& prepared)
{
@@ -784,69 +717,7 @@
}
-// bdb version
-void BdbMessageStore::recoverMessages(TxnCtxt&, RecoveryManager& recovery, queue_index& index,
- txn_list& locked, message_index& prepared)
-{
- //have to create a new txn here, and commit in batches to avoid
- //problems with large message databases
- TxnCtxt txn;
- txn.begin(env);
- Cursor messages;
- messages.open(messageDb, txn.get());
-
- IdDbt key;
- size_t preamble_length = sizeof(u_int32_t)/*header size*/;
- u_int64_t maxMessageId(1);
-
- BufferValue value(preamble_length, 0);
- value.buffer.record();
- uint count(0);
- while (messages.next(key, value)) {
- if (++count % 1000 == 0) {
- QPID_LOG(debug, "Recovering " << count << "th message...");
- //reset cursor && txn:
- messages.close();
- txn.commit();
- txn.begin(env);
- messages.open(messageDb, txn.get());
- messages->get(&key, &value, DB_SET);
- }
- //read header only to begin with
- u_int32_t headerSize = value.buffer.getLong();
- value.buffer.restore();
-
- BufferValue header(headerSize, preamble_length);
- messages.current(key, header);
-
- RecoverableMessage::shared_ptr msg = recovery.recoverMessage(header.buffer);
- msg->setPersistenceId(key.id);
-
- u_int32_t contentOffset = headerSize + preamble_length;
- u_int64_t contentSize = getRecordSize(txn.get(), messageDb, key) - contentOffset;
- if (msg->loadContent(contentSize)) {
- //now read the content
- BufferValue content(contentSize, contentOffset);
- messages.current(key, content);
- msg->decodeContent(content.buffer);
- }
-
- //find all the queues into which this message has been enqueued
- if (enqueueMessage(txn, key, msg, index, locked, prepared) == 0) {
- //message not referenced anywhere - can delete
- messages->del(0);
- } else {
- if (key.id > maxMessageId) {
- maxMessageId = key.id;
- }
- }
- }
- messages.close();
- txn.commit();
- messageIdSequence.reset(maxMessageId + 1);
-}
-
int BdbMessageStore::enqueueMessage(TxnCtxt& txn, IdDbt& msgId, RecoverableMessage::shared_ptr& msg,
queue_index& index, txn_list& locked,
message_index& prepared)
@@ -881,34 +752,12 @@
collectPreparedXids(prepared);
//when using the async journal, it will abort unprepaired xids and populate the locked maps
- if (!usingJrnl()){
- txn_lock_map enqueues;
- txn_lock_map dequeues;
- std::set<string> known;
- readXids(enqueueXidDb, known);
- readXids(dequeueXidDb, known);
-
- //abort all known but unprepared xids:
- for (std::set<string>::iterator i = known.begin(); i != known.end(); i++) {
- if (prepared.find(*i) == prepared.end()) {
- TPCTxnCtxt txn(*i, NULL);
- completed(txn, dequeueXidDb, enqueueXidDb, false);
- }
- }
- readLockedMappings(enqueueXidDb, enqueues);
- readLockedMappings(dequeueXidDb, dequeues);
- for (std::set<string>::iterator i = prepared.begin(); i != prepared.end(); i++) {
- txns.push_back(new PreparedTransaction(*i, enqueues[*i], dequeues[*i]));
- }
- } else {
- for (std::set<string>::iterator i = prepared.begin(); i != prepared.end(); i++) {
- LockedMappings::shared_ptr enq_ptr;
- enq_ptr.reset(new LockedMappings);
- LockedMappings::shared_ptr deq_ptr;
- deq_ptr.reset(new LockedMappings);
- txns.push_back(new PreparedTransaction(*i, enq_ptr, deq_ptr));
- }
-
+ for (std::set<string>::iterator i = prepared.begin(); i != prepared.end(); i++) {
+ LockedMappings::shared_ptr enq_ptr;
+ enq_ptr.reset(new LockedMappings);
+ LockedMappings::shared_ptr deq_ptr;
+ deq_ptr.reset(new LockedMappings);
+ txns.push_back(new PreparedTransaction(*i, enq_ptr, deq_ptr));
}
}
@@ -1081,7 +930,6 @@
void BdbMessageStore::flush(const qpid::broker::PersistableQueue& queue)
{
- if (!usingJrnl()) return;
if (queue.getExternalQueueStore() == 0) return;
checkInit();
std::string qn = queue.getName();
@@ -1107,7 +955,6 @@
THROW_STORE_EXCEPTION("Queue not created: " + queue.getName());
}
Dbt key (&messageId, sizeof(messageId));
- Dbt value (&queueId, sizeof(queueId));
TxnCtxt implicit;
TxnCtxt* txn = 0;
@@ -1115,11 +962,10 @@
txn = check(ctxt);
} else {
txn = &implicit;
- txn->begin(env, !usingJrnl());
+ txn->begin(env);
}
try {
-
bool newId = false;
if (messageId == 0) {
messageId = messageIdSequence.next();
@@ -1128,20 +974,9 @@
}
store(&queue, txn, key, msg, newId);
- if (usingJrnl()){
- // add queue* to the txn map..
- if (ctxt) txn->addXidRecord(queue.getExternalQueueStore());
- }else{
- msg->enqueueComplete(); // set enqueued for ack
- put(mappingDb, txn->get(), key, value);
+ // add queue* to the txn map..
+ if (ctxt) txn->addXidRecord(queue.getExternalQueueStore());
- // cct if using Journal do we need to wait for IO to complete before calling thus???
- // set enqueue comple on callback msg.enqueueComplete();
- if (txn->isTPC()) {
- record2pcOp(enqueueXidDb, dynamic_cast<TPCTxnCtxt&>(*txn), messageId, queueId);
- }
- }
-
if (!ctxt) txn->commit();
} catch (const std::exception& e) {
if (!ctxt) txn->abort();
@@ -1167,7 +1002,7 @@
try {
- if ( queue && usingJrnl()) {
+ if ( queue ) {
boost::intrusive_ptr<DataTokenImpl> dtokp(new DataTokenImpl);
dtokp->addRef();
dtokp->setSourceMessage(message);
@@ -1198,8 +1033,6 @@
} catch (const journal::jexception& e) {
THROW_STORE_EXCEPTION(std::string("Queue ") + queue->getName() + ": store() failed: " +
e.what());
- } catch (const DbException& e) {
- THROW_STORE_EXCEPTION_2("Error storing message", e);
}
}
@@ -1223,37 +1056,21 @@
txn = check(ctxt);
} else {
txn = &implicit;
- txn->begin(env, !usingJrnl());
+ txn->begin(env);
}
try {
- if (usingJrnl()){
- // add queue* to the txn map..
- if (ctxt) txn->addXidRecord(queue.getExternalQueueStore());
- async_dequeue(ctxt, msg, queue);
+ // add queue* to the txn map..
+ if (ctxt) txn->addXidRecord(queue.getExternalQueueStore());
+ async_dequeue(ctxt, msg, queue);
- msg->dequeueComplete();
- // if ( msg->isDequeueComplete() ) // clear id after last dequeue
- // msg->setPersistenceId(0);
+ msg->dequeueComplete();
+ // if ( msg->isDequeueComplete() ) // clear id after last dequeue
+ // msg->setPersistenceId(0);
- } else if (txn->isTPC()) {
- //if this is part of a 2pc transaction, then only record the dequeue now,
- //it will be applied on commit
- record2pcOp(dequeueXidDb, dynamic_cast<TPCTxnCtxt&>(*txn), messageId, queueId);
- } else {
- Dbt key (&messageId, sizeof(messageId));
- Dbt value (&queueId, sizeof(queueId));
- if (dequeue(txn->get(), key, value)) {
- msg->setPersistenceId(0);//clear id as we have now removed the message from the store
- msg->dequeueComplete(); // set dequeued for ack
- }
- }
if (!ctxt) txn->commit();
- } catch (const DbException& e) {
- if (!ctxt) txn->abort();
- THROW_STORE_EXCEPTION_2("Error dequeing message", e);
} catch (const std::exception& e) {
if (!ctxt) txn->abort();
throw;
@@ -1289,49 +1106,6 @@
}
}
-bool BdbMessageStore::dequeue(DbTxn* txn, Dbt& messageId, Dbt& queueId)
-{
- //First look up the message, this gets a lock on that table in
- //case we need to delete it (avoiding deadlocks with enqueue where
- //the locking order is messageDb then mappingDb)
- Cursor msgCursor;
- msgCursor.open(messageDb, txn);
-
- try {
- Dbt peek;
- peek.set_flags(DB_DBT_USERMEM | DB_DBT_PARTIAL);
- peek.set_ulen(0);
- int status = msgCursor->get(&messageId, &peek, DB_SET | DB_RMW);
- if (status == DB_NOTFOUND ) {
- THROW_STORE_EXCEPTION("Can't find record for message");
- } else if (status != 0 && status != DB_BUFFER_SMALL) {
- string e = "Dequeue failed (while seeking message) with unexpected status = ";
- e += DbEnv::strerror(status);
- THROW_STORE_EXCEPTION(e);
- }
- } catch (DbMemoryException& expected) {
- }
-
- Cursor cursor;
- cursor.open(mappingDb, txn);
-
- int status = cursor->get(&messageId, &queueId, DB_GET_BOTH | DB_RMW);
- if (status == 0) {
- cursor->del(0);
- } else if (status == DB_NOTFOUND ) {
- THROW_STORE_EXCEPTION("Can't find record mapping message to queue");
- } else {
- THROW_STORE_EXCEPTION("Dequeue failed with status = " + status);
- }
-
- if (isUnused(cursor, messageId)) {
- msgCursor->del(0);
- return true;
- } else {
- return false;
- }
-}
-
u_int32_t BdbMessageStore::outstandingQueueAIO(const qpid::broker::PersistableQueue& /*queue*/)
{
checkInit();
@@ -1369,29 +1143,13 @@
}
}
-void BdbMessageStore::completed(TPCTxnCtxt& txn, Db& discard, Db& apply, bool commit)
+void BdbMessageStore::completed(TPCTxnCtxt& txn, bool commit)
{
- if (!txn.get()) txn.begin(env, !usingJrnl());
+ if (!txn.get()) txn.begin(env);
try {
StringDbt key(txn.getXid());
- if (!usingJrnl()){
- //scroll through all records matching xid in apply and dequeue
- //using the message and queue id encoded in each value
- Cursor c;
- c.open(apply, txn.get());
- IdPairDbt value;
-
- for (int status = c->get(&key, &value, DB_SET); status == 0; status = c->get(&key, &value, DB_NEXT_DUP)) {
- dequeue(txn.get(), value.message, value.queue);
- }
- c.close();
-
- //delete all records matching xid
- discard.del(txn.get(), &key, 0);
- apply.del(txn.get(), &key, 0);
- }
prepareXidDb.del(txn.get(), &key, 0);
txn.complete(commit);
@@ -1407,7 +1165,7 @@
checkInit();
// pass sequence number for c/a when using jrnl
TxnCtxt* txn(new TxnCtxt(&messageIdSequence ));
- txn->begin(env, !usingJrnl());
+ txn->begin(env);
return auto_ptr<TransactionContext>(txn);
}
@@ -1415,11 +1173,11 @@
{
checkInit();
IdSequence* jtx = NULL;
- if (usingJrnl()) jtx = &messageIdSequence;
+ jtx = &messageIdSequence;
// pass sequence number for c/a when using jrnl
TPCTxnCtxt* txn(new TPCTxnCtxt(xid, jtx));
- txn->begin(env, !usingJrnl());
+ txn->begin(env);
return auto_ptr<TPCTransactionContext>(txn);
}
@@ -1451,7 +1209,7 @@
checkInit();
TxnCtxt* txn(check(&ctxt));
if (txn->isTPC()) {
- completed(*dynamic_cast<TPCTxnCtxt*>(txn), enqueueXidDb, dequeueXidDb, true);
+ completed(*dynamic_cast<TPCTxnCtxt*>(txn), true);
} else {
txn->commit();
}
@@ -1462,7 +1220,7 @@
checkInit();
TxnCtxt* txn(check(&ctxt));
if (txn->isTPC()) {
- completed(*dynamic_cast<TPCTxnCtxt*>(txn), dequeueXidDb, enqueueXidDb, false);
+ completed(*dynamic_cast<TPCTxnCtxt*>(txn), false);
} else {
txn->abort();
}
@@ -1489,14 +1247,6 @@
}
}
-void BdbMessageStore::record2pcOp(Db& db, TPCTxnCtxt& txn, u_int64_t messageId, u_int64_t queueId)
-{
- StringDbt key(txn.getXid());
- IdPairDbt value(queueId, messageId);
- put(db, txn.get(), key, value);
-}
-
-
bool BdbMessageStore::deleteKeyValuePair(Db& db, DbTxn* txn, Dbt& key, Dbt& value)
{
@@ -1609,7 +1359,7 @@
wCachePageSize(JRNL_WMGR_DEF_PAGE_SIZE * JRNL_DBLK_SIZE * JRNL_SBLK_SIZE / 1024)
{
addOptions()
- ("store-directory", qpid::optValue(storeDir, "DIR"),
+ ("store-dir", qpid::optValue(storeDir, "DIR"),
"Store directory location for persistence (instead of using --data-dir value). "
"Must be supplied if --no-data-dir is also used.")
("num-jfiles", qpid::optValue(numJrnlFiles, "N"),
Modified: store/trunk/cpp/lib/BdbMessageStore.h
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.h 2008-06-30 13:04:03 UTC (rev 2170)
+++ store/trunk/cpp/lib/BdbMessageStore.h 2008-06-30 15:27:47 UTC (rev 2171)
@@ -88,7 +88,6 @@
IdSequence exchangeIdSequence;
IdSequence generalIdSequence;
IdSequence messageIdSequence;
- static bool useAsync;
std::string storeDir;
u_int16_t numJrnlFiles;
u_int32_t jrnlFsizePgs;
@@ -124,8 +123,6 @@
Dbt& messageId,
const boost::intrusive_ptr<qpid::broker::PersistableMessage>& message,
bool newId);
- void enqueue(DbTxn* txn, Dbt& messageId, Dbt& queueId);
- bool dequeue(DbTxn* txn, Dbt& messageId, Dbt& queueId);
void async_dequeue(qpid::broker::TransactionContext* ctxt,
const boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg,
const qpid::broker::PersistableQueue& queue);
@@ -134,7 +131,7 @@
bool isUnused(Cursor& cursor, Dbt& messageId);
void destroy(Db& db, const qpid::broker::Persistable& p);
bool create(Db& db, IdSequence& seq, const qpid::broker::Persistable& p);
- void completed(TPCTxnCtxt& txn, Db& discard, Db& apply, bool commit);
+ void completed(TPCTxnCtxt& txn, bool commit);
void record2pcOp(Db& db, TPCTxnCtxt& txn, u_int64_t messageId, u_int64_t queueId);
void deleteBindingsForQueue(const qpid::broker::PersistableQueue& queue);
void deleteBinding(const qpid::broker::PersistableExchange& exchange,
@@ -152,7 +149,6 @@
void createJrnlQueue(const qpid::broker::PersistableQueue& queue);
string getJrnlDir(const qpid::broker::PersistableQueue& queue); //for exmaple /var/rhm/ + queueDir/
string getJrnlDir(const char* queueName);
- static inline bool usingJrnl() {return useAsync;}
string getJrnlBaseDir();
inline void checkInit() {
if (!isInit) init("/var", defNumJrnlFiles, defJrnlFileSizePgs, defWCachePageSize); isInit = true;
Modified: store/trunk/cpp/lib/gen/qpid/management/Store.cpp
===================================================================
--- store/trunk/cpp/lib/gen/qpid/management/Store.cpp 2008-06-30 13:04:03 UTC (rev 2170)
+++ store/trunk/cpp/lib/gen/qpid/management/Store.cpp 2008-06-30 15:27:47 UTC (rev 2171)
@@ -35,7 +35,7 @@
string Store::packageName = string ("mrgstore");
string Store::className = string ("store");
uint8_t Store::md5Sum[16] =
- {0x91,0xcf,0xc4,0xa7,0x9b,0x4a,0x2a,0x88,0x32,0x6f,0xef,0xec,0x82,0xd7,0x12,0x6a};
+ {0x63,0xc5,0x1a,0x81,0x18,0x8a,0x8d,0x9b,0x3e,0x96,0xf7,0x6d,0x3b,0xd0,0x51,0x14};
Store::Store (Manageable* _core, Manageable* _parent) :
ManagementObject(_core)
@@ -71,7 +71,7 @@
buf.putShortString (packageName); // Package Name
buf.putShortString (className); // Class Name
buf.putBin128 (md5Sum); // Schema Hash
- buf.putShort (5); // Config Element Count
+ buf.putShort (4); // Config Element Count
buf.putShort (0); // Inst Element Count
buf.putShort (0); // Method Count
buf.putShort (0); // Event Count
@@ -93,14 +93,6 @@
buf.put (ft);
ft = FieldTable ();
- ft.setString (NAME, "async");
- ft.setInt (TYPE, TYPE_BOOL);
- ft.setInt (ACCESS, ACCESS_RO);
- ft.setInt (INDEX, 0);
- ft.setString (DESC, "Asynchronous IO");
- buf.put (ft);
-
- ft = FieldTable ();
ft.setString (NAME, "defaultInitialFileCount");
ft.setInt (TYPE, TYPE_U16);
ft.setInt (ACCESS, ACCESS_RO);
@@ -135,7 +127,6 @@
writeTimestamps (buf);
buf.putLongLong (brokerRef);
buf.putShortString (location);
- buf.putOctet (async?1:0);
buf.putShort (defaultInitialFileCount);
buf.putLong (defaultDataFileSize);
Modified: store/trunk/cpp/lib/gen/qpid/management/Store.h
===================================================================
--- store/trunk/cpp/lib/gen/qpid/management/Store.h 2008-06-30 13:04:03 UTC (rev 2170)
+++ store/trunk/cpp/lib/gen/qpid/management/Store.h 2008-06-30 15:27:47 UTC (rev 2171)
@@ -43,7 +43,6 @@
// Properties
uint64_t brokerRef;
std::string location;
- uint8_t async;
uint16_t defaultInitialFileCount;
uint32_t defaultDataFileSize;
@@ -89,11 +88,6 @@
location = val;
configChanged = true;
}
- inline void set_async (uint8_t val){
- sys::Mutex::ScopedLock mutex(accessLock);
- async = val;
- configChanged = true;
- }
inline void set_defaultInitialFileCount (uint16_t val){
sys::Mutex::ScopedLock mutex(accessLock);
defaultInitialFileCount = val;
16 years, 5 months
rhmessaging commits: r2170 - in store/trunk/cpp: tests and 1 other directories.
by rhmessaging-commits@lists.jboss.org
Author: kpvdr
Date: 2008-06-30 09:04:03 -0400 (Mon, 30 Jun 2008)
New Revision: 2170
Modified:
store/trunk/cpp/lib/BdbMessageStore.cpp
store/trunk/cpp/lib/BdbMessageStore.h
store/trunk/cpp/tests/OrderingTest.cpp
store/trunk/cpp/tests/SimpleTest.cpp
store/trunk/cpp/tests/TransactionalTest.cpp
store/trunk/cpp/tests/TwoPhaseCommitTest.cpp
store/trunk/cpp/tests/python_tests/flow_to_disk.py
Log:
BDB cleanup - phase 1. Removed all sync tests, refactored tests (slightly) to improve layout without need for sync/async; removed async and force params from BdbMessageStore::init(...). Next phase will clean up the BdbMessageStore class itself.
Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp 2008-06-26 21:39:59 UTC (rev 2169)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp 2008-06-30 13:04:03 UTC (rev 2170)
@@ -97,7 +97,7 @@
}
}
-bool BdbMessageStore::init(const std::string& dir, const bool async, const bool force, u_int16_t jfiles, u_int32_t jfileSizePgs, uint32_t wCachePageSize)
+bool BdbMessageStore::init(const std::string& dir, u_int16_t jfiles, u_int32_t jfileSizePgs, uint32_t wCachePageSize)
{
if (isInit) return true;
@@ -125,7 +125,8 @@
wcache_num_pages = defTotWCacheSize / wcache_pgsize_sblks;
}
- useAsync = async;
+ // TODO: remove
+ useAsync = true;
if (dir.size()>0) storeDir = dir;
string bdbdir = storeDir + "/rhm/dat/";
@@ -165,11 +166,14 @@
txn.abort();
throw;
}
+
+ // TODO: remove
+ bool force = false;
ret = mode(useAsync, force);
if (!ret) return false;
isInit = true;
- QPID_LOG(info, "BdbMessageStore module initialized: dir=" << dir << "; async=" << (async?"T":"F") << "; force=" << (force?"T":"F") << "; jfiles=" << jfiles << "; jfileSizePgs=" << jfileSizePgs << "; wCachePageSize=" << wCachePageSize);
+ QPID_LOG(info, "BdbMessageStore module initialized: dir=" << dir << "; jfiles=" << jfiles << "; jfileSizePgs=" << jfileSizePgs << "; wCachePageSize=" << wCachePageSize);
return true;
}
@@ -242,7 +246,7 @@
}
}
- return init(opts->storeDir, opts->storeAsync, opts->storeForce, numJrnlFiles, jrnlFsizePgs, jrnlWrCachePageSize);
+ return init(opts->storeDir, numJrnlFiles, jrnlFsizePgs, jrnlWrCachePageSize);
}
// true is async
@@ -1600,8 +1604,6 @@
BdbMessageStore::Options::Options(const std::string& name) :
qpid::Options(name),
- storeAsync(true),
- storeForce(false),
numJrnlFiles(8),
jrnlFsizePgs(24),
wCachePageSize(JRNL_WMGR_DEF_PAGE_SIZE * JRNL_DBLK_SIZE * JRNL_SBLK_SIZE / 1024)
Modified: store/trunk/cpp/lib/BdbMessageStore.h
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.h 2008-06-26 21:39:59 UTC (rev 2169)
+++ store/trunk/cpp/lib/BdbMessageStore.h 2008-06-30 13:04:03 UTC (rev 2170)
@@ -68,8 +68,6 @@
typedef boost::ptr_list<PreparedTransaction> txn_list;
// Default store settings
- static const bool defUseAsync = false;
- static const bool defForceStoreConversion = false;
static const u_int16_t defNumJrnlFiles = 8; // TODO: make configurable
static const u_int32_t defJrnlFileSizePgs = 24; // TODO: make configurable
static const u_int32_t defWCachePageSize = JRNL_WMGR_DEF_PAGE_SIZE * JRNL_DBLK_SIZE * JRNL_SBLK_SIZE / 1024; // TODO: make configurable
@@ -157,7 +155,7 @@
static inline bool usingJrnl() {return useAsync;}
string getJrnlBaseDir();
inline void checkInit() {
- if (!isInit) init("/var", defUseAsync, defForceStoreConversion, defNumJrnlFiles, defJrnlFileSizePgs, defWCachePageSize); isInit = true;
+ if (!isInit) init("/var", defNumJrnlFiles, defJrnlFileSizePgs, defWCachePageSize); isInit = true;
}
public:
@@ -177,7 +175,7 @@
BdbMessageStore(const char* envpath = 0);
virtual ~BdbMessageStore();
bool init(const qpid::Options* options);
- bool init(const std::string& dir, const bool async, const bool force, u_int16_t jfiles, u_int32_t jfileSizePgs, uint32_t wCachePageSize);
+ bool init(const std::string& dir, u_int16_t jfiles, u_int32_t jfileSizePgs, uint32_t wCachePageSize);
void initManagement (qpid::broker::Broker* broker);
void truncate();
Modified: store/trunk/cpp/tests/OrderingTest.cpp
===================================================================
--- store/trunk/cpp/tests/OrderingTest.cpp 2008-06-26 21:39:59 UTC (rev 2169)
+++ store/trunk/cpp/tests/OrderingTest.cpp 2008-06-30 13:04:03 UTC (rev 2170)
@@ -53,10 +53,10 @@
std::queue<Uuid> ids;
int counter = 1;
-void setup(bool async)
+void setup()
{
store = std::auto_ptr<BdbMessageStore>(new BdbMessageStore());
- store->init(test_dir, async, true, 4, 1, 8);
+ store->init(test_dir, 4, 1, 8);
store->truncate();
queue = Queue::shared_ptr(new Queue(name, 0, store.get(), 0));
@@ -88,13 +88,13 @@
}
}
-void restart(bool async)
+void restart()
{
queue.reset();
store.reset();
store = std::auto_ptr<BdbMessageStore>(new BdbMessageStore());
- store->init(test_dir, async, false, 4, 1, 8);
+ store->init(test_dir, 4, 1, 8);
ExchangeRegistry exchanges;
LinkRegistry links(0);
DtxManager mgr;
@@ -114,57 +114,33 @@
BOOST_CHECK_EQUAL((size_t) 0, ids.size());
}
-void testBasic(bool async = false)
+
+// === Test suite ===
+
+QPID_AUTO_TEST_CASE(Basic)
{
- setup(async);
+ std::cout << test_filename << ".Basic: " << std::flush;
+ setup();
//push on 10 messages
for (int i = 0; i < 10; i++) push();
- restart(async);
+ restart();
check();
+ std::cout << "ok" << std::endl;
}
-void testCycle(bool async = false)
+QPID_AUTO_TEST_CASE(Cycle)
{
- setup(async);
+ std::cout << test_filename << ".Cycle: " << std::flush;
+ setup();
//push on 10 messages:
for (int i = 0; i < 10; i++) push();
//pop 5:
for (int i = 0; i < 5; i++) pop();
//push on another 5:
for (int i = 0; i < 5; i++) push();
- restart(async);
+ restart();
check();
-}
-
-
-// === Test suite ===
-
-QPID_AUTO_TEST_CASE(BasicSync)
-{
- std::cout << test_filename << ".BasicSync: " << std::flush;
- testBasic(false);
std::cout << "ok" << std::endl;
}
-QPID_AUTO_TEST_CASE(BasicAsync)
-{
- std::cout << test_filename << ".BasicAsync: " << std::flush;
- testBasic(true);
- std::cout << "ok" << std::endl;
-}
-
-QPID_AUTO_TEST_CASE(CycleSync)
-{
- std::cout << test_filename << ".CycleSync: " << std::flush;
- testCycle(false);
- std::cout << "ok" << std::endl;
-}
-
-QPID_AUTO_TEST_CASE(CycleAsync)
-{
- std::cout << test_filename << ".CycleAsync: " << std::flush;
- testCycle(true);
- std::cout << "ok" << std::endl;
-}
-
QPID_AUTO_TEST_SUITE_END()
Modified: store/trunk/cpp/tests/SimpleTest.cpp
===================================================================
--- store/trunk/cpp/tests/SimpleTest.cpp 2008-06-26 21:39:59 UTC (rev 2169)
+++ store/trunk/cpp/tests/SimpleTest.cpp 2008-06-30 13:04:03 UTC (rev 2170)
@@ -82,10 +82,60 @@
recover(store, queues, exchanges, links);
}
-void testCreateDelete(bool async)
+void bindAndUnbind(const string& exchangeName, const string& queueName,
+ const string& key, const FieldTable& args)
{
+ {
+ BdbMessageStore store;
+ store.init(test_dir, 4, 1, 8);
+ store.truncate();//make sure it is empty to begin with
+ Exchange::shared_ptr exchange(new DirectExchange(exchangeName, true, args));
+ Queue::shared_ptr queue(new Queue(queueName, 0, &store, 0));
+ store.create(*exchange, qpid::framing::FieldTable());
+ store.create(*queue, qpid::framing::FieldTable());
+ BOOST_REQUIRE(exchange->bind(queue, key, &args));
+ store.bind(*exchange, *queue, key, args);
+ }//db will be closed
+ {
+ BdbMessageStore store;
+ store.init(test_dir, 4, 1, 8);
+ ExchangeRegistry exchanges;
+ QueueRegistry queues;
+ LinkRegistry links(0);
+
+ recover(store, queues, exchanges, links);
+
+ Exchange::shared_ptr exchange = exchanges.get(exchangeName);
+ Queue::shared_ptr queue = queues.find(queueName);
+ //check it is bound by unbinding
+ BOOST_REQUIRE(exchange->unbind(queue, key, &args));
+ store.unbind(*exchange, *queue, key, args);
+ }
+ {
+ BdbMessageStore store;
+ store.init(test_dir, 4, 1, 8);
+ ExchangeRegistry exchanges;
+ QueueRegistry queues;
+ LinkRegistry links(0);
+
+ recover(store, queues, exchanges, links);
+
+ Exchange::shared_ptr exchange = exchanges.get(exchangeName);
+ Queue::shared_ptr queue = queues.find(queueName);
+ //make sure it is no longer bound
+ BOOST_REQUIRE(!exchange->unbind(queue, key, &args));
+ }
+}
+
+
+// === Test suite ===
+
+QPID_AUTO_TEST_CASE(CreateDelete)
+{
+ cout << test_filename << ".CreateDelete: " << flush;
+
BdbMessageStore store;
- store.init(test_dir, async, true, 4, 1, 8);
+ store.init(test_dir, 4, 1, 8);
store.truncate();//make sure it is empty to begin with
string name("CreateDeleteQueue");
Queue queue(name, 0, &store, 0);
@@ -95,26 +145,33 @@
store.destroy(queue);
// TODO - check dir is deleted
+ cout << "ok" << endl;
}
-void testEmptyRecover(bool async)
+QPID_AUTO_TEST_CASE(EmptyRecover)
{
+ cout << test_filename << ".EmptyRecover: " << flush;
+
BdbMessageStore store;
- store.init(test_dir, async, true, 4, 1, 8);
+ store.init(test_dir, 4, 1, 8);
store.truncate();//make sure it is empty to begin with
QueueRegistry registry;
registry.setStore (&store);
recover(store, registry);
//nothing to assert, just testing it doesn't blow up
+
+ cout << "ok" << endl;
}
-void testQueueCreate(bool async)
+QPID_AUTO_TEST_CASE(QueueCreate)
{
+ cout << test_filename << ".QueueCreate: " << flush;
+
uint64_t id(0);
string name("MyDurableQueue");
{
BdbMessageStore store;
- store.init(test_dir, async, true, 4, 1, 8);
+ store.init(test_dir, 4, 1, 8);
store.truncate();//make sure it is empty to begin with
Queue queue(name, 0, &store, 0);
store.create(queue, qpid::framing::FieldTable());
@@ -123,7 +180,7 @@
}//db will be closed
{
BdbMessageStore store;
- store.init(test_dir, async, false, 4, 1, 8);
+ store.init(test_dir, 4, 1, 8);
QueueRegistry registry;
registry.setStore (&store);
recover(store, registry);
@@ -131,15 +188,19 @@
BOOST_REQUIRE(queue.get());
BOOST_CHECK_EQUAL(id, queue->getPersistenceId());
}
+
+ cout << "ok" << endl;
}
-void testQueueCreateWithSettings(bool async)
+QPID_AUTO_TEST_CASE(QueueCreateWithSettings)
{
+ cout << test_filename << ".QueueCreateWithSettings: " << flush;
+
QueuePolicy policy(101, 202);
string name("MyDurableQueue");
{
BdbMessageStore store;
- store.init(test_dir, async, true, 4, 1, 8);
+ store.init(test_dir, 4, 1, 8);
store.truncate();//make sure it is empty to begin with
Queue queue(name, 0, &store, 0);
FieldTable settings;
@@ -149,7 +210,7 @@
}//db will be closed
{
BdbMessageStore store;
- store.init(test_dir, async, false, 4, 1, 8);
+ store.init(test_dir, 4, 1, 8);
QueueRegistry registry;
registry.setStore (&store);
recover(store, registry);
@@ -159,14 +220,18 @@
BOOST_CHECK_EQUAL(policy.getMaxCount(), queue->getPolicy()->getMaxCount());
BOOST_CHECK_EQUAL(policy.getMaxSize(), queue->getPolicy()->getMaxSize());
}
+
+ cout << "ok" << endl;
}
-void testQueueDestroy(bool async)
+QPID_AUTO_TEST_CASE(QueueDestroy)
{
+ cout << test_filename << ".QueueDestroy: " << flush;
+
string name("MyDurableQueue");
{
BdbMessageStore store;
- store.init(test_dir, async, true, 4, 1, 8);
+ store.init(test_dir, 4, 1, 8);
store.truncate();//make sure it is empty to begin with
Queue queue(name, 0, &store, 0);
store.create(queue, qpid::framing::FieldTable());
@@ -174,16 +239,20 @@
}//db will be closed
{
BdbMessageStore store;
- store.init(test_dir, async, false, 4, 1, 8);
+ store.init(test_dir, 4, 1, 8);
QueueRegistry registry;
registry.setStore (&store);
recover(store, registry);
BOOST_REQUIRE(!registry.find(name));
}
+
+ cout << "ok" << endl;
}
-void testEnqueue(bool async)
+QPID_AUTO_TEST_CASE(Enqueue)
{
+ cout << test_filename << ".Enqueue: " << flush;
+
//TODO: this is largely copy & paste'd from MessageTest in
//qpid tree. ideally need some helper routines for reducing
//this to a simpler less duplicated form
@@ -196,7 +265,7 @@
string data2("hijklmn");
{
BdbMessageStore store;
- store.init(test_dir, async, true, 4, 1, 8);
+ store.init(test_dir, 4, 1, 8);
store.truncate();//make sure it is empty to begin with
Queue::shared_ptr queue(new Queue(name, 0, &store, 0));
FieldTable settings;
@@ -215,7 +284,7 @@
}//db will be closed
{
BdbMessageStore store;
- store.init(test_dir, async, false, 4, 1, 8);
+ store.init(test_dir, 4, 1, 8);
QueueRegistry registry;
registry.setStore (&store);
recover(store, registry);
@@ -240,10 +309,14 @@
BOOST_CHECK_EQUAL(data1.size() + data2.size(), contentBody->getData().size());
BOOST_CHECK_EQUAL(data1 + data2, contentBody->getData());
}
+
+ cout << "ok" << endl;
}
-void testDequeue(bool async)
+QPID_AUTO_TEST_CASE(Dequeue)
{
+ cout << test_filename << ".Dequeue: " << flush;
+
//TODO: reduce the duplication in these tests
string name("MyDurableQueue");
{
@@ -252,7 +325,7 @@
Uuid messageId(true);
string data("abcdefg");
BdbMessageStore store;
- store.init(test_dir, async, true, 4, 1, 8);
+ store.init(test_dir, 4, 1, 8);
store.truncate();//make sure it is empty to begin with
Queue::shared_ptr queue(new Queue(name, 0, &store, 0));
FieldTable settings;
@@ -267,7 +340,7 @@
}//db will be closed
{
BdbMessageStore store;
- store.init(test_dir, async, false, 4, 1, 8);
+ store.init(test_dir, 4, 1, 8);
QueueRegistry registry;
registry.setStore (&store);
recover(store, registry);
@@ -275,10 +348,14 @@
BOOST_REQUIRE(queue);
BOOST_CHECK_EQUAL((u_int32_t) 0, queue->getMessageCount());
}
+
+ cout << "ok" << endl;
}
-void testStaging(bool async)
+QPID_AUTO_TEST_CASE(Staging)
{
+ cout << test_filename << ".Staging: " << flush;
+
const string name("MyDurableQueue");
const string exchange("MyExchange");
const string routingKey("MyRoutingKey");
@@ -287,7 +364,7 @@
const string data2("ABCDEFGHIJKLMNOPQRSTUVWXYZ");
{
BdbMessageStore store;
- store.init(test_dir, async, true, 4, 1, 8);
+ store.init(test_dir, 4, 1, 8);
store.truncate();//make sure it is empty to begin with
//create & stage a message
@@ -329,7 +406,7 @@
{
//recover
BdbMessageStore store;
- store.init(test_dir, async, false, 4, 1, 8);
+ store.init(test_dir, 4, 1, 8);
QueueRegistry registry;
registry.setStore (&store);
ExchangeRegistry exchanges;
@@ -372,12 +449,16 @@
//dequeue
queue->dequeue(0, msg);
}
+
+ cout << "ok" << endl;
}
-void testDestroyStagedMessage(bool async)
+QPID_AUTO_TEST_CASE(DestroyStagedMessage)
{
+ cout << test_filename << ".DestroyStagedMessage: " << flush;
+
BdbMessageStore store;
- store.init(test_dir, async, true, 4, 1, 8);
+ store.init(test_dir, 4, 1, 8);
store.truncate();//make sure it is empty to begin with
const string data("abcdefg");
@@ -396,12 +477,16 @@
BOOST_FAIL("store.loadContent() did not throw StoreException as expected.");
} catch (StoreException& e) {
}
+
+ cout << "ok" << endl;
}
-void testDestroyEnqueuedMessage(bool async)
+QPID_AUTO_TEST_CASE(DestroyEnqueuedMessage)
{
+ cout << test_filename << ".DestroyEnqueuedMessage: " << flush;
+
BdbMessageStore store;
- store.init(test_dir, async, true, 4, 1, 8);
+ store.init(test_dir, 4, 1, 8);
store.truncate();//make sure it is empty to begin with
const string data("abcdefg");
@@ -422,10 +507,14 @@
store.dequeue(0, pmsg, queue);
store.destroy(queue);
+
+ cout << "ok" << endl;
}
-void testExchangeCreateAndDestroy(bool async)
+QPID_AUTO_TEST_CASE(ExchangeCreateAndDestroy)
{
+ cout << test_filename << ".ExchangeCreateAndDestroy: " << flush;
+
uint64_t id(0);
string name("MyDurableExchange");
string type("direct");
@@ -433,7 +522,7 @@
args.setString("a", "A");
{
BdbMessageStore store;
- store.init(test_dir, async, true, 4, 1, 8);
+ store.init(test_dir, 4, 1, 8);
store.truncate();//make sure it is empty to begin with
ExchangeRegistry registry;
Exchange::shared_ptr exchange = registry.declare(name, type, true, args).first;
@@ -443,7 +532,7 @@
}//db will be closed
{
BdbMessageStore store;
- store.init(test_dir, async, false, 4, 1, 8);
+ store.init(test_dir, 4, 1, 8);
ExchangeRegistry registry;
recover(store, registry);
@@ -457,7 +546,7 @@
}
{
BdbMessageStore store;
- store.init(test_dir, async, false, 4, 1, 8);
+ store.init(test_dir, 4, 1, 8);
ExchangeRegistry registry;
recover(store, registry);
@@ -469,68 +558,35 @@
BOOST_CHECK_EQUAL((framing::ReplyCode) 404, e.code);
}
}
+
+ cout << "ok" << endl;
}
-void bindAndUnbind(const string& exchangeName, const string& queueName,
- const string& key, const FieldTable& args, bool async)
+QPID_AUTO_TEST_CASE(ExchangeBindAndUnbind)
{
- {
- BdbMessageStore store;
- store.init(test_dir, async, true, 4, 1, 8);
- store.truncate();//make sure it is empty to begin with
- Exchange::shared_ptr exchange(new DirectExchange(exchangeName, true, args));
- Queue::shared_ptr queue(new Queue(queueName, 0, &store, 0));
- store.create(*exchange, qpid::framing::FieldTable());
- store.create(*queue, qpid::framing::FieldTable());
- BOOST_REQUIRE(exchange->bind(queue, key, &args));
- store.bind(*exchange, *queue, key, args);
- }//db will be closed
- {
- BdbMessageStore store;
- store.init(test_dir, async, false, 4, 1, 8);
- ExchangeRegistry exchanges;
- QueueRegistry queues;
- LinkRegistry links(0);
+ cout << test_filename << ".ExchangeBindAndUnbind: " << flush;
- recover(store, queues, exchanges, links);
+ bindAndUnbind("MyDurableExchange", "MyDurableQueue", "my-routing-key", FieldTable());
- Exchange::shared_ptr exchange = exchanges.get(exchangeName);
- Queue::shared_ptr queue = queues.find(queueName);
- //check it is bound by unbinding
- BOOST_REQUIRE(exchange->unbind(queue, key, &args));
- store.unbind(*exchange, *queue, key, args);
- }
- {
- BdbMessageStore store;
- store.init(test_dir, async, false, 4, 1, 8);
- ExchangeRegistry exchanges;
- QueueRegistry queues;
- LinkRegistry links(0);
-
- recover(store, queues, exchanges, links);
-
- Exchange::shared_ptr exchange = exchanges.get(exchangeName);
- Queue::shared_ptr queue = queues.find(queueName);
- //make sure it is no longer bound
- BOOST_REQUIRE(!exchange->unbind(queue, key, &args));
- }
+ cout << "ok" << endl;
}
-void testExchangeBindAndUnbind(bool async)
+QPID_AUTO_TEST_CASE(ExchangeBindAndUnbindWithArgs)
{
- bindAndUnbind("MyDurableExchange", "MyDurableQueue", "my-routing-key", FieldTable(), async);
-}
+ cout << test_filename << ".ExchangeBindAndUnbindWithArgs: " << flush;
-void testExchangeBindAndUnbindWithArgs(bool async)
-{
FieldTable args;
args.setString("a", "A");
args.setString("b", "B");
- bindAndUnbind("MyDurableExchange", "MyDurableQueue", "my-routing-key", args, async);
+ bindAndUnbind("MyDurableExchange", "MyDurableQueue", "my-routing-key", args);
+
+ cout << "ok" << endl;
}
-void testExchangeImplicitUnbind(bool async)
+QPID_AUTO_TEST_CASE(ExchangeImplicitUnbind)
{
+ cout << test_filename << ".ExchangeImplicitUnbind: " << flush;
+
string exchangeName("MyDurableExchange");
string queueName1("MyDurableQueue1");
string queueName2("MyDurableQueue2");
@@ -538,7 +594,7 @@
FieldTable args;
{
BdbMessageStore store;
- store.init(test_dir, async, true, 4, 1, 8);
+ store.init(test_dir, 4, 1, 8);
store.truncate();//make sure it is empty to begin with
Exchange::shared_ptr exchange(new DirectExchange(exchangeName, true, args));
Queue::shared_ptr queue1(new Queue(queueName1, 0, &store, 0));
@@ -553,7 +609,7 @@
}//db will be closed
{
BdbMessageStore store;
- store.init(test_dir, async, false, 4, 1, 8);
+ store.init(test_dir, 4, 1, 8);
ExchangeRegistry exchanges;
QueueRegistry queues;
LinkRegistry links(0);
@@ -570,7 +626,7 @@
}
{
BdbMessageStore store;
- store.init(test_dir, async, false, 4, 1, 8);
+ store.init(test_dir, 4, 1, 8);
ExchangeRegistry exchanges;
QueueRegistry queues;
LinkRegistry links(0);
@@ -587,205 +643,8 @@
Queue::shared_ptr queue = queues.find(queueName2);
store.destroy(*queue);
}
-}
-
-// === Test suite ===
-
-QPID_AUTO_TEST_CASE(CreateDeleteSync)
-{
- cout << test_filename << ".CreateDeleteSync: " << flush;
- testCreateDelete(false);
cout << "ok" << endl;
}
-QPID_AUTO_TEST_CASE(CreateDeleteAsync)
-{
- cout << test_filename << ".CreateDeleteAsync: " << flush;
- testCreateDelete(true);
- cout << "ok" << endl;
-}
-
-QPID_AUTO_TEST_CASE(EmptyRecoverSync)
-{
- cout << test_filename << ".EmptyRecoverSync: " << flush;
- testEmptyRecover(false);
- cout << "ok" << endl;
-}
-
-QPID_AUTO_TEST_CASE(EmptyRecoverAsync)
-{
- cout << test_filename << ".EmptyRecoverAsync: " << flush;
- testEmptyRecover(true);
- cout << "ok" << endl;
-}
-
-QPID_AUTO_TEST_CASE(QueueCreateSync)
-{
- cout << test_filename << ".QueueCreateSync: " << flush;
- testQueueCreate(false);
- cout << "ok" << endl;
-}
-
-QPID_AUTO_TEST_CASE(QueueCreateAsync)
-{
- cout << test_filename << ".QueueCreateAsync: " << flush;
- testQueueCreate(true);
- cout << "ok" << endl;
-}
-
-QPID_AUTO_TEST_CASE(QueueCreateWithSettingsSync)
-{
- cout << test_filename << ".QueueCreateWithSettingsSync: " << flush;
- testQueueCreateWithSettings(false);
- cout << "ok" << endl;
-}
-
-QPID_AUTO_TEST_CASE(QueueCreateWithSettingsAsync)
-{
- cout << test_filename << ".QueueCreateWithSettingsAsync: " << flush;
- testQueueCreateWithSettings(true);
- cout << "ok" << endl;
-}
-
-QPID_AUTO_TEST_CASE(QueueDestroySync)
-{
- cout << test_filename << ".QueueDestroySync: " << flush;
- testQueueDestroy(false);
- cout << "ok" << endl;
-}
-
-QPID_AUTO_TEST_CASE(QueueDestroyAsync)
-{
- cout << test_filename << ".QueueDestroyAsync: " << flush;
- testQueueDestroy(true);
- cout << "ok" << endl;
-}
-
-QPID_AUTO_TEST_CASE(EnqueueSync)
-{
- cout << test_filename << ".EnqueueSync: " << flush;
- testEnqueue(false);
- cout << "ok" << endl;
-}
-
-QPID_AUTO_TEST_CASE(EnqueueAsync)
-{
- cout << test_filename << ".EnqueueAsync: " << flush;
- testEnqueue(true);
- cout << "ok" << endl;
-}
-
-QPID_AUTO_TEST_CASE(DequeueSync)
-{
- cout << test_filename << ".DequeueSync: " << flush;
- testDequeue(false);
- cout << "ok" << endl;
-}
-
-QPID_AUTO_TEST_CASE(DequeueAsync)
-{
- cout << test_filename << ".DequeueAsync: " << flush;
- testDequeue(true);
- cout << "ok" << endl;
-}
-
-QPID_AUTO_TEST_CASE(StagingSync)
-{
- cout << test_filename << ".StagingSync: " << flush;
- testStaging(false);
- cout << "ok" << endl;
-}
-
-QPID_AUTO_TEST_CASE(StagingAsync)
-{
- cout << test_filename << ".StagingAsync: " << flush;
- testStaging(true);
- cout << "ok" << endl;
-}
-
-QPID_AUTO_TEST_CASE(DestroyStagedMessageSync)
-{
- cout << test_filename << ".DestroyStagedMessageSync: " << flush;
- testDestroyStagedMessage(false);
- cout << "ok" << endl;
-}
-
-QPID_AUTO_TEST_CASE(DestroyStagedMessageAsync)
-{
- cout << test_filename << ".DestroyStagedMessageAsync: " << flush;
- testDestroyStagedMessage(true);
- cout << "ok" << endl;
-}
-
-QPID_AUTO_TEST_CASE(DestroyEnqueuedMessageSync)
-{
- cout << test_filename << ".DestroyEnqueuedMessageSync: " << flush;
- testDestroyEnqueuedMessage(false);
- cout << "ok" << endl;
-}
-
-QPID_AUTO_TEST_CASE(DestroyEnqueuedMessageAsync)
-{
- cout << test_filename << ".DestroyEnqueuedMessageAsync: " << flush;
- testDestroyEnqueuedMessage(true);
- cout << "ok" << endl;
-}
-
-QPID_AUTO_TEST_CASE(ExchangeCreateAndDestroySync)
-{
- cout << test_filename << ".ExchangeCreateAndDestroySync: " << flush;
- testExchangeCreateAndDestroy(false);
- cout << "ok" << endl;
-}
-
-QPID_AUTO_TEST_CASE(ExchangeCreateAndDestroyAsync)
-{
- cout << test_filename << ".ExchangeCreateAndDestroyAsync: " << flush;
- testExchangeCreateAndDestroy(true);
- cout << "ok" << endl;
-}
-
-QPID_AUTO_TEST_CASE(ExchangeBindAndUnbindSync)
-{
- cout << test_filename << ".ExchangeBindAndUnbindSync: " << flush;
- testExchangeBindAndUnbind(false);
- cout << "ok" << endl;
-}
-
-QPID_AUTO_TEST_CASE(ExchangeBindAndUnbindAsync)
-{
- cout << test_filename << ".ExchangeBindAndUnbindAsync: " << flush;
- testExchangeBindAndUnbind(true);
- cout << "ok" << endl;
-}
-
-QPID_AUTO_TEST_CASE(ExchangeBindAndUnbindWithArgsSync)
-{
- cout << test_filename << ".ExchangeBindAndUnbindWithArgsSync: " << flush;
- testExchangeBindAndUnbindWithArgs(false);
- cout << "ok" << endl;
-}
-
-QPID_AUTO_TEST_CASE(ExchangeBindAndUnbindWithArgsAsync)
-{
- cout << test_filename << ".ExchangeBindAndUnbindWithArgsAsync: " << flush;
- testExchangeBindAndUnbindWithArgs(true);
- cout << "ok" << endl;
-}
-
-QPID_AUTO_TEST_CASE(ExchangeImplicitUnbindSync)
-{
- cout << test_filename << ".ExchangeImplicitUnbindSync: " << flush;
- testExchangeImplicitUnbind(false);
- cout << "ok" << endl;
-}
-
-QPID_AUTO_TEST_CASE(ExchangeImplicitUnbindAsync)
-{
- cout << test_filename << ".ExchangeImplicitUnbindAsync: " << flush;
- testExchangeImplicitUnbind(true);
- cout << "ok" << endl;
-}
-
QPID_AUTO_TEST_SUITE_END()
Modified: store/trunk/cpp/tests/TransactionalTest.cpp
===================================================================
--- store/trunk/cpp/tests/TransactionalTest.cpp 2008-06-26 21:39:59 UTC (rev 2169)
+++ store/trunk/cpp/tests/TransactionalTest.cpp 2008-06-30 13:04:03 UTC (rev 2170)
@@ -55,10 +55,10 @@
Queue::shared_ptr queueA;
Queue::shared_ptr queueB;
-void setup(bool async)
+void setup()
{
store = std::auto_ptr<BdbMessageStore>(new BdbMessageStore());
- store->init(test_dir, async, true, 4, 1, 8);
+ store->init(test_dir, 4, 1, 8);
store->truncate();
//create two queues:
@@ -75,14 +75,14 @@
queueA->deliver(msg);
}
-void restart(bool async)
+void restart()
{
queueA.reset();
queueB.reset();
store.reset();
store = std::auto_ptr<BdbMessageStore>(new BdbMessageStore());
- store->init(test_dir, async, false, 4, 1, 8);
+ store->init(test_dir, 4, 1, 8);
ExchangeRegistry exchanges;
LinkRegistry links(0);
DtxManager mgr;
@@ -117,9 +117,9 @@
BOOST_CHECK_EQUAL(messageId, msg->getProperties<MessageProperties>()->getMessageId());
}
-void swap(bool commit, bool async)
+void swap(bool commit)
{
- setup(async);
+ setup();
boost::intrusive_ptr<Message> msg = queueA->dequeue().payload;
BOOST_REQUIRE(msg);
@@ -133,39 +133,25 @@
store->abort(*txn);
}
- restart(async);
+ restart();
check(commit);
}
// === Test suite ===
-QPID_AUTO_TEST_CASE(CommitSync)
+QPID_AUTO_TEST_CASE(Commit)
{
- cout << test_filename << ".CommitSync: " << flush;
- swap(true, false);
+ cout << test_filename << ".Commit: " << flush;
+ swap(true);
cout << "ok" << endl;
}
-QPID_AUTO_TEST_CASE(CommitAsync)
+QPID_AUTO_TEST_CASE(Abort)
{
- cout << test_filename << ".CommitAsync: " << flush;
- swap(true, true);
+ cout << test_filename << ".Abort: " << flush;
+ swap(false);
cout << "ok" << endl;
}
-QPID_AUTO_TEST_CASE(AbortSync)
-{
- cout << test_filename << ".AbortSync: " << flush;
- swap(false, false);
- cout << "ok" << endl;
-}
-
-QPID_AUTO_TEST_CASE(AbortAsync)
-{
- cout << test_filename << ".AbortAsync: " << flush;
- swap(false, true);
- cout << "ok" << endl;
-}
-
QPID_AUTO_TEST_SUITE_END()
Modified: store/trunk/cpp/tests/TwoPhaseCommitTest.cpp
===================================================================
--- store/trunk/cpp/tests/TwoPhaseCommitTest.cpp 2008-06-26 21:39:59 UTC (rev 2169)
+++ store/trunk/cpp/tests/TwoPhaseCommitTest.cpp 2008-06-30 13:04:03 UTC (rev 2170)
@@ -134,7 +134,6 @@
boost::intrusive_ptr<Message> msg1;
boost::intrusive_ptr<Message> msg2;
boost::intrusive_ptr<Message> msg4;
- bool async;
void recoverPrepared(bool commit)
{
@@ -221,7 +220,7 @@
void setup()
{
store = std::auto_ptr<BdbMessageStore>(new BdbMessageStore());
- store->init(test_dir, async, true, 4, 1, 8);
+ store->init(test_dir, 4, 1, 8);
store->truncate();
//create two queues:
@@ -247,7 +246,7 @@
store.reset();
store = std::auto_ptr<BdbMessageStore>(new BdbMessageStore());
- store->init(test_dir, async, false, 4, 1, 8);
+ store->init(test_dir, 4, 1, 8);
ExchangeRegistry exchanges;
dtxmgr = std::auto_ptr<DtxManager>(new DtxManager);
dtxmgr->setStore (store.get());
@@ -295,78 +294,67 @@
public:
TwoPhaseCommitTest() : nameA("queueA"), nameB("queueB"), links(0) {}
- void testCommitSwap(bool a)
+ void testCommitSwap()
{
- async = a;
Swap swap(this, "SwapMessageId");
commit(swap);
}
- void testPrepareAndAbortSwap(bool a)
+ void testPrepareAndAbortSwap()
{
- async = a;
Swap swap(this, "SwapMessageId");
abort(swap, true);
}
- void testAbortNoPrepareSwap(bool a)
+ void testAbortNoPrepareSwap()
{
- async = a;
Swap swap(this, "SwapMessageId");
abort(swap, false);
}
- void testCommitEnqueue(bool a)
+ void testCommitEnqueue()
{
- async = a;
Enqueue enqueue(this);
commit(enqueue);
}
- void testPrepareAndAbortEnqueue(bool a)
+ void testPrepareAndAbortEnqueue()
{
- async = a;
Enqueue enqueue(this);
abort(enqueue, true);
}
- void testAbortNoPrepareEnqueue(bool a)
+ void testAbortNoPrepareEnqueue()
{
- async = a;
Enqueue enqueue(this);
abort(enqueue, false);
}
- void testCommitDequeue(bool a)
+ void testCommitDequeue()
{
- async = a;
Dequeue dequeue(this);
commit(dequeue);
}
- void testPrepareAndAbortDequeue(bool a)
+ void testPrepareAndAbortDequeue()
{
- async = a;
Dequeue dequeue(this);
abort(dequeue, true);
}
- void testAbortNoPrepareDequeue(bool a)
+ void testAbortNoPrepareDequeue()
{
- async = a;
Dequeue dequeue(this);
abort(dequeue, false);
}
- void testRecoverPreparedThenCommitted(bool a)
+ void testRecoverPreparedThenCommitted()
{
- async = a;
recoverPrepared(true);
}
- void testRecoverPreparedThenAborted(bool a)
+ void testRecoverPreparedThenAborted()
{
- async = a;
recoverPrepared(false);
}
};
@@ -375,158 +363,81 @@
// === Test suite ===
-QPID_AUTO_TEST_CASE(CommitSwapSync)
+QPID_AUTO_TEST_CASE(PrepareAndAbortSwap)
{
- cout << test_filename << ".CommitSwapSync: " << flush;
- tpct.testCommitSwap(false);
+ cout << test_filename << ".PrepareAndAbortSwap: " << flush;
+ tpct.testPrepareAndAbortSwap();
cout << "ok" << endl;
}
-QPID_AUTO_TEST_CASE(PrepareAndAbortSwapSync)
+QPID_AUTO_TEST_CASE(CommitEnqueue)
{
- cout << test_filename << ".PrepareAndAbortSwapSync: " << flush;
- tpct.testPrepareAndAbortSwap(false);
+ cout << test_filename << ".CommitEnqueue: " << flush;
+ tpct.testCommitEnqueue();
cout << "ok" << endl;
}
-QPID_AUTO_TEST_CASE(AbortNoPrepareSwapSync)
+QPID_AUTO_TEST_CASE(AbortNoPrepareEnqueue)
{
- cout << test_filename << ".AbortNoPrepareSwapSync: " << flush;
- tpct.testAbortNoPrepareSwap(false);
+ cout << test_filename << ".AbortNoPrepareEnqueue: " << flush;
+ tpct.testAbortNoPrepareEnqueue();
cout << "ok" << endl;
}
-QPID_AUTO_TEST_CASE(CommitEnqueueSync)
+QPID_AUTO_TEST_CASE(PrepareAndAbortDequeue)
{
- cout << test_filename << ".CommitEnqueueSync: " << flush;
- tpct.testCommitEnqueue(false);
+ cout << test_filename << ".PrepareAndAbortDequeue: " << flush;
+ tpct.testPrepareAndAbortDequeue();
cout << "ok" << endl;
}
-QPID_AUTO_TEST_CASE(PrepareAndAbortEnqueueSync)
+QPID_AUTO_TEST_CASE(RecoverPreparedThenCommitted)
{
- cout << test_filename << ".PrepareAndAbortEnqueueSync: " << flush;
- tpct.testPrepareAndAbortEnqueue(false);
+ cout << test_filename << ".RecoverPreparedThenCommitted: " << flush;
+ tpct.testRecoverPreparedThenCommitted();
cout << "ok" << endl;
}
-QPID_AUTO_TEST_CASE(AbortNoPrepareEnqueueSync)
+QPID_AUTO_TEST_CASE(CommitSwap)
{
- cout << test_filename << ".AbortNoPrepareEnqueueSync: " << flush;
- tpct.testAbortNoPrepareEnqueue(false);
+ cout << test_filename << ".CommitSwap: " << flush;
+ tpct.testCommitSwap();
cout << "ok" << endl;
}
-QPID_AUTO_TEST_CASE(CommitDequeueSync)
+QPID_AUTO_TEST_CASE(AbortNoPrepareSwap)
{
- cout << test_filename << ".CommitDequeueSync: " << flush;
- tpct.testCommitDequeue(false);
+ cout << test_filename << ".AbortNoPrepareSwap: " << flush;
+ tpct.testAbortNoPrepareSwap();
cout << "ok" << endl;
}
-QPID_AUTO_TEST_CASE(PrepareAndAbortDequeueSync)
+QPID_AUTO_TEST_CASE(PrepareAndAbortEnqueue)
{
- cout << test_filename << ".PrepareAndAbortDequeueSync: " << flush;
- tpct.testPrepareAndAbortDequeue(false);
+ cout << test_filename << ".PrepareAndAbortEnqueue: " << flush;
+ tpct.testPrepareAndAbortEnqueue();
cout << "ok" << endl;
}
-QPID_AUTO_TEST_CASE(AbortNoPrepareDequeueSync)
+QPID_AUTO_TEST_CASE(CommitDequeue)
{
- cout << test_filename << ".AbortNoPrepareDequeueSync: " << flush;
- tpct.testAbortNoPrepareDequeue(false);
+ cout << test_filename << ".CommitDequeue: " << flush;
+ tpct.testCommitDequeue();
cout << "ok" << endl;
}
-QPID_AUTO_TEST_CASE(RecoverPreparedThenCommittedSync)
+QPID_AUTO_TEST_CASE(AbortNoPrepareDequeue)
{
- cout << test_filename << ".RecoverPreparedThenCommittedSync: " << flush;
- tpct.testRecoverPreparedThenCommitted(false);
+ cout << test_filename << ".AbortNoPrepareDequeue: " << flush;
+ tpct.testAbortNoPrepareDequeue();
cout << "ok" << endl;
}
-QPID_AUTO_TEST_CASE(RecoverPreparedThenAbortedSync)
+QPID_AUTO_TEST_CASE(RecoverPreparedThenAborted)
{
- cout << test_filename << ".RecoverPreparedThenAbortedSync: " << flush;
- tpct.testRecoverPreparedThenAborted(false);
+ cout << test_filename << ".RecoverPreparedThenAborted: " << flush;
+ tpct.testRecoverPreparedThenAborted();
cout << "ok" << endl;
}
-QPID_AUTO_TEST_CASE(CommitSwapAsync)
-{
- cout << test_filename << ".CommitSwapAsync: " << flush;
- tpct.testCommitSwap(true);
- cout << "ok" << endl;
-}
-
-QPID_AUTO_TEST_CASE(PrepareAndAbortSwapAsync)
-{
- cout << test_filename << ".PrepareAndAbortSwapAsync: " << flush;
- tpct.testPrepareAndAbortSwap(true);
- cout << "ok" << endl;
-}
-
-QPID_AUTO_TEST_CASE(AbortNoPrepareSwapAsync)
-{
- cout << test_filename << ".AbortNoPrepareSwapAsync: " << flush;
- tpct.testAbortNoPrepareSwap(true);
- cout << "ok" << endl;
-}
-
-QPID_AUTO_TEST_CASE(CommitEnqueueAsync)
-{
- cout << test_filename << ".CommitEnqueueAsync: " << flush;
- tpct.testCommitEnqueue(true);
- cout << "ok" << endl;
-}
-
-QPID_AUTO_TEST_CASE(PrepareAndAbortEnqueueAsync)
-{
- cout << test_filename << ".PrepareAndAbortEnqueueAsync: " << flush;
- tpct.testPrepareAndAbortEnqueue(true);
- cout << "ok" << endl;
-}
-
-QPID_AUTO_TEST_CASE(AbortNoPrepareEnqueueAsync)
-{
- cout << test_filename << ".AbortNoPrepareEnqueueAsync: " << flush;
- tpct.testAbortNoPrepareEnqueue(true);
- cout << "ok" << endl;
-}
-
-QPID_AUTO_TEST_CASE(CommitDequeueAsync)
-{
- cout << test_filename << ".CommitDequeueAsync: " << flush;
- tpct.testCommitDequeue(true);
- cout << "ok" << endl;
-}
-
-QPID_AUTO_TEST_CASE(PrepareAndAbortDequeueAsync)
-{
- cout << test_filename << ".PrepareAndAbortDequeueAsync: " << flush;
- tpct.testPrepareAndAbortDequeue(true);
- cout << "ok" << endl;
-}
-
-QPID_AUTO_TEST_CASE(AbortNoPrepareDequeueAsync)
-{
- cout << test_filename << ".AbortNoPrepareDequeueAsync: " << flush;
- tpct.testAbortNoPrepareDequeue(true);
- cout << "ok" << endl;
-}
-
-QPID_AUTO_TEST_CASE(RecoverPreparedThenCommittedAsync)
-{
- cout << test_filename << ".RecoverPreparedThenCommittedAsync: " << flush;
- tpct.testRecoverPreparedThenCommitted(true);
- cout << "ok" << endl;
-}
-
-QPID_AUTO_TEST_CASE(RecoverPreparedThenAbortedAsync)
-{
- cout << test_filename << ".RecoverPreparedThenAbortedAsync: " << flush;
- tpct.testRecoverPreparedThenAborted(true);
- cout << "ok" << endl;
-}
-
QPID_AUTO_TEST_SUITE_END()
Modified: store/trunk/cpp/tests/python_tests/flow_to_disk.py
===================================================================
--- store/trunk/cpp/tests/python_tests/flow_to_disk.py 2008-06-26 21:39:59 UTC (rev 2169)
+++ store/trunk/cpp/tests/python_tests/flow_to_disk.py 2008-06-30 13:04:03 UTC (rev 2170)
@@ -22,7 +22,7 @@
from qpid.datatypes import Message, RangedSet
from qpid.session import SessionException
-class AsyncFlowToDiskTests(TestBase010):
+class FlowToDiskTests(TestBase010):
"""Tests for async store flow-to-disk"""
def test_01_simple_max_count_transient(self):
16 years, 5 months
rhmessaging commits: r2169 - store/trunk/cpp/lib.
by rhmessaging-commits@lists.jboss.org
Author: aconway
Date: 2008-06-26 17:39:59 -0400 (Thu, 26 Jun 2008)
New Revision: 2169
Modified:
store/trunk/cpp/lib/StorePlugin.cpp
Log:
Updated for Qpid plugin framework change. Single PluginFactory creates per-target Plugin instances.
Modified: store/trunk/cpp/lib/StorePlugin.cpp
===================================================================
--- store/trunk/cpp/lib/StorePlugin.cpp 2008-06-26 18:36:08 UTC (rev 2168)
+++ store/trunk/cpp/lib/StorePlugin.cpp 2008-06-26 21:39:59 UTC (rev 2169)
@@ -32,46 +32,42 @@
namespace broker {
using namespace std;
+using rhm::bdbstore::BdbMessageStore;
+struct StorePlugin : public PluginT<Broker> {
+ BdbMessageStore* store;
-struct StorePlugin : public Plugin {
+ StorePlugin(BdbMessageStore* s) : store(s) {}
+ void initializeT(Broker& broker) {
+ store->initManagement(&broker);
+ }
+};
+
+struct StorePluginFactory : public Plugin::FactoryT<Broker> {
rhm::bdbstore::BdbMessageStore::Options options;
- MessageStore *store;
Options* getOptions() { return &options; }
- void earlyInitialize (Plugin::Target& target)
- {
- Broker* broker = dynamic_cast<Broker*>(&target);
- store = new rhm::bdbstore::BdbMessageStore ();
- DataDir& dataDir = broker->getDataDir ();
-
- if (options.storeDir.empty ())
- {
+ boost::shared_ptr<Plugin> createT(Broker& broker) {
+ std::auto_ptr<BdbMessageStore> store(new BdbMessageStore());
+ if (options.storeDir.empty ()) {
+ DataDir& dataDir = broker.getDataDir ();
if (!dataDir.isEnabled ())
throw Exception ("If --data-dir is blank or --no-data-dir is specified, "
"--store-directory must be present.");
-
options.storeDir = dataDir.getPath ();
}
-
- if (!store->init (&options))
- {
+ if (!store->init (&options)) {
throw Exception("Existing journal found in different bdb/async mode. "
"Move or delete existing data files before changing modes, or use "
"'--store-force yes' to discard existing data.");
}
-
- broker->setStore (store);
+ BdbMessageStore* storep=store.release();
+ broker.setStore(storep);
+ return make_shared_ptr(new StorePlugin(storep));
}
-
- void initialize(Plugin::Target& target)
- {
- Broker* broker = dynamic_cast<Broker*>(&target);
- ((rhm::bdbstore::BdbMessageStore*) store)->initManagement (broker);
- }
};
-static StorePlugin instance; // Static initialization.
+static StorePluginFactory instance; // Static initialization.
}} // namespace qpid::broker
16 years, 6 months
rhmessaging commits: r2168 - mgmt/trunk.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2008-06-26 14:36:08 -0400 (Thu, 26 Jun 2008)
New Revision: 2168
Modified:
mgmt/trunk/README
Log:
Add simpler pg setup instructions
Modified: mgmt/trunk/README
===================================================================
--- mgmt/trunk/README 2008-06-26 18:24:11 UTC (rev 2167)
+++ mgmt/trunk/README 2008-06-26 18:36:08 UTC (rev 2168)
@@ -76,6 +76,14 @@
[Add the following line, *before* the other similar lines]
+ host cumin cumin 127.0.0.1/32 trust
+
+Alternative postgresql permissions:
+
+ $ vi /var/lib/pgsql/data/pg_hba.conf
+
+ [Add the following line, *before* the other similar lines]
+
host cumin cumin 127.0.0.1/32 ident cumin
$ vi /var/lib/pgsql/data/pg_ident.conf
16 years, 6 months
rhmessaging commits: r2167 - in mgmt: trunk and 1 other directory.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2008-06-26 14:24:11 -0400 (Thu, 26 Jun 2008)
New Revision: 2167
Added:
mgmt/trunk/
mgmt/trunk/COPYING
mgmt/trunk/LICENSE
mgmt/trunk/Makefile
mgmt/trunk/README
mgmt/trunk/basil/
mgmt/trunk/bin/
mgmt/trunk/cumin-test-0/
mgmt/trunk/cumin.spec
mgmt/trunk/cumin/
mgmt/trunk/etc/
mgmt/trunk/lib/
mgmt/trunk/mint/
mgmt/trunk/misc/
mgmt/trunk/mrg-management.spec
mgmt/trunk/notes/
Removed:
mgmt/COPYING
mgmt/LICENSE
mgmt/Makefile
mgmt/README
mgmt/basil/
mgmt/bin/
mgmt/cumin-test-0/
mgmt/cumin.spec
mgmt/cumin/
mgmt/etc/
mgmt/lib/
mgmt/mint/
mgmt/misc/
mgmt/mrg-management.spec
mgmt/notes/
Log:
Move the mgmt trunk under a "trunk" directory.
Deleted: mgmt/COPYING
===================================================================
--- mgmt/COPYING 2008-06-26 14:34:56 UTC (rev 2166)
+++ mgmt/COPYING 2008-06-26 18:24:11 UTC (rev 2167)
@@ -1,15 +0,0 @@
-Copyright (C) 2007 Red Hat Inc.
-
-This program is free software; you can redistribute it and/or modify
-it under the terms of the GNU General Public License as published by
-the Free Software Foundation; either version 2 of the License, or
-(at your option) any later version.
-
-This program is distributed in the hope that it will be useful,
-but WITHOUT ANY WARRANTY; without even the implied warranty of
-MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-GNU General Public License for more details.
-
-You should have received a copy of the GNU General Public License along
-with this program; if not, write to the Free Software Foundation, Inc.,
-51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
Deleted: mgmt/LICENSE
===================================================================
--- mgmt/LICENSE 2008-06-26 14:34:56 UTC (rev 2166)
+++ mgmt/LICENSE 2008-06-26 18:24:11 UTC (rev 2167)
@@ -1,280 +0,0 @@
- GNU GENERAL PUBLIC LICENSE
- Version 2, June 1991
-
-Copyright (C) 1989, 1991 Free Software Foundation, Inc.,
-51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
-Everyone is permitted to copy and distribute verbatim copies
-of this license document, but changing it is not allowed.
-
- Preamble
-
- The licenses for most software are designed to take away your
-freedom to share and change it. By contrast, the GNU General Public
-License is intended to guarantee your freedom to share and change free
-software--to make sure the software is free for all its users. This
-General Public License applies to most of the Free Software
-Foundation's software and to any other program whose authors commit to
-using it. (Some other Free Software Foundation software is covered by
-the GNU Lesser General Public License instead.) You can apply it to
-your programs, too.
-
- When we speak of free software, we are referring to freedom, not
-price. Our General Public Licenses are designed to make sure that you
-have the freedom to distribute copies of free software (and charge for
-this service if you wish), that you receive source code or can get it
-if you want it, that you can change the software or use pieces of it
-in new free programs; and that you know you can do these things.
-
- To protect your rights, we need to make restrictions that forbid
-anyone to deny you these rights or to ask you to surrender the rights.
-These restrictions translate to certain responsibilities for you if you
-distribute copies of the software, or if you modify it.
-
- For example, if you distribute copies of such a program, whether
-gratis or for a fee, you must give the recipients all the rights that
-you have. You must make sure that they, too, receive or can get the
-source code. And you must show them these terms so they know their
-rights.
-
- We protect your rights with two steps: (1) copyright the software, and
-(2) offer you this license which gives you legal permission to copy,
-distribute and/or modify the software.
-
- Also, for each author's protection and ours, we want to make certain
-that everyone understands that there is no warranty for this free
-software. If the software is modified by someone else and passed on, we
-want its recipients to know that what they have is not the original, so
-that any problems introduced by others will not reflect on the original
-authors' reputations.
-
- Finally, any free program is threatened constantly by software
-patents. We wish to avoid the danger that redistributors of a free
-program will individually obtain patent licenses, in effect making the
-program proprietary. To prevent this, we have made it clear that any
-patent must be licensed for everyone's free use or not licensed at all.
-
- The precise terms and conditions for copying, distribution and
-modification follow.
-
- GNU GENERAL PUBLIC LICENSE
- TERMS AND CONDITIONS FOR COPYING, DISTRIBUTION AND MODIFICATION
-
- 0. This License applies to any program or other work which contains
-a notice placed by the copyright holder saying it may be distributed
-under the terms of this General Public License. The "Program", below,
-refers to any such program or work, and a "work based on the Program"
-means either the Program or any derivative work under copyright law:
-that is to say, a work containing the Program or a portion of it,
-either verbatim or with modifications and/or translated into another
-language. (Hereinafter, translation is included without limitation in
-the term "modification".) Each licensee is addressed as "you".
-
-Activities other than copying, distribution and modification are not
-covered by this License; they are outside its scope. The act of
-running the Program is not restricted, and the output from the Program
-is covered only if its contents constitute a work based on the
-Program (independent of having been made by running the Program).
-Whether that is true depends on what the Program does.
-
- 1. You may copy and distribute verbatim copies of the Program's
-source code as you receive it, in any medium, provided that you
-conspicuously and appropriately publish on each copy an appropriate
-copyright notice and disclaimer of warranty; keep intact all the
-notices that refer to this License and to the absence of any warranty;
-and give any other recipients of the Program a copy of this License
-along with the Program.
-
-You may charge a fee for the physical act of transferring a copy, and
-you may at your option offer warranty protection in exchange for a fee.
-
- 2. You may modify your copy or copies of the Program or any portion
-of it, thus forming a work based on the Program, and copy and
-distribute such modifications or work under the terms of Section 1
-above, provided that you also meet all of these conditions:
-
- a) You must cause the modified files to carry prominent notices
- stating that you changed the files and the date of any change.
-
- b) You must cause any work that you distribute or publish, that in
- whole or in part contains or is derived from the Program or any
- part thereof, to be licensed as a whole at no charge to all third
- parties under the terms of this License.
-
- c) If the modified program normally reads commands interactively
- when run, you must cause it, when started running for such
- interactive use in the most ordinary way, to print or display an
- announcement including an appropriate copyright notice and a
- notice that there is no warranty (or else, saying that you provide
- a warranty) and that users may redistribute the program under
- these conditions, and telling the user how to view a copy of this
- License. (Exception: if the Program itself is interactive but
- does not normally print such an announcement, your work based on
- the Program is not required to print an announcement.)
-
-These requirements apply to the modified work as a whole. If
-identifiable sections of that work are not derived from the Program,
-and can be reasonably considered independent and separate works in
-themselves, then this License, and its terms, do not apply to those
-sections when you distribute them as separate works. But when you
-distribute the same sections as part of a whole which is a work based
-on the Program, the distribution of the whole must be on the terms of
-this License, whose permissions for other licensees extend to the
-entire whole, and thus to each and every part regardless of who wrote it.
-
-Thus, it is not the intent of this section to claim rights or contest
-your rights to work written entirely by you; rather, the intent is to
-exercise the right to control the distribution of derivative or
-collective works based on the Program.
-
-In addition, mere aggregation of another work not based on the Program
-with the Program (or with a work based on the Program) on a volume of
-a storage or distribution medium does not bring the other work under
-the scope of this License.
-
- 3. You may copy and distribute the Program (or a work based on it,
-under Section 2) in object code or executable form under the terms of
-Sections 1 and 2 above provided that you also do one of the following:
-
- a) Accompany it with the complete corresponding machine-readable
- source code, which must be distributed under the terms of Sections
- 1 and 2 above on a medium customarily used for software interchange; or,
-
- b) Accompany it with a written offer, valid for at least three
- years, to give any third party, for a charge no more than your
- cost of physically performing source distribution, a complete
- machine-readable copy of the corresponding source code, to be
- distributed under the terms of Sections 1 and 2 above on a medium
- customarily used for software interchange; or,
-
- c) Accompany it with the information you received as to the offer
- to distribute corresponding source code. (This alternative is
- allowed only for noncommercial distribution and only if you
- received the program in object code or executable form with such
- an offer, in accord with Subsection b above.)
-
-The source code for a work means the preferred form of the work for
-making modifications to it. For an executable work, complete source
-code means all the source code for all modules it contains, plus any
-associated interface definition files, plus the scripts used to
-control compilation and installation of the executable. However, as a
-special exception, the source code distributed need not include
-anything that is normally distributed (in either source or binary
-form) with the major components (compiler, kernel, and so on) of the
-operating system on which the executable runs, unless that component
-itself accompanies the executable.
-
-If distribution of executable or object code is made by offering
-access to copy from a designated place, then offering equivalent
-access to copy the source code from the same place counts as
-distribution of the source code, even though third parties are not
-compelled to copy the source along with the object code.
-
- 4. You may not copy, modify, sublicense, or distribute the Program
-except as expressly provided under this License. Any attempt
-otherwise to copy, modify, sublicense or distribute the Program is
-void, and will automatically terminate your rights under this License.
-However, parties who have received copies, or rights, from you under
-this License will not have their licenses terminated so long as such
-parties remain in full compliance.
-
- 5. You are not required to accept this License, since you have not
-signed it. However, nothing else grants you permission to modify or
-distribute the Program or its derivative works. These actions are
-prohibited by law if you do not accept this License. Therefore, by
-modifying or distributing the Program (or any work based on the
-Program), you indicate your acceptance of this License to do so, and
-all its terms and conditions for copying, distributing or modifying
-the Program or works based on it.
-
- 6. Each time you redistribute the Program (or any work based on the
-Program), the recipient automatically receives a license from the
-original licensor to copy, distribute or modify the Program subject to
-these terms and conditions. You may not impose any further
-restrictions on the recipients' exercise of the rights granted herein.
-You are not responsible for enforcing compliance by third parties to
-this License.
-
- 7. If, as a consequence of a court judgment or allegation of patent
-infringement or for any other reason (not limited to patent issues),
-conditions are imposed on you (whether by court order, agreement or
-otherwise) that contradict the conditions of this License, they do not
-excuse you from the conditions of this License. If you cannot
-distribute so as to satisfy simultaneously your obligations under this
-License and any other pertinent obligations, then as a consequence you
-may not distribute the Program at all. For example, if a patent
-license would not permit royalty-free redistribution of the Program by
-all those who receive copies directly or indirectly through you, then
-the only way you could satisfy both it and this License would be to
-refrain entirely from distribution of the Program.
-
-If any portion of this section is held invalid or unenforceable under
-any particular circumstance, the balance of the section is intended to
-apply and the section as a whole is intended to apply in other
-circumstances.
-
-It is not the purpose of this section to induce you to infringe any
-patents or other property right claims or to contest validity of any
-such claims; this section has the sole purpose of protecting the
-integrity of the free software distribution system, which is
-implemented by public license practices. Many people have made
-generous contributions to the wide range of software distributed
-through that system in reliance on consistent application of that
-system; it is up to the author/donor to decide if he or she is willing
-to distribute software through any other system and a licensee cannot
-impose that choice.
-
-This section is intended to make thoroughly clear what is believed to
-be a consequence of the rest of this License.
-
- 8. If the distribution and/or use of the Program is restricted in
-certain countries either by patents or by copyrighted interfaces, the
-original copyright holder who places the Program under this License
-may add an explicit geographical distribution limitation excluding
-those countries, so that distribution is permitted only in or among
-countries not thus excluded. In such case, this License incorporates
-the limitation as if written in the body of this License.
-
- 9. The Free Software Foundation may publish revised and/or new versions
-of the General Public License from time to time. Such new versions will
-be similar in spirit to the present version, but may differ in detail to
-address new problems or concerns.
-
-Each version is given a distinguishing version number. If the Program
-specifies a version number of this License which applies to it and "any
-later version", you have the option of following the terms and conditions
-either of that version or of any later version published by the Free
-Software Foundation. If the Program does not specify a version number of
-this License, you may choose any version ever published by the Free Software
-Foundation.
-
- 10. If you wish to incorporate parts of the Program into other free
-programs whose distribution conditions are different, write to the author
-to ask for permission. For software which is copyrighted by the Free
-Software Foundation, write to the Free Software Foundation; we sometimes
-make exceptions for this. Our decision will be guided by the two goals
-of preserving the free status of all derivatives of our free software and
-of promoting the sharing and reuse of software generally.
-
- NO WARRANTY
-
- 11. BECAUSE THE PROGRAM IS LICENSED FREE OF CHARGE, THERE IS NO WARRANTY
-FOR THE PROGRAM, TO THE EXTENT PERMITTED BY APPLICABLE LAW. EXCEPT WHEN
-OTHERWISE STATED IN WRITING THE COPYRIGHT HOLDERS AND/OR OTHER PARTIES
-PROVIDE THE PROGRAM "AS IS" WITHOUT WARRANTY OF ANY KIND, EITHER EXPRESSED
-OR IMPLIED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
-MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE ENTIRE RISK AS
-TO THE QUALITY AND PERFORMANCE OF THE PROGRAM IS WITH YOU. SHOULD THE
-PROGRAM PROVE DEFECTIVE, YOU ASSUME THE COST OF ALL NECESSARY SERVICING,
-REPAIR OR CORRECTION.
-
- 12. IN NO EVENT UNLESS REQUIRED BY APPLICABLE LAW OR AGREED TO IN WRITING
-WILL ANY COPYRIGHT HOLDER, OR ANY OTHER PARTY WHO MAY MODIFY AND/OR
-REDISTRIBUTE THE PROGRAM AS PERMITTED ABOVE, BE LIABLE TO YOU FOR DAMAGES,
-INCLUDING ANY GENERAL, SPECIAL, INCIDENTAL OR CONSEQUENTIAL DAMAGES ARISING
-OUT OF THE USE OR INABILITY TO USE THE PROGRAM (INCLUDING BUT NOT LIMITED
-TO LOSS OF DATA OR DATA BEING RENDERED INACCURATE OR LOSSES SUSTAINED BY
-YOU OR THIRD PARTIES OR A FAILURE OF THE PROGRAM TO OPERATE WITH ANY OTHER
-PROGRAMS), EVEN IF SUCH HOLDER OR OTHER PARTY HAS BEEN ADVISED OF THE
-POSSIBILITY OF SUCH DAMAGES.
-
- END OF TERMS AND CONDITIONS
Deleted: mgmt/Makefile
===================================================================
--- mgmt/Makefile 2008-06-26 14:34:56 UTC (rev 2166)
+++ mgmt/Makefile 2008-06-26 18:24:11 UTC (rev 2167)
@@ -1,54 +0,0 @@
-.PHONY: help dist clean cumin mint tags check-devel-env
-
-version := 0.1.$(shell svn info | fgrep "Revision:" | cut -d " " -f 2)
-
-help:
- @echo "Targets:"
- @echo " help Print this message"
- @echo " tags Rebuild the tag index"
- @echo " dist Create a dist tarball"
- @echo " cumin Install cumin"
- @echo " mint Install mint"
- @echo " clean Remove software installed at $$DEVEL_HOME/install"
-
-dist: clean
- mkdir -p dist/lib/python/mint
- mkdir -p dist/lib/python/wooly
- mkdir -p dist/lib/python/cumin
- mkdir -p dist/bin
- mkdir -p dist/resources
- mkdir -p dist/etc
- mkdir -p dist/sql
- mkdir -p dist/doc
- mkdir -p dist/log
- cp -a mint/python/mint/* dist/lib/python/mint
- cp -a mint/sql/* dist/sql
- cp -a cumin/python/wooly/* dist/lib/python/wooly
- cp -a cumin/python/cumin/* dist/lib/python/cumin
- cp -a cumin/bin/* dist/bin
- cp -a cumin/resources/* dist/resources
- cp -a cumin/etc/* dist/etc
- cp LICENSE COPYING dist/doc
- tar -cvzf cumin-${version}.tar.gz dist
-
-source-tarball:
- svn export . /tmp/cumin-${version}
- tar -C /tmp -czf cumin-${version}.tar.gz cumin-${version}
- rm -rf /tmp/cumin-${version}
-
-clean: check-devel-env
- rm -rf dist
- rm -rf "${DEVEL_HOME}"/install
-
-tags: check-devel-env
- find "${DEVEL_HOME}" -name \*.py -print \
- | etags --output="${DEVEL_HOME}/etc/devel.tags" -
- find "${DEVEL_HOME}" -name \*.strings -print \
- | etags --append --output="${DEVEL_HOME}/etc/devel.tags" \
- --regex='/^\[.*\][ \t]*$$/\1/' -
-
-check-devel-env:
- @if test -z "${DEVEL_HOME}"; then \
- echo "DEVEL_HOME is not set; you need to source etc/devel.profile"; \
- exit 1; \
- fi
Deleted: mgmt/README
===================================================================
--- mgmt/README 2008-06-26 14:34:56 UTC (rev 2166)
+++ mgmt/README 2008-06-26 18:24:11 UTC (rev 2167)
@@ -1,153 +0,0 @@
-This is the development environment for the Red Hat Messaging
-management suite (in this document, "mgmt" for short).
-
-These instructions assume you have sudo installed. If not, you can
-install it (as shown below) or you can instead su to root.
-
-To install sudo:
-
- $ su -
- $ yum install sudo
- $ visudo # Add your user
-
-DEPENDENCIES
-------------
-
-To run the management code, you need the following packages (on top of
-what you get in a typical Fedora install):
-
- postgresql-server
- python-sqlobject
- python-psycopg2
-
- $ sudo yum install postgresql-server python-sqlobject python-psycopg2
-
-It also depends on the qpid python code. You can satisfy these
-dependencies either by installing the python-qpid package, or by
-checking out the qpid python code. The latter is currently the better
-option, because of mgmt depending on newer features of the qpid python
-code.
-
-Check out qpid python from source and put it in the PYTHONPATH. The
-method below links to the qpid code via symlinks in ~/lib/python,
-which is in the mgmt devel environment's PYTHONPATH:
-
- $ svn co http://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid/python ~/pyqpid
- $ mkdir -p ~/lib/python
- $ cd ~/lib/python
- $ ln -s ~/pyqpid/qpid
- $ ln -s ~/pyqpid/mllib
-
-*Alternatively*, install python-qpid:
-
- $ sudo yum install python-qpid
-
-PREPARING THE DEVEL ENVIRONMENT
--------------------------------
-
-Change to the mgmt directory (the one containing this README file),
-and source the devel environment settings:
-
- $ cd mgmt
- $ source etc/devel.profile # Or use etc/devel.profile.tcsh
-
-Check that everything is set up properly:
-
- $ which cumin
- ~/mgmt/cumin/bin/cumin
- $ echo $DEVEL_HOME
- /home/youruser/mgmt
-
-PREPARING THE DATABASE
-----------------------
-
-You will also need to create a database and load the schema. If you
-haven't already done it, you'll need to initialize the postgres
-service, edit permissions, and start it up.
-
-Initialize the postgresql data files:
-
- $ sudo su - postgres # Now you're the postgres user
- $ initdb -D /var/lib/pgsql/data
-
-Edit postgresql permissions:
-
- $ vi /var/lib/pgsql/data/pg_hba.conf
-
- [Add the following line, *before* the other similar lines]
-
- host cumin cumin 127.0.0.1/32 ident cumin
-
- $ vi /var/lib/pgsql/data/pg_ident.conf
-
- [Add the following lines at the bottom, substituting your user
- name for "youruser"]
-
- cumin youruser cumin
- cumin root cumin
-
-Start the postgresql service:
-
- $ exit # Back to your own user
- $ sudo /sbin/service postgresql start
- Starting postgresql service: [ OK ]
-
-Now you can create a database. First you have to switch to the
-postgres user, and then you can use the create* scripts.
-
-Create the postgresql database:
-
- $ sudo su - postgres # Become the postgres user again
- $ createuser --superuser cumin
- CREATE ROLE
- $ createdb --owner=cumin cumin
- CREATE DATABASE
- $ exit # Leave the postgres user
-
-At this point you should have a working database. Test it using psql:
-
- $ psql -d cumin -U cumin -h localhost
- Welcome to psql 8.2.7, the PostgreSQL interactive terminal.
- [...]
- cumin=# # Type \q to get out
-
-Now you can load the scheme definition.
-
- $ cumin-admin create-schema
- Executed 100 statements from file '/home/jross/checkouts/mgmt/cumin-test-0/sql/schema.sql'
- Executed 6 statements from file '/home/jross/checkouts/mgmt/cumin-test-0/sql/indexes.sql'
-
-At this point you should have a working database and schema that you
-can connect to at postgresql://exampleuser@localhost/exampledb. All
-that remains is to add a cumin user:
-
-Add a cumin user:
-
- $ cumin-admin add-user guest
- Set password: # Enter a password for guest
- Retype password: # Confirm said password
- User 'guest' is added
-
-
-USING THE DEVEL ENVIRONMENT
----------------------------
-
-For your convenience, there is a script, bin/devel, which you can use
-to start up the devel environment. I recommend putting a small
-wrapper script like that below somewhere in your path:
-
- $ cat ~/bin/mgmt
- #!/bin/bash
-
- export DEVEL_HOME="${HOME}/mgmt"
-
- exec "${DEVEL_HOME}/bin/devel"
-
-
-SOME GOTCHAS YOU MIGHT RUN INTO
--------------------------------
-
-1. PostgreSQL "sameuser ident" authentication
-
- If you get an error about failed ident authentication, make sure
- you have an ident server installed and running.
Deleted: mgmt/cumin.spec
===================================================================
--- mgmt/cumin.spec 2008-06-26 14:34:56 UTC (rev 2166)
+++ mgmt/cumin.spec 2008-06-26 18:24:11 UTC (rev 2167)
@@ -1,109 +0,0 @@
-# svn revision: $Rev$
-
-Summary: management component of MRG
-Name: cumin
-Version: 0.1
-Release: 6%{?dist}
-License: LGPL
-Group: System Environment/Libraries
-URL: http://redhat.com/mrg
-Source0: %{name}-%{version}.tar.gz
-# svn co http://anonsvn.jboss.org/repos/rhmessaging/mgmt mgmt
-# cd mgmt; make dist
-# tar -cvzhf cumin-0.1.tar.gz dist
-BuildRoot: %{_tmppath}/%{name}-%{version}-%{release}-root-%(%{__id_u} -n)
-BuildArch: noarch
-
-BuildRequires: python >= 2.4
-BuildRequires: python-devel >= 2.4
-
-Requires: python >= 2.4
-Requires: postgresql >= 8.1.9
-Requires: postgresql-server >= 8.1.9
-Requires: python-sqlobject >= 0.9.2
-Requires: python-psycopg2 >= 2.0.6
-Requires: python-qpid
-
-%description
-Cumin is the management component of MRG - Messaging, Realtime and Grid.
-Provides a unified management interface for the Messaging, Realtime and Grid
-components of MRG.
-
-%{!?python_sitelib: %define python_sitelib %(%{__python} -c "from distutils.sysconfig import get_python_lib; print get_python_lib()")}
-
-%pre
-getent group cumin >/dev/null || groupadd -r cumin
-getent passwd cumin >/dev/null || \
- useradd -r -m -g cumin -d %{_datadir}/cumin -s /sbin/nologin \
- -c "Owner of Cumin Daemons" cumin
-exit 0
-
-%prep
-%setup -q -n dist
-
-%build
-#empty
-
-%install
-rm -rf $RPM_BUILD_ROOT
-CUMIN_HOME=%{_datadir}/cumin
-install -d ${RPM_BUILD_ROOT}${CUMIN_HOME}
-
-install -d ${RPM_BUILD_ROOT}${CUMIN_HOME}/resources
-install -pm 0644 resources/* ${RPM_BUILD_ROOT}${CUMIN_HOME}/resources
-
-install -d ${RPM_BUILD_ROOT}${CUMIN_HOME}/sql
-install -pm 0644 sql/* ${RPM_BUILD_ROOT}${CUMIN_HOME}/sql
-
-install -d ${RPM_BUILD_ROOT}${CUMIN_HOME}/doc
-install -pm 0644 doc/* ${RPM_BUILD_ROOT}${CUMIN_HOME}/doc
-
-install -d ${RPM_BUILD_ROOT}%{_bindir}
-install -pm 0755 bin/* $RPM_BUILD_ROOT%{_bindir}
-
-install -d ${RPM_BUILD_ROOT}%{_sysconfdir}
-install -pm 0644 etc/* $RPM_BUILD_ROOT%{_sysconfdir}
-ln -s %{_sysconfdir} ${RPM_BUILD_ROOT}${CUMIN_HOME}/etc
-
-install -d ${RPM_BUILD_ROOT}${CUMIN_HOME}/lib/python
-cd lib/python
-for dir in cumin mint wooly; do
- install -d $RPM_BUILD_ROOT%{python_sitelib}/$dir
- install -pm 0644 $dir/* $RPM_BUILD_ROOT%{python_sitelib}/$dir
- ln -s %{python_sitelib}/$dir ${RPM_BUILD_ROOT}${CUMIN_HOME}/lib/python/$dir
-done
-cd ../..
-
-%clean
-rm -rf $RPM_BUILD_ROOT
-
-%files
-%defattr(-,cumin,cumin,-)
-%doc doc/*
-%{_bindir}/cumin*
-%{_sysconfdir}/*cumin*
-%{_datadir}/cumin
-%{python_sitelib}/cumin
-%{python_sitelib}/mint
-%{python_sitelib}/wooly
-
-
-%changelog
-* Mon Mar 31 2008 Nuno Santos <nsantos(a)redhat.com> - 0.1-6
-- Create cumin user/group
-
-* Mon Feb 11 2008 Nuno Santos <nsantos(a)redhat.com> - 0.1-5
-- Fix for multiple broker registrations
-
-* Mon Feb 11 2008 Rafael Schloming <rafaels(a)redhat.com> - 0.1-4
-- Bumped for Beta 3 update
-
-* Fri Jan 25 2008 Nuno Santos <nsantos(a)redhat.com> - 0.1-3
-- Workaround to fix charts issue
-
-* Thu Jan 24 2008 Nuno Santos <nsantos(a)redhat.com> - 0.1-2
-- Fix data url in /etc/cumin.conf; work around query divide-by-zero issues
-- Add dependency on postgresql-server
-
-* Fri Jan 18 2008 Nuno Santos <nsantos(a)redhat.com> - 0.1-1
-- Initial build.
Deleted: mgmt/mrg-management.spec
===================================================================
--- mgmt/mrg-management.spec 2008-06-26 14:34:56 UTC (rev 2166)
+++ mgmt/mrg-management.spec 2008-06-26 18:24:11 UTC (rev 2167)
@@ -1,37 +0,0 @@
-Summary: MRG - management component
-Name: mrg-management
-Version: 1.0
-Release: 2%{?dist}
-License: LGPL
-Group: System Environment/Libraries
-URL: http://redhat.com/mrg
-BuildRoot: %{_tmppath}/%{name}-%{version}-%{release}-root
-BuildArch: noarch
-
-Requires: cumin
-
-
-%description
-This is the top-level package that includes the MRG management component.
-
-%install
-rm -rf %{buildroot}
-
-%clean
-rm -rf $RPM_BUILD_ROOT
-
-%prep
-
-%build
-
-%files
-%defattr(-,root,root,-)
-
-
-%changelog
-* Mon Feb 11 2008 Rafael Schloming <rafaels(a)redhat.com> - 1.0-2
-- Bump for Beta 3 update
-
-* Fri Jan 18 2008 Nuno Santos <nsantos(a)redhat.com> - 1.0-1
-- Initial build.
-
Copied: mgmt/trunk/COPYING (from rev 2166, mgmt/COPYING)
===================================================================
--- mgmt/trunk/COPYING (rev 0)
+++ mgmt/trunk/COPYING 2008-06-26 18:24:11 UTC (rev 2167)
@@ -0,0 +1,15 @@
+Copyright (C) 2007 Red Hat Inc.
+
+This program is free software; you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation; either version 2 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License along
+with this program; if not, write to the Free Software Foundation, Inc.,
+51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
Copied: mgmt/trunk/LICENSE (from rev 2166, mgmt/LICENSE)
===================================================================
--- mgmt/trunk/LICENSE (rev 0)
+++ mgmt/trunk/LICENSE 2008-06-26 18:24:11 UTC (rev 2167)
@@ -0,0 +1,280 @@
+ GNU GENERAL PUBLIC LICENSE
+ Version 2, June 1991
+
+Copyright (C) 1989, 1991 Free Software Foundation, Inc.,
+51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+Everyone is permitted to copy and distribute verbatim copies
+of this license document, but changing it is not allowed.
+
+ Preamble
+
+ The licenses for most software are designed to take away your
+freedom to share and change it. By contrast, the GNU General Public
+License is intended to guarantee your freedom to share and change free
+software--to make sure the software is free for all its users. This
+General Public License applies to most of the Free Software
+Foundation's software and to any other program whose authors commit to
+using it. (Some other Free Software Foundation software is covered by
+the GNU Lesser General Public License instead.) You can apply it to
+your programs, too.
+
+ When we speak of free software, we are referring to freedom, not
+price. Our General Public Licenses are designed to make sure that you
+have the freedom to distribute copies of free software (and charge for
+this service if you wish), that you receive source code or can get it
+if you want it, that you can change the software or use pieces of it
+in new free programs; and that you know you can do these things.
+
+ To protect your rights, we need to make restrictions that forbid
+anyone to deny you these rights or to ask you to surrender the rights.
+These restrictions translate to certain responsibilities for you if you
+distribute copies of the software, or if you modify it.
+
+ For example, if you distribute copies of such a program, whether
+gratis or for a fee, you must give the recipients all the rights that
+you have. You must make sure that they, too, receive or can get the
+source code. And you must show them these terms so they know their
+rights.
+
+ We protect your rights with two steps: (1) copyright the software, and
+(2) offer you this license which gives you legal permission to copy,
+distribute and/or modify the software.
+
+ Also, for each author's protection and ours, we want to make certain
+that everyone understands that there is no warranty for this free
+software. If the software is modified by someone else and passed on, we
+want its recipients to know that what they have is not the original, so
+that any problems introduced by others will not reflect on the original
+authors' reputations.
+
+ Finally, any free program is threatened constantly by software
+patents. We wish to avoid the danger that redistributors of a free
+program will individually obtain patent licenses, in effect making the
+program proprietary. To prevent this, we have made it clear that any
+patent must be licensed for everyone's free use or not licensed at all.
+
+ The precise terms and conditions for copying, distribution and
+modification follow.
+
+ GNU GENERAL PUBLIC LICENSE
+ TERMS AND CONDITIONS FOR COPYING, DISTRIBUTION AND MODIFICATION
+
+ 0. This License applies to any program or other work which contains
+a notice placed by the copyright holder saying it may be distributed
+under the terms of this General Public License. The "Program", below,
+refers to any such program or work, and a "work based on the Program"
+means either the Program or any derivative work under copyright law:
+that is to say, a work containing the Program or a portion of it,
+either verbatim or with modifications and/or translated into another
+language. (Hereinafter, translation is included without limitation in
+the term "modification".) Each licensee is addressed as "you".
+
+Activities other than copying, distribution and modification are not
+covered by this License; they are outside its scope. The act of
+running the Program is not restricted, and the output from the Program
+is covered only if its contents constitute a work based on the
+Program (independent of having been made by running the Program).
+Whether that is true depends on what the Program does.
+
+ 1. You may copy and distribute verbatim copies of the Program's
+source code as you receive it, in any medium, provided that you
+conspicuously and appropriately publish on each copy an appropriate
+copyright notice and disclaimer of warranty; keep intact all the
+notices that refer to this License and to the absence of any warranty;
+and give any other recipients of the Program a copy of this License
+along with the Program.
+
+You may charge a fee for the physical act of transferring a copy, and
+you may at your option offer warranty protection in exchange for a fee.
+
+ 2. You may modify your copy or copies of the Program or any portion
+of it, thus forming a work based on the Program, and copy and
+distribute such modifications or work under the terms of Section 1
+above, provided that you also meet all of these conditions:
+
+ a) You must cause the modified files to carry prominent notices
+ stating that you changed the files and the date of any change.
+
+ b) You must cause any work that you distribute or publish, that in
+ whole or in part contains or is derived from the Program or any
+ part thereof, to be licensed as a whole at no charge to all third
+ parties under the terms of this License.
+
+ c) If the modified program normally reads commands interactively
+ when run, you must cause it, when started running for such
+ interactive use in the most ordinary way, to print or display an
+ announcement including an appropriate copyright notice and a
+ notice that there is no warranty (or else, saying that you provide
+ a warranty) and that users may redistribute the program under
+ these conditions, and telling the user how to view a copy of this
+ License. (Exception: if the Program itself is interactive but
+ does not normally print such an announcement, your work based on
+ the Program is not required to print an announcement.)
+
+These requirements apply to the modified work as a whole. If
+identifiable sections of that work are not derived from the Program,
+and can be reasonably considered independent and separate works in
+themselves, then this License, and its terms, do not apply to those
+sections when you distribute them as separate works. But when you
+distribute the same sections as part of a whole which is a work based
+on the Program, the distribution of the whole must be on the terms of
+this License, whose permissions for other licensees extend to the
+entire whole, and thus to each and every part regardless of who wrote it.
+
+Thus, it is not the intent of this section to claim rights or contest
+your rights to work written entirely by you; rather, the intent is to
+exercise the right to control the distribution of derivative or
+collective works based on the Program.
+
+In addition, mere aggregation of another work not based on the Program
+with the Program (or with a work based on the Program) on a volume of
+a storage or distribution medium does not bring the other work under
+the scope of this License.
+
+ 3. You may copy and distribute the Program (or a work based on it,
+under Section 2) in object code or executable form under the terms of
+Sections 1 and 2 above provided that you also do one of the following:
+
+ a) Accompany it with the complete corresponding machine-readable
+ source code, which must be distributed under the terms of Sections
+ 1 and 2 above on a medium customarily used for software interchange; or,
+
+ b) Accompany it with a written offer, valid for at least three
+ years, to give any third party, for a charge no more than your
+ cost of physically performing source distribution, a complete
+ machine-readable copy of the corresponding source code, to be
+ distributed under the terms of Sections 1 and 2 above on a medium
+ customarily used for software interchange; or,
+
+ c) Accompany it with the information you received as to the offer
+ to distribute corresponding source code. (This alternative is
+ allowed only for noncommercial distribution and only if you
+ received the program in object code or executable form with such
+ an offer, in accord with Subsection b above.)
+
+The source code for a work means the preferred form of the work for
+making modifications to it. For an executable work, complete source
+code means all the source code for all modules it contains, plus any
+associated interface definition files, plus the scripts used to
+control compilation and installation of the executable. However, as a
+special exception, the source code distributed need not include
+anything that is normally distributed (in either source or binary
+form) with the major components (compiler, kernel, and so on) of the
+operating system on which the executable runs, unless that component
+itself accompanies the executable.
+
+If distribution of executable or object code is made by offering
+access to copy from a designated place, then offering equivalent
+access to copy the source code from the same place counts as
+distribution of the source code, even though third parties are not
+compelled to copy the source along with the object code.
+
+ 4. You may not copy, modify, sublicense, or distribute the Program
+except as expressly provided under this License. Any attempt
+otherwise to copy, modify, sublicense or distribute the Program is
+void, and will automatically terminate your rights under this License.
+However, parties who have received copies, or rights, from you under
+this License will not have their licenses terminated so long as such
+parties remain in full compliance.
+
+ 5. You are not required to accept this License, since you have not
+signed it. However, nothing else grants you permission to modify or
+distribute the Program or its derivative works. These actions are
+prohibited by law if you do not accept this License. Therefore, by
+modifying or distributing the Program (or any work based on the
+Program), you indicate your acceptance of this License to do so, and
+all its terms and conditions for copying, distributing or modifying
+the Program or works based on it.
+
+ 6. Each time you redistribute the Program (or any work based on the
+Program), the recipient automatically receives a license from the
+original licensor to copy, distribute or modify the Program subject to
+these terms and conditions. You may not impose any further
+restrictions on the recipients' exercise of the rights granted herein.
+You are not responsible for enforcing compliance by third parties to
+this License.
+
+ 7. If, as a consequence of a court judgment or allegation of patent
+infringement or for any other reason (not limited to patent issues),
+conditions are imposed on you (whether by court order, agreement or
+otherwise) that contradict the conditions of this License, they do not
+excuse you from the conditions of this License. If you cannot
+distribute so as to satisfy simultaneously your obligations under this
+License and any other pertinent obligations, then as a consequence you
+may not distribute the Program at all. For example, if a patent
+license would not permit royalty-free redistribution of the Program by
+all those who receive copies directly or indirectly through you, then
+the only way you could satisfy both it and this License would be to
+refrain entirely from distribution of the Program.
+
+If any portion of this section is held invalid or unenforceable under
+any particular circumstance, the balance of the section is intended to
+apply and the section as a whole is intended to apply in other
+circumstances.
+
+It is not the purpose of this section to induce you to infringe any
+patents or other property right claims or to contest validity of any
+such claims; this section has the sole purpose of protecting the
+integrity of the free software distribution system, which is
+implemented by public license practices. Many people have made
+generous contributions to the wide range of software distributed
+through that system in reliance on consistent application of that
+system; it is up to the author/donor to decide if he or she is willing
+to distribute software through any other system and a licensee cannot
+impose that choice.
+
+This section is intended to make thoroughly clear what is believed to
+be a consequence of the rest of this License.
+
+ 8. If the distribution and/or use of the Program is restricted in
+certain countries either by patents or by copyrighted interfaces, the
+original copyright holder who places the Program under this License
+may add an explicit geographical distribution limitation excluding
+those countries, so that distribution is permitted only in or among
+countries not thus excluded. In such case, this License incorporates
+the limitation as if written in the body of this License.
+
+ 9. The Free Software Foundation may publish revised and/or new versions
+of the General Public License from time to time. Such new versions will
+be similar in spirit to the present version, but may differ in detail to
+address new problems or concerns.
+
+Each version is given a distinguishing version number. If the Program
+specifies a version number of this License which applies to it and "any
+later version", you have the option of following the terms and conditions
+either of that version or of any later version published by the Free
+Software Foundation. If the Program does not specify a version number of
+this License, you may choose any version ever published by the Free Software
+Foundation.
+
+ 10. If you wish to incorporate parts of the Program into other free
+programs whose distribution conditions are different, write to the author
+to ask for permission. For software which is copyrighted by the Free
+Software Foundation, write to the Free Software Foundation; we sometimes
+make exceptions for this. Our decision will be guided by the two goals
+of preserving the free status of all derivatives of our free software and
+of promoting the sharing and reuse of software generally.
+
+ NO WARRANTY
+
+ 11. BECAUSE THE PROGRAM IS LICENSED FREE OF CHARGE, THERE IS NO WARRANTY
+FOR THE PROGRAM, TO THE EXTENT PERMITTED BY APPLICABLE LAW. EXCEPT WHEN
+OTHERWISE STATED IN WRITING THE COPYRIGHT HOLDERS AND/OR OTHER PARTIES
+PROVIDE THE PROGRAM "AS IS" WITHOUT WARRANTY OF ANY KIND, EITHER EXPRESSED
+OR IMPLIED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
+MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE ENTIRE RISK AS
+TO THE QUALITY AND PERFORMANCE OF THE PROGRAM IS WITH YOU. SHOULD THE
+PROGRAM PROVE DEFECTIVE, YOU ASSUME THE COST OF ALL NECESSARY SERVICING,
+REPAIR OR CORRECTION.
+
+ 12. IN NO EVENT UNLESS REQUIRED BY APPLICABLE LAW OR AGREED TO IN WRITING
+WILL ANY COPYRIGHT HOLDER, OR ANY OTHER PARTY WHO MAY MODIFY AND/OR
+REDISTRIBUTE THE PROGRAM AS PERMITTED ABOVE, BE LIABLE TO YOU FOR DAMAGES,
+INCLUDING ANY GENERAL, SPECIAL, INCIDENTAL OR CONSEQUENTIAL DAMAGES ARISING
+OUT OF THE USE OR INABILITY TO USE THE PROGRAM (INCLUDING BUT NOT LIMITED
+TO LOSS OF DATA OR DATA BEING RENDERED INACCURATE OR LOSSES SUSTAINED BY
+YOU OR THIRD PARTIES OR A FAILURE OF THE PROGRAM TO OPERATE WITH ANY OTHER
+PROGRAMS), EVEN IF SUCH HOLDER OR OTHER PARTY HAS BEEN ADVISED OF THE
+POSSIBILITY OF SUCH DAMAGES.
+
+ END OF TERMS AND CONDITIONS
Copied: mgmt/trunk/Makefile (from rev 2166, mgmt/Makefile)
===================================================================
--- mgmt/trunk/Makefile (rev 0)
+++ mgmt/trunk/Makefile 2008-06-26 18:24:11 UTC (rev 2167)
@@ -0,0 +1,54 @@
+.PHONY: help dist clean cumin mint tags check-devel-env
+
+version := 0.1.$(shell svn info | fgrep "Revision:" | cut -d " " -f 2)
+
+help:
+ @echo "Targets:"
+ @echo " help Print this message"
+ @echo " tags Rebuild the tag index"
+ @echo " dist Create a dist tarball"
+ @echo " cumin Install cumin"
+ @echo " mint Install mint"
+ @echo " clean Remove software installed at $$DEVEL_HOME/install"
+
+dist: clean
+ mkdir -p dist/lib/python/mint
+ mkdir -p dist/lib/python/wooly
+ mkdir -p dist/lib/python/cumin
+ mkdir -p dist/bin
+ mkdir -p dist/resources
+ mkdir -p dist/etc
+ mkdir -p dist/sql
+ mkdir -p dist/doc
+ mkdir -p dist/log
+ cp -a mint/python/mint/* dist/lib/python/mint
+ cp -a mint/sql/* dist/sql
+ cp -a cumin/python/wooly/* dist/lib/python/wooly
+ cp -a cumin/python/cumin/* dist/lib/python/cumin
+ cp -a cumin/bin/* dist/bin
+ cp -a cumin/resources/* dist/resources
+ cp -a cumin/etc/* dist/etc
+ cp LICENSE COPYING dist/doc
+ tar -cvzf cumin-${version}.tar.gz dist
+
+source-tarball:
+ svn export . /tmp/cumin-${version}
+ tar -C /tmp -czf cumin-${version}.tar.gz cumin-${version}
+ rm -rf /tmp/cumin-${version}
+
+clean: check-devel-env
+ rm -rf dist
+ rm -rf "${DEVEL_HOME}"/install
+
+tags: check-devel-env
+ find "${DEVEL_HOME}" -name \*.py -print \
+ | etags --output="${DEVEL_HOME}/etc/devel.tags" -
+ find "${DEVEL_HOME}" -name \*.strings -print \
+ | etags --append --output="${DEVEL_HOME}/etc/devel.tags" \
+ --regex='/^\[.*\][ \t]*$$/\1/' -
+
+check-devel-env:
+ @if test -z "${DEVEL_HOME}"; then \
+ echo "DEVEL_HOME is not set; you need to source etc/devel.profile"; \
+ exit 1; \
+ fi
Copied: mgmt/trunk/README (from rev 2166, mgmt/README)
===================================================================
--- mgmt/trunk/README (rev 0)
+++ mgmt/trunk/README 2008-06-26 18:24:11 UTC (rev 2167)
@@ -0,0 +1,153 @@
+This is the development environment for the Red Hat Messaging
+management suite (in this document, "mgmt" for short).
+
+These instructions assume you have sudo installed. If not, you can
+install it (as shown below) or you can instead su to root.
+
+To install sudo:
+
+ $ su -
+ $ yum install sudo
+ $ visudo # Add your user
+
+DEPENDENCIES
+------------
+
+To run the management code, you need the following packages (on top of
+what you get in a typical Fedora install):
+
+ postgresql-server
+ python-sqlobject
+ python-psycopg2
+
+ $ sudo yum install postgresql-server python-sqlobject python-psycopg2
+
+It also depends on the qpid python code. You can satisfy these
+dependencies either by installing the python-qpid package, or by
+checking out the qpid python code. The latter is currently the better
+option, because of mgmt depending on newer features of the qpid python
+code.
+
+Check out qpid python from source and put it in the PYTHONPATH. The
+method below links to the qpid code via symlinks in ~/lib/python,
+which is in the mgmt devel environment's PYTHONPATH:
+
+ $ svn co http://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid/python ~/pyqpid
+ $ mkdir -p ~/lib/python
+ $ cd ~/lib/python
+ $ ln -s ~/pyqpid/qpid
+ $ ln -s ~/pyqpid/mllib
+
+*Alternatively*, install python-qpid:
+
+ $ sudo yum install python-qpid
+
+PREPARING THE DEVEL ENVIRONMENT
+-------------------------------
+
+Change to the mgmt directory (the one containing this README file),
+and source the devel environment settings:
+
+ $ cd mgmt
+ $ source etc/devel.profile # Or use etc/devel.profile.tcsh
+
+Check that everything is set up properly:
+
+ $ which cumin
+ ~/mgmt/cumin/bin/cumin
+ $ echo $DEVEL_HOME
+ /home/youruser/mgmt
+
+PREPARING THE DATABASE
+----------------------
+
+You will also need to create a database and load the schema. If you
+haven't already done it, you'll need to initialize the postgres
+service, edit permissions, and start it up.
+
+Initialize the postgresql data files:
+
+ $ sudo su - postgres # Now you're the postgres user
+ $ initdb -D /var/lib/pgsql/data
+
+Edit postgresql permissions:
+
+ $ vi /var/lib/pgsql/data/pg_hba.conf
+
+ [Add the following line, *before* the other similar lines]
+
+ host cumin cumin 127.0.0.1/32 ident cumin
+
+ $ vi /var/lib/pgsql/data/pg_ident.conf
+
+ [Add the following lines at the bottom, substituting your user
+ name for "youruser"]
+
+ cumin youruser cumin
+ cumin root cumin
+
+Start the postgresql service:
+
+ $ exit # Back to your own user
+ $ sudo /sbin/service postgresql start
+ Starting postgresql service: [ OK ]
+
+Now you can create a database. First you have to switch to the
+postgres user, and then you can use the create* scripts.
+
+Create the postgresql database:
+
+ $ sudo su - postgres # Become the postgres user again
+ $ createuser --superuser cumin
+ CREATE ROLE
+ $ createdb --owner=cumin cumin
+ CREATE DATABASE
+ $ exit # Leave the postgres user
+
+At this point you should have a working database. Test it using psql:
+
+ $ psql -d cumin -U cumin -h localhost
+ Welcome to psql 8.2.7, the PostgreSQL interactive terminal.
+ [...]
+ cumin=# # Type \q to get out
+
+Now you can load the scheme definition.
+
+ $ cumin-admin create-schema
+ Executed 100 statements from file '/home/jross/checkouts/mgmt/cumin-test-0/sql/schema.sql'
+ Executed 6 statements from file '/home/jross/checkouts/mgmt/cumin-test-0/sql/indexes.sql'
+
+At this point you should have a working database and schema that you
+can connect to at postgresql://exampleuser@localhost/exampledb. All
+that remains is to add a cumin user:
+
+Add a cumin user:
+
+ $ cumin-admin add-user guest
+ Set password: # Enter a password for guest
+ Retype password: # Confirm said password
+ User 'guest' is added
+
+
+USING THE DEVEL ENVIRONMENT
+---------------------------
+
+For your convenience, there is a script, bin/devel, which you can use
+to start up the devel environment. I recommend putting a small
+wrapper script like that below somewhere in your path:
+
+ $ cat ~/bin/mgmt
+ #!/bin/bash
+
+ export DEVEL_HOME="${HOME}/mgmt"
+
+ exec "${DEVEL_HOME}/bin/devel"
+
+
+SOME GOTCHAS YOU MIGHT RUN INTO
+-------------------------------
+
+1. PostgreSQL "sameuser ident" authentication
+
+ If you get an error about failed ident authentication, make sure
+ you have an ident server installed and running.
Copied: mgmt/trunk/basil (from rev 2166, mgmt/basil)
Copied: mgmt/trunk/bin (from rev 2166, mgmt/bin)
Copied: mgmt/trunk/cumin (from rev 2166, mgmt/cumin)
Copied: mgmt/trunk/cumin-test-0 (from rev 2166, mgmt/cumin-test-0)
Copied: mgmt/trunk/cumin.spec (from rev 2166, mgmt/cumin.spec)
===================================================================
--- mgmt/trunk/cumin.spec (rev 0)
+++ mgmt/trunk/cumin.spec 2008-06-26 18:24:11 UTC (rev 2167)
@@ -0,0 +1,109 @@
+# svn revision: $Rev$
+
+Summary: management component of MRG
+Name: cumin
+Version: 0.1
+Release: 6%{?dist}
+License: LGPL
+Group: System Environment/Libraries
+URL: http://redhat.com/mrg
+Source0: %{name}-%{version}.tar.gz
+# svn co http://anonsvn.jboss.org/repos/rhmessaging/mgmt mgmt
+# cd mgmt; make dist
+# tar -cvzhf cumin-0.1.tar.gz dist
+BuildRoot: %{_tmppath}/%{name}-%{version}-%{release}-root-%(%{__id_u} -n)
+BuildArch: noarch
+
+BuildRequires: python >= 2.4
+BuildRequires: python-devel >= 2.4
+
+Requires: python >= 2.4
+Requires: postgresql >= 8.1.9
+Requires: postgresql-server >= 8.1.9
+Requires: python-sqlobject >= 0.9.2
+Requires: python-psycopg2 >= 2.0.6
+Requires: python-qpid
+
+%description
+Cumin is the management component of MRG - Messaging, Realtime and Grid.
+Provides a unified management interface for the Messaging, Realtime and Grid
+components of MRG.
+
+%{!?python_sitelib: %define python_sitelib %(%{__python} -c "from distutils.sysconfig import get_python_lib; print get_python_lib()")}
+
+%pre
+getent group cumin >/dev/null || groupadd -r cumin
+getent passwd cumin >/dev/null || \
+ useradd -r -m -g cumin -d %{_datadir}/cumin -s /sbin/nologin \
+ -c "Owner of Cumin Daemons" cumin
+exit 0
+
+%prep
+%setup -q -n dist
+
+%build
+#empty
+
+%install
+rm -rf $RPM_BUILD_ROOT
+CUMIN_HOME=%{_datadir}/cumin
+install -d ${RPM_BUILD_ROOT}${CUMIN_HOME}
+
+install -d ${RPM_BUILD_ROOT}${CUMIN_HOME}/resources
+install -pm 0644 resources/* ${RPM_BUILD_ROOT}${CUMIN_HOME}/resources
+
+install -d ${RPM_BUILD_ROOT}${CUMIN_HOME}/sql
+install -pm 0644 sql/* ${RPM_BUILD_ROOT}${CUMIN_HOME}/sql
+
+install -d ${RPM_BUILD_ROOT}${CUMIN_HOME}/doc
+install -pm 0644 doc/* ${RPM_BUILD_ROOT}${CUMIN_HOME}/doc
+
+install -d ${RPM_BUILD_ROOT}%{_bindir}
+install -pm 0755 bin/* $RPM_BUILD_ROOT%{_bindir}
+
+install -d ${RPM_BUILD_ROOT}%{_sysconfdir}
+install -pm 0644 etc/* $RPM_BUILD_ROOT%{_sysconfdir}
+ln -s %{_sysconfdir} ${RPM_BUILD_ROOT}${CUMIN_HOME}/etc
+
+install -d ${RPM_BUILD_ROOT}${CUMIN_HOME}/lib/python
+cd lib/python
+for dir in cumin mint wooly; do
+ install -d $RPM_BUILD_ROOT%{python_sitelib}/$dir
+ install -pm 0644 $dir/* $RPM_BUILD_ROOT%{python_sitelib}/$dir
+ ln -s %{python_sitelib}/$dir ${RPM_BUILD_ROOT}${CUMIN_HOME}/lib/python/$dir
+done
+cd ../..
+
+%clean
+rm -rf $RPM_BUILD_ROOT
+
+%files
+%defattr(-,cumin,cumin,-)
+%doc doc/*
+%{_bindir}/cumin*
+%{_sysconfdir}/*cumin*
+%{_datadir}/cumin
+%{python_sitelib}/cumin
+%{python_sitelib}/mint
+%{python_sitelib}/wooly
+
+
+%changelog
+* Mon Mar 31 2008 Nuno Santos <nsantos(a)redhat.com> - 0.1-6
+- Create cumin user/group
+
+* Mon Feb 11 2008 Nuno Santos <nsantos(a)redhat.com> - 0.1-5
+- Fix for multiple broker registrations
+
+* Mon Feb 11 2008 Rafael Schloming <rafaels(a)redhat.com> - 0.1-4
+- Bumped for Beta 3 update
+
+* Fri Jan 25 2008 Nuno Santos <nsantos(a)redhat.com> - 0.1-3
+- Workaround to fix charts issue
+
+* Thu Jan 24 2008 Nuno Santos <nsantos(a)redhat.com> - 0.1-2
+- Fix data url in /etc/cumin.conf; work around query divide-by-zero issues
+- Add dependency on postgresql-server
+
+* Fri Jan 18 2008 Nuno Santos <nsantos(a)redhat.com> - 0.1-1
+- Initial build.
Copied: mgmt/trunk/etc (from rev 2166, mgmt/etc)
Copied: mgmt/trunk/lib (from rev 2166, mgmt/lib)
Copied: mgmt/trunk/mint (from rev 2166, mgmt/mint)
Copied: mgmt/trunk/misc (from rev 2166, mgmt/misc)
Copied: mgmt/trunk/mrg-management.spec (from rev 2166, mgmt/mrg-management.spec)
===================================================================
--- mgmt/trunk/mrg-management.spec (rev 0)
+++ mgmt/trunk/mrg-management.spec 2008-06-26 18:24:11 UTC (rev 2167)
@@ -0,0 +1,37 @@
+Summary: MRG - management component
+Name: mrg-management
+Version: 1.0
+Release: 2%{?dist}
+License: LGPL
+Group: System Environment/Libraries
+URL: http://redhat.com/mrg
+BuildRoot: %{_tmppath}/%{name}-%{version}-%{release}-root
+BuildArch: noarch
+
+Requires: cumin
+
+
+%description
+This is the top-level package that includes the MRG management component.
+
+%install
+rm -rf %{buildroot}
+
+%clean
+rm -rf $RPM_BUILD_ROOT
+
+%prep
+
+%build
+
+%files
+%defattr(-,root,root,-)
+
+
+%changelog
+* Mon Feb 11 2008 Rafael Schloming <rafaels(a)redhat.com> - 1.0-2
+- Bump for Beta 3 update
+
+* Fri Jan 18 2008 Nuno Santos <nsantos(a)redhat.com> - 1.0-1
+- Initial build.
+
Copied: mgmt/trunk/notes (from rev 2166, mgmt/notes)
16 years, 6 months
rhmessaging commits: r2166 - mgmt/mint/python/mint.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2008-06-26 10:34:56 -0400 (Thu, 26 Jun 2008)
New Revision: 2166
Modified:
mgmt/mint/python/mint/__init__.py
mgmt/mint/python/mint/update.py
Log:
Complete the transition to queued updates
- Pass the model to the process callbacks on update events
- Rename the callbacks and some arguments to be in line with the new
schema names
- Improve update logging
- Log full stack traces on exception
- Implement the process method of the method update event
- Add update handling for control and close events
- Set the broker uuid and session id on the connection when we
receive a "broker info" control
- We were dropping object property updates (as opposed to initial
property sets); fix that
Modified: mgmt/mint/python/mint/__init__.py
===================================================================
--- mgmt/mint/python/mint/__init__.py 2008-06-25 21:18:15 UTC (rev 2165)
+++ mgmt/mint/python/mint/__init__.py 2008-06-26 14:34:56 UTC (rev 2166)
@@ -131,11 +131,15 @@
self.id = "%s:%i" % (host, port)
self.objectsById = dict()
+ # Set upon receiving a broker info control
+ self.sessionId = None
+ self.brokerId = None
+
# state in (None, "opening", "opened", "closing", "closed")
self.state = None
self.exception = None
- self.conn = None
+ self.mconn = None
self.mclient = None
self.mchan = None
@@ -155,7 +159,7 @@
return self.state == "opened"
def open(self):
- assert self.conn is None
+ assert self.mconn is None
assert self.mclient is None
assert self.mchan is None
@@ -169,19 +173,19 @@
self.exception = e
raise e
- self.conn = QpidConnection(sock, spec)
+ self.mconn = QpidConnection(sock, spec)
self.mclient = managementClient(spec,
self.model.controlCallback,
- self.model.configCallback,
- self.model.instCallback,
+ self.model.propsCallback,
+ self.model.statsCallback,
self.model.methodCallback,
self.model.closeCallback)
self.mclient.schemaListener(self.model.schemaCallback)
try:
- self.conn.start()
+ self.mconn.start()
self.mchan = self.mclient.addChannel \
- (self.conn.session(str(uuid4())), self)
+ (self.mconn.session(str(uuid4())), self)
self.state = "opened"
except Exception, e:
@@ -232,10 +236,10 @@
self.exception = e
raise e
- self.conn.close()
+ self.mconn.close()
# XXX What else do I need to try to shutdown here?
- self.conn = None
+ self.mconn = None
self.mclient = None
self.mchan = None
@@ -287,75 +291,32 @@
def stop(self):
self.updateThread.stop()
- def setDebug(self, debug=True):
- self.debug = debug
-
- def log(self, message):
- if (self.debug):
- print message
-
def setCloseListener(self, connCloseListener):
self.connCloseListener = connCloseListener
- def schemaCallback(self, conn, classInfo, configs, metric, methods, events):
- up = update.SchemaUpdate(conn, classInfo, configs, metric, methods, events)
+ def schemaCallback(self, conn, classInfo, props, stats, methods, events):
+ up = update.SchemaUpdate(conn, classInfo, props, stats, methods, events)
self.updateThread.enqueue(up)
- def configCallback(self, conn, classInfo, props, timestamps):
+ def propsCallback(self, conn, classInfo, props, timestamps):
up = update.PropertyUpdate(conn, classInfo, props, timestamps)
self.updateThread.enqueue(up)
- def instCallback(self, conn, classInfo, stats, timestamps):
+ def statsCallback(self, conn, classInfo, stats, timestamps):
up = update.StatisticUpdate(conn, classInfo, stats, timestamps)
self.updateThread.enqueue(up)
- def methodCallback(self, conn, methodId, errorNo, errorText, args):
- self.log("\nMETHOD---------------------------------------------------")
- self.log("MethodId=%d" % (methodId))
- self.log("Error: %d %s" % (errorNo, errorText))
- self.log("Args: ")
- self.log(args)
-
- self.lock()
- try:
- method = self.outstandingMethodCalls.pop(methodId)
- result = method(errorText, args)
- finally:
- self.unlock()
-
- self.log("END METHOD---------------------------------------------------\n")
- return result
+ def methodCallback(self, conn, methodId, errorId, errorText, args):
+ up = update.MethodUpdate(conn, methodId, errorId, errorText, args)
+ self.updateThread.enqueue(up)
def closeCallback(self, conn, data):
- self.log("\nCLOSE---------------------------------------------------")
- self.log("BrokerId=%s , Data=%s" % (conn.id, data))
-
- self.lock()
- try:
- del self.connections[conn.id]
-
- if (self.connCloseListener != None):
- self.connCloseListener(conn, data)
- finally:
- self.unlock()
-
- self.log("END CLOSE---------------------------------------------------\n")
- return
+ up = update.CloseCallback(conn, data)
+ self.updateThread.enqueue(up)
def controlCallback(self, conn, type, data):
- self.log("\nCONTROL---------------------------------------------------")
- readableType = "UNKNOWN"
- if (type == managementClient.CTRL_BROKER_INFO):
- readableType = "CTRL_BROKER_INFO"
- elif (type == managementClient.CTRL_SCHEMA_LOADED):
- readableType = "CTRL_SCHEMA_LOADED"
- elif (type == managementClient.CTRL_USER):
- readableType = "CTRL_USER"
- elif (type == managementClient.CTRL_HEARTBEAT):
- readableType = "CTRL_HEARTBEAT"
- self.log("BrokerId=%s , Type=%s, Data=%s" % (conn.id, readableType, data))
- self.log("END CONTROL---------------------------------------------------\n")
- return
+ up = update.ControlUpdate(conn, type, data)
+ self.updateThread.enqueue(up)
def registerCallback(self, callback):
self.lock()
Modified: mgmt/mint/python/mint/update.py
===================================================================
--- mgmt/mint/python/mint/update.py 2008-06-25 21:18:15 UTC (rev 2165)
+++ mgmt/mint/python/mint/update.py 2008-06-26 14:34:56 UTC (rev 2166)
@@ -3,6 +3,8 @@
from Queue import Queue as ConcurrentQueue, Full, Empty
from threading import Thread
from datetime import datetime
+from qpid.management import managementClient
+from struct import unpack
import mint
@@ -21,7 +23,7 @@
try:
self.updates.put(update)
except Full:
- log.warn("Update queue is full")
+ log.exception("Queue is full")
def run(self):
while True:
@@ -31,13 +33,13 @@
if self.stopRequested:
break
else:
- log.info("Update queue is empty")
+ log.debug("Queue is empty")
continue
try:
- update.process()
- except Exception, e:
- log.error(e)
+ update.process(self.model)
+ except:
+ log.exception("Update failed")
def stop(self):
self.stopRequested = True
@@ -110,18 +112,18 @@
self.methods = methods
self.events = events
- def process(self):
- args = ("schema", self.conn.id, self.classInfo[0], self.classInfo[1])
- log.info("%-8s %-16s %-8s %-12s" % args)
+ def process(self, model):
+ cls = "%s.%s" % (self.classInfo[0], self.classInfo[1])
+ log.info("Processing %-8s %-16s %-16s" % ("schema", self.conn.id, cls))
try:
pkg, cls = unmarshalClassInfo(self.classInfo)
+ cls.classInfos[self.conn.id] = self.classInfo
except UnknownClassException, e:
log.warn(e)
return
- if cls:
- cls.classInfos[self.conn.id] = self.classInfo
+ # XXX do more schema checking
class PropertyUpdate(object):
def __init__(self, conn, classInfo, props, timestamps):
@@ -130,10 +132,10 @@
self.props = props
self.timestamps = timestamps
- def process(self):
- args = ("props", self.conn.id, self.classInfo[0], self.classInfo[1],
- len(self.props))
- log.info("%-8s %-16s %-8s %-12s %i" % args)
+ def process(self, model):
+ cls = "%s.%s" % (self.classInfo[0], self.classInfo[1])
+ args = ("props", self.conn.id, cls, len(self.props))
+ log.info("Processing %-8s %-16s %-16s %3i" % args)
try:
pkg, cls = unmarshalClassInfo(self.classInfo)
@@ -147,6 +149,8 @@
processAttributes(self.conn, attrs, cls)
+ # XXX move these down to the try/except
+
attrs["managedBroker"] = self.conn.id
attrs["recTime"] = datetime.fromtimestamp(self.timestamps[0]/1000000000)
attrs["creationTime"] = datetime.fromtimestamp \
@@ -156,9 +160,17 @@
obj = self.conn.getObject(cls, id)
except mint.ObjectNotFound:
obj = cls()
- obj.set(**attrs)
- obj.syncUpdate()
+ #obj.sourceId = id
+ #obj.sourceBrokerId = self.conn.brokerId
+
+ #hash = "%08x%08x%08x%08x" % unpack("!LLLL", self.classInfo[2])
+ #classInfo = (self.classInfo[0], self.classInfo[1], hash)
+ #obj.sourceClassInfo = ",".join(classInfo)
+
+ obj.set(**attrs)
+ obj.syncUpdate()
+
# XXX refactor this to take advantage of the get/create logic
# above
if isinstance(obj, mint.Broker) and obj.managedBroker:
@@ -185,10 +197,10 @@
self.stats = stats
self.timestamps = timestamps
- def process(self):
- args = ("stats", self.conn.id, self.classInfo[0], self.classInfo[1],
- len(self.stats))
- log.info("%-8s %-16s %-8s %-12s %i" % args)
+ def process(self, model):
+ cls = "%s.%s" % (self.classInfo[0], self.classInfo[1])
+ args = ("stats", self.conn.id, cls, len(self.stats))
+ log.info("Processing %-8s %-16s %-16s %3i" % args)
try:
pkg, cls = unmarshalClassInfo(self.classInfo)
@@ -199,13 +211,8 @@
attrs = dict(self.stats)
id = attrs["id"]
+ obj = self.conn.getObject(cls, id)
- try:
- obj = self.conn.getObject(cls, id)
- except ObjectNotFound, e:
- log.warn(e)
- return
-
statscls = getStatsClass(cls)
processAttributes(self.conn, attrs, statscls)
@@ -226,21 +233,67 @@
obj.syncUpdate()
class MethodUpdate(object):
- def __init__(self, conn, methodId, errorId, errorText, args):
+ def __init__(self, conn, methodId, errorCode, errorText, args):
self.conn = conn
self.methodId = methodId
- self.errorId = errorId
+ self.errorCode = errorCode
self.errorText = errorText
self.args = args
- def process(self):
- args = ("stats", self.conn.id, self.methodId, self.errorId,
+ def process(self, model):
+ args = ("method", self.conn.id, self.methodId, self.errorCode,
self.errorText)
- log.info("%-8s %-16s %-8s %-8s %s" % args)
+ log.info("Processing %-8s %-16s %-12s %-12s %s" % args)
+ model.lock()
+ try:
+ method = model.outstandingMethodCalls.pop(self.methodId)
+ method(self.errorText, args)
+ finally:
+ model.unlock()
+
class CloseUpdate(object):
- def __init__(self, brokerId, data):
- self.brokerId = brokerId
+ def __init__(self, conn, data):
+ self.conn = conn
+ self.data = data
- def process(self):
- log.info("%-8s %-16s" % ("close", self.brokerId))
+ def process(self, model):
+ log.info("Processing %-8s %-16s" % ("close", self.conn.id))
+
+ model.lock()
+ try:
+ del model.connections[conn.id]
+
+ if model.connCloseListener:
+ model.connCloseListener(conn, data)
+ finally:
+ model.unlock()
+
+class ControlUpdate(object):
+ __types = {
+ managementClient.CTRL_BROKER_INFO: "broker_info",
+ managementClient.CTRL_SCHEMA_LOADED: "schema_loaded",
+ managementClient.CTRL_USER: "user",
+ managementClient.CTRL_HEARTBEAT: "heartbeat"
+ }
+
+ def __init__(self, conn, typeCode, data):
+ self.conn = conn
+ self.typeCode = typeCode
+ self.data = data
+
+ def process(self, model):
+ type = self.__types.get(self.typeCode, "[unknown]")
+ args = ("control", self.conn.id, type, self.data)
+ log.info("Processing %-8s %-16s %-16s %s" % args)
+
+ if self.typeCode == managementClient.CTRL_BROKER_INFO:
+ uuid = "%08x-%04x-%04x-%04x-%04x%08x" % unpack \
+ ("!LHHHHL", self.data.brokerId)
+
+ log.info("Broker ID from %s is '%s'" % (self.conn.id, uuid))
+ log.info("Session ID from %s is '%s'" % \
+ (self.conn.id, self.data.sessionId))
+
+ self.conn.brokerId = uuid
+ self.conn.sessionId = self.data.sessionId
16 years, 6 months
rhmessaging commits: r2165 - mgmt/mint/python/mint.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2008-06-25 17:18:15 -0400 (Wed, 25 Jun 2008)
New Revision: 2165
Modified:
mgmt/mint/python/mint/__init__.py
Log:
Use the callback context argument to avoid extra connection lookups; clean up connection handling in BrokerConnection open
Modified: mgmt/mint/python/mint/__init__.py
===================================================================
--- mgmt/mint/python/mint/__init__.py 2008-06-25 19:39:49 UTC (rev 2164)
+++ mgmt/mint/python/mint/__init__.py 2008-06-25 21:18:15 UTC (rev 2165)
@@ -178,22 +178,19 @@
self.model.closeCallback)
self.mclient.schemaListener(self.model.schemaCallback)
- self.model.lock()
try:
- try:
- # XXX I want this to happen after broker start, but the
- # callbacks rely on the broker being in the connectedBrokers
- # dict
- self.model.connections[self.id] = self
+ self.conn.start()
+ self.mchan = self.mclient.addChannel \
+ (self.conn.session(str(uuid4())), self)
- self.conn.start()
- self.mchan = self.mclient.addChannel(self.conn.session(str(uuid4())),
- self.id)
+ self.state = "opened"
+ except Exception, e:
+ self.exception = e
+ raise e
- self.state = "opened"
- except Exception, e:
- self.exception = e
- raise e
+ self.model.lock()
+ try:
+ self.model.connections[self.id] = self
finally:
self.model.unlock()
@@ -223,18 +220,18 @@
self.model.lock()
try:
- try:
- self.mclient.removeChannel(self.mchan)
-
del self.model.connections[self.id]
-
- self.state = "closed"
- except Exception, e:
- self.exception = e
- raise e
finally:
self.model.unlock()
+ try:
+ self.mclient.removeChannel(self.mchan)
+
+ self.state = "closed"
+ except Exception, e:
+ self.exception = e
+ raise e
+
self.conn.close()
# XXX What else do I need to try to shutdown here?
@@ -300,32 +297,19 @@
def setCloseListener(self, connCloseListener):
self.connCloseListener = connCloseListener
- def getConnection(self, id):
- self.lock()
- try:
- try:
- return self.connections[id]
- except KeyError:
- log.error("Connection '%s' not found" % id)
- finally:
- self.unlock()
-
- def schemaCallback(self, brokerId, classInfo, configs, metric, methods, events):
- conn = self.getConnection(brokerId)
+ def schemaCallback(self, conn, classInfo, configs, metric, methods, events):
up = update.SchemaUpdate(conn, classInfo, configs, metric, methods, events)
self.updateThread.enqueue(up)
- def configCallback(self, brokerId, classInfo, props, timestamps):
- conn = self.getConnection(brokerId)
+ def configCallback(self, conn, classInfo, props, timestamps):
up = update.PropertyUpdate(conn, classInfo, props, timestamps)
self.updateThread.enqueue(up)
- def instCallback(self, brokerId, classInfo, stats, timestamps):
- conn = self.getConnection(brokerId)
+ def instCallback(self, conn, classInfo, stats, timestamps):
up = update.StatisticUpdate(conn, classInfo, stats, timestamps)
self.updateThread.enqueue(up)
- def methodCallback(self, brokerId, methodId, errorNo, errorText, args):
+ def methodCallback(self, conn, methodId, errorNo, errorText, args):
self.log("\nMETHOD---------------------------------------------------")
self.log("MethodId=%d" % (methodId))
self.log("Error: %d %s" % (errorNo, errorText))
@@ -342,23 +326,23 @@
self.log("END METHOD---------------------------------------------------\n")
return result
- def closeCallback(self, brokerId, data):
+ def closeCallback(self, conn, data):
self.log("\nCLOSE---------------------------------------------------")
- self.log("BrokerId=%s , Data=%s" % (brokerId, data))
+ self.log("BrokerId=%s , Data=%s" % (conn.id, data))
self.lock()
try:
- del self.connections[brokerId]
+ del self.connections[conn.id]
if (self.connCloseListener != None):
- self.connCloseListener(brokerId, data)
+ self.connCloseListener(conn, data)
finally:
self.unlock()
self.log("END CLOSE---------------------------------------------------\n")
return
- def controlCallback(self, brokerId, type, data):
+ def controlCallback(self, conn, type, data):
self.log("\nCONTROL---------------------------------------------------")
readableType = "UNKNOWN"
if (type == managementClient.CTRL_BROKER_INFO):
@@ -369,7 +353,7 @@
readableType = "CTRL_USER"
elif (type == managementClient.CTRL_HEARTBEAT):
readableType = "CTRL_HEARTBEAT"
- self.log("BrokerId=%s , Type=%s, Data=%s" % (brokerId, readableType, data))
+ self.log("BrokerId=%s , Type=%s, Data=%s" % (conn.id, readableType, data))
self.log("END CONTROL---------------------------------------------------\n")
return
16 years, 6 months
rhmessaging commits: r2164 - mgmt/basil/python/basil.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2008-06-25 15:39:49 -0400 (Wed, 25 Jun 2008)
New Revision: 2164
Modified:
mgmt/basil/python/basil/__init__.py
Log:
Separate the logic for getting object ids; pass the broker id, not the conn, to get_object
Modified: mgmt/basil/python/basil/__init__.py
===================================================================
--- mgmt/basil/python/basil/__init__.py 2008-06-25 19:29:33 UTC (rev 2163)
+++ mgmt/basil/python/basil/__init__.py 2008-06-25 19:39:49 UTC (rev 2164)
@@ -133,31 +133,33 @@
pass
def on_props(self, conn, class_info, values, timestamps):
- object = self.get_object(class_info, conn, values)
+ id = self.get_object_id(values)
+ object = self.get_object(class_info, conn.broker_id, id)
for name, value in values:
setattr(object, name, value)
def on_stats(self, conn, class_info, values, timestamps):
- object = self.get_object(class_info, conn, values)
+ id = self.get_object_id(values)
+ object = self.get_object(class_info, conn.broker_id, id)
for name, value in values:
setattr(object, name, value)
- def get_object(self, class_info, conn, values):
+ def get_object_id(self, values):
for name, value in values:
- if name == "id":
- id = value
- break
+ if name == "id" and value is not None:
+ return value
- assert id is not None
+ raise Exception("ID not found")
+ def get_object(self, class_info, broker_id, id):
try:
- object = self.objects_by_composite_id[(conn.broker_id, id)]
+ object = self.objects_by_composite_id[(broker_id, id)]
except KeyError:
- object = self.python_class(class_info, conn.broker_id, id)
+ object = self.python_class(class_info, broker_id, id)
self.objects.append(object)
- self.objects_by_composite_id[(conn.broker_id, id)] = object
+ self.objects_by_composite_id[(broker_id, id)] = object
return object
16 years, 6 months
rhmessaging commits: r2163 - mgmt/basil/python/basil.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2008-06-25 15:29:33 -0400 (Wed, 25 Jun 2008)
New Revision: 2163
Modified:
mgmt/basil/python/basil/__init__.py
Log:
Use the context arg to callbacks to pass the originating connection
Modified: mgmt/basil/python/basil/__init__.py
===================================================================
--- mgmt/basil/python/basil/__init__.py 2008-06-25 18:26:12 UTC (rev 2162)
+++ mgmt/basil/python/basil/__init__.py 2008-06-25 19:29:33 UTC (rev 2163)
@@ -20,33 +20,32 @@
self.lock = Lock()
- def on_schema(self, broker_id, class_info,
- configs, metrics, methods, events):
+ def on_schema(self, conn, class_info, configs, metrics, methods, events):
self.lock.acquire()
try:
package = self.get_package(class_info)
- package.on_schema(broker_id, class_info,
+ package.on_schema(conn, class_info,
configs, metrics, methods, events)
finally:
self.lock.release()
- def on_props(self, broker_id, class_info, values, timestamps):
+ def on_props(self, conn, class_info, values, timestamps):
self.lock.acquire()
try:
package = self.get_package(class_info)
- package.on_props(broker_id, class_info, values, timestamps)
+ package.on_props(conn, class_info, values, timestamps)
finally:
self.lock.release()
- def on_stats(self, broker_id, class_info, values, timestamps):
+ def on_stats(self, conn, class_info, values, timestamps):
self.lock.acquire()
try:
package = self.get_package(class_info)
- package.on_stats(broker_id, class_info, values, timestamps)
+ package.on_stats(conn, class_info, values, timestamps)
finally:
self.lock.release()
- def on_callback(self, broker_id, seq, status_code, status_text, args):
+ def on_callback(self, conn, seq, status_code, status_text, args):
self.lock.acquire()
try:
object, name, callback, kwargs = self.method_calls.pop(seq)
@@ -77,19 +76,17 @@
self.classes = list()
- def on_schema(self, broker_id, class_info,
- configs, metrics, methods, events):
+ def on_schema(self, conn, class_info, configs, metrics, methods, events):
cls = self.get_class(class_info)
- cls.on_schema(broker_id, class_info,
- configs, metrics, methods, events)
+ cls.on_schema(conn, class_info, configs, metrics, methods, events)
- def on_props(self, broker_id, class_info, values, timestamps):
+ def on_props(self, conn, class_info, values, timestamps):
cls = self.get_class(class_info)
- cls.on_props(broker_id, class_info, values, timestamps)
+ cls.on_props(conn, class_info, values, timestamps)
- def on_stats(self, broker_id, class_info, values, timestamps):
+ def on_stats(self, conn, class_info, values, timestamps):
cls = self.get_class(class_info)
- cls.on_stats(broker_id, class_info, values, timestamps)
+ cls.on_stats(conn, class_info, values, timestamps)
def get_class(self, class_info):
name = class_info[1]
@@ -120,8 +117,7 @@
attrs["basil_class"] = self
self.python_class = type(str(name), (BasilObject,), attrs)
- def on_schema(self, broker_id, class_info,
- configs, metrics, methods, events):
+ def on_schema(self, conn, class_info, configs, metrics, methods, events):
for spec in configs:
setattr(self.python_class, spec[0], None)
@@ -136,19 +132,19 @@
for spec in events:
pass
- def on_props(self, broker_id, class_info, values, timestamps):
- object = self.get_object(class_info, broker_id, values)
+ def on_props(self, conn, class_info, values, timestamps):
+ object = self.get_object(class_info, conn, values)
for name, value in values:
setattr(object, name, value)
- def on_stats(self, broker_id, class_info, values, timestamps):
- object = self.get_object(class_info, broker_id, values)
+ def on_stats(self, conn, class_info, values, timestamps):
+ object = self.get_object(class_info, conn, values)
for name, value in values:
setattr(object, name, value)
- def get_object(self, class_info, broker_id, values):
+ def get_object(self, class_info, conn, values):
for name, value in values:
if name == "id":
id = value
@@ -157,11 +153,11 @@
assert id is not None
try:
- object = self.objects_by_composite_id[(broker_id, id)]
+ object = self.objects_by_composite_id[(conn.broker_id, id)]
except KeyError:
- object = self.python_class(class_info, broker_id, id)
+ object = self.python_class(class_info, conn.broker_id, id)
self.objects.append(object)
- self.objects_by_composite_id[(broker_id, id)] = object
+ self.objects_by_composite_id[(conn.broker_id, id)] = object
return object
@@ -217,7 +213,7 @@
def open(self):
self.conn.start()
self.chan = self.mclient.addChannel \
- (self.conn.session(str(uuid4())), self.broker_id)
+ (self.conn.session(str(uuid4())), self)
def close(self):
self.mclient.removeChannel(self.chan)
16 years, 6 months