[rhmessaging-commits] rhmessaging commits: r3451 - in store/trunk/cpp: tests/cluster and 1 other directories.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Thu Jun 11 14:53:52 EDT 2009


Author: kpvdr
Date: 2009-06-11 14:53:52 -0400 (Thu, 11 Jun 2009)
New Revision: 3451

Modified:
   store/trunk/cpp/lib/JournalImpl.cpp
   store/trunk/cpp/lib/JournalImpl.h
   store/trunk/cpp/lib/MessageStoreImpl.cpp
   store/trunk/cpp/tests/cluster/run_cluster_tests
   store/trunk/cpp/tests/python_tests/flow_to_disk.py
Log:
Fix for BZ505274 - "Large durable messages that 'flow to disk', are not recovered correctly".

Modified: store/trunk/cpp/lib/JournalImpl.cpp
===================================================================
--- store/trunk/cpp/lib/JournalImpl.cpp	2009-06-11 16:00:38 UTC (rev 3450)
+++ store/trunk/cpp/lib/JournalImpl.cpp	2009-06-11 18:53:52 UTC (rev 3451)
@@ -253,7 +253,7 @@
 #define MAX_AIO_SLEEPS 1000  // 10 sec
 #define AIO_SLEEP_TIME 10000 // 10 ms
 bool
-JournalImpl::loadMsgContent(u_int64_t rid, std::string& data, size_t length)
+JournalImpl::loadMsgContent(u_int64_t rid, std::string& data, size_t length, size_t offset)
 {
     if (_dtok.rid() != rid)
     {
@@ -321,13 +321,14 @@
             throw jexception(journal::jerrno::JERR__RECNFOUND, ss.str().c_str(), "JournalImpl", "loadMsgContent");
         }
     }
-    if (_external)
-        return false;
-    u_int32_t offset = qpid::framing::Buffer(static_cast<char*>(_datap), sizeof(u_int32_t)).getLong() + sizeof(u_int32_t);
-    if (offset + length > _dlen) {
-        data.append((const char*)_datap + offset, _dlen - offset);
+
+    if (_external) return false;
+
+    u_int32_t hdr_offs = qpid::framing::Buffer(static_cast<char*>(_datap), sizeof(u_int32_t)).getLong() + sizeof(u_int32_t);
+    if (hdr_offs + offset + length > _dlen) {
+        data.append((const char*)_datap + hdr_offs + offset, _dlen - hdr_offs - offset);
     } else {
-        data.append((const char*)_datap + offset, length);
+        data.append((const char*)_datap + hdr_offs + offset, length);
     }
     return true;
 }

Modified: store/trunk/cpp/lib/JournalImpl.h
===================================================================
--- store/trunk/cpp/lib/JournalImpl.h	2009-06-11 16:00:38 UTC (rev 3450)
+++ store/trunk/cpp/lib/JournalImpl.h	2009-06-11 18:53:52 UTC (rev 3451)
@@ -151,7 +151,7 @@
             // Temporary fn to read and save last msg read from journal so it can be assigned
             // in chunks. To be replaced when coding to do this direct from the journal is ready.
             // Returns true if the record is extern, false if local.
