[rhmessaging-commits] rhmessaging commits: r4196 - in store/trunk/cpp: tests/python_tests and 1 other directories.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Tue Aug 10 13:33:04 EDT 2010


Author: kpvdr
Date: 2010-08-10 13:33:04 -0400 (Tue, 10 Aug 2010)
New Revision: 4196

Added:
   store/trunk/cpp/tests/python_tests/resize.py
Modified:
   store/trunk/cpp/tests/python_tests/__init__.py
   store/trunk/cpp/tests/python_tests/client_persistence.py
   store/trunk/cpp/tests/python_tests/store_test.py
   store/trunk/cpp/tests/run_python_tests
   store/trunk/cpp/tools/janal.py
   store/trunk/cpp/tools/resize
Log:
Fix for BZ 620676 - "Store resize operation fails with large messages (greater than store file size)". Also included new resize tests that would catch this bug.

Modified: store/trunk/cpp/tests/python_tests/__init__.py
===================================================================
--- store/trunk/cpp/tests/python_tests/__init__.py	2010-08-09 19:01:28 UTC (rev 4195)
+++ store/trunk/cpp/tests/python_tests/__init__.py	2010-08-10 17:33:04 UTC (rev 4196)
@@ -23,3 +23,4 @@
 
 from client_persistence import *
 from flow_to_disk import *
+from resize import *

Modified: store/trunk/cpp/tests/python_tests/client_persistence.py
===================================================================
--- store/trunk/cpp/tests/python_tests/client_persistence.py	2010-08-09 19:01:28 UTC (rev 4195)
+++ store/trunk/cpp/tests/python_tests/client_persistence.py	2010-08-10 17:33:04 UTC (rev 4196)
@@ -33,20 +33,20 @@
     
     def test_direct_exchange(self):
         """Test Direct exchange."""
-        broker = self.broker(store_args(), name="testDirectExchange", expect=EXPECT_EXIT_OK)
+        broker = self.broker(store_args(), name="test_direct_exchange", expect=EXPECT_EXIT_OK)
         msg1 = Message("A_Message1", durable=True, correlation_id="Msg0001")
         msg2 = Message("B_Message1", durable=True, correlation_id="Msg0002")
         broker.send_message("a", msg1)
         broker.send_message("b", msg2)
         broker.terminate()
         
-        broker = self.broker(store_args(), name="testDirectExchange")
+        broker = self.broker(store_args(), name="test_direct_exchange")
         self.check_message(broker, "a", msg1, True)
         self.check_message(broker, "b", msg2, True)
     
     def test_topic_exchange(self):
         """Test Topic exchange."""
-        broker = self.broker(store_args(), name="testTopicExchange", expect=EXPECT_EXIT_OK)
+        broker = self.broker(store_args(), name="test_topic_exchange", expect=EXPECT_EXIT_OK)
         ssn = broker.connect().session()
         snd1 = ssn.sender("abc/key1; {create:always, node:{type:topic, durable:True}}")
         snd2 = ssn.sender("abc/key2; {create:always, node:{type:topic, durable:True}}")
@@ -62,7 +62,7 @@
         snd2.send(msg2)
         broker.terminate()
         
-        broker = self.broker(store_args(), name="testTopicExchange")
+        broker = self.broker(store_args(), name="test_topic_exchange")
         self.check_message(broker, "a", msg1, True)
         self.check_message(broker, "b", msg1, True)
         self.check_messages(broker, "c", [msg1, msg2], True)
@@ -72,7 +72,7 @@
     
     def test_lvq(self):
         """Test LVQ."""        
-        broker = self.broker(store_args(), name="testLVQ", expect=EXPECT_EXIT_OK)
+        broker = self.broker(store_args(), name="test_lvq", expect=EXPECT_EXIT_OK)
         ma1 = Message("A1", durable=True, correlation_id="Msg0005", properties={"qpid.LVQ_key":"A"})
         ma2 = Message("A2", durable=True, correlation_id="Msg0006", properties={"qpid.LVQ_key":"A"})
         mb1 = Message("B1", durable=True, correlation_id="Msg0007", properties={"qpid.LVQ_key":"B"})
