[rhmessaging-commits] rhmessaging commits: r2137 - in store/trunk/cpp: tests and 1 other directories.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Thu Jun 5 18:07:06 EDT 2008


Author: kpvdr
Date: 2008-06-05 18:07:06 -0400 (Thu, 05 Jun 2008)
New Revision: 2137

Modified:
   store/trunk/cpp/lib/JournalImpl.cpp
   store/trunk/cpp/lib/JournalImpl.h
   store/trunk/cpp/tests/failing_python_tests.txt
   store/trunk/cpp/tests/python_tests/
   store/trunk/cpp/tests/python_tests/flow_to_disk.py
Log:
Fix for 448935: "Lazy-load with journal will fail for redelivered messages". Added additional flow-to-disk tests that test this mode of failure.

Modified: store/trunk/cpp/lib/JournalImpl.cpp
===================================================================
--- store/trunk/cpp/lib/JournalImpl.cpp	2008-06-05 22:05:26 UTC (rev 2136)
+++ store/trunk/cpp/lib/JournalImpl.cpp	2008-06-05 22:07:06 UTC (rev 2137)
@@ -53,6 +53,7 @@
                          const qpid::sys::Duration flushTimeout):
                          jcntl(journalId, journalDirectory, journalBaseFilename),
                          getEventsTimerSetFlag(false),
+                         lastReadRid(0),
                          writeActivityFlag(false),
                          flushTriggeredFlag(true),
                          _xidp(0),
@@ -101,14 +102,7 @@
 	}
     (dynamic_cast<GetEventsFireEvent*>(getEventsFireEventsPtr.get()))->cancel();
     (dynamic_cast<InactivityFireEvent*>(inactivityFireEventPtr.get()))->cancel();
-    if (_xidp) {
-        ::free(_xidp);
-        _xidp = 0;
-        _datap = 0;
-    } else if (_datap) {
-        ::free(_datap);
-        _datap = 0;
-    }
+    free_read_buffers();
 
     // TODO: Make this if() thread-safe
     if (journalTimerPtr && --cnt == 0)