-            bool loadMsgContent(u_int64_t rid, std::string& data, size_t length);
+            bool loadMsgContent(u_int64_t rid, std::string& data, size_t length, size_t offset = 0);
 
             // Overrides for write inactivity timer
             void enqueue_data_record(const void* const data_buff, const size_t tot_data_len,

Modified: store/trunk/cpp/lib/MessageStoreImpl.cpp
===================================================================
--- store/trunk/cpp/lib/MessageStoreImpl.cpp	2009-06-11 16:00:38 UTC (rev 3450)
+++ store/trunk/cpp/lib/MessageStoreImpl.cpp	2009-06-11 18:53:52 UTC (rev 3451)
@@ -1335,7 +1335,7 @@
         try {
             JournalImpl* jc = static_cast<JournalImpl*>(queue.getExternalQueueStore());
             if (jc && jc->is_enqueued(messageId) ) {
-                if (jc->loadMsgContent(messageId, data, length)) {
+                if (jc->loadMsgContent(messageId, data, length, offset)) {
                     return;
                 }
             }

Modified: store/trunk/cpp/tests/cluster/run_cluster_tests
===================================================================
--- store/trunk/cpp/tests/cluster/run_cluster_tests	2009-06-11 16:00:38 UTC (rev 3450)
+++ store/trunk/cpp/tests/cluster/run_cluster_tests	2009-06-11 18:53:52 UTC (rev 3451)
@@ -61,7 +61,7 @@
 	export SENDER_EXEC=${QPID_DIR}/cpp/src/tests/sender
 else
 	# Path from known installed locations
-	CLUSTER_PATH=/usr/libexec/qpid/tests/cluster_test
+	CLUSTER_PATH=/usr/libexec/qpid/tests/${CPP_CLUSTER_EXEC}
 	if test -z ${CLUSTER_PATH} ; then
 		echo "No executable \"${CPP_CLUSTER_EXEC}\" found in path"
 		exit 1
@@ -73,8 +73,8 @@
 	export CLUSTER_LIB=/usr/lib/qpid/daemon/cluster.so
 	export QPID_CONFIG_EXEC=/usr/bin/qpid-config
 	export QPID_ROUTE_EXEC=/usr/bin/qpid-route
-	export RECEIVER_EXEC=/usr/libexec/qpid/test/receiver
-	export SENDER_EXEC=/usr/libexec/qpid/test/sender
+	export RECEIVER_EXEC=/usr/libexec/qpid/tests/receiver
+	export SENDER_EXEC=/usr/libexec/qpid/tests/sender
 fi
 export STORE_LIB=${abs_srcdir}/../../lib/.libs/msgstore.so
 
@@ -99,7 +99,7 @@
 	
 	Unable to load python qpid module - skipping python tests.
 	
-    PYTHONPATH=${PYTHONPATH}"
+    PYTHONPATH=${PYTHONPATH}
 
 	===========================================================
 
@@ -114,8 +114,8 @@
    	mkdir -p ${TMP_STORE_DIR}/cluster
 else
     # Delete old cluster test dirs
-    rm -rf "${TMP_STORE_DIR}/cluster"
-    mkdir -p "${TMP_STORE_DIR}/cluster"
+    rm -rf ${TMP_STORE_DIR}/cluster
+    mkdir -p ${TMP_STORE_DIR}/cluster
 fi
 export TMP_STORE_DIR
 

Modified: store/trunk/cpp/tests/python_tests/flow_to_disk.py
===================================================================
--- store/trunk/cpp/tests/python_tests/flow_to_disk.py	2009-06-11 16:00:38 UTC (rev 3450)
+++ store/trunk/cpp/tests/python_tests/flow_to_disk.py	2009-06-11 18:53:52 UTC (rev 3451)
@@ -28,12 +28,23 @@
 class FlowToDiskTests(TestBase010):
     """Tests for async store flow-to-disk"""
 
+    def _makeMessage(self, msgCnt, msgSize):
+        msg = "Message-%04d" % msgCnt
+        msgLen = len(msg)
+        if msgSize != None and msgSize > msgLen:
+            for i in range(msgLen, msgSize):
+                if i == msgLen:
+                    msg += "-"
+                else:
+                    msg += chr(ord('a') + (i % 26))
+        return msg
+
     def test_FlowToDisk_01_SimpleMaxCountTransient(self):
-        queue_args = {'qpid.policy_type':'flow_to_disk', 'qpid.max_count': 10}
+        queue_args = {'qpid.policy_type':'flow_to_disk', 'qpid.max_count': 10, 'qpid.max_size': 100000000}
         self.simple_limit("test_FlowToDisk_01_SimpleMaxCountTransient", queue_args, self.session.delivery_mode.non_persistent, self.session.acquire_mode.pre_acquired)
 
     def test_FlowToDisk_02_SimpleMaxCountPersistent(self):
-        queue_args = {'qpid.policy_type':'flow_to_disk', 'qpid.max_count': 10}
+        queue_args = {'qpid.policy_type':'flow_to_disk', 'qpid.max_count': 10, 'qpid.max_size': 100000000}
         self.simple_limit("test_FlowToDisk_02_SimpleMaxCountPersistent", queue_args, self.session.delivery_mode.persistent, self.session.acquire_mode.pre_acquired)
 
     def test_FlowToDisk_03_SimpleMaxSizeTransient(self):
@@ -44,23 +55,55 @@
         queue_args = {'qpid.policy_type':'flow_to_disk', 'qpid.max_size': 100}
         self.simple_limit("test_FlowToDisk_04_SimpleMaxSizePersistent", queue_args, self.session.delivery_mode.persistent, self.session.acquire_mode.pre_acquired)
 
-    def test_FlowToDisk_05_SimpleMaxCountTransientNotAcquired(self):
-        queue_args = {'qpid.policy_type':'flow_to_disk', 'qpid.max_count': 10}
-        self.simple_limit("test_FlowToDisk_05_SimpleMaxCountTransientNotAcquired", queue_args, self.session.delivery_mode.non_persistent, self.session.acquire_mode.not_acquired)
+    def test_FlowToDisk_05_SimpleMaxCountTransientLargeMsg(self):
+        queue_args = {'qpid.policy_type':'flow_to_disk', 'qpid.max_count': 10, 'qpid.max_size': 100000000}
+        self.simple_limit("test_FlowToDisk_05_SimpleMaxCountTransientLargeMsg", queue_args, self.session.delivery_mode.non_persistent, self.session.acquire_mode.pre_acquired, 100, 10000)
 
-    def test_FlowToDisk_06_SimpleMaxCountPersistentNotAcquired(self):
-        queue_args = {'qpid.policy_type':'flow_to_disk', 'qpid.max_count': 10}
-        self.simple_limit("test_FlowToDisk_06_SimpleMaxCountPersistentNotAcquired", queue_args, self.session.delivery_mode.persistent, self.session.acquire_mode.not_acquired)
+    def test_FlowToDisk_06_SimpleMaxCountPersistentLargeMsg(self):
+        queue_args = {'qpid.policy_type':'flow_to_disk', 'qpid.max_count': 10, 'qpid.max_size': 100000000}
+        self.simple_limit("test_FlowToDisk_06_SimpleMaxCountPersistentLargeMsg", queue_args, self.session.delivery_mode.persistent, self.session.acquire_mode.pre_acquired, 100, 10000)
 
-    def test_FlowToDisk_07_SimpleMaxSizeTransientNotAcquired(self):
+    def test_FlowToDisk_07_SimpleMaxSizeTransientLargeMsg(self):
         queue_args = {'qpid.policy_type':'flow_to_disk', 'qpid.max_size': 100}
-        self.simple_limit("test_FlowToDisk_07_SimpleMaxSizeTransientNotAcquired", queue_args, self.session.delivery_mode.non_persistent, self.session.acquire_mode.not_acquired)
+        self.simple_limit("test_FlowToDisk_07_SimpleMaxSizeTransientLargeMsg", queue_args, self.session.delivery_mode.non_persistent, self.session.acquire_mode.pre_acquired, 100, 10000)
 
-    def test_FlowToDisk_08_SimpleMaxSizePersistentNotAcquired(self):
+    def test_FlowToDisk_08_SimpleMaxSizePersistentLargeMsg(self):
         queue_args = {'qpid.policy_type':'flow_to_disk', 'qpid.max_size': 100}
-        self.simple_limit("test_FlowToDisk_08_SimpleMaxSizePersistentNotAcquired", queue_args, self.session.delivery_mode.persistent, self.session.acquire_mode.not_acquired)
+        self.simple_limit("test_FlowToDisk_08_SimpleMaxSizePersistentLargeMsg", queue_args, self.session.delivery_mode.persistent, self.session.acquire_mode.pre_acquired, 100, 10000)
 
-    def simple_limit(self, queue_name, queue_args, delivery_mode, acquire_mode):
+    def test_FlowToDisk_09_SimpleMaxCountTransientNotAcquired(self):
+        queue_args = {'qpid.policy_type':'flow_to_disk', 'qpid.max_count': 10, 'qpid.max_size': 100000000}
+        self.simple_limit("test_FlowToDisk_09_SimpleMaxCountTransientNotAcquired", queue_args, self.session.delivery_mode.non_persistent, self.session.acquire_mode.not_acquired)
+
+    def test_FlowToDisk_10_SimpleMaxCountPersistentNotAcquired(self):
+        queue_args = {'qpid.policy_type':'flow_to_disk', 'qpid.max_count': 10, 'qpid.max_size': 100000000}
+        self.simple_limit("test_FlowToDisk_10_SimpleMaxCountPersistentNotAcquired", queue_args, self.session.delivery_mode.persistent, self.session.acquire_mode.not_acquired)
+
+    def test_FlowToDisk_11_SimpleMaxSizeTransientNotAcquired(self):
+        queue_args = {'qpid.policy_type':'flow_to_disk', 'qpid.max_size': 100}
+        self.simple_limit("test_FlowToDisk_11_SimpleMaxSizeTransientNotAcquired", queue_args, self.session.delivery_mode.non_persistent, self.session.acquire_mode.not_acquired)
+
+    def test_FlowToDisk_12_SimpleMaxSizePersistentNotAcquired(self):
+        queue_args = {'qpid.policy_type':'flow_to_disk', 'qpid.max_size': 100}
+        self.simple_limit("test_FlowToDisk_12_SimpleMaxSizePersistentNotAcquired", queue_args, self.session.delivery_mode.persistent, self.session.acquire_mode.not_acquired)
+
+    def test_FlowToDisk_13_SimpleMaxCountTransientNotAcquiredLargeMsg(self):
+        queue_args = {'qpid.policy_type':'flow_to_disk', 'qpid.max_count': 10, 'qpid.max_size': 100000000}
+        self.simple_limit("test_FlowToDisk_13_SimpleMaxCountTransientNotAcquiredLargeMsg", queue_args, self.session.delivery_mode.non_persistent, self.session.acquire_mode.not_acquired, 100, 10000)
+
+    def test_FlowToDisk_14_SimpleMaxCountPersistentNotAcquiredLargeMsg(self):
+        queue_args = {'qpid.policy_type':'flow_to_disk', 'qpid.max_count': 10, 'qpid.max_size': 100000000}
+        self.simple_limit("test_FlowToDisk_14_SimpleMaxCountPersistentNotAcquiredLargeMsg", queue_args, self.session.delivery_mode.persistent, self.session.acquire_mode.not_acquired, 100, 10000)
+
+    def test_FlowToDisk_15_SimpleMaxSizeTransientNotAcquiredLargeMsg(self):
+        queue_args = {'qpid.policy_type':'flow_to_disk', 'qpid.max_size': 100}
+        self.simple_limit("test_FlowToDisk_15_SimpleMaxSizeTransientNotAcquiredLargeMsg", queue_args, self.session.delivery_mode.non_persistent, self.session.acquire_mode.not_acquired, 100, 10000)
+
+    def test_FlowToDisk_16_SimpleMaxSizePersistentNotAcquiredLargeMsg(self):
+        queue_args = {'qpid.policy_type':'flow_to_disk', 'qpid.max_size': 100}
+        self.simple_limit("test_FlowToDisk_16_SimpleMaxSizePersistentNotAcquiredLargeMsg", queue_args, self.session.delivery_mode.persistent, self.session.acquire_mode.not_acquired, 100, 10000)
+
+    def simple_limit(self, queue_name, queue_args, delivery_mode, acquire_mode, num_msgs = 15, msg_size = None):
         """
         Test a simple case of message limits which will force flow-to-disk.
         * queue_args sets a limit - either max_count 10 or max_size 100
@@ -73,8 +116,8 @@
         session.queue_declare(queue=queue_name, durable=True, arguments=queue_args)
 
         # Add 15 messages
-        for msg_num in range(0, 15):
-            msg_str = "Message %02d" % msg_num
+        for msg_num in range(0, num_msgs):
+            msg_str = self._makeMessage(msg_num, msg_size)
             session.message_transfer(message=Message(session.delivery_properties(routing_key=queue_name, delivery_mode=delivery_mode), msg_str))
 
         # Consume/browse 15 messages
@@ -83,8 +126,8 @@
         session.message_flow(destination="tag", unit=session.credit_unit.byte, value=0xFFFFFFFF)
         queue = session.incoming("tag")
         ids = RangedSet()
-        for msg_num in range(0, 15):
-            expected_str = "Message %02d" % msg_num
+        for msg_num in range(0, num_msgs):
+            expected_str = self._makeMessage(msg_num, msg_size)
             msg = queue.get(timeout=5)
             self.assertEqual(expected_str, msg.body)
             ids.add(msg.id)
@@ -92,36 +135,53 @@
         # If not_acquired, chek messages are still on queue, then acquire/accept
         if acquire_mode == self.session.acquire_mode.not_acquired:
             session.queue_declare(queue=queue_name)
-            self.assertEqual(15, session.queue_query(queue=queue_name).message_count)
+            self.assertEqual(num_msgs, session.queue_query(queue=queue_name).message_count)
             response = session.message_acquire(ids)
             for range_ in ids:
                 for msg_id in range_:
                     self.assert_(msg_id in response.transfers)
-            session.message_accept(ids)
+        
+        session.message_accept(ids)
 
         # Check queue is empty
         session.queue_declare(queue=queue_name)
         self.assertEqual(0, session.queue_query(queue=queue_name).message_count)        
 
 
-    def test_FlowToDisk_09_MaxCountBrowseConsumeTransient(self):
-        queue_args = {'qpid.policy_type':'flow_to_disk', 'qpid.max_count': 10}
-        self.not_acquired_browse_consume_limit("test_FlowToDisk_09_MaxCountBrowseConsumeTransient", queue_args, self.session.delivery_mode.non_persistent)
+    def test_FlowToDisk_17_MaxCountBrowseConsumeTransient(self):
+        queue_args = {'qpid.policy_type':'flow_to_disk', 'qpid.max_count': 10, 'qpid.max_size': 100000000}
+        self.not_acquired_browse_consume_limit("test_FlowToDisk_17_MaxCountBrowseConsumeTransient", queue_args, self.session.delivery_mode.non_persistent)
 
-    def test_FlowToDisk_10_MaxCountBrowseConsumePersistent(self):
-        queue_args = {'qpid.policy_type':'flow_to_disk', 'qpid.max_count': 10}
-        self.not_acquired_browse_consume_limit("test_FlowToDisk_10_MaxCountBrowseConsumePersistent", queue_args, self.session.delivery_mode.persistent)
+    def test_FlowToDisk_18_MaxCountBrowseConsumePersistent(self):
+        queue_args = {'qpid.policy_type':'flow_to_disk', 'qpid.max_count': 10, 'qpid.max_size': 100000000}
+        self.not_acquired_browse_consume_limit("test_FlowToDisk_18_MaxCountBrowseConsumePersistent", queue_args, self.session.delivery_mode.persistent)
 
-    def test_FlowToDisk_11_MaxSizeBrowseConsumeTransient(self):
+    def test_FlowToDisk_19_MaxSizeBrowseConsumeTransient(self):
         queue_args = {'qpid.policy_type':'flow_to_disk', 'qpid.max_size': 100}
-        self.not_acquired_browse_consume_limit("test_FlowToDisk_11_MaxSizeBrowseConsumeTransient", queue_args, self.session.delivery_mode.non_persistent)
+        self.not_acquired_browse_consume_limit("test_FlowToDisk_19_MaxSizeBrowseConsumeTransient", queue_args, self.session.delivery_mode.non_persistent)
 
-    def test_FlowToDisk_12_MaxSizeBrowseConsumePersistent(self):
+    def test_FlowToDisk_20_MaxSizeBrowseConsumePersistent(self):
         queue_args = {'qpid.policy_type':'flow_to_disk', 'qpid.max_size': 100}
-        self.not_acquired_browse_consume_limit("test_FlowToDisk_12_MaxSizeBrowseConsumePersistent", queue_args, self.session.delivery_mode.persistent)
+        self.not_acquired_browse_consume_limit("test_FlowToDisk_20_MaxSizeBrowseConsumePersistent", queue_args, self.session.delivery_mode.persistent)
+
+    def test_FlowToDisk_21_MaxCountBrowseConsumeTransientLargeMsg(self):
+        queue_args = {'qpid.policy_type':'flow_to_disk', 'qpid.max_count': 10, 'qpid.max_size': 100000000}
+        self.not_acquired_browse_consume_limit("test_FlowToDisk_21_MaxCountBrowseConsumeTransientLargeMsg", queue_args, self.session.delivery_mode.non_persistent, 100, 10000)
+
+    def test_FlowToDisk_22_MaxCountBrowseConsumePersistentLargeMsg(self):
+        queue_args = {'qpid.policy_type':'flow_to_disk', 'qpid.max_count': 10, 'qpid.max_size': 100000000}
+        self.not_acquired_browse_consume_limit("test_FlowToDisk_22_MaxCountBrowseConsumePersistentLargeMsg", queue_args, self.session.delivery_mode.persistent, 100, 10000)
+
+    def test_FlowToDisk_23_MaxSizeBrowseConsumeTransientLargeMsg(self):
+        queue_args = {'qpid.policy_type':'flow_to_disk', 'qpid.max_size': 100}
+        self.not_acquired_browse_consume_limit("test_FlowToDisk_23_MaxSizeBrowseConsumeTransientLargeMsg", queue_args, self.session.delivery_mode.non_persistent, 100, 10000)
+
+    def test_FlowToDisk_24_MaxSizeBrowseConsumePersistentLargeMsg(self):
+        queue_args = {'qpid.policy_type':'flow_to_disk', 'qpid.max_size': 100}
+        self.not_acquired_browse_consume_limit("test_FlowToDisk_24_MaxSizeBrowseConsumePersistentLargeMsg", queue_args, self.session.delivery_mode.persistent, 100, 10000)
         
 
-    def not_acquired_browse_consume_limit(self, queue_name, queue_args, delivery_mode):
+    def not_acquired_browse_consume_limit(self, queue_name, queue_args, delivery_mode, num_msgs = 15, msg_size = None):
         """
         Test to check browsing then subsequent consumption of flow-to-disk messages.
         * 15 messages of size 10 are added. The last five will flow to disk.
@@ -135,8 +195,8 @@
         session.queue_declare(queue=queue_name, durable=True, arguments=queue_args)
 
         # Add 15 messages
-        for msg_num in range(0, 15):
-            msg_str = "Message %02d" % msg_num
+        for msg_num in range(0, num_msgs):
+            msg_str = self._makeMessage(msg_num, msg_size)
             session.message_transfer(message=Message(session.delivery_properties(routing_key=queue_name, delivery_mode=delivery_mode), msg_str))
 
         # Browse 15 messages
@@ -145,8 +205,8 @@
         session.message_flow(destination="tagA", unit=session.credit_unit.byte, value=0xFFFFFFFF)
         queue = session.incoming("tagA")
         ids = RangedSet()
-        for msg_num in range(0, 15):
-            expected_str = "Message %02d" % msg_num
+        for msg_num in range(0, num_msgs):
+            expected_str = self._makeMessage(msg_num, msg_size)
             msg = queue.get(timeout=5)
             self.assertEqual(expected_str, msg.body)
             ids.add(msg.id)
@@ -154,7 +214,7 @@
         # Release all 15 messages and close
         session.message_release(ids)
         session.queue_declare(queue=queue_name)
-        self.assertEqual(15, session.queue_query(queue=queue_name).message_count)
+        self.assertEqual(num_msgs, session.queue_query(queue=queue_name).message_count)
 
         # Cancel subscription, start new one that consumes
         session.message_cancel(destination="tagA")
@@ -162,10 +222,13 @@
         session.message_flow(destination="tagB", unit=session.credit_unit.message, value=0xFFFFFFFF)
         session.message_flow(destination="tagB", unit=session.credit_unit.byte, value=0xFFFFFFFF)
         queue = session.incoming("tagB")
-        for msg_num in range(0, 15):
-            expected_str = "Message %02d" % msg_num
+        ids = RangedSet()
+        for msg_num in range(0, num_msgs):
+            expected_str = self._makeMessage(msg_num, msg_size)
             msg = queue.get(timeout=5)
             self.assertEqual(expected_str, msg.body)
+            ids.add(msg.id)
+        session.message_accept(ids)
 
         # Check queue is empty
         session.queue_declare(queue=queue_name)




More information about the rhmessaging-commits mailing list