@@ -83,7 +83,7 @@
                              xprops="arguments:{\"qpid.last_value_queue\":True}")
         broker.terminate()
         
-        broker = self.broker(store_args(), name="testLVQ", expect=EXPECT_EXIT_OK)
+        broker = self.broker(store_args(), name="test_lvq", expect=EXPECT_EXIT_OK)
         ssn = self.check_messages(broker, "lvq-test", [ma2, mb3, mc1], empty=True, ack=False)
         # Add more messages while subscriber is active (no replacement):
         ma3 = Message("A3", durable=True, correlation_id="Msg0011", properties={"qpid.LVQ_key":"A"})
@@ -95,12 +95,12 @@
         ssn.acknowledge()
         broker.terminate()
         
-        broker = self.broker(store_args(), name="testLVQ")
+        broker = self.broker(store_args(), name="test_lvq")
         self.check_messages(broker, "lvq-test", [mc4, ma4], True)
     
     def test_fanout_exchange(self):
         """Test Fanout Exchange"""
-        broker = self.broker(store_args(), name="testFanout", expect=EXPECT_EXIT_OK)
+        broker = self.broker(store_args(), name="test_fanout_exchange", expect=EXPECT_EXIT_OK)
         ssn = broker.connect().session()
         snd = ssn.sender("TestFanoutExchange; {create: always, node: {type: topic, x-declare: {type: fanout}}}")
         ssn.receiver("TestFanoutExchange; {link: {name: \"q1\", durable: True, reliability:at-least-once}}")
@@ -112,7 +112,7 @@
         snd.send(msg2)
         broker.terminate()
         
-        broker = self.broker(store_args(), name="testFanout")
+        broker = self.broker(store_args(), name="test_fanout_exchange")
         self.check_messages(broker, "q1", [msg1, msg2], True)
         self.check_messages(broker, "q2", [msg1, msg2], True)
         self.check_messages(broker, "q3", [msg1, msg2], True)
@@ -125,14 +125,14 @@
 
     def test_exchange(self):
         """Exchange alternate exchange property persistence test"""
-        broker = self.broker(store_args(), name="testExchangeBroker", expect=EXPECT_EXIT_OK)
+        broker = self.broker(store_args(), name="test_exchange", expect=EXPECT_EXIT_OK)
         qmf = Qmf(broker)
         qmf.add_exchange("altExch", "direct", durable=True) # Serves as alternate exchange instance
         qmf.add_exchange("testExch", "direct", durable=True, alt_exchange_name="altExch")
         qmf.close()
         broker.terminate()
 
-        broker = self.broker(store_args(), name="testExchangeBroker")
+        broker = self.broker(store_args(), name="test_exchange")
         qmf = Qmf(broker)
         try:
             qmf.add_exchange("altExch", "direct", passive=True)
@@ -148,14 +148,14 @@
         
     def test_queue(self):
         """Queue alternate exchange property persistexchangeNamece test"""
-        broker = self.broker(store_args(), name="testQueueBroker", expect=EXPECT_EXIT_OK)
+        broker = self.broker(store_args(), name="test_queue", expect=EXPECT_EXIT_OK)
         qmf = Qmf(broker)
         qmf.add_exchange("altExch", "direct", durable=True) # Serves as alternate exchange instance
         qmf.add_queue("testQueue", durable=True, alt_exchange_name="altExch")
         qmf.close()
         broker.terminate()
 
-        broker = self.broker(store_args(), name="testQueueBroker")
+        broker = self.broker(store_args(), name="test_queue")
         qmf = Qmf(broker)
         try:
             qmf.add_exchange("altExch", "direct", passive=True)
@@ -177,13 +177,13 @@
 
     def test_broker_recovery(self):
         """Test that the redelivered flag is set on messages after recovery of broker"""
