Author: kpvdr
Date: 2009-06-11 14:53:52 -0400 (Thu, 11 Jun 2009)
New Revision: 3451
Modified:
store/trunk/cpp/lib/JournalImpl.cpp
store/trunk/cpp/lib/JournalImpl.h
store/trunk/cpp/lib/MessageStoreImpl.cpp
store/trunk/cpp/tests/cluster/run_cluster_tests
store/trunk/cpp/tests/python_tests/flow_to_disk.py
Log:
Fix for BZ505274 - "Large durable messages that 'flow to disk', are not
recovered correctly".
Modified: store/trunk/cpp/lib/JournalImpl.cpp
===================================================================
--- store/trunk/cpp/lib/JournalImpl.cpp 2009-06-11 16:00:38 UTC (rev 3450)
+++ store/trunk/cpp/lib/JournalImpl.cpp 2009-06-11 18:53:52 UTC (rev 3451)
@@ -253,7 +253,7 @@
#define MAX_AIO_SLEEPS 1000 // 10 sec
#define AIO_SLEEP_TIME 10000 // 10 ms
bool
-JournalImpl::loadMsgContent(u_int64_t rid, std::string& data, size_t length)
+JournalImpl::loadMsgContent(u_int64_t rid, std::string& data, size_t length, size_t offset)
{
if (_dtok.rid() != rid)
{
@@ -321,13 +321,14 @@
throw jexception(journal::jerrno::JERR__RECNFOUND, ss.str().c_str(),
"JournalImpl", "loadMsgContent");
}
}
- if (_external)
- return false;
-    u_int32_t offset = qpid::framing::Buffer(static_cast<char*>(_datap), sizeof(u_int32_t)).getLong() + sizeof(u_int32_t);
- if (offset + length > _dlen) {
- data.append((const char*)_datap + offset, _dlen - offset);
+
+ if (_external) return false;
+
+    u_int32_t hdr_offs = qpid::framing::Buffer(static_cast<char*>(_datap), sizeof(u_int32_t)).getLong() + sizeof(u_int32_t);
+ if (hdr_offs + offset + length > _dlen) {
+ data.append((const char*)_datap + hdr_offs + offset, _dlen - hdr_offs - offset);
} else {
- data.append((const char*)_datap + offset, length);
+ data.append((const char*)_datap + hdr_offs + offset, length);
}
return true;
}
Modified: store/trunk/cpp/lib/JournalImpl.h
===================================================================
--- store/trunk/cpp/lib/JournalImpl.h 2009-06-11 16:00:38 UTC (rev 3450)
+++ store/trunk/cpp/lib/JournalImpl.h 2009-06-11 18:53:52 UTC (rev 3451)
@@ -151,7 +151,7 @@
// Temporary fn to read and save last msg read from journal so it can be
assigned
// in chunks. To be replaced when coding to do this direct from the journal
is ready.
// Returns true if the record is extern, false if local.
- bool loadMsgContent(u_int64_t rid, std::string& data, size_t length);
+        bool loadMsgContent(u_int64_t rid, std::string& data, size_t length, size_t offset = 0);
// Overrides for write inactivity timer
void enqueue_data_record(const void* const data_buff, const size_t
tot_data_len,
Modified: store/trunk/cpp/lib/MessageStoreImpl.cpp
===================================================================
--- store/trunk/cpp/lib/MessageStoreImpl.cpp 2009-06-11 16:00:38 UTC (rev 3450)
+++ store/trunk/cpp/lib/MessageStoreImpl.cpp 2009-06-11 18:53:52 UTC (rev 3451)
@@ -1335,7 +1335,7 @@
try {
JournalImpl* jc =
static_cast<JournalImpl*>(queue.getExternalQueueStore());
if (jc && jc->is_enqueued(messageId) ) {
- if (jc->loadMsgContent(messageId, data, length)) {
+ if (jc->loadMsgContent(messageId, data, length, offset)) {
return;
}
}
Modified: store/trunk/cpp/tests/cluster/run_cluster_tests
===================================================================
--- store/trunk/cpp/tests/cluster/run_cluster_tests 2009-06-11 16:00:38 UTC (rev 3450)
+++ store/trunk/cpp/tests/cluster/run_cluster_tests 2009-06-11 18:53:52 UTC (rev 3451)
@@ -61,7 +61,7 @@
export SENDER_EXEC=${QPID_DIR}/cpp/src/tests/sender
else
# Path from known installed locations
- CLUSTER_PATH=/usr/libexec/qpid/tests/cluster_test
+ CLUSTER_PATH=/usr/libexec/qpid/tests/${CPP_CLUSTER_EXEC}
if test -z ${CLUSTER_PATH} ; then
echo "No executable \"${CPP_CLUSTER_EXEC}\" found in path"
exit 1
@@ -73,8 +73,8 @@
export CLUSTER_LIB=/usr/lib/qpid/daemon/cluster.so
export QPID_CONFIG_EXEC=/usr/bin/qpid-config
export QPID_ROUTE_EXEC=/usr/bin/qpid-route
- export RECEIVER_EXEC=/usr/libexec/qpid/test/receiver
- export SENDER_EXEC=/usr/libexec/qpid/test/sender
+ export RECEIVER_EXEC=/usr/libexec/qpid/tests/receiver
+ export SENDER_EXEC=/usr/libexec/qpid/tests/sender
fi
export STORE_LIB=${abs_srcdir}/../../lib/.libs/msgstore.so
@@ -99,7 +99,7 @@
Unable to load python qpid module - skipping python tests.
- PYTHONPATH=${PYTHONPATH}"
+ PYTHONPATH=${PYTHONPATH}
===========================================================
@@ -114,8 +114,8 @@
mkdir -p ${TMP_STORE_DIR}/cluster
else
# Delete old cluster test dirs
- rm -rf "${TMP_STORE_DIR}/cluster"
- mkdir -p "${TMP_STORE_DIR}/cluster"
+ rm -rf ${TMP_STORE_DIR}/cluster
+ mkdir -p ${TMP_STORE_DIR}/cluster
fi
export TMP_STORE_DIR
Modified: store/trunk/cpp/tests/python_tests/flow_to_disk.py
===================================================================
--- store/trunk/cpp/tests/python_tests/flow_to_disk.py 2009-06-11 16:00:38 UTC (rev 3450)
+++ store/trunk/cpp/tests/python_tests/flow_to_disk.py 2009-06-11 18:53:52 UTC (rev 3451)
@@ -28,12 +28,23 @@
class FlowToDiskTests(TestBase010):
"""Tests for async store flow-to-disk"""
+ def _makeMessage(self, msgCnt, msgSize):
+ msg = "Message-%04d" % msgCnt
+ msgLen = len(msg)
+ if msgSize != None and msgSize > msgLen:
+ for i in range(msgLen, msgSize):
+ if i == msgLen:
+ msg += "-"
+ else:
+ msg += chr(ord('a') + (i % 26))
+ return msg
+
def test_FlowToDisk_01_SimpleMaxCountTransient(self):
- queue_args = {'qpid.policy_type':'flow_to_disk',
'qpid.max_count': 10}
+ queue_args = {'qpid.policy_type':'flow_to_disk',
'qpid.max_count': 10, 'qpid.max_size': 100000000}
self.simple_limit("test_FlowToDisk_01_SimpleMaxCountTransient",
queue_args, self.session.delivery_mode.non_persistent,
self.session.acquire_mode.pre_acquired)
def test_FlowToDisk_02_SimpleMaxCountPersistent(self):
- queue_args = {'qpid.policy_type':'flow_to_disk',
'qpid.max_count': 10}
+ queue_args = {'qpid.policy_type':'flow_to_disk',
'qpid.max_count': 10, 'qpid.max_size': 100000000}
self.simple_limit("test_FlowToDisk_02_SimpleMaxCountPersistent",
queue_args, self.session.delivery_mode.persistent,
self.session.acquire_mode.pre_acquired)
def test_FlowToDisk_03_SimpleMaxSizeTransient(self):
@@ -44,23 +55,55 @@
queue_args = {'qpid.policy_type':'flow_to_disk',
'qpid.max_size': 100}
self.simple_limit("test_FlowToDisk_04_SimpleMaxSizePersistent",
queue_args, self.session.delivery_mode.persistent,
self.session.acquire_mode.pre_acquired)
- def test_FlowToDisk_05_SimpleMaxCountTransientNotAcquired(self):
- queue_args = {'qpid.policy_type':'flow_to_disk',
'qpid.max_count': 10}
-
self.simple_limit("test_FlowToDisk_05_SimpleMaxCountTransientNotAcquired",
queue_args, self.session.delivery_mode.non_persistent,
self.session.acquire_mode.not_acquired)
+ def test_FlowToDisk_05_SimpleMaxCountTransientLargeMsg(self):
+ queue_args = {'qpid.policy_type':'flow_to_disk',
'qpid.max_count': 10, 'qpid.max_size': 100000000}
+ self.simple_limit("test_FlowToDisk_05_SimpleMaxCountTransientLargeMsg",
queue_args, self.session.delivery_mode.non_persistent,
self.session.acquire_mode.pre_acquired, 100, 10000)
- def test_FlowToDisk_06_SimpleMaxCountPersistentNotAcquired(self):
- queue_args = {'qpid.policy_type':'flow_to_disk',
'qpid.max_count': 10}
-
self.simple_limit("test_FlowToDisk_06_SimpleMaxCountPersistentNotAcquired",
queue_args, self.session.delivery_mode.persistent,
self.session.acquire_mode.not_acquired)
+ def test_FlowToDisk_06_SimpleMaxCountPersistentLargeMsg(self):
+ queue_args = {'qpid.policy_type':'flow_to_disk',
'qpid.max_count': 10, 'qpid.max_size': 100000000}
+
self.simple_limit("test_FlowToDisk_06_SimpleMaxCountPersistentLargeMsg",
queue_args, self.session.delivery_mode.persistent, self.session.acquire_mode.pre_acquired,
100, 10000)
- def test_FlowToDisk_07_SimpleMaxSizeTransientNotAcquired(self):
+ def test_FlowToDisk_07_SimpleMaxSizeTransientLargeMsg(self):
queue_args = {'qpid.policy_type':'flow_to_disk',
'qpid.max_size': 100}
-
self.simple_limit("test_FlowToDisk_07_SimpleMaxSizeTransientNotAcquired",
queue_args, self.session.delivery_mode.non_persistent,
self.session.acquire_mode.not_acquired)
+ self.simple_limit("test_FlowToDisk_07_SimpleMaxSizeTransientLargeMsg",
queue_args, self.session.delivery_mode.non_persistent,
self.session.acquire_mode.pre_acquired, 100, 10000)
- def test_FlowToDisk_08_SimpleMaxSizePersistentNotAcquired(self):
+ def test_FlowToDisk_08_SimpleMaxSizePersistentLargeMsg(self):
queue_args = {'qpid.policy_type':'flow_to_disk',
'qpid.max_size': 100}
-
self.simple_limit("test_FlowToDisk_08_SimpleMaxSizePersistentNotAcquired",
queue_args, self.session.delivery_mode.persistent,
self.session.acquire_mode.not_acquired)
+ self.simple_limit("test_FlowToDisk_08_SimpleMaxSizePersistentLargeMsg",
queue_args, self.session.delivery_mode.persistent, self.session.acquire_mode.pre_acquired,
100, 10000)
- def simple_limit(self, queue_name, queue_args, delivery_mode, acquire_mode):
+ def test_FlowToDisk_09_SimpleMaxCountTransientNotAcquired(self):
+ queue_args = {'qpid.policy_type':'flow_to_disk',
'qpid.max_count': 10, 'qpid.max_size': 100000000}
+
self.simple_limit("test_FlowToDisk_09_SimpleMaxCountTransientNotAcquired",
queue_args, self.session.delivery_mode.non_persistent,
self.session.acquire_mode.not_acquired)
+
+ def test_FlowToDisk_10_SimpleMaxCountPersistentNotAcquired(self):
+ queue_args = {'qpid.policy_type':'flow_to_disk',
'qpid.max_count': 10, 'qpid.max_size': 100000000}
+
self.simple_limit("test_FlowToDisk_10_SimpleMaxCountPersistentNotAcquired",
queue_args, self.session.delivery_mode.persistent,
self.session.acquire_mode.not_acquired)
+
+ def test_FlowToDisk_11_SimpleMaxSizeTransientNotAcquired(self):
+ queue_args = {'qpid.policy_type':'flow_to_disk',
'qpid.max_size': 100}
+
self.simple_limit("test_FlowToDisk_11_SimpleMaxSizeTransientNotAcquired",
queue_args, self.session.delivery_mode.non_persistent,
self.session.acquire_mode.not_acquired)
+
+ def test_FlowToDisk_12_SimpleMaxSizePersistentNotAcquired(self):
+ queue_args = {'qpid.policy_type':'flow_to_disk',
'qpid.max_size': 100}
+
self.simple_limit("test_FlowToDisk_12_SimpleMaxSizePersistentNotAcquired",
queue_args, self.session.delivery_mode.persistent,
self.session.acquire_mode.not_acquired)
+
+ def test_FlowToDisk_13_SimpleMaxCountTransientNotAcquiredLargeMsg(self):
+ queue_args = {'qpid.policy_type':'flow_to_disk',
'qpid.max_count': 10, 'qpid.max_size': 100000000}
+
self.simple_limit("test_FlowToDisk_13_SimpleMaxCountTransientNotAcquiredLargeMsg",
queue_args, self.session.delivery_mode.non_persistent,
self.session.acquire_mode.not_acquired, 100, 10000)
+
+ def test_FlowToDisk_14_SimpleMaxCountPersistentNotAcquiredLargeMsg(self):
+ queue_args = {'qpid.policy_type':'flow_to_disk',
'qpid.max_count': 10, 'qpid.max_size': 100000000}
+
self.simple_limit("test_FlowToDisk_14_SimpleMaxCountPersistentNotAcquiredLargeMsg",
queue_args, self.session.delivery_mode.persistent, self.session.acquire_mode.not_acquired,
100, 10000)
+
+ def test_FlowToDisk_15_SimpleMaxSizeTransientNotAcquiredLargeMsg(self):
+ queue_args = {'qpid.policy_type':'flow_to_disk',
'qpid.max_size': 100}
+
self.simple_limit("test_FlowToDisk_15_SimpleMaxSizeTransientNotAcquiredLargeMsg",
queue_args, self.session.delivery_mode.non_persistent,
self.session.acquire_mode.not_acquired, 100, 10000)
+
+ def test_FlowToDisk_16_SimpleMaxSizePersistentNotAcquiredLargeMsg(self):
+ queue_args = {'qpid.policy_type':'flow_to_disk',
'qpid.max_size': 100}
+
self.simple_limit("test_FlowToDisk_16_SimpleMaxSizePersistentNotAcquiredLargeMsg",
queue_args, self.session.delivery_mode.persistent, self.session.acquire_mode.not_acquired,
100, 10000)
+
+ def simple_limit(self, queue_name, queue_args, delivery_mode, acquire_mode, num_msgs
= 15, msg_size = None):
"""
Test a simple case of message limits which will force flow-to-disk.
* queue_args sets a limit - either max_count 10 or max_size 100
@@ -73,8 +116,8 @@
session.queue_declare(queue=queue_name, durable=True, arguments=queue_args)
# Add 15 messages
- for msg_num in range(0, 15):
- msg_str = "Message %02d" % msg_num
+ for msg_num in range(0, num_msgs):
+ msg_str = self._makeMessage(msg_num, msg_size)
session.message_transfer(message=Message(session.delivery_properties(routing_key=queue_name,
delivery_mode=delivery_mode), msg_str))
# Consume/browse 15 messages
@@ -83,8 +126,8 @@
session.message_flow(destination="tag", unit=session.credit_unit.byte,
value=0xFFFFFFFF)
queue = session.incoming("tag")
ids = RangedSet()
- for msg_num in range(0, 15):
- expected_str = "Message %02d" % msg_num
+ for msg_num in range(0, num_msgs):
+ expected_str = self._makeMessage(msg_num, msg_size)
msg = queue.get(timeout=5)
self.assertEqual(expected_str, msg.body)
ids.add(msg.id)
@@ -92,36 +135,53 @@
# If not_acquired, check messages are still on queue, then acquire/accept
if acquire_mode == self.session.acquire_mode.not_acquired:
session.queue_declare(queue=queue_name)
- self.assertEqual(15, session.queue_query(queue=queue_name).message_count)
+ self.assertEqual(num_msgs,
session.queue_query(queue=queue_name).message_count)
response = session.message_acquire(ids)
for range_ in ids:
for msg_id in range_:
self.assert_(msg_id in response.transfers)
- session.message_accept(ids)
+
+ session.message_accept(ids)
# Check queue is empty
session.queue_declare(queue=queue_name)
self.assertEqual(0, session.queue_query(queue=queue_name).message_count)
- def test_FlowToDisk_09_MaxCountBrowseConsumeTransient(self):
- queue_args = {'qpid.policy_type':'flow_to_disk',
'qpid.max_count': 10}
-
self.not_acquired_browse_consume_limit("test_FlowToDisk_09_MaxCountBrowseConsumeTransient",
queue_args, self.session.delivery_mode.non_persistent)
+ def test_FlowToDisk_17_MaxCountBrowseConsumeTransient(self):
+ queue_args = {'qpid.policy_type':'flow_to_disk',
'qpid.max_count': 10, 'qpid.max_size': 100000000}
+
self.not_acquired_browse_consume_limit("test_FlowToDisk_17_MaxCountBrowseConsumeTransient",
queue_args, self.session.delivery_mode.non_persistent)
- def test_FlowToDisk_10_MaxCountBrowseConsumePersistent(self):
- queue_args = {'qpid.policy_type':'flow_to_disk',
'qpid.max_count': 10}
-
self.not_acquired_browse_consume_limit("test_FlowToDisk_10_MaxCountBrowseConsumePersistent",
queue_args, self.session.delivery_mode.persistent)
+ def test_FlowToDisk_18_MaxCountBrowseConsumePersistent(self):
+ queue_args = {'qpid.policy_type':'flow_to_disk',
'qpid.max_count': 10, 'qpid.max_size': 100000000}
+
self.not_acquired_browse_consume_limit("test_FlowToDisk_18_MaxCountBrowseConsumePersistent",
queue_args, self.session.delivery_mode.persistent)
- def test_FlowToDisk_11_MaxSizeBrowseConsumeTransient(self):
+ def test_FlowToDisk_19_MaxSizeBrowseConsumeTransient(self):
queue_args = {'qpid.policy_type':'flow_to_disk',
'qpid.max_size': 100}
-
self.not_acquired_browse_consume_limit("test_FlowToDisk_11_MaxSizeBrowseConsumeTransient",
queue_args, self.session.delivery_mode.non_persistent)
+
self.not_acquired_browse_consume_limit("test_FlowToDisk_19_MaxSizeBrowseConsumeTransient",
queue_args, self.session.delivery_mode.non_persistent)
- def test_FlowToDisk_12_MaxSizeBrowseConsumePersistent(self):
+ def test_FlowToDisk_20_MaxSizeBrowseConsumePersistent(self):
queue_args = {'qpid.policy_type':'flow_to_disk',
'qpid.max_size': 100}
-
self.not_acquired_browse_consume_limit("test_FlowToDisk_12_MaxSizeBrowseConsumePersistent",
queue_args, self.session.delivery_mode.persistent)
+
self.not_acquired_browse_consume_limit("test_FlowToDisk_20_MaxSizeBrowseConsumePersistent",
queue_args, self.session.delivery_mode.persistent)
+
+ def test_FlowToDisk_21_MaxCountBrowseConsumeTransientLargeMsg(self):
+ queue_args = {'qpid.policy_type':'flow_to_disk',
'qpid.max_count': 10, 'qpid.max_size': 100000000}
+
self.not_acquired_browse_consume_limit("test_FlowToDisk_21_MaxCountBrowseConsumeTransientLargeMsg",
queue_args, self.session.delivery_mode.non_persistent, 100, 10000)
+
+ def test_FlowToDisk_22_MaxCountBrowseConsumePersistentLargeMsg(self):
+ queue_args = {'qpid.policy_type':'flow_to_disk',
'qpid.max_count': 10, 'qpid.max_size': 100000000}
+
self.not_acquired_browse_consume_limit("test_FlowToDisk_22_MaxCountBrowseConsumePersistentLargeMsg",
queue_args, self.session.delivery_mode.persistent, 100, 10000)
+
+ def test_FlowToDisk_23_MaxSizeBrowseConsumeTransientLargeMsg(self):
+ queue_args = {'qpid.policy_type':'flow_to_disk',
'qpid.max_size': 100}
+
self.not_acquired_browse_consume_limit("test_FlowToDisk_23_MaxSizeBrowseConsumeTransientLargeMsg",
queue_args, self.session.delivery_mode.non_persistent, 100, 10000)
+
+ def test_FlowToDisk_24_MaxSizeBrowseConsumePersistentLargeMsg(self):
+ queue_args = {'qpid.policy_type':'flow_to_disk',
'qpid.max_size': 100}
+
self.not_acquired_browse_consume_limit("test_FlowToDisk_24_MaxSizeBrowseConsumePersistentLargeMsg",
queue_args, self.session.delivery_mode.persistent, 100, 10000)
- def not_acquired_browse_consume_limit(self, queue_name, queue_args, delivery_mode):
+ def not_acquired_browse_consume_limit(self, queue_name, queue_args, delivery_mode,
num_msgs = 15, msg_size = None):
"""
Test to check browsing then subsequent consumption of flow-to-disk messages.
* 15 messages of size 10 are added. The last five will flow to disk.
@@ -135,8 +195,8 @@
session.queue_declare(queue=queue_name, durable=True, arguments=queue_args)
# Add 15 messages
- for msg_num in range(0, 15):
- msg_str = "Message %02d" % msg_num
+ for msg_num in range(0, num_msgs):
+ msg_str = self._makeMessage(msg_num, msg_size)
session.message_transfer(message=Message(session.delivery_properties(routing_key=queue_name,
delivery_mode=delivery_mode), msg_str))
# Browse 15 messages
@@ -145,8 +205,8 @@
session.message_flow(destination="tagA", unit=session.credit_unit.byte,
value=0xFFFFFFFF)
queue = session.incoming("tagA")
ids = RangedSet()
- for msg_num in range(0, 15):
- expected_str = "Message %02d" % msg_num
+ for msg_num in range(0, num_msgs):
+ expected_str = self._makeMessage(msg_num, msg_size)
msg = queue.get(timeout=5)
self.assertEqual(expected_str, msg.body)
ids.add(msg.id)
@@ -154,7 +214,7 @@
# Release all 15 messages and close
session.message_release(ids)
session.queue_declare(queue=queue_name)
- self.assertEqual(15, session.queue_query(queue=queue_name).message_count)
+ self.assertEqual(num_msgs, session.queue_query(queue=queue_name).message_count)
# Cancel subscription, start new one that consumes
session.message_cancel(destination="tagA")
@@ -162,10 +222,13 @@
session.message_flow(destination="tagB",
unit=session.credit_unit.message, value=0xFFFFFFFF)
session.message_flow(destination="tagB", unit=session.credit_unit.byte,
value=0xFFFFFFFF)
queue = session.incoming("tagB")
- for msg_num in range(0, 15):
- expected_str = "Message %02d" % msg_num
+ ids = RangedSet()
+ for msg_num in range(0, num_msgs):
+ expected_str = self._makeMessage(msg_num, msg_size)
msg = queue.get(timeout=5)
self.assertEqual(expected_str, msg.body)
+ ids.add(msg.id)
+ session.message_accept(ids)
# Check queue is empty
session.queue_declare(queue=queue_name)