Author: kpvdr
Date: 2009-09-14 11:21:33 -0400 (Mon, 14 Sep 2009)
New Revision: 3623
Modified:
store/trunk/cpp/lib/JournalImpl.cpp
store/trunk/cpp/lib/MessageStoreImpl.cpp
store/trunk/cpp/tests/SimpleTest.cpp
store/trunk/cpp/tests/python_tests/flow_to_disk.py
store/trunk/cpp/tests/run_python_tests
Log:
Reverted checkin 3622/813825 until its problems can be resolved.
Modified: store/trunk/cpp/lib/JournalImpl.cpp
===================================================================
--- store/trunk/cpp/lib/JournalImpl.cpp 2009-09-11 13:35:04 UTC (rev 3622)
+++ store/trunk/cpp/lib/JournalImpl.cpp 2009-09-14 15:21:33 UTC (rev 3623)
@@ -252,8 +252,6 @@
#define MAX_AIO_SLEEPS 1000 // 10 sec
#define AIO_SLEEP_TIME 10000 // 10 ms
-// Return true if content is recovered from store; false if content is external and must be recovered from an external store.
-// Throw exception for all errors.
bool
JournalImpl::loadMsgContent(u_int64_t rid, std::string& data, size_t length, size_t offset)
{
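Editorial note, not part of this checkin: the comment removed above documents loadMsgContent()'s contract. The sketch below shows how a caller might honour that contract; only the loadMsgContent() signature comes from the diff, while the helper name fetchContent, the include path and the mrg::msgstore namespace are illustrative assumptions.

    #include <sys/types.h>   // u_int64_t
    #include <string>
    #include "JournalImpl.h" // assumed include path for this illustration

    // Hypothetical caller honouring the documented contract:
    //   returns true  -> content for rid was recovered from the store into `data`
    //   returns false -> content is external and must be fetched from the external store
    //   throws        -> any store error
    bool fetchContent(mrg::msgstore::JournalImpl& jrnl, u_int64_t rid, size_t length, std::string& data)
    {
        if (jrnl.loadMsgContent(rid, data, length, 0)) // offset 0: read from start of content
            return true;   // data now holds the requested bytes
        return false;      // caller must consult the external store instead
    }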
Modified: store/trunk/cpp/lib/MessageStoreImpl.cpp
===================================================================
--- store/trunk/cpp/lib/MessageStoreImpl.cpp 2009-09-11 13:35:04 UTC (rev 3622)
+++ store/trunk/cpp/lib/MessageStoreImpl.cpp 2009-09-14 15:21:33 UTC (rev 3623)
@@ -1474,15 +1474,15 @@
JournalImpl* jc = static_cast<JournalImpl*>(queue->getExternalQueueStore());
if (txn->getXid().empty()) {
if (message->isContentReleased()) {
- jc->enqueue_extern_data_record(size, dtokp.get(), !message->isPersistent());
+ jc->enqueue_extern_data_record(size, dtokp.get(), false);
} else {
- jc->enqueue_data_record(buff, size, size, dtokp.get(), !message->isPersistent());
+ jc->enqueue_data_record(buff, size, size, dtokp.get(), false);
}
} else {
if (message->isContentReleased()) {
- jc->enqueue_extern_txn_data_record(size, dtokp.get(), txn->getXid(), !message->isPersistent());
+ jc->enqueue_extern_txn_data_record(size, dtokp.get(), txn->getXid(), false);
} else {
- jc->enqueue_txn_data_record(buff, size, size, dtokp.get(), txn->getXid(), !message->isPersistent());
+ jc->enqueue_txn_data_record(buff, size, size, dtokp.get(), txn->getXid(), false);
}
}
} else {
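Editorial sketch, not part of the patch: the trailing bool passed to the enqueue_*_record() calls above appears to be the journal's "transient" flag (judging by the reverted expression !message->isPersistent()). Under that assumption, the two sides of this hunk differ only in how the flag is chosen; the helper name journalTransientFlag and the parameter rev3622Behaviour are hypothetical.

    // Illustrates the flag being reverted here, under the assumption stated above.
    inline bool journalTransientFlag(bool messageIsPersistent, bool rev3622Behaviour)
    {
        return rev3622Behaviour ? !messageIsPersistent  // checkin 3622: transient iff the message is not persistent
                                : false;                // this revert: always journal the record as non-transient
    }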
Modified: store/trunk/cpp/tests/SimpleTest.cpp
===================================================================
--- store/trunk/cpp/tests/SimpleTest.cpp 2009-09-11 13:35:04 UTC (rev 3622)
+++ store/trunk/cpp/tests/SimpleTest.cpp 2009-09-14 15:21:33 UTC (rev 3623)
@@ -377,8 +377,7 @@
store.stage(pmsg);
//append to it
- msg->setStore(&store);
- msg->releaseContent(true);//ensure that data is not held in memory but is appended to disk when added
+ msg->releaseContent(&store);//ensure that data is not held in memory but is appended to disk when added
store.appendContent(cpmsg, data1);
store.appendContent(cpmsg, data2);
Modified: store/trunk/cpp/tests/python_tests/flow_to_disk.py
===================================================================
--- store/trunk/cpp/tests/python_tests/flow_to_disk.py 2009-09-11 13:35:04 UTC (rev 3622)
+++ store/trunk/cpp/tests/python_tests/flow_to_disk.py 2009-09-14 15:21:33 UTC (rev 3623)
@@ -34,16 +34,16 @@
# --- Helper functions ---
- def _browse(self, queueName, destinationTag, acquireMode, numMsgs, msgSize, txnConsume):
+ def _browse(self, qn, dt, am, numMsgs, msgSize, txnConsume):
txid = None
if txnConsume:
- txid = self._makeXid("consumer-xid-%s" % queueName)
+ txid = self._makeXid("consumer-xid-%s" % qn)
self.session.dtx_select()
self.assertEqual(self.XA_OK, self.session.dtx_start(xid=txid).status)
- self.session.message_subscribe(queue=queueName, destination=destinationTag, acquire_mode=acquireMode)
- self.session.message_flow(destination=destinationTag, unit=self.session.credit_unit.message, value=0xFFFFFFFF)
- self.session.message_flow(destination=destinationTag, unit=self.session.credit_unit.byte, value=0xFFFFFFFF)
- queue = self.session.incoming(destinationTag)
+ self.session.message_subscribe(queue=qn, destination=dt, acquire_mode=am)
+ self.session.message_flow(destination=dt, unit=self.session.credit_unit.message, value=0xFFFFFFFF)
+ self.session.message_flow(destination=dt, unit=self.session.credit_unit.byte, value=0xFFFFFFFF)
+ queue = self.session.incoming(dt)
ids = RangedSet()
for msgNum in range(0, numMsgs):
expectedStr = self._makeMessage(msgNum, msgSize)
@@ -52,16 +52,16 @@
ids.add(msg.id)
return ids, txid
- def _checkCancel(self, queueName, destinationTag, numMsgs, ids):
+ def _checkCancel(self, qn, dt, numMsgs, ids):
self.session.message_release(ids)
- self.session.queue_declare(queue=queueName)
- self.assertEqual(numMsgs, self.session.queue_query(queue=queueName).message_count)
- self.session.message_cancel(destination=destinationTag)
+ self.session.queue_declare(queue=qn)
+ self.assertEqual(numMsgs, self.session.queue_query(queue=qn).message_count)
+ self.session.message_cancel(destination=dt)
- def _checkConsume(self, queueName, acquireMode, numMsgs, ids, txid, txnConsume):
- if acquireMode == self.session.acquire_mode.not_acquired:
- self.session.queue_declare(queue=queueName)
- self.assertEqual(numMsgs, self.session.queue_query(queue=queueName).message_count)
+ def _checkConsume(self, qn, am, numMsgs, ids, txid, txnConsume):
+ if am == self.session.acquire_mode.not_acquired:
+ self.session.queue_declare(queue=qn)
+ self.assertEqual(numMsgs, self.session.queue_query(queue=qn).message_count)
response = self.session.message_acquire(ids)
for range in ids:
for msgId in range:
@@ -73,15 +73,10 @@
self.assertEqual(self.XA_OK, self.session.dtx_commit(xid=txid, one_phase=False).status)
self._resetChannel()
- def _checkEmpty(self, queueName):
- self.session.queue_declare(queue=queueName)
- self.assertEqual(0, self.session.queue_query(queue=queueName).message_count)
+ def _checkEmpty(self, qn):
+ self.session.queue_declare(queue=qn)
+ self.assertEqual(0, self.session.queue_query(queue=qn).message_count)
- def _handleSessionClose(self):
- time.sleep(1)
- self.tearDown()
- self.setUp()
-
def _makeMessage(self, msgCnt, msgSize):
msg = "Message-%04d" % msgCnt
msgLen = len(msg)
@@ -98,45 +93,19 @@
branchqual = "v%s" % self.txCounter
return self.session.xid(format=0, global_id=txid, branch_id=branchqual)
- def _produceDirect(self, queueName, deliveryMode, numMsgs, msgSize, txnProduce):
+ def _produce(self, qn, dm, numMsgs, msgSize, txnProduce):
if txnProduce:
- txid = self._makeXid("producer-xid-%s" % queueName)
+ txid = self._makeXid("producer-xid-%s" % qn)
self.session.dtx_select()
self.assertEqual(self.XA_OK, self.session.dtx_start(xid=txid).status)
for msgNum in range(0, numMsgs):
msg_str = self._makeMessage(msgNum, msgSize)
- self.session.message_transfer(message=Message(self.session.delivery_properties(routing_key=queueName, delivery_mode=deliveryMode), msg_str))
+ self.session.message_transfer(message=Message(self.session.delivery_properties(routing_key=qn, delivery_mode=dm), msg_str))
if txnProduce:
self.assertEqual(self.XA_OK, self.session.dtx_end(xid=txid).status)
self.assertEqual(self.XA_OK, self.session.dtx_prepare(xid=txid).status)
self.assertEqual(self.XA_OK, self.session.dtx_commit(xid=txid, one_phase=False).status)
self._resetChannel()
-
- def _produceFanout(self, exchangeName, deliveryMode, numMsgs, msgSize, txnProduce, fail_on_msg):
- if txnProduce:
- txid = self._makeXid("producer-xid-%s" % exchangeName)
- self.session.dtx_select()
- self.assertEqual(self.XA_OK, self.session.dtx_start(xid=txid).status)
- try:
- for msgNum in range(0, numMsgs):
- msg_str = self._makeMessage(msgNum, msgSize)
- self.session.message_transfer(destination=exchangeName, message=Message(self.session.delivery_properties(delivery_mode=deliveryMode), msg_str))
- if fail_on_msg != None and msgNum > fail_on_msg and not txnProduce:
- self.fail("Failed to throw SessionException on message_transfer
%s" % fail_on_msg)
- except SessionException:
- if fail_on_msg != None and msgNum == fail_on_msg and not txnProduce:
- self._handleSessionClose()
- if txnProduce:
- self.assertEqual(self.XA_OK, self.session.dtx_end(xid=txid).status)
- self.assertEqual(self.XA_OK, self.session.dtx_prepare(xid=txid).status)
- try:
- self.assertEqual(self.XA_OK, self.session.dtx_commit(xid=txid, one_phase=False).status)
- except SessionException:
- if fail_on_msg != None and txnProduce:
- self._handleSessionClose()
- else:
- self.fail("Failed to throw expected SessionException on
dtx_commit")
- self._resetChannel()
def _randomBool(self):
if random.randint(0, 1) > 0:
@@ -147,66 +116,109 @@
self.session.close()
self.session = self.conn.session("test-session", 1)
- # --- Simple test framework (single queue) ---
+ # --- Simple tests ---
- def simpleLimit(self, queueName, max_count = None, max_size = None, persistent = False, pre_acquired = True, numMsgs = 15, msgSize = None, browse = True, policy = "flow_to_disk"):
- if max_count != None or max_size != None:
- queueArgs = {'qpid.policy_type':policy}
+ def test_FlowToDisk_00_SimpleMaxCount(self):
+ """Flow-to-disk tests based on setting
max_count"""
+ self.simpleLimit("test_FlowToDisk_00a", max_count = 10)
+ self.simpleLimit("test_FlowToDisk_00b", max_count = 10, persistent =
True)
+ self.simpleLimit("test_FlowToDisk_00c", max_count = 10, max_size =
10000000, numMsgs = 100, msgSize = 10000)
+ self.simpleLimit("test_FlowToDisk_00d", max_count = 10, max_size =
10000000, persistent = True, numMsgs = 100, msgSize = 10000)
+
+ def test_FlowToDisk_01_SimpleMaxSize(self):
+ """Flow-to-disk tests based on setting max_size"""
+ self.simpleLimit("test_FlowToDisk_01a", max_size = 100)
+ self.simpleLimit("test_FlowToDisk_01b", max_size = 100, persistent =
True)
+ self.simpleLimit("test_FlowToDisk_01c", max_size = 100000, numMsgs =
100, msgSize = 10000)
+ self.simpleLimit("test_FlowToDisk_01d", max_size = 100000, persistent =
True, numMsgs = 100, msgSize = 10000)
+
+ def test_FlowToDisk_02_SimpleMaxCountNotAcquired(self):
+ """Flow-to-disk tests based on setting max_count, but not using
pre-acquired mode (ie browsing)"""
+ self.simpleLimit("test_FlowToDisk_02a", max_count = 10, pre_acquired =
False)
+ self.simpleLimit("test_FlowToDisk_02b", max_count = 10, persistent =
True, pre_acquired = False)
+ self.simpleLimit("test_FlowToDisk_02c", max_count = 10, max_size =
10000000, pre_acquired = False, numMsgs = 100, msgSize = 10000)
+ self.simpleLimit("test_FlowToDisk_02d", max_count = 10, max_size =
10000000, persistent = True, pre_acquired = False, numMsgs = 100, msgSize = 10000)
+
+ def test_FlowToDisk_03_SimpleMaxSizeNotAcquired(self):
+ """Flow-to-disk tests based on setting max_size, but not using
pre-acquired mode (ie browsing)"""
+ self.simpleLimit("test_FlowToDisk_03a", max_size = 100, pre_acquired =
False)
+ self.simpleLimit("test_FlowToDisk_03b", max_size = 100, persistent =
True, pre_acquired = False)
+ self.simpleLimit("test_FlowToDisk_03c", max_size = 100, pre_acquired =
False, numMsgs = 100, msgSize = 10000)
+ self.simpleLimit("test_FlowToDisk_03d", max_size = 100, persistent =
True, pre_acquired = False, numMsgs = 100, msgSize = 10000)
+
+ def test_FlowToDisk_04_MaxSizeMaxCount(self):
+ """Flow-to-disk tests based on setting both max-count and max-size
at the same time"""
+ self.simpleLimit("test_FlowToDisk_04a", max_count = 10, max_size =
1000)
+ self.simpleLimit("test_FlowToDisk_04b", max_count = 10, max_size =
1000, msgSize = 250)
+ self.simpleLimit("test_FlowToDisk_04c", max_count = 10, max_size =
1000, persistent = True)
+ self.simpleLimit("test_FlowToDisk_04d", max_count = 10, max_size =
1000, msgSize = 250, persistent = True)
+
+ def test_FlowToDisk_05_Randomized(self):
+ """Randomized flow-to-disk tests"""
+ seed = long(1000.0 * time.time())
+ print "seed=0x%x" % seed
+ random.seed(seed)
+ for i in range(0, 10):
+ self.randomLimit(i)
+
+ def simpleLimit(self, qn, max_count = None, max_size = None, persistent = False, pre_acquired = True, numMsgs = 15, msgSize = None, browse = True):
+ qa = {'qpid.policy_type':'flow_to_disk'}
if max_count != None:
- queueArgs['qpid.max_count'] = max_count
+ qa['qpid.max_count'] = max_count
if max_size != None:
- queueArgs['qpid.max_size'] = max_size
+ qa['qpid.max_size'] = max_size
if persistent:
- deliveryMode = self.session.delivery_mode.persistent
+ dm = self.session.delivery_mode.persistent
else:
- deliveryMode = self.session.delivery_mode.non_persistent
+ dm = self.session.delivery_mode.non_persistent
if pre_acquired:
- acquireMode = self.session.acquire_mode.pre_acquired
+ am = self.session.acquire_mode.pre_acquired
else:
- acquireMode = self.session.acquire_mode.not_acquired
+ am = self.session.acquire_mode.not_acquired
# Cycle through the produce/consume block transaction combinations
for i in range(0, 4):
- txnProduce = i & 1 != 0 # Transactional produce
- txnConsume = i & 2 != 0 # Transactional consume
- self._txSimpleLimit(queueName, queueArgs, deliveryMode, acquireMode, numMsgs, msgSize, txnProduce, txnConsume, browse)
+ tp = i & 1 != 0 # Transactional produce
+ tc = i & 2 != 0 # Transactional consume
+ self.txSimpleLimit(qn, qa, dm, am, numMsgs, msgSize, tp, tc, browse)
- def randomLimit(self, count, policy = "flow_to_disk"):
- queueArgs = {'qpid.policy_type':policy}
- queueName = "randomized_test_%04d" % count
+ def randomLimit(self, count):
+ qa = {'qpid.policy_type':'flow_to_disk'}
+ qn = "randomized_test_%04d" % count
# Flow to disk policy
maxCount = None
if self._randomBool():
maxCount = random.randint(0,10000)
- queueArgs['qpid.max_count'] = maxCount
+ qa['qpid.max_count'] = maxCount
maxSize = None
if self._randomBool():
maxSize = random.randint(0, 1000000)
- queueArgs['qpid.max_size'] = maxSize
+ qa['qpid.max_size'] = maxSize
# Persistence
if self._randomBool():
- deliveryMode = self.session.delivery_mode.persistent
+ dm = self.session.delivery_mode.persistent
else:
- deliveryMode = self.session.delivery_mode.non_persistent
+ dm = self.session.delivery_mode.non_persistent
# Acquired mode
if self._randomBool():
- acquireMode = self.session.acquire_mode.pre_acquired
+ am = self.session.acquire_mode.pre_acquired
browse = False
else:
- acquireMode = self.session.acquire_mode.not_acquired
+ am = self.session.acquire_mode.not_acquired
browse = self._randomBool()
numMsgs = random.randint(1, 10000)
sizeLimit = int(1000000 / numMsgs)
msgSize = random.randint(1, sizeLimit)
- txnProduce = self._randomBool()
- txnConsume = self._randomBool()
+ tp = self._randomBool()
+ tc = self._randomBool()
- self._txSimpleLimit(queueName, queueArgs, deliveryMode, acquireMode, numMsgs, msgSize, txnProduce, txnConsume, browse)
+ #print " qn=%s, qa=%s, dm=%s, am=%s, numMsgs=%d, msgSize=%d, tp=%s, tc=%s, browse=%s" % (qn, qa, dm, am, numMsgs, msgSize, tp, tc, browse)
+ self.txSimpleLimit(qn, qa, dm, am, numMsgs, msgSize, tp, tc, browse)
- def _txSimpleLimit(self, queueName, queueArgs, deliveryMode, acquireMode, numMsgs, msgSize, txnProduce, txnConsume, browse):
+ def txSimpleLimit(self, qn, qa, dm, am, numMsgs, msgSize, tp, tc, browse):
"""
Test a simple case of message limits which will force flow-to-disk.
* queue_args sets a limit - either max_count and/or max_size
@@ -215,323 +227,19 @@
* Check the broker has no messages left.
"""
- self.session.queue_declare(queue=queueName, durable=True, arguments=queueArgs)
+ self.session.queue_declare(queue=qn, durable=True, arguments=qa)
- # Add messages
- self._produceDirect(queueName, deliveryMode, numMsgs, msgSize, txnProduce)
+ # --- Add messages ---
+ self._produce(qn, dm, numMsgs, msgSize, tp)
- # Browse messages (if possible)
- if acquireMode == self.session.acquire_mode.not_acquired and browse:
- dtA = "tagA-%d-%d" % (txnProduce, txnConsume)
- ids, txid = self._browse(queueName, dtA, acquireMode, numMsgs, msgSize, False)
- self._checkCancel(queueName, dtA, numMsgs, ids)
+ # --- Browse messages (if possible) ---
+ if am == self.session.acquire_mode.not_acquired and browse:
+ dtA = "tagA-%d-%d" % (tp, tc)
+ ids, txid = self._browse(qn, dtA, am, numMsgs, msgSize, False)
+ self._checkCancel(qn, dtA, numMsgs, ids)
- # Consume messages
- dtB = "tagB-%d-%d" % (txnProduce, txnConsume)
- ids, txid = self._browse(queueName, dtB, acquireMode, numMsgs, msgSize, txnConsume)
- self._checkConsume(queueName, acquireMode, numMsgs, ids, txid, txnConsume)
- self._checkEmpty(queueName)
-
- self.session.queue_delete(queue=queueName)
-
- # --- Multi-queue test framework ---
-
- def multiQueueLimit(self, queueMap, exchangeName = "amq.fanout", persistent = False, pre_acquired = True, numMsgs = 15, msgSize = None, browse = True, policy = "flow_to_disk", fail_on_msg = None):
- if persistent:
- deliveryMode = self.session.delivery_mode.persistent
- else:
- deliveryMode = self.session.delivery_mode.non_persistent
- if pre_acquired:
- acquireMode = self.session.acquire_mode.pre_acquired
- else:
- acquireMode = self.session.acquire_mode.not_acquired
- # Cycle through the produce/consume block transaction combinations
-# TODO - re-enable full transactional testing when BZ 522499 (which crashes the broker) is fixed.
-# for i in range(0, 4):
-# txnProduce = i & 1 != 0 # Transactional produce
-# txnConsume = i & 2 != 0 # Transactional consume
-# self._txMultiQueueLimit(queueMap, exchangeName, deliveryMode, acquireMode, numMsgs, msgSize, txnProduce, txnConsume, browse, fail_on_msg)
- for i in range(0, 2):
- txnConsume = i & 1 != 0 # Transactional consume
- self._txMultiQueueLimit(queueMap, exchangeName, deliveryMode, acquireMode, numMsgs, msgSize, False, txnConsume, browse, policy, fail_on_msg)
-
- def _txMultiQueueLimit(self, queueMap, exchangeName, deliveryMode, acquireMode, numMsgs, msgSize, txnProduce, txnConsume, browse, policy, fail_on_msg):
- """
- Test a multi-queue case
- queueMap = queue map where map is queue name (key) against queue args (value)
- """
- self._multiQueueSetup(queueMap, exchangeName, txnProduce, txnConsume, policy)
- self._produceFanout(exchangeName, deliveryMode, numMsgs, msgSize, txnProduce, fail_on_msg)
- if fail_on_msg == None:
- # Browse messages (ie get without acquiring)
- if acquireMode == self.session.acquire_mode.not_acquired and browse:
- for queue in queueMap.iterkeys():
- deliveryTagA = "tagA-%d-%d" % (txnProduce, txnConsume)
- queueName = "%s-%d-%d" % (queue, txnProduce, txnConsume)
- ids, txid = self._browse(queueName, deliveryTagA, acquireMode, numMsgs, msgSize, False)
- self._checkCancel(queueName, deliveryTagA, numMsgs, ids)
- # Consume messages
- for queue in queueMap.iterkeys():
- deliveryTagB = "tagB-%d-%d" % (txnProduce, txnConsume)
- queueName = "%s-%d-%d" % (queue, txnProduce, txnConsume)
- ids, txid = self._browse(queueName, deliveryTagB, acquireMode, numMsgs, msgSize, txnConsume)
- self._checkConsume(queueName, acquireMode, numMsgs, ids, txid, txnConsume)
- self._checkEmpty(queueName)
- self._multiQueueTeardown(queueMap, exchangeName, txnProduce, txnConsume)
-
- def _multiQueueSetup(self, queueMap, exchangeName, txnProduce, txnConsume, policy, exclusive = True):
- for queue, qp in queueMap.iteritems():
- queueArgs = {}
- ftd = False
- for p,v in qp.iteritems():
- if p == "persistent":
- d = v == True
- elif p == "qpid.max_count":
- if v != None:
- queueArgs[p] = v
- ftd = True
- elif p == "qpid.max_size":
- if v != None:
- queueArgs[p] = v
- ftd = True
- if ftd:
- queueArgs["qpid.policy_type"] = policy
- queueName = "%s-%d-%d" % (queue, txnProduce, txnConsume)
- self.session.queue_declare(queue = queueName, durable = d, arguments = queueArgs, exclusive = exclusive)
- self.session.exchange_bind(exchange = exchangeName, queue = queueName)
-
- def _multiQueueTeardown(self, queueMap, exchangeName, txnProduce, txnConsume):
- for queue in queueMap.iterkeys():
- queueName = "%s-%d-%d" % (queue, txnProduce, txnConsume)
- self.session.exchange_unbind(exchange = exchangeName, queue = queueName)
- self.session.queue_delete(queue = queueName)
-
- # --- Simple tests ---
-
- def test_00_SimpleMaxCount(self):
- """Flow-to-disk tests based on setting
max_count"""
- # Small msgs
- self.simpleLimit("test_00a", max_count = 10)
- self.simpleLimit("test_00b", max_count = 10, persistent = True)
- # Large msgs - set max_size to high number to make sure it does not interfere
- self.simpleLimit("test_00c", max_count = 10, max_size = 10000000,
numMsgs = 100, msgSize = 10000)
- self.simpleLimit("test_00d", max_count = 10, max_size = 10000000,
persistent = True, numMsgs = 100, msgSize = 10000)
-
- def test_01_SimpleMaxSize(self):
- """Flow-to-disk tests based on setting max_size"""
- # Small msgs
- self.simpleLimit("test_01a", max_size = 100)
- self.simpleLimit("test_01b", max_size = 100, persistent = True)
- # Large msgs - set max_size to high number to make sure it does not interfere
- self.simpleLimit("test_01c", max_size = 100000, numMsgs = 100, msgSize
= 10000)
- self.simpleLimit("test_01d", max_size = 100000, persistent = True,
numMsgs = 100, msgSize = 10000)
-
- def test_02_SimpleMaxCountNotAcquired(self):
- """Flow-to-disk tests based on setting max_count, but not using
pre-acquired mode (ie browsing)"""
- # Small msgs
- self.simpleLimit("test_02a", max_count = 10, pre_acquired = False)
- self.simpleLimit("test_02b", max_count = 10, persistent = True,
pre_acquired = False)
- # Large msgs - set max_size to high number to make sure it does not interfere
- self.simpleLimit("test_02c", max_count = 10, max_size = 10000000,
pre_acquired = False, numMsgs = 100, msgSize = 10000)
- self.simpleLimit("test_02d", max_count = 10, max_size = 10000000,
persistent = True, pre_acquired = False, numMsgs = 100, msgSize = 10000)
-
- def test_03_SimpleMaxSizeNotAcquired(self):
- """Flow-to-disk tests based on setting max_size, but not using
pre-acquired mode (ie browsing)"""
- # Small msgs
- self.simpleLimit("test_03a", max_size = 100, pre_acquired = False)
- self.simpleLimit("test_03b", max_size = 100, persistent = True,
pre_acquired = False)
- # Large msgs - set max_size to high number to make sure it does not interfere
- self.simpleLimit("test_03c", max_size = 100, pre_acquired = False,
numMsgs = 100, msgSize = 10000)
- self.simpleLimit("test_03d", max_size = 100, persistent = True,
pre_acquired = False, numMsgs = 100, msgSize = 10000)
-
- def test_04_MaxSizeMaxCount(self):
- """Flow-to-disk tests based on setting both max-count and max-size
at the same time"""
- # Small msgs
- self.simpleLimit("test_04a", max_count = 10, max_size = 1000)
- self.simpleLimit("test_04b", max_count = 10, max_size = 1000, msgSize =
250)
- # Large msgs - set max_size to high number to make sure it does not interfere
- self.simpleLimit("test_04c", max_count = 10, max_size = 1000,
persistent = True)
- self.simpleLimit("test_04d", max_count = 10, max_size = 1000, msgSize =
250, persistent = True)
-
- # --- Multi-queue tests ---
-
- def test_05_MultiQueueTransQueueTransMsg(self):
- """Flow-to-disk tests where both queues and messages are transient
and messages are routed to more than one queue"""
- queueMap1 = {"test_05a" : {"persistent":False,
"qpid.max_count":None, "qpid.max_size": None},
- "test_05b" : {"persistent":False,
"qpid.max_count":None, "qpid.max_size": None} }
- queueMap2 = {"test_05c" : {"persistent":False,
"qpid.max_count":None, "qpid.max_size": None},
- "test_05d" : {"persistent":False,
"qpid.max_count":10, "qpid.max_size": None} }
- queueMap3 = {"test_05e" : {"persistent":False,
"qpid.max_count":None, "qpid.max_size": None},
- "test_05f" : {"persistent":False,
"qpid.max_count":None, "qpid.max_size": 1000} }
- queueMap4 = {"test_05g" : {"persistent":False,
"qpid.max_count":10, "qpid.max_size": None},
- "test_05h" : {"persistent":False,
"qpid.max_count":8, "qpid.max_size": None} }
- queueMap5 = {"test_05i" : {"persistent":False,
"qpid.max_count":None, "qpid.max_size": 1000},
- "test_05j" : {"persistent":False,
"qpid.max_count":None, "qpid.max_size": 800} }
- queueMap6 = {"test_05k" : {"persistent":False,
"qpid.max_count":None, "qpid.max_size": None},
- "test_05l" : {"persistent":False,
"qpid.max_count":10, "qpid.max_size": None},
- "test_05m" : {"persistent":False,
"qpid.max_count":None, "qpid.max_size": 1000},
- "test_05n" : {"persistent":False,
"qpid.max_count":8, "qpid.max_size": 800} }
- self.multiQueueLimit(queueMap1)
- self.multiQueueLimit(queueMap2, fail_on_msg = 10)
- self.multiQueueLimit(queueMap3, msgSize = 100, fail_on_msg = 10)
- self.multiQueueLimit(queueMap4, fail_on_msg = 8)
- self.multiQueueLimit(queueMap5, msgSize = 100, fail_on_msg = 8)
- self.multiQueueLimit(queueMap6, msgSize = 100, fail_on_msg = 8)
-
- def test_06_MultiQueueDurableQueueTransMsg(self):
- """Flow-to-disk tests where both queues are durable but messages
are transient and messages are routed to more than one queue"""
- queueMap1 = {"test_06a" : {"persistent":True,
"qpid.max_count":None, "qpid.max_size": None},
- "test_06b" : {"persistent":True,
"qpid.max_count":None, "qpid.max_size": None} }
- queueMap2 = {"test_06c" : {"persistent":True,
"qpid.max_count":None, "qpid.max_size": None},
- "test_06d" : {"persistent":True,
"qpid.max_count":10, "qpid.max_size": None} }
- queueMap3 = {"test_06e" : {"persistent":True,
"qpid.max_count":None, "qpid.max_size": None},
- "test_06f" : {"persistent":True,
"qpid.max_count":None, "qpid.max_size": 1000} }
- queueMap4 = {"test_06g" : {"persistent":True,
"qpid.max_count":10, "qpid.max_size": None},
- "test_06h" : {"persistent":True,
"qpid.max_count":8, "qpid.max_size": None} }
- queueMap5 = {"test_06i" : {"persistent":True,
"qpid.max_count":None, "qpid.max_size": 1000},
- "test_06j" : {"persistent":True,
"qpid.max_count":None, "qpid.max_size": 800} }
- queueMap6 = {"test_06k" : {"persistent":True,
"qpid.max_count":None, "qpid.max_size": None},
- "test_06l" : {"persistent":True,
"qpid.max_count":10, "qpid.max_size": None},
- "test_06m" : {"persistent":True,
"qpid.max_count":None, "qpid.max_size": 1000},
- "test_06n" : {"persistent":True,
"qpid.max_count":8, "qpid.max_size": 800} }
- self.multiQueueLimit(queueMap1)
- self.multiQueueLimit(queueMap2, fail_on_msg = 10)
- self.multiQueueLimit(queueMap3, msgSize = 100, fail_on_msg = 10)
- self.multiQueueLimit(queueMap4, fail_on_msg = 8)
- self.multiQueueLimit(queueMap5, msgSize = 100, fail_on_msg = 8)
- self.multiQueueLimit(queueMap6, msgSize = 100, fail_on_msg = 8)
-
-
- def test_07_MultiQueueTransQueueDurableMsg(self):
- """Flow-to-disk tests where both queues and messages are transient
and messages are routed to more than one queue"""
- queueMap1 = {"test_07a" : {"persistent":False,
"qpid.max_count":None, "qpid.max_size": None},
- "test_07b" : {"persistent":False,
"qpid.max_count":None, "qpid.max_size": None} }
- queueMap2 = {"test_07c" : {"persistent":False,
"qpid.max_count":None, "qpid.max_size": None},
- "test_07d" : {"persistent":False,
"qpid.max_count":10, "qpid.max_size": None} }
- queueMap3 = {"test_07e" : {"persistent":False,
"qpid.max_count":None, "qpid.max_size": None},
- "test_07f" : {"persistent":False,
"qpid.max_count":None, "qpid.max_size": 1000} }
- queueMap4 = {"test_07g" : {"persistent":False,
"qpid.max_count":10, "qpid.max_size": None},
- "test_07h" : {"persistent":False,
"qpid.max_count":8, "qpid.max_size": None} }
- queueMap5 = {"test_07i" : {"persistent":False,
"qpid.max_count":None, "qpid.max_size": 1000},
- "test_07j" : {"persistent":False,
"qpid.max_count":None, "qpid.max_size": 800} }
- queueMap6 = {"test_07k" : {"persistent":False,
"qpid.max_count":None, "qpid.max_size": None},
- "test_07l" : {"persistent":False,
"qpid.max_count":10, "qpid.max_size": None},
- "test_07m" : {"persistent":False,
"qpid.max_count":None, "qpid.max_size": 1000},
- "test_07n" : {"persistent":False,
"qpid.max_count":8, "qpid.max_size": 800} }
- self.multiQueueLimit(queueMap1, persistent = True)
- self.multiQueueLimit(queueMap2, persistent = True, fail_on_msg = 10)
- self.multiQueueLimit(queueMap3, msgSize = 100, persistent = True, fail_on_msg = 10)
- self.multiQueueLimit(queueMap4, persistent = True, fail_on_msg = 8)
- self.multiQueueLimit(queueMap5, msgSize = 100, persistent = True, fail_on_msg = 8)
- self.multiQueueLimit(queueMap6, msgSize = 100, persistent = True, fail_on_msg = 8)
-
- def test_08_MultiQueueDurableQueueDurableMsg(self):
- """Flow-to-disk tests where both queues are durable but messages
are transient and messages are routed to more than one queue"""
- queueMap1 = {"test_08a" : {"persistent":True,
"qpid.max_count":None, "qpid.max_size": None},
- "test_08b" : {"persistent":True,
"qpid.max_count":None, "qpid.max_size": None} }
- queueMap2 = {"test_08c" : {"persistent":True,
"qpid.max_count":None, "qpid.max_size": None},
- "test_08d" : {"persistent":True,
"qpid.max_count":10, "qpid.max_size": None} }
- queueMap3 = {"test_08e" : {"persistent":True,
"qpid.max_count":None, "qpid.max_size": None},
- "test_08f" : {"persistent":True,
"qpid.max_count":None, "qpid.max_size": 1000} }
- queueMap4 = {"test_08g" : {"persistent":True,
"qpid.max_count":10, "qpid.max_size": None},
- "test_08h" : {"persistent":True,
"qpid.max_count":8, "qpid.max_size": None} }
- queueMap5 = {"test_08i" : {"persistent":True,
"qpid.max_count":None, "qpid.max_size": 1000},
- "test_08j" : {"persistent":True,
"qpid.max_count":None, "qpid.max_size": 800} }
- queueMap6 = {"test_08k" : {"persistent":True,
"qpid.max_count":None, "qpid.max_size": None},
- "test_08l" : {"persistent":True,
"qpid.max_count":10, "qpid.max_size": None},
- "test_08m" : {"persistent":True,
"qpid.max_count":None, "qpid.max_size": 1000},
- "test_08n" : {"persistent":True,
"qpid.max_count":8, "qpid.max_size": 800} }
- self.multiQueueLimit(queueMap1, persistent = True)
- self.multiQueueLimit(queueMap2, persistent = True)
- self.multiQueueLimit(queueMap3, msgSize = 100, persistent = True)
- self.multiQueueLimit(queueMap4, persistent = True)
- self.multiQueueLimit(queueMap5, msgSize = 100, persistent = True)
- self.multiQueueLimit(queueMap6, msgSize = 100, persistent = True)
-
- def test_09_MultiQueueMixedQueueTransMsg(self):
- """Flow-to-disk tests where both queues are durable but messages
are transient and messages are routed to more than one queue"""
- queueMap1 = {"test_09a" : {"persistent":False,
"qpid.max_count":None, "qpid.max_size": None},
- "test_09b" : {"persistent":True,
"qpid.max_count":None, "qpid.max_size": None} }
- queueMap2 = {"test_09c" : {"persistent":False,
"qpid.max_count":None, "qpid.max_size": None},
- "test_09d" : {"persistent":True,
"qpid.max_count":10, "qpid.max_size": None} }
- queueMap3 = {"test_09e" : {"persistent":True,
"qpid.max_count":None, "qpid.max_size": None},
- "test_09f" : {"persistent":False,
"qpid.max_count":10, "qpid.max_size": None} }
- queueMap4 = {"test_09g" : {"persistent":False,
"qpid.max_count":None, "qpid.max_size": None},
- "test_09h" : {"persistent":True,
"qpid.max_count":None, "qpid.max_size": 1000} }
- queueMap5 = {"test_09i" : {"persistent":True,
"qpid.max_count":None, "qpid.max_size": None},
- "test_09j" : {"persistent":False,
"qpid.max_count":None, "qpid.max_size": 1000} }
- queueMap6 = {"test_09k" : {"persistent":False,
"qpid.max_count":10, "qpid.max_size": None},
- "test_09l" : {"persistent":True,
"qpid.max_count":8, "qpid.max_size": None} }
- queueMap7 = {"test_09m" : {"persistent":True,
"qpid.max_count":10, "qpid.max_size": None},
- "test_09n" : {"persistent":False,
"qpid.max_count":8, "qpid.max_size": None} }
- queueMap8 = {"test_09o" : {"persistent":False,
"qpid.max_count":None, "qpid.max_size": 1000},
- "test_09p" : {"persistent":True,
"qpid.max_count":None, "qpid.max_size": 800} }
- queueMap9 = {"test_09q" : {"persistent":True,
"qpid.max_count":None, "qpid.max_size": 1000},
- "test_09r" : {"persistent":False,
"qpid.max_count":None, "qpid.max_size": 800} }
- queueMap10 = {"test_09s" : {"persistent":False,
"qpid.max_count":None, "qpid.max_size": None},
- "test_09t" : {"persistent":False,
"qpid.max_count":10, "qpid.max_size": None},
- "test_09u" : {"persistent":False,
"qpid.max_count":None, "qpid.max_size": 1000},
- "test_09v" : {"persistent":False,
"qpid.max_count":8, "qpid.max_size": 800},
- "test_09w" : {"persistent":True,
"qpid.max_count":None, "qpid.max_size": None},
- "test_09x" : {"persistent":True,
"qpid.max_count":12, "qpid.max_size": None},
- "test_09y" : {"persistent":True,
"qpid.max_count":None, "qpid.max_size": 1100},
- "test_09z" : {"persistent":True,
"qpid.max_count":7, "qpid.max_size": 900} }
- self.multiQueueLimit(queueMap1)
- self.multiQueueLimit(queueMap2, fail_on_msg = 10)
- self.multiQueueLimit(queueMap3, fail_on_msg = 10)
- self.multiQueueLimit(queueMap4, msgSize = 100, fail_on_msg = 10)
- self.multiQueueLimit(queueMap5, msgSize = 100, fail_on_msg = 10)
- self.multiQueueLimit(queueMap6, fail_on_msg = 8)
- self.multiQueueLimit(queueMap7, fail_on_msg = 8)
- self.multiQueueLimit(queueMap8, msgSize = 100, fail_on_msg = 8)
- self.multiQueueLimit(queueMap9, msgSize = 100, fail_on_msg = 8)
- self.multiQueueLimit(queueMap10, msgSize = 100, fail_on_msg = 7)
-
- def test_10_MultiQueueMixedQueueDurableMsg(self):
- """Flow-to-disk tests where both queues are durable but messages
are transient and messages are routed to more than one queue"""
- queueMap1 = {"test_10a" : {"persistent":False,
"qpid.max_count":None, "qpid.max_size": None},
- "test_10b" : {"persistent":True,
"qpid.max_count":None, "qpid.max_size": None} }
- queueMap2 = {"test_10c" : {"persistent":False,
"qpid.max_count":None, "qpid.max_size": None},
- "test_10d" : {"persistent":True,
"qpid.max_count":10, "qpid.max_size": None} }
- queueMap3 = {"test_10e" : {"persistent":True,
"qpid.max_count":None, "qpid.max_size": None},
- "test_10f" : {"persistent":False,
"qpid.max_count":10, "qpid.max_size": None} }
- queueMap4 = {"test_10g" : {"persistent":False,
"qpid.max_count":None, "qpid.max_size": None},
- "test_10h" : {"persistent":True,
"qpid.max_count":None, "qpid.max_size": 1000} }
- queueMap5 = {"test_10i" : {"persistent":True,
"qpid.max_count":None, "qpid.max_size": None},
- "test_10j" : {"persistent":False,
"qpid.max_count":None, "qpid.max_size": 1000} }
- queueMap6 = {"test_10k" : {"persistent":False,
"qpid.max_count":10, "qpid.max_size": None},
- "test_10l" : {"persistent":True,
"qpid.max_count":8, "qpid.max_size": None} }
- queueMap7 = {"test_10m" : {"persistent":True,
"qpid.max_count":10, "qpid.max_size": None},
- "test_10n" : {"persistent":False,
"qpid.max_count":8, "qpid.max_size": None} }
- queueMap8 = {"test_10o" : {"persistent":False,
"qpid.max_count":None, "qpid.max_size": 1000},
- "test_10p" : {"persistent":True,
"qpid.max_count":None, "qpid.max_size": 800} }
- queueMap9 = {"test_10q" : {"persistent":True,
"qpid.max_count":None, "qpid.max_size": 1000},
- "test_10r" : {"persistent":False,
"qpid.max_count":None, "qpid.max_size": 800} }
- queueMap10 = {"test_10s" : {"persistent":False,
"qpid.max_count":None, "qpid.max_size": None},
- "test_10t" : {"persistent":False,
"qpid.max_count":10, "qpid.max_size": None},
- "test_10u" : {"persistent":False,
"qpid.max_count":None, "qpid.max_size": 1000},
- "test_10v" : {"persistent":False,
"qpid.max_count":8, "qpid.max_size": 800},
- "test_10w" : {"persistent":True,
"qpid.max_count":None, "qpid.max_size": None},
- "test_10x" : {"persistent":True,
"qpid.max_count":12, "qpid.max_size": None},
- "test_10y" : {"persistent":True,
"qpid.max_count":None, "qpid.max_size": 1100},
- "test_10z" : {"persistent":True,
"qpid.max_count":7, "qpid.max_size": 900} }
- self.multiQueueLimit(queueMap1, persistent = True)
- self.multiQueueLimit(queueMap2, persistent = True, fail_on_msg = 10)
- self.multiQueueLimit(queueMap3, persistent = True, fail_on_msg = 10)
- self.multiQueueLimit(queueMap4, msgSize = 100, persistent = True, fail_on_msg = 10)
- self.multiQueueLimit(queueMap5, msgSize = 100, persistent = True, fail_on_msg = 10)
- self.multiQueueLimit(queueMap6, persistent = True, fail_on_msg = 8)
- self.multiQueueLimit(queueMap7, persistent = True, fail_on_msg = 8)
- self.multiQueueLimit(queueMap8, msgSize = 100, persistent = True, fail_on_msg = 8)
- self.multiQueueLimit(queueMap9, msgSize = 100, persistent = True, fail_on_msg = 8)
- self.multiQueueLimit(queueMap10, msgSize = 100, persistent = True, fail_on_msg = 7)
-
- # --- Long and randomized tests ---
-
- def test_11_Randomized(self):
- """Randomized flow-to-disk tests"""
- seed = long(1000.0 * time.time())
- print "seed=0x%x" % seed
- random.seed(seed)
- for i in range(0, 10):
- self.randomLimit(i)
+ # --- Consume messages ---
+ dtB = "tagB-%d-%d" % (tp, tc)
+ ids, txid = self._browse(qn, dtB, am, numMsgs, msgSize, tc)
+ self._checkConsume(qn, am, numMsgs, ids, txid, tc)
+ self._checkEmpty(qn)
Modified: store/trunk/cpp/tests/run_python_tests
===================================================================
--- store/trunk/cpp/tests/run_python_tests 2009-09-11 13:35:04 UTC (rev 3622)
+++ store/trunk/cpp/tests/run_python_tests 2009-09-14 15:21:33 UTC (rev 3623)
@@ -78,12 +78,10 @@
start_broker() {
${QPIDD} --daemon --port 0 --no-module-dir --load-module=${STORE_LIB} --data-dir=${STORE_DIR} --auth=no --log-enable info+ --log-to-file ${STORE_DIR}/broker.python-test.log > qpidd.port
LOCAL_PORT=`cat qpidd.port`
- echo "run_python_tests: Started qpidd on port ${LOCAL_PORT}"
}
stop_broker() {
- echo "run_python_tests: Stopping broker on port ${LOCAL_PORT}"
- ${QPIDD} -q --port ${LOCAL_PORT}
+ ${QPIDD} -q --port $LOCAL_PORT
}
fail=0