Author: kpvdr
Date: 2009-09-11 09:35:04 -0400 (Fri, 11 Sep 2009)
New Revision: 3622
Modified:
store/trunk/cpp/lib/JournalImpl.cpp
store/trunk/cpp/lib/MessageStoreImpl.cpp
store/trunk/cpp/tests/SimpleTest.cpp
store/trunk/cpp/tests/python_tests/flow_to_disk.py
store/trunk/cpp/tests/run_python_tests
Log:
Joint checkin with cctrieloff to correct multi-queue flow-to-disk behavior. Syncs with
qpid r.813825.
Modified: store/trunk/cpp/lib/JournalImpl.cpp
===================================================================
--- store/trunk/cpp/lib/JournalImpl.cpp 2009-09-10 16:25:04 UTC (rev 3621)
+++ store/trunk/cpp/lib/JournalImpl.cpp 2009-09-11 13:35:04 UTC (rev 3622)
@@ -252,6 +252,8 @@
#define MAX_AIO_SLEEPS 1000 // 10 sec
#define AIO_SLEEP_TIME 10000 // 10 ms
+// Return true if content is recovered from store; false if content is external and must
be recovered from an external store.
+// Throw exception for all errors.
bool
JournalImpl::loadMsgContent(u_int64_t rid, std::string& data, size_t length, size_t
offset)
{
Modified: store/trunk/cpp/lib/MessageStoreImpl.cpp
===================================================================
--- store/trunk/cpp/lib/MessageStoreImpl.cpp 2009-09-10 16:25:04 UTC (rev 3621)
+++ store/trunk/cpp/lib/MessageStoreImpl.cpp 2009-09-11 13:35:04 UTC (rev 3622)
@@ -1474,15 +1474,15 @@
JournalImpl* jc =
static_cast<JournalImpl*>(queue->getExternalQueueStore());
if (txn->getXid().empty()) {
if (message->isContentReleased()) {
- jc->enqueue_extern_data_record(size, dtokp.get(), false);
+ jc->enqueue_extern_data_record(size, dtokp.get(),
!message->isPersistent());
} else {
- jc->enqueue_data_record(buff, size, size, dtokp.get(), false);
+ jc->enqueue_data_record(buff, size, size, dtokp.get(),
!message->isPersistent());
}
} else {
if (message->isContentReleased()) {
- jc->enqueue_extern_txn_data_record(size, dtokp.get(),
txn->getXid(), false);
+ jc->enqueue_extern_txn_data_record(size, dtokp.get(),
txn->getXid(), !message->isPersistent());
} else {
- jc->enqueue_txn_data_record(buff, size, size, dtokp.get(),
txn->getXid(), false);
+ jc->enqueue_txn_data_record(buff, size, size, dtokp.get(),
txn->getXid(), !message->isPersistent());
}
}
} else {
Modified: store/trunk/cpp/tests/SimpleTest.cpp
===================================================================
--- store/trunk/cpp/tests/SimpleTest.cpp 2009-09-10 16:25:04 UTC (rev 3621)
+++ store/trunk/cpp/tests/SimpleTest.cpp 2009-09-11 13:35:04 UTC (rev 3622)
@@ -377,7 +377,8 @@
store.stage(pmsg);
//append to it
- msg->releaseContent(&store);//ensure that data is not held in memory but
is appended to disk when added
+ msg->setStore(&store);
+ msg->releaseContent(true);//ensure that data is not held in memory but is
appended to disk when added
store.appendContent(cpmsg, data1);
store.appendContent(cpmsg, data2);
Modified: store/trunk/cpp/tests/python_tests/flow_to_disk.py
===================================================================
--- store/trunk/cpp/tests/python_tests/flow_to_disk.py 2009-09-10 16:25:04 UTC (rev 3621)
+++ store/trunk/cpp/tests/python_tests/flow_to_disk.py 2009-09-11 13:35:04 UTC (rev 3622)
@@ -34,16 +34,16 @@
# --- Helper functions ---
- def _browse(self, qn, dt, am, numMsgs, msgSize, txnConsume):
+ def _browse(self, queueName, destinationTag, acquireMode, numMsgs, msgSize,
txnConsume):
txid = None
if txnConsume:
- txid = self._makeXid("consumer-xid-%s" % qn)
+ txid = self._makeXid("consumer-xid-%s" % queueName)
self.session.dtx_select()
self.assertEqual(self.XA_OK, self.session.dtx_start(xid=txid).status)
- self.session.message_subscribe(queue=qn, destination=dt, acquire_mode=am)
- self.session.message_flow(destination=dt, unit=self.session.credit_unit.message,
value=0xFFFFFFFF)
- self.session.message_flow(destination=dt, unit=self.session.credit_unit.byte,
value=0xFFFFFFFF)
- queue = self.session.incoming(dt)
+ self.session.message_subscribe(queue=queueName, destination=destinationTag,
acquire_mode=acquireMode)
+ self.session.message_flow(destination=destinationTag,
unit=self.session.credit_unit.message, value=0xFFFFFFFF)
+ self.session.message_flow(destination=destinationTag,
unit=self.session.credit_unit.byte, value=0xFFFFFFFF)
+ queue = self.session.incoming(destinationTag)
ids = RangedSet()
for msgNum in range(0, numMsgs):
expectedStr = self._makeMessage(msgNum, msgSize)
@@ -52,16 +52,16 @@
ids.add(msg.id)
return ids, txid
- def _checkCancel(self, qn, dt, numMsgs, ids):
+ def _checkCancel(self, queueName, destinationTag, numMsgs, ids):
self.session.message_release(ids)
- self.session.queue_declare(queue=qn)
- self.assertEqual(numMsgs, self.session.queue_query(queue=qn).message_count)
- self.session.message_cancel(destination=dt)
+ self.session.queue_declare(queue=queueName)
+ self.assertEqual(numMsgs,
self.session.queue_query(queue=queueName).message_count)
+ self.session.message_cancel(destination=destinationTag)
- def _checkConsume(self, qn, am, numMsgs, ids, txid, txnConsume):
- if am == self.session.acquire_mode.not_acquired:
- self.session.queue_declare(queue=qn)
- self.assertEqual(numMsgs, self.session.queue_query(queue=qn).message_count)
+ def _checkConsume(self, queueName, acquireMode, numMsgs, ids, txid, txnConsume):
+ if acquireMode == self.session.acquire_mode.not_acquired:
+ self.session.queue_declare(queue=queueName)
+ self.assertEqual(numMsgs,
self.session.queue_query(queue=queueName).message_count)
response = self.session.message_acquire(ids)
for range in ids:
for msgId in range:
@@ -73,10 +73,15 @@
self.assertEqual(self.XA_OK, self.session.dtx_commit(xid=txid,
one_phase=False).status)
self._resetChannel()
- def _checkEmpty(self, qn):
- self.session.queue_declare(queue=qn)
- self.assertEqual(0, self.session.queue_query(queue=qn).message_count)
+ def _checkEmpty(self, queueName):
+ self.session.queue_declare(queue=queueName)
+ self.assertEqual(0, self.session.queue_query(queue=queueName).message_count)
+ def _handleSessionClose(self):
+ time.sleep(1)
+ self.tearDown()
+ self.setUp()
+
def _makeMessage(self, msgCnt, msgSize):
msg = "Message-%04d" % msgCnt
msgLen = len(msg)
@@ -93,19 +98,45 @@
branchqual = "v%s" % self.txCounter
return self.session.xid(format=0, global_id=txid, branch_id=branchqual)
- def _produce(self, qn, dm, numMsgs, msgSize, txnProduce):
+ def _produceDirect(self, queueName, deliveryMode, numMsgs, msgSize, txnProduce):
if txnProduce:
- txid = self._makeXid("producer-xid-%s" % qn)
+ txid = self._makeXid("producer-xid-%s" % queueName)
self.session.dtx_select()
self.assertEqual(self.XA_OK, self.session.dtx_start(xid=txid).status)
for msgNum in range(0, numMsgs):
msg_str = self._makeMessage(msgNum, msgSize)
-
self.session.message_transfer(message=Message(self.session.delivery_properties(routing_key=qn,
delivery_mode=dm), msg_str))
+
self.session.message_transfer(message=Message(self.session.delivery_properties(routing_key=queueName,
delivery_mode=deliveryMode), msg_str))
if txnProduce:
self.assertEqual(self.XA_OK, self.session.dtx_end(xid=txid).status)
self.assertEqual(self.XA_OK, self.session.dtx_prepare(xid=txid).status)
self.assertEqual(self.XA_OK, self.session.dtx_commit(xid=txid,
one_phase=False).status)
self._resetChannel()
+
+ def _produceFanout(self, exchangeName, deliveryMode, numMsgs, msgSize, txnProduce,
fail_on_msg):
+ if txnProduce:
+ txid = self._makeXid("producer-xid-%s" % exchangeName)
+ self.session.dtx_select()
+ self.assertEqual(self.XA_OK, self.session.dtx_start(xid=txid).status)
+ try:
+ for msgNum in range(0, numMsgs):
+ msg_str = self._makeMessage(msgNum, msgSize)
+ self.session.message_transfer(destination=exchangeName,
message=Message(self.session.delivery_properties(delivery_mode=deliveryMode), msg_str))
+ if fail_on_msg != None and msgNum > fail_on_msg and not txnProduce:
+ self.fail("Failed to throw SessionException on message_transfer
%s" % fail_on_msg)
+ except SessionException:
+ if fail_on_msg != None and msgNum == fail_on_msg and not txnProduce:
+ self._handleSessionClose()
+ if txnProduce:
+ self.assertEqual(self.XA_OK, self.session.dtx_end(xid=txid).status)
+ self.assertEqual(self.XA_OK, self.session.dtx_prepare(xid=txid).status)
+ try:
+ self.assertEqual(self.XA_OK, self.session.dtx_commit(xid=txid,
one_phase=False).status)
+ except SessionException:
+ if fail_on_msg != None and txnProduce:
+ self._handleSessionClose()
+ else:
+ self.fail("Failed to throw expected SessionException on
dtx_commit")
+ self._resetChannel()
def _randomBool(self):
if random.randint(0, 1) > 0:
@@ -116,109 +147,66 @@
self.session.close()
self.session = self.conn.session("test-session", 1)
- # --- Simple tests ---
+ # --- Simple test framework (single queue) ---
- def test_FlowToDisk_00_SimpleMaxCount(self):
- """Flow-to-disk tests based on setting
max_count"""
- self.simpleLimit("test_FlowToDisk_00a", max_count = 10)
- self.simpleLimit("test_FlowToDisk_00b", max_count = 10, persistent =
True)
- self.simpleLimit("test_FlowToDisk_00c", max_count = 10, max_size =
10000000, numMsgs = 100, msgSize = 10000)
- self.simpleLimit("test_FlowToDisk_00d", max_count = 10, max_size =
10000000, persistent = True, numMsgs = 100, msgSize = 10000)
-
- def test_FlowToDisk_01_SimpleMaxSize(self):
- """Flow-to-disk tests based on setting max_size"""
- self.simpleLimit("test_FlowToDisk_01a", max_size = 100)
- self.simpleLimit("test_FlowToDisk_01b", max_size = 100, persistent =
True)
- self.simpleLimit("test_FlowToDisk_01c", max_size = 100000, numMsgs =
100, msgSize = 10000)
- self.simpleLimit("test_FlowToDisk_01d", max_size = 100000, persistent =
True, numMsgs = 100, msgSize = 10000)
-
- def test_FlowToDisk_02_SimpleMaxCountNotAcquired(self):
- """Flow-to-disk tests based on setting max_count, but not using
pre-acquired mode (ie browsing)"""
- self.simpleLimit("test_FlowToDisk_02a", max_count = 10, pre_acquired =
False)
- self.simpleLimit("test_FlowToDisk_02b", max_count = 10, persistent =
True, pre_acquired = False)
- self.simpleLimit("test_FlowToDisk_02c", max_count = 10, max_size =
10000000, pre_acquired = False, numMsgs = 100, msgSize = 10000)
- self.simpleLimit("test_FlowToDisk_02d", max_count = 10, max_size =
10000000, persistent = True, pre_acquired = False, numMsgs = 100, msgSize = 10000)
-
- def test_FlowToDisk_03_SimpleMaxSizeNotAcquired(self):
- """Flow-to-disk tests based on setting max_size, but not using
pre-acquired mode (ie browsing)"""
- self.simpleLimit("test_FlowToDisk_03a", max_size = 100, pre_acquired =
False)
- self.simpleLimit("test_FlowToDisk_03b", max_size = 100, persistent =
True, pre_acquired = False)
- self.simpleLimit("test_FlowToDisk_03c", max_size = 100, pre_acquired =
False, numMsgs = 100, msgSize = 10000)
- self.simpleLimit("test_FlowToDisk_03d", max_size = 100, persistent =
True, pre_acquired = False, numMsgs = 100, msgSize = 10000)
-
- def test_FlowToDisk_04_MaxSizeMaxCount(self):
- """Flow-to-disk tests based on setting both max-count and max-size
at the same time"""
- self.simpleLimit("test_FlowToDisk_04a", max_count = 10, max_size =
1000)
- self.simpleLimit("test_FlowToDisk_04b", max_count = 10, max_size =
1000, msgSize = 250)
- self.simpleLimit("test_FlowToDisk_04c", max_count = 10, max_size =
1000, persistent = True)
- self.simpleLimit("test_FlowToDisk_04d", max_count = 10, max_size =
1000, msgSize = 250, persistent = True)
-
- def test_FlowToDisk_05_Randomized(self):
- """Randomized flow-to-disk tests"""
- seed = long(1000.0 * time.time())
- print "seed=0x%x" % seed
- random.seed(seed)
- for i in range(0, 10):
- self.randomLimit(i)
-
- def simpleLimit(self, qn, max_count = None, max_size = None, persistent = False,
pre_acquired = True, numMsgs = 15, msgSize = None, browse = True):
- qa = {'qpid.policy_type':'flow_to_disk'}
+ def simpleLimit(self, queueName, max_count = None, max_size = None, persistent =
False, pre_acquired = True, numMsgs = 15, msgSize = None, browse = True, policy =
"flow_to_disk"):
+ if max_count != None or max_size != None:
+ queueArgs = {'qpid.policy_type':policy}
if max_count != None:
- qa['qpid.max_count'] = max_count
+ queueArgs['qpid.max_count'] = max_count
if max_size != None:
- qa['qpid.max_size'] = max_size
+ queueArgs['qpid.max_size'] = max_size
if persistent:
- dm = self.session.delivery_mode.persistent
+ deliveryMode = self.session.delivery_mode.persistent
else:
- dm = self.session.delivery_mode.non_persistent
+ deliveryMode = self.session.delivery_mode.non_persistent
if pre_acquired:
- am = self.session.acquire_mode.pre_acquired
+ acquireMode = self.session.acquire_mode.pre_acquired
else:
- am = self.session.acquire_mode.not_acquired
+ acquireMode = self.session.acquire_mode.not_acquired
# Cycle through the produce/consume block transaction combinations
for i in range(0, 4):
- tp = i & 1 != 0 # Transactional produce
- tc = i & 2 != 0 # Transactional consume
- self.txSimpleLimit(qn, qa, dm, am, numMsgs, msgSize, tp, tc, browse)
+ txnProduce = i & 1 != 0 # Transactional produce
+ txnConsume = i & 2 != 0 # Transactional consume
+ self._txSimpleLimit(queueName, queueArgs, deliveryMode, acquireMode, numMsgs,
msgSize, txnProduce, txnConsume, browse)
- def randomLimit(self, count):
- qa = {'qpid.policy_type':'flow_to_disk'}
- qn = "randomized_test_%04d" % count
+ def randomLimit(self, count, policy = "flow_to_disk"):
+ queueArgs = {'qpid.policy_type':policy}
+ queueName = "randomized_test_%04d" % count
# Flow to disk policy
maxCount = None
if self._randomBool():
maxCount = random.randint(0,10000)
- qa['qpid.max_count'] = maxCount
+ queueArgs['qpid.max_count'] = maxCount
maxSize = None
if self._randomBool():
maxSize = random.randint(0, 1000000)
- qa['qpid.max_size'] = maxSize
+ queueArgs['qpid.max_size'] = maxSize
# Persistence
if self._randomBool():
- dm = self.session.delivery_mode.persistent
+ deliveryMode = self.session.delivery_mode.persistent
else:
- dm = self.session.delivery_mode.non_persistent
+ deliveryMode = self.session.delivery_mode.non_persistent
# Acquired mode
if self._randomBool():
- am = self.session.acquire_mode.pre_acquired
+ acquireMode = self.session.acquire_mode.pre_acquired
browse = False
else:
- am = self.session.acquire_mode.not_acquired
+ acquireMode = self.session.acquire_mode.not_acquired
browse = self._randomBool()
numMsgs = random.randint(1, 10000)
sizeLimit = int(1000000 / numMsgs)
msgSize = random.randint(1, sizeLimit)
- tp = self._randomBool()
- tc = self._randomBool()
+ txnProduce = self._randomBool()
+ txnConsume = self._randomBool()
- #print " qn=%s, qa=%s, dm=%s, am=%s, numMsgs=%d, msgSize=%d, tp=%s, tc=%s,
browse=%s" % (qn, qa, dm, am, numMsgs, msgSize, tp, tc, browse)
- self.txSimpleLimit(qn, qa, dm, am, numMsgs, msgSize, tp, tc, browse)
+ self._txSimpleLimit(queueName, queueArgs, deliveryMode, acquireMode, numMsgs,
msgSize, txnProduce, txnConsume, browse)
- def txSimpleLimit(self, qn, qa, dm, am, numMsgs, msgSize, tp, tc, browse):
+ def _txSimpleLimit(self, queueName, queueArgs, deliveryMode, acquireMode, numMsgs,
msgSize, txnProduce, txnConsume, browse):
"""
Test a simple case of message limits which will force flow-to-disk.
* queue_args sets a limit - either max_count and/or max_size
@@ -227,19 +215,323 @@
* Check the broker has no messages left.
"""
- self.session.queue_declare(queue=qn, durable=True, arguments=qa)
+ self.session.queue_declare(queue=queueName, durable=True, arguments=queueArgs)
- # --- Add messages ---
- self._produce(qn, dm, numMsgs, msgSize, tp)
+ # Add messages
+ self._produceDirect(queueName, deliveryMode, numMsgs, msgSize, txnProduce)
- # --- Browse messages (if possible) ---
- if am == self.session.acquire_mode.not_acquired and browse:
- dtA = "tagA-%d-%d" % (tp, tc)
- ids, txid = self._browse(qn, dtA, am, numMsgs, msgSize, False)
- self._checkCancel(qn, dtA, numMsgs, ids)
+ # Browse messages (if possible)
+ if acquireMode == self.session.acquire_mode.not_acquired and browse:
+ dtA = "tagA-%d-%d" % (txnProduce, txnConsume)
+ ids, txid = self._browse(queueName, dtA, acquireMode, numMsgs, msgSize,
False)
+ self._checkCancel(queueName, dtA, numMsgs, ids)
- # --- Consume messages ---
- dtB = "tagB-%d-%d" % (tp, tc)
- ids, txid = self._browse(qn, dtB, am, numMsgs, msgSize, tc)
- self._checkConsume(qn, am, numMsgs, ids, txid, tc)
- self._checkEmpty(qn)
+ # Consume messages
+ dtB = "tagB-%d-%d" % (txnProduce, txnConsume)
+ ids, txid = self._browse(queueName, dtB, acquireMode, numMsgs, msgSize,
txnConsume)
+ self._checkConsume(queueName, acquireMode, numMsgs, ids, txid, txnConsume)
+ self._checkEmpty(queueName)
+
+ self.session.queue_delete(queue=queueName)
+
+ # --- Multi-queue test framework ---
+
+ def multiQueueLimit(self, queueMap, exchangeName = "amq.fanout", persistent
= False, pre_acquired = True, numMsgs = 15, msgSize = None, browse = True, policy =
"flow_to_disk", fail_on_msg = None):
+ if persistent:
+ deliveryMode = self.session.delivery_mode.persistent
+ else:
+ deliveryMode = self.session.delivery_mode.non_persistent
+ if pre_acquired:
+ acquireMode = self.session.acquire_mode.pre_acquired
+ else:
+ acquireMode = self.session.acquire_mode.not_acquired
+ # Cycle through the produce/consume block transaction combinations
+# TODO - re-enable full transactional testing when BZ 522499 (which crashes the broker)
is fixed.
+# for i in range(0, 4):
+# txnProduce = i & 1 != 0 # Transactional produce
+# txnConsume = i & 2 != 0 # Transactional consume
+# self._txMultiQueueLimit(queueMap, exchangeName, deliveryMode, acquireMode,
numMsgs, msgSize, txnProduce, txnConsume, browse, fail_on_msg)
+ for i in range(0, 2):
+ txnConsume = i & 1 != 0 # Transactional consume
+ self._txMultiQueueLimit(queueMap, exchangeName, deliveryMode, acquireMode,
numMsgs, msgSize, False, txnConsume, browse, policy, fail_on_msg)
+
+ def _txMultiQueueLimit(self, queueMap, exchangeName, deliveryMode, acquireMode,
numMsgs, msgSize, txnProduce, txnConsume, browse, policy, fail_on_msg):
+ """
+ Test a multi-queue case
+ queueMap = queue map where map is queue name (key) against queue args (value)
+ """
+ self._multiQueueSetup(queueMap, exchangeName, txnProduce, txnConsume, policy)
+ self._produceFanout(exchangeName, deliveryMode, numMsgs, msgSize, txnProduce,
fail_on_msg)
+ if fail_on_msg == None:
+ # Browse messages (ie get without acquiring)
+ if acquireMode == self.session.acquire_mode.not_acquired and browse:
+ for queue in queueMap.iterkeys():
+ deliveryTagA = "tagA-%d-%d" % (txnProduce, txnConsume)
+ queueName = "%s-%d-%d" % (queue, txnProduce, txnConsume)
+ ids, txid = self._browse(queueName, deliveryTagA, acquireMode,
numMsgs, msgSize, False)
+ self._checkCancel(queueName, deliveryTagA, numMsgs, ids)
+ # Consume messages
+ for queue in queueMap.iterkeys():
+ deliveryTagB = "tagB-%d-%d" % (txnProduce, txnConsume)
+ queueName = "%s-%d-%d" % (queue, txnProduce, txnConsume)
+ ids, txid = self._browse(queueName, deliveryTagB, acquireMode, numMsgs,
msgSize, txnConsume)
+ self._checkConsume(queueName, acquireMode, numMsgs, ids, txid,
txnConsume)
+ self._checkEmpty(queueName)
+ self._multiQueueTeardown(queueMap, exchangeName, txnProduce, txnConsume)
+
+ def _multiQueueSetup(self, queueMap, exchangeName, txnProduce, txnConsume, policy,
exclusive = True):
+ for queue, qp in queueMap.iteritems():
+ queueArgs = {}
+ ftd = False
+ for p,v in qp.iteritems():
+ if p == "persistent":
+ d = v == True
+ elif p == "qpid.max_count":
+ if v != None:
+ queueArgs[p] = v
+ ftd = True
+ elif p == "qpid.max_size":
+ if v != None:
+ queueArgs[p] = v
+ ftd = True
+ if ftd:
+ queueArgs["qpid.policy_type"] = policy
+ queueName = "%s-%d-%d" % (queue, txnProduce, txnConsume)
+ self.session.queue_declare(queue = queueName, durable = d, arguments =
queueArgs, exclusive = exclusive)
+ self.session.exchange_bind(exchange = exchangeName, queue = queueName)
+
+ def _multiQueueTeardown(self, queueMap, exchangeName, txnProduce, txnConsume):
+ for queue in queueMap.iterkeys():
+ queueName = "%s-%d-%d" % (queue, txnProduce, txnConsume)
+ self.session.exchange_unbind(exchange = exchangeName, queue = queueName)
+ self.session.queue_delete(queue = queueName)
+
+ # --- Simple tests ---
+
+ def test_00_SimpleMaxCount(self):
+ """Flow-to-disk tests based on setting
max_count"""
+ # Small msgs
+ self.simpleLimit("test_00a", max_count = 10)
+ self.simpleLimit("test_00b", max_count = 10, persistent = True)
+ # Large msgs - set max_size to high number to make sure it does not interfere
+ self.simpleLimit("test_00c", max_count = 10, max_size = 10000000,
numMsgs = 100, msgSize = 10000)
+ self.simpleLimit("test_00d", max_count = 10, max_size = 10000000,
persistent = True, numMsgs = 100, msgSize = 10000)
+
+ def test_01_SimpleMaxSize(self):
+ """Flow-to-disk tests based on setting max_size"""
+ # Small msgs
+ self.simpleLimit("test_01a", max_size = 100)
+ self.simpleLimit("test_01b", max_size = 100, persistent = True)
+ # Large msgs - set max_size to high number to make sure it does not interfere
+ self.simpleLimit("test_01c", max_size = 100000, numMsgs = 100, msgSize
= 10000)
+ self.simpleLimit("test_01d", max_size = 100000, persistent = True,
numMsgs = 100, msgSize = 10000)
+
+ def test_02_SimpleMaxCountNotAcquired(self):
+ """Flow-to-disk tests based on setting max_count, but not using
pre-acquired mode (ie browsing)"""
+ # Small msgs
+ self.simpleLimit("test_02a", max_count = 10, pre_acquired = False)
+ self.simpleLimit("test_02b", max_count = 10, persistent = True,
pre_acquired = False)
+ # Large msgs - set max_size to high number to make sure it does not interfere
+ self.simpleLimit("test_02c", max_count = 10, max_size = 10000000,
pre_acquired = False, numMsgs = 100, msgSize = 10000)
+ self.simpleLimit("test_02d", max_count = 10, max_size = 10000000,
persistent = True, pre_acquired = False, numMsgs = 100, msgSize = 10000)
+
+ def test_03_SimpleMaxSizeNotAcquired(self):
+ """Flow-to-disk tests based on setting max_size, but not using
pre-acquired mode (ie browsing)"""
+ # Small msgs
+ self.simpleLimit("test_03a", max_size = 100, pre_acquired = False)
+ self.simpleLimit("test_03b", max_size = 100, persistent = True,
pre_acquired = False)
+ # Large msgs - set max_size to high number to make sure it does not interfere
+ self.simpleLimit("test_03c", max_size = 100, pre_acquired = False,
numMsgs = 100, msgSize = 10000)
+ self.simpleLimit("test_03d", max_size = 100, persistent = True,
pre_acquired = False, numMsgs = 100, msgSize = 10000)
+
+ def test_04_MaxSizeMaxCount(self):
+ """Flow-to-disk tests based on setting both max-count and max-size
at the same time"""
+ # Small msgs
+ self.simpleLimit("test_04a", max_count = 10, max_size = 1000)
+ self.simpleLimit("test_04b", max_count = 10, max_size = 1000, msgSize =
250)
+ # Large msgs - set max_size to high number to make sure it does not interfere
+ self.simpleLimit("test_04c", max_count = 10, max_size = 1000,
persistent = True)
+ self.simpleLimit("test_04d", max_count = 10, max_size = 1000, msgSize =
250, persistent = True)
+
+ # --- Multi-queue tests ---
+
+ def test_05_MultiQueueTransQueueTransMsg(self):
+ """Flow-to-disk tests where both queues and messages are transient
and messages are routed to more than one queue"""
+ queueMap1 = {"test_05a" : {"persistent":False,
"qpid.max_count":None, "qpid.max_size": None},
+ "test_05b" : {"persistent":False,
"qpid.max_count":None, "qpid.max_size": None} }
+ queueMap2 = {"test_05c" : {"persistent":False,
"qpid.max_count":None, "qpid.max_size": None},
+ "test_05d" : {"persistent":False,
"qpid.max_count":10, "qpid.max_size": None} }
+ queueMap3 = {"test_05e" : {"persistent":False,
"qpid.max_count":None, "qpid.max_size": None},
+ "test_05f" : {"persistent":False,
"qpid.max_count":None, "qpid.max_size": 1000} }
+ queueMap4 = {"test_05g" : {"persistent":False,
"qpid.max_count":10, "qpid.max_size": None},
+ "test_05h" : {"persistent":False,
"qpid.max_count":8, "qpid.max_size": None} }
+ queueMap5 = {"test_05i" : {"persistent":False,
"qpid.max_count":None, "qpid.max_size": 1000},
+ "test_05j" : {"persistent":False,
"qpid.max_count":None, "qpid.max_size": 800} }
+ queueMap6 = {"test_05k" : {"persistent":False,
"qpid.max_count":None, "qpid.max_size": None},
+ "test_05l" : {"persistent":False,
"qpid.max_count":10, "qpid.max_size": None},
+ "test_05m" : {"persistent":False,
"qpid.max_count":None, "qpid.max_size": 1000},
+ "test_05n" : {"persistent":False,
"qpid.max_count":8, "qpid.max_size": 800} }
+ self.multiQueueLimit(queueMap1)
+ self.multiQueueLimit(queueMap2, fail_on_msg = 10)
+ self.multiQueueLimit(queueMap3, msgSize = 100, fail_on_msg = 10)
+ self.multiQueueLimit(queueMap4, fail_on_msg = 8)
+ self.multiQueueLimit(queueMap5, msgSize = 100, fail_on_msg = 8)
+ self.multiQueueLimit(queueMap6, msgSize = 100, fail_on_msg = 8)
+
+ def test_06_MultiQueueDurableQueueTransMsg(self):
+ """Flow-to-disk tests where both queues are durable but messages
are transient and messages are routed to more than one queue"""
+ queueMap1 = {"test_06a" : {"persistent":True,
"qpid.max_count":None, "qpid.max_size": None},
+ "test_06b" : {"persistent":True,
"qpid.max_count":None, "qpid.max_size": None} }
+ queueMap2 = {"test_06c" : {"persistent":True,
"qpid.max_count":None, "qpid.max_size": None},
+ "test_06d" : {"persistent":True,
"qpid.max_count":10, "qpid.max_size": None} }
+ queueMap3 = {"test_06e" : {"persistent":True,
"qpid.max_count":None, "qpid.max_size": None},
+ "test_06f" : {"persistent":True,
"qpid.max_count":None, "qpid.max_size": 1000} }
+ queueMap4 = {"test_06g" : {"persistent":True,
"qpid.max_count":10, "qpid.max_size": None},
+ "test_06h" : {"persistent":True,
"qpid.max_count":8, "qpid.max_size": None} }
+ queueMap5 = {"test_06i" : {"persistent":True,
"qpid.max_count":None, "qpid.max_size": 1000},
+ "test_06j" : {"persistent":True,
"qpid.max_count":None, "qpid.max_size": 800} }
+ queueMap6 = {"test_06k" : {"persistent":True,
"qpid.max_count":None, "qpid.max_size": None},
+ "test_06l" : {"persistent":True,
"qpid.max_count":10, "qpid.max_size": None},
+ "test_06m" : {"persistent":True,
"qpid.max_count":None, "qpid.max_size": 1000},
+ "test_06n" : {"persistent":True,
"qpid.max_count":8, "qpid.max_size": 800} }
+ self.multiQueueLimit(queueMap1)
+ self.multiQueueLimit(queueMap2, fail_on_msg = 10)
+ self.multiQueueLimit(queueMap3, msgSize = 100, fail_on_msg = 10)
+ self.multiQueueLimit(queueMap4, fail_on_msg = 8)
+ self.multiQueueLimit(queueMap5, msgSize = 100, fail_on_msg = 8)
+ self.multiQueueLimit(queueMap6, msgSize = 100, fail_on_msg = 8)
+
+
+ def test_07_MultiQueueTransQueueDurableMsg(self):
+ """Flow-to-disk tests where both queues and messages are transient
and messages are routed to more than one queue"""
+ queueMap1 = {"test_07a" : {"persistent":False,
"qpid.max_count":None, "qpid.max_size": None},
+ "test_07b" : {"persistent":False,
"qpid.max_count":None, "qpid.max_size": None} }
+ queueMap2 = {"test_07c" : {"persistent":False,
"qpid.max_count":None, "qpid.max_size": None},
+ "test_07d" : {"persistent":False,
"qpid.max_count":10, "qpid.max_size": None} }
+ queueMap3 = {"test_07e" : {"persistent":False,
"qpid.max_count":None, "qpid.max_size": None},
+ "test_07f" : {"persistent":False,
"qpid.max_count":None, "qpid.max_size": 1000} }
+ queueMap4 = {"test_07g" : {"persistent":False,
"qpid.max_count":10, "qpid.max_size": None},
+ "test_07h" : {"persistent":False,
"qpid.max_count":8, "qpid.max_size": None} }
+ queueMap5 = {"test_07i" : {"persistent":False,
"qpid.max_count":None, "qpid.max_size": 1000},
+ "test_07j" : {"persistent":False,
"qpid.max_count":None, "qpid.max_size": 800} }
+ queueMap6 = {"test_07k" : {"persistent":False,
"qpid.max_count":None, "qpid.max_size": None},
+ "test_07l" : {"persistent":False,
"qpid.max_count":10, "qpid.max_size": None},
+ "test_07m" : {"persistent":False,
"qpid.max_count":None, "qpid.max_size": 1000},
+ "test_07n" : {"persistent":False,
"qpid.max_count":8, "qpid.max_size": 800} }
+ self.multiQueueLimit(queueMap1, persistent = True)
+ self.multiQueueLimit(queueMap2, persistent = True, fail_on_msg = 10)
+ self.multiQueueLimit(queueMap3, msgSize = 100, persistent = True, fail_on_msg =
10)
+ self.multiQueueLimit(queueMap4, persistent = True, fail_on_msg = 8)
+ self.multiQueueLimit(queueMap5, msgSize = 100, persistent = True, fail_on_msg =
8)
+ self.multiQueueLimit(queueMap6, msgSize = 100, persistent = True, fail_on_msg =
8)
+
+ def test_08_MultiQueueDurableQueueDurableMsg(self):
+ """Flow-to-disk tests where both queues are durable but messages
are transient and messages are routed to more than one queue"""
+ queueMap1 = {"test_08a" : {"persistent":True,
"qpid.max_count":None, "qpid.max_size": None},
+ "test_08b" : {"persistent":True,
"qpid.max_count":None, "qpid.max_size": None} }
+ queueMap2 = {"test_08c" : {"persistent":True,
"qpid.max_count":None, "qpid.max_size": None},
+ "test_08d" : {"persistent":True,
"qpid.max_count":10, "qpid.max_size": None} }
+ queueMap3 = {"test_08e" : {"persistent":True,
"qpid.max_count":None, "qpid.max_size": None},
+ "test_08f" : {"persistent":True,
"qpid.max_count":None, "qpid.max_size": 1000} }
+ queueMap4 = {"test_08g" : {"persistent":True,
"qpid.max_count":10, "qpid.max_size": None},
+ "test_08h" : {"persistent":True,
"qpid.max_count":8, "qpid.max_size": None} }
+ queueMap5 = {"test_08i" : {"persistent":True,
"qpid.max_count":None, "qpid.max_size": 1000},
+ "test_08j" : {"persistent":True,
"qpid.max_count":None, "qpid.max_size": 800} }
+ queueMap6 = {"test_08k" : {"persistent":True,
"qpid.max_count":None, "qpid.max_size": None},
+ "test_08l" : {"persistent":True,
"qpid.max_count":10, "qpid.max_size": None},
+ "test_08m" : {"persistent":True,
"qpid.max_count":None, "qpid.max_size": 1000},
+ "test_08n" : {"persistent":True,
"qpid.max_count":8, "qpid.max_size": 800} }
+ self.multiQueueLimit(queueMap1, persistent = True)
+ self.multiQueueLimit(queueMap2, persistent = True)
+ self.multiQueueLimit(queueMap3, msgSize = 100, persistent = True)
+ self.multiQueueLimit(queueMap4, persistent = True)
+ self.multiQueueLimit(queueMap5, msgSize = 100, persistent = True)
+ self.multiQueueLimit(queueMap6, msgSize = 100, persistent = True)
+
+ def test_09_MultiQueueMixedQueueTransMsg(self):
+ """Flow-to-disk tests where both queues are durable but messages
are transient and messages are routed to more than one queue"""
+ queueMap1 = {"test_09a" : {"persistent":False,
"qpid.max_count":None, "qpid.max_size": None},
+ "test_09b" : {"persistent":True,
"qpid.max_count":None, "qpid.max_size": None} }
+ queueMap2 = {"test_09c" : {"persistent":False,
"qpid.max_count":None, "qpid.max_size": None},
+ "test_09d" : {"persistent":True,
"qpid.max_count":10, "qpid.max_size": None} }
+ queueMap3 = {"test_09e" : {"persistent":True,
"qpid.max_count":None, "qpid.max_size": None},
+ "test_09f" : {"persistent":False,
"qpid.max_count":10, "qpid.max_size": None} }
+ queueMap4 = {"test_09g" : {"persistent":False,
"qpid.max_count":None, "qpid.max_size": None},
+ "test_09h" : {"persistent":True,
"qpid.max_count":None, "qpid.max_size": 1000} }
+ queueMap5 = {"test_09i" : {"persistent":True,
"qpid.max_count":None, "qpid.max_size": None},
+ "test_09j" : {"persistent":False,
"qpid.max_count":None, "qpid.max_size": 1000} }
+ queueMap6 = {"test_09k" : {"persistent":False,
"qpid.max_count":10, "qpid.max_size": None},
+ "test_09l" : {"persistent":True,
"qpid.max_count":8, "qpid.max_size": None} }
+ queueMap7 = {"test_09m" : {"persistent":True,
"qpid.max_count":10, "qpid.max_size": None},
+ "test_09n" : {"persistent":False,
"qpid.max_count":8, "qpid.max_size": None} }
+ queueMap8 = {"test_09o" : {"persistent":False,
"qpid.max_count":None, "qpid.max_size": 1000},
+ "test_09p" : {"persistent":True,
"qpid.max_count":None, "qpid.max_size": 800} }
+ queueMap9 = {"test_09q" : {"persistent":True,
"qpid.max_count":None, "qpid.max_size": 1000},
+ "test_09r" : {"persistent":False,
"qpid.max_count":None, "qpid.max_size": 800} }
+ queueMap10 = {"test_09s" : {"persistent":False,
"qpid.max_count":None, "qpid.max_size": None},
+ "test_09t" : {"persistent":False,
"qpid.max_count":10, "qpid.max_size": None},
+ "test_09u" : {"persistent":False,
"qpid.max_count":None, "qpid.max_size": 1000},
+ "test_09v" : {"persistent":False,
"qpid.max_count":8, "qpid.max_size": 800},
+ "test_09w" : {"persistent":True,
"qpid.max_count":None, "qpid.max_size": None},
+ "test_09x" : {"persistent":True,
"qpid.max_count":12, "qpid.max_size": None},
+ "test_09y" : {"persistent":True,
"qpid.max_count":None, "qpid.max_size": 1100},
+ "test_09z" : {"persistent":True,
"qpid.max_count":7, "qpid.max_size": 900} }
+ self.multiQueueLimit(queueMap1)
+ self.multiQueueLimit(queueMap2, fail_on_msg = 10)
+ self.multiQueueLimit(queueMap3, fail_on_msg = 10)
+ self.multiQueueLimit(queueMap4, msgSize = 100, fail_on_msg = 10)
+ self.multiQueueLimit(queueMap5, msgSize = 100, fail_on_msg = 10)
+ self.multiQueueLimit(queueMap6, fail_on_msg = 8)
+ self.multiQueueLimit(queueMap7, fail_on_msg = 8)
+ self.multiQueueLimit(queueMap8, msgSize = 100, fail_on_msg = 8)
+ self.multiQueueLimit(queueMap9, msgSize = 100, fail_on_msg = 8)
+ self.multiQueueLimit(queueMap10, msgSize = 100, fail_on_msg = 7)
+
+ def test_10_MultiQueueMixedQueueDurableMsg(self):
+ """Flow-to-disk tests where queues are a mix of transient and durable, messages
are durable and messages are routed to more than one queue"""
+ queueMap1 = {"test_10a" : {"persistent":False,
"qpid.max_count":None, "qpid.max_size": None},
+ "test_10b" : {"persistent":True,
"qpid.max_count":None, "qpid.max_size": None} }
+ queueMap2 = {"test_10c" : {"persistent":False,
"qpid.max_count":None, "qpid.max_size": None},
+ "test_10d" : {"persistent":True,
"qpid.max_count":10, "qpid.max_size": None} }
+ queueMap3 = {"test_10e" : {"persistent":True,
"qpid.max_count":None, "qpid.max_size": None},
+ "test_10f" : {"persistent":False,
"qpid.max_count":10, "qpid.max_size": None} }
+ queueMap4 = {"test_10g" : {"persistent":False,
"qpid.max_count":None, "qpid.max_size": None},
+ "test_10h" : {"persistent":True,
"qpid.max_count":None, "qpid.max_size": 1000} }
+ queueMap5 = {"test_10i" : {"persistent":True,
"qpid.max_count":None, "qpid.max_size": None},
+ "test_10j" : {"persistent":False,
"qpid.max_count":None, "qpid.max_size": 1000} }
+ queueMap6 = {"test_10k" : {"persistent":False,
"qpid.max_count":10, "qpid.max_size": None},
+ "test_10l" : {"persistent":True,
"qpid.max_count":8, "qpid.max_size": None} }
+ queueMap7 = {"test_10m" : {"persistent":True,
"qpid.max_count":10, "qpid.max_size": None},
+ "test_10n" : {"persistent":False,
"qpid.max_count":8, "qpid.max_size": None} }
+ queueMap8 = {"test_10o" : {"persistent":False,
"qpid.max_count":None, "qpid.max_size": 1000},
+ "test_10p" : {"persistent":True,
"qpid.max_count":None, "qpid.max_size": 800} }
+ queueMap9 = {"test_10q" : {"persistent":True,
"qpid.max_count":None, "qpid.max_size": 1000},
+ "test_10r" : {"persistent":False,
"qpid.max_count":None, "qpid.max_size": 800} }
+ queueMap10 = {"test_10s" : {"persistent":False,
"qpid.max_count":None, "qpid.max_size": None},
+ "test_10t" : {"persistent":False,
"qpid.max_count":10, "qpid.max_size": None},
+ "test_10u" : {"persistent":False,
"qpid.max_count":None, "qpid.max_size": 1000},
+ "test_10v" : {"persistent":False,
"qpid.max_count":8, "qpid.max_size": 800},
+ "test_10w" : {"persistent":True,
"qpid.max_count":None, "qpid.max_size": None},
+ "test_10x" : {"persistent":True,
"qpid.max_count":12, "qpid.max_size": None},
+ "test_10y" : {"persistent":True,
"qpid.max_count":None, "qpid.max_size": 1100},
+ "test_10z" : {"persistent":True,
"qpid.max_count":7, "qpid.max_size": 900} }
+ self.multiQueueLimit(queueMap1, persistent = True)
+ self.multiQueueLimit(queueMap2, persistent = True, fail_on_msg = 10)
+ self.multiQueueLimit(queueMap3, persistent = True, fail_on_msg = 10)
+ self.multiQueueLimit(queueMap4, msgSize = 100, persistent = True, fail_on_msg =
10)
+ self.multiQueueLimit(queueMap5, msgSize = 100, persistent = True, fail_on_msg =
10)
+ self.multiQueueLimit(queueMap6, persistent = True, fail_on_msg = 8)
+ self.multiQueueLimit(queueMap7, persistent = True, fail_on_msg = 8)
+ self.multiQueueLimit(queueMap8, msgSize = 100, persistent = True, fail_on_msg =
8)
+ self.multiQueueLimit(queueMap9, msgSize = 100, persistent = True, fail_on_msg =
8)
+ self.multiQueueLimit(queueMap10, msgSize = 100, persistent = True, fail_on_msg =
7)
+
+ # --- Long and randomized tests ---
+
+ def test_11_Randomized(self):
+ """Randomized flow-to-disk tests"""
+ seed = long(1000.0 * time.time())
+ print "seed=0x%x" % seed
+ random.seed(seed)
+ for i in range(0, 10):
+ self.randomLimit(i)
Modified: store/trunk/cpp/tests/run_python_tests
===================================================================
--- store/trunk/cpp/tests/run_python_tests 2009-09-10 16:25:04 UTC (rev 3621)
+++ store/trunk/cpp/tests/run_python_tests 2009-09-11 13:35:04 UTC (rev 3622)
@@ -78,10 +78,12 @@
start_broker() {
${QPIDD} --daemon --port 0 --no-module-dir --load-module=${STORE_LIB}
--data-dir=${STORE_DIR} --auth=no --log-enable info+ --log-to-file
${STORE_DIR}/broker.python-test.log > qpidd.port
LOCAL_PORT=`cat qpidd.port`
+ echo "run_python_tests: Started qpidd on port ${LOCAL_PORT}"
}
stop_broker() {
- ${QPIDD} -q --port $LOCAL_PORT
+ echo "run_python_tests: Stopping broker on port ${LOCAL_PORT}"
+ ${QPIDD} -q --port ${LOCAL_PORT}
}
fail=0