[rhmessaging-commits] rhmessaging commits: r2137 - in store/trunk/cpp: tests and 1 other directories.
rhmessaging-commits at lists.jboss.org
rhmessaging-commits at lists.jboss.org
Thu Jun 5 18:07:06 EDT 2008
Author: kpvdr
Date: 2008-06-05 18:07:06 -0400 (Thu, 05 Jun 2008)
New Revision: 2137
Modified:
store/trunk/cpp/lib/JournalImpl.cpp
store/trunk/cpp/lib/JournalImpl.h
store/trunk/cpp/tests/failing_python_tests.txt
store/trunk/cpp/tests/python_tests/
store/trunk/cpp/tests/python_tests/flow_to_disk.py
Log:
Fix for 448935: "Lazy-load with journal will fail for redelivered messages". Added additional flow-to-disk tests that test this mode of failure.
Modified: store/trunk/cpp/lib/JournalImpl.cpp
===================================================================
--- store/trunk/cpp/lib/JournalImpl.cpp 2008-06-05 22:05:26 UTC (rev 2136)
+++ store/trunk/cpp/lib/JournalImpl.cpp 2008-06-05 22:07:06 UTC (rev 2137)
@@ -53,6 +53,7 @@
const qpid::sys::Duration flushTimeout):
jcntl(journalId, journalDirectory, journalBaseFilename),
getEventsTimerSetFlag(false),
+ lastReadRid(0),
writeActivityFlag(false),
flushTriggeredFlag(true),
_xidp(0),
@@ -101,14 +102,7 @@
}
(dynamic_cast<GetEventsFireEvent*>(getEventsFireEventsPtr.get()))->cancel();
(dynamic_cast<InactivityFireEvent*>(inactivityFireEventPtr.get()))->cancel();
- if (_xidp) {
- ::free(_xidp);
- _xidp = 0;
- _datap = 0;
- } else if (_datap) {
- ::free(_datap);
- _datap = 0;
- }
+ free_read_buffers();
// TODO: Make this if() thread-safe
if (journalTimerPtr && --cnt == 0)
@@ -229,14 +223,9 @@
if (_dtok.rid() != rid)
{
// Free any previous msg
- if (_xidp) {
- ::free(_xidp);
- _xidp = 0;
- _datap = 0;
- } else if (_datap) {
- ::free(_datap);
- _datap = 0;
- }
+ free_read_buffers();
+ if (rid < lastReadRid)
+ _rmgr.invalidate();
_dlen = 0;
_dtok.reset();
_dtok.set_wstate(DataTokenImpl::ENQ);
@@ -249,30 +238,36 @@
unsigned aio_sleep_cnt = 0;
while (!done) {
iores res = read_data_record(&_datap, _dlen, &_xidp, xlen, transient, _external, &_dtok);
- bool rid_low = _dtok.rid() < rid;
- rid_found = _dtok.rid() == rid;
- if (res == journal::RHM_IORES_SUCCESS && !rid_low) {
- done = true;
- } else if (res == journal::RHM_IORES_PAGE_AIOWAIT) {
- if (++aio_sleep_cnt <= MAX_AIO_SLEEPS) {
- get_wr_events();
- usleep(AIO_SLEEP_TIME);
- } else {
+ switch (res) {
+ case journal::RHM_IORES_SUCCESS:
+ if (_dtok.rid() < rid) {
+ free_read_buffers();
+ // reset data token for next read
+ _dlen = 0;
+ _dtok.reset();
+ _dtok.set_wstate(DataTokenImpl::ENQ);
+ _dtok.set_rid(0);
+ } else {
+ rid_found = _dtok.rid() == rid;
+ lastReadRid = rid;
+ done = true;
+ }
+ break;
+ case journal::RHM_IORES_PAGE_AIOWAIT:
+ if (++aio_sleep_cnt <= MAX_AIO_SLEEPS) {
+ get_wr_events();
+ usleep(AIO_SLEEP_TIME);
+ } else {
+ std::stringstream ss;
+ ss << "read_data_record() returned " << journal::iores_str(res);
+ ss << "; exceeded maximum wait time";
+ throw jexception(0, ss.str().c_str(), "JournalImpl", "loadMsgContent");
+ }
+ break;
+ default:
std::stringstream ss;
ss << "read_data_record() returned " << journal::iores_str(res);
- ss << "; exceeded maximum wait time";
throw jexception(0, ss.str().c_str(), "JournalImpl", "loadMsgContent");
- }
- } else if (rid_low) {
- // reset data token for next read
- _dlen = 0;
- _dtok.reset();
- _dtok.set_wstate(DataTokenImpl::ENQ);
- _dtok.set_rid(0);
- } else {
- std::stringstream ss;
- ss << "read_data_record() returned " << journal::iores_str(res);
- throw jexception(0, ss.str().c_str(), "JournalImpl", "loadMsgContent");
}
}
if (!rid_found) {
@@ -447,6 +442,19 @@
}
void
+JournalImpl::free_read_buffers()
+{
+ if (_xidp) {
+ ::free(_xidp);
+ _xidp = 0;
+ _datap = 0;
+ } else if (_datap) {
+ ::free(_datap);
+ _datap = 0;
+ }
+}
+
+void
JournalImpl::handleIoResult(const iores r)
{
writeActivityFlag = true;
Modified: store/trunk/cpp/lib/JournalImpl.h
===================================================================
--- store/trunk/cpp/lib/JournalImpl.h 2008-06-05 22:05:26 UTC (rev 2136)
+++ store/trunk/cpp/lib/JournalImpl.h 2008-06-05 22:07:06 UTC (rev 2137)
@@ -74,6 +74,8 @@
bool getEventsTimerSetFlag;
boost::intrusive_ptr<qpid::broker::TimerTask> getEventsFireEventsPtr;
pthread_mutex_t _getf_mutex; // getEventsTimerSetFlag mutex
+
+ u_int64_t lastReadRid; // rid of last read msg
bool writeActivityFlag;
bool flushTriggeredFlag;
@@ -182,6 +184,8 @@
qpid::management::Args&);
private:
+ void free_read_buffers();
+
inline void setGetEventTimer()
{
getEventsFireEventsPtr->addRef();
Modified: store/trunk/cpp/tests/failing_python_tests.txt
===================================================================
--- store/trunk/cpp/tests/failing_python_tests.txt 2008-06-05 22:05:26 UTC (rev 2136)
+++ store/trunk/cpp/tests/failing_python_tests.txt 2008-06-05 22:07:06 UTC (rev 2137)
@@ -1,4 +0,0 @@
-python_tests.flow_to_disk.AsyncFlowToDiskTests.test_simple_max_count_transient
-python_tests.flow_to_disk.AsyncFlowToDiskTests.test_simple_max_count_persistent
-python_tests.flow_to_disk.AsyncFlowToDiskTests.test_simple_max_size_transient
-python_tests.flow_to_disk.AsyncFlowToDiskTests.test_simple_max_size_persistent
Property changes on: store/trunk/cpp/tests/python_tests
___________________________________________________________________
Name: svn:ignore
+ *.pyc
Modified: store/trunk/cpp/tests/python_tests/flow_to_disk.py
===================================================================
--- store/trunk/cpp/tests/python_tests/flow_to_disk.py 2008-06-05 22:05:26 UTC (rev 2136)
+++ store/trunk/cpp/tests/python_tests/flow_to_disk.py 2008-06-05 22:07:06 UTC (rev 2137)
@@ -19,34 +19,51 @@
from qpid.client import Client, Closed
from qpid.queue import Empty
from qpid.testlib import TestBase010
-from qpid.datatypes import Message
+from qpid.datatypes import Message, RangedSet
from qpid.session import SessionException
class AsyncFlowToDiskTests(TestBase010):
"""Tests for async store flow-to-disk"""
- def test_simple_max_count_transient(self):
+ def test_01_simple_max_count_transient(self):
queue_args = {'qpid.max_count': 10}
- self.simple_limit("test_simple_max_count_transient", queue_args, self.session.delivery_mode.non_persistent)
+ self.simple_limit("test_simple_max_count_transient", queue_args, self.session.delivery_mode.non_persistent, self.session.acquire_mode.pre_acquired)
- def test_simple_max_count_persistent(self):
+ def test_02_simple_max_count_persistent(self):
queue_args = {'qpid.max_count': 10}
- self.simple_limit("test_simple_max_count_persistent", queue_args, self.session.delivery_mode.persistent)
+ self.simple_limit("test_simple_max_count_persistent", queue_args, self.session.delivery_mode.persistent, self.session.acquire_mode.pre_acquired)
- def test_simple_max_size_transient(self):
+ def test_03_simple_max_size_transient(self):
queue_args = {'qpid.max_size': 100}
- self.simple_limit("test_simple_max_size_transient", queue_args, self.session.delivery_mode.non_persistent)
+ self.simple_limit("test_simple_max_size_transient", queue_args, self.session.delivery_mode.non_persistent, self.session.acquire_mode.pre_acquired)
- def test_simple_max_size_persistent(self):
+ def test_04_simple_max_size_persistent(self):
queue_args = {'qpid.max_size': 100}
- self.simple_limit("test_simple_max_size_persistent", queue_args, self.session.delivery_mode.persistent)
+ self.simple_limit("test_simple_max_size_persistent", queue_args, self.session.delivery_mode.persistent, self.session.acquire_mode.pre_acquired)
- def simple_limit(self, queue_name, queue_args, delivery_mode):
+ def test_05_simple_max_count_transient_not_acquired(self):
+ queue_args = {'qpid.max_count': 10}
+ self.simple_limit("test_simple_max_count_transient_not_acquired", queue_args, self.session.delivery_mode.non_persistent, self.session.acquire_mode.not_acquired)
+
+ def test_06_simple_max_count_persistent_not_acquired(self):
+ queue_args = {'qpid.max_count': 10}
+ self.simple_limit("test_simple_max_count_persistent_not_acquired", queue_args, self.session.delivery_mode.persistent, self.session.acquire_mode.not_acquired)
+
+ def test_07_simple_max_size_transient_not_acquired(self):
+ queue_args = {'qpid.max_size': 100}
+ self.simple_limit("test_simple_max_size_transient_not_acquired", queue_args, self.session.delivery_mode.non_persistent, self.session.acquire_mode.not_acquired)
+
+ def test_08_simple_max_size_persistent_not_acquired(self):
+ queue_args = {'qpid.max_size': 100}
+ self.simple_limit("test_simple_max_size_persistent_not_acquired", queue_args, self.session.delivery_mode.persistent, self.session.acquire_mode.not_acquired)
+
+ def simple_limit(self, queue_name, queue_args, delivery_mode, acquire_mode):
"""
- Test a simple case of max message count.
+ Test a simple case of message limits which will force flow-to-disk.
* queue_args sets a limit - either max_count 10 or max_size 100
- * 15 messages are added. The last five will flow to disk.
+ * 15 messages of size 10 are added. The last five will flow to disk.
* Consume 15 messages.
+ * Check the broker has no messages left.
"""
session = self.session
@@ -57,17 +74,96 @@
msg_str = "Message %02d" % msg_num
session.message_transfer(message=Message(session.delivery_properties(routing_key=queue_name, delivery_mode=delivery_mode), msg_str))
- # Consume 15 messages
- session.message_subscribe(queue=queue_name, destination="tag")
+ # Consume/browse 15 messages
+ session.message_subscribe(queue=queue_name, destination="tag", acquire_mode=acquire_mode)
session.message_flow(destination="tag", unit=session.credit_unit.message, value=0xFFFFFFFF)
session.message_flow(destination="tag", unit=session.credit_unit.byte, value=0xFFFFFFFF)
queue = session.incoming("tag")
+ ids = RangedSet()
for msg_num in range(0, 15):
expected_str = "Message %02d" % msg_num
msg = queue.get(timeout=5)
self.assertEqual(expected_str, msg.body)
+ ids.add(msg.id)
+ # If not_acquired, check messages are still on queue, then acquire/accept
+ if acquire_mode == self.session.acquire_mode.not_acquired:
+ session.queue_declare(queue=queue_name)
+ self.assertEqual(15, session.queue_query(queue=queue_name).message_count)
+ response = session.message_acquire(ids)
+ for range_ in ids:
+ for msg_id in range_:
+ self.assert_(msg_id in response.transfers)
+ session.message_accept(ids)
+
# Check queue is empty
session.queue_declare(queue=queue_name)
- reply = session.queue_query(queue=queue_name)
- self.assertEqual(0, reply.message_count)
+ self.assertEqual(0, session.queue_query(queue=queue_name).message_count)
+
+
+ def test_09_max_count_browse_consume_transient(self):
+ queue_args = {'qpid.max_count': 10}
+ self.not_acquired_browse_consume_limit("test_max_count_browse_consume_transient", queue_args, self.session.delivery_mode.non_persistent)
+
+ def test_10_max_count_browse_consume_persistent(self):
+ queue_args = {'qpid.max_count': 10}
+ self.not_acquired_browse_consume_limit("test_max_count_browse_consume_persistent", queue_args, self.session.delivery_mode.persistent)
+
+ def test_11_max_size_browse_consume_transient(self):
+ queue_args = {'qpid.max_size': 100}
+ self.not_acquired_browse_consume_limit("test_max_size_browse_consume_transient", queue_args, self.session.delivery_mode.non_persistent)
+
+ def test_12_max_size_browse_consume_persistent(self):
+ queue_args = {'qpid.max_size': 100}
+ self.not_acquired_browse_consume_limit("test_max_size_browse_consume_persistent", queue_args, self.session.delivery_mode.persistent)
+
+
+ def not_acquired_browse_consume_limit(self, queue_name, queue_args, delivery_mode):
+ """
+ Test to check browsing then subsequent consumption of flow-to-disk messages.
+ * 15 messages of size 10 are added. The last five will flow to disk.
+ * Browse 15 messages, then release them.
+ * Checks the broker still has all messages.
+ * Consumes 15 messages.
+ * Checks the broker has no messages left.
+ """
+
+ session = self.session
+ session.queue_declare(queue=queue_name, durable=True, arguments=queue_args)
+
+ # Add 15 messages
+ for msg_num in range(0, 15):
+ msg_str = "Message %02d" % msg_num
+ session.message_transfer(message=Message(session.delivery_properties(routing_key=queue_name, delivery_mode=delivery_mode), msg_str))
+
+ # Browse 15 messages
+ session.message_subscribe(queue=queue_name, destination="tagA", acquire_mode=session.acquire_mode.not_acquired)
+ session.message_flow(destination="tagA", unit=session.credit_unit.message, value=0xFFFFFFFF)
+ session.message_flow(destination="tagA", unit=session.credit_unit.byte, value=0xFFFFFFFF)
+ queue = session.incoming("tagA")
+ ids = RangedSet()
+ for msg_num in range(0, 15):
+ expected_str = "Message %02d" % msg_num
+ msg = queue.get(timeout=5)
+ self.assertEqual(expected_str, msg.body)
+ ids.add(msg.id)
+
+ # Release all 15 messages and close
+ session.message_release(ids)
+ session.queue_declare(queue=queue_name)
+ self.assertEqual(15, session.queue_query(queue=queue_name).message_count)
+
+ # Cancel subscription, start new one that consumes
+ session.message_cancel(destination="tagA")
+ session.message_subscribe(queue=queue_name, destination="tagB", acquire_mode=session.acquire_mode.pre_acquired)
+ session.message_flow(destination="tagB", unit=session.credit_unit.message, value=0xFFFFFFFF)
+ session.message_flow(destination="tagB", unit=session.credit_unit.byte, value=0xFFFFFFFF)
+ queue = session.incoming("tagB")
+ for msg_num in range(0, 15):
+ expected_str = "Message %02d" % msg_num
+ msg = queue.get(timeout=5)
+ self.assertEqual(expected_str, msg.body)
+
+ # Check queue is empty
+ session.queue_declare(queue=queue_name)
+ self.assertEqual(0, session.queue_query(queue=queue_name).message_count)
More information about the rhmessaging-commits
mailing list