Author: kpvdr
Date: 2010-08-10 13:33:04 -0400 (Tue, 10 Aug 2010)
New Revision: 4196
Added:
store/trunk/cpp/tests/python_tests/resize.py
Modified:
store/trunk/cpp/tests/python_tests/__init__.py
store/trunk/cpp/tests/python_tests/client_persistence.py
store/trunk/cpp/tests/python_tests/store_test.py
store/trunk/cpp/tests/run_python_tests
store/trunk/cpp/tools/janal.py
store/trunk/cpp/tools/resize
Log:
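Fix for BZ 620676 - "Store resize operation fails with large messages (greater than
store file size)": the resize tool now uses rid 0 instead of None when the rid list is
empty. Also adds new resize tests (resize.py) that catch this bug and includes them in
the default run_python_tests set, adds an optional store_dir argument to store_args(),
renames the brokers in client_persistence.py to match their test method names, and
fixes spelling errors in janal.py docstrings.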
Modified: store/trunk/cpp/tests/python_tests/__init__.py
===================================================================
--- store/trunk/cpp/tests/python_tests/__init__.py 2010-08-09 19:01:28 UTC (rev 4195)
+++ store/trunk/cpp/tests/python_tests/__init__.py 2010-08-10 17:33:04 UTC (rev 4196)
@@ -23,3 +23,4 @@
from client_persistence import *
from flow_to_disk import *
+from resize import *
Modified: store/trunk/cpp/tests/python_tests/client_persistence.py
===================================================================
--- store/trunk/cpp/tests/python_tests/client_persistence.py 2010-08-09 19:01:28 UTC (rev
4195)
+++ store/trunk/cpp/tests/python_tests/client_persistence.py 2010-08-10 17:33:04 UTC (rev
4196)
@@ -33,20 +33,20 @@
def test_direct_exchange(self):
"""Test Direct exchange."""
- broker = self.broker(store_args(), name="testDirectExchange",
expect=EXPECT_EXIT_OK)
+ broker = self.broker(store_args(), name="test_direct_exchange",
expect=EXPECT_EXIT_OK)
msg1 = Message("A_Message1", durable=True,
correlation_id="Msg0001")
msg2 = Message("B_Message1", durable=True,
correlation_id="Msg0002")
broker.send_message("a", msg1)
broker.send_message("b", msg2)
broker.terminate()
- broker = self.broker(store_args(), name="testDirectExchange")
+ broker = self.broker(store_args(), name="test_direct_exchange")
self.check_message(broker, "a", msg1, True)
self.check_message(broker, "b", msg2, True)
def test_topic_exchange(self):
"""Test Topic exchange."""
- broker = self.broker(store_args(), name="testTopicExchange",
expect=EXPECT_EXIT_OK)
+ broker = self.broker(store_args(), name="test_topic_exchange",
expect=EXPECT_EXIT_OK)
ssn = broker.connect().session()
snd1 = ssn.sender("abc/key1; {create:always, node:{type:topic,
durable:True}}")
snd2 = ssn.sender("abc/key2; {create:always, node:{type:topic,
durable:True}}")
@@ -62,7 +62,7 @@
snd2.send(msg2)
broker.terminate()
- broker = self.broker(store_args(), name="testTopicExchange")
+ broker = self.broker(store_args(), name="test_topic_exchange")
self.check_message(broker, "a", msg1, True)
self.check_message(broker, "b", msg1, True)
self.check_messages(broker, "c", [msg1, msg2], True)
@@ -72,7 +72,7 @@
def test_lvq(self):
"""Test LVQ."""
- broker = self.broker(store_args(), name="testLVQ",
expect=EXPECT_EXIT_OK)
+ broker = self.broker(store_args(), name="test_lvq",
expect=EXPECT_EXIT_OK)
ma1 = Message("A1", durable=True, correlation_id="Msg0005",
properties={"qpid.LVQ_key":"A"})
ma2 = Message("A2", durable=True, correlation_id="Msg0006",
properties={"qpid.LVQ_key":"A"})
mb1 = Message("B1", durable=True, correlation_id="Msg0007",
properties={"qpid.LVQ_key":"B"})
@@ -83,7 +83,7 @@
xprops="arguments:{\"qpid.last_value_queue\":True}")
broker.terminate()
- broker = self.broker(store_args(), name="testLVQ",
expect=EXPECT_EXIT_OK)
+ broker = self.broker(store_args(), name="test_lvq",
expect=EXPECT_EXIT_OK)
ssn = self.check_messages(broker, "lvq-test", [ma2, mb3, mc1],
empty=True, ack=False)
# Add more messages while subscriber is active (no replacement):
ma3 = Message("A3", durable=True, correlation_id="Msg0011",
properties={"qpid.LVQ_key":"A"})
@@ -95,12 +95,12 @@
ssn.acknowledge()
broker.terminate()
- broker = self.broker(store_args(), name="testLVQ")
+ broker = self.broker(store_args(), name="test_lvq")
self.check_messages(broker, "lvq-test", [mc4, ma4], True)
def test_fanout_exchange(self):
"""Test Fanout Exchange"""
- broker = self.broker(store_args(), name="testFanout",
expect=EXPECT_EXIT_OK)
+ broker = self.broker(store_args(), name="test_fanout_exchange",
expect=EXPECT_EXIT_OK)
ssn = broker.connect().session()
snd = ssn.sender("TestFanoutExchange; {create: always, node: {type: topic,
x-declare: {type: fanout}}}")
ssn.receiver("TestFanoutExchange; {link: {name: \"q1\", durable:
True, reliability:at-least-once}}")
@@ -112,7 +112,7 @@
snd.send(msg2)
broker.terminate()
- broker = self.broker(store_args(), name="testFanout")
+ broker = self.broker(store_args(), name="test_fanout_exchange")
self.check_messages(broker, "q1", [msg1, msg2], True)
self.check_messages(broker, "q2", [msg1, msg2], True)
self.check_messages(broker, "q3", [msg1, msg2], True)
@@ -125,14 +125,14 @@
def test_exchange(self):
"""Exchange alternate exchange property persistence
test"""
- broker = self.broker(store_args(), name="testExchangeBroker",
expect=EXPECT_EXIT_OK)
+ broker = self.broker(store_args(), name="test_exchange",
expect=EXPECT_EXIT_OK)
qmf = Qmf(broker)
qmf.add_exchange("altExch", "direct", durable=True) # Serves
as alternate exchange instance
qmf.add_exchange("testExch", "direct", durable=True,
alt_exchange_name="altExch")
qmf.close()
broker.terminate()
- broker = self.broker(store_args(), name="testExchangeBroker")
+ broker = self.broker(store_args(), name="test_exchange")
qmf = Qmf(broker)
try:
qmf.add_exchange("altExch", "direct", passive=True)
@@ -148,14 +148,14 @@
def test_queue(self):
"""Queue alternate exchange property persistence
test"""
- broker = self.broker(store_args(), name="testQueueBroker",
expect=EXPECT_EXIT_OK)
+ broker = self.broker(store_args(), name="test_queue",
expect=EXPECT_EXIT_OK)
qmf = Qmf(broker)
qmf.add_exchange("altExch", "direct", durable=True) # Serves
as alternate exchange instance
qmf.add_queue("testQueue", durable=True,
alt_exchange_name="altExch")
qmf.close()
broker.terminate()
- broker = self.broker(store_args(), name="testQueueBroker")
+ broker = self.broker(store_args(), name="test_queue")
qmf = Qmf(broker)
try:
qmf.add_exchange("altExch", "direct", passive=True)
@@ -177,13 +177,13 @@
def test_broker_recovery(self):
"""Test that the redelivered flag is set on messages after
recovery of broker"""
- broker = self.broker(store_args(), name="testAfterRecover",
expect=EXPECT_EXIT_OK)
+ broker = self.broker(store_args(), name="test_broker_recovery",
expect=EXPECT_EXIT_OK)
msg_content = "xyz"*100
msg = Message(msg_content, durable=True)
broker.send_message("testQueue", msg)
broker.terminate()
- broker = self.broker(store_args(), name="testAfterRecover")
+ broker = self.broker(store_args(), name="test_broker_recovery")
rcv_msg = broker.get_message("testQueue")
self.assertEqual(msg_content, rcv_msg.content)
self.assertTrue(rcv_msg.redelivered)
Added: store/trunk/cpp/tests/python_tests/resize.py
===================================================================
--- store/trunk/cpp/tests/python_tests/resize.py (rev 0)
+++ store/trunk/cpp/tests/python_tests/resize.py 2010-08-10 17:33:04 UTC (rev 4196)
@@ -0,0 +1,169 @@
+"""
+Copyright (c) 2008 Red Hat, Inc.
+
+This file is part of the Qpid async store library msgstore.so.
+
+This library is free software; you can redistribute it and/or
+modify it under the terms of the GNU Lesser General Public
+License as published by the Free Software Foundation; either
+version 2.1 of the License, or (at your option) any later version.
+
+This library is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+Lesser General Public License for more details.
+
+You should have received a copy of the GNU Lesser General Public
+License along with this library; if not, write to the Free Software
+Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
+ USA
+
+The GNU Lesser General Public License is available in the file COPYING.
+"""
+
+import glob
+import os
+import subprocess
+
+from qpid.brokertest import EXPECT_EXIT_OK
+from qpid.datatypes import uuid4
+from store_test import StoreTest, store_args
+from qpid.messaging import Message
+
+class ResizeTest(StoreTest):
+
+ resize_tool = os.getenv("RESIZE_TOOL", "../../../tools/resize")
+
+ def _resize_store(self, store_dir, queue_name, resize_num_files, resize_file_size,
exp_fail):
+ for f in glob.glob(os.path.join(store_dir, "*")):
+ final_store_dir = os.path.join(f, queue_name)
+ p = subprocess.Popen([self.resize_tool, final_store_dir,
"--num-jfiles", str(resize_num_files),
+ "--jfile-size-pgs", str(resize_file_size),
"--quiet"], stdout = subprocess.PIPE,
+ stderr = subprocess.STDOUT)
+ res = p.wait()
+ err_found = False
+ try:
+ for l in p.stdout:
+ if exp_fail:
+ err_found = True
+ print "[Expected error]:",
+ print l,
+ finally:
+ p.stdout.close()
+ return res
+
+ def _resize_test(self, queue_name, num_msgs, msg_size, resize_num_files,
resize_file_size, init_num_files = 8,
+ init_file_size = 24, exp_fail = False, wait_time = None):
+ # Using a sender will force the creation of an empty persistent queue which is
needed for some tests
+ broker = self.broker(store_args(), name="broker",
expect=EXPECT_EXIT_OK, wait=wait_time)
+ ssn = broker.connect().session()
+ snd = ssn.sender("%s; {create:always, node:{durable:True}}" %
queue_name)
+
+ msgs = []
+ for index in range(0, num_msgs):
+ msg = Message(self.make_message(index, msg_size), durable=True, id=uuid4(),
correlation_id="msg-%04d"%index)
+ msgs.append(msg)
+ snd.send(msg)
+ broker.terminate()
+
+ res = self._resize_store(os.path.join(self.dir, "broker",
"rhm", "jrnl"), queue_name, resize_num_files,
+ resize_file_size, exp_fail)
+ if res != 0:
+ if exp_fail:
+ return
+ self.fail("ERROR: Resize operation failed with return code %d" %
res)
+ elif exp_fail:
+ self.fail("ERROR: Resize operation succeeded, but a failure was
expected")
+
+ broker = self.broker(store_args(), name="broker")
+ self.check_messages(broker, queue_name, msgs, True)
+
+
+class SimpleTest(ResizeTest):
+ """
+ Simple tests of the resize utility for resizing a journal to larger and smaller
sizes.
+ """
+
+ def test_empty_store_same(self):
+ self._resize_test(queue_name = "empty_store_same",
+ num_msgs = 0, msg_size = 0,
+ init_num_files = 8, init_file_size = 24,
+ resize_num_files = 8, resize_file_size = 24)
+
+ def test_empty_store_up(self):
+ self._resize_test(queue_name = "empty_store_up",
+ num_msgs = 0, msg_size = 0,
+ init_num_files = 8, init_file_size = 24,
+ resize_num_files = 16, resize_file_size = 48)
+
+ def test_empty_store_down(self):
+ self._resize_test(queue_name = "empty_store_down",
+ num_msgs = 0, msg_size = 0,
+ init_num_files = 8, init_file_size = 24,
+ resize_num_files = 6, resize_file_size = 12)
+
+# TODO: move this test into the long tests; it needs > 128GB of free disk space
+# def test_empty_store_max(self):
+# self._resize_test(queue_name = "empty_store_max",
+# num_msgs = 0, msg_size = 0,
+# init_num_files = 8, init_file_size = 24,
+# resize_num_files = 64, resize_file_size = 32768,
+# wait_time = 120)
+
+ def test_empty_store_min(self):
+ self._resize_test(queue_name = "empty_store_min",
+ num_msgs = 0, msg_size = 0,
+ init_num_files = 8, init_file_size = 24,
+ resize_num_files = 4, resize_file_size = 1)
+
+ def test_basic_up(self):
+ self._resize_test(queue_name = "basic_up",
+ num_msgs = 100, msg_size = 10000,
+ init_num_files = 8, init_file_size = 24,
+ resize_num_files = 16, resize_file_size = 48)
+
+ def test_basic_down(self):
+ self._resize_test(queue_name = "basic_down",
+ num_msgs = 100, msg_size = 10000,
+ init_num_files = 8, init_file_size = 24,
+ resize_num_files = 4, resize_file_size = 15)
+
+ def test_basic_low(self):
+ self._resize_test(queue_name = "basic_low",
+ num_msgs = 100, msg_size = 10000,
+ init_num_files = 8, init_file_size = 24,
+ resize_num_files = 4, resize_file_size = 4,
+ exp_fail = True)
+
+ def test_basic_under(self):
+ self._resize_test(queue_name = "basic_under",
+ num_msgs = 100, msg_size = 10000,
+ init_num_files = 8, init_file_size = 24,
+ resize_num_files = 4, resize_file_size = 3,
+ exp_fail = True)
+
+ def test_very_large_msg_up(self):
+ self._resize_test(queue_name = "very_large_msg_up",
+ num_msgs = 4, msg_size = 2000000,
+ init_num_files = 8, init_file_size = 24,
+ resize_num_files = 16, resize_file_size = 48)
+
+ def test_very_large_msg_down(self):
+ self._resize_test(queue_name = "very_large_msg_down",
+ num_msgs = 4, msg_size = 2000000,
+ init_num_files = 16, init_file_size = 64,
+ resize_num_files = 16, resize_file_size = 48)
+
+ def test_very_large_msg_low(self):
+ self._resize_test(queue_name = "very_large_msg_low",
+ num_msgs = 4, msg_size = 2000000,
+ init_num_files = 8, init_file_size = 24,
+ resize_num_files = 7, resize_file_size = 20,
+ exp_fail = True)
+
+ def test_very_large_msg_under(self):
+ self._resize_test(queue_name = "very_large_msg_under",
+ num_msgs = 4, msg_size = 2000000,
+ init_num_files = 8, init_file_size = 24,
+ resize_num_files = 6, resize_file_size = 8,
+ exp_fail = True)
Modified: store/trunk/cpp/tests/python_tests/store_test.py
===================================================================
--- store/trunk/cpp/tests/python_tests/store_test.py 2010-08-09 19:01:28 UTC (rev 4195)
+++ store/trunk/cpp/tests/python_tests/store_test.py 2010-08-10 17:33:04 UTC (rev 4196)
@@ -27,10 +27,12 @@
from qmf.console import Session
-def store_args():
+def store_args(store_dir = None):
"""Return the broker args necessary to load the async
store"""
assert BrokerTest.store_lib
- return ["--load-module", BrokerTest.store_lib]
+ if store_dir == None:
+ return ["--load-module", BrokerTest.store_lib]
+ return ["--load-module", BrokerTest.store_lib, "--store-dir",
store_dir]
class Qmf:
"""
Modified: store/trunk/cpp/tests/run_python_tests
===================================================================
--- store/trunk/cpp/tests/run_python_tests 2010-08-09 19:01:28 UTC (rev 4195)
+++ store/trunk/cpp/tests/run_python_tests 2010-08-10 17:33:04 UTC (rev 4196)
@@ -46,7 +46,7 @@
xLONG_TEST)
DEFAULT_PYTHON_TESTS= ;;
x)
- DEFAULT_PYTHON_TESTS="*.client_persistence.*
*.flow_to_disk.SimpleMaxSizeCountTest.*
*.flow_to_disk.MultiDurableQueue*.test_mixed_limit_1
*.flow_to_disk.MultiQueue*.test_mixed_limit_1" ;;
+ DEFAULT_PYTHON_TESTS="*.client_persistence.*
*.flow_to_disk.SimpleMaxSizeCountTest.*
*.flow_to_disk.MultiDurableQueue*.test_mixed_limit_1
*.flow_to_disk.MultiQueue*.test_mixed_limit_1 *.resize.SimpleTest.*" ;;
*)
DEFAULT_PYTHON_TESTS=$1
esac
Modified: store/trunk/cpp/tools/janal.py
===================================================================
--- store/trunk/cpp/tools/janal.py 2010-08-09 19:01:28 UTC (rev 4195)
+++ store/trunk/cpp/tools/janal.py 2010-08-10 17:33:04 UTC (rev 4196)
@@ -408,11 +408,11 @@
return self._txn_msg_cnt
def txn_obj_list(self):
- """Get a cululative list of transaction objects (commits and
aborts)"""
+ """Get a cumulative list of transaction objects (commits and
aborts)"""
return self._txn_obj_list
def _advance_jrnl_file(self, *oldest_file_info):
- """Rotate to using the next journal file. Return False if the
operation was sucessful, True if there are no
+ """Rotate to using the next journal file. Return False if the
operation was successful, True if there are no
more files to read."""
fro_seek_flag = False
if len(oldest_file_info) > 0:
@@ -454,7 +454,7 @@
def _check_owi(self, hdr):
"""Return True if the header's owi indicator matches that of
the file header record; False otherwise. This can
- indicate wheher the last record in a file has been read and now older records
which have not yet been
+ indicate whether the last record in a file has been read and now older records
which have not yet been
overwritten are now being read."""
return self._file_hdr_owi == hdr.owi()
Modified: store/trunk/cpp/tools/resize
===================================================================
--- store/trunk/cpp/tools/resize 2010-08-09 19:01:28 UTC (rev 4195)
+++ store/trunk/cpp/tools/resize 2010-08-10 17:33:04 UTC (rev 4196)
@@ -155,7 +155,7 @@
if self._file == None:
rid = hdr.rid
elif len(rid_list) == 0:
- rid = None
+ rid = 0
else:
rid = rid_list[0]
if not self._rotate_file(rid, fro):