Author: gordonsim
Date: 2012-08-10 08:39:41 -0400 (Fri, 10 Aug 2012)
New Revision: 4511
Modified:
store/trunk/cpp/lib/MessageStoreImpl.cpp
store/trunk/cpp/lib/jrnl/deq_rec.cpp
store/trunk/cpp/lib/jrnl/enq_rec.cpp
store/trunk/cpp/lib/jrnl/jcntl.cpp
store/trunk/cpp/lib/jrnl/txn_rec.cpp
store/trunk/cpp/lib/jrnl/wmgr.cpp
store/trunk/cpp/tests/MessageUtils.h
store/trunk/cpp/tests/OrderingTest.cpp
store/trunk/cpp/tests/SimpleTest.cpp
store/trunk/cpp/tests/TransactionalTest.cpp
store/trunk/cpp/tests/TwoPhaseCommitTest.cpp
store/trunk/cpp/tests/python_tests/__init__.py
store/trunk/cpp/tests/python_tests/flow_to_disk.py
Log:
Changes required by broker refactor (QPID-4178)
Modified: store/trunk/cpp/lib/MessageStoreImpl.cpp
===================================================================
--- store/trunk/cpp/lib/MessageStoreImpl.cpp 2012-05-29 15:46:20 UTC (rev 4510)
+++ store/trunk/cpp/lib/MessageStoreImpl.cpp 2012-08-10 12:39:41 UTC (rev 4511)
@@ -1342,17 +1342,9 @@
JournalImpl* jc =
static_cast<JournalImpl*>(queue->getExternalQueueStore());
if (txn->getXid().empty()) {
- if (message->isContentReleased()) {
- jc->enqueue_extern_data_record(size, dtokp.get(),
!message->isPersistent());
- } else {
- jc->enqueue_data_record(&buff[0], size, size, dtokp.get(),
!message->isPersistent());
- }
+ jc->enqueue_data_record(&buff[0], size, size, dtokp.get(),
!message->isPersistent());
} else {
- if (message->isContentReleased()) {
- jc->enqueue_extern_txn_data_record(size, dtokp.get(),
txn->getXid(), !message->isPersistent());
- } else {
- jc->enqueue_txn_data_record(&buff[0], size, size, dtokp.get(),
txn->getXid(), !message->isPersistent());
- }
+ jc->enqueue_txn_data_record(&buff[0], size, size, dtokp.get(),
txn->getXid(), !message->isPersistent());
}
} else {
THROW_STORE_EXCEPTION(std::string("MessageStoreImpl::store() failed:
queue NULL."));
Modified: store/trunk/cpp/lib/jrnl/deq_rec.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/deq_rec.cpp 2012-05-29 15:46:20 UTC (rev 4510)
+++ store/trunk/cpp/lib/jrnl/deq_rec.cpp 2012-08-10 12:39:41 UTC (rev 4511)
@@ -115,7 +115,7 @@
{
if (wsize > rem)
wsize = rem;
- std::memcpy(wptr, (char*)_xidp + rec_offs, wsize);
+ std::memcpy(wptr, (const char*)_xidp + rec_offs, wsize);
wr_cnt += wsize;
rem -= wsize;
}
@@ -143,7 +143,7 @@
std::size_t wsize = _deq_hdr._xidsize > rec_offs ? _deq_hdr._xidsize -
rec_offs : 0;
if (wsize)
{
- std::memcpy(wptr, (char*)_xidp + rec_offs, wsize);
+ std::memcpy(wptr, (const char*)_xidp + rec_offs, wsize);
wr_cnt += wsize;
}
rec_offs -= _deq_hdr._xidsize - wsize;
Modified: store/trunk/cpp/lib/jrnl/enq_rec.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/enq_rec.cpp 2012-05-29 15:46:20 UTC (rev 4510)
+++ store/trunk/cpp/lib/jrnl/enq_rec.cpp 2012-08-10 12:39:41 UTC (rev 4511)
@@ -142,7 +142,7 @@
{
if (wsize > rem)
wsize = rem;
- std::memcpy((char*)wptr + wr_cnt, (char*)_data + rec_offs, wsize);
+ std::memcpy((char*)wptr + wr_cnt, (const char*)_data + rec_offs,
wsize);
wr_cnt += wsize;
rem -= wsize;
}
@@ -171,14 +171,14 @@
std::size_t wsize = _enq_hdr._xidsize > rec_offs ? _enq_hdr._xidsize -
rec_offs : 0;
if (wsize)
{
- std::memcpy(wptr, (char*)_xidp + rec_offs, wsize);
+ std::memcpy(wptr, (const char*)_xidp + rec_offs, wsize);
wr_cnt += wsize;
}
rec_offs -= _enq_hdr._xidsize - wsize;
wsize = _enq_hdr._dsize > rec_offs ? _enq_hdr._dsize - rec_offs : 0;
if (wsize && !_enq_hdr.is_external())
{
- std::memcpy((char*)wptr + wr_cnt, (char*)_data + rec_offs, wsize);
+ std::memcpy((char*)wptr + wr_cnt, (const char*)_data + rec_offs, wsize);
wr_cnt += wsize;
}
rec_offs -= _enq_hdr._dsize - wsize;
Modified: store/trunk/cpp/lib/jrnl/jcntl.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.cpp 2012-05-29 15:46:20 UTC (rev 4510)
+++ store/trunk/cpp/lib/jrnl/jcntl.cpp 2012-08-10 12:39:41 UTC (rev 4511)
@@ -955,7 +955,7 @@
ofsp.seekp(file_pos);
void* buff = std::malloc(JRNL_DBLK_SIZE);
assert(buff != 0);
- std::memcpy(buff, (void*)&xmagic, sizeof(xmagic));
+ std::memcpy(buff, (const void*)&xmagic, sizeof(xmagic));
// Normally, RHM_CLEAN must be set before these fills are done, but this is a
recover
// situation (i.e. performance is not an issue), and it makes the location of the
write
// clear should inspection of the file be required.
Modified: store/trunk/cpp/lib/jrnl/txn_rec.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/txn_rec.cpp 2012-05-29 15:46:20 UTC (rev 4510)
+++ store/trunk/cpp/lib/jrnl/txn_rec.cpp 2012-08-10 12:39:41 UTC (rev 4511)
@@ -115,7 +115,7 @@
{
if (wsize > rem)
wsize = rem;
- std::memcpy(wptr, (char*)_xidp + rec_offs, wsize);
+ std::memcpy(wptr, (const char*)_xidp + rec_offs, wsize);
wr_cnt += wsize;
rem -= wsize;
}
@@ -143,7 +143,7 @@
std::size_t wsize = _txn_hdr._xidsize > rec_offs ? _txn_hdr._xidsize -
rec_offs : 0;
if (wsize)
{
- std::memcpy(wptr, (char*)_xidp + rec_offs, wsize);
+ std::memcpy(wptr, (const char*)_xidp + rec_offs, wsize);
wr_cnt += wsize;
}
rec_offs -= _txn_hdr._xidsize - wsize;
Modified: store/trunk/cpp/lib/jrnl/wmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wmgr.cpp 2012-05-29 15:46:20 UTC (rev 4510)
+++ store/trunk/cpp/lib/jrnl/wmgr.cpp 2012-08-10 12:39:41 UTC (rev 4511)
@@ -193,7 +193,7 @@
if (xid_len) // If part of transaction, add to transaction map
{
- std::string xid((char*)xid_ptr, xid_len);
+ std::string xid((const char*)xid_ptr, xid_len);
_tmap.insert_txn_data(xid, txn_data(rid, 0, dtokp->fid(), true));
}
else
@@ -293,7 +293,7 @@
{
// If the enqueue is part of a pending txn, it will not yet be in emap
_emap.lock(dequeue_rid); // ignore rid not found error
- std::string xid((char*)xid_ptr, xid_len);
+ std::string xid((const char*)xid_ptr, xid_len);
_tmap.insert_txn_data(xid, txn_data(rid, dequeue_rid, dtokp->fid(),
false));
}
else
@@ -390,7 +390,7 @@
dtokp->set_wstate(data_tok::ABORT_SUBM);
// Delete this txn from tmap, unlock any locked records in emap
- std::string xid((char*)xid_ptr, xid_len);
+ std::string xid((const char*)xid_ptr, xid_len);
txn_data_list tdl = _tmap.get_remove_tdata_list(xid); // tdl will be empty if
xid not found
for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++)
{
@@ -480,7 +480,7 @@
dtokp->set_wstate(data_tok::COMMIT_SUBM);
// Delete this txn from tmap, process records into emap
- std::string xid((char*)xid_ptr, xid_len);
+ std::string xid((const char*)xid_ptr, xid_len);
txn_data_list tdl = _tmap.get_remove_tdata_list(xid); // tdl will be empty if
xid not found
for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++)
{
@@ -961,7 +961,7 @@
while (_cached_offset_dblks < wdblks)
{
void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks *
JRNL_DBLK_SIZE);
- std::memcpy(wptr, (void*)&xmagic, sizeof(xmagic));
+ std::memcpy(wptr, (const void*)&xmagic, sizeof(xmagic));
#ifdef RHM_CLEAN
std::memset((char*)wptr + sizeof(xmagic), RHM_CLEAN_CHAR, JRNL_DBLK_SIZE -
sizeof(xmagic));
#endif
Modified: store/trunk/cpp/tests/MessageUtils.h
===================================================================
--- store/trunk/cpp/tests/MessageUtils.h 2012-05-29 15:46:20 UTC (rev 4510)
+++ store/trunk/cpp/tests/MessageUtils.h 2012-08-10 12:39:41 UTC (rev 4511)
@@ -22,6 +22,8 @@
*/
#include <qpid/broker/Message.h>
+#include <qpid/broker/Queue.h>
+#include <qpid/broker/amqp_0_10/MessageTransfer.h>
#include <qpid/framing/AMQFrame.h>
#include <qpid/framing/all_method_bodies.h>
#include <qpid/framing/Uuid.h>
@@ -31,15 +33,14 @@
struct MessageUtils
{
- static boost::intrusive_ptr<Message> createMessage(const std::string&
exchange, const std::string& routingKey,
- const Uuid& messageId=Uuid(),
const bool persistent = false,
- const uint64_t contentSize = 0,
const std::string& correlationId = std::string())
+ static Message createMessage(const std::string& exchange, const std::string&
routingKey,
+ const Uuid& messageId=Uuid(), const bool durable =
false,
+ const uint64_t contentSize = 0, const std::string&
correlationId = std::string())
{
- boost::intrusive_ptr<Message> msg(new Message());
+ boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> msg(new
qpid::broker::amqp_0_10::MessageTransfer());
- AMQFrame method((MessageTransferBody(ProtocolVersion(), exchange, 0, 0)));
+ AMQFrame method(( MessageTransferBody(ProtocolVersion(), exchange, 0, 0)));
AMQFrame header((AMQHeaderBody()));
- header.setLastSegment(contentSize == 0);
msg->getFrames().append(method);
msg->getFrames().append(header);
@@ -48,21 +49,59 @@
props->setMessageId(messageId);
props->setCorrelationId(correlationId);
msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setRoutingKey(routingKey);
- if (persistent)
+ if (durable)
msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setDeliveryMode(PERSISTENT);
- return msg;
+ return Message(msg, msg);
}
- static void addContent(boost::intrusive_ptr<Message> msg, const
std::string& data)
+ static void addContent(Message msg, const std::string& data)
{
AMQFrame content((AMQContentBody(data)));
- msg->getFrames().append(content);
+ qpid::broker::amqp_0_10::MessageTransfer::get(msg).getFrames().append(content);
}
- static void deliver(QueuedMessage& msg, FrameHandler& h, uint16_t framesize)
+ struct MessageRetriever : public Consumer
{
- msg.payload->sendHeader(h, framesize);
- msg.payload->sendContent(*(msg.queue), h, framesize);
+ MessageRetriever(Queue& q) : Consumer("test", CONSUMER), queue(q)
{};
+
+ bool deliver(const QueueCursor& c, const Message& m)
+ {
+ message = m;
+ cursor = c;
+ return true;
+ };
+ void notify() {}
+ void cancel() {}
+ void acknowledged(const DeliveryRecord&) {}
+ OwnershipToken* getSession() { return 0; }
+
+ const Queue& queue;
+ Message message;
+ QueueCursor cursor;
+ };
+
+ static Message get(Queue& queue, QueueCursor* cursor = 0)
+ {
+ boost::shared_ptr<MessageRetriever> consumer(new MessageRetriever(queue));
+ if (!queue.dispatch(consumer))throw qpid::Exception("No message
found!");
+ if (cursor) *cursor = consumer->cursor;
+ return consumer->message;
}
+ static Uuid getMessageId(const Message& message)
+ {
+ return
qpid::broker::amqp_0_10::MessageTransfer::get(message).getProperties<MessageProperties>()->getMessageId();
+ }
+
+ static std::string getCorrelationId(const Message& message)
+ {
+ return
qpid::broker::amqp_0_10::MessageTransfer::get(message).getProperties<MessageProperties>()->getCorrelationId();
+ }
+
+ static void deliver(Message& msg, FrameHandler& h, uint16_t framesize)
+ {
+ qpid::broker::amqp_0_10::MessageTransfer::get(msg).sendHeader(h, framesize,
false, 0, 0, qpid::types::Variant::Map());
+ qpid::broker::amqp_0_10::MessageTransfer::get(msg).sendContent(h, framesize);
+ }
+
};
Modified: store/trunk/cpp/tests/OrderingTest.cpp
===================================================================
--- store/trunk/cpp/tests/OrderingTest.cpp 2012-05-29 15:46:20 UTC (rev 4510)
+++ store/trunk/cpp/tests/OrderingTest.cpp 2012-08-10 12:39:41 UTC (rev 4511)
@@ -58,16 +58,37 @@
QueueRegistry queues;
Queue::shared_ptr queue;
std::queue<Uuid> ids;
-int counter = 1;
+class TestConsumer : public Consumer
+{
+ public:
+ TestConsumer(Queue::shared_ptr q, std::queue<Uuid>& i) :
Consumer("test", CONSUMER), queue(q), ids(i) {};
+
+ bool deliver(const QueueCursor& cursor, const Message& message)
+ {
+ queue->dequeue(0, cursor);
+ BOOST_CHECK_EQUAL(ids.front(), MessageUtils::getMessageId(message));
+ ids.pop();
+ return true;
+ };
+ void notify() {}
+ void cancel() {}
+ void acknowledged(const DeliveryRecord&) {}
+ OwnershipToken* getSession() { return 0; }
+ private:
+ Queue::shared_ptr queue;
+ std::queue<Uuid>& ids;
+};
+boost::shared_ptr<TestConsumer> consumer;
+
void setup()
{
store = std::auto_ptr<MessageStoreImpl>(new MessageStoreImpl(timer));
store->init(test_dir, 4, 1, true); // truncate store
queue = Queue::shared_ptr(new Queue(name, 0, store.get(), 0));
- FieldTable settings;
- queue->create(settings);
+ queue->create();
+ consumer = boost::shared_ptr<TestConsumer>(new TestConsumer(queue, ids));
}
void push()
@@ -75,24 +96,14 @@
Uuid messageId(true);
ids.push(messageId);
- boost::intrusive_ptr<Message> msg =
MessageUtils::createMessage("exchange", "routing_key", messageId,
true, 0);
+ Message msg = MessageUtils::createMessage("exchange",
"routing_key", messageId, true, 0);
queue->deliver(msg);
}
bool pop()
{
- boost::intrusive_ptr<Message> msg = queue->get().payload;
- if (msg) {
- QueuedMessage qm;
- qm.payload = msg;
- queue->dequeue(0, qm);
- BOOST_CHECK_EQUAL(ids.front(),
msg->getProperties<MessageProperties>()->getMessageId());
- ids.pop();
- return true;
- } else {
- return false;
- }
+ return queue->dispatch(consumer);
}
void restart()
@@ -111,6 +122,7 @@
store->recover(recoveryMgr);
queue = queues.find(name);
+ consumer = boost::shared_ptr<TestConsumer>(new TestConsumer(queue, ids));
}
void check()
Modified: store/trunk/cpp/tests/SimpleTest.cpp
===================================================================
--- store/trunk/cpp/tests/SimpleTest.cpp 2012-05-29 15:46:20 UTC (rev 4510)
+++ store/trunk/cpp/tests/SimpleTest.cpp 2012-08-10 12:39:41 UTC (rev 4511)
@@ -28,6 +28,7 @@
#include "MessageUtils.h"
#include "StoreException.h"
#include <qpid/broker/Queue.h>
+#include <qpid/broker/QueueSettings.h>
#include <qpid/broker/RecoveryManagerImpl.h>
#include <qpid/framing/AMQHeaderBody.h>
#include <qpid/framing/FieldTable.h>
@@ -210,15 +211,17 @@
{
cout << test_filename << ".QueueCreateWithSettings: " <<
flush;
- std::auto_ptr<QueuePolicy> policy( QueuePolicy::createQueuePolicy(101, 202));
+ FieldTable arguments;
+ arguments.setInt("qpid.max_count", 202);
+ arguments.setInt("qpid.max_size", 1003);
+ QueueSettings settings;
+ settings.populate(arguments, settings.storeSettings);
string name("MyDurableQueue");
{
MessageStoreImpl store(timer);
store.init(test_dir, 4, 1, true); // truncate store
- Queue queue(name, 0, &store, 0);
- FieldTable settings;
- policy->update(settings);
- queue.create(settings);
+ Queue queue(name, settings, &store, 0);
+ queue.create();
BOOST_REQUIRE(queue.getPersistenceId());
}//db will be closed
{
@@ -229,9 +232,10 @@
recover(store, registry);
Queue::shared_ptr queue = registry.find(name);
BOOST_REQUIRE(queue);
- BOOST_REQUIRE(queue->getPolicy());
- BOOST_CHECK_EQUAL(policy->getMaxCount(),
queue->getPolicy()->getMaxCount());
- BOOST_CHECK_EQUAL(policy->getMaxSize(),
queue->getPolicy()->getMaxSize());
+ BOOST_CHECK_EQUAL(settings.maxDepth.getCount(), 202);
+ BOOST_CHECK_EQUAL(settings.maxDepth.getSize(), 1003);
+ BOOST_CHECK_EQUAL(settings.maxDepth.getCount(),
queue->getSettings().maxDepth.getCount());
+ BOOST_CHECK_EQUAL(settings.maxDepth.getSize(),
queue->getSettings().maxDepth.getSize());
}
cout << "ok" << endl;
@@ -279,16 +283,15 @@
MessageStoreImpl store(timer);
store.init(test_dir, 4, 1, true); // truncate store
Queue::shared_ptr queue(new Queue(name, 0, &store, 0));
- FieldTable settings;
- queue->create(settings);
+ queue->create();
- boost::intrusive_ptr<Message> msg = MessageUtils::createMessage(exchange,
routingKey, messageId, true, 14);
+ Message msg = MessageUtils::createMessage(exchange, routingKey, messageId, true,
14);
MessageUtils::addContent(msg, data1);
MessageUtils::addContent(msg, data2);
- msg->insertCustomProperty("abc", "xyz");
+ msg.addAnnotation("abc", "xyz");
- queue->enqueue(0, msg);
+ queue->deliver(msg);
}//db will be closed
{
MessageStoreImpl store(timer);
@@ -299,18 +302,15 @@
Queue::shared_ptr queue = registry.find(name);
BOOST_REQUIRE(queue);
BOOST_CHECK_EQUAL((u_int32_t) 1, queue->getMessageCount());
- boost::intrusive_ptr<Message> msg = queue->get().payload;
+ Message msg = MessageUtils::get(*queue);
- BOOST_CHECK_EQUAL(exchange, msg->getExchangeName());
- BOOST_CHECK_EQUAL(routingKey, msg->getRoutingKey());
- BOOST_CHECK_EQUAL(messageId,
msg->getProperties<MessageProperties>()->getMessageId());
- BOOST_CHECK_EQUAL((uint8_t) PERSISTENT,
msg->getProperties<DeliveryProperties>()->getDeliveryMode());
- BOOST_REQUIRE(Str16Value("xyz") ==
*msg->getProperties<MessageProperties>()->getApplicationHeaders().get("abc"));
- BOOST_CHECK_EQUAL((u_int64_t) 14, msg->contentSize());
+ BOOST_CHECK_EQUAL(routingKey, msg.getRoutingKey());
+ BOOST_CHECK_EQUAL(messageId, MessageUtils::getMessageId(msg));
+ BOOST_CHECK_EQUAL(std::string("xyz"),
msg.getAnnotation("abc"));
+ BOOST_CHECK_EQUAL((u_int64_t) 14, msg.getContentSize());
DummyHandler handler;
- QueuedMessage qm(queue.get(),msg,0);
- MessageUtils::deliver(qm, handler, 100);
+ MessageUtils::deliver(msg, handler, 100);
BOOST_CHECK_EQUAL((size_t) 2, handler.frames.size());
AMQContentBody*
contentBody(dynamic_cast<AMQContentBody*>(handler.frames[1].getBody()));
BOOST_REQUIRE(contentBody);
@@ -335,16 +335,16 @@
MessageStoreImpl store(timer);
store.init(test_dir, 4, 1, true); // truncate store
Queue::shared_ptr queue(new Queue(name, 0, &store, 0));
- FieldTable settings;
- queue->create(settings);
+ queue->create();
- boost::intrusive_ptr<Message> msg = MessageUtils::createMessage(exchange,
routingKey, messageId, true, 7);
+ Message msg = MessageUtils::createMessage(exchange, routingKey, messageId, true,
7);
MessageUtils::addContent(msg, data);
- QueuedMessage qm;
- qm.payload = msg;
- queue->enqueue(0, msg);
- queue->dequeue(0, qm);
+ queue->deliver(msg);
+
+ QueueCursor cursor;
+ MessageUtils::get(*queue, &cursor);
+ queue->dequeue(0, cursor);
}//db will be closed
{
MessageStoreImpl store(timer);
Modified: store/trunk/cpp/tests/TransactionalTest.cpp
===================================================================
--- store/trunk/cpp/tests/TransactionalTest.cpp 2012-05-29 15:46:20 UTC (rev 4510)
+++ store/trunk/cpp/tests/TransactionalTest.cpp 2012-08-10 12:39:41 UTC (rev 4511)
@@ -116,11 +116,10 @@
store->init(test_dir, 4, 1, true); // truncate store
//create two queues:
- FieldTable settings;
queueA = Queue::shared_ptr(new Queue(nameA, 0, store.get(), 0));
- queueA->create(settings);
+ queueA->create();
queueB = Queue::shared_ptr(new Queue(nameB, 0, store.get(), 0));
- queueB->create(settings);
+ queueB->create();
}
template <class T>
@@ -146,10 +145,9 @@
queueB = queues->find(nameB);
}
-boost::intrusive_ptr<Message> createMessage(const string& id, const string&
exchange="exchange", const string& key="routing_key")
+Message createMessage(const string& id, const string&
exchange="exchange", const string& key="routing_key")
{
- boost::intrusive_ptr<Message> msg = MessageUtils::createMessage(exchange, key,
Uuid(), true, 0, id);
- return msg;
+ return MessageUtils::createMessage(exchange, key, Uuid(), true, 0, id);
}
void checkMsg(Queue::shared_ptr& queue, u_int32_t size, const string& msgid =
"<none>")
@@ -157,9 +155,9 @@
BOOST_REQUIRE(queue);
BOOST_CHECK_EQUAL(size, queue->getMessageCount());
if (size > 0) {
- boost::intrusive_ptr<Message> msg = queue->get().payload;
+ Message msg = MessageUtils::get(*queue);
BOOST_REQUIRE(msg);
- BOOST_CHECK_EQUAL(msgid,
msg->getProperties<MessageProperties>()->getCorrelationId());
+ BOOST_CHECK_EQUAL(msgid, MessageUtils::getCorrelationId(msg));
}
}
@@ -168,17 +166,19 @@
setup<MessageStoreImpl>();
//create message and enqueue it onto first queue:
- boost::intrusive_ptr<Message> msgA = createMessage("Message",
"exchange", "routing_key");
+ Message msgA = createMessage("Message", "exchange",
"routing_key");
queueA->deliver(msgA);
- boost::intrusive_ptr<Message> msgB = queueA->get().payload;
+ QueueCursor cursorB;
+ Message msgB = MessageUtils::get(*queueA, &cursorB);
BOOST_REQUIRE(msgB);
//move the message from one queue to the other as a transaction
std::auto_ptr<TransactionContext> txn = store->begin();
- queueB->enqueue(txn.get(), msgB);//note: need to enqueue it first to avoid message
being deleted
- QueuedMessage qmB;
- qmB.payload = msgB;
- queueA->dequeue(txn.get(), qmB);
+ TxBuffer tx;
+ queueB->deliver(msgB, &tx);//note: need to enqueue it first to avoid message
being deleted
+
+ queueA->dequeue(txn.get(), cursorB);
+ tx.prepare(txn.get());
if (commit) {
store->commit(*txn);
} else {
@@ -212,15 +212,17 @@
setup<TestMessageStore>();
TestMessageStore* tmsp = static_cast<TestMessageStore*>(store.get());
std::auto_ptr<TransactionContext> txn(tmsp->begin());
+ TxBuffer tx;
//create two messages and enqueue them onto both queues:
- boost::intrusive_ptr<Message> msgA = createMessage("MessageA",
"exchange", "routing_key");
- queueA->enqueue(txn.get(), msgA);
- queueB->enqueue(txn.get(), msgA);
- boost::intrusive_ptr<Message> msgB = createMessage("MessageB",
"exchange", "routing_key");
- queueA->enqueue(txn.get(), msgB);
- queueB->enqueue(txn.get(), msgB);
+ Message msgA = createMessage("MessageA", "exchange",
"routing_key");
+ queueA->deliver(msgA, &tx);
+ queueB->deliver(msgA, &tx);
+ Message msgB = createMessage("MessageB", "exchange",
"routing_key");
+ queueA->deliver(msgB, &tx);
+ queueB->deliver(msgB, &tx);
+ tx.prepare(txn.get());
static_cast<TestTxnCtxt*>(txn.get())->setCompleteFailure(num_queues_rem);
if (commit)
tmsp->commit(*txn, complete_prepared_list);
@@ -240,42 +242,6 @@
checkMsg(queueB, 0);
}
-boost::intrusive_ptr<Message> nonTxEnq(Queue::shared_ptr q)
-{
- boost::intrusive_ptr<Message> msg = createMessage("Message",
"exchange", "routingKey");
- q->deliver(msg);
- return msg;
-}
-
-QueuedMessage getMsg(Queue::shared_ptr q)
-{
- boost::intrusive_ptr<Message> msg = q->get().payload;
- BOOST_REQUIRE(msg);
- QueuedMessage qm;
- qm.payload = msg;
- return qm;
-}
-
-void txDeq(Queue::shared_ptr q, QueuedMessage& qm, TransactionContext* tp)
-{
- q->dequeue(tp, qm);
-}
-
-void testLock(Queue::shared_ptr q, QueuedMessage qm)
-{
- try {
- q->dequeue(0, qm);
- BOOST_ERROR("Did not throw JERR_MAP_LOCKED exception as expected.");
- }
- catch (const mrg::msgstore::StoreException& e) {
- if (std::strstr(e.what(), "JERR_MAP_LOCKED") == 0)
- BOOST_ERROR("Unexpected StoreException: " << e.what());
- }
- catch (const std::exception& e) {
- BOOST_ERROR("Unexpected exception: " << e.what());
- }
-}
-
// === Test suite ===
QPID_AUTO_TEST_CASE(Commit)
@@ -355,11 +321,24 @@
cout << test_filename << ".LockedRecordTest: " << flush;
setup<MessageStoreImpl>();
- nonTxEnq(queueA);
+ queueA->deliver(createMessage("Message", "exchange",
"routingKey"));
std::auto_ptr<TransactionContext> txn = store->begin();
- QueuedMessage qm = getMsg(queueA);
- txDeq(queueA, qm, txn.get());
- testLock(queueA, qm);
+
+ QueueCursor cursor;
+ Message msg = MessageUtils::get(*queueA, &cursor);
+ queueA->dequeue(txn.get(), cursor);
+
+ try {
+ store->dequeue(0, msg.getPersistentContext(), *queueA);
+ BOOST_ERROR("Did not throw JERR_MAP_LOCKED exception as expected.");
+ }
+ catch (const mrg::msgstore::StoreException& e) {
+ if (std::strstr(e.what(), "JERR_MAP_LOCKED") == 0)
+ BOOST_ERROR("Unexpected StoreException: " << e.what());
+ }
+ catch (const std::exception& e) {
+ BOOST_ERROR("Unexpected exception: " << e.what());
+ }
store->commit(*txn);
checkMsg(queueA, 0);
Modified: store/trunk/cpp/tests/TwoPhaseCommitTest.cpp
===================================================================
--- store/trunk/cpp/tests/TwoPhaseCommitTest.cpp 2012-05-29 15:46:20 UTC (rev 4510)
+++ store/trunk/cpp/tests/TwoPhaseCommitTest.cpp 2012-08-10 12:39:41 UTC (rev 4511)
@@ -72,7 +72,7 @@
{
TwoPhaseCommitTest* const test;
const string messageId;
- boost::intrusive_ptr<Message> msg;
+ Message msg;
public:
Swap(TwoPhaseCommitTest* const test_, const string& messageId_): test(test_),
messageId(messageId_) {}
void init(){ msg = test->deliver(messageId, test->queueA); }
@@ -83,9 +83,9 @@
class Enqueue : public Strategy
{
TwoPhaseCommitTest* const test;
- boost::intrusive_ptr<Message> msg1;
- boost::intrusive_ptr<Message> msg2;
- boost::intrusive_ptr<Message> msg3;
+ Message msg1;
+ Message msg2;
+ Message msg3;
public:
Enqueue(TwoPhaseCommitTest* const test_): test(test_) {}
void init() {}
@@ -107,9 +107,9 @@
class Dequeue : public Strategy
{
TwoPhaseCommitTest* const test;
- boost::intrusive_ptr<Message> msg1;
- boost::intrusive_ptr<Message> msg2;
- boost::intrusive_ptr<Message> msg3;
+ Message msg1;
+ Message msg2;
+ Message msg3;
public:
Dequeue(TwoPhaseCommitTest* const test_): test(test_) {}
void init() {
@@ -135,8 +135,8 @@
class MultiQueueTxn : public Strategy
{
TwoPhaseCommitTest* const test;
- boost::intrusive_ptr<Message> msg1;
- boost::intrusive_ptr<Message> msg2;
+ Message msg1;
+ Message msg2;
std::set<Queue::shared_ptr> queueset;
public:
MultiQueueTxn(TwoPhaseCommitTest* const test_): test(test_) {}
@@ -208,9 +208,10 @@
std::auto_ptr<LinkRegistry> links;
Queue::shared_ptr queueA;
Queue::shared_ptr queueB;
- boost::intrusive_ptr<Message> msg1;
- boost::intrusive_ptr<Message> msg2;
- boost::intrusive_ptr<Message> msg4;
+ Message msg1;
+ Message msg2;
+ Message msg4;
+ std::auto_ptr<TxBuffer> tx;
void recoverPrepared(bool commit)
{
@@ -220,6 +221,11 @@
swap.init();
std::auto_ptr<TPCTransactionContext>
txn(store->begin("my-xid"));
swap.run(txn.get());
+ if (tx.get()) {
+ tx->prepare(txn.get());
+ tx.reset();
+ }
+
store->prepare(*txn);
restart<MessageStoreImpl>();
@@ -246,6 +252,10 @@
mqtTest.init();
std::auto_ptr<TPCTransactionContext>
txn(static_cast<TestMessageStore*>(store.get())->begin("my-xid"));
mqtTest.run(txn.get());
+ if (tx.get()) {
+ tx->prepare(txn.get());
+ tx.reset();
+ }
store->prepare(*txn);
// As the commits and aborts should happen through DtxManager, and it is too
complex to
@@ -268,6 +278,10 @@
std::auto_ptr<TPCTransactionContext>
txn(store->begin("my-xid"));
strategy.run(txn.get());
+ if (tx.get()) {
+ tx->prepare(txn.get());
+ tx.reset();
+ }
store->prepare(*txn);
store->commit(*txn);
restart<MessageStoreImpl>();
@@ -281,6 +295,10 @@
std::auto_ptr<TPCTransactionContext>
txn(store->begin("my-xid"));
strategy.run(txn.get());
+ if (tx.get()) {
+ tx->prepare(txn.get());
+ tx.reset();
+ }
if (prepare) store->prepare(*txn);
store->abort(*txn);
restart<MessageStoreImpl>();
@@ -289,40 +307,45 @@
void swap(TPCTransactionContext* txn, Queue::shared_ptr& from,
Queue::shared_ptr& to)
{
- QueuedMessage msg1 = from->get();//just dequeues in memory
+ QueueCursor c;
+ Message msg1 = MessageUtils::get(*from, &c);//just dequeues in memory
//move the message from one queue to the other as part of a
//distributed transaction
- to->enqueue(txn, msg1.payload);//note: need to enqueue it first to avoid
message being deleted
- from->dequeue(txn, msg1);
+ if (!tx.get()) tx = std::auto_ptr<TxBuffer>(new TxBuffer);
+ to->deliver(msg1, tx.get());//note: need to enqueue it first to avoid message
being deleted
+ from->dequeue(txn, c);
}
void dequeue(TPCTransactionContext* txn, Queue::shared_ptr& queue)
{
- QueuedMessage msg2 = queue->get();//just dequeues in memory
- queue->dequeue(txn, msg2);
+ QueueCursor c;
+ Message msg2 = MessageUtils::get(*queue, &c);//just dequeues in memory
+ queue->dequeue(txn, c);
}
- boost::intrusive_ptr<Message> enqueue(TPCTransactionContext* txn, const
string& msgid, Queue::shared_ptr& queue)
+ Message enqueue(TPCTransactionContext* /*txn*/, const string& msgid,
Queue::shared_ptr& queue)
{
- boost::intrusive_ptr<Message> msg = createMessage(msgid);
- queue->enqueue(txn, msg);
- return msg;
+ Message msg = createMessage(msgid);
+ if (!tx.get()) tx = std::auto_ptr<TxBuffer>(new TxBuffer);
+ queue->deliver(msg, tx.get());
+ return msg;
}
- boost::intrusive_ptr<Message> enqueue(TPCTransactionContext* txn, const
string& msgid, std::set<Queue::shared_ptr>& queueset)
+ Message enqueue(TPCTransactionContext* /*txn*/, const string& msgid,
std::set<Queue::shared_ptr>& queueset)
{
- boost::intrusive_ptr<Message> msg = createMessage(msgid);
+ if (!tx.get()) tx = std::auto_ptr<TxBuffer>(new TxBuffer);
+ Message msg = createMessage(msgid);
for (std::set<Queue::shared_ptr>::iterator i = queueset.begin(); i !=
queueset.end(); i++) {
- (*i)->enqueue(txn, msg);
+ (*i)->deliver(msg, tx.get());
}
- return msg;
+ return msg;
}
- boost::intrusive_ptr<Message> deliver(const string& msgid,
Queue::shared_ptr& queue)
+ Message deliver(const string& msgid, Queue::shared_ptr& queue)
{
- msg4 = createMessage(msgid);
- queue->deliver(msg4);
- return msg4;
+ Message m = createMessage(msgid);
+ queue->deliver(m);
+ return m;
}
template <class T>
@@ -332,16 +355,15 @@
store->init(test_dir, 4, 1, true); // truncate store
//create two queues:
- FieldTable settings;
queueA = Queue::shared_ptr(new Queue(nameA, 0, store.get(), 0));
- queueA->create(settings);
+ queueA->create();
queueB = Queue::shared_ptr(new Queue(nameB, 0, store.get(), 0));
- queueB->create(settings);
+ queueB->create();
}
- boost::intrusive_ptr<Message> createMessage(const string& id, const
string& exchange="exchange", const string& key="routing_key")
+ Message createMessage(const string& id, const string&
exchange="exchange", const string& key="routing_key")
{
- boost::intrusive_ptr<Message> msg = MessageUtils::createMessage(exchange,
key, Uuid(), true, 0, id);
+ Message msg = MessageUtils::createMessage(exchange, key, Uuid(), true, 0, id);
return msg;
}
@@ -374,9 +396,9 @@
BOOST_REQUIRE(queue);
BOOST_CHECK_EQUAL(size, queue->getMessageCount());
if (size > 0) {
- boost::intrusive_ptr<Message> msg = queue->get().payload;
+ Message msg = MessageUtils::get(*queue);
BOOST_REQUIRE(msg);
- BOOST_CHECK_EQUAL(msgid,
msg->getProperties<MessageProperties>()->getCorrelationId());
+ BOOST_CHECK_EQUAL(msgid, MessageUtils::getCorrelationId(msg));
}
}
Modified: store/trunk/cpp/tests/python_tests/__init__.py
===================================================================
--- store/trunk/cpp/tests/python_tests/__init__.py 2012-05-29 15:46:20 UTC (rev 4510)
+++ store/trunk/cpp/tests/python_tests/__init__.py 2012-08-10 12:39:41 UTC (rev 4511)
@@ -22,5 +22,4 @@
# The GNU Lesser General Public License is available in the file COPYING.
from client_persistence import *
-from flow_to_disk import *
from resize import *
Modified: store/trunk/cpp/tests/python_tests/flow_to_disk.py
===================================================================
--- store/trunk/cpp/tests/python_tests/flow_to_disk.py 2012-05-29 15:46:20 UTC (rev 4510)
+++ store/trunk/cpp/tests/python_tests/flow_to_disk.py 2012-08-10 12:39:41 UTC (rev 4511)
@@ -1,1218 +0,0 @@
-"""
-Copyright (c) 2008 Red Hat, Inc.
-
-This file is part of the Qpid async store library msgstore.so.
-
-This library is free software; you can redistribute it and/or
-modify it under the terms of the GNU Lesser General Public
-License as published by the Free Software Foundation; either
-version 2.1 of the License, or (at your option) any later version.
-
-This library is distributed in the hope that it will be useful,
-but WITHOUT ANY WARRANTY; without even the implied warranty of
-MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-Lesser General Public License for more details.
-
-You should have received a copy of the GNU Lesser General Public
-License along with this library; if not, write to the Free Software
-Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
-USA
-
-The GNU Lesser General Public License is available in the file COPYING.
-"""
-
-import qpid
-from brokertest import EXPECT_EXIT_OK, EXPECT_UNKNOWN
-from qpid.datatypes import uuid4
-from store_test import StoreTest, store_args
-from qpid.messaging import Message, TargetCapacityExceeded, ServerError #SessionError,
SendError
-
-class FlowToDisk(StoreTest):
- """Tests for async store flow-to-disk"""
-
- def _broker_args(self):
- """ Disable flow control so we can easily overflow a queue!
- """
- fc_off = ["--default-flow-stop-threshold", "0",
- "--default-flow-resume-threshold", "0"]
- return store_args() + fc_off;
-
-
- @staticmethod
- def _broker_name(queue_name, txn_produce, txn_consume):
- """Create a broker name based on the queue name and the
transaction parameters"""
- name = queue_name
- if txn_produce:
- name += "_TxP"
- if txn_consume:
- name += "_TxC"
- return name
-
- def _tx_simple_limit(self, queue_name, kwargs):
- """
- Test a simple case of message limits which will force flow-to-disk.
- * queue_args sets a limit - either max_count and/or max_size
- * messages are added. Some will flow to disk.
- * Consume all messages sent.
- * Check the broker has no messages left.
- """
- # Unpack args
- txn_produce = kwargs.get("txn_produce", False)
- txn_consume = kwargs.get("txn_consume", False)
- recover = kwargs.get("recover", False)
- max_count = kwargs.get("max_count")
- max_size = kwargs.get("max_size")
- policy = kwargs.get("policy", "flow_to_disk")
- num_msgs = kwargs.get("num_msgs", 15)
- msg_size = kwargs.get("msg_size", 10)
- msg_durable = kwargs.get("msg_durable", False)
- sync = kwargs.get("sync", False)
- browse = kwargs.get("browse", False)
-
- bname = self._broker_name(queue_name, txn_produce, txn_consume)
- if recover:
- expect = EXPECT_UNKNOWN
- else:
- expect = EXPECT_EXIT_OK
- broker = self.broker(self._broker_args(), name=bname, expect=expect,
log_level="debug+")
- prod_session = broker.connect().session(transactional=txn_produce)
- sender = prod_session.sender(self.snd_addr(queue_name, auto_create=True,
durable=True, ftd_count=max_count,
- ftd_size=max_size, policy=policy))
-
- # Send messages
- msgs = []
- pre_recover_ftd_msgs = [] # msgs released before a recover
- post_recover_ftd_msgs = [] # msgs released after a recover
- cum_msg_size = 0
- for index in range(0, num_msgs):
- msg = Message(self.make_message(index, msg_size), durable=msg_durable,
id=uuid4(),
- correlation_id="msg-%04d"%index)
- #print "Sending msg %s" % msg.id
- msgs.append(msg)
- cum_msg_size += msg_size
- if (max_count != None and index >= max_count) or (max_size != None and
cum_msg_size > max_size):
- pre_recover_ftd_msgs.append(msg)
- sender.send(msg, sync=sync)
- if not sync:
- sender.sync()
- # Close transaction (if needed)
- if txn_produce:
- prod_session.commit()
-
- # Browse messages
- if browse:
- self.check_messages(broker, queue_name, msgs, browse=True)
-
- if recover:
- broker.terminate()
- if msg_durable:
- post_recover_ftd_msgs = pre_recover_ftd_msgs
- else:
- del msgs[:] # Transient messages will be discarded on recover
- old_broker = broker # keep for log analysis
- broker = self.broker(self._broker_args(), name=bname, expect=EXPECT_EXIT_OK,
log_level="debug+")
-
- # Browse messages after recover
- if browse:
- self.check_messages(broker, queue_name, msgs, browse=True)
-
- # Consume messages
- self.check_messages(broker, queue_name, msgs, transactional=txn_consume,
empty=True)
- broker.terminate()
-
- # Check broker logs for released messages
- if recover:
- if txn_produce:
- self.check_msg_release_on_commit(old_broker, pre_recover_ftd_msgs)
- else:
- self.check_msg_release(old_broker, pre_recover_ftd_msgs)
- self.check_msg_release_on_recover(broker, post_recover_ftd_msgs)
- else:
- if txn_produce:
- self.check_msg_release_on_commit(broker, pre_recover_ftd_msgs)
- else:
- self.check_msg_release(broker, pre_recover_ftd_msgs)
-
- def simple_limit(self, queue_name, **kwargs):
- """Adapter for adding transactions to test"""
- # Cycle through the produce/consume block transaction combinations
- for index in range(0, 4):
- kwargs["txn_produce"] = index & 1 != 0 # Transactional produce
- kwargs["txn_consume"] = index & 2 != 0 # Transactional consume
- self._tx_simple_limit(queue_name, kwargs)
-
-class SimpleMaxCountTest(FlowToDisk):
- """Flow-to-disk tests based on setting max_count"""
-
- def test_base(self):
- """Base test"""
- self.simple_limit("SimpleMaxCount", max_count=10)
-
- def test_recover(self):
- """Recover test"""
- self.simple_limit("SimpleMaxCountRecover", max_count=10, recover=True)
-
- def test_durable(self):
- """Durable message test"""
- self.simple_limit("SimpleMaxCountDurable", max_count=10,
msg_durable=True)
-
- def test_durable_recover(self):
- """Durable message recover test"""
- self.simple_limit("SimpleMaxCountDurableRecover", max_count=10,
msg_durable=True, recover=True)
-
- def test_browse(self):
- """Browse test"""
- self.simple_limit("SimpleMaxCountBrowse", max_count=10, browse=True)
-
- def test_browse_recover(self):
- """Browse before and after recover test"""
- self.simple_limit("SimpleMaxCountBrowseRecover", max_count=10,
browse=True, recover=True)
-
- def test_durable_browse(self):
- """Browse durable message test"""
- self.simple_limit("SimpleMaxCountDurableBrowse", max_count=10,
msg_durable=True, browse=True)
-
- def test_durable_browse_recover(self):
- """Browse durable messages before and after
recover"""
- self.simple_limit("SimpleMaxCountDurableBrowseRecover", max_count=10,
msg_durable=True, browse=True,
- recover=True)
-
- def test_large_msg(self):
- """Large message test"""
- self.simple_limit("SimpleMaxCountLargeMsg", max_count=10,
max_size=10000000, num_msgs=100, msg_size=10000)
-
- def test_large_msg_recover(self):
- """Large message test"""
- self.simple_limit("SimpleMaxCountLargeMsgRecover", max_count=10,
max_size=10000000, num_msgs=100,
- msg_size=10000, recover=True)
-
- def test_large_msg_durable(self):
- """Large durable message test"""
- self.simple_limit("SimpleMaxCountLargeMsgDurable", max_count=10,
max_size=10000000, msg_durable=True,
- num_msgs=100, msg_size=10000)
-
- def test_large_msg_durable_recover(self):
- """Large durable message test"""
- self.simple_limit("SimpleMaxCountLargeMsgDurableRecover", max_count=10,
max_size=10000000, msg_durable=True,
- num_msgs=100, msg_size=10000, recover=True)
-
- def test_large_msg_browse(self):
- """Large message browse test"""
- self.simple_limit("SimpleMaxCountLargeMsgBrowse", max_count=10,
max_size=10000000, browse=True, num_msgs=100,
- msg_size=10000)
-
- def test_large_msg_browse_recover(self):
- """Large message browse test"""
- self.simple_limit("SimpleMaxCountLargeMsgBrowseRecover", max_count=10,
max_size=10000000, browse=True,
- num_msgs=100, msg_size=10000, recover=True)
-
- def test_large_msg_durable_browse(self):
- """Large durable message browse test"""
- self.simple_limit("SimpleMaxCountLargeMsgDurableBrowse", max_count=10,
max_size=10000000, msg_durable=True,
- browse=True, num_msgs=100, msg_size=10000)
-
- def test_large_msg_durable_browse_recover(self):
- """Large durable message browse test"""
- self.simple_limit("SimpleMaxCountLargeMsgDurableBrowseRecover",
max_count=10, max_size=10000000,
- msg_durable=True, browse=True, num_msgs=100, msg_size=10000,
recover=True)
-
-class SimpleMaxSizeTest(FlowToDisk):
- """Flow-to-disk tests based on setting max_size"""
-
- def test_base(self):
- """Base test"""
- self.simple_limit("SimpleMaxSize", max_size=100)
-
- def test_recover(self):
- """Recover test"""
- self.simple_limit("SimpleMaxSizeRecover", max_size=100, recover=True)
-
- def test_durable(self):
- """Durable message test"""
- self.simple_limit("SimpleMaxSizeDurable", max_size=100,
msg_durable=True)
-
- def test_durable_recover(self):
- """Durable message recover test"""
- self.simple_limit("SimpleMaxSizeDurable", max_size=100,
msg_durable=True, recover=True)
-
- def test_browse(self):
- """Browse test"""
- self.simple_limit("SimpleMaxSizeBrowse", max_size=100, browse=True)
-
- def test_browse_recover(self):
- """Browse before and after recover test"""
- self.simple_limit("SimpleMaxSizeBrowseRecover", max_size=100,
browse=True, recover=True)
-
- def test_durable_browse(self):
- """Browse durable message test"""
- self.simple_limit("SimpleMaxSizeDurableBrowse", max_size=100,
msg_durable=True, browse=True)
-
- def test_durable_browse_recover(self):
- """Browse durable messages before and after
recover"""
- self.simple_limit("SimpleMaxSizeDurableBrowseRecover", max_size=100,
msg_durable=True, browse=True,
- recover=True)
-
- def test_large_msg(self):
- """Large message test"""
- self.simple_limit("SimpleMaxSizeLargeMsg", max_size=100000,
num_msgs=100, msg_size=10000)
-
- def test_large_msg_recover(self):
- """Large message test"""
- self.simple_limit("SimpleMaxSizeLargeMsgRecover", max_size=100000,
num_msgs=100, msg_size=10000, recover=True)
-
- def test_large_msg_durable(self):
- """Large durable message test"""
- self.simple_limit("SimpleMaxSizeLargeMsgDurable", max_size=100000,
msg_durable=True, num_msgs=100,
- msg_size=10000)
-
- def test_large_msg_durable_recover(self):
- """Large durable message test"""
- self.simple_limit("SimpleMaxSizeLargeMsgDurableRecover",
max_size=100000, msg_durable=True, num_msgs=100,
- msg_size=10000, recover=True)
-
- def test_large_msg_browse(self):
- """Large message browse test"""
- self.simple_limit("SimpleMaxSizeLargeMsgBrowse", max_size=100,
browse=True, num_msgs=100, msg_size=10000)
-
- def test_large_msg_browse_recover(self):
- """Large message browse test"""
- self.simple_limit("SimpleMaxSizeLargeMsgBrowseRecover", max_size=100,
browse=True, num_msgs=100, msg_size=10000,
- recover=True)
-
- def test_large_msg_durable_browse(self):
- """Large durable message browse test"""
- self.simple_limit("SimpleMaxSizeLargeMsgDurableBrowse", max_size=100,
msg_durable=True, browse=True,
- num_msgs=100, msg_size=10000)
-
- def test_large_msg_durable_browse_recover(self):
- """Large durable message browse test"""
- self.simple_limit("SimpleMaxSizeLargeMsgDurableBrowseRecover",
max_size=100, msg_durable=True, browse=True,
- num_msgs=100, msg_size=10000, recover=True)
-
-class SimpleMaxSizeCountTest(FlowToDisk):
- """Flow-to-disk tests based on setting both max_count and max_size at
the same time"""
-
- def test_base(self):
- """Base test"""
- self.simple_limit("MaxSizeMaxCount", max_count=10, max_size=1000)
-
- def test_recover(self):
- """Recover test"""
- self.simple_limit("MaxSizeMaxCountRecover", max_count=10,
max_size=1000, recover=True)
-
- def test_durable(self):
- """Durable message test"""
- self.simple_limit("MaxSizeMaxCountDurable", max_count=10,
max_size=1000, msg_size=250, msg_durable=True)
-
- def test_durable_recover(self):
- """Durable message recover test"""
- self.simple_limit("MaxSizeMaxCountDurableRecover", max_count=10,
max_size=1000, msg_size=250, msg_durable=True,
- recover=True)
-
- def test_browse(self):
- """Browse test"""
- self.simple_limit("MaxSizeMaxCountBrowse", max_count=10, max_size=1000,
browse=True)
-
- def test_browse_recover(self):
- """Browse before and after recover test"""
- self.simple_limit("MaxSizeMaxCountBrowseRecover", max_count=10,
max_size=1000, browse=True, recover=True)
-
- def test_durable_browse(self):
- """Browse durable message test"""
- self.simple_limit("MaxSizeMaxCountDurableBrowse", max_count=10,
max_size=1000, msg_size=250, msg_durable=True,
- browse=True)
-
- def test_durable_browse_recover(self):
- """Browse durable messages before and after
recover"""
- self.simple_limit("MaxSizeMaxCountDurableBrowseRecover", max_count=10,
max_size=1000, msg_size=250,
- msg_durable=True, browse=True, recover=True)
-
-#
======================================================================================================================
-
-class MultiQueueFlowToDisk(FlowToDisk):
- """Tests for async store flow-to-disk involving multiple
queues"""
-
- def _multi_queue_setup(self, queue_map, broker, exchange_name, txn_produce,
txn_consume, policy, exclusive = False):
- """Create the send session and receive sessions for multi-queue
scenarios"""
- connection = broker.connect()
- snd_session = connection.session(transactional=txn_produce)
- addr = self.snd_addr(exchange_name, topic_flag=True,
exchage_type="fanout")
- #print "snd_addr=\"%s\"" % addr
- sndr = snd_session.sender(addr)
- for queue_name, queue_properties in queue_map.iteritems():
- if "durable" in queue_properties.keys():
- durable = queue_properties["durable"]
- else:
- durable = False
- max_count = None
- if "max_count" in queue_properties.keys():
- max_count = queue_properties["max_count"]
- max_size = None
- if "max_size" in queue_properties.keys():
- max_size = queue_properties["max_size"]
- rcv_session = connection.session(transactional=txn_consume)
- addr = self.rcv_addr(exchange_name, auto_create=False, link_name=queue_name,
durable=durable,
- exclusive=exclusive, ftd_count=max_count,
ftd_size=max_size, policy=policy)
- #print "rcv_addr=\"%s\"" % addr
- rcv_session.receiver(addr)
- return snd_session, sndr
-
- @staticmethod
- def _make_policy_dict(src, marker, delim=";"):
- """Create a dictionary of key/value strings from a formatted
string src of the form
- '... marker key1=val1, key2=val2, ..., keyN=valN delimiter ...'
- where the portion of interest starts at marker m until the following delimiter d
(default: ';')."""
- pos = src.find(marker) + len(marker)
- res = []
- for index in src[pos:src.find(delim, pos)].split():
- if "=" in index:
- res.append(index.strip(",").split("="))
- if len(res) > 0:
- return dict(res)
-
- @staticmethod
- def _make_policy_val(src, marker, delim=";"):
- """Return a string value from a formatted string of the form
'... marker val delimiter ...' where the value
- lies between marker and delimiter d (default: ';')"""
- pos = src.find(marker) + len(marker)
- return src[pos:src.find(delim, pos)].strip()
-
- @staticmethod
- def _check_error(error_str, fail_list=None):
- """Check a policy exception string to ensure the failure occurred
on the expected queue and at the expected
- count."""
- if error_str.startswith("resource-limit-exceeded"):
- fail_policy = MultiQueueFlowToDisk._make_policy_val(error_str,
"type=", delim="(")
- fail_queue_name = MultiQueueFlowToDisk._make_policy_val(error_str,
"Policy exceeded on ", delim=",")
- fail_count_dict = MultiQueueFlowToDisk._make_policy_dict(error_str,
"count: ")
- fail_size_dict = MultiQueueFlowToDisk._make_policy_dict(error_str,
"size: ")
- if fail_list == None:
- return False # Not expected - no failure should have occurred
- for fail in fail_list:
- if fail_queue_name == fail["queue"]:
- if fail_policy != fail["type"]:
- return False
- if (fail_count_dict != None and "count" in fail and \
- int(fail_count_dict["current"]) !=
fail["count"]) \
- or \
- (fail_size_dict != None and "size" in fail and
int(fail_size_dict["current"]) != fail["size"]):
- return False
- return True
- return False
-
- @staticmethod
- def _check_target_capacity_exceeded_error(err, fail_list=None):
- """Check that an error is a
TargetCapacityExceeded."""
- if not isinstance(err, TargetCapacityExceeded):
- return False
- return MultiQueueFlowToDisk._check_error(str(err), fail_list)
-
- @staticmethod
- def _check_server_error(err, txn=False):
- """Check that an error is a ServerError."""
- if not isinstance(err, ServerError):
- return False
- if txn and str(err).startswith("internal-error: Commit failed"):
- return True
- return False
-
- @staticmethod
- def _is_queue_durable(queue_map, index):
- """Return true if the indexed queue is durable (indexed by
queue_map.keys() or queue_map.values())"""
- return "durable" in queue_map.values()[index] and
queue_map.values()[index]["durable"]
-
- @staticmethod
- def _expected_msg_loss(fail_list):
- """Examine the fail_list for expected failures and return a tuple
containing the expected failure conditions"""
- count_exp_loss = None
- count_exp_loss_queues = None
- size_exp_loss = None
- size_exp_loss_queues = None
- if fail_list != None:
- for fail in fail_list:
- if "count" in fail:
- this_count = fail["count"]
- if count_exp_loss == None:
- count_exp_loss = this_count
- count_exp_loss_queues = [fail["queue"]]
- elif this_count < count_exp_loss:
- count_exp_loss = this_count
- count_exp_loss_queues = [fail["queue"]]
- elif this_count == count_exp_loss:
- count_exp_loss_queues.append(fail["queue"])
- if "size" in fail:
- this_size = fail["size"]
- if size_exp_loss == None:
- size_exp_loss = this_size
- size_exp_loss_queues = [fail["queue"]]
- elif this_size < size_exp_loss:
- size_exp_loss = this_size
- size_exp_loss_queues = [fail["queue"]]
- elif this_size == size_exp_loss:
- size_exp_loss_queues.append(fail["queue"])
- return (count_exp_loss, count_exp_loss_queues, size_exp_loss,
size_exp_loss_queues)
-
- @staticmethod
- def _expected_msg_ftd(queue_map):
- max_count = None
- max_size = None
- for queue_props in queue_map.itervalues():
- if "durable" in queue_props and queue_props["durable"]:
- if "max_count" in queue_props and
queue_props["max_count"] != None and \
- (max_count == None or queue_props["max_count"] <
max_count):
- max_count = queue_props["max_count"]
- if "max_size" in queue_props and
queue_props["max_size"] != None and \
- (max_size == None or queue_props["max_size"] <
max_size):
- max_size = queue_props["max_size"]
- return (max_count, max_size)
-
-
- def tx_multi_queue_limit(self, broker_base_name, queue_map, exchange_name,
**kwargs):
- """ Test a multi-queue case
- queue_map = queue map where map is queue name (key) against queue args (value)
- """
- # Unpack args
- msg_durable = kwargs.get("msg_durable", False)
- num_msgs = kwargs.get("num_msgs", 15)
- msg_size = kwargs.get("msg_size", 10)
- txn_produce = kwargs.get("txn_produce", False)
- txn_consume = kwargs.get("txn_consume", False)
- browse = kwargs.get("browse", False)
- policy = kwargs.get("policy", "flow_to_disk")
- recover = kwargs.get("recover", False)
- sync = kwargs.get("sync", False)
- fail_list = kwargs.get("fail_list")
-
- bname = self._broker_name(broker_base_name, txn_produce, txn_consume)
- broker = self.broker(self._broker_args(), name=bname, expect=EXPECT_EXIT_OK,
log_level="debug+")
- snd_session, sndr = self._multi_queue_setup(queue_map, broker, exchange_name,
txn_produce, txn_consume, policy)
-
- # Find expected limits
- count_exp_loss, count_exp_loss_queues, size_exp_loss, size_exp_loss_queues =
self._expected_msg_loss(fail_list)
- max_count, max_size = self._expected_msg_ftd(queue_map)
-
- # Send messages
- try:
- msgs = []
- pre_recover_ftd_msgs = [] # msgs released before a recover
- post_recover_ftd_msgs = [] # msgs released after a recover
- cum_msg_size = 0
- target_queues = []
- for index in range(0, num_msgs):
- msg = Message(self.make_message(index, msg_size), durable=msg_durable,
id=uuid4(),
- correlation_id="msg-%04d"%index)
- #print "Sending msg %s" % msg.id
- sndr.send(msg, sync=sync)
- if msg_size != None:
- cum_msg_size += msg_size
- if count_exp_loss != None and index >= count_exp_loss:
- target_queues.extend(count_exp_loss_queues)
- if size_exp_loss != None and cum_msg_size > size_exp_loss:
- target_queues.extend(size_exp_loss_queues)
- if (count_exp_loss == None or index < count_exp_loss) and \
- (size_exp_loss == None or cum_msg_size <= size_exp_loss):
- msgs.append(msg)
- if (max_count != None and index >= max_count) or (max_size != None and
cum_msg_size > max_size):
- pre_recover_ftd_msgs.append(msg)
- if not sync:
- sndr.sync()
- if txn_produce:
- snd_session.commit()
- except TargetCapacityExceeded, err:
- if not self._check_target_capacity_exceeded_error(err, fail_list):
- raise
- except ServerError, err:
- msgs[:] = [] # Transaction failed, all messages lost
- if not self._check_server_error(err, txn_produce):
- raise
-
- # Browse messages
- if browse:
- for index in range(0, len(queue_map)):
- self.check_messages(broker, queue_map.keys()[index], msgs, browse=True)
-
- if recover:
- broker.terminate()
- if msg_durable:
- post_recover_ftd_msgs = pre_recover_ftd_msgs
- else:
- del msgs[:] # Transient messages will be discarded on recover
- old_broker = broker # keep for log analysis
- broker = self.broker(self._broker_args(), name=bname, expect=EXPECT_EXIT_OK,
log_level="debug+")
- # Browse messages
- if browse:
- for index in range(0, len(queue_map)):
- empty = not self._is_queue_durable(queue_map, index)
- self.check_messages(broker, queue_map.keys()[index], msgs,
browse=True, emtpy_flag=empty)
-
- # Consume messages
- for index in range(0, len(queue_map)):
- empty_chk = txn_produce or queue_map.keys()[index] in target_queues
- empty = recover and not self._is_queue_durable(queue_map, index)
- self.check_messages(broker, queue_map.keys()[index], msgs,
transactional=txn_consume, empty=empty_chk,
- emtpy_flag=empty)
-
- broker.terminate()
-
- # Check broker logs for released messages
- if recover:
- if txn_produce:
- if msg_durable:
- self.check_msg_release_on_commit(old_broker, pre_recover_ftd_msgs)
- else:
- self.check_msg_block_on_commit(old_broker, pre_recover_ftd_msgs)
- else:
- if msg_durable:
- self.check_msg_release(old_broker, pre_recover_ftd_msgs)
- else:
- self.check_msg_block(old_broker, pre_recover_ftd_msgs)
- self.check_msg_release_on_recover(broker, post_recover_ftd_msgs)
- else:
- if txn_produce:
- if msg_durable:
- self.check_msg_release_on_commit(broker, pre_recover_ftd_msgs)
- else:
- self.check_msg_block_on_commit(broker, pre_recover_ftd_msgs)
- else:
- if msg_durable:
- self.check_msg_release(broker, pre_recover_ftd_msgs)
- else:
- self.check_msg_block(broker, pre_recover_ftd_msgs)
-
- # --- Parameterized test methods ---
-
- def no_limit(self, num, queue_durable=False, msg_durable=False, browse=False,
recover=False, txn_produce=False,
- txn_consume=False):
- """No policy test"""
- queue_map_1 = {"a%02d" % num : {"durable":queue_durable,
"max_count":None, "max_size": None},
- "b%02d" % num : {"durable":queue_durable,
"max_count":None, "max_size": None} }
- self.tx_multi_queue_limit("MultiQueue_NoLimit", queue_map_1,
exchange_name="Fanout_a%02d" % num,
- msg_durable=msg_durable, browse=browse,
recover=recover, txn_produce=txn_produce,
- txn_consume=txn_consume)
-
- def max_count(self, num, queue_durable=False, msg_durable=False, browse=False,
recover=False, txn_produce=False,
- txn_consume=False):
- """Count policy test"""
- queue_map_2 = {"c%02d" % num : {"durable":queue_durable,
"max_count":None, "max_size": None},
- "d%02d" % num : {"durable":queue_durable,
"max_count":10, "max_size": None} }
- fail_list = None
- if not queue_durable:
- fail_list = [{"queue":"d%02d" % num,
"type":"reject", "count":10}]
- self.tx_multi_queue_limit("MultiQueue_MaxCount", queue_map_2,
exchange_name="Fanout_b%02d" % num,
- msg_durable=msg_durable, browse=browse,
recover=recover, fail_list=fail_list,
- txn_produce=txn_produce, txn_consume=txn_consume)
-
- def max_size(self, num, queue_durable=False, msg_durable=False, browse=False,
recover=False, txn_produce=False,
- txn_consume=False):
- """Size policy test"""
- queue_map_3 = {"e%02d" % num : {"durable":queue_durable,
"max_count":None, "max_size": None},
- "f%02d" % num : {"durable":queue_durable,
"max_count":None, "max_size": 1000} }
- fail_list = None
- if not queue_durable:
- fail_list = [{"queue":"f%02d" % num,
"type":"reject", "size":1000}]
- self.tx_multi_queue_limit("MultiQueue_MaxSize", queue_map_3,
exchange_name="Fanout_c%02d" % num, msg_size=100,
- msg_durable=msg_durable, browse=browse,
recover=recover, fail_list=fail_list,
- txn_produce=txn_produce, txn_consume=txn_consume)
-
- def dual_max_count(self, num, queue_durable=False, msg_durable=False, browse=False,
recover=False,
- txn_produce=False, txn_consume=False):
- """Multiple count policy test"""
- queue_map_4 = {"g%02d" % num : {"durable":queue_durable,
"max_count":10, "max_size": None},
- "h%02d" % num : {"durable":queue_durable,
"max_count":8, "max_size": None} }
- fail_list = None
- if not queue_durable:
- fail_list = [{"queue":"h%02d" % num,
"type":"reject", "count":8}]
- self.tx_multi_queue_limit("MultiQueue_DualMaxCount", queue_map_4,
exchange_name="Fanout_d%02d" % num,
- msg_durable=msg_durable, browse=browse,
recover=recover, fail_list=fail_list,
- txn_produce=txn_produce, txn_consume=txn_consume)
-
- def dual_max_size(self, num, queue_durable=False, msg_durable=False, browse=False,
recover=False, txn_produce=False,
- txn_consume=False):
- """Multiple size policy test"""
- queue_map_5 = {"i%02d" % num : {"durable":queue_durable,
"max_count":None, "max_size": 1000},
- "j%02d" % num : {"durable":queue_durable,
"max_count":None, "max_size": 800} }
- fail_list = None
- if not queue_durable:
- fail_list = [{"queue":"j%02d" % num,
"type":"reject", "size":800}]
- self.tx_multi_queue_limit("MultiQueue_DualMaxSize", queue_map_5,
exchange_name="Fanout_e%02d" % num,
- msg_size=100, msg_durable=msg_durable, browse=browse,
recover=recover,
- fail_list=fail_list, txn_produce=txn_produce,
txn_consume=txn_consume)
-
- def mixed_limit_1(self, num, queue_durable=False, msg_durable=False, browse=False,
recover=False, txn_produce=False,
- txn_consume=False):
- """Both count and size polices active with the same queue having
equal probabilities of triggering the
- policy"""
- queue_map_6 = {"k%02d" % num : {"durable":queue_durable,
"max_count":None, "max_size": None},
- "l%02d" % num : {"durable":queue_durable,
"max_count":10, "max_size": None},
- "m%02d" % num : {"durable":queue_durable,
"max_count":None, "max_size": 1000},
- "n%02d" % num : {"durable":queue_durable,
"max_count":8, "max_size": 800} }
- fail_list = None
- if not queue_durable:
- fail_list = [{"queue":"n%02d" % num,
"type":"reject", "count":8, "size":800}]
- self.tx_multi_queue_limit("MultiQueue_MixedLimit", queue_map_6,
exchange_name="Fanout_f%02d" % num,
- msg_size=100, msg_durable=msg_durable, browse=browse,
recover=recover,
- fail_list=fail_list, txn_produce=txn_produce,
txn_consume=txn_consume)
-
- def mixed_limit_2(self, num, queue_durable=False, msg_durable=False, browse=False,
recover=False, txn_produce=False,
- txn_consume=False):
- """Both count and size polices active with different queues having
equal probabilities of triggering the
- policy"""
- queue_map_7 = {"o%02d" % num : {"durable":queue_durable,
"max_count":None, "max_size": None},
- "p%02d" % num : {"durable":queue_durable,
"max_count":10, "max_size": None},
- "q%02d" % num : {"durable":queue_durable,
"max_count":None, "max_size": 800},
- "r%02d" % num : {"durable":queue_durable,
"max_count":8, "max_size": 1000} }
- fail_list = None
- if not queue_durable:
- fail_list = [{"queue":"q%02d" % num,
"type":"reject", "size":800},
- {"queue":"r%02d" % num,
"type":"reject", "count":8,}]
- self.tx_multi_queue_limit("MultiQueue_MixedLimit", queue_map_7,
exchange_name="Fanout_g%02d" % num,
- msg_size=100, msg_durable=msg_durable, browse=browse,
recover=recover,
- fail_list=fail_list, txn_produce=txn_produce,
txn_consume=txn_consume)
-
- # --- Non-parameterized test methods - these will be run by Python test framework
---
-
- _num = None
- _queue_durable = False
- _msg_durable = False
- _browse = False
- _recover = False
- _txn_produce = False
- _txn_consume = False
-
- def test_no_limit(self):
- """No policy test (non-parameterized)"""
- self.no_limit(self._num, queue_durable=self._queue_durable,
msg_durable=self._msg_durable, browse=self._browse,
- recover=self._recover, txn_produce=self._txn_produce,
txn_consume=self._txn_consume)
-
- def test_max_count(self):
- """Count policy test (non-parameterized)"""
- self.max_count(self._num, queue_durable=self._queue_durable,
msg_durable=self._msg_durable, browse=self._browse,
- recover=self._recover, txn_produce=self._txn_produce,
txn_consume=self._txn_consume)
-
- def test_max_size(self):
- """Size policy test (non-parameterized)"""
- self.max_size(self._num, queue_durable=self._queue_durable,
msg_durable=self._msg_durable, browse=self._browse,
- recover=self._recover, txn_produce=self._txn_produce,
txn_consume=self._txn_consume)
-
- def test_dual_max_count(self):
- """Multiple count policy test
(non-parameterized)"""
- self.dual_max_count(self._num, queue_durable=self._queue_durable,
msg_durable=self._msg_durable,
- browse=self._browse, recover=self._recover,
txn_produce=self._txn_produce,
- txn_consume=self._txn_consume)
-
- def test_dual_max_size(self):
- """Multiple size policy test
(non-parameterized)"""
- self.dual_max_size(self._num, queue_durable=self._queue_durable,
msg_durable=self._msg_durable,
- browse=self._browse, recover=self._recover,
txn_produce=self._txn_produce,
- txn_consume=self._txn_consume)
-
- def test_mixed_limit_1(self):
- """Both count and size polices active with the same queue having
equal probabilities of triggering the
- policy (non-parameterized)"""
- self.mixed_limit_1(self._num, queue_durable=self._queue_durable,
msg_durable=self._msg_durable,
- browse=self._browse, recover=self._recover,
txn_produce=self._txn_produce,
- txn_consume=self._txn_consume)
-
- def test_mixed_limit_2(self):
- """Both count and size polices active with different queues having
equal probabilities of triggering the
- policy (non-parameterized)"""
- self.mixed_limit_2(self._num, queue_durable=self._queue_durable,
msg_durable=self._msg_durable,
- browse=self._browse, recover=self._recover,
txn_produce=self._txn_produce,
- txn_consume=self._txn_consume)
-
-# --- Tests ---
-
-class MultiQueueTest(MultiQueueFlowToDisk):
- """Transient messages sent to multiple transient
queues"""
- _num = 1
-
-class MultiDurableQueueTest(MultiQueueFlowToDisk):
- """Transient messages sent to multiple durable
queues"""
- _num = 2
- _queue_durable = True
-
-class MultiQueueDurableMsgTest(MultiQueueFlowToDisk):
- """Durable messages sent to multiple transient
queues"""
- _num = 3
- _msg_durable = True
-
-class MultiDurableQueueDurableMsgTest(MultiQueueFlowToDisk):
- """Durable messages sent to multiple durable queues"""
- _num = 4
- _queue_durable = True
- _msg_durable = True
-
-class MultiQueueBrowseTest(MultiQueueFlowToDisk):
- """Transient messages sent to multiple transient queues with messages
browsed before being consumed"""
- _num = 5
- _browse = True
-
-class MultiDurableQueueBrowseTest(MultiQueueFlowToDisk):
- """Transient messages sent to multiple durable queues with messages
browsed before being consumed"""
- _num = 6
- _queue_durable = True
- _browse = True
-
-class MultiQueueDurableMsgBrowseTest(MultiQueueFlowToDisk):
- """Durable messages sent to multiple transient queues with messages
browsed before being consumed"""
- _num = 7
- _msg_durable = True
- _browse = True
-
-class MultiDurableQueueDurableMsgBrowseTest(MultiQueueFlowToDisk):
- """Durable messages sent to multiple durable queues with messages
browsed before being consumed"""
- _num = 8
- _queue_durable = True
- _msg_durable = True
- _browse = True
-
-class MultiQueueRecoverTest(MultiQueueFlowToDisk):
- """Transient messages sent to multiple transient queues and broker
terminated/recovered"""
- _num = 9
- _recover = True
-
-class MultiDurableQueueRecoverTest(MultiQueueFlowToDisk):
- """Transient messages sent to multiple durable queues and broker
terminated/recovered"""
- _num = 10
- _queue_durable = True
- _recover = True
-
-class MultiQueueDurableMsgRecoverTest(MultiQueueFlowToDisk):
- """Durable messages sent to multiple transient queues and broker
terminated/recovered"""
- _num = 11
- _msg_durable = True
- _recover = True
-
-class MultiDurableQueueDurableMsgRecoverTest(MultiQueueFlowToDisk):
- """Durable messages sent to multiple durable queues and broker
terminated/recovered"""
- _num = 12
- _queue_durable = True
- _msg_durable = True
- _recover = True
-
-class MultiQueueBrowseRecoverTest(MultiQueueFlowToDisk):
- """Transient messages sent to multiple transient queues with messages
browsed before broker terminated/recovered and
- are consumed"""
- _num = 13
- _browse = True
- _recover = True
-
-class MultiDurableQueueBrowseRecoverTest(MultiQueueFlowToDisk):
- """Transient messages sent to multiple durable queues with messages
browsed before broker terminated/recovered and
- are consumed"""
- _num = 14
- _queue_durable = True
- _browse = True
- _recover = True
-
-class MultiQueueDurableMsgBrowseRecoverTest(MultiQueueFlowToDisk):
- """Durable messages sent to multiple transient queues with messages
browsed before broker terminated/recovered and
- are consumed"""
- _num = 15
- _msg_durable = True
- _browse = True
- _recover = True
-
-class MultiDurableQueueDurableMsgBrowseRecoverTest(MultiQueueFlowToDisk):
- """Durable messages sent to multiple durable queues with messages
browsed before broker terminated/recovered and are
- consumed"""
- _num = 16
- _queue_durable = True
- _msg_durable = True
- _browse = True
- _recover = True
-
-class MultiQueueTxPTest(MultiQueueFlowToDisk):
- """Transient messages sent to multiple transient queues under
transactional produce"""
- _num = 17
- _txn_produce = True
-
-class MultiDurableQueueTxPTest(MultiQueueFlowToDisk):
- """Transient messages sent to multiple durable queues under
transactional produce"""
- _num = 18
- _queue_durable = True
- _txn_produce = True
-
-class MultiQueueDurableMsgTxPTest(MultiQueueFlowToDisk):
- """Durable messages sent to multiple transient queues under
transactional produce"""
- _num = 19
- _msg_durable = True
- _txn_produce = True
-
-class MultiDurableQueueDurableMsgTxPTest(MultiQueueFlowToDisk):
- """Durable messages sent to multiple durable queues under
transactional produce"""
- _num = 20
- _queue_durable = True
- _msg_durable = True
- _txn_produce = True
-
-class MultiQueueBrowseTxPTest(MultiQueueFlowToDisk):
- """Transient messages sent to multiple transient queues under
transactional produce with messages browsed before
- being consumed"""
- _num = 21
- _browse = True
- _txn_produce = True
-
-class MultiDurableQueueBrowseTxPTest(MultiQueueFlowToDisk):
- """Transient messages sent to multiple durable queues under
transactional produce with messages browsed before
- being consumed"""
- _num = 22
- _queue_durable = True
- _browse = True
- _txn_produce = True
-
-class MultiQueueDurableMsgBrowseTxPTest(MultiQueueFlowToDisk):
- """Durable messages sent to multiple transient queues under
transactional produce with messages browsed before
- being consumed"""
- _num = 23
- _msg_durable = True
- _browse = True
- _txn_produce = True
-
-class MultiDurableQueueDurableMsgBrowseTxPTest(MultiQueueFlowToDisk):
- """Durable messages sent to multiple durable queues under
transactional produce with messages browsed before being
- consumed"""
- _num = 24
- _queue_durable = True
- _msg_durable = True
- _browse = True
- _txn_produce = True
-
-class MultiQueueRecoverTxPTest(MultiQueueFlowToDisk):
- """Transient messages sent to multiple transient queues under
transactional produce and broker
- terminated/recovered"""
- _num = 25
- _recover = True
- _txn_produce = True
-
-class MultiDurableQueueRecoverTxPTest(MultiQueueFlowToDisk):
- """Transient messages sent to multiple durable queues under
transactional produce and broker terminated/recovered"""
- _num = 26
- _queue_durable = True
- _recover = True
- _txn_produce = True
-
-class MultiQueueDurableMsgRecoverTxPTest(MultiQueueFlowToDisk):
- """Durable messages sent to multiple transient queues under
transactional produce and broker terminated/recovered"""
- _num = 27
- _msg_durable = True
- _recover = True
- _txn_produce = True
-
-class MultiDurableQueueDurableMsgRecoverTxPTest(MultiQueueFlowToDisk):
- """Durable messages sent to multiple durable queues under
transactional produce and broker terminated/recovered"""
- _num = 28
- _queue_durable = True
- _msg_durable = True
- _recover = True
- _txn_produce = True
-
-class MultiQueueBrowseRecoverTxPTest(MultiQueueFlowToDisk):
- """Transient messages sent to multiple transient queues under
transactional produce with messages browsed before
- broker terminated/recovered and are consumed"""
- _num = 29
- _browse = True
- _recover = True
- _txn_produce = True
-
-class MultiDurableQueueBrowseRecoverTxPTest(MultiQueueFlowToDisk):
- """Transient messages sent to multiple durable queues under
transactional produce with messages browsed before
- broker terminated/recovered and are consumed"""
- _num = 30
- _queue_durable = True
- _browse = True
- _recover = True
- _txn_produce = True
-
-class MultiQueueDurableMsgBrowseRecoverTxPTest(MultiQueueFlowToDisk):
- """Durable messages sent to multiple transient queues under
transactional produce with messages browsed before
- broker terminated/recovered and are consumed"""
- _num = 31
- _msg_durable = True
- _browse = True
- _recover = True
- _txn_produce = True
-
-class MultiDurableQueueDurableMsgBrowseRecoverTxPTest(MultiQueueFlowToDisk):
- """Durable messages sent to multiple durable queues under
transactional produce with messages browsed before broker
- terminated/recovered and are consumed"""
- _num = 32
- _queue_durable = True
- _msg_durable = True
- _browse = True
- _recover = True
- _txn_produce = True
-
-class MultiQueueTxCTest(MultiQueueFlowToDisk):
- """Transient messages sent to multiple transient queues and consumed
transactionally"""
- _num = 33
- _txn_consume = True
-
-class MultiDurableQueueTxCTest(MultiQueueFlowToDisk):
- """Transient messages sent to multiple durable queues and consumed
transactionally"""
- _num = 34
- _queue_durable = True
- _txn_consume = True
-
-class MultiQueueDurableMsgTxCTest(MultiQueueFlowToDisk):
- """Durable messages sent to multiple transient queues and consumed
transactionally"""
- _num = 35
- _msg_durable = True
- _txn_consume = True
-
-class MultiDurableQueueDurableMsgTxCTest(MultiQueueFlowToDisk):
- """Durable messages sent to multiple durable queues and consumed
transactionally"""
- _num = 36
- _queue_durable = True
- _msg_durable = True
- _txn_consume = True
-
-class MultiQueueBrowseTxCTest(MultiQueueFlowToDisk):
- """Transient messages sent to multiple transient queues with messages
browsed before being consumed
- transactionally"""
- _num = 37
- _browse = True
- _txn_consume = True
-
-class MultiDurableQueueBrowseTxCTest(MultiQueueFlowToDisk):
- """Transient messages sent to multiple durable queues with messages
browsed before being consumed transactionally"""
- _num = 38
- _queue_durable = True
- _browse = True
- _txn_consume = True
-
-class MultiQueueDurableMsgBrowseTxCTest(MultiQueueFlowToDisk):
- """Durable messages sent to multiple transient queues with messages
browsed before being consumed transactionally"""
- _num = 39
- _msg_durable = True
- _browse = True
- _txn_consume = True
-
-class MultiDurableQueueDurableMsgBrowseTxCTest(MultiQueueFlowToDisk):
- """Durable messages sent to multiple durable queues with messages
browsed before being consumed transactionally"""
- _num = 40
- _queue_durable = True
- _msg_durable = True
- _browse = True
- _txn_consume = True
-
-class MultiQueueRecoverTxCTest(MultiQueueFlowToDisk):
- """Transient messages sent to multiple transient queues and broker
terminated/recovered before being consumed
- transactionally"""
- _num = 41
- _recover = True
- _txn_consume = True
-
-class MultiDurableQueueRecoverTxCTest(MultiQueueFlowToDisk):
- """Transient messages sent to multiple durable queues and broker
terminated/recovered before being consumed
- transactionally"""
- _num = 42
- _queue_durable = True
- _recover = True
- _txn_consume = True
-
-class MultiQueueDurableMsgRecoverTxCTest(MultiQueueFlowToDisk):
- """Durable messages sent to multiple transient queues and broker
terminated/recovered before being consumed
- transactionally"""
- _num = 43
- _msg_durable = True
- _recover = True
- _txn_consume = True
-
-class MultiDurableQueueDurableMsgRecoverTxCTest(MultiQueueFlowToDisk):
- """Durable messages sent to multiple durable queues and broker
terminated/recovered before being consumed
- transactionally"""
- _num = 44
- _queue_durable = True
- _msg_durable = True
- _recover = True
- _txn_consume = True
-
-class MultiQueueBrowseRecoverTxCTest(MultiQueueFlowToDisk):
- """Transient messages sent to multiple transient queues with messages
browsed before broker terminated/recovered and
- are consumed transactionally"""
- _num = 45
- _browse = True
- _recover = True
- _txn_consume = True
-
-class MultiDurableQueueBrowseRecoverTxCTest(MultiQueueFlowToDisk):
- """Transient messages sent to multiple durable queues with messages
browsed before broker terminated/recovered and
- are consumed transactionally"""
- _num = 46
- _queue_durable = True
- _browse = True
- _recover = True
- _txn_consume = True
-
-class MultiQueueDurableMsgBrowseRecoverTxCTest(MultiQueueFlowToDisk):
- """Durable messages sent to multiple transient queues with messages
browsed before broker terminated/recovered and
- are consumed transactionally"""
- _num = 47
- _msg_durable = True
- _browse = True
- _recover = True
- _txn_consume = True
-
-class MultiDurableQueueDurableMsgBrowseRecoverTxCTest(MultiQueueFlowToDisk):
- """Durable messages sent to multiple durable queues with messages
browsed before broker terminated/recovered and
- are consumed transactionally"""
- _num = 48
- _queue_durable = True
- _msg_durable = True
- _browse = True
- _recover = True
- _txn_consume = True
-
-class MultiQueueTxPTxCTest(MultiQueueFlowToDisk):
- """Transient messages sent to multiple transient queues under
transactional produce and are consumed
- transactionally"""
- _num = 49
- _txn_produce = True
- _txn_consume = True
-
-class MultiDurableQueueTxPTxCTest(MultiQueueFlowToDisk):
- """Transient messages sent to multiple durable queues under
transactional produce and are consumed
- transactionally"""
- _num = 50
- _queue_durable = True
- _txn_produce = True
- _txn_consume = True
-
-class MultiQueueDurableMsgTxPTxCTest(MultiQueueFlowToDisk):
- """Durable messages sent to multiple transient queues under
transactional produce and are consumed
- transactionally"""
- _num = 51
- _msg_durable = True
- _txn_produce = True
- _txn_consume = True
-
-class MultiDurableQueueDurableMsgTxPTxCTest(MultiQueueFlowToDisk):
- """Durable messages sent to multiple durable queues under
transactional produce and are consumed
- transactionally"""
- _num = 52
- _queue_durable = True
- _msg_durable = True
- _txn_produce = True
- _txn_consume = True
-
-class MultiQueueBrowseTxPTxCTest(MultiQueueFlowToDisk):
- """Transient messages sent to multiple transient queues under
transactional produce with messages browsed before
- being consumed transactionally"""
- _num = 53
- _browse = True
- _txn_produce = True
- _txn_consume = True
-
-class MultiDurableQueueBrowseTxPTxCTest(MultiQueueFlowToDisk):
- """Transient messages sent to multiple durable queues under
transactional produce with messages browsed before
- being consumed transactionally"""
- _num = 54
- _queue_durable = True
- _browse = True
- _txn_produce = True
- _txn_consume = True
-
-class MultiQueueDurableMsgBrowseTxPTxCTest(MultiQueueFlowToDisk):
- """Durable messages sent to multiple transient queues under
transactional produce with messages browsed before
- being consumed transactionally"""
- _num = 55
- _msg_durable = True
- _browse = True
- _txn_produce = True
- _txn_consume = True
-
-class MultiDurableQueueDurableMsgBrowseTxPTxCTest(MultiQueueFlowToDisk):
- """Durable messages sent to multiple durable queues under
transactional produce with messages browsed before being
- consumed transactionally"""
- _num = 56
- _queue_durable = True
- _msg_durable = True
- _browse = True
- _txn_produce = True
- _txn_consume = True
-
-class MultiQueueRecoverTxPTxCTest(MultiQueueFlowToDisk):
- """Transient messages sent to multiple transient queues under
transactional produce and broker
- terminated/recovered before they are consumed transactionally"""
- _num = 57
- _recover = True
- _txn_produce = True
- _txn_consume = True
-
-class MultiDurableQueueRecoverTxPTxCTest(MultiQueueFlowToDisk):
- """Transient messages sent to multiple durable queues under
transactional produce and broker terminated/recovered
- before they are consumed transactionally"""
- _num = 58
- _queue_durable = True
- _recover = True
- _txn_produce = True
- _txn_consume = True
-
-class MultiQueueDurableMsgRecoverTxPTxCTest(MultiQueueFlowToDisk):
- """Durable messages sent to multiple transient queues under
transactional produce and broker terminated/recovered
- before they are consumed transactionally"""
- _num = 59
- _msg_durable = True
- _recover = True
- _txn_produce = True
- _txn_consume = True
-
-class MultiDurableQueueDurableMsgRecoverTxPTxCTest(MultiQueueFlowToDisk):
- """Durable messages sent to multiple durable queues under
transactional produce and broker terminated/recovered
- before they are consumed transactionally"""
- _num = 60
- _queue_durable = True
- _msg_durable = True
- _recover = True
- _txn_produce = True
- _txn_consume = True
-
-class MultiQueueBrowseRecoverTxPTxCTest(MultiQueueFlowToDisk):
- """Transient messages sent to multiple transient queues under
transactional produce with messages browsed before
- broker terminated/recovered and are consumed transactionally"""
- _num = 61
- _browse = True
- _recover = True
- _txn_produce = True
-
-class MultiDurableQueueBrowseRecoverTxPTxCTest(MultiQueueFlowToDisk):
- """Transient messages sent to multiple durable queues under
transactional produce with messages browsed before
- broker terminated/recovered and are consumed transactionally"""
- _num = 62
- _queue_durable = True
- _browse = True
- _recover = True
- _txn_produce = True
- _txn_consume = True
-
-class MultiQueueDurableMsgBrowseRecoverTxPTxCTest(MultiQueueFlowToDisk):
- """Durable messages sent to multiple transient queues under
transactional produce with messages browsed before
- broker terminated/recovered and are consumed transactionally"""
- _num = 63
- _msg_durable = True
- _browse = True
- _recover = True
- _txn_produce = True
- _txn_consume = True
-
-class MultiDurableQueueDurableMsgBrowseRecoverTxPTxCTest(MultiQueueFlowToDisk):
- """Durable messages sent to multiple durable queues under
transactional produce with messages browsed before broker
- terminated/recovered and are consumed transactionally"""
- _num = 64
- _queue_durable = True
- _msg_durable = True
- _browse = True
- _recover = True
- _txn_produce = True
- _txn_consume = True
-
- # --- Long and randomized tests ---
-
-# def test_12_Randomized(self):
-# """Randomized flow-to-disk tests"""
-# seed = long(1000.0 * time.time())
-# print "seed=0x%x" % seed
-# random.seed(seed)
-# for index in range(0, 10):
-# self.randomLimit(index)