-        broker = self.broker(store_args(), name="testAfterRecover", expect=EXPECT_EXIT_OK)
+        broker = self.broker(store_args(), name="test_broker_recovery", expect=EXPECT_EXIT_OK)
         msg_content = "xyz"*100
         msg = Message(msg_content, durable=True)
         broker.send_message("testQueue", msg)
         broker.terminate()
         
-        broker = self.broker(store_args(), name="testAfterRecover")
+        broker = self.broker(store_args(), name="test_broker_recovery")
         rcv_msg = broker.get_message("testQueue")
         self.assertEqual(msg_content, rcv_msg.content)
         self.assertTrue(rcv_msg.redelivered)

Added: store/trunk/cpp/tests/python_tests/resize.py
===================================================================
--- store/trunk/cpp/tests/python_tests/resize.py	                        (rev 0)
+++ store/trunk/cpp/tests/python_tests/resize.py	2010-08-10 17:33:04 UTC (rev 4196)
@@ -0,0 +1,169 @@
+"""
+Copyright (c) 2008 Red Hat, Inc.
+
+This file is part of the Qpid async store library msgstore.so.
+
+This library is free software; you can redistribute it and/or
+modify it under the terms of the GNU Lesser General Public
+License as published by the Free Software Foundation; either
+version 2.1 of the License, or (at your option) any later version.
+
+This library is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+Lesser General Public License for more details.
+
+You should have received a copy of the GNU Lesser General Public
+License along with this library; if not, write to the Free Software
+Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301
+ USA
+
+The GNU Lesser General Public License is available in the file COPYING.
+"""
+
+import glob
+import os
+import subprocess
+
+from qpid.brokertest import EXPECT_EXIT_OK
+from qpid.datatypes import uuid4
+from store_test import StoreTest, store_args
+from qpid.messaging import Message
+
+class ResizeTest(StoreTest):
+    
+    resize_tool = os.getenv("RESIZE_TOOL", "../../../tools/resize")
+    
+    def _resize_store(self, store_dir, queue_name, resize_num_files, resize_file_size, exp_fail):
+        for f in glob.glob(os.path.join(store_dir, "*")):
+            final_store_dir = os.path.join(f, queue_name)
+            p = subprocess.Popen([self.resize_tool, final_store_dir, "--num-jfiles", str(resize_num_files),
+                                  "--jfile-size-pgs", str(resize_file_size), "--quiet"], stdout = subprocess.PIPE,
+                                  stderr = subprocess.STDOUT)
+            res = p.wait()
+            err_found = False
+            try:
+                for l in p.stdout:
+                    if exp_fail:
+                        err_found = True
+                        print "[Expected error]:",
+                    print l,
+            finally:
+                p.stdout.close()
+            return res
+    
+    def _resize_test(self, queue_name, num_msgs, msg_size, resize_num_files, resize_file_size, init_num_files = 8,
+                    init_file_size = 24, exp_fail = False, wait_time = None):
+        # Using a sender will force the creation of an empty persistent queue which is needed for some tests
+        broker = self.broker(store_args(), name="broker", expect=EXPECT_EXIT_OK, wait=wait_time)
+        ssn = broker.connect().session()
+        snd = ssn.sender("%s; {create:always, node:{durable:True}}" % queue_name)
+        
+        msgs = []
+        for index in range(0, num_msgs):
+            msg = Message(self.make_message(index, msg_size), durable=True, id=uuid4(), correlation_id="msg-%04d"%index)
+            msgs.append(msg)
+            snd.send(msg)
+        broker.terminate()
+        
+        res = self._resize_store(os.path.join(self.dir, "broker", "rhm", "jrnl"), queue_name, resize_num_files,
+                             resize_file_size, exp_fail)
+        if res != 0:
+            if exp_fail:
+                return
+            self.fail("ERROR: Resize operation failed with return code %d" % res)
+        elif exp_fail:
+            self.fail("ERROR: Resize operation succeeded, but a failure was expected")
+        
+        broker = self.broker(store_args(), name="broker")
+        self.check_messages(broker, queue_name, msgs, True)
+
+
+class SimpleTest(ResizeTest):
+    """
+    Simple tests of the resize utility for resizing a journal to larger and smaller sizes.
+    """
+    
+    def test_empty_store_same(self):
+        self._resize_test(queue_name = "empty_store_same",
+                          num_msgs = 0, msg_size = 0,
+                          init_num_files = 8, init_file_size = 24,
+                          resize_num_files = 8, resize_file_size = 24)
+    
+    def test_empty_store_up(self):
+        self._resize_test(queue_name = "empty_store_up",
+                          num_msgs = 0, msg_size = 0,
+                          init_num_files = 8, init_file_size = 24,
+                          resize_num_files = 16,  resize_file_size = 48)
+    
+    def test_empty_store_down(self):
+        self._resize_test(queue_name = "empty_store_down",
+                          num_msgs = 0, msg_size = 0,
+                          init_num_files = 8, init_file_size = 24,
+                          resize_num_files = 6, resize_file_size = 12)
+ 
+# Put into long tests, make sure there is > 128GB free disk space       
+#    def test_empty_store_max(self):
+#        self._resize_test(queue_name = "empty_store_max",
+#                          num_msgs = 0, msg_size = 0,
+#                          init_num_files = 8, init_file_size = 24,
+#                          resize_num_files = 64, resize_file_size = 32768,
+#                          wait_time = 120)
+    
+    def test_empty_store_min(self):
+        self._resize_test(queue_name = "empty_store_min",
+                          num_msgs = 0, msg_size = 0,
+                          init_num_files = 8, init_file_size = 24,
+                          resize_num_files = 4, resize_file_size = 1)
+    
+    def test_basic_up(self):
+        self._resize_test(queue_name = "basic_up",
+                          num_msgs = 100, msg_size = 10000,
+                          init_num_files = 8, init_file_size = 24,
+                          resize_num_files = 16, resize_file_size = 48)
+    
+    def test_basic_down(self):
+        self._resize_test(queue_name = "basic_down",
+                          num_msgs = 100, msg_size = 10000,
+                          init_num_files = 8, init_file_size = 24,
+                          resize_num_files = 4, resize_file_size = 15)
+    
+    def test_basic_low(self):
+        self._resize_test(queue_name = "basic_low",
+                          num_msgs = 100, msg_size = 10000,
+                          init_num_files = 8, init_file_size = 24,
+                          resize_num_files = 4, resize_file_size = 4,
+                          exp_fail = True)
+    
+    def test_basic_under(self):
+        self._resize_test(queue_name = "basic_under",
+                          num_msgs = 100, msg_size = 10000,
+                          init_num_files = 8, init_file_size = 24,
+                          resize_num_files = 4, resize_file_size = 3,
+                          exp_fail = True)
+    
+    def test_very_large_msg_up(self):
+        self._resize_test(queue_name = "very_large_msg_up",
+                          num_msgs = 4, msg_size = 2000000,
+                          init_num_files = 8, init_file_size = 24,
+                          resize_num_files = 16, resize_file_size = 48)
+    
+    def test_very_large_msg_down(self):
+        self._resize_test(queue_name = "very_large_msg_down",
+                          num_msgs = 4, msg_size = 2000000,
+                          init_num_files = 16, init_file_size = 64,
+                          resize_num_files = 16, resize_file_size = 48)
+    
+    def test_very_large_msg_low(self):
+        self._resize_test(queue_name = "very_large_msg_low",
+                          num_msgs = 4, msg_size = 2000000,
+                          init_num_files = 8, init_file_size = 24,
+                          resize_num_files = 7, resize_file_size = 20,
+                          exp_fail = True)
+    
+    def test_very_large_msg_under(self):
+        self._resize_test(queue_name = "very_large_msg_under",
+                          num_msgs = 4, msg_size = 2000000,
+                          init_num_files = 8, init_file_size = 24,
+                          resize_num_files = 6, resize_file_size = 8,
+                          exp_fail = True)

