rhmessaging commits: r4196 - in store/trunk/cpp: tests/python_tests and 1 other directories.
by rhmessaging-commits@lists.jboss.org
Author: kpvdr
Date: 2010-08-10 13:33:04 -0400 (Tue, 10 Aug 2010)
New Revision: 4196
Added:
store/trunk/cpp/tests/python_tests/resize.py
Modified:
store/trunk/cpp/tests/python_tests/__init__.py
store/trunk/cpp/tests/python_tests/client_persistence.py
store/trunk/cpp/tests/python_tests/store_test.py
store/trunk/cpp/tests/run_python_tests
store/trunk/cpp/tools/janal.py
store/trunk/cpp/tools/resize
Log:
Fix for BZ 620676 - "Store resize operation fails with large messages (greater than store file size)". Also included new resize tests that would catch this bug.
Modified: store/trunk/cpp/tests/python_tests/__init__.py
===================================================================
--- store/trunk/cpp/tests/python_tests/__init__.py 2010-08-09 19:01:28 UTC (rev 4195)
+++ store/trunk/cpp/tests/python_tests/__init__.py 2010-08-10 17:33:04 UTC (rev 4196)
@@ -23,3 +23,4 @@
from client_persistence import *
from flow_to_disk import *
+from resize import *
Modified: store/trunk/cpp/tests/python_tests/client_persistence.py
===================================================================
--- store/trunk/cpp/tests/python_tests/client_persistence.py 2010-08-09 19:01:28 UTC (rev 4195)
+++ store/trunk/cpp/tests/python_tests/client_persistence.py 2010-08-10 17:33:04 UTC (rev 4196)
@@ -33,20 +33,20 @@
def test_direct_exchange(self):
"""Test Direct exchange."""
- broker = self.broker(store_args(), name="testDirectExchange", expect=EXPECT_EXIT_OK)
+ broker = self.broker(store_args(), name="test_direct_exchange", expect=EXPECT_EXIT_OK)
msg1 = Message("A_Message1", durable=True, correlation_id="Msg0001")
msg2 = Message("B_Message1", durable=True, correlation_id="Msg0002")
broker.send_message("a", msg1)
broker.send_message("b", msg2)
broker.terminate()
- broker = self.broker(store_args(), name="testDirectExchange")
+ broker = self.broker(store_args(), name="test_direct_exchange")
self.check_message(broker, "a", msg1, True)
self.check_message(broker, "b", msg2, True)
def test_topic_exchange(self):
"""Test Topic exchange."""
- broker = self.broker(store_args(), name="testTopicExchange", expect=EXPECT_EXIT_OK)
+ broker = self.broker(store_args(), name="test_topic_exchange", expect=EXPECT_EXIT_OK)
ssn = broker.connect().session()
snd1 = ssn.sender("abc/key1; {create:always, node:{type:topic, durable:True}}")
snd2 = ssn.sender("abc/key2; {create:always, node:{type:topic, durable:True}}")
@@ -62,7 +62,7 @@
snd2.send(msg2)
broker.terminate()
- broker = self.broker(store_args(), name="testTopicExchange")
+ broker = self.broker(store_args(), name="test_topic_exchange")
self.check_message(broker, "a", msg1, True)
self.check_message(broker, "b", msg1, True)
self.check_messages(broker, "c", [msg1, msg2], True)
@@ -72,7 +72,7 @@
def test_lvq(self):
"""Test LVQ."""
- broker = self.broker(store_args(), name="testLVQ", expect=EXPECT_EXIT_OK)
+ broker = self.broker(store_args(), name="test_lvq", expect=EXPECT_EXIT_OK)
ma1 = Message("A1", durable=True, correlation_id="Msg0005", properties={"qpid.LVQ_key":"A"})
ma2 = Message("A2", durable=True, correlation_id="Msg0006", properties={"qpid.LVQ_key":"A"})
mb1 = Message("B1", durable=True, correlation_id="Msg0007", properties={"qpid.LVQ_key":"B"})
@@ -83,7 +83,7 @@
xprops="arguments:{\"qpid.last_value_queue\":True}")
broker.terminate()
- broker = self.broker(store_args(), name="testLVQ", expect=EXPECT_EXIT_OK)
+ broker = self.broker(store_args(), name="test_lvq", expect=EXPECT_EXIT_OK)
ssn = self.check_messages(broker, "lvq-test", [ma2, mb3, mc1], empty=True, ack=False)
# Add more messages while subscriber is active (no replacement):
ma3 = Message("A3", durable=True, correlation_id="Msg0011", properties={"qpid.LVQ_key":"A"})
@@ -95,12 +95,12 @@
ssn.acknowledge()
broker.terminate()
- broker = self.broker(store_args(), name="testLVQ")
+ broker = self.broker(store_args(), name="test_lvq")
self.check_messages(broker, "lvq-test", [mc4, ma4], True)
def test_fanout_exchange(self):
"""Test Fanout Exchange"""
- broker = self.broker(store_args(), name="testFanout", expect=EXPECT_EXIT_OK)
+ broker = self.broker(store_args(), name="test_fanout_exchange", expect=EXPECT_EXIT_OK)
ssn = broker.connect().session()
snd = ssn.sender("TestFanoutExchange; {create: always, node: {type: topic, x-declare: {type: fanout}}}")
ssn.receiver("TestFanoutExchange; {link: {name: \"q1\", durable: True, reliability:at-least-once}}")
@@ -112,7 +112,7 @@
snd.send(msg2)
broker.terminate()
- broker = self.broker(store_args(), name="testFanout")
+ broker = self.broker(store_args(), name="test_fanout_exchange")
self.check_messages(broker, "q1", [msg1, msg2], True)
self.check_messages(broker, "q2", [msg1, msg2], True)
self.check_messages(broker, "q3", [msg1, msg2], True)
@@ -125,14 +125,14 @@
def test_exchange(self):
"""Exchange alternate exchange property persistence test"""
- broker = self.broker(store_args(), name="testExchangeBroker", expect=EXPECT_EXIT_OK)
+ broker = self.broker(store_args(), name="test_exchange", expect=EXPECT_EXIT_OK)
qmf = Qmf(broker)
qmf.add_exchange("altExch", "direct", durable=True) # Serves as alternate exchange instance
qmf.add_exchange("testExch", "direct", durable=True, alt_exchange_name="altExch")
qmf.close()
broker.terminate()
- broker = self.broker(store_args(), name="testExchangeBroker")
+ broker = self.broker(store_args(), name="test_exchange")
qmf = Qmf(broker)
try:
qmf.add_exchange("altExch", "direct", passive=True)
@@ -148,14 +148,14 @@
def test_queue(self):
"""Queue alternate exchange property persistexchangeNamece test"""
- broker = self.broker(store_args(), name="testQueueBroker", expect=EXPECT_EXIT_OK)
+ broker = self.broker(store_args(), name="test_queue", expect=EXPECT_EXIT_OK)
qmf = Qmf(broker)
qmf.add_exchange("altExch", "direct", durable=True) # Serves as alternate exchange instance
qmf.add_queue("testQueue", durable=True, alt_exchange_name="altExch")
qmf.close()
broker.terminate()
- broker = self.broker(store_args(), name="testQueueBroker")
+ broker = self.broker(store_args(), name="test_queue")
qmf = Qmf(broker)
try:
qmf.add_exchange("altExch", "direct", passive=True)
@@ -177,13 +177,13 @@
def test_broker_recovery(self):
"""Test that the redelivered flag is set on messages after recovery of broker"""
- broker = self.broker(store_args(), name="testAfterRecover", expect=EXPECT_EXIT_OK)
+ broker = self.broker(store_args(), name="test_broker_recovery", expect=EXPECT_EXIT_OK)
msg_content = "xyz"*100
msg = Message(msg_content, durable=True)
broker.send_message("testQueue", msg)
broker.terminate()
- broker = self.broker(store_args(), name="testAfterRecover")
+ broker = self.broker(store_args(), name="test_broker_recovery")
rcv_msg = broker.get_message("testQueue")
self.assertEqual(msg_content, rcv_msg.content)
self.assertTrue(rcv_msg.redelivered)
Added: store/trunk/cpp/tests/python_tests/resize.py
===================================================================
--- store/trunk/cpp/tests/python_tests/resize.py (rev 0)
+++ store/trunk/cpp/tests/python_tests/resize.py 2010-08-10 17:33:04 UTC (rev 4196)
@@ -0,0 +1,169 @@
+"""
+Copyright (c) 2008 Red Hat, Inc.
+
+This file is part of the Qpid async store library msgstore.so.
+
+This library is free software; you can redistribute it and/or
+modify it under the terms of the GNU Lesser General Public
+License as published by the Free Software Foundation; either
+version 2.1 of the License, or (at your option) any later version.
+
+This library is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+Lesser General Public License for more details.
+
+You should have received a copy of the GNU Lesser General Public
+License along with this library; if not, write to the Free Software
+Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
+ USA
+
+The GNU Lesser General Public License is available in the file COPYING.
+"""
+
+import glob
+import os
+import subprocess
+
+from qpid.brokertest import EXPECT_EXIT_OK
+from qpid.datatypes import uuid4
+from store_test import StoreTest, store_args
+from qpid.messaging import Message
+
+class ResizeTest(StoreTest):
+
+ resize_tool = os.getenv("RESIZE_TOOL", "../../../tools/resize")
+
+ def _resize_store(self, store_dir, queue_name, resize_num_files, resize_file_size, exp_fail):
+ for f in glob.glob(os.path.join(store_dir, "*")):
+ final_store_dir = os.path.join(f, queue_name)
+ p = subprocess.Popen([self.resize_tool, final_store_dir, "--num-jfiles", str(resize_num_files),
+ "--jfile-size-pgs", str(resize_file_size), "--quiet"], stdout = subprocess.PIPE,
+ stderr = subprocess.STDOUT)
+ res = p.wait()
+ err_found = False
+ try:
+ for l in p.stdout:
+ if exp_fail:
+ err_found = True
+ print "[Expected error]:",
+ print l,
+ finally:
+ p.stdout.close()
+ return res
+
+ def _resize_test(self, queue_name, num_msgs, msg_size, resize_num_files, resize_file_size, init_num_files = 8,
+ init_file_size = 24, exp_fail = False, wait_time = None):
+ # Using a sender will force the creation of an empty persistent queue which is needed for some tests
+ broker = self.broker(store_args(), name="broker", expect=EXPECT_EXIT_OK, wait=wait_time)
+ ssn = broker.connect().session()
+ snd = ssn.sender("%s; {create:always, node:{durable:True}}" % queue_name)
+
+ msgs = []
+ for index in range(0, num_msgs):
+ msg = Message(self.make_message(index, msg_size), durable=True, id=uuid4(), correlation_id="msg-%04d"%index)
+ msgs.append(msg)
+ snd.send(msg)
+ broker.terminate()
+
+ res = self._resize_store(os.path.join(self.dir, "broker", "rhm", "jrnl"), queue_name, resize_num_files,
+ resize_file_size, exp_fail)
+ if res != 0:
+ if exp_fail:
+ return
+ self.fail("ERROR: Resize operation failed with return code %d" % res)
+ elif exp_fail:
+ self.fail("ERROR: Resize operation succeeded, but a failure was expected")
+
+ broker = self.broker(store_args(), name="broker")
+ self.check_messages(broker, queue_name, msgs, True)
+
+
+class SimpleTest(ResizeTest):
+ """
+ Simple tests of the resize utility for resizing a journal to larger and smaller sizes.
+ """
+
+ def test_empty_store_same(self):
+ self._resize_test(queue_name = "empty_store_same",
+ num_msgs = 0, msg_size = 0,
+ init_num_files = 8, init_file_size = 24,
+ resize_num_files = 8, resize_file_size = 24)
+
+ def test_empty_store_up(self):
+ self._resize_test(queue_name = "empty_store_up",
+ num_msgs = 0, msg_size = 0,
+ init_num_files = 8, init_file_size = 24,
+ resize_num_files = 16, resize_file_size = 48)
+
+ def test_empty_store_down(self):
+ self._resize_test(queue_name = "empty_store_down",
+ num_msgs = 0, msg_size = 0,
+ init_num_files = 8, init_file_size = 24,
+ resize_num_files = 6, resize_file_size = 12)
+
+# Put into long tests, make sure there is > 128GB free disk space
+# def test_empty_store_max(self):
+# self._resize_test(queue_name = "empty_store_max",
+# num_msgs = 0, msg_size = 0,
+# init_num_files = 8, init_file_size = 24,
+# resize_num_files = 64, resize_file_size = 32768,
+# wait_time = 120)
+
+ def test_empty_store_min(self):
+ self._resize_test(queue_name = "empty_store_min",
+ num_msgs = 0, msg_size = 0,
+ init_num_files = 8, init_file_size = 24,
+ resize_num_files = 4, resize_file_size = 1)
+
+ def test_basic_up(self):
+ self._resize_test(queue_name = "basic_up",
+ num_msgs = 100, msg_size = 10000,
+ init_num_files = 8, init_file_size = 24,
+ resize_num_files = 16, resize_file_size = 48)
+
+ def test_basic_down(self):
+ self._resize_test(queue_name = "basic_down",
+ num_msgs = 100, msg_size = 10000,
+ init_num_files = 8, init_file_size = 24,
+ resize_num_files = 4, resize_file_size = 15)
+
+ def test_basic_low(self):
+ self._resize_test(queue_name = "basic_low",
+ num_msgs = 100, msg_size = 10000,
+ init_num_files = 8, init_file_size = 24,
+ resize_num_files = 4, resize_file_size = 4,
+ exp_fail = True)
+
+ def test_basic_under(self):
+ self._resize_test(queue_name = "basic_under",
+ num_msgs = 100, msg_size = 10000,
+ init_num_files = 8, init_file_size = 24,
+ resize_num_files = 4, resize_file_size = 3,
+ exp_fail = True)
+
+ def test_very_large_msg_up(self):
+ self._resize_test(queue_name = "very_large_msg_up",
+ num_msgs = 4, msg_size = 2000000,
+ init_num_files = 8, init_file_size = 24,
+ resize_num_files = 16, resize_file_size = 48)
+
+ def test_very_large_msg_down(self):
+ self._resize_test(queue_name = "very_large_msg_down",
+ num_msgs = 4, msg_size = 2000000,
+ init_num_files = 16, init_file_size = 64,
+ resize_num_files = 16, resize_file_size = 48)
+
+ def test_very_large_msg_low(self):
+ self._resize_test(queue_name = "very_large_msg_low",
+ num_msgs = 4, msg_size = 2000000,
+ init_num_files = 8, init_file_size = 24,
+ resize_num_files = 7, resize_file_size = 20,
+ exp_fail = True)
+
+ def test_very_large_msg_under(self):
+ self._resize_test(queue_name = "very_large_msg_under",
+ num_msgs = 4, msg_size = 2000000,
+ init_num_files = 8, init_file_size = 24,
+ resize_num_files = 6, resize_file_size = 8,
+ exp_fail = True)
Modified: store/trunk/cpp/tests/python_tests/store_test.py
===================================================================
--- store/trunk/cpp/tests/python_tests/store_test.py 2010-08-09 19:01:28 UTC (rev 4195)
+++ store/trunk/cpp/tests/python_tests/store_test.py 2010-08-10 17:33:04 UTC (rev 4196)
@@ -27,10 +27,12 @@
from qmf.console import Session
-def store_args():
+def store_args(store_dir = None):
"""Return the broker args necessary to load the async store"""
assert BrokerTest.store_lib
- return ["--load-module", BrokerTest.store_lib]
+ if store_dir == None:
+ return ["--load-module", BrokerTest.store_lib]
+ return ["--load-module", BrokerTest.store_lib, "--store-dir", store_dir]
class Qmf:
"""
Modified: store/trunk/cpp/tests/run_python_tests
===================================================================
--- store/trunk/cpp/tests/run_python_tests 2010-08-09 19:01:28 UTC (rev 4195)
+++ store/trunk/cpp/tests/run_python_tests 2010-08-10 17:33:04 UTC (rev 4196)
@@ -46,7 +46,7 @@
xLONG_TEST)
DEFAULT_PYTHON_TESTS= ;;
x)
- DEFAULT_PYTHON_TESTS="*.client_persistence.* *.flow_to_disk.SimpleMaxSizeCountTest.* *.flow_to_disk.MultiDurableQueue*.test_mixed_limit_1 *.flow_to_disk.MultiQueue*.test_mixed_limit_1" ;;
+ DEFAULT_PYTHON_TESTS="*.client_persistence.* *.flow_to_disk.SimpleMaxSizeCountTest.* *.flow_to_disk.MultiDurableQueue*.test_mixed_limit_1 *.flow_to_disk.MultiQueue*.test_mixed_limit_1 *.resize.SimpleTest.*" ;;
*)
DEFAULT_PYTHON_TESTS=$1
esac
Modified: store/trunk/cpp/tools/janal.py
===================================================================
--- store/trunk/cpp/tools/janal.py 2010-08-09 19:01:28 UTC (rev 4195)
+++ store/trunk/cpp/tools/janal.py 2010-08-10 17:33:04 UTC (rev 4196)
@@ -408,11 +408,11 @@
return self._txn_msg_cnt
def txn_obj_list(self):
- """Get a cululative list of transaction objects (commits and aborts)"""
+ """Get a cumulative list of transaction objects (commits and aborts)"""
return self._txn_obj_list
def _advance_jrnl_file(self, *oldest_file_info):
- """Rotate to using the next journal file. Return False if the operation was sucessful, True if there are no
+ """Rotate to using the next journal file. Return False if the operation was successful, True if there are no
more files to read."""
fro_seek_flag = False
if len(oldest_file_info) > 0:
@@ -454,7 +454,7 @@
def _check_owi(self, hdr):
"""Return True if the header's owi indicator matches that of the file header record; False otherwise. This can
- indicate wheher the last record in a file has been read and now older records which have not yet been
+ indicate whether the last record in a file has been read and now older records which have not yet been
overwritten are now being read."""
return self._file_hdr_owi == hdr.owi()
Modified: store/trunk/cpp/tools/resize
===================================================================
--- store/trunk/cpp/tools/resize 2010-08-09 19:01:28 UTC (rev 4195)
+++ store/trunk/cpp/tools/resize 2010-08-10 17:33:04 UTC (rev 4196)
@@ -155,7 +155,7 @@
if self._file == None:
rid = hdr.rid
elif len(rid_list) == 0:
- rid = None
+ rid = 0
else:
rid = rid_list[0]
if not self._rotate_file(rid, fro):
14 years, 4 months
rhmessaging commits: r4195 - store/trunk/cpp/tools.
by rhmessaging-commits@lists.jboss.org
Author: kpvdr
Date: 2010-08-09 15:01:28 -0400 (Mon, 09 Aug 2010)
New Revision: 4195
Modified:
store/trunk/cpp/tools/janal.py
store/trunk/cpp/tools/resize
Log:
Minor change to tools which speeds up (somewhat) the resize and check_jrnl utilities
Modified: store/trunk/cpp/tools/janal.py
===================================================================
--- store/trunk/cpp/tools/janal.py 2010-08-06 17:20:26 UTC (rev 4194)
+++ store/trunk/cpp/tools/janal.py 2010-08-09 19:01:28 UTC (rev 4195)
@@ -40,17 +40,17 @@
def add(self, fid, hdr, lock = False):
"""Add a new record into the map"""
- if hdr.rid in self.__map.keys():
+ if hdr.rid in self.__map:
raise jerr.DuplicateRidError(hdr.rid)
self.__map[hdr.rid] = (fid, hdr, lock)
def contains(self, rid):
"""Return True if the map contains the given rid"""
- return rid in self.__map.keys()
+ return rid in self.__map
def delete(self, rid):
"""Delete the rid and its associated data from the map"""
- if rid in self.__map.keys():
+ if rid in self.__map:
if self.get_lock(rid):
raise jerr.DeleteLockedRecordError(rid)
del self.__map[rid]
@@ -87,7 +87,7 @@
def lock(self, rid):
"""Set the transaction lock for a given rid to True"""
- if rid in self.__map.keys():
+ if rid in self.__map:
tup = self.__map[rid]
if not tup[2]:
self.__map[rid] = (tup[0], tup[1], True)
@@ -126,7 +126,7 @@
def unlock(self, rid):
"""Set the transaction lock for a given rid to False"""
- if rid in self.__map.keys():
+ if rid in self.__map:
tup = self.__map[rid]
if tup[2]:
self.__map[rid] = (tup[0], tup[1], False)
@@ -154,14 +154,14 @@
"""Add a new transactional record into the map"""
if isinstance(hdr, jrnl.DeqRec):
self.__emap.lock(hdr.deq_rid)
- if hdr.xid in self.__map.keys():
+ if hdr.xid in self.__map:
self.__map[hdr.xid].append((fid, hdr)) # append to existing list
else:
self.__map[hdr.xid] = [(fid, hdr)] # create new list
def contains(self, xid):
"""Return True if the xid exists in the map; False otherwise"""
- return xid in self.__map.keys()
+ return xid in self.__map
def delete(self, hdr):
"""Remove a transaction record from the map using either a commit or abort header"""
Modified: store/trunk/cpp/tools/resize
===================================================================
--- store/trunk/cpp/tools/resize 2010-08-06 17:20:26 UTC (rev 4194)
+++ store/trunk/cpp/tools/resize 2010-08-09 19:01:28 UTC (rev 4195)
@@ -118,7 +118,7 @@
hdr = tup[1]
hdr.flags &= ~jrnl.Hdr.OWI_MASK # Turn off owi
master_record_list[long(hdr.rid)] = hdr
- if hdr.xidsize > 0 and hdr.xid in txn_record_list.keys():
+ if hdr.xidsize > 0 and hdr.xid in txn_record_list:
txn_hdr = txn_record_list[hdr.xid]
del(txn_record_list[hdr.xid])
txn_hdr.flags &= ~jrnl.Hdr.OWI_MASK # Turn off owi
14 years, 4 months
rhmessaging commits: r4194 - mgmt/newdata/cumin/resources.
by rhmessaging-commits@lists.jboss.org
Author: eallen
Date: 2010-08-06 13:20:26 -0400 (Fri, 06 Aug 2010)
New Revision: 4194
Modified:
mgmt/newdata/cumin/resources/slots.swf
Log:
Better version of slot vis. This one fixes the erroneous movement of the rectangles when a system is not expanded.
Modified: mgmt/newdata/cumin/resources/slots.swf
===================================================================
(Binary files differ)
14 years, 4 months
rhmessaging commits: r4193 - in store/trunk/java/bdbstore: test-profiles and 1 other directory.
by rhmessaging-commits@lists.jboss.org
Author: rgemmell
Date: 2010-08-06 10:31:04 -0400 (Fri, 06 Aug 2010)
New Revision: 4193
Added:
store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
Modified:
store/trunk/java/bdbstore/test-profiles/java-bdb.0.10.excludes
store/trunk/java/bdbstore/test-profiles/java-bdb.0.10.testprofile
store/trunk/java/bdbstore/test-profiles/java-bdb.excludes
store/trunk/java/bdbstore/test-profiles/java-bdb.testprofile
Log:
Add new BDBMessageStoreTest with BDB specific tests. Where possible, tests were instead added to the superclass MessageStoreTest
Added: store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
===================================================================
--- store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java (rev 0)
+++ store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java 2010-08-06 14:31:04 UTC (rev 4193)
@@ -0,0 +1,470 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.qpid.AMQStoreException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.MethodRegistry;
+import org.apache.qpid.framing.ProtocolVersion;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.server.message.MessageMetaData;
+import org.apache.qpid.server.message.MessageMetaData_0_10;
+import org.apache.qpid.server.store.MessageMetaDataType;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.StorableMessageMetaData;
+import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.store.TransactionLog;
+import org.apache.qpid.server.store.TransactionLogResource;
+import org.apache.qpid.transport.DeliveryProperties;
+import org.apache.qpid.transport.Header;
+import org.apache.qpid.transport.MessageAcceptMode;
+import org.apache.qpid.transport.MessageAcquireMode;
+import org.apache.qpid.transport.MessageDeliveryMode;
+import org.apache.qpid.transport.MessageDeliveryPriority;
+import org.apache.qpid.transport.MessageProperties;
+import org.apache.qpid.transport.MessageTransfer;
+
+/**
+ * Subclass of MessageStoreTest which runs the standard tests from the superclass against
+ * the BDB Store as well as additional tests specific to the DBB store-implementation.
+ */
+public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageStoreTest
+{
+ /**
+ * Tests that message metadata and content are successfully read back from a
+ * store after it has been reloaded. Both 0-8 and 0-10 metadata is used to
+ * verify their ability to co-exist within the store and be successful retrieved.
+ */
+ public void testBDBMessagePersistence() throws Exception
+ {
+ MessageStore store = _virtualHost.getMessageStore();
+
+ BDBMessageStore bdbStore = assertBDBStore(store);
+
+ // Create content ByteBuffers.
+ // Split the content into 2 chunks for the 0-8 message, as per broker behaviour.
+ // Use a single chunk for the 0-10 message as per broker behaviour.
+ String bodyText = "jfhdjsflsdhfjdshfjdslhfjdslhfsjlhfsjkhfdsjkhfdsjkfhdslkjf";
+
+ ByteBuffer firstContentBytes_0_8 = ByteBuffer.wrap(bodyText.substring(0, 10).getBytes());
+ ByteBuffer secondContentBytes_0_8 = ByteBuffer.wrap(bodyText.substring(10).getBytes());
+
+ ByteBuffer completeContentBody_0_10 = ByteBuffer.wrap(bodyText.getBytes());
+ int bodySize = completeContentBody_0_10.limit();
+
+ /*
+ * Create and insert a 0-8 message (metadata and multi-chunk content)
+ */
+ MessagePublishInfo pubInfoBody_0_8 = createPublishInfoBody_0_8();
+ BasicContentHeaderProperties props_0_8 = createContentHeaderProperties_0_8();
+
+ ContentHeaderBody chb_0_8 = createContentHeaderBody_0_8(props_0_8, bodySize);
+
+ MessageMetaData messageMetaData_0_8 = new MessageMetaData(pubInfoBody_0_8, chb_0_8, 0);
+ StoredMessage<MessageMetaData> storedMessage_0_8 = bdbStore.addMessage(messageMetaData_0_8);
+
+ long origArrivalTime_0_8 = messageMetaData_0_8.getArrivalTime();
+ long messageid_0_8 = storedMessage_0_8.getMessageNumber();
+
+ storedMessage_0_8.addContent(0, firstContentBytes_0_8);
+ storedMessage_0_8.addContent(firstContentBytes_0_8.limit(), secondContentBytes_0_8);
+ storedMessage_0_8.flushToStore();
+
+ /*
+ * Create and insert a 0-10 message (metadata and content)
+ */
+ MessageProperties msgProps_0_10 = createMessageProperties_0_10(bodySize);
+ DeliveryProperties delProps_0_10 = createDeliveryProperties_0_10();
+ Header header_0_10 = new Header(msgProps_0_10, delProps_0_10);
+
+ MessageTransfer xfr_0_10 = new MessageTransfer("destination", MessageAcceptMode.EXPLICIT,
+ MessageAcquireMode.PRE_ACQUIRED, header_0_10, completeContentBody_0_10);
+
+ MessageMetaData_0_10 messageMetaData_0_10 = new MessageMetaData_0_10(xfr_0_10);
+ StoredMessage<MessageMetaData_0_10> storedMessage_0_10 = bdbStore.addMessage(messageMetaData_0_10);
+
+ long origArrivalTime_0_10 = messageMetaData_0_10.getArrivalTime();
+ long messageid_0_10 = storedMessage_0_10.getMessageNumber();
+
+ storedMessage_0_10.addContent(0, completeContentBody_0_10);
+ storedMessage_0_10.flushToStore();
+
+ /*
+ * reload the store only (read-only)
+ */
+ bdbStore = reloadStoreReadOnly(bdbStore);
+
+ /*
+ * Read back and validate the 0-8 message metadata and content
+ */
+ StorableMessageMetaData storeableMMD_0_8 = bdbStore.getMessageMetaData(messageid_0_8);
+
+ assertEquals("Unexpected message type",MessageMetaDataType.META_DATA_0_8, storeableMMD_0_8.getType());
+ assertTrue("Unexpected instance type", storeableMMD_0_8 instanceof MessageMetaData);
+ MessageMetaData returnedMMD_0_8 = (MessageMetaData) storeableMMD_0_8;
+
+ assertEquals("Message arrival time has changed", origArrivalTime_0_8, returnedMMD_0_8.getArrivalTime());
+
+ MessagePublishInfo returnedPubBody_0_8 = returnedMMD_0_8.getMessagePublishInfo();
+ assertEquals("Message exchange has changed", pubInfoBody_0_8.getExchange(), returnedPubBody_0_8.getExchange());
+ assertEquals("Immediate flag has changed", pubInfoBody_0_8.isImmediate(), returnedPubBody_0_8.isImmediate());
+ assertEquals("Mandatory flag has changed", pubInfoBody_0_8.isMandatory(), returnedPubBody_0_8.isMandatory());
+ assertEquals("Routing key has changed", pubInfoBody_0_8.getRoutingKey(), returnedPubBody_0_8.getRoutingKey());
+
+ ContentHeaderBody returnedHeaderBody_0_8 = returnedMMD_0_8.getContentHeaderBody();
+ assertEquals("ContentHeader ClassID has changed", chb_0_8.classId, returnedHeaderBody_0_8.classId);
+ assertEquals("ContentHeader weight has changed", chb_0_8.weight, returnedHeaderBody_0_8.weight);
+ assertEquals("ContentHeader bodySize has changed", chb_0_8.bodySize, returnedHeaderBody_0_8.bodySize);
+
+ BasicContentHeaderProperties returnedProperties_0_8 = (BasicContentHeaderProperties) returnedHeaderBody_0_8.properties;
+ assertEquals("Property ContentType has changed", props_0_8.getContentTypeAsString(), returnedProperties_0_8.getContentTypeAsString());
+ assertEquals("Property MessageID has changed", props_0_8.getMessageIdAsString(), returnedProperties_0_8.getMessageIdAsString());
+
+ ByteBuffer recoveredContent_0_8 = ByteBuffer.allocate((int) chb_0_8.bodySize) ;
+ long recoveredCount_0_8 = bdbStore.getContent(messageid_0_8, 0, recoveredContent_0_8);
+ assertEquals("Incorrect amount of payload data recovered", chb_0_8.bodySize, recoveredCount_0_8);
+ String returnedPayloadString_0_8 = new String(recoveredContent_0_8.array());
+ assertEquals("Message Payload has changed", bodyText, returnedPayloadString_0_8);
+
+ /*
+ * Read back and validate the 0-10 message metadata and content
+ */
+ StorableMessageMetaData storeableMMD_0_10 = bdbStore.getMessageMetaData(messageid_0_10);
+
+ assertEquals("Unexpected message type",MessageMetaDataType.META_DATA_0_10, storeableMMD_0_10.getType());
+ assertTrue("Unexpected instance type", storeableMMD_0_10 instanceof MessageMetaData_0_10);
+ MessageMetaData_0_10 returnedMMD_0_10 = (MessageMetaData_0_10) storeableMMD_0_10;
+
+ assertEquals("Message arrival time has changed", origArrivalTime_0_10, returnedMMD_0_10.getArrivalTime());
+
+ DeliveryProperties returnedDelProps_0_10 = returnedMMD_0_10.getHeader().get(DeliveryProperties.class);
+ assertNotNull("DeliveryProperties were not returned", returnedDelProps_0_10);
+ assertEquals("Immediate flag has changed", delProps_0_10.getImmediate(), returnedDelProps_0_10.getImmediate());
+ assertEquals("Routing key has changed", delProps_0_10.getRoutingKey(), returnedDelProps_0_10.getRoutingKey());
+ assertEquals("Message exchange has changed", delProps_0_10.getExchange(), returnedDelProps_0_10.getExchange());
+ assertEquals("Message expiration has changed", delProps_0_10.getExpiration(), returnedDelProps_0_10.getExpiration());
+ assertEquals("Message delivery priority has changed", delProps_0_10.getPriority(), returnedDelProps_0_10.getPriority());
+
+ MessageProperties returnedMsgProps = returnedMMD_0_10.getHeader().get(MessageProperties.class);
+ assertNotNull("MessageProperties were not returned", returnedMsgProps);
+ assertTrue("Message correlationID has changed", Arrays.equals(msgProps_0_10.getCorrelationId(), returnedMsgProps.getCorrelationId()));
+ assertEquals("Message content length has changed", msgProps_0_10.getContentLength(), returnedMsgProps.getContentLength());
+ assertEquals("Message content type has changed", msgProps_0_10.getContentType(), returnedMsgProps.getContentType());
+
+ ByteBuffer recoveredContent = ByteBuffer.allocate((int) msgProps_0_10.getContentLength()) ;
+ long recoveredCount = bdbStore.getContent(messageid_0_10, 0, recoveredContent);
+ assertEquals("Incorrect amount of payload data recovered", msgProps_0_10.getContentLength(), recoveredCount);
+
+ String returnedPayloadString_0_10 = new String(recoveredContent.array());
+ assertEquals("Message Payload has changed", bodyText, returnedPayloadString_0_10);
+ }
+
+ private DeliveryProperties createDeliveryProperties_0_10()
+ {
+ DeliveryProperties delProps_0_10 = new DeliveryProperties();
+
+ delProps_0_10.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
+ delProps_0_10.setImmediate(true);
+ delProps_0_10.setExchange("exchange12345");
+ delProps_0_10.setRoutingKey("routingKey12345");
+ delProps_0_10.setExpiration(5);
+ delProps_0_10.setPriority(MessageDeliveryPriority.ABOVE_AVERAGE);
+
+ return delProps_0_10;
+ }
+
+ private MessageProperties createMessageProperties_0_10(int bodySize)
+ {
+ MessageProperties msgProps_0_10 = new MessageProperties();
+ msgProps_0_10.setContentLength(bodySize);
+ msgProps_0_10.setCorrelationId("qwerty".getBytes());
+ msgProps_0_10.setContentType("text/html");
+
+ return msgProps_0_10;
+ }
+
+ /**
+ * Close the provided store and create a new (read-only) store to read back the data.
+ *
+ * Use this method instead of reloading the virtual host like other tests in order
+ * to avoid the recovery handler deleting the message for not being on a queue.
+ */
+ private BDBMessageStore reloadStoreReadOnly(BDBMessageStore messageStore) throws Exception
+ {
+ messageStore.close();
+ File storePath = new File(String.valueOf(_config.getProperty("store.environment-path")));
+
+ BDBMessageStore newStore = new BDBMessageStore();
+ newStore.configure(storePath, false);
+ newStore.start();
+
+ return newStore;
+ }
+
+ private MessagePublishInfo createPublishInfoBody_0_8()
+ {
+ return new MessagePublishInfo()
+ {
+ public AMQShortString getExchange()
+ {
+ return new AMQShortString("exchange12345");
+ }
+
+ public void setExchange(AMQShortString exchange)
+ {
+ }
+
+ public boolean isImmediate()
+ {
+ return false;
+ }
+
+ public boolean isMandatory()
+ {
+ return true;
+ }
+
+ public AMQShortString getRoutingKey()
+ {
+ return new AMQShortString("routingKey12345");
+ }
+ };
+
+ }
+
+ private ContentHeaderBody createContentHeaderBody_0_8(BasicContentHeaderProperties props, int length)
+ {
+ MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v0_9);
+ int classForBasic = methodRegistry.createBasicQosOkBody().getClazz();
+ return new ContentHeaderBody(classForBasic, 1, props, length);
+ }
+
+ private BasicContentHeaderProperties createContentHeaderProperties_0_8()
+ {
+ BasicContentHeaderProperties props = new BasicContentHeaderProperties();
+ props.setDeliveryMode(Integer.valueOf(BasicContentHeaderProperties.PERSISTENT).byteValue());
+ props.setContentType("text/html");
+ props.getHeaders().setString("Test", "MST");
+ return props;
+ }
+
+ /**
+ * Tests that messages which are added to the store and then removed using the
+ * public MessageStore interfaces are actually removed from the store by then
+ * interrogating the store with its own implementation methods and verifying
+ * expected exceptions are thrown to indicate the message is not present.
+ */
+ public void testMessageCreationAndRemoval() throws Exception
+ {
+ MessageStore store = _virtualHost.getMessageStore();
+ BDBMessageStore bdbStore = assertBDBStore(store);
+
+ StoredMessage<MessageMetaData> storedMessage_0_8 = createAndStoreMultiChunkMessage_0_8(store);
+ long messageid_0_8 = storedMessage_0_8.getMessageNumber();
+
+ //remove the message in the fashion the broker normally would
+ storedMessage_0_8.remove();
+
+ //verify the removal using the BDB store implementation methods directly
+ try
+ {
+ // the next line should throw since the message id should not be found
+ bdbStore.getMessageMetaData(messageid_0_8);
+ fail("No exception thrown when message id not found getting metadata");
+ }
+ catch (AMQStoreException e)
+ {
+ // pass since exception expected
+ }
+
+ //expecting no content, allocate a 1 byte
+ ByteBuffer dst = ByteBuffer.allocate(1);
+
+ assertEquals("Retrieved content when none was expected",
+ 0, bdbStore.getContent(messageid_0_8, 0, dst));
+ }
+
+ private BDBMessageStore assertBDBStore(Object store)
+ {
+ if(!(store instanceof BDBMessageStore))
+ {
+ fail("Test requires an instance of BDBMessageStore to proceed");
+ }
+
+ return (BDBMessageStore) store;
+ }
+
+ private StoredMessage<MessageMetaData> createAndStoreMultiChunkMessage_0_8(MessageStore store)
+ {
+ byte[] body10Bytes = "0123456789".getBytes();
+ byte[] body5Bytes = "01234".getBytes();
+
+ ByteBuffer chunk1 = ByteBuffer.wrap(body10Bytes);
+ ByteBuffer chunk2 = ByteBuffer.wrap(body5Bytes);
+
+ int bodySize = body10Bytes.length + body5Bytes.length;
+
+ //create and store the message using the MessageStore interface
+ MessagePublishInfo pubInfoBody_0_8 = createPublishInfoBody_0_8();
+ BasicContentHeaderProperties props_0_8 = createContentHeaderProperties_0_8();
+
+ ContentHeaderBody chb_0_8 = createContentHeaderBody_0_8(props_0_8, bodySize);
+
+ MessageMetaData messageMetaData_0_8 = new MessageMetaData(pubInfoBody_0_8, chb_0_8, 0);
+ StoredMessage<MessageMetaData> storedMessage_0_8 = store.addMessage(messageMetaData_0_8);
+
+ storedMessage_0_8.addContent(0, chunk1);
+ storedMessage_0_8.addContent(chunk1.limit(), chunk2);
+ storedMessage_0_8.flushToStore();
+
+ return storedMessage_0_8;
+ }
+
+ /**
+ * Tests transaction commit by utilising the enqueue and dequeue methods available
+ * in the TransactionLog interface implemented by the store, and verifying the
+ * behaviour using BDB implementation methods.
+ */
+ public void testTranCommit() throws Exception
+ {
+ TransactionLog log = _virtualHost.getTransactionLog();
+
+ BDBMessageStore bdbStore = assertBDBStore(log);
+
+ final AMQShortString mockQueueName = new AMQShortString("queueName");
+
+ TransactionLogResource mockQueue = new TransactionLogResource()
+ {
+ public String getResourceName()
+ {
+ return mockQueueName.asString();
+ }
+ };
+
+ TransactionLog.Transaction txn = log.newTransaction();
+
+ txn.enqueueMessage(mockQueue, 1L);
+ txn.enqueueMessage(mockQueue, 5L);
+ txn.commitTran();
+
+ List<Long> enqueuedIds = bdbStore.getEnqueuedMessages(mockQueueName);
+
+ assertEquals("Number of enqueued messages is incorrect", 2, enqueuedIds.size());
+ Long val = enqueuedIds.get(0);
+ assertEquals("First Message is incorrect", 1L, val.longValue());
+ val = enqueuedIds.get(1);
+ assertEquals("Second Message is incorrect", 5L, val.longValue());
+ }
+
+
+ /**
+ * Tests transaction rollback before a commit has occurred by utilising the
+ * enqueue and dequeue methods available in the TransactionLog interface
+ * implemented by the store, and verifying the behaviour using BDB
+ * implementation methods.
+ */
+ public void testTranRollbackBeforeCommit() throws Exception
+ {
+ TransactionLog log = _virtualHost.getTransactionLog();
+
+ BDBMessageStore bdbStore = assertBDBStore(log);
+
+ final AMQShortString mockQueueName = new AMQShortString("queueName");
+
+ TransactionLogResource mockQueue = new TransactionLogResource()
+ {
+ public String getResourceName()
+ {
+ return mockQueueName.asString();
+ }
+ };
+
+ TransactionLog.Transaction txn = log.newTransaction();
+
+ txn.enqueueMessage(mockQueue, 21L);
+ txn.abortTran();
+
+ txn = log.newTransaction();
+ txn.enqueueMessage(mockQueue, 22L);
+ txn.enqueueMessage(mockQueue, 23L);
+ txn.commitTran();
+
+ List<Long> enqueuedIds = bdbStore.getEnqueuedMessages(mockQueueName);
+
+ assertEquals("Number of enqueued messages is incorrect", 2, enqueuedIds.size());
+ Long val = enqueuedIds.get(0);
+ assertEquals("First Message is incorrect", 22L, val.longValue());
+ val = enqueuedIds.get(1);
+ assertEquals("Second Message is incorrect", 23L, val.longValue());
+ }
+
+ /**
+ * Tests transaction rollback after a commit has occurred by utilising the
+ * enqueue and dequeue methods available in the TransactionLog interface
+ * implemented by the store, and verifying the behaviour using BDB
+ * implementation methods.
+ */
+ public void testTranRollbackAfterCommit() throws Exception
+ {
+ TransactionLog log = _virtualHost.getTransactionLog();
+
+ BDBMessageStore bdbStore = assertBDBStore(log);
+
+ final AMQShortString mockQueueName = new AMQShortString("queueName");
+
+ TransactionLogResource mockQueue = new TransactionLogResource()
+ {
+ public String getResourceName()
+ {
+ return mockQueueName.asString();
+ }
+ };
+
+ TransactionLog.Transaction txn = log.newTransaction();
+
+ txn.enqueueMessage(mockQueue, 30L);
+ txn.commitTran();
+
+ txn = log.newTransaction();
+ txn.enqueueMessage(mockQueue, 31L);
+ txn.abortTran();
+
+ txn = log.newTransaction();
+ txn.enqueueMessage(mockQueue, 32L);
+ txn.commitTran();
+
+ List<Long> enqueuedIds = bdbStore.getEnqueuedMessages(mockQueueName);
+
+ assertEquals("Number of enqueued messages is incorrect", 2, enqueuedIds.size());
+ Long val = enqueuedIds.get(0);
+ assertEquals("First Message is incorrect", 30L, val.longValue());
+ val = enqueuedIds.get(1);
+ assertEquals("Second Message is incorrect", 32L, val.longValue());
+ }
+
+}
\ No newline at end of file
Modified: store/trunk/java/bdbstore/test-profiles/java-bdb.0.10.excludes
===================================================================
--- store/trunk/java/bdbstore/test-profiles/java-bdb.0.10.excludes 2010-08-06 14:30:04 UTC (rev 4192)
+++ store/trunk/java/bdbstore/test-profiles/java-bdb.0.10.excludes 2010-08-06 14:31:04 UTC (rev 4193)
@@ -1,6 +1,7 @@
#These tests are broken
-org.apache.qpid.server.store.berkeleydb.BDBStoreTest#*
-org.apache.qpid.server.store.berkeleydb.MessageReSendTest#*
org.apache.qpid.server.store.berkeleydb.QueueDeleteWhilstRoutingTest#*
-org.apache.qpid.server.store.berkeleydb.StoreContextRaceConditionTest#*
+//This test is subclassed within the bdbstore module to enable it to run and
+//also add some bdb-specific tests. It is excluded to prevent running twice.
+org.apache.qpid.server.store.MessageStoreTest#*
+
Modified: store/trunk/java/bdbstore/test-profiles/java-bdb.0.10.testprofile
===================================================================
--- store/trunk/java/bdbstore/test-profiles/java-bdb.0.10.testprofile 2010-08-06 14:30:04 UTC (rev 4192)
+++ store/trunk/java/bdbstore/test-profiles/java-bdb.0.10.testprofile 2010-08-06 14:31:04 UTC (rev 4193)
@@ -5,6 +5,7 @@
broker.ready=BRK-1004
broker.stopped=Exception
broker.config=${project.root}/build/etc/config-systests-bdb.xml
+messagestore.class.name=org.apache.qpid.server.store.berkeleydb.BDBMessageStore
profile.excludes=JavaStandaloneExcludes JavaPersistentExcludes Java010Excludes 08StandaloneExcludes
broker.clean.between.tests=true
broker.persistent=true
Modified: store/trunk/java/bdbstore/test-profiles/java-bdb.excludes
===================================================================
--- store/trunk/java/bdbstore/test-profiles/java-bdb.excludes 2010-08-06 14:30:04 UTC (rev 4192)
+++ store/trunk/java/bdbstore/test-profiles/java-bdb.excludes 2010-08-06 14:31:04 UTC (rev 4193)
@@ -1,6 +1,6 @@
-#These tests are broken
-org.apache.qpid.server.store.berkeleydb.BDBStoreTest#*
-org.apache.qpid.server.store.berkeleydb.MessageReSendTest#*
+//These tests are broken
org.apache.qpid.server.store.berkeleydb.QueueDeleteWhilstRoutingTest#*
-org.apache.qpid.server.store.berkeleydb.StoreContextRaceConditionTest#*
+//This test is subclassed within the bdbstore module to enable it to run and
+//also add some bdb-specific tests. It is excluded to prevent running twice.
+org.apache.qpid.server.store.MessageStoreTest#*
Modified: store/trunk/java/bdbstore/test-profiles/java-bdb.testprofile
===================================================================
--- store/trunk/java/bdbstore/test-profiles/java-bdb.testprofile 2010-08-06 14:30:04 UTC (rev 4192)
+++ store/trunk/java/bdbstore/test-profiles/java-bdb.testprofile 2010-08-06 14:31:04 UTC (rev 4193)
@@ -4,6 +4,7 @@
broker.ready=BRK-1004
broker.stopped=Exception
broker.config=${project.root}/build/etc/config-systests-bdb.xml
+messagestore.class.name=org.apache.qpid.server.store.berkeleydb.BDBMessageStore
profile.excludes=JavaStandaloneExcludes JavaPersistentExcludes 08StandaloneExcludes
broker.clean.between.tests=true
broker.persistent=true
14 years, 4 months
rhmessaging commits: r4192 - store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb.
by rhmessaging-commits@lists.jboss.org
Author: rgemmell
Date: 2010-08-06 10:30:04 -0400 (Fri, 06 Aug 2010)
New Revision: 4192
Modified:
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
Log:
Correct log output during database shutdown process
Modified: store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
===================================================================
--- store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java 2010-08-06 14:29:09 UTC (rev 4191)
+++ store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java 2010-08-06 14:30:04 UTC (rev 4192)
@@ -501,7 +501,7 @@
if (_queueBindingsDb != null)
{
- _log.info("Closing exchange database");
+ _log.info("Closing bindings database");
_queueBindingsDb.close();
}
@@ -1538,6 +1538,7 @@
cursor = _messageContentDb.openCursor(null, null);
OperationStatus status = cursor.getSearchKeyRange(contentKeyEntry, value, LockMode.READ_UNCOMMITTED);
+
while (status == OperationStatus.SUCCESS)
{
mck = (MessageContentKey_3) contentKeyTupleBinding.entryToObject(contentKeyEntry);
14 years, 4 months
rhmessaging commits: r4191 - store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb.
by rhmessaging-commits@lists.jboss.org
Author: rgemmell
Date: 2010-08-06 10:29:09 -0400 (Fri, 06 Aug 2010)
New Revision: 4191
Modified:
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
Log:
Remove use of StoreContext to wrap the BDB transaction
Modified: store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
===================================================================
--- store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java 2010-08-06 14:28:17 UTC (rev 4190)
+++ store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java 2010-08-06 14:29:09 UTC (rev 4191)
@@ -49,7 +49,6 @@
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.MessageStoreRecoveryHandler;
import org.apache.qpid.server.store.StorableMessageMetaData;
-import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.store.StoredMemoryMessage;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.store.TransactionLogRecoveryHandler;
@@ -780,8 +779,7 @@
*/
public void removeMessage(Long messageId) throws AMQStoreException
{
- // _log.debug("public void removeMessage(StoreContext context = " + context + ", Long messageId = " + messageId
- // + "): called");
+ // _log.debug("public void removeMessage(Long messageId = " + messageId): called");
com.sleepycat.je.Transaction tx = null;
@@ -1162,21 +1160,19 @@
}
/**
- * Places a message onto a specified queue, in a given transactional context.
+ * Places a message onto a specified queue, in a given transaction.
*
- * @param context The transactional context for the operation.
+ * @param tx The transaction for the operation.
* @param queue The the queue to place the message on.
* @param messageId The message to enqueue.
*
* @throws AMQStoreException If the operation fails for any reason.
*/
- public void enqueueMessage(StoreContext context, final TransactionLogResource queue, Long messageId) throws AMQStoreException
+ public void enqueueMessage(final com.sleepycat.je.Transaction tx, final TransactionLogResource queue, Long messageId) throws AMQStoreException
{
- // _log.debug("public void enqueueMessage(StoreContext context = " + context + ", AMQShortString name = " + name
- // + ", Long messageId): called");
+ // _log.debug("public void enqueueMessage(Transaction tx = " + tx + ", AMQShortString name = " + name + ", Long messageId): called");
AMQShortString name = new AMQShortString(queue.getResourceName());
- com.sleepycat.je.Transaction tx = (com.sleepycat.je.Transaction) context.getPayload();
DatabaseEntry key = new DatabaseEntry();
EntryBinding keyBinding = new QueueEntryTB();
@@ -1202,20 +1198,18 @@
}
/**
- * Extracts a message from a specified queue, in a given transactional context.
+ * Extracts a message from a specified queue, in a given transaction.
*
- * @param context The transactional context for the operation.
+ * @param tx The transaction for the operation.
* @param queue The name queue to take the message from.
* @param messageId The message to dequeue.
*
* @throws AMQStoreException If the operation fails for any reason, or if the specified message does not exist.
*/
- public void dequeueMessage(StoreContext context, final TransactionLogResource queue, Long messageId) throws AMQStoreException
+ public void dequeueMessage(final com.sleepycat.je.Transaction tx, final TransactionLogResource queue, Long messageId) throws AMQStoreException
{
AMQShortString name = new AMQShortString(queue.getResourceName());
- com.sleepycat.je.Transaction tx = (com.sleepycat.je.Transaction) context.getPayload();
-
DatabaseEntry key = new DatabaseEntry();
EntryBinding keyBinding = new QueueEntryTB();
QueueEntryKey dd = new QueueEntryKey(name, messageId);
@@ -1257,16 +1251,14 @@
}
/**
- * Commits all operations performed within a given transactional context.
+ * Commits all operations performed within a given transaction.
*
- * @param context The transactional context to commit all operations for.
+ * @param tx The transaction to commit all operations for.
*
* @throws AMQStoreException If the operation fails for any reason.
*/
- private StoreFuture commitTranImpl(StoreContext context, boolean syncCommit) throws AMQStoreException
+ private StoreFuture commitTranImpl(final com.sleepycat.je.Transaction tx, boolean syncCommit) throws AMQStoreException
{
- com.sleepycat.je.Transaction tx = (com.sleepycat.je.Transaction) context.getPayload();
-
//if (_log.isDebugEnabled())
//{
// _log.debug("public void commitTranImpl() called with (Transaction=" + tx + ", syncCommit= "+ syncCommit + ")");
@@ -1274,7 +1266,7 @@
if (tx == null)
{
- throw new AMQStoreException("Fatal internal error: transactional context is empty at commitTran");
+ throw new AMQStoreException("Fatal internal error: transactional is null at commitTran");
}
StoreFuture result;
@@ -1291,25 +1283,19 @@
{
throw new AMQStoreException("Error commit tx: " + e.getMessage(), e);
}
- finally
- {
- context.setPayload(null);
- }
return result;
}
/**
- * Abandons all operations performed within a given transactional context.
+ * Abandons all operations performed within a given transaction.
*
- * @param context The transactional context to abandon.
+ * @param tx The transaction to abandon.
*
* @throws AMQStoreException If the operation fails for any reason.
*/
- public void abortTran(StoreContext context) throws AMQStoreException
+ public void abortTran(final com.sleepycat.je.Transaction tx) throws AMQStoreException
{
- com.sleepycat.je.Transaction tx = (com.sleepycat.je.Transaction) context.getPayload();
-
if (_log.isDebugEnabled())
{
_log.debug("abortTran called for [Transaction:" + tx + "]");
@@ -1323,10 +1309,6 @@
{
throw new AMQStoreException("Error aborting transaction: " + e.getMessage(), e);
}
- finally
- {
- context.setPayload(null);
- }
}
/**
@@ -1403,19 +1385,16 @@
/**
* Stores a chunk of message data.
*
- * @param context The transactional context for the operation.
+ * @param tx The transaction for the operation.
* @param messageId The message to store the data for.
* @param offset The offset of the data chunk in the message.
* @param contentBody The content of the data chunk.
*
* @throws AMQStoreException If the operation fails for any reason, or if the specified message does not exist.
*/
- protected void addContent(StoreContext context, Long messageId, int offset,
+ protected void addContent(final com.sleepycat.je.Transaction tx, Long messageId, int offset,
ByteBuffer contentBody) throws AMQStoreException
{
-
- com.sleepycat.je.Transaction tx = (com.sleepycat.je.Transaction) context.getPayload();
-
DatabaseEntry key = new DatabaseEntry();
TupleBinding keyBinding = new MessageContentKeyTB_3();
keyBinding.objectToEntry(new MessageContentKey_3(messageId, offset), key);
@@ -1445,23 +1424,21 @@
/**
* Stores message meta-data.
*
- * @param context The transactional context for the operation.
+ * @param tx The transaction for the operation.
* @param messageId The message to store the data for.
* @param messageMetaData The message meta data to store.
*
* @throws AMQStoreException If the operation fails for any reason, or if the specified message does not exist.
*/
- private void storeMetaData(StoreContext context, Long messageId, StorableMessageMetaData messageMetaData)
+ private void storeMetaData(final com.sleepycat.je.Transaction tx, Long messageId, StorableMessageMetaData messageMetaData)
throws AMQStoreException
{
if (_log.isDebugEnabled())
{
- _log.debug("public void storeMessageMetaData(StoreContext context = " + context + ", Long messageId = "
+ _log.debug("public void storeMetaData(Txn tx = " + tx + ", Long messageId = "
+ messageId + ", MessageMetaData messageMetaData = " + messageMetaData + "): called");
}
- com.sleepycat.je.Transaction tx = (com.sleepycat.je.Transaction) context.getPayload();
-
DatabaseEntry key = new DatabaseEntry();
EntryBinding keyBinding = TupleBinding.getPrimitiveBinding(Long.class);
keyBinding.objectToEntry(messageId, key);
@@ -1975,7 +1952,6 @@
private final long _messageId;
private volatile SoftReference<StorableMessageMetaData> _metaDataRef;
- private StoreContext _ctx;
private com.sleepycat.je.Transaction _txn;
StoredBDBMessage(long messageId, StorableMessageMetaData metaData)
@@ -1994,10 +1970,8 @@
_metaDataRef = new SoftReference<StorableMessageMetaData>(metaData);
if(persist)
{
- _ctx = new StoreContext();
_txn = _environment.beginTransaction(null, null);
- _ctx.setPayload(_txn);
- storeMetaData(_ctx, messageId, metaData);
+ storeMetaData(_txn, messageId, metaData);
}
}
catch (DatabaseException e)
@@ -2039,7 +2013,7 @@
{
try
{
- BDBMessageStore.this.addContent(_ctx, _messageId, offsetInMessage, src);
+ BDBMessageStore.this.addContent(_txn, _messageId, offsetInMessage, src);
}
catch (AMQStoreException e)
{
@@ -2063,13 +2037,13 @@
{
try
{
- if(_ctx != null)
+ if(_txn != null)
{
//if(_log.isDebugEnabled())
//{
// _log.debug("Flushing message " + _messageId + " to store");
//}
- BDBMessageStore.this.commitTranImpl(_ctx, true);
+ BDBMessageStore.this.commitTranImpl(_txn, true);
}
}
catch (AMQStoreException e)
@@ -2079,7 +2053,6 @@
finally
{
_txn = null;
- _ctx = null;
}
return IMMEDIATE_FUTURE;
}
@@ -2101,11 +2074,9 @@
private class BDBTransaction implements Transaction
{
private com.sleepycat.je.Transaction _txn;
- private StoreContext _ctx;
private BDBTransaction()
{
- _ctx = new StoreContext();
try
{
_txn = _environment.beginTransaction(null, null);
@@ -2114,33 +2085,32 @@
{
throw new RuntimeException(e);
}
- _ctx.setPayload(_txn);
}
public void enqueueMessage(TransactionLogResource queue, Long messageId) throws AMQStoreException
{
- BDBMessageStore.this.enqueueMessage(_ctx, queue, messageId);
+ BDBMessageStore.this.enqueueMessage(_txn, queue, messageId);
}
public void dequeueMessage(TransactionLogResource queue, Long messageId) throws AMQStoreException
{
- BDBMessageStore.this.dequeueMessage(_ctx, queue, messageId);
+ BDBMessageStore.this.dequeueMessage(_txn, queue, messageId);
}
public void commitTran() throws AMQStoreException
{
- BDBMessageStore.this.commitTranImpl(_ctx, true);
+ BDBMessageStore.this.commitTranImpl(_txn, true);
}
public StoreFuture commitTranAsync() throws AMQStoreException
{
- return BDBMessageStore.this.commitTranImpl(_ctx, false);
+ return BDBMessageStore.this.commitTranImpl(_txn, false);
}
public void abortTran() throws AMQStoreException
{
- BDBMessageStore.this.abortTran(_ctx);
+ BDBMessageStore.this.abortTran(_txn);
}
}
14 years, 4 months
rhmessaging commits: r4190 - in store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb: utils and 1 other directory.
by rhmessaging-commits@lists.jboss.org
Author: rgemmell
Date: 2010-08-06 10:28:17 -0400 (Fri, 06 Aug 2010)
New Revision: 4190
Removed:
store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreTest.java
store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessageReSendTest.java
store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessageStoreTest.java
store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/QueueDeleteWhilstRoutingTest.java
store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StoreContextRaceConditionTest.java
store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/utils/BDBVMTestCase.java
store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/utils/DurableSubscriber.java
store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/utils/JNDIHelper.java
store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/utils/Publisher.java
Log:
Remove old broken tests which are being replaced
Deleted: store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreTest.java
===================================================================
--- store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreTest.java 2010-08-06 14:10:10 UTC (rev 4189)
+++ store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreTest.java 2010-08-06 14:28:17 UTC (rev 4190)
@@ -1,545 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.qpid.server.store.berkeleydb;
-/*
-import com.sleepycat.je.DatabaseException;
-import junit.framework.Assert;
-import junit.framework.TestCase;
-import junit.framework.TestSuite;
-import org.apache.log4j.Logger;
-import org.apache.mina.common.ByteBuffer;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.common.AMQPFilterTypes;
-import org.apache.qpid.framing.*;
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.framing.abstraction.ContentChunk;
-import org.apache.qpid.server.RequiredDeliveryException;
-import org.apache.qpid.server.configuration.VirtualHostConfiguration;
-import org.apache.qpid.server.configuration.ServerConfiguration;
-import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.exchange.DefaultExchangeFactory;
-import org.apache.qpid.server.exchange.DirectExchange;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.store.StoreContext;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.MessageMetaData;
-import org.apache.qpid.server.queue.AMQQueueFactory;
-import org.apache.qpid.server.queue.AMQPriorityQueue;
-import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.txn.NonTransactionalContext;
-import org.apache.qpid.server.txn.TransactionalContext;
-import org.apache.qpid.server.util.NullApplicationRegistry;
-import org.apache.commons.configuration.PropertiesConfiguration;
-
-import java.io.File;
-import java.util.LinkedList;
-import java.util.List;
-*/
-
-import org.apache.qpid.server.store.berkeleydb.utils.BDBVMTestCase;
-
-public class BDBStoreTest extends BDBVMTestCase
-{
- /*
- private static final Logger _log = Logger.getLogger(BDBStoreTest.class);
-
- private BDBMessageStore _store;
- private String STORE_LOCATION = System.getProperty("BDB_WORK") + "/bdbTestEnv";
-
- private StoreContext _storeContext = new StoreContext();
- private VirtualHost _virtualHost;
-
- private TransactionalContext _txnContext;
- private static final AMQShortString QUEUE1 = new AMQShortString("queue1");
- private static final AMQShortString ME = new AMQShortString("me");
- private static final AMQShortString MYEXCHANGE = new AMQShortString("myexchange");
- private static final AMQShortString RK = new AMQShortString("rk");
- private static final AMQShortString QUEUE2 = new AMQShortString("queue2");
- private static final AMQShortString HIM = new AMQShortString("him");
- private static final AMQShortString EXCHANGE1 = new AMQShortString("exchange1");
-
- private static volatile int _loops;
- File BDB_DIR = new File(STORE_LOCATION);
-
- public void setUp() throws Exception
- {
- if (BDB_DIR.exists())
- {
- deleteDirectory(BDB_DIR);
- }
-
- ApplicationRegistry.initialise(new NullApplicationRegistry());
-
- File bdbDir = new File(STORE_LOCATION);
- deleteDirectory(bdbDir);
- BDB_DIR.mkdirs();
-
- _store = new BDBMessageStore();
- _store.configure(BDB_DIR);
-
- PropertiesConfiguration config = new PropertiesConfiguration();
-
- // This is used to test that recovery will correctly reapply configuration to the queues
- // in testRecovery
- config.setProperty("queues.maximumMessageAge", "23");
-
- VirtualHostConfiguration vhostConfig = new VirtualHostConfiguration("test", config);
-
- _virtualHost = new VirtualHost(vhostConfig, _store);
-
- _store.setVirtualHost(_virtualHost);
-
- _txnContext = new NonTransactionalContext(_store, _storeContext, null, new LinkedList<RequiredDeliveryException>());
- }
-
- private void reload() throws Exception
- {
- _virtualHost.close();
-
- PropertiesConfiguration env = new PropertiesConfiguration();
-
- env.addProperty("store.environment-path", STORE_LOCATION);
- env.addProperty("store.class", "org.apache.qpid.server.store.berkeleydb.BDBMessageStore");
-
- _virtualHost = new VirtualHost(new VirtualHostConfiguration("test", env), null);
-
- _store = (BDBMessageStore) _virtualHost.getMessageStore();
- env.addProperty("store.class", "org.apache.qpid.server.store.berkeleydb.InspectableBDBMessageStore");
- env.setProperty("queues.maximumMessageAge", 23);
- _virtualHost = new VirtualHost(new VirtualHostConfiguration("test", env));
- }
-
- public void tearDown() throws Exception
- {
- _virtualHost.close();
-
- ApplicationRegistry.remove(1);
- }
-
- public void testExchangePersistence() throws Exception
- {
- FieldTable queueArguments = new FieldTable();
- Integer priorityLevel = 5;
- queueArguments.put(AMQQueueFactory.X_QPID_PRIORITIES, priorityLevel);
-
- Exchange exchange = new DefaultExchangeFactory(_virtualHost).createExchange(EXCHANGE1, DirectExchange.TYPE.getName(), true, false, 0);
-
- assertNotNull("Exchange is null", exchange);
- assertEquals("Exchange Name incorrect", EXCHANGE1, exchange.getName());
- assertTrue("Exchange is not durable", exchange.isDurable());
-
- _virtualHost.getExchangeRegistry().registerExchange(exchange);
- _store.createExchange(exchange);
-
- //Ensure it is registered correctly
- exchange = _virtualHost.getExchangeRegistry().getExchange(EXCHANGE1);
- assertNotNull("Exchange is null", exchange);
-
- reload();
-
- exchange = _virtualHost.getExchangeRegistry().getExchange(EXCHANGE1);
-
- assertNotNull("Exchange is null", exchange);
- assertEquals("Exchange Name incorrect", EXCHANGE1, exchange.getName());
- assertTrue("Exchange is not durable", exchange.isDurable());
-
- }
-
- public void testQueuePersistence() throws Exception
- {
-
- FieldTable queueArguments = new FieldTable();
- Integer priorityLevel = 5;
- queueArguments.put(AMQQueueFactory.X_QPID_PRIORITIES, priorityLevel);
-
- AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(QUEUE1, true, ME, false, _virtualHost, queueArguments);
-
- _store.createQueue(queue, queueArguments);
-
- AMQShortString routingKey = new AMQShortString("Test-Key");
- FieldTable bindArguments = new FieldTable();
- bindArguments.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), "Test = 'MST'");
-
- _store.bindQueue(_virtualHost.getExchangeRegistry().getDefaultExchange(), routingKey, queue, bindArguments);
-
- reload();
-
- AMQQueue returnedQueue = _virtualHost.getQueueRegistry().getQueue(QUEUE1);
-
- assertEquals("Queue Name has changed", QUEUE1, returnedQueue.getName());
- assertEquals("Queue Owner has changed", ME, returnedQueue.getOwner());
- assertTrue("Returned Queue is not Durable", returnedQueue.isDurable());
- assertEquals("Returned Queue is not A Priority Queue", AMQPriorityQueue.class, returnedQueue.getClass());
- assertEquals("Returned Queue does not have the right number of priorities", priorityLevel.intValue(),
- ((AMQPriorityQueue) returnedQueue).getPriorities());
- assertNotNull("Queue has no exchange binding arguments.", returnedQueue.getExchangeBindings());
- assertEquals("Incorrect binding count for queue.", 1, returnedQueue.getExchangeBindings().size());
- assertTrue("Binding does not contain a Selector argument.",
- returnedQueue.getExchangeBindings().get(0).getArguments().containsKey(AMQPFilterTypes.JMS_SELECTOR.getValue()));
- }
-
- private MessagePublishInfo createPublishBody()
- {
-
- return new MessagePublishInfo()
- {
-
- public AMQShortString getExchange()
- {
- return MYEXCHANGE;
- }
-
- public void setExchange(AMQShortString exchange)
- {
-
- }
-
- public boolean isImmediate()
- {
- return false;
- }
-
- public boolean isMandatory()
- {
- return true;
- }
-
- public AMQShortString getRoutingKey()
- {
- return RK;
- }
- };
-
- }
-
- private BasicContentHeaderProperties createContentHeaderProperties()
- {
- BasicContentHeaderProperties props = new BasicContentHeaderProperties();
- props.setContentType("text/html");
- props.setMessageId("abc123");
- return props;
- }
-
- private ContentChunk createContentChunk(String bodyText)
- {
- byte[] bodyBytes = bodyText.getBytes();
- final int size = bodyBytes.length;
- final ByteBuffer payload = ByteBuffer.wrap(bodyBytes);
-
- return new ContentChunk()
- {
-
- public int getSize()
- {
- return size;
- }
-
- public ByteBuffer getData()
- {
- return payload;
- }
-
- public void reduceToFit()
- {
- }
- };
-
- }
-
- private ContentHeaderBody createContentHeaderBody(BasicContentHeaderProperties props, int length)
- {
- MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v8_0);
- int classForBasic = methodRegistry.createBasicQosOkBody().getClazz();
- return new ContentHeaderBody(classForBasic, 1, props, length);
- }
-
- public void testMessagePersistence() throws DatabaseException, AMQException
- {
- MessagePublishInfo pubBody = createPublishBody();
- BasicContentHeaderProperties props = createContentHeaderProperties();
- String bodyText = "jfhdjsflsdhfjdshfjdslhfjdslhfsjlhfsjkhfdsjkhfdsjkfhdslkjf";
- ContentChunk body = createContentChunk(bodyText);
-
- ContentHeaderBody chb = createContentHeaderBody(props, body.getSize());
-
- _store.storeMessageMetaData(_storeContext, 14L, new MessageMetaData(pubBody, chb, 1));
- _store.storeContentBodyChunk(_storeContext, 14L, 0, body, true);
-
- MessageMetaData mmd = _store.getMessageMetaData(_storeContext, 14L);
- MessagePublishInfo returnedPubBody = mmd.getMessagePublishInfo();
- Assert.assertEquals("Message exchange has changed", pubBody.getExchange(), returnedPubBody.getExchange());
- Assert.assertEquals("Immediate flag has changed", pubBody.isImmediate(), returnedPubBody.isImmediate());
- Assert.assertEquals("Mandatory flag has changed", pubBody.isMandatory(), returnedPubBody.isMandatory());
- Assert.assertEquals("Routing key has changed", pubBody.getRoutingKey(), returnedPubBody.getRoutingKey());
-
- ContentHeaderBody returnedHeaderBody = mmd.getContentHeaderBody();
- Assert.assertEquals("ContentHeader ClassID has changed", chb.classId, returnedHeaderBody.classId);
- Assert.assertEquals("ContentHeader weight has changed", chb.weight, returnedHeaderBody.weight);
- Assert.assertEquals("ContentHeader bodySize has changed", chb.bodySize, returnedHeaderBody.bodySize);
- BasicContentHeaderProperties returnedProperties = (BasicContentHeaderProperties) returnedHeaderBody.properties;
- Assert.assertEquals("Property ContentType has changed", props.getContentTypeAsString(), returnedProperties.getContentTypeAsString());
- Assert.assertEquals("Property MessageID has changed", props.getMessageIdAsString(), returnedProperties.getMessageIdAsString());
- Assert.assertEquals("MessageMD ChunkCount has changed", mmd.getContentChunkCount(), 1);
- ContentChunk returnedContentBody = _store.getContentBodyChunk(_storeContext, 14L, 0);
- ByteBuffer returnedPayloadAsBytes = returnedContentBody.getData();
- byte[] returnedPayload = new byte[returnedPayloadAsBytes.remaining()];
- returnedPayloadAsBytes.get(returnedPayload);
- String returnedPayloadString = new String(returnedPayload);
- Assert.assertEquals("Message Payload has changed", bodyText, returnedPayloadString);
- }
-
- public void testMessageCreateAndDelete() throws Exception
- {
- MessagePublishInfo pubBody = createPublishBody();
- BasicContentHeaderProperties props = createContentHeaderProperties();
- String bodyText = "dsjfhdsjkfhdsjflhsdjfdshfjdlhfjhdljfdhsljkfsh";
- ContentChunk body = createContentChunk(bodyText);
-
- ContentHeaderBody chb = createContentHeaderBody(props, body.getSize());
- _store.storeMessageMetaData(_storeContext, 15L, new MessageMetaData(pubBody, chb, 1));
- _store.storeContentBodyChunk(_storeContext, 15L, 0, body, true);
- _store.getContentBodyChunk(_storeContext, 15L, 0);
- _store.removeMessage(_storeContext, 15L);
-
- // the next line should throw since the message id should not be found
- try
- {
- _store.getMessageMetaData(_storeContext, 15L);
- Assert.fail("No exception thrown when message id not found getting metadata");
- }
- catch (AMQException e)
- {
- // pass since exception expected
- }
-
- try
- {
- _store.getContentBodyChunk(_storeContext, 15L, 0);
- Assert.fail("No exception thrown when message id not found getting content chunk");
- }
- catch (AMQException e)
- {
- // pass since exception expected
- }
-
- }
-
- public void testTranCommit() throws Exception
- {
- MessagePublishInfo pubBody = createPublishBody();
- BasicContentHeaderProperties props = createContentHeaderProperties();
- String bodyText = "dsjfhdsjkfhdsjflhsdjfdshfjdlhfjhdljfdhsljkfsh";
- ContentChunk body = createContentChunk(bodyText);
-
- ContentHeaderBody chb = createContentHeaderBody(props, body.getSize());
- _store.storeMessageMetaData(_storeContext, 20L, new MessageMetaData(pubBody, chb, 0));
- _store.storeMessageMetaData(_storeContext, 21L, new MessageMetaData(pubBody, chb, 0));
- _store.storeMessageMetaData(_storeContext, 22L, new MessageMetaData(pubBody, chb, 0));
-
- AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(QUEUE1, true, ME, false, _virtualHost, null);
- _store.createQueue(queue);
-
- _store.beginTran(_storeContext);
- _store.enqueueMessage(_storeContext, queue, 20L);
- _store.enqueueMessage(_storeContext, queue, 21L);
- _store.commitTran(_storeContext);
-
- List<Long> enqueuedIds = _store.getEnqueuedMessages(QUEUE1);
- Assert.assertEquals("Enqueued messages have changed", 2, enqueuedIds.size());
- Long val = enqueuedIds.get(0);
- Assert.assertEquals("First Message is incorrect", 20L, val.longValue());
- val = enqueuedIds.get(1);
- Assert.assertEquals("Second Message is incorrect", 21L, val.longValue());
-
- }
-
- public void testTranRollback1() throws Exception
- {
- List<Long> enqueuedIds = _store.getEnqueuedMessages(QUEUE1);
- assertTrue("Last Test Messages are still present", enqueuedIds.isEmpty());
-
- MessagePublishInfo pubBody = createPublishBody();
- BasicContentHeaderProperties props = createContentHeaderProperties();
- String bodyText = "dsjfhdsjkfhdsjflhsdjfdshfjdlhfjhdljfdhsljkfsh";
- ContentChunk body = createContentChunk(bodyText);
-
- ContentHeaderBody chb = createContentHeaderBody(props, body.getSize());
-
- _store.storeMessageMetaData(_storeContext, 30L, new MessageMetaData(pubBody, chb, 0));
- _store.storeMessageMetaData(_storeContext, 31L, new MessageMetaData(pubBody, chb, 0));
- _store.storeMessageMetaData(_storeContext, 32L, new MessageMetaData(pubBody, chb, 0));
-
- AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(QUEUE1, true, ME, false, _virtualHost, null);
- _store.createQueue(queue);
-
- _store.beginTran(_storeContext);
- _store.enqueueMessage(_storeContext, queue, 30L);
- _store.enqueueMessage(_storeContext, queue, 31L);
- _store.commitTran(_storeContext);
-
- _store.beginTran(_storeContext);
- _store.enqueueMessage(_storeContext, queue, 32L);
- _store.abortTran(_storeContext);
-
- _store.beginTran(_storeContext);
- _store.commitTran(_storeContext);
-
- enqueuedIds = _store.getEnqueuedMessages(QUEUE1);
- assertTrue("Last Test Message is still present", !enqueuedIds.contains(20L));
- assertEquals("Incorrect Enqueued Message Count:", 2, enqueuedIds.size());
- Long val = enqueuedIds.get(0);
- assertEquals("First Message is incorrect", 30L, val.longValue());
- val = enqueuedIds.get(1);
- assertEquals("Second Message is incorrect", 31L, val.longValue());
-
- }
-
- public void testTranRollback2() throws Exception
- {
- List<Long> enqueuedIds = _store.getEnqueuedMessages(QUEUE1);
- assertTrue("Last Test Messages are still present", enqueuedIds.isEmpty());
-
- MessagePublishInfo pubBody = createPublishBody();
- BasicContentHeaderProperties props = createContentHeaderProperties();
- String bodyText = "dsjfhdsjkfhdsjflhsdjfdshfjdlhfjhdljfdhsljkfsh";
- ContentChunk body = createContentChunk(bodyText);
-
- ContentHeaderBody chb = createContentHeaderBody(props, body.getSize());
-
- _store.storeMessageMetaData(_storeContext, 30L, new MessageMetaData(pubBody, chb, 0));
- _store.storeMessageMetaData(_storeContext, 31L, new MessageMetaData(pubBody, chb, 0));
- _store.storeMessageMetaData(_storeContext, 32L, new MessageMetaData(pubBody, chb, 0));
-
- AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(QUEUE1, true, ME, false, _virtualHost, null);
- _store.createQueue(queue);
-
- _store.beginTran(_storeContext);
- _store.enqueueMessage(_storeContext, queue, 30L);
- _store.abortTran(_storeContext);
-
- _store.beginTran(_storeContext);
- _store.enqueueMessage(_storeContext, queue, 31L);
- _store.enqueueMessage(_storeContext, queue, 32L);
- _store.commitTran(_storeContext);
-
- enqueuedIds = _store.getEnqueuedMessages(QUEUE1);
- Assert.assertEquals("Incorrect Enqueued Message Count", 2, enqueuedIds.size());
- Long val = enqueuedIds.get(0);
- Assert.assertEquals("First Message is incorrect", 31L, val.longValue());
- val = enqueuedIds.get(1);
- Assert.assertEquals("Second Message is incorrect", 32L, val.longValue());
- }
-
- public void testRecovery() throws Exception
- {
- List<Long> enqueuedIds = _store.getEnqueuedMessages(QUEUE1);
- assertTrue("Last Test Messages are still present", enqueuedIds.isEmpty());
-
- MessagePublishInfo pubBody = createPublishBody();
- BasicContentHeaderProperties props = createContentHeaderProperties();
- String bodyText = "dsjfhdsjkfhdsjflhsdjfdshfjdlhfjhdljfdhsljkfsh";
- ContentChunk body = createContentChunk(bodyText);
-
- ContentHeaderBody chb = createContentHeaderBody(props, body.getSize());
-
- _store.storeMessageMetaData(_storeContext, 40L, new MessageMetaData(pubBody, chb, 0));
- _store.storeMessageMetaData(_storeContext, 41L, new MessageMetaData(pubBody, chb, 0));
- _store.storeMessageMetaData(_storeContext, 42L, new MessageMetaData(pubBody, chb, 0));
-
- AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(QUEUE1, true, ME, false, _virtualHost, null);
- AMQQueue queue2 = AMQQueueFactory.createAMQQueueImpl(QUEUE2, true, HIM, false, _virtualHost, null);
-
- _store.createQueue(queue);
- _store.createQueue(queue2);
-
- _store.beginTran(_storeContext);
- _store.enqueueMessage(_storeContext, queue, 40L);
- _store.enqueueMessage(_storeContext, queue, 41L);
- _store.enqueueMessage(_storeContext, queue2, 42L);
- _store.commitTran(_storeContext);
-
- _store.enqueueMessage(_storeContext, queue, 42L);
-
- reload();
-
- try
- {
- AMQQueue q1 = _virtualHost.getQueueRegistry().getQueue(QUEUE1);
- AMQQueue q2 = _virtualHost.getQueueRegistry().getQueue(QUEUE2);
-
- Assert.assertNotNull("Queue1 is was not recovered", q1);
- Assert.assertEquals("Queue1 has incorrect message count", 3, q1.getMessageCount());
- Assert.assertNotNull("Queue2 is was not recovered", q2);
- Assert.assertEquals("Queue2 has incorrect message count", 1, q2.getMessageCount());
-
- // Message age is set in setUp
- assertEquals("q1 has an incorrect maximum message age", 23, q1.getMaximumMessageAge());
- }
- catch (Exception e)
- {
- e.printStackTrace();
- fail(e.getMessage());
- }
-
- }
-
- public void testDequeue() throws AMQException
- {
- MessagePublishInfo pubBody = createPublishBody();
- BasicContentHeaderProperties props = createContentHeaderProperties();
- String bodyText = "dsjfhdsjkfhdsjflhsdjfdshfjdlhfjhdljfdhsljkfsh";
- ContentChunk body = createContentChunk(bodyText);
-
- ContentHeaderBody chb = createContentHeaderBody(props, body.getSize());
-
- _store.storeMessageMetaData(_storeContext, 50L, new MessageMetaData(pubBody, chb, 0));
-
- AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(QUEUE1, true, ME, false, _virtualHost, null);
- _store.createQueue(queue);
-
- _store.enqueueMessage(_storeContext, queue, 50L);
- _store.dequeueMessage(_storeContext, queue, 50L);
- }
-
- public void testQueueRemove() throws AMQException
- {
- AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(QUEUE1, true, ME, false, _virtualHost, null);
- _store.createQueue(queue);
- _store.removeQueue(queue);
- try
- {
- _store.removeQueue(queue);
- Assert.fail("No exception thrown when deleting non-existant queue");
- }
- catch (AMQException e)
- {
- // Pass
- }
- }
-
- public static junit.framework.Test suite()
- {
- return new TestSuite(BDBStoreTest.class);
- }
- */
-
- public void testDummy()
- {
-
- }
-}
Deleted: store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessageReSendTest.java
===================================================================
--- store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessageReSendTest.java 2010-08-06 14:10:10 UTC (rev 4189)
+++ store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessageReSendTest.java 2010-08-06 14:28:17 UTC (rev 4190)
@@ -1,112 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.server.store.berkeleydb;
-
-import javax.jms.Connection;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.Queue;
-import javax.jms.Session;
-
-import org.apache.qpid.server.store.berkeleydb.utils.BDBVMTestCase;
-
-public class MessageReSendTest extends BDBVMTestCase
-{
- protected static final String MESSAGE_ID_PROPERTY = "MessageIDProperty";
-
- public void test() throws Exception
- {
-
- //Send Message
- sendMessages(getConnection(), 1);
- System.err.println("SEND");
-
- //Create Connection
- Connection connection = getConnection();
- System.err.println("RECEIVE");
-
- //Receive Message
- checkMessagesOnQueue(connection, _queue, 1);
- //Close connections
- connection.close();
- System.err.println("VALIDATE");
-
- //Reconnect and ensure message is gone
- connection = getConnection();
- checkMessagesOnQueue(connection, _queue, 0);
- connection.close();
-
- try
- {
- //restart broker
- stopBroker(1);
- System.err.println("START");
- startBroker(1);
- }
- catch (Exception e)
- {
- fail(e.getMessage());
- }
-
- //reconnect and ensure message is gone
- connection = getConnection();
- checkMessagesOnQueue(connection, _queue, 0);
- connection.close();
- }
-
- private void checkMessagesOnQueue(Connection connection, Queue queue, int count)
- {
- try
- {
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageConsumer consumer = session.createConsumer(queue);
-
- connection.start();
-
- Message msg = consumer.receive(1000);
-
- if (count > 0)
- {
- int received = 1;
- while (received < count)
- {
- assertNotNull(msg);
- assertEquals(received, msg.getIntProperty(MESSAGE_ID_PROPERTY));
-
- //get next message
- msg = consumer.receive(1000);
- }
-
- }
- else
- {
- assertNull("Received Message when none expected", msg);
- }
- }
- catch (JMSException e)
- {
- fail(e.getMessage());
- }
- }
-
-}
Deleted: store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessageStoreTest.java
===================================================================
--- store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessageStoreTest.java 2010-08-06 14:10:10 UTC (rev 4189)
+++ store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessageStoreTest.java 2010-08-06 14:28:17 UTC (rev 4190)
@@ -1,39 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store.berkeleydb;
-
-import org.apache.commons.configuration.PropertiesConfiguration;
-
-public class MessageStoreTest extends org.apache.qpid.server.store.MessageStoreTest
-{
-
- public void testBDBMessageStore() throws Exception
- {
- PropertiesConfiguration config = new PropertiesConfiguration();
-
- config.addProperty("store.environment-path",
- System.getProperty("QPID_WORK") + "/BDB_MessageStoreTest");
- config.addProperty("store.class", "org.apache.qpid.server.store.berkeleydb.BDBMessageStore");
-
- runTestWithStore(config);
- }
-
-}
Deleted: store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/QueueDeleteWhilstRoutingTest.java
===================================================================
--- store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/QueueDeleteWhilstRoutingTest.java 2010-08-06 14:10:10 UTC (rev 4189)
+++ store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/QueueDeleteWhilstRoutingTest.java 2010-08-06 14:28:17 UTC (rev 4190)
@@ -1,204 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store.berkeleydb;
-
-import org.apache.log4j.Logger;
-import org.apache.qpid.server.store.berkeleydb.utils.BDBVMTestCase;
-
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-import javax.naming.NamingException;
-import java.io.File;
-
-public class QueueDeleteWhilstRoutingTest extends BDBVMTestCase
-{
- private static final Logger _logger = Logger.getLogger(QueueDeleteWhilstRoutingTest.class);
-
- MessageConsumer _consumer1, _consumer2;
- Session _clientSession1;
- Connection _producerConnection, _clientConnection1;
-
- int brokerID = 2;
-
- /**
- * Issue analysis:
- * When an Exclusive NonDurable queue is created a queueDelete task is added to the sessionCloseTaskList
- * When the last consumer on an autodelete queue closes queueDelete is called.
- *
- * Hence the queue is delted twice. Which would hurt the ref counting of all messages in the consumers
- * unacked map
- *
- * Test Plan:
- *
- * Create two subscribers same topic
- *
- * Send two messages
- *
- * consume one from each consumer to validate that all is good
- *
- * Shutdown persistent broker
- *
- * restart.
- *
- * Expecting failure in broker startup.
- * @throws Exception
- */
- public void test() throws Exception
- {
- _logger.debug("Performing receives");
-
- Message msg1 = _consumer1.receive(1000);
-
- assertNotNull(msg1);
-
- //Check message recevied ok
- assertEquals("Message 1 not received on consumer 1", "Message: 1", ((TextMessage) msg1).getText());
-
- _consumer1.close();
-
- _clientConnection1.close();
-
- _producerConnection.close();
-
- try
- {
- _logger.debug("Shutdown broker in 1 second");
- Thread.sleep(4000);
- }
- catch (InterruptedException e)
- {
- fail(e.getMessage());
- }
-
- //Stop the broker
- stopBroker(brokerID);
-
- try
- {
- _logger.debug("Restart broker in 2 second");
- Thread.sleep(4000);
- }
- catch (InterruptedException e)
- {
- fail(e.getMessage());
- }
-
- //Start the broker
- try
- {
- //FIXME startVMBroker(brokerID, _persistentConfigFile);
- }
- catch (Exception e)
- {
- fail(e.getMessage());
- }
-
- //Test Connection
- _clientConnection1 = getConnection();
-
- _clientConnection1.close();
- }
-
- public void setUp() throws Exception
- {
- super.setUp();
-
- //FIXME startVMBroker(brokerID, _persistentConfigFile);
-
- // Initialise ACLs.
-
- //Create Consumers
- //Create consumer on the temp queue
- Queue requestQueue = (Queue) getInitialContext().lookup("queue");
-
- _clientConnection1 = getConnection();
- _clientSession1 = _clientConnection1.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- _queue = _clientSession1.createTemporaryQueue();
-
- _consumer1 = _clientSession1.createConsumer(_queue);
-
- //Start the connection
- _clientConnection1.start();
-
- //Create Producer
- _producerConnection = getConnection();
- final Session producerSession = _producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- //Create a listener for the messages
- producerSession.createConsumer(requestQueue).setMessageListener(new MessageListener()
- {
- public void onMessage(final Message message)
- {
- try
- {
- Destination responseQueue = message.getJMSReplyTo();
-
- //Send a response to the message
- producerSession.createProducer(responseQueue)
- .send(producerSession.createTextMessage(((TextMessage) message).getText()));
- }
- catch (JMSException e)
- {
- fail(e.getMessage());
- }
- }
- });
- //Start the connection
- _producerConnection.start();
-
- //Send two messages
-
- MessageProducer _clientProducer = _clientSession1.createProducer(requestQueue);
- Message msg = _clientSession1.createTextMessage("Message: 1");
- msg.setJMSReplyTo(_queue);
- _clientProducer.send(msg);
-
- msg = _clientSession1.createTextMessage("Message: 2");
- msg.setJMSReplyTo(_queue);
- _clientProducer.send(msg);
- }
-
- public void tearDown() throws Exception
- {
- //Stop the broker
- try
- {
- stopBroker(brokerID);
- }
- catch (Exception e)
- {
- fail(e.getMessage());
- }
-
- super.tearDown();
- }
-
-}
Deleted: store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StoreContextRaceConditionTest.java
===================================================================
--- store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StoreContextRaceConditionTest.java 2010-08-06 14:10:10 UTC (rev 4189)
+++ store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StoreContextRaceConditionTest.java 2010-08-06 14:28:17 UTC (rev 4190)
@@ -1,161 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.server.store.berkeleydb;
-
-import javax.jms.Connection;
-import javax.jms.JMSException;
-import javax.jms.MessageConsumer;
-import javax.jms.Session;
-import javax.naming.NamingException;
-
-import org.apache.log4j.Logger;
-import org.apache.qpid.server.store.berkeleydb.utils.BDBVMTestCase;
-
-public class StoreContextRaceConditionTest extends BDBVMTestCase
-{
- private static final Logger _logger = Logger.getLogger(StoreContextRaceConditionTest.class);
-
- public void test() throws InterruptedException, NamingException, JMSException
- {
- Runnable test = new Runnable()
- {
- public void run()
- {
-
- //Create Consumer
- Connection connection = null;
-
- Session session = null;
- try
- {
- try
- {
- connection = getConnection();
- }
- catch (Exception e)
- {
- fail("Unable to obtain connection.");
- }
- session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- }
- catch (JMSException e)
- {
- return;
- }
-
- try
- {
- int run = 0;
- while (run < 1)
- {
- try
- {
- //Stop the connection to prevent flow
- connection.stop();
- //Create Consumer to receive msgs
- MessageConsumer consumer = session.createConsumer(_queue);
-
- //Send one message to hold up the Async Delivery from purging
- _logger.info("***** CREATED Consumer");
- _TimeToLive = 0L;
- sendMessages(1);
- _logger.info("***** SENT msg 1");
- //Send 1000 msgs that will time out
- _TimeToLive = 1000L;
- sendMessages(50);
- _logger.info("***** SENT TTL msgs");
-
- //Timeout Messages - Note that we
- Thread.sleep(1000);
- _logger.info("***** SLEEP");
-
- //Allw the messages to flow to us
- connection.start();
- _logger.info("***** START Consumer");
- //*** Starts Async process
-
- //Remove the first message so that the async will occcur and start purging.
- consumer.receive(1000);
- _logger.info("***** RECEIVE Consumer");
-
- sendMessages(50);
- _logger.info("***** SENT TTL msgs");
-
- //Close the consumer freeing the QHK thread to doing work
- consumer.close();
- _logger.info("***** CLOSE Consumer");
- //** Allows QueueHouskeeping to run.
- sendMessages(50);
- _logger.info("***** SENT TTL msgs");
-
- run++;
- }
- catch (JMSException e)
- {
-
- }
- catch (InterruptedException e)
- {
- }
- }
- }
- finally
- {
- try
- {
- connection.close();
- }
- catch (JMSException e)
- {
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- }
- _logger.info("***** Test Done");
- }
- }
- };
-
- int MAX_THREADS = 1;
-
- Thread[] threads = new Thread[MAX_THREADS];
-
- for (int concurentClients = 0; concurentClients < MAX_THREADS; concurentClients++)
- {
- threads[concurentClients] = new Thread(test);
- threads[concurentClients].start();
- }
-
- for (int concurentClients = 0; concurentClients < MAX_THREADS; concurentClients++)
- {
- threads[concurentClients].join();
- }
- }
-
- public static void main(String[] args) throws Exception, InterruptedException
- {
- StoreContextRaceConditionTest scrc = new StoreContextRaceConditionTest();
-
- scrc.setUp();
- scrc.test();
-// scrc.tearDown();
- }
-
-}
Deleted: store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/utils/BDBVMTestCase.java
===================================================================
--- store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/utils/BDBVMTestCase.java 2010-08-06 14:10:10 UTC (rev 4189)
+++ store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/utils/BDBVMTestCase.java 2010-08-06 14:28:17 UTC (rev 4190)
@@ -1,168 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store.berkeleydb.utils;
-
-import java.io.File;
-
-import javax.jms.Connection;
-import javax.jms.JMSException;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import org.apache.commons.configuration.Configuration;
-import org.apache.log4j.Level;
-import org.apache.qpid.client.AMQQueue;
-import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry;
-import org.apache.qpid.test.utils.QpidBrokerTestCase;
-
-public class BDBVMTestCase extends QpidBrokerTestCase
-{
- public static final String BDB_WORK = "BDB_WORK";
- public static final String QPID_WORK = "QPID_WORK";
-
- protected String testWork = null;
-
- protected String BDB_WORK_PRE_TEST;
- protected String QPID_WORK_PRE_TEST;
-
- protected final String QpidHome = System.getProperty("QPID_HOME");
- protected final File _persistentConfigFile = new File(QpidHome, "etc/persistent_config.xml");
- protected Queue _queue;
-
- protected long _TimeToLive = 0L;
- public static final String MESSAGE_ID_PROPERTY = "MessageIDProperty";
-
- public void setUp() throws Exception
- {
- setupWorkDirectory();
-
- //Create the Broker
- super.setUp();
-
- _queue = new AMQQueue("amq.direct", "BDBTestQ");
- }
-
- public void tearDown() throws Exception
- {
- super.tearDown();
-
- if (testWork != null)
- {
- // Clean up the BDB store
- deleteDirectory(new File(testWork));
- testWork = null;
- }
-
- //Reset BDB_WORK
- if (BDB_WORK_PRE_TEST == null)
- {
- System.clearProperty(BDB_WORK);
- }
- else
- {
- System.setProperty(BDB_WORK, BDB_WORK_PRE_TEST);
- }
-
- //Reset QPID_WORK
- if (QPID_WORK_PRE_TEST == null)
- {
- System.clearProperty(QPID_WORK);
- }
- else
- {
- System.setProperty(QPID_WORK, QPID_WORK_PRE_TEST);
- }
- }
-
- public void setupWorkDirectory()
- {
- if (System.getProperty(BDB_WORK) == null)
- {
- fail("BDB_WORK required for BDB tests");
- }
-
- BDB_WORK_PRE_TEST = System.getProperty(BDB_WORK);
- QPID_WORK_PRE_TEST = System.getProperty(QPID_WORK);
-
- //IF BDB_WORK is set but not QPID_WORK then set QPID_WORK to BDB_WORK
- if (QPID_WORK_PRE_TEST == null && BDB_WORK_PRE_TEST != null)
- {
- System.setProperty(QPID_WORK, BDB_WORK_PRE_TEST);
- }
- }
-
- public boolean deleteDirectory(File dir)
- {
- if (dir.isDirectory())
- {
- String[] children = dir.list();
- for (int i = 0; i < children.length; i++)
- {
- if (!deleteDirectory(new File(dir, children[i])))
- {
- return false;
- }
- }
- }
-
- return (dir.delete());
- }
-
- protected void sendMessages(int num) throws JMSException
- {
- Connection producerConnection = null;
- try
- {
- producerConnection = getConnection();
- }
- catch (Exception e)
- {
- fail("Unable to lookup connection in JNDI.");
- }
-
- sendMessages(producerConnection, num);
- }
-
- protected void sendMessages(Connection producerConnection, int num) throws JMSException
- {
- Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- //Ensure _queue is created
- producerSession.createConsumer(_queue).close();
-
- MessageProducer producer = producerSession.createProducer(_queue);
-
- producer.setTimeToLive(_TimeToLive);
- producer.setDisableMessageTimestamp(false);
-
- for (int messsageID = 0; messsageID < num; messsageID++)
- {
- TextMessage textMsg = producerSession.createTextMessage("Message " + messsageID);
- textMsg.setIntProperty(MESSAGE_ID_PROPERTY, messsageID);
- producer.send(textMsg);
- }
-
- producerConnection.close();
- }
-}
Deleted: store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/utils/DurableSubscriber.java
===================================================================
--- store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/utils/DurableSubscriber.java 2010-08-06 14:10:10 UTC (rev 4189)
+++ store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/utils/DurableSubscriber.java 2010-08-06 14:28:17 UTC (rev 4190)
@@ -1,112 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store.berkeleydb.utils;
-
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQTopic;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.jms.Session;
-import javax.jms.TopicSubscriber;
-import javax.jms.ConnectionFactory;
-import javax.jms.JMSException;
-import javax.jms.MessageListener;
-import javax.jms.Message;
-import java.util.List;
-import java.util.LinkedList;
-
-public class DurableSubscriber implements MessageListener
-{
- private static final Logger _logger = LoggerFactory.getLogger(DurableSubscriber.class);
-
- JNDIHelper _jndiHelper;
-
- Session _session;
- TopicSubscriber _subscriber;
- AMQConnection _connection;
- private List<Message> _received;
-
- public static void main(String[] args) throws JMSException
- {
- new Publisher();
- }
-
- public DurableSubscriber() throws JMSException
- {
- this(JNDIHelper.DEFAULT_BROKER, null);
- }
-
- public DurableSubscriber(String broker, String topic) throws JMSException
- {
- this(broker, topic, null);
- }
-
- public DurableSubscriber(String broker, String topicStr, String selector) throws JMSException
- {
- _jndiHelper = new JNDIHelper(broker );
-
- _connection = (AMQConnection) ((ConnectionFactory) _jndiHelper.lookupJNDI(JNDIHelper.CONNECTION_JNDI_NAME)).createConnection();
-
- _jndiHelper.close();
-
- AMQTopic topic = new AMQTopic(_connection, topicStr);
-
- _logger.debug("Create Session");
- _session = _connection.createSession(true, Session.SESSION_TRANSACTED);
- _logger.debug("Create Durable Subscriber on Session");
-
- if (selector != null)
- {
- _subscriber = _session.createDurableSubscriber(topic, "MySubscription", selector, false);
- }
- else
- {
- _subscriber = _session.createDurableSubscriber(topic, "MySubscription");
- }
-
- _received = new LinkedList<Message>();
-
- _subscriber.setMessageListener(this);
-
- _connection.start();
- }
-
- public void close() throws JMSException
- {
- _connection.close();
- }
-
- public void onMessage(Message message)
- {
- _received.add(message);
- }
-
- public List<Message> getMessages()
- {
- return _received;
- }
-
- public void commit() throws JMSException
- {
- _session.commit();
- }
-}
Deleted: store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/utils/JNDIHelper.java
===================================================================
--- store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/utils/JNDIHelper.java 2010-08-06 14:10:10 UTC (rev 4189)
+++ store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/utils/JNDIHelper.java 2010-08-06 14:28:17 UTC (rev 4190)
@@ -1,107 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store.berkeleydb.utils;
-
-import javax.naming.NamingException;
-import javax.naming.InitialContext;
-import javax.naming.Context;
-import java.util.Properties;
-
-public class JNDIHelper
-{
- public static final String DEFAULT_BROKER = "tcp://localhost:2345";
- public static final String CONNECTION_JNDI_NAME = "local";
-
- public final String INITIAL_CONTEXT_FACTORY = "org.apache.qpid.jndi.PropertiesFileInitialContextFactory";
- public final String CONNECTION_NAME;
-
- InitialContext _ctx;
-
- public JNDIHelper(String broker)
- {
- CONNECTION_NAME = "amqp://guest:guest@clientid/test?brokerlist='" + broker + "'";
- setupJNDI();
- }
-
- /**
- * Lookup the specified name in the JNDI Context.
- *
- * @param name The string name of the object to lookup
- *
- * @return The object or null if nothing exists for specified name
- */
- public Object lookupJNDI(String name)
- {
- try
- {
- return _ctx.lookup(name);
- }
- catch (NamingException e)
- {
- System.err.println("Error looking up '" + name + "' in JNDI Context:" + e);
- }
-
- return null;
- }
-
- /**
- * Setup the JNDI context.
- *
- * In this case we are simply using a Properties object to store the pairing information.
- *
- * Further details can be found on the wiki site here:
- *
- * @see : http://cwiki.apache.org/qpid/how-to-use-jndi.html
- */
- private void setupJNDI()
- {
- // Set the properties ...
- Properties properties = new Properties();
- properties.put(Context.INITIAL_CONTEXT_FACTORY, INITIAL_CONTEXT_FACTORY);
- properties.put("connectionfactory." + CONNECTION_JNDI_NAME, CONNECTION_NAME);
-
- // Create the initial context
- Context ctx = null;
- try
- {
- _ctx = new InitialContext(properties);
- }
- catch (NamingException e)
- {
- System.err.println("Error Setting up JNDI Context:" + e);
- }
- }
-
- /** Close the JNDI Context to keep everything happy. */
- public void close()
- {
- try
- {
- _ctx.close();
- }
- catch (NamingException e)
- {
- System.err.println("Unable to close JNDI Context : " + e);
- }
- }
-}
-
-
Deleted: store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/utils/Publisher.java
===================================================================
--- store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/utils/Publisher.java 2010-08-06 14:10:10 UTC (rev 4189)
+++ store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/utils/Publisher.java 2010-08-06 14:28:17 UTC (rev 4190)
@@ -1,108 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store.berkeleydb.utils;
-
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQTopic;
-import org.apache.qpid.client.BasicMessageProducer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.jms.Session;
-import javax.jms.TopicSubscriber;
-import javax.jms.ConnectionFactory;
-import javax.jms.JMSException;
-import javax.jms.MessageListener;
-import javax.jms.Message;
-import javax.jms.MessageProducer;
-import javax.naming.Context;
-import javax.naming.InitialContext;
-import javax.naming.NamingException;
-import java.util.Properties;
-import java.util.List;
-import java.util.LinkedList;
-
-public class Publisher
-{
- private static final Logger _logger = LoggerFactory.getLogger(Publisher.class);
-
- JNDIHelper _jndiHelper;
-
- Session _session;
-
- MessageProducer _publisher;
-
- AMQConnection _connection;
-
- public static void main(String[] args) throws JMSException
- {
- new Publisher();
- }
-
- public Publisher() throws JMSException
- {
- this(JNDIHelper.DEFAULT_BROKER, null);
- }
-
- public Publisher(String broker, String topicStr) throws JMSException
- {
- _jndiHelper= new JNDIHelper(broker);
-
- _connection = (AMQConnection) ((ConnectionFactory) _jndiHelper.lookupJNDI(JNDIHelper.CONNECTION_JNDI_NAME)).createConnection();
-
- _jndiHelper.close();
-
- AMQTopic topic = new AMQTopic(_connection, topicStr);
-
- _logger.debug("Create Session");
- _session = _connection.createSession(true, Session.SESSION_TRANSACTED);
- _logger.debug("Create publisher on Session");
-
- if (topicStr != null)
- {
- _publisher = _session.createProducer(_session.createTopic(topicStr));
- }
- else
- {
- _publisher = _session.createProducer(null);
- }
- }
-
- public void close() throws JMSException
- {
- _connection.close();
- }
-
- public void commit() throws JMSException
- {
- _session.commit();
- }
-
- public Message createTextMessage(String msg) throws JMSException
- {
- return _session.createTextMessage(msg);
- }
-
- public void send(Message msg) throws JMSException
- {
- _publisher.send( msg);
- }
-}
14 years, 4 months
rhmessaging commits: r4189 - mgmt/newdata/cumin/python/cumin.
by rhmessaging-commits@lists.jboss.org
Author: eallen
Date: 2010-08-06 10:10:10 -0400 (Fri, 06 Aug 2010)
New Revision: 4189
Modified:
mgmt/newdata/cumin/python/cumin/charts.py
Log:
Fix for BZ 620923. Convert width, height parameters passed to ALL ImageSurface calls into ints.
Modified: mgmt/newdata/cumin/python/cumin/charts.py
===================================================================
--- mgmt/newdata/cumin/python/cumin/charts.py 2010-08-06 13:46:22 UTC (rev 4188)
+++ mgmt/newdata/cumin/python/cumin/charts.py 2010-08-06 14:10:10 UTC (rev 4189)
@@ -26,7 +26,7 @@
self.surface = None
def plot_dot(self, interior, width, height):
- surface = ImageSurface(FORMAT_ARGB32, width, height)
+ surface = ImageSurface(FORMAT_ARGB32, int(width), int(height))
cr = Context(surface)
cr.set_line_width(1)
@@ -43,7 +43,7 @@
col = 0
self.rows = 1
# the width and height depend on the number of slots
- self.surface = ImageSurface(FORMAT_ARGB32, self.width, self.height)
+ self.surface = ImageSurface(FORMAT_ARGB32, int(self.width), int(self.height))
cr = Context(self.surface)
cr.set_line_width(1)
i = 0
@@ -95,10 +95,10 @@
# if size < self.max_size:
# size = size + 1
- self.width = (size * cols) + 1
+ self.width = int((size * cols) + 1)
#if self.width < self.max_width:
# self.width = self.max_width
- self.height = (ceil(count * 1.0 / cols) * size) + 1
+ self.height = int((ceil(count * 1.0 / cols) * size) + 1)
#if self.height < self.max_height:
# self.height = self.max_height
self.cols = cols
@@ -112,7 +112,7 @@
self.width = width - 40
self.height = height - 20
real_height = height + ((interval > 10) and 12 or 0)
- self.surface = ImageSurface(FORMAT_ARGB32, width, real_height)
+ self.surface = ImageSurface(FORMAT_ARGB32, int(width), int(real_height))
self.surface.set_device_offset(1.5, 5.5)
self.x_max = 1
self.x_min = 0
@@ -296,7 +296,7 @@
super(StackedValueChart, self).__init__(width, height)
if legend_height:
- self.surface = ImageSurface(FORMAT_ARGB32, width, height + legend_height)
+ self.surface = ImageSurface(FORMAT_ARGB32, int(width), int(height + legend_height))
self.surface.set_device_offset(1.5, 5.5)
self.legend_height = legend_height
14 years, 4 months
rhmessaging commits: r4188 - in mgmt/newdata/cumin/python/cumin: grid and 1 other directory.
by rhmessaging-commits@lists.jboss.org
Author: eallen
Date: 2010-08-06 09:46:22 -0400 (Fri, 06 Aug 2010)
New Revision: 4188
Modified:
mgmt/newdata/cumin/python/cumin/grid/job.py
mgmt/newdata/cumin/python/cumin/qmfadapter.py
Log:
Better fix for BZ 621678. The results from the GetJobSummaries call returns a dict. Use the dict's keys as the job ids.
Modified: mgmt/newdata/cumin/python/cumin/grid/job.py
===================================================================
--- mgmt/newdata/cumin/python/cumin/grid/job.py 2010-08-05 21:19:10 UTC (rev 4187)
+++ mgmt/newdata/cumin/python/cumin/grid/job.py 2010-08-06 13:46:22 UTC (rev 4188)
@@ -14,6 +14,7 @@
from cumin.formats import *
from cumin.util import *
from cumin.qmfadapter import *
+from parsley.stringex import partition
import main
@@ -106,24 +107,14 @@
rows = self.process_results(results)
return rows
- def process_record(self, record):
+ def process_record(self, key, record):
field_data = list()
for column in self.columns:
try:
val = record[column.name]
except KeyError:
if column.name == "JobId":
- if not "ClusterId" in record:
- try:
- gjid = record["GlobalJobId"]
- # mrg.lab.bos#cluster.prod#number
- parts = gjid.split("#")
- record["ClusterId"] = int(parts[1].split(".")[0])
- except:
- record["ClusterId"] = 0
- if not "ProcId" in record:
- record['ProcId'] = 0
- val = "%d.%d" % (record['ClusterId'], record['ProcId'])
+ val = key
else:
val = 0
field_data.append(val)
Modified: mgmt/newdata/cumin/python/cumin/qmfadapter.py
===================================================================
--- mgmt/newdata/cumin/python/cumin/qmfadapter.py 2010-08-05 21:19:10 UTC (rev 4187)
+++ mgmt/newdata/cumin/python/cumin/qmfadapter.py 2010-08-06 13:46:22 UTC (rev 4188)
@@ -41,12 +41,12 @@
if results:
for key in results:
- row = self.process_record(results[key])
+ row = self.process_record(key, results[key])
records.append(row)
return records
- def process_record(self, record):
+ def process_record(self, key, record):
field_data = list()
for column in self.columns:
try:
14 years, 4 months
rhmessaging commits: r4187 - mgmt/newdata/cumin/python/cumin/grid.
by rhmessaging-commits@lists.jboss.org
Author: eallen
Date: 2010-08-05 17:19:10 -0400 (Thu, 05 Aug 2010)
New Revision: 4187
Modified:
mgmt/newdata/cumin/python/cumin/grid/job.py
Log:
Fix BZ 621678. The response from GetJobSummaries from mrg27 doesn't contain a ClusterId. Use the GlobalJobId to parse out a ClusterId.
Modified: mgmt/newdata/cumin/python/cumin/grid/job.py
===================================================================
--- mgmt/newdata/cumin/python/cumin/grid/job.py 2010-08-04 15:20:33 UTC (rev 4186)
+++ mgmt/newdata/cumin/python/cumin/grid/job.py 2010-08-05 21:19:10 UTC (rev 4187)
@@ -113,6 +113,16 @@
val = record[column.name]
except KeyError:
if column.name == "JobId":
+ if not "ClusterId" in record:
+ try:
+ gjid = record["GlobalJobId"]
+ # mrg.lab.bos#cluster.prod#number
+ parts = gjid.split("#")
+ record["ClusterId"] = int(parts[1].split(".")[0])
+ except:
+ record["ClusterId"] = 0
+ if not "ProcId" in record:
+ record['ProcId'] = 0
val = "%d.%d" % (record['ClusterId'], record['ProcId'])
else:
val = 0
14 years, 4 months