@@ -229,14 +223,9 @@
     if (_dtok.rid() != rid)
     {
         // Free any previous msg
-        if (_xidp) {
-            ::free(_xidp);
-            _xidp = 0;
-            _datap = 0;
-        } else if (_datap) {
-            ::free(_datap);
-            _datap = 0;
-        }
+        free_read_buffers();
+        if (rid < lastReadRid)
+            _rmgr.invalidate();
         _dlen = 0;
         _dtok.reset();
         _dtok.set_wstate(DataTokenImpl::ENQ);
@@ -249,30 +238,36 @@
         unsigned aio_sleep_cnt = 0;
         while (!done) {
             iores res = read_data_record(&_datap, _dlen, &_xidp, xlen, transient, _external, &_dtok);
-            bool rid_low = _dtok.rid() < rid;
-            rid_found = _dtok.rid() == rid;
-            if (res == journal::RHM_IORES_SUCCESS && !rid_low) {
-                done = true;
-            } else if (res == journal::RHM_IORES_PAGE_AIOWAIT) {
-                if (++aio_sleep_cnt <= MAX_AIO_SLEEPS) {
-                    get_wr_events();
-                    usleep(AIO_SLEEP_TIME);
-                } else {
+            switch (res) {
+                case journal::RHM_IORES_SUCCESS:
+                    if (_dtok.rid() < rid) {
+                        free_read_buffers();
+                        // reset data token for next read
+                        _dlen = 0;
+                        _dtok.reset();
+                        _dtok.set_wstate(DataTokenImpl::ENQ);
+                        _dtok.set_rid(0);
+                    } else {
+                        rid_found = _dtok.rid() == rid;
+                        lastReadRid = rid;
+                        done = true;
+                    }
+                    break;
+                case journal::RHM_IORES_PAGE_AIOWAIT:
+                    if (++aio_sleep_cnt <= MAX_AIO_SLEEPS) {
+                        get_wr_events();
+                        usleep(AIO_SLEEP_TIME);
+                    } else {
+                        std::stringstream ss;
+                        ss << "read_data_record() returned " << journal::iores_str(res);
+                        ss << "; exceeded maximum wait time";
+                        throw jexception(0, ss.str().c_str(), "JournalImpl", "loadMsgContent");
+                    }
+                    break;
+                default:
                     std::stringstream ss;
                     ss << "read_data_record() returned " << journal::iores_str(res);
-                    ss << "; exceeded maximum wait time";
                     throw jexception(0, ss.str().c_str(), "JournalImpl", "loadMsgContent");
-                }
-            } else if (rid_low) {
-                // reset data token for next read
-                _dlen = 0;
-                _dtok.reset();
-                _dtok.set_wstate(DataTokenImpl::ENQ);
-                _dtok.set_rid(0);
-            } else {
-                std::stringstream ss;
-                ss << "read_data_record() returned " << journal::iores_str(res);
-                throw jexception(0, ss.str().c_str(), "JournalImpl", "loadMsgContent");
             }
         }
         if (!rid_found) {
@@ -447,6 +442,19 @@
 }
 
 void
+JournalImpl::free_read_buffers()
+{
+    if (_xidp) {
+        ::free(_xidp);
+        _xidp = 0;
+        _datap = 0;
+    } else if (_datap) {
+        ::free(_datap);
+        _datap = 0;
+    }
+}
+
+void
 JournalImpl::handleIoResult(const iores r)
 {
     writeActivityFlag = true;

Modified: store/trunk/cpp/lib/JournalImpl.h
===================================================================
--- store/trunk/cpp/lib/JournalImpl.h	2008-06-05 22:05:26 UTC (rev 2136)
+++ store/trunk/cpp/lib/JournalImpl.h	2008-06-05 22:07:06 UTC (rev 2137)
@@ -74,6 +74,8 @@
             bool getEventsTimerSetFlag;
             boost::intrusive_ptr<qpid::broker::TimerTask> getEventsFireEventsPtr;
             pthread_mutex_t _getf_mutex; // getEventsTimerSetFlag mutex
+            
+            u_int64_t lastReadRid; // rid of last read msg
 
             bool writeActivityFlag;
             bool flushTriggeredFlag;
@@ -182,6 +184,8 @@
                     qpid::management::Args&);
 
         private:
+            void free_read_buffers();
+
             inline void setGetEventTimer()
             {
                 getEventsFireEventsPtr->addRef();

Modified: store/trunk/cpp/tests/failing_python_tests.txt
===================================================================
--- store/trunk/cpp/tests/failing_python_tests.txt	2008-06-05 22:05:26 UTC (rev 2136)
+++ store/trunk/cpp/tests/failing_python_tests.txt	2008-06-05 22:07:06 UTC (rev 2137)
@@ -1,4 +0,0 @@
-python_tests.flow_to_disk.AsyncFlowToDiskTests.test_simple_max_count_transient
-python_tests.flow_to_disk.AsyncFlowToDiskTests.test_simple_max_count_persistent
-python_tests.flow_to_disk.AsyncFlowToDiskTests.test_simple_max_size_transient
-python_tests.flow_to_disk.AsyncFlowToDiskTests.test_simple_max_size_persistent


Property changes on: store/trunk/cpp/tests/python_tests
___________________________________________________________________
Name: svn:ignore
   + *.pyc


Modified: store/trunk/cpp/tests/python_tests/flow_to_disk.py
===================================================================
--- store/trunk/cpp/tests/python_tests/flow_to_disk.py	2008-06-05 22:05:26 UTC (rev 2136)
+++ store/trunk/cpp/tests/python_tests/flow_to_disk.py	2008-06-05 22:07:06 UTC (rev 2137)
@@ -19,34 +19,51 @@
 from qpid.client import Client, Closed
 from qpid.queue import Empty
 from qpid.testlib import TestBase010
-from qpid.datatypes import Message
+from qpid.datatypes import Message, RangedSet
 from qpid.session import SessionException
 
 class AsyncFlowToDiskTests(TestBase010):
     """Tests for async store flow-to-disk"""
 
-    def test_simple_max_count_transient(self):
+    def test_01_simple_max_count_transient(self):
         queue_args = {'qpid.max_count': 10}
-        self.simple_limit("test_simple_max_count_transient", queue_args, self.session.delivery_mode.non_persistent)
+        self.simple_limit("test_simple_max_count_transient", queue_args, self.session.delivery_mode.non_persistent, self.session.acquire_mode.pre_acquired)
 
-    def test_simple_max_count_persistent(self):
+    def test_02_simple_max_count_persistent(self):
         queue_args = {'qpid.max_count': 10}
-        self.simple_limit("test_simple_max_count_persistent", queue_args, self.session.delivery_mode.persistent)
+        self.simple_limit("test_simple_max_count_persistent", queue_args, self.session.delivery_mode.persistent, self.session.acquire_mode.pre_acquired)
 
-    def test_simple_max_size_transient(self):
+    def test_03_simple_max_size_transient(self):
         queue_args = {'qpid.max_size': 100}
-        self.simple_limit("test_simple_max_size_transient", queue_args, self.session.delivery_mode.non_persistent)
+        self.simple_limit("test_simple_max_size_transient", queue_args, self.session.delivery_mode.non_persistent, self.session.acquire_mode.pre_acquired)
 
-    def test_simple_max_size_persistent(self):
+    def test_04_simple_max_size_persistent(self):
         queue_args = {'qpid.max_size': 100}
-        self.simple_limit("test_simple_max_size_persistent", queue_args, self.session.delivery_mode.persistent)
+        self.simple_limit("test_simple_max_size_persistent", queue_args, self.session.delivery_mode.persistent, self.session.acquire_mode.pre_acquired)
 
-    def simple_limit(self, queue_name, queue_args, delivery_mode):
+    def test_05_simple_max_count_transient_not_acquired(self):
+        queue_args = {'qpid.max_count': 10}
+        self.simple_limit("test_simple_max_count_transient_not_acquired", queue_args, self.session.delivery_mode.non_persistent, self.session.acquire_mode.not_acquired)
+
+    def test_06_simple_max_count_persistent_not_acquired(self):
+        queue_args = {'qpid.max_count': 10}
+        self.simple_limit("test_simple_max_count_persistent_not_acquired", queue_args, self.session.delivery_mode.persistent, self.session.acquire_mode.not_acquired)
+
+    def test_07_simple_max_size_transient_not_acquired(self):
+        queue_args = {'qpid.max_size': 100}
+        self.simple_limit("test_simple_max_size_transient_not_acquired", queue_args, self.session.delivery_mode.non_persistent, self.session.acquire_mode.not_acquired)
+
+    def test_08_simple_max_size_persistent_not_acquired(self):
+        queue_args = {'qpid.max_size': 100}
+        self.simple_limit("test_simple_max_size_persistent_not_acquired", queue_args, self.session.delivery_mode.persistent, self.session.acquire_mode.not_acquired)
+
+    def simple_limit(self, queue_name, queue_args, delivery_mode, acquire_mode):
         """
-        Test a simple case of max message count.
+        Test a simple case of message limits which will force flow-to-disk.
         * queue_args sets a limit - either max_count 10 or max_size 100
-        * 15 messages are added. The last five will flow to disk.
+        * 15 messages of size 10 are added. The last five will flow to disk.
         * Consume 15 messages.
+        * Check the broker has no messages left.
         """
 
         session = self.session
@@ -57,17 +74,96 @@
             msg_str = "Message %02d" % msg_num
             session.message_transfer(message=Message(session.delivery_properties(routing_key=queue_name, delivery_mode=delivery_mode), msg_str))
 
-        # Consume 15 messages
-        session.message_subscribe(queue=queue_name, destination="tag")
+        # Consume/browse 15 messages
+        session.message_subscribe(queue=queue_name, destination="tag", acquire_mode=acquire_mode)
         session.message_flow(destination="tag", unit=session.credit_unit.message, value=0xFFFFFFFF)
         session.message_flow(destination="tag", unit=session.credit_unit.byte, value=0xFFFFFFFF)
         queue = session.incoming("tag")
+        ids = RangedSet()
         for msg_num in range(0, 15):
             expected_str = "Message %02d" % msg_num
             msg = queue.get(timeout=5)
             self.assertEqual(expected_str, msg.body)
+            ids.add(msg.id)
 
+        # If not_acquired, check messages are still on queue, then acquire/accept
+        if acquire_mode == self.session.acquire_mode.not_acquired:
+            session.queue_declare(queue=queue_name)
+            self.assertEqual(15, session.queue_query(queue=queue_name).message_count)
+            response = session.message_acquire(ids)
+            for range_ in ids:
+                for msg_id in range_:
+                    self.assert_(msg_id in response.transfers)
+            session.message_accept(ids)
+
         # Check queue is empty
         session.queue_declare(queue=queue_name)
-        reply = session.queue_query(queue=queue_name)
-        self.assertEqual(0, reply.message_count)        
+        self.assertEqual(0, session.queue_query(queue=queue_name).message_count)        
+
+
+    def test_09_max_count_browse_consume_transient(self):
+        queue_args = {'qpid.max_count': 10}
+        self.not_acquired_browse_consume_limit("test_max_count_browse_consume_transient", queue_args, self.session.delivery_mode.non_persistent)
+
+    def test_10_max_count_browse_consume_persistent(self):
+        queue_args = {'qpid.max_count': 10}
+        self.not_acquired_browse_consume_limit("test_max_count_browse_consume_persistent", queue_args, self.session.delivery_mode.persistent)
+
+    def test_11_max_size_browse_consume_transient(self):
+        queue_args = {'qpid.max_size': 100}
+        self.not_acquired_browse_consume_limit("test_max_size_browse_consume_transient", queue_args, self.session.delivery_mode.non_persistent)
+
+    def test_12_max_size_browse_consume_persistent(self):
+        queue_args = {'qpid.max_size': 100}
+        self.not_acquired_browse_consume_limit("test_max_size_browse_consume_persistent", queue_args, self.session.delivery_mode.persistent)
+        
+
+    def not_acquired_browse_consume_limit(self, queue_name, queue_args, delivery_mode):
+        """
+        Test to check browsing then subsequent consumption of flow-to-disk messages.
+        * 15 messages of size 10 are added. The last five will flow to disk.
+        * Browse 15 messages, then release them.
+        * Checks the broker still has all messages.
+        * Consumes 15 messages
+        * Checks the broker has no messages left.
+        """
+
+        session = self.session
+        session.queue_declare(queue=queue_name, durable=True, arguments=queue_args)
+
+        # Add 15 messages
+        for msg_num in range(0, 15):
+            msg_str = "Message %02d" % msg_num
+            session.message_transfer(message=Message(session.delivery_properties(routing_key=queue_name, delivery_mode=delivery_mode), msg_str))
+
+        # Browse 15 messages
+        session.message_subscribe(queue=queue_name, destination="tagA", acquire_mode=session.acquire_mode.not_acquired)
+        session.message_flow(destination="tagA", unit=session.credit_unit.message, value=0xFFFFFFFF)
+        session.message_flow(destination="tagA", unit=session.credit_unit.byte, value=0xFFFFFFFF)
+        queue = session.incoming("tagA")
+        ids = RangedSet()
+        for msg_num in range(0, 15):
+            expected_str = "Message %02d" % msg_num
+            msg = queue.get(timeout=5)
+            self.assertEqual(expected_str, msg.body)
+            ids.add(msg.id)
+
+        # Release all 15 messages and close
+        session.message_release(ids)
+        session.queue_declare(queue=queue_name)
+        self.assertEqual(15, session.queue_query(queue=queue_name).message_count)
+
+        # Cancel subscription, start new one that consumes
+        session.message_cancel(destination="tagA")
+        session.message_subscribe(queue=queue_name, destination="tagB", acquire_mode=session.acquire_mode.pre_acquired)
+        session.message_flow(destination="tagB", unit=session.credit_unit.message, value=0xFFFFFFFF)
+        session.message_flow(destination="tagB", unit=session.credit_unit.byte, value=0xFFFFFFFF)
+        queue = session.incoming("tagB")
+        for msg_num in range(0, 15):
+            expected_str = "Message %02d" % msg_num
+            msg = queue.get(timeout=5)
+            self.assertEqual(expected_str, msg.body)
+
+        # Check queue is empty
+        session.queue_declare(queue=queue_name)
+        self.assertEqual(0, session.queue_query(queue=queue_name).message_count)




More information about the rhmessaging-commits mailing list