Modified: store/trunk/cpp/tests/python_tests/store_test.py
===================================================================
--- store/trunk/cpp/tests/python_tests/store_test.py	2010-08-09 19:01:28 UTC (rev 4195)
+++ store/trunk/cpp/tests/python_tests/store_test.py	2010-08-10 17:33:04 UTC (rev 4196)
@@ -27,10 +27,12 @@
 from qmf.console import Session
 
     
-def store_args():
+def store_args(store_dir = None):
     """Return the broker args necessary to load the async store"""
     assert BrokerTest.store_lib 
-    return ["--load-module", BrokerTest.store_lib]
+    if store_dir == None:
+        return ["--load-module", BrokerTest.store_lib]
+    return ["--load-module", BrokerTest.store_lib, "--store-dir", store_dir]
 
 class Qmf:
     """

Modified: store/trunk/cpp/tests/run_python_tests
===================================================================
--- store/trunk/cpp/tests/run_python_tests	2010-08-09 19:01:28 UTC (rev 4195)
+++ store/trunk/cpp/tests/run_python_tests	2010-08-10 17:33:04 UTC (rev 4196)
@@ -46,7 +46,7 @@
     xLONG_TEST)
         DEFAULT_PYTHON_TESTS= ;;
     x)
-        DEFAULT_PYTHON_TESTS="*.client_persistence.* *.flow_to_disk.SimpleMaxSizeCountTest.* *.flow_to_disk.MultiDurableQueue*.test_mixed_limit_1 *.flow_to_disk.MultiQueue*.test_mixed_limit_1" ;;
+        DEFAULT_PYTHON_TESTS="*.client_persistence.* *.flow_to_disk.SimpleMaxSizeCountTest.* *.flow_to_disk.MultiDurableQueue*.test_mixed_limit_1 *.flow_to_disk.MultiQueue*.test_mixed_limit_1 *.resize.SimpleTest.*" ;;
     *)
         DEFAULT_PYTHON_TESTS=$1
 esac

Modified: store/trunk/cpp/tools/janal.py
===================================================================
--- store/trunk/cpp/tools/janal.py	2010-08-09 19:01:28 UTC (rev 4195)
+++ store/trunk/cpp/tools/janal.py	2010-08-10 17:33:04 UTC (rev 4196)
@@ -408,11 +408,11 @@
         return self._txn_msg_cnt
     
     def txn_obj_list(self):
-        """Get a cululative list of transaction objects (commits and aborts)"""
+        """Get a cumulative list of transaction objects (commits and aborts)"""
         return self._txn_obj_list
     
     def _advance_jrnl_file(self, *oldest_file_info):
-        """Rotate to using the next journal file. Return False if the operation was sucessful, True if there are no
+        """Rotate to using the next journal file. Return False if the operation was successful, True if there are no
         more files to read."""
         fro_seek_flag = False
         if len(oldest_file_info) > 0:
@@ -454,7 +454,7 @@
 
     def _check_owi(self, hdr):
         """Return True if the header's owi indicator matches that of the file header record; False otherwise. This can
-        indicate wheher the last record in a file has been read and now older records which have not yet been
+        indicate whether the last record in a file has been read and now older records which have not yet been
         overwritten are now being read."""
         return self._file_hdr_owi == hdr.owi()
     

Modified: store/trunk/cpp/tools/resize
===================================================================
--- store/trunk/cpp/tools/resize	2010-08-09 19:01:28 UTC (rev 4195)
+++ store/trunk/cpp/tools/resize	2010-08-10 17:33:04 UTC (rev 4196)
@@ -155,7 +155,7 @@
                     if self._file == None:
                         rid = hdr.rid
                     elif len(rid_list) == 0:
-                        rid = None
+                        rid = 0
                     else:
                         rid = rid_list[0]
                     if not self._rotate_file(rid, fro):



More information about the rhmessaging-commits mailing list