rhmessaging commits: r1267 - store/trunk/cpp/tests.
by rhmessaging-commits@lists.jboss.org
Author: gordonsim
Date: 2007-11-08 09:06:24 -0500 (Thu, 08 Nov 2007)
New Revision: 1267
Modified:
store/trunk/cpp/tests/persistence.py
Log:
Add check for restoration of bindings to standard exchange
Modified: store/trunk/cpp/tests/persistence.py
===================================================================
--- store/trunk/cpp/tests/persistence.py 2007-11-08 04:26:36 UTC (rev 1266)
+++ store/trunk/cpp/tests/persistence.py 2007-11-08 14:06:24 UTC (rev 1267)
@@ -55,6 +55,16 @@
channel.queue_declare(queue="queue-a", durable=True, passive=True)
channel.queue_declare(queue="queue-b", durable=True, passive=True)
+ #check they are still bound to amq.direct correctly
+ responses = []
+ responses.append(channel.binding_query(queue="queue-a", exchange="amq.direct", routing_key="a"))
+ responses.append(channel.binding_query(queue="queue-b", exchange="amq.direct", routing_key="b"))
+ for r in responses:
+ self.assertEqual(False, r.exchange_not_found)
+ self.assertEqual(False, r.queue_not_found)
+ self.assertEqual(False, r.key_not_matched)
+
+
#check expected messages are there
self.assertMessageOnQueue("queue-a", "Msg0001", "A_Message1")
self.assertMessageOnQueue("queue-b", "Msg0002", "B_Message1")
17 years, 2 months
rhmessaging commits: r1266 - in mgmt: cumin/python/cumin and 3 other directories.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2007-11-07 23:26:36 -0500 (Wed, 07 Nov 2007)
New Revision: 1266
Modified:
mgmt/cumin/bin/cumin-test
mgmt/cumin/python/cumin/__init__.py
mgmt/cumin/python/cumin/broker.py
mgmt/cumin/python/cumin/broker.strings
mgmt/cumin/python/cumin/brokercluster.py
mgmt/cumin/python/cumin/brokergroup.py
mgmt/cumin/python/cumin/brokerprofile.py
mgmt/cumin/python/cumin/client.py
mgmt/cumin/python/cumin/client.strings
mgmt/cumin/python/cumin/exchange.py
mgmt/cumin/python/cumin/exchange.strings
mgmt/cumin/python/cumin/formats.py
mgmt/cumin/python/cumin/measurement.strings
mgmt/cumin/python/cumin/model.py
mgmt/cumin/python/cumin/page.py
mgmt/cumin/python/cumin/page.strings
mgmt/cumin/python/cumin/queue.py
mgmt/cumin/python/cumin/queue.strings
mgmt/cumin/python/cumin/realm.py
mgmt/cumin/python/cumin/virtualhost.py
mgmt/cumin/python/wooly/__init__.py
mgmt/cumin/python/wooly/pages.py
mgmt/cumin/python/wooly/widgets.py
mgmt/cumin/python/wooly/widgets.strings
mgmt/misc/boneyard.py
mgmt/notes/justin-todo.txt
Log:
Cleans up some todo items.
Adds rolled-up stats to exchanges and clients.
Renames render_title methods to get_title to reflect that they may be
called outside of render. Adds a render_title method to Widget that
does call get_title.
Adds a none option to the inital group selector in broker register.
Changes the css collator to avoid producing duplicate sets of rules.
Introduces a javascript collator and page.
Modified: mgmt/cumin/bin/cumin-test
===================================================================
--- mgmt/cumin/bin/cumin-test 2007-11-08 03:59:29 UTC (rev 1265)
+++ mgmt/cumin/bin/cumin-test 2007-11-08 04:26:36 UTC (rev 1266)
@@ -33,7 +33,7 @@
data.load()
data.start_updates()
- if debug or bench:
+ if debug or bench_hits:
app.enable_debug()
if bench_hits:
Modified: mgmt/cumin/python/cumin/__init__.py
===================================================================
--- mgmt/cumin/python/cumin/__init__.py 2007-11-08 03:59:29 UTC (rev 1265)
+++ mgmt/cumin/python/cumin/__init__.py 2007-11-08 04:26:36 UTC (rev 1266)
@@ -2,7 +2,7 @@
from random import randint
from wooly import Application, Session, Page
-from wooly.pages import CssPage, ResourcePage
+from wooly.pages import CssPage, JavascriptPage, ResourcePage
from wooly.server import WebServer
from wooly.devel import DevelPage
from wooly.parameters import IntegerParameter
@@ -31,6 +31,7 @@
self.set_default_page(self.cumin_page)
self.add_page(CssPage(self, "cumin.css"))
+ self.add_page(JavascriptPage(self, "cumin.js"))
self.add_page(ResourcePage(self, "resource"))
self.add_page(DevelPage(self, "devel.html"))
self.add_page(QueueXmlPage(self, "queue.xml"))
Modified: mgmt/cumin/python/cumin/broker.py
===================================================================
--- mgmt/cumin/python/cumin/broker.py 2007-11-08 03:59:29 UTC (rev 1265)
+++ mgmt/cumin/python/cumin/broker.py 2007-11-08 04:26:36 UTC (rev 1266)
@@ -15,8 +15,8 @@
strings = StringCatalog(__file__)
class BrokerSet(ItemSet):
- def render_title(self, session, model):
- return "Brokers (%i)" % len(model.get_brokers())
+ def get_title(self, session, model):
+ return "Brokers %s" % fmt_count(len(model.get_brokers()))
def get_items(self, session, model):
return sorted_by(model.get_brokers())
@@ -140,7 +140,7 @@
self.client.set_object(session, client)
return self.show_mode(session, self.client)
- def render_title(self, session, broker):
+ def get_title(self, session, broker):
return "Broker '%s'" % broker.name
class BrokerConfigPropertyForm(CuminForm, Frame):
@@ -180,6 +180,9 @@
self.lvalue = TextInput(app, "local_value", self)
self.add_child(self.lvalue)
+ def get_title(self, session, prop):
+ return "Edit Property '%s'" % prop.name
+
def get_object(self, session, object):
return self.param.get(session)
@@ -210,9 +213,6 @@
self.svalue.set(session, prop.broker_value)
self.lvalue.set(session, prop.value)
- def render_title(self, session, prop):
- return "Edit Property '%s'" % prop.name
-
def get_profile_value(prop):
profile = prop.get_broker().get_broker_profile()
value = None
@@ -248,7 +248,7 @@
def show_config(self, session):
return self.tabs.show_mode(session, self.config)
- def render_title(self, session, broker):
+ def get_title(self, session, broker):
return "Broker '%s'" % broker.name
def render_name(self, session, broker):
@@ -278,41 +278,39 @@
return "1.0"
class BrokerQueueTab(QueueSet):
- def render_title(self, session, broker):
+ def get_title(self, session, broker):
count = len(broker.default_virtual_host.queue_items())
- return "Queues (%i)" % count
+ return "Queues %s" % fmt_count(count)
def get_object(self, session, broker):
return broker.default_virtual_host
class BrokerExchangeTab(ExchangeSet):
- def render_title(self, session, broker):
+ def get_title(self, session, broker):
count = len(broker.default_virtual_host.exchange_items())
- return "Exchanges (%i)" % count
+ return "Exchanges %s" % fmt_count(count)
def get_object(self, session, broker):
return broker.default_virtual_host
class BrokerClientTab(ClientSet):
- def render_title(self, session, broker):
+ def get_title(self, session, broker):
count = len(broker.default_virtual_host.client_items())
- return "Clients (%i)" % count
+ return "Clients %s" % fmt_count(count)
def get_object(self, session, broker):
return broker.default_virtual_host
class BrokerVirtualHostTab(VirtualHostSet):
- def render_title(self, session, broker):
- return "Configuration"
+ def get_title(self, session, broker):
+ return "Functional Hosts %s" % \
+ fmt_count(len(broker.virtual_host_items()))
- def render_title(self, session, broker):
- return "Functional Hosts (%i)" % len(broker.virtual_host_items())
-
def get_items(self, session, broker):
return sorted_by(broker.virtual_host_items())
class BrokerConfigTab(ConfigPropertySet):
- def render_title(self, session, broker):
+ def get_title(self, session, broker):
return "Configuration"
def get_items(self, session, broker):
@@ -339,11 +337,11 @@
return branch.marshal()
class BrokerStatsTab(Widget):
- def render_title(self, session, broker):
+ def get_title(self, session, broker):
return "Statistics"
class BrokerLogTab(Widget):
- def render_title(self, session, broker):
+ def get_title(self, session, broker):
return "Log Messages"
class BrokerBrowser(Widget):
@@ -469,6 +467,7 @@
self.add_form_parameter(self.addrs)
self.group_param = BrokerGroupParameter(app, "group_param")
+ self.group_param.set_default(None)
self.add_parameter(self.group_param)
self.add_form_parameter(self.group_param)
@@ -544,6 +543,9 @@
return "More Entries"
class BrokerAdd(BrokerForm):
+ def get_title(self, session, object):
+ return "Register New Brokers"
+
def process_cancel(self, session, model):
branch = session.branch()
self.page().show_view(branch)
@@ -558,6 +560,3 @@
print name, addr, group
self.process_cancel(session, model)
-
- def render_title(self, session, object):
- return "Register New Brokers"
Modified: mgmt/cumin/python/cumin/broker.strings
===================================================================
--- mgmt/cumin/python/cumin/broker.strings 2007-11-08 03:59:29 UTC (rev 1265)
+++ mgmt/cumin/python/cumin/broker.strings 2007-11-08 04:26:36 UTC (rev 1266)
@@ -242,7 +242,12 @@
<tr>
<td><input type="text" name="{field_name_name}" value="{field_name_value}" size="15" tabindex="100"/></td>
<td><input type="text" name="{field_address_name}" value="{field_address_value}" size="35" tabindex="100"/></td>
- <td><select name="{field_group_name}" tabindex="100">{groups}</select></td>
+ <td>
+ <select name="{field_group_name}" tabindex="100">
+ <option>None</option>
+ {groups}
+ </select>
+ </td>
</tr>
[BrokerForm.group_html]
Modified: mgmt/cumin/python/cumin/brokercluster.py
===================================================================
--- mgmt/cumin/python/cumin/brokercluster.py 2007-11-08 03:59:29 UTC (rev 1265)
+++ mgmt/cumin/python/cumin/brokercluster.py 2007-11-08 04:26:36 UTC (rev 1266)
@@ -17,8 +17,9 @@
self.broker_tmpl = Template(self, "broker_html")
- def render_title(self, session, model):
- return "Broker Clusters (%i)" % len(model.get_broker_clusters())
+ def get_title(self, session, model):
+ return "Broker Clusters %s" \
+ % fmt_count(len(model.get_broker_clusters()))
def get_items(self, session, model):
return sorted_by(model.get_broker_clusters())
@@ -59,7 +60,7 @@
self.broker.set_object(session, broker)
return self.show_mode(session, self.broker)
- def render_title(self, session, cluster):
+ def get_title(self, session, cluster):
return "Broker Cluster '%s'" % cluster.name
class BrokerClusterStatus(CuminStatus):
@@ -78,19 +79,19 @@
self.tabs.add_tab(self.ClusterBrokerTab(app, "brokers"))
self.tabs.add_tab(self.ClusterStatsTab(app, "stats"))
- def render_title(self, session, cluster):
+ def get_title(self, session, cluster):
return "Broker Cluster '%s'" % cluster.name
def render_name(self, session, cluster):
return cluster.name
class ClusterBrokerTab(BrokerSetForm):
- def render_title(self, session, cluster):
- return "Brokers (%i)" % len(cluster.broker_items())
+ def get_title(self, session, cluster):
+ return "Brokers %s" % fmt_count(len(cluster.broker_items()))
def get_items(self, session, cluster):
return sorted_by(cluster.broker_items())
class ClusterStatsTab(Widget):
- def render_title(self, session, cluster):
+ def get_title(self, session, cluster):
return "Statistics"
Modified: mgmt/cumin/python/cumin/brokergroup.py
===================================================================
--- mgmt/cumin/python/cumin/brokergroup.py 2007-11-08 03:59:29 UTC (rev 1265)
+++ mgmt/cumin/python/cumin/brokergroup.py 2007-11-08 04:26:36 UTC (rev 1266)
@@ -11,8 +11,8 @@
strings = StringCatalog(__file__)
class BrokerGroupSet(ItemSet):
- def render_title(self, session, model):
- return "Broker Groups (%i)" % len(model.get_broker_groups())
+ def get_title(self, session, model):
+ return "Broker Groups %s" % fmt_count(len(model.get_broker_groups()))
def render_group_add_href(self, session, model):
branch = session.branch()
@@ -58,7 +58,7 @@
self.add_child(self.edit)
self.set_edit_mode(self.edit)
- def render_title(self, session, group):
+ def get_title(self, session, group):
return "Broker Group '%s'" % group.name
class BrokerGroupStatus(CuminStatus):
@@ -76,7 +76,7 @@
self.tabs.add_tab(self.GroupBrokerTab(app, "brokers"))
- def render_title(self, session, group):
+ def get_title(self, session, group):
return "Broker Group '%s'" % group.name
def render_name(self, session, group):
@@ -91,12 +91,12 @@
return branch.marshal()
class GroupBrokerTab(BrokerSetForm):
+ def get_title(self, session, group):
+ return "Brokers %s" % fmt_count(len(group.broker_items()))
+
def get_items(self, session, group):
return sorted_by(group.broker_items())
- def render_title(self, session, group):
- return "Brokers (%i)" % len(group.broker_items())
-
class BrokerGroupForm(CuminForm):
def __init__(self, app, name):
super(BrokerGroupForm, self).__init__(app, name)
@@ -117,6 +117,9 @@
self.page().set_redirect_url(session, branch.marshal())
class BrokerGroupAdd(BrokerGroupForm, Frame):
+ def get_title(self, session, model):
+ return "Add Group"
+
def process_cancel(self, session, model):
branch = session.branch()
self.page().show_view(branch)
@@ -126,10 +129,10 @@
group = BrokerGroup(model)
self.process_group(session, group)
- def render_title(self, session, model):
- return "Add Group"
+class BrokerGroupEdit(BrokerGroupForm, Frame):
+ def get_title(self, session, group):
+ return "Edit Group '%s'" % group.name
-class BrokerGroupEdit(BrokerGroupForm, Frame):
def process_cancel(self, session, group):
branch = session.branch()
self.parent.show_view(branch)
@@ -141,9 +144,6 @@
def process_display(self, session, group):
self.group_name.set(session, group.name)
- def render_title(self, session, group):
- return "Edit Group '%s'" % group.name
-
class BrokerGroupInput(OptionInputSet):
def get_items(self, session, model):
return model.get_broker_groups()
Modified: mgmt/cumin/python/cumin/brokerprofile.py
===================================================================
--- mgmt/cumin/python/cumin/brokerprofile.py 2007-11-08 03:59:29 UTC (rev 1265)
+++ mgmt/cumin/python/cumin/brokerprofile.py 2007-11-08 04:26:36 UTC (rev 1266)
@@ -30,7 +30,7 @@
self.add_mode(self.view)
self.set_view_mode(self.view)
- def render_title(self, session, profile):
+ def get_title(self, session, profile):
return "Broker Profile '%s'" % profile.name
class BrokerProfileView(Widget):
@@ -43,7 +43,7 @@
self.tabs.add_tab(self.ProfileConfigTab(app, "config"))
self.tabs.add_tab(self.ProfileBrokerTab(app, "brokers"))
- def render_title(self, session, profile):
+ def get_title(self, session, profile):
return "Broker Profile '%s'" % profile.name
def render_name(self, session, profile):
@@ -53,15 +53,15 @@
def get_items(self, session, profile):
return sorted_by(profile.config_property_items())
- def render_title(self, session, profile):
+ def get_title(self, session, profile):
return "Configuration"
class ProfileBrokerTab(BrokerSet):
def __init__(self, app, name):
super(BrokerProfileView.ProfileBrokerTab, self).__init__(app, name)
- def render_title(self, session, profile):
- return "Brokers (%i)" % len(profile.broker_items())
+ def get_title(self, session, profile):
+ return "Brokers %s" % fmt_count(len(profile.broker_items()))
def get_items(self, session, profile):
return sorted_by(profile.broker_items())
Modified: mgmt/cumin/python/cumin/client.py
===================================================================
--- mgmt/cumin/python/cumin/client.py 2007-11-08 03:59:29 UTC (rev 1265)
+++ mgmt/cumin/python/cumin/client.py 2007-11-08 04:26:36 UTC (rev 1266)
@@ -16,8 +16,8 @@
self.unit = UnitSwitch(app, "unit")
self.add_child(self.unit)
- def render_title(self, session, vhost):
- return "Clients (%i)" % len(vhost.client_items())
+ def get_title(self, session, vhost):
+ return "Clients %s" % fmt_count(len(vhost.client_items()))
def render_unit_plural(self, session, vhost):
return self.unit.get(session) == "b" and "Bytes" or "Msgs."
@@ -63,12 +63,26 @@
self.add_mode(self.view)
self.set_view_mode(self.view)
- def render_title(self, session, client):
+ def get_title(self, session, client):
return "Client %s" % client.address
class ClientStatus(CuminStatus):
- pass
+ def render_messages_produced(self, session, client):
+ value = client.get_measurement("msgsProduced").get_rate()
+ return fmt_rate(value, "msg", "sec")
+
+ def render_messages_consumed(self, session, client):
+ value = client.get_measurement("msgsConsumed").get_rate()
+ return fmt_rate(value, "msg", "sec")
+ def render_bytes_produced(self, session, client):
+ value = client.get_measurement("bytesProduced").get_rate()
+ return fmt_rate(value, "byte", "sec")
+
+ def render_bytes_consumed(self, session, client):
+ value = client.get_measurement("bytesConsumed").get_rate()
+ return fmt_rate(value, "byte", "sec")
+
class ClientView(Widget):
def __init__(self, app, name):
super(ClientView, self).__init__(app, name)
@@ -87,7 +101,7 @@
def show_sessions(self, session):
return self.tabs.show_mode(session, self.sessions)
- def render_title(self, session, client):
+ def get_title(self, session, client):
return "Client '%s'" % client.address
def render_address(self, session, client):
@@ -109,7 +123,7 @@
self.add_tab(self.StatisticsCurrent(app, "current"))
self.add_tab(self.StatisticsHistory(app, "history"))
- def render_title(self, session, client):
+ def get_title(self, session, client):
return "Statistics"
class StatisticsCurrent(Widget):
@@ -118,11 +132,11 @@
self.add_child(MeasurementSet(app, "general_stats", "general"))
- def render_title(self, session, client):
+ def get_title(self, session, client):
return "Current"
class StatisticsHistory(Widget):
- def render_title(self, session, client):
+ def get_title(self, session, client):
return "History"
def render_produced_chart_url(self, session, client):
@@ -134,8 +148,8 @@
% client.id
class ClientSessionSet(ItemSet):
- def render_title(self, session, client):
- return "Sessions (%i)" % len(client.session_items())
+ def get_title(self, session, client):
+ return "Sessions %s" % fmt_count(len(client.session_items()))
def get_items(self, session, client):
return sorted_by(client.session_items())
Modified: mgmt/cumin/python/cumin/client.strings
===================================================================
--- mgmt/cumin/python/cumin/client.strings 2007-11-08 03:59:29 UTC (rev 1265)
+++ mgmt/cumin/python/cumin/client.strings 2007-11-08 04:26:36 UTC (rev 1266)
@@ -32,6 +32,31 @@
<td>{item_status}</td>
</tr>
+[ClientStatus.html]
+<div id="{id}" class="{class}">
+ <h2>Client Status</h2>
+
+ <div>{status_info}</div>
+
+ <table>
+ <tr>
+ <th></th>
+ <th style="width: 35%;" class="ralign">Messages</th>
+ <th style="width: 35%;" class="ralign">Bytes</th>
+ </tr>
+ <tr>
+ <th>Produced</th>
+ <td class="ralign">{messages_produced}</td>
+ <td class="ralign">{bytes_produced}</td>
+ </tr>
+ <tr>
+ <th>Consumed</th>
+ <td class="ralign">{messages_consumed}</td>
+ <td class="ralign">{bytes_consumed}</td>
+ </tr>
+ </table>
+</div>
+
[ClientView.html]
{status}
Modified: mgmt/cumin/python/cumin/exchange.py
===================================================================
--- mgmt/cumin/python/cumin/exchange.py 2007-11-08 03:59:29 UTC (rev 1265)
+++ mgmt/cumin/python/cumin/exchange.py 2007-11-08 04:26:36 UTC (rev 1266)
@@ -39,7 +39,7 @@
self.unit = UnitSwitch(app, "unit")
self.add_child(self.unit)
- def render_title(self, session, vhost):
+ def get_title(self, session, vhost):
return "Exchanges (%s)" % len(vhost.exchange_items())
def render_unit_plural(self, session, vhost):
@@ -112,12 +112,32 @@
exchange.get_measurement("bindings").link_cb = show_bindings
return exchange
- def render_title(self, session, exchange):
+ def get_title(self, session, exchange):
return "Exchange '%s'" % exchange.name
class ExchangeStatus(CuminStatus):
- pass
+ def render_messages_received(self, session, exchange):
+ value = exchange.get_measurement("msgReceives").get_rate()
+ return fmt_rate(value, "msg", "sec")
+ def render_messages_routed(self, session, exchange):
+ value = exchange.get_measurement("msgRoutes").get_rate()
+ return fmt_rate(value, "msg", "sec")
+
+ def render_messages_dropped(self, session, exchange):
+ return exchange.get_measurement("msgDrops").get_value()
+
+ def render_bytes_received(self, session, exchange):
+ value = exchange.get_measurement("byteReceives").get_rate()
+ return fmt_rate(value, "byte", "sec")
+
+ def render_bytes_routed(self, session, exchange):
+ value = exchange.get_measurement("byteRoutes").get_rate()
+ return fmt_rate(value, "byte", "sec")
+
+ def render_bytes_dropped(self, session, exchange):
+ return exchange.get_measurement("byteDrops").get_value()
+
class ExchangeView(Widget):
def __init__(self, app, name):
super(ExchangeView, self).__init__(app, name)
@@ -142,7 +162,7 @@
def show_bindings(self, session):
self.tabs.show_mode(session, self.bindings);
- def render_title(self, session, exchange):
+ def get_title(self, session, exchange):
return "Exchange '%s'" % exchange.name
def render_name(self, session, exchange):
@@ -165,8 +185,8 @@
return fmt_datetime(datetime.utcnow())
class ExchangeBindingSet(ItemSet):
- def render_title(self, session, exchange):
- return "Queue Bindings (%i)" % len(exchange.binding_items())
+ def get_title(self, session, exchange):
+ return "Queue Bindings %s" % fmt_count(len(exchange.binding_items()))
def get_items(self, session, exchange):
return sorted_by(exchange.binding_items(), "id")
@@ -233,6 +253,9 @@
return error is None
class ExchangeAdd(ExchangeForm):
+ def get_title(self, session, vhost):
+ return "Add Exchange to Host Template '%s'" % vhost.name
+
def process_cancel(self, session, vhost):
branch = session.branch()
self.page().show_broker(branch, vhost.get_broker()).show_view(branch)
@@ -255,10 +278,10 @@
self.page().show_exchange(branch, exchange).show_view(branch)
self.page().set_redirect_url(session, branch.marshal())
- def render_title(self, session, vhost):
- return "Add Exchange to Host Template '%s'" % vhost.name
-
class ExchangeEdit(ExchangeForm):
+ def get_title(self, session, exchange):
+ return "Edit Exchange '%s'" % exchange.name
+
def process_cancel(self, session, exchange):
branch = session.branch()
self.page().show_exchange(branch, exchange).show_view(branch)
@@ -279,10 +302,10 @@
self.exchange_name.set(session, exchange.name)
self.type.set(session, exchange.type)
- def render_title(self, session, exchange):
- return "Edit Exchange '%s'" % exchange.name
+class ExchangeRemove(CuminConfirmForm):
+ def get_title(self, session, exchange):
+ return "Remove Exchange '%s'" % exchange.name
-class ExchangeRemove(CuminConfirmForm):
def process_cancel(self, session, exchange):
branch = session.branch()
self.page().show_exchange(branch, exchange).show_view(branch)
@@ -297,9 +320,6 @@
self.page().show_broker(branch, vhost.get_broker()).show_view(branch)
self.page().set_redirect_url(session, branch.marshal())
- def render_title(self, session, exchange):
- return "Remove Exchange '%s'" % exchange.name
-
def render_submit_content(self, session, exchange):
return "Yes, Remove Exchange '%s'" % exchange.name
@@ -313,7 +333,7 @@
self.add_tab(self.StatisticsCurrent(app, "current"))
self.add_tab(self.StatisticsHistory(app, "history"))
- def render_title(self, session, exchange):
+ def get_title(self, session, exchange):
return "Statistics"
class StatisticsCurrent(Widget):
@@ -322,11 +342,11 @@
self.add_child(MeasurementSet(app, "general_stats", "general"))
- def render_title(self, session, exchange):
+ def get_title(self, session, exchange):
return "Current"
class StatisticsHistory(Widget):
- def render_title(self, session, queue):
+ def get_title(self, session, queue):
return "History"
def render_received_chart_url(self, session, queue):
@@ -340,8 +360,8 @@
% queue.id
class ExchangeProducerSet(ItemSet):
- def render_title(self, session, queue):
- return "Producers (%i)" % len(queue.producer_items())
+ def get_title(self, session, queue):
+ return "Producers %s" % fmt_count(len(queue.producer_items()))
def get_items(self, session, queue):
return sorted_by(queue.producer_items())
Modified: mgmt/cumin/python/cumin/exchange.strings
===================================================================
--- mgmt/cumin/python/cumin/exchange.strings 2007-11-08 03:59:29 UTC (rev 1265)
+++ mgmt/cumin/python/cumin/exchange.strings 2007-11-08 04:26:36 UTC (rev 1266)
@@ -83,6 +83,36 @@
}())
</script>
+[ExchangeStatus.html]
+<div id="{id}" class="{class}">
+ <h2>Exchange Status</h2>
+
+ <div>{status_info}</div>
+
+ <table>
+ <tr>
+ <th></th>
+ <th style="width: 35%;" class="ralign">Messages</th>
+ <th style="width: 35%;" class="ralign">Bytes</th>
+ </tr>
+ <tr>
+ <th>Received</th>
+ <td class="ralign">{messages_received}</td>
+ <td class="ralign">{bytes_received}</td>
+ </tr>
+ <tr>
+ <th>Routed</th>
+ <td class="ralign">{messages_routed}</td>
+ <td class="ralign">{bytes_routed}</td>
+ </tr>
+ <tr>
+ <th>Dropped</th>
+ <td class="ralign">{messages_dropped}</td>
+ <td class="ralign">{bytes_dropped}</td>
+ </tr>
+ </table>
+</div>
+
[ExchangeView.html]
{status}
Modified: mgmt/cumin/python/cumin/formats.py
===================================================================
--- mgmt/cumin/python/cumin/formats.py 2007-11-08 03:59:29 UTC (rev 1265)
+++ mgmt/cumin/python/cumin/formats.py 2007-11-08 04:26:36 UTC (rev 1266)
@@ -2,6 +2,9 @@
from util import *
+def fmt_count(count):
+ return "<span class=\"count\">(%i)</count>" % count
+
def fmt_datetime(dtime):
return dtime.strftime("%d %b %Y %H:%M")
Modified: mgmt/cumin/python/cumin/measurement.strings
===================================================================
--- mgmt/cumin/python/cumin/measurement.strings 2007-11-08 03:59:29 UTC (rev 1265)
+++ mgmt/cumin/python/cumin/measurement.strings 2007-11-08 04:26:36 UTC (rev 1266)
@@ -13,7 +13,7 @@
[MeasurementSet.item_html]
<tr>
- <th class="label">{item_title}</th>
+ <th>{item_title}</th>
<td class="ralign">{item_value}</td>
<td class="ralign">{item_extra}</td>
</tr>
Modified: mgmt/cumin/python/cumin/model.py
===================================================================
--- mgmt/cumin/python/cumin/model.py 2007-11-08 03:59:29 UTC (rev 1265)
+++ mgmt/cumin/python/cumin/model.py 2007-11-08 04:26:36 UTC (rev 1266)
@@ -590,9 +590,8 @@
writer.write("<name>%s</name>" % self.name)
writer.write("<latency-priority>%s</latency-priority>" \
% self.latency_priority)
- writer.write("<message-count>%i</message-count>" % self.message_count)
- writer.write("<error-count>%i</error-count>" % self.error_count)
- writer.write("<warning-count>%i</warning-count>" % self.warning_count)
+ writer.write("<error-count>%i</error-count>" % len(self.errors))
+ writer.write("<warning-count>%i</warning-count>" % len(self.warnings))
for realm in self.realm_items():
writer.write("<realm ref=\"realm-%i\"/>" % realm.id)
Modified: mgmt/cumin/python/cumin/page.py
===================================================================
--- mgmt/cumin/python/cumin/page.py 2007-11-08 03:59:29 UTC (rev 1265)
+++ mgmt/cumin/python/cumin/page.py 2007-11-08 04:26:36 UTC (rev 1266)
@@ -129,7 +129,7 @@
self.show_view(branch)
return branch.marshal()
- def render_title(self, session, model):
+ def get_title(self, session, model):
return "Red Hat Messaging"
def render_frames(self, session, object):
@@ -164,21 +164,23 @@
mode.set_object(session, group)
return mode
- def render_title(self, session, model):
+ def get_title(self, session, model):
return "Red Hat Messaging"
class BrokerTab(BrokerBrowser):
- def render_title(self, session, model):
- return "Brokers (%i)" % len(model.get_brokers())
+ def get_title(self, session, model):
+ return "Brokers %s" % fmt_count(len(model.get_brokers()))
class BrokerProfileTab(BrokerProfileSet):
- def render_title(self, session, model):
- return "Broker Profiles (%i)" % len(model.get_broker_profiles())
+ def get_title(self, session, model):
+ return "Broker Profiles %s" % \
+ fmt_count(len(model.get_broker_profiles()))
class BrokerClusterTab(BrokerClusterSet):
- def render_title(self, session, model):
- return "Broker Clusters (%i)" % len(model.get_broker_clusters())
+ def get_title(self, session, model):
+ return "Broker Clusters %s" % \
+ fmt_count(len(model.get_broker_clusters()))
class TagTab(Widget):
- def render_title(self, session, model):
+ def get_title(self, session, model):
return "Tags"
Modified: mgmt/cumin/python/cumin/page.strings
===================================================================
--- mgmt/cumin/python/cumin/page.strings 2007-11-08 03:59:29 UTC (rev 1265)
+++ mgmt/cumin/python/cumin/page.strings 2007-11-08 04:26:36 UTC (rev 1266)
@@ -541,6 +541,11 @@
display: inline;
}
+span.count {
+ font-size: 0.9em;
+ color: #999;
+}
+
[CuminPage.html]
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.1//EN" "http://www.w3.org/TR/xhtml11/DTD/xhtml11.dtd">
@@ -548,7 +553,9 @@
<head>
<title>{title}</title>
<link rel="stylesheet" type="text/css" href="cumin.css"/>
+ <!-- XXX import this via cumin.js instead -->
<script src="resource?name=wooly.js"> </script>
+ <script src="cumin.js"> </script>
</head>
<body class="{class}">
<div id="head">
Modified: mgmt/cumin/python/cumin/queue.py
===================================================================
--- mgmt/cumin/python/cumin/queue.py 2007-11-08 03:59:29 UTC (rev 1265)
+++ mgmt/cumin/python/cumin/queue.py 2007-11-08 04:26:36 UTC (rev 1266)
@@ -20,7 +20,7 @@
self.unit = UnitSwitch(app, "unit")
self.add_child(self.unit)
- def render_title(self, session, vhost):
+ def get_title(self, session, vhost):
return "Queues (%s)" % len(vhost.queue_items())
def render_unit_singular(self, session, vhost):
@@ -104,7 +104,7 @@
queue.get_measurement("bindings").link_cb = show_bindings
return queue
- def render_title(self, session, queue):
+ def get_title(self, session, queue):
return "Queue '%s'" % queue.name
class QueueStatus(CuminStatus):
@@ -114,11 +114,11 @@
def render_consumers(self, session, queue):
return queue.get_measurement("consumers").get_value()
- def render_message_enqueues(self, session, queue):
+ def render_messages_enqueued(self, session, queue):
value = queue.get_measurement("msgTotalEnqueues").get_rate()
return fmt_rate(value, "msg", "sec")
- def render_message_dequeues(self, session, queue):
+ def render_messages_dequeued(self, session, queue):
value = queue.get_measurement("msgTotalDequeues").get_rate()
return fmt_rate(value, "msg", "sec")
@@ -129,11 +129,11 @@
value = queue.get_measurement("msgDepth").get_rate()
return fmt_rate(value, "msg", "sec")
- def render_byte_enqueues(self, session, queue):
+ def render_bytes_enqueued(self, session, queue):
value = queue.get_measurement("byteTotalEnqueues").get_rate()
return fmt_rate(value, "byte", "sec")
- def render_byte_dequeues(self, session, queue):
+ def render_bytes_dequeued(self, session, queue):
value = queue.get_measurement("byteTotalDequeues").get_rate()
return fmt_rate(value, "byte", "sec")
@@ -168,7 +168,7 @@
def show_bindings(self, session):
self.tabs.show_mode(session, self.bindings);
- def render_title(self, session, queue):
+ def get_title(self, session, queue):
return "Queue '%s'" % queue.name
def render_name(self, session, queue):
@@ -187,8 +187,8 @@
return fmt_datetime(datetime.utcnow())
class QueueBindingSet(ItemSet):
- def render_title(self, session, queue):
- return "Exchange Bindings (%i)" % len(queue.binding_items())
+ def get_title(self, session, queue):
+ return "Exchange Bindings %s" % fmt_count(len(queue.binding_items()))
def get_items(self, session, queue):
return sorted_by(queue.binding_items(), "id")
@@ -277,7 +277,7 @@
self.page().show_queue(branch, queue).show_view(branch)
self.page().set_redirect_url(session, branch.marshal())
- def render_title(self, session, vhost):
+ def get_title(self, session, vhost):
return "Add Queue to Host Template '%s'" % vhost.name
class QueueEdit(QueueForm):
@@ -303,10 +303,13 @@
self.queue_name.set(session, queue.name)
self.latency_priority.set(session, queue.latency_priority)
- def render_title(self, session, queue):
+ def get_title(self, session, queue):
return "Edit Queue '%s'" % queue.name
class QueueRemove(CuminConfirmForm):
+ def get_title(self, session, queue):
+ return "Remove Queue '%s'" % queue.name
+
def process_cancel(self, session, queue):
branch = session.branch()
self.page().show_queue(branch, queue).show_view(branch)
@@ -321,9 +324,6 @@
self.page().show_broker(branch, vhost.get_broker()).show_view(branch)
self.page().set_redirect_url(session, branch.marshal())
- def render_title(self, session, queue):
- return "Remove Queue '%s'" % queue.name
-
def render_submit_content(self, session, queue):
return "Yes, Remove Queue '%s'" % queue.name
@@ -340,7 +340,7 @@
self.binding_key = TextInput(app, "binding_key", self)
self.add_child(self.binding_key)
- def render_title(self, session, queue):
+ def get_title(self, session, queue):
return "Add Binding to Queue '%s'" % queue.name
def process_cancel(self, session, queue):
@@ -394,7 +394,7 @@
self.process_cancel(session, binding)
- def render_title(self, session, binding):
+ def get_title(self, session, binding):
return "Remove Binding"
def render_submit_content(self, session, binding):
@@ -410,7 +410,7 @@
self.add_tab(self.StatisticsCurrent(app, "current"))
self.add_tab(self.StatisticsHistory(app, "history"))
- def render_title(self, session, queue):
+ def get_title(self, session, queue):
return "Statistics"
class StatisticsCurrent(Widget):
@@ -427,11 +427,11 @@
self.add_child(MeasurementSet \
(app, "transaction_stats", "transaction"))
- def render_title(self, session, queue):
+ def get_title(self, session, queue):
return "Current"
class StatisticsHistory(Widget):
- def render_title(self, session, queue):
+ def get_title(self, session, queue):
return "History"
def render_depth_chart_url(self, session, queue):
@@ -445,8 +445,8 @@
% queue.id
class QueueConsumerSet(ItemSet):
- def render_title(self, session, queue):
- return "Consumers (%i)" % len(queue.consumer_items())
+ def get_title(self, session, queue):
+ return "Consumers %s" % fmt_count(len(queue.consumer_items()))
def get_items(self, session, queue):
return sorted_by(queue.consumer_items())
Modified: mgmt/cumin/python/cumin/queue.strings
===================================================================
--- mgmt/cumin/python/cumin/queue.strings 2007-11-08 03:59:29 UTC (rev 1265)
+++ mgmt/cumin/python/cumin/queue.strings 2007-11-08 04:26:36 UTC (rev 1266)
@@ -92,9 +92,6 @@
<script defer="defer">
(function() {
var updateStatus = function(xml, elem) {
- var mcount = xml.elems("message-count").next().text().get();
- var messages = mcount + " " + (mcount == "1" && "message" || "messages");
-
var ecount = xml.elems("error-count").next().text().get();
var errors = ecount + " " + (ecount == "1" && "error" || "errors");
@@ -102,19 +99,21 @@
var warnings = wcount + " " + (wcount == "1" && "warning" || "warnings");
if (ecount != "0") {
- elem.node.className = "QueueStatus mstatus red";
+ elem.node.className = "mstatus red";
} else if (wcount != "0") {
- elem.node.className = "QueueStatus mstatus yellow";
+ elem.node.className = "mstatus yellow";
} else {
- elem.node.className = "QueueStatus mstatus green";
+ elem.node.className = "mstatus green";
}
var divs = elem.elems("div");
- divs.next().set(messages + " in queue");
divs.next().set(errors + ", " + warnings);
+
+ // XXX
+ //wooly.doc().elem("{id}.menq").text().set("hmm")
}
- //wooly.setIntervalUpdate("{id}", "{url}", updateStatus, 3000);
+ wooly.setIntervalUpdate("{id}", "{url}", updateStatus, 3000);
}())
</script>
<div id="{id}" class="{class}">
@@ -129,24 +128,24 @@
<th style="width: 35%;" class="ralign">Bytes</th>
</tr>
<tr>
- <th>Enqueues</th>
- <td class="ralign">{message_enqueues}</td>
- <td class="ralign">{byte_enqueues}</td>
+ <th>Enqueued</th>
+ <td id="{id}.menq" class="ralign">{messages_enqueued}</td>
+ <td id="{id}.benq" class="ralign">{bytes_enqueued}</td>
</tr>
<tr>
- <th>Dequeues</th>
- <td class="ralign">{message_dequeues}</td>
- <td class="ralign">{byte_dequeues}</td>
+ <th>Dequeued</th>
+ <td id="{id}.mdeq" class="ralign">{messages_dequeued}</td>
+ <td id="{id}.bdeq" class="ralign">{bytes_dequeued}</td>
</tr>
<tr>
<th>Depth</th>
- <td class="ralign">{message_depth}</td>
- <td class="ralign">{byte_depth}</td>
+ <td id="{id}.mdep" class="ralign">{message_depth}</td>
+ <td id="{id}.bdep" class="ralign">{byte_depth}</td>
</tr>
<tr>
<th>Accel.</th>
- <td class="ralign">{message_depth_accel}</td>
- <td class="ralign">{byte_depth_accel}</td>
+ <td id="{id}.mdepaccel" class="ralign">{message_depth_accel}</td>
+ <td id="{id}.bdepaccel" class="ralign">{byte_depth_accel}</td>
</tr>
</table>
</div>
Modified: mgmt/cumin/python/cumin/realm.py
===================================================================
--- mgmt/cumin/python/cumin/realm.py 2007-11-08 03:59:29 UTC (rev 1265)
+++ mgmt/cumin/python/cumin/realm.py 2007-11-08 04:26:36 UTC (rev 1266)
@@ -10,8 +10,8 @@
strings = StringCatalog(__file__)
class RealmSet(ItemSet):
- def render_title(self, session, vhost):
- return "Realms (%i)" % len(vhost.realm_items())
+ def get_title(self, session, vhost):
+ return "Realms %s" % fmt_count(len(vhost.realm_items()))
def get_items(self, session, vhost):
return sorted_by(vhost.realm_items())
Modified: mgmt/cumin/python/cumin/virtualhost.py
===================================================================
--- mgmt/cumin/python/cumin/virtualhost.py 2007-11-08 03:59:29 UTC (rev 1265)
+++ mgmt/cumin/python/cumin/virtualhost.py 2007-11-08 04:26:36 UTC (rev 1266)
@@ -12,8 +12,9 @@
strings = StringCatalog(__file__)
class VirtualHostSet(ItemSet):
- def render_title(self, session, model):
- return "Functional Hosts (%i)" % len(model.get_virtual_hosts())
+ def get_title(self, session, model):
+ return "Functional Hosts %s" % \
+ fmt_count(len(model.get_virtual_hosts()))
def get_items(self, session, model):
return sorted_by(model.get_virtual_hosts())
@@ -56,7 +57,7 @@
self.client.set_object(session, client)
return self.show_mode(session, self.client)
- def render_title(self, session, vhost):
+ def get_title(self, session, vhost):
return "Functional Host '%s'" % vhost.name
class VirtualHostView(Widget):
@@ -70,7 +71,7 @@
self.tabs.add_tab(ExchangeSet(app, "exchanges"))
self.tabs.add_tab(ClientSet(app, "clients"))
- def render_title(self, session, vhost):
+ def get_title(self, session, vhost):
return "Functional Host '%s'" % vhost.name
def render_name(self, session, vhost):
Modified: mgmt/cumin/python/wooly/__init__.py
===================================================================
--- mgmt/cumin/python/wooly/__init__.py 2007-11-08 03:59:29 UTC (rev 1265)
+++ mgmt/cumin/python/wooly/__init__.py 2007-11-08 04:26:36 UTC (rev 1266)
@@ -84,13 +84,16 @@
self.parameters.append(parameter)
parameter.widget = self
+ @classmethod
+ def get_module_strings(cls):
+ module = sys.modules[cls.__module__]
+ return module.__dict__.get("strings")
+
def get_string(self, key):
for cls in self.__class__.__mro__:
str = None
- module = sys.modules[cls.__module__]
+ strs = cls.get_module_strings()
- strs = module.__dict__.get("strings")
-
if strs:
str = strs.get(cls.__name__ + "." + key)
@@ -159,6 +162,9 @@
return writer.to_string()
+ def get_title(self, session, object):
+ return None
+
def render_id(self, session, object):
return self.path()
@@ -169,7 +175,7 @@
return session.marshal()
def render_title(self, session, object):
- return None
+ return self.get_title(session, object)
def render_content(self, session, object):
writer = Writer()
@@ -321,10 +327,10 @@
Gets a list of parameters saved for this page and its current
frame.
- This, combined with *_current_frame, serves to discard state
- that is out of scope. The current pattern is to preserve the
- state of parameters in the current frame and all of its
- ancestor frames.
+ This, combined with [gs]et_current_frame, serves to discard
+ state that is out of scope. The current pattern is to
+ preserve the state of parameters in the current frame and all
+ of its ancestor frames.
"""
frame = self.get_current_frame(session)
@@ -350,11 +356,14 @@
self.widgets = list()
self.widget_index = None
+ self.widget_classes = set()
+
self.parameters = list()
self.parameter_index = None
self.finder = ResourceFinder()
self.cached_css = None
+ self.cached_javascript = None
self.debug = None
@@ -384,6 +393,7 @@
raise Exception()
self.widgets.append(widget)
+ self.widget_classes.add(widget.__class__)
def get_widget(self, key):
if not self.widget_index:
@@ -419,21 +429,43 @@
def get_resource(self, name):
return self.finder.find(name)
+ # XXX move this to CssPage
def get_css(self):
if not self.cached_css:
writer = Writer()
- for widget in self.widgets:
- css = widget.get_string("css")
+ for cls in sorted(self.widget_classes):
+ strs = cls.get_module_strings()
- if css:
- writer.write(css)
- writer.write("\r\n") # HTTP newline
+ if strs:
+ css = strs.get(cls.__name__ + ".css")
+ if css:
+ writer.write(css)
+ writer.write("\r\n") # HTTP newline
+
self.cached_css = writer.to_string()
return self.cached_css
+ def get_javascript(self):
+ if not self.cached_javascript:
+ writer = Writer()
+
+ for cls in sorted(self.widget_classes):
+ strs = cls.get_module_strings()
+
+ if strs:
+ javascript = strs.get(cls.__name__ + ".javascript")
+
+ if javascript:
+ writer.write(javascript)
+ writer.write("\r\n")
+
+ self.cached_javascript = writer.to_string()
+
+ return self.cached_javascript
+
def clear_caches(self):
self.cached_css = None
Modified: mgmt/cumin/python/wooly/pages.py
===================================================================
--- mgmt/cumin/python/wooly/pages.py 2007-11-08 03:59:29 UTC (rev 1265)
+++ mgmt/cumin/python/wooly/pages.py 2007-11-08 04:26:36 UTC (rev 1266)
@@ -17,6 +17,21 @@
def do_render(self, session, object):
return self.app.get_css()
+class JavascriptPage(Page):
+ def __init__(self, app, name):
+ super(JavascriptPage, self).__init__(app, name)
+
+ self.then = datetime.utcnow()
+
+ def get_last_modified(self, session):
+ return self.then
+
+ def get_content_type(self, session):
+ return "text/javascript"
+
+ def do_render(self, session, object):
+ return self.app.get_javascript()
+
class ResourcePage(Page):
def __init__(self, app, name):
super(ResourcePage, self).__init__(app, name)
Modified: mgmt/cumin/python/wooly/widgets.py
===================================================================
--- mgmt/cumin/python/wooly/widgets.py 2007-11-08 03:59:29 UTC (rev 1265)
+++ mgmt/cumin/python/wooly/widgets.py 2007-11-08 04:26:36 UTC (rev 1266)
@@ -10,7 +10,7 @@
self.modes = list()
- self.mode = Parameter(app, "mode")
+ self.mode = Parameter(app, "m")
self.add_parameter(self.mode)
def add_mode(self, mode):
@@ -75,7 +75,7 @@
return smode == mode and "selected" or ""
def render_tab_content(self, session, mode):
- return mode.render_title(session, self.object.get(session))
+ return mode.get_title(session, self.object.get(session))
class Link(Widget):
def update_session(self, session, object):
Modified: mgmt/cumin/python/wooly/widgets.strings
===================================================================
--- mgmt/cumin/python/wooly/widgets.strings 2007-11-08 03:59:29 UTC (rev 1265)
+++ mgmt/cumin/python/wooly/widgets.strings 2007-11-08 04:26:36 UTC (rev 1266)
@@ -2,7 +2,7 @@
{mode}
[TabSet.css]
-ul.TabSet.tabs {
+.TabSet.tabs {
padding: 0;
margin: 0;
list-style: none;
@@ -13,7 +13,7 @@
}
.TabSet.tabs li a {
- padding: 0.25em 0.5em;
+ padding: 0.275em 0.5em;
border-top: 1px solid #ccc;
border-right: 1px solid #ccc;
background-color: #f7f7f7;
Modified: mgmt/misc/boneyard.py
===================================================================
--- mgmt/misc/boneyard.py 2007-11-08 03:59:29 UTC (rev 1265)
+++ mgmt/misc/boneyard.py 2007-11-08 04:26:36 UTC (rev 1266)
@@ -17,8 +17,8 @@
def set_object(self, session, group):
return self.param.set(session, group)
- def render_title(self, session, group):
- return "Servers (%i)" % len(self.app.model.get_servers())
+ def get_title(self, session, group):
+ return "Servers %s" % fmt_count(len(self.app.model.get_servers()))
def render_all_servers_link(self, session, group):
class_ = group is None and "selected"
Modified: mgmt/notes/justin-todo.txt
===================================================================
--- mgmt/notes/justin-todo.txt 2007-11-08 03:59:29 UTC (rev 1265)
+++ mgmt/notes/justin-todo.txt 2007-11-08 04:26:36 UTC (rev 1266)
@@ -4,29 +4,19 @@
* Queue: Add a msg enq rate msg deq rate chart
- * Right now, non cumin pages don't print their stack traces in the
- log
-
- * Right now we're calling render_title outside of render in order to
- produce tab labels, for instance. This is not good, especially in
- cases where get_object is overriden and that fact is not reflected
- in the render_title calls. Perhaps introduce a get_title to solve
- this.
-
- * Add totals to client msgs produced
-
- * Add rolled up stats for objs other than queues
-
* Pagination and sort in tables
* Ajaxify status boxes
* Ajaxify charts
- * Add a none option to the inital group select in broker register
-
Deferred
+ * Think about making css and jscript pages produce their document in
+ some kind of widget-tree traversal order
+
+ * We're generating lots of duplicate css rules
+
* The granularity of radio and checkbox disabling seems to be off
* Rename Widget.name to .__name
17 years, 2 months
rhmessaging commits: r1265 - store/trunk/cpp/lib.
by rhmessaging-commits@lists.jboss.org
Author: kpvdr
Date: 2007-11-07 22:59:29 -0500 (Wed, 07 Nov 2007)
New Revision: 1265
Modified:
store/trunk/cpp/lib/JournalImpl.cpp
store/trunk/cpp/lib/JournalImpl.h
Log:
Added temp JournalImpl::loadMsgContent() fn for holding last read msg from journal so that it can be read in parts.
Modified: store/trunk/cpp/lib/JournalImpl.cpp
===================================================================
--- store/trunk/cpp/lib/JournalImpl.cpp 2007-11-07 20:11:49 UTC (rev 1264)
+++ store/trunk/cpp/lib/JournalImpl.cpp 2007-11-08 03:59:29 UTC (rev 1265)
@@ -22,6 +22,7 @@
*/
#include "JournalImpl.h"
+
#include "jrnl/jerrno.hpp"
#include <qpid/sys/Monitor.h>
@@ -41,7 +42,11 @@
jcntl(journalId, journalDirectory, journalBaseFilename),
getEventsTimerSetFlag(false),
writeActivityFlag(false),
- flushTriggeredFlag(true)
+ flushTriggeredFlag(true),
+ _datap(0),
+ _dlen(0),
+ _dtok(),
+ _external(false)
{
getEventsFireEventsPtr = new GetEventsFireEvent(this, getEventsTimeout);
inactivityFireEventPtr = new InactivityFireEvent(this, flushTimeout);
@@ -91,6 +96,64 @@
}
}
+#define MAX_AIO_SLEEPS 500
+#define AIO_SLEEP_TIME 1000000
+const bool
+JournalImpl::loadMsgContent(u_int64_t rid, std::string& data, size_t offset, size_t length)
+ throw (journal::jexception)
+{
+ if (_dtok.rid() != rid)
+ {
+ _datap = 0;
+ _dlen = 0;
+ _dtok.reset();
+ _dtok.set_rid(rid);
+ _dtok.set_wstate(journal::data_tok::ENQ);
+ _external = false;
+ void* xidp = 0;
+ size_t xlen = 0;
+ bool transient = false;
+ bool done = false;
+ unsigned aio_sleep_cnt = 0;
+ while (!done)
+ {
+ iores res = read_data_record(&_datap, _dlen, &xidp, xlen, transient, _external, &_dtok);
+ if (res == journal::RHM_IORES_SUCCESS) {
+ done = true;
+ } else if (res == journal::RHM_IORES_AIO_WAIT) {
+ if (++aio_sleep_cnt <= MAX_AIO_SLEEPS) {
+ get_wr_events();
+ usleep(AIO_SLEEP_TIME);
+ } else {
+ std::stringstream ss;
+ ss << "read_data_record() returned " << journal::pmgr::iores_str(res);
+ ss << "; exceeded maximum wait time";
+ throw jexception(ss.str());
+ }
+ } else {
+ std::stringstream ss;
+ ss << "read_data_record() returned " << journal::pmgr::iores_str(res);
+ throw jexception(ss.str());
+ }
+ }
+ // set correct pointer for deletion by boost::shared_ptr
+ if (xlen) {
+ _master = boost::shared_ptr<char>((char*)xidp);
+ } else {
+ _master = boost::shared_ptr<char>((char*)_datap);
+ }
+ }
+ if (_external)
+ return true;
+ if (offset + length > _dlen) {
+ std::stringstream ss;
+ ss << "loadMsgContent(): offset + length exceeds available message size";
+ throw jexception(ss.str());
+ }
+ data.append((const char*)_datap + offset, length);
+ return false;
+}
+
const iores
JournalImpl::enqueue_data_record(const void* const data_buff, const size_t tot_data_len,
const size_t this_data_len, data_tok* dtokp, const bool transient)
Modified: store/trunk/cpp/lib/JournalImpl.h
===================================================================
--- store/trunk/cpp/lib/JournalImpl.h 2007-11-07 20:11:49 UTC (rev 1264)
+++ store/trunk/cpp/lib/JournalImpl.h 2007-11-08 03:59:29 UTC (rev 1265)
@@ -73,6 +73,13 @@
bool writeActivityFlag;
bool flushTriggeredFlag;
qpid::broker::TimerTaskA::intrusive_ptr inactivityFireEventPtr;
+
+ // temp local vars for loadMsgContent below
+ boost::shared_ptr<char> _master;
+ void* _datap;
+ size_t _dlen;
+ journal::data_tok _dtok;
+ bool _external;
public:
JournalImpl(const std::string& journalId,
@@ -93,6 +100,12 @@
recover(&_aio_rd_cmpl_dtok_list, &aio_rd_callback, &_aio_wr_cmpl_dtok_list,
&aio_wr_callback, prep_tx_list, queue_id);
}
+
+ // Temporary fn to read and save last msg read from journal so it can be assigned
+ // in chunks. To be replaced when coding to do this direct from the journal is ready.
+ // Returns true if the record is extern, false if local.
+ const bool loadMsgContent(u_int64_t rid, std::string& data, size_t offset,
+ size_t length) throw (journal::jexception);
// Overrides for write inactivity timer
const journal::iores enqueue_data_record(const void* const data_buff,
17 years, 2 months
rhmessaging commits: r1264 - store/trunk/cpp/tests/jrnl.
by rhmessaging-commits@lists.jboss.org
Author: kpvdr
Date: 2007-11-07 15:11:49 -0500 (Wed, 07 Nov 2007)
New Revision: 1264
Modified:
store/trunk/cpp/tests/jrnl/Makefile.am
Log:
Missing header file added to Makefile.am list
Modified: store/trunk/cpp/tests/jrnl/Makefile.am
===================================================================
--- store/trunk/cpp/tests/jrnl/Makefile.am 2007-11-07 20:03:15 UTC (rev 1263)
+++ store/trunk/cpp/tests/jrnl/Makefile.am 2007-11-07 20:11:49 UTC (rev 1264)
@@ -41,6 +41,7 @@
jtest.cpp \
msg_producer.cpp \
msg_consumer.cpp \
+ JournalSystemTests.hpp \
jtest.hpp \
msg_producer.hpp \
msg_consumer.hpp
17 years, 2 months
rhmessaging commits: r1263 - store/trunk/cpp/tests/jrnl.
by rhmessaging-commits@lists.jboss.org
Author: kpvdr
Date: 2007-11-07 15:03:15 -0500 (Wed, 07 Nov 2007)
New Revision: 1263
Added:
store/trunk/cpp/tests/jrnl/JournalSystemTests.hpp
Log:
OOPS! (need I add more?)
Added: store/trunk/cpp/tests/jrnl/JournalSystemTests.hpp
===================================================================
--- store/trunk/cpp/tests/jrnl/JournalSystemTests.hpp (rev 0)
+++ store/trunk/cpp/tests/jrnl/JournalSystemTests.hpp 2007-11-07 20:03:15 UTC (rev 1263)
@@ -0,0 +1,92 @@
+/**
+* \file JournalSystemTests.hpp
+*
+* Red Hat Messaging - Message Journal
+*
+* This file contains header files for the journal unit tests.
+*
+* Copyright 2007 Red Hat, Inc.
+*
+* This file is part of Red Hat Messaging.
+*
+* Red Hat Messaging is free software; you can redistribute it and/or
+* modify it under the terms of the GNU Lesser General Public
+* License as published by the Free Software Foundation; either
+* version 2.1 of the License, or (at your option) any later version.
+*
+* This library is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this library; if not, write to the Free Software
+* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
+* USA
+*
+* The GNU Lesser General Public License is available in the file COPYING.
+*/
+
+#ifndef JournalSystemTests_hpp
+#define JournalSystemTests_hpp
+
+#include "../test_plugin.h"
+#include <jrnl/jcntl.hpp>
+
+class JournalSystemTests : public CppUnit::TestCase
+{
+ CPPUNIT_TEST_SUITE(JournalSystemTests);
+ CPPUNIT_TEST(InstantiationTest);
+ CPPUNIT_TEST(InitializationTest);
+ CPPUNIT_TEST(EmptyRecoverTest);
+ CPPUNIT_TEST(EnqueueTest);
+ CPPUNIT_TEST(RecoverReadTest);
+ CPPUNIT_TEST(RecoveredReadTest);
+ CPPUNIT_TEST(RecoveredDequeueTest);
+ CPPUNIT_TEST(HeaderFlagsTest);
+// CPPUNIT_TEST(ComplexRecoveryTest1);
+ CPPUNIT_TEST_SUITE_END();
+
+ std::string msg;
+ std::string xid;
+ void* mbuff;
+ size_t msize;
+ void* xidbuff;
+ size_t xidsize;
+ bool transientFlag;
+ bool externalFlag;
+
+public:
+ void InstantiationTest();
+ void InitializationTest();
+ void EmptyRecoverTest();
+ void EnqueueTest();
+ void RecoverReadTest();
+ void RecoveredReadTest();
+ void RecoveredDequeueTest();
+ void HeaderFlagsTest();
+ void ComplexRecoveryTest1();
+
+private:
+ void enq_msg(rhm::journal::jcntl* jc, const std::string msg, const bool transient);
+ void enq_extern_msg(rhm::journal::jcntl* jc, const bool transient);
+ void enq_txn_msg(rhm::journal::jcntl* jc, const std::string msg, const std::string xid,
+ const bool transient);
+ void enq_extern_txn_msg(rhm::journal::jcntl* jc, const std::string xid, const bool transient);
+ void deq_msg(rhm::journal::jcntl* jc, u_int64_t rid);
+ void deq_txn_msg(rhm::journal::jcntl* jc, u_int64_t rid, const std::string xid);
+ void txn_abort(rhm::journal::jcntl* jc, const std::string xid);
+ void txn_commit(rhm::journal::jcntl* jc, const std::string xid);
+ char* read_msg(rhm::journal::jcntl* jc);
+ bool handle_jcntl_response(rhm::journal::iores res, rhm::journal::jcntl* jc,
+ unsigned& aio_sleep_cnt, rhm::journal::data_tok* dtp);
+ static std::string& create_msg(std::string& s, int msg_num, int len);
+ static std::string& create_xid(std::string& s, int msg_num, int len);
+ void cleanMessage();
+};
+
+// Make this test suite a plugin.
+CPPUNIT_PLUGIN_IMPLEMENT();
+CPPUNIT_TEST_SUITE_REGISTRATION(JournalSystemTests);
+
+#endif
17 years, 2 months
rhmessaging commits: r1262 - in store/trunk/cpp: lib/jrnl and 1 other directories.
by rhmessaging-commits@lists.jboss.org
Author: kpvdr
Date: 2007-11-07 14:25:50 -0500 (Wed, 07 Nov 2007)
New Revision: 1262
Modified:
store/trunk/cpp/lib/BdbMessageStore.cpp
store/trunk/cpp/lib/jrnl/enq_rec.cpp
store/trunk/cpp/lib/jrnl/jcntl.cpp
store/trunk/cpp/lib/jrnl/pmgr.cpp
store/trunk/cpp/lib/jrnl/rmgr.cpp
store/trunk/cpp/lib/jrnl/rmgr.hpp
store/trunk/cpp/lib/jrnl/wmgr.cpp
store/trunk/cpp/lib/jrnl/wrfc.cpp
store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp
Log:
Bugfix: transient and external header flags were not being handled correctly. Also drastic simplification to JournalSystemTests.
Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp 2007-11-07 17:19:21 UTC (rev 1261)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp 2007-11-07 19:25:50 UTC (rev 1262)
@@ -931,13 +931,13 @@
rhm::journal::iores eres;
if (txn->getXid().empty()){
if (message.isContentReleased()){
- eres = jc->enqueue_extern_data_record(0, dtokp.get(), false);
+ eres = jc->enqueue_extern_data_record(size, dtokp.get(), false);
}else {
eres = jc->enqueue_data_record(buff, size, size, dtokp.get(), false);
}
}else {
if (message.isContentReleased()){
- eres = jc->enqueue_extern_txn_data_record(0, dtokp.get(), txn->getXid(), false);
+ eres = jc->enqueue_extern_txn_data_record(size, dtokp.get(), txn->getXid(), false);
} else {
eres = jc->enqueue_txn_data_record(buff, size, size, dtokp.get(), txn->getXid(), false);
}
Modified: store/trunk/cpp/lib/jrnl/enq_rec.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/enq_rec.cpp 2007-11-07 17:19:21 UTC (rev 1261)
+++ store/trunk/cpp/lib/jrnl/enq_rec.cpp 2007-11-07 19:25:50 UTC (rev 1262)
@@ -506,7 +506,10 @@
*datapp = NULL;
return 0;
}
- *datapp = (void*)((char*)_buff + _enq_hdr._xidsize);
+ if (_enq_hdr.is_external())
+ *datapp = NULL;
+ else
+ *datapp = (void*)((char*)_buff + _enq_hdr._xidsize);
return _enq_hdr._dsize;
}
@@ -539,6 +542,8 @@
const size_t
enq_rec::rec_size() const
{
+ if (_enq_hdr.is_external())
+ return enq_hdr::size() + _enq_hdr._xidsize + rec_tail::size();
return enq_hdr::size() + _enq_hdr._xidsize + _enq_hdr._dsize + rec_tail::size();
}
Modified: store/trunk/cpp/lib/jrnl/jcntl.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.cpp 2007-11-07 17:19:21 UTC (rev 1261)
+++ store/trunk/cpp/lib/jrnl/jcntl.cpp 2007-11-07 19:25:50 UTC (rev 1262)
@@ -176,6 +176,7 @@
_datafh[i]->reset(&_rcvdat);
_wrfc.initialize(JRNL_NUM_FILES, (nlfh**)_datafh, &_rcvdat);
_rrfc.initialize(JRNL_NUM_FILES, (nlfh**)_datafh, _rcvdat._ffid);
+ _rmgr.recover_complete();
_readonly_flag = false;
}
@@ -440,17 +441,22 @@
done = er.rcv_decode(h, ifsp, cum_size_read);
jfile_cycle(fid, ifsp, rd, false);
}
- rd._enq_cnt_list[fid]++;
- if (er.xid_size())
+//std::cout << " E";
+ if (!er.is_transient()) // Ignore transient msgs
{
- er.get_xid(&xidp);
- assert(xidp != NULL);
- std::string xid((char*)xidp, er.xid_size());
- _tmap.insert_txn_data(xid, txn_data(h._rid, 0, fid, true));
- free(xidp);
+ rd._enq_cnt_list[fid]++;
+ if (er.xid_size())
+ {
+ er.get_xid(&xidp);
+ assert(xidp != NULL);
+ std::string xid((char*)xidp, er.xid_size());
+ _tmap.insert_txn_data(xid, txn_data(h._rid, 0, fid, true));
+ free(xidp);
+ }
+ else
+ _emap.insert_fid(h._rid, fid);
}
- else
- _emap.insert_fid(h._rid, fid);
+//else std::cout << "t";
if (rd._h_rid < h._rid)
rd._h_rid = h._rid;
}
@@ -463,6 +469,7 @@
done = dr.rcv_decode(h, ifsp, cum_size_read);
jfile_cycle(fid, ifsp, rd, false);
}
+//std::cout << " D";
if (dr.xid_size())
{
// If the enqueue is part of a pending txn, it will not yet be in emap
@@ -503,6 +510,7 @@
done = ar.rcv_decode(h, ifsp, cum_size_read);
jfile_cycle(fid, ifsp, rd, false);
}
+//std::cout << " A";
// Delete this txn from tmap, unlock any locked records in emap
ar.get_xid(&xidp);
assert(xidp != NULL);
@@ -536,6 +544,7 @@
done = cr.rcv_decode(h, ifsp, cum_size_read);
jfile_cycle(fid, ifsp, rd, false);
}
+//std::cout << " C";
// Delete this txn from tmap, process records into emap
cr.get_xid(&xidp);
assert(xidp != NULL);
@@ -559,11 +568,13 @@
break;
case RHM_JDAT_EMPTY_MAGIC:
{
+//std::cout << " X";
u_int32_t rec_dblks = jrec::size_dblks(sizeof(hdr));
ifsp->ignore(rec_dblks * JRNL_DBLK_SIZE - sizeof(hdr));
}
break;
case 0:
+//std::cout << " 0";
rd._lfid = fid;
rd._eo = ifsp->tellg();
return false;
Modified: store/trunk/cpp/lib/jrnl/pmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/pmgr.cpp 2007-11-07 17:19:21 UTC (rev 1261)
+++ store/trunk/cpp/lib/jrnl/pmgr.cpp 2007-11-07 19:25:50 UTC (rev 1262)
@@ -139,7 +139,6 @@
_pg_offset_dblks = 0;
_aio_evt_rem = 0;
clean();
-// _emap.clear();
// 1. Allocate page memory (as a single block)
size_t pagesize = _pages * _pagesize * _sblksize;
Modified: store/trunk/cpp/lib/jrnl/rmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rmgr.cpp 2007-11-07 17:19:21 UTC (rev 1261)
+++ store/trunk/cpp/lib/jrnl/rmgr.cpp 2007-11-07 19:25:50 UTC (rev 1262)
@@ -215,7 +215,7 @@
rmgr::read(void** const datapp, size_t& dsize, void** const xidpp, size_t& xidsize, bool& transient,
bool& external, data_tok* dtokp) throw (jexception)
{
-//std::cout << " rmgr::read() ro=" << (_jc->is_read_only()?"T":"F") << std::flush;
+//std::cout << " rmgr::read() ro=" << (_jc->is_read_only()?"T":"F") << " po=" << _pg_offset_dblks << " ems=" << _emap.size() << std::flush;
iores res = pre_read_check(dtokp);
if (res != RHM_IORES_SUCCESS)
@@ -465,6 +465,14 @@
}
void
+rmgr::recover_complete()
+{
+ _pg_index = 0;
+ _pg_cntr = 0;
+ _pg_offset_dblks = 0;
+}
+
+void
rmgr::initialize() throw (jexception)
{
pmgr::initialize();
@@ -554,7 +562,10 @@
{
enq_hdr ehdr;
::memcpy(&ehdr, rptr, sizeof(enq_hdr));
- dtokp->set_dsize(ehdr._xidsize + ehdr._dsize + sizeof(enq_hdr) + sizeof(rec_tail));
+ if (ehdr.is_external())
+ dtokp->set_dsize(ehdr._xidsize + sizeof(enq_hdr) + sizeof(rec_tail));
+ else
+ dtokp->set_dsize(ehdr._xidsize + ehdr._dsize + sizeof(enq_hdr) + sizeof(rec_tail));
}
else if (h._magic == RHM_JDAT_DEQ_MAGIC)
{
Modified: store/trunk/cpp/lib/jrnl/rmgr.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rmgr.hpp 2007-11-07 17:19:21 UTC (rev 1261)
+++ store/trunk/cpp/lib/jrnl/rmgr.hpp 2007-11-07 19:25:50 UTC (rev 1262)
@@ -74,6 +74,7 @@
const iores read(void** const datapp, size_t& dsize, void** const xidpp, size_t& xidsize,
bool& transient, bool& external, data_tok* dtokp) throw (jexception);
const u_int32_t get_events(page_state state = AIO_COMPLETE) throw (jexception);
+ void recover_complete();
private:
void initialize() throw (jexception);
Modified: store/trunk/cpp/lib/jrnl/wmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wmgr.cpp 2007-11-07 17:19:21 UTC (rev 1261)
+++ store/trunk/cpp/lib/jrnl/wmgr.cpp 2007-11-07 19:25:50 UTC (rev 1262)
@@ -96,6 +96,7 @@
data_tok* dtokp, const void* const xid_ptr, const size_t xid_len, const bool transient,
const bool external) throw (jexception)
{
+//std::cout << "wmgr::enqueue() dl=" << tot_data_len << " xl=" << xid_len << " t=" << (transient?"T":"F") << " e=" << (external?"T":"F") << " msg=" << (data_buff?std::string((const char*)data_buff, tot_data_len):"<null>") << std::endl;
if (xid_len)
assert(xid_ptr != NULL);
Modified: store/trunk/cpp/lib/jrnl/wrfc.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wrfc.cpp 2007-11-07 17:19:21 UTC (rev 1261)
+++ store/trunk/cpp/lib/jrnl/wrfc.cpp 2007-11-07 19:25:50 UTC (rev 1262)
@@ -52,13 +52,6 @@
wrfc::~wrfc() {}
-// void
-// wrfc::initialize(u_int32_t nfiles, nlfh** fh_arr, u_int32_t fh_index, u_int64_t rid) throw (jexception)
-// {
-// rrfc::initialize(nfiles, fh_arr, fh_index);
-// _rid = rid;
-// _reset_ok = false;
-// }
void
wrfc::initialize(u_int32_t nfiles, nlfh** fh_arr, rcvdat* rdp) throw (jexception)
{
Modified: store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp 2007-11-07 17:19:21 UTC (rev 1261)
+++ store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp 2007-11-07 19:25:50 UTC (rev 1262)
@@ -29,10 +29,9 @@
* The GNU Lesser General Public License is available in the file COPYING.
*/
-#include "../test_plugin.h"
-#include "msg_producer.hpp"
-#include "msg_consumer.hpp"
-#include "jtest.hpp"
+#include "JournalSystemTests.hpp"
+//#include "msg_producer.hpp"
+//#include "msg_consumer.hpp"
#include <vector>
#define NUM_MSGS 5
@@ -41,1419 +40,784 @@
#define MSG_SIZE 100
#define XID_SIZE 64
-class JournalSystemTests : public CppUnit::TestCase
+using namespace std;
+
+void
+JournalSystemTests::InstantiationTest()
{
- CPPUNIT_TEST_SUITE(JournalSystemTests);
- CPPUNIT_TEST(InstantiationTest);
- CPPUNIT_TEST(InitializationTest);
- CPPUNIT_TEST(EmptyRecoverTest);
- CPPUNIT_TEST(EnqueueTest);
- CPPUNIT_TEST(TxnEnqueueTest);
- CPPUNIT_TEST(RecoverReadTest);
- CPPUNIT_TEST(TxnRecoverReadTest);
- CPPUNIT_TEST(RecoveredReadTest);
- CPPUNIT_TEST(TxnRecoveredReadTest);
- CPPUNIT_TEST(RecoveredDequeueTest);
- CPPUNIT_TEST(TxnRecoveredDequeueTest);
- CPPUNIT_TEST(ComplexRecoveryTest1);
- CPPUNIT_TEST(TxnComplexRecoveryTest1);
- CPPUNIT_TEST(EncodeTest_000);
- CPPUNIT_TEST(EncodeTest_001);
- CPPUNIT_TEST(EncodeTest_002);
- CPPUNIT_TEST(EncodeTest_003);
- CPPUNIT_TEST(EncodeTest_004);
- CPPUNIT_TEST(EncodeTest_005);
- CPPUNIT_TEST(EncodeTest_006);
- CPPUNIT_TEST(EncodeTest_007);
- CPPUNIT_TEST(EncodeTest_008);
- CPPUNIT_TEST(EncodeTest_009);
- CPPUNIT_TEST(EncodeTest_010);
- CPPUNIT_TEST(EncodeTest_011);
- CPPUNIT_TEST(EncodeTest_012);
- CPPUNIT_TEST(EncodeTest_013);
- CPPUNIT_TEST(EncodeTest_014);
- CPPUNIT_TEST(EncodeTest_015);
- CPPUNIT_TEST(EncodeTest_016);
- CPPUNIT_TEST(EncodeTest_017);
- CPPUNIT_TEST(EncodeTest_018);
- CPPUNIT_TEST(EncodeTest_019);
- CPPUNIT_TEST(EncodeTest_020);
- CPPUNIT_TEST(EncodeTest_021);
- CPPUNIT_TEST(EncodeTest_022);
- CPPUNIT_TEST(EncodeTest_023);
- CPPUNIT_TEST(EncodeTest_024);
- CPPUNIT_TEST(EncodeTest_025);
- CPPUNIT_TEST(EncodeTest_026);
-// CPPUNIT_TEST(EncodeTest_027); // Until race condition fixed
-// CPPUNIT_TEST(EncodeTest_028); // Until race condition fixed
- CPPUNIT_TEST_SUITE_END();
+ try
+ {
+ char* test_name = "InstantiationTest";
+ rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ }
+ catch (const rhm::journal::jexception& e)
+ {
+ stringstream ss;
+ ss << e;
+ CPPUNIT_FAIL(ss.str());
+ }
+}
- jtest t;
- std::string msg;
- std::string xid;
- void* mbuff;
- size_t msize;
- void* xidbuff;
- size_t xidsize;
- bool transientFlag;
- bool externalFlag;
-
-public:
-
- void InstantiationTest()
+void
+JournalSystemTests::InitializationTest()
+{
+ try
{
- //Stack
- char* test_name = "InstantiationTest_Stack";
- try
- {
- rhm::journal::jcntl jc(test_name, "jdata", test_name);
- }
- catch (const rhm::journal::jexception& e)
- {
- std::stringstream ss;
- ss << e;
- CPPUNIT_FAIL(ss.str());
- }
- // Heap
- test_name = "InstantiationTest_Heap";
- rhm::journal::jcntl* jcp = NULL;
- try
- {
- jcp = new rhm::journal::jcntl(test_name, "jdata", test_name);
- CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp != NULL);
- delete jcp;
- }
- catch (const rhm::journal::jexception& e)
- {
- std::stringstream ss;
- ss << e;
- CPPUNIT_FAIL(ss.str());
- }
+ char* test_name = "InitializationTest";
+ rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ jc.initialize();
}
+ catch (const rhm::journal::jexception& e)
+ {
+ stringstream ss;
+ ss << e;
+ CPPUNIT_FAIL(ss.str());
+ }
+}
- void InitializationTest()
+void
+JournalSystemTests::EmptyRecoverTest()
+{
+ try
{
- //Stack
- char* test_name = "InitializationTest_Stack";
- try
+ vector<string> txn_list;
+ char* test_name = "EmptyRecoverTest";
{
rhm::journal::jcntl jc(test_name, "jdata", test_name);
jc.initialize();
}
- catch (const rhm::journal::jexception& e)
{
- std::stringstream ss;
- ss << e;
- CPPUNIT_FAIL(ss.str());
+ rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ jc.recover(txn_list);
}
- // Heap
- test_name = "InitializationTest_Heap";
- rhm::journal::jcntl* jcp = NULL;
- try
{
- jcp = new rhm::journal::jcntl(test_name, "jdata", test_name);
- CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp != NULL);
- jcp->initialize();
- delete jcp;
+ rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ jc.recover(txn_list);
+ jc.recover_complete();
}
- catch (const rhm::journal::jexception& e)
- {
- if (jcp)
- delete jcp;
- std::stringstream ss;
- ss << e;
- CPPUNIT_FAIL(ss.str());
- }
}
+ catch (const rhm::journal::jexception& e)
+ {
+ stringstream ss;
+ ss << e;
+ CPPUNIT_FAIL(ss.str());
+ }
+}
- void EmptyRecoverTest()
+void
+JournalSystemTests::EnqueueTest()
+{
+ try
{
- std::vector<std::string> txn_list;
- //Stack
- char* test_name = "EmptyRecoverTest_Stack";
- try
- {
- {
- rhm::journal::jcntl jc(test_name, "jdata", test_name);
- jc.initialize();
- }
- {
- rhm::journal::jcntl jc(test_name, "jdata", test_name);
- jc.recover(txn_list);
- }
- {
- rhm::journal::jcntl jc(test_name, "jdata", test_name);
- jc.recover(txn_list);
- jc.recover_complete();
- }
- }
- catch (const rhm::journal::jexception& e)
- {
- std::stringstream ss;
- ss << e;
- CPPUNIT_FAIL(ss.str());
- }
- // Heap
- test_name = "EmptyRecoverTest_Heap";
- rhm::journal::jcntl* jcp = NULL;
- try
- {
- {
- rhm::journal::jcntl* jcp = new rhm::journal::jcntl(test_name, "jdata", test_name);
- CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp != NULL);
- jcp->initialize();
- delete jcp;
- jcp = NULL;
- }
- {
- rhm::journal::jcntl* jcp = new rhm::journal::jcntl(test_name, "jdata", test_name);
- CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp != NULL);
- jcp->recover(txn_list);
- delete jcp;
- jcp = NULL;
- }
- {
- rhm::journal::jcntl* jcp = new rhm::journal::jcntl(test_name, "jdata", test_name);
- CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp != NULL);
- jcp->recover(txn_list);
- jcp->recover_complete();
- delete jcp;
- }
- }
- catch (const rhm::journal::jexception& e)
- {
- if (jcp)
- delete jcp;
- std::stringstream ss;
- ss << e;
- CPPUNIT_FAIL(ss.str());
- }
+ char* test_name = "EnqueueTest";
+ rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ jc.initialize();
+
+ // Non-txn
+ for (int m=0; m<NUM_MSGS; m++)
+ enq_msg(&jc, create_msg(msg, m, MSG_SIZE), false);
+
+ // Txn
+ create_xid(xid, 0, XID_SIZE);
+ for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
+ enq_txn_msg(&jc, create_msg(msg, m, MSG_SIZE), xid, false);
+ txn_commit(&jc, xid);
}
+ catch (const rhm::journal::jexception& e)
+ {
+ stringstream ss;
+ ss << e;
+ CPPUNIT_FAIL(ss.str());
+ }
+}
- void EnqueueTest()
+void
+JournalSystemTests::RecoverReadTest()
+{
+ vector<string> txn_list;
+ try
{
- //Stack
- char* test_name = "EnqueueTest_Stack";
- try
+ // Non-txn
+ char* test_name = "RecoverReadTest";
{
rhm::journal::jcntl jc(test_name, "jdata", test_name);
jc.initialize();
for (int m=0; m<NUM_MSGS; m++)
enq_msg(&jc, create_msg(msg, m, MSG_SIZE), false);
}
- catch (const rhm::journal::jexception& e)
{
- std::stringstream ss;
- ss << e;
- CPPUNIT_FAIL(ss.str());
- }
- // Heap
- test_name = "EnqueueTest_Heap";
- rhm::journal::jcntl* jcp = NULL;
- try
- {
- jcp = new rhm::journal::jcntl(test_name, "jdata", test_name);
- CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp != NULL);
- jcp->initialize();
+ rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ jc.recover(txn_list);
for (int m=0; m<NUM_MSGS; m++)
- enq_msg(jcp, create_msg(msg, m, MSG_SIZE), false);
- delete jcp;
+ {
+ read_msg(&jc);
+ CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
+ create_msg(msg, m, MSG_SIZE).compare(string((char*)mbuff, msize)) == 0);
+ cleanMessage();
+ }
}
- catch (const rhm::journal::jexception& e)
- {
- if (jcp)
- delete jcp;
- std::stringstream ss;
- ss << e;
- CPPUNIT_FAIL(ss.str());
- }
- }
- void TxnEnqueueTest()
- {
- //Stack
- char* test_name = "TxnEnqueueTest_Stack";
- try
+ // Txn
+ test_name = "TxnRecoverReadTest";
{
rhm::journal::jcntl jc(test_name, "jdata", test_name);
jc.initialize();
- create_xid(xid, 0, XID_SIZE);
+ create_xid(xid, 1, XID_SIZE);
+ txn_list.push_back(xid);
for (int m=0; m<NUM_MSGS; m++)
enq_txn_msg(&jc, create_msg(msg, m, MSG_SIZE), xid, false);
txn_commit(&jc, xid);
}
- catch (const rhm::journal::jexception& e)
{
- std::stringstream ss;
- ss << e;
- CPPUNIT_FAIL(ss.str());
- }
- // Heap
- test_name = "TxnEnqueueTest_Heap";
- rhm::journal::jcntl* jcp = NULL;
- try
- {
- jcp = new rhm::journal::jcntl(test_name, "jdata", test_name);
- CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp != NULL);
- jcp->initialize();
+ rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ jc.recover(txn_list);
for (int m=0; m<NUM_MSGS; m++)
- enq_txn_msg(jcp, create_msg(msg, m, MSG_SIZE), xid, false);
- txn_commit(jcp, xid);
- delete jcp;
+ {
+ read_msg(&jc);
+ CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
+ create_msg(msg, m, MSG_SIZE).compare(string((char*)mbuff, msize)) == 0);
+ cleanMessage();
+ }
}
- catch (const rhm::journal::jexception& e)
- {
- if (jcp)
- delete jcp;
- std::stringstream ss;
- ss << e;
- CPPUNIT_FAIL(ss.str());
- }
}
+ catch (const rhm::journal::jexception& e)
+ {
+ stringstream ss;
+ ss << e;
+ CPPUNIT_FAIL(ss.str());
+ }
+}
- void RecoverReadTest()
+void
+JournalSystemTests::RecoveredReadTest()
+{
+ vector<string> txn_list;
+ try
{
- std::vector<std::string> txn_list;
- //Stack
- char* test_name = "RecoverReadTest_Stack";
- try
+ // Non-txn
+ char* test_name = "RecoveredReadTest";
{
+ rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ jc.initialize();
+ for (int m=0; m<NUM_MSGS; m++)
+ enq_msg(&jc, create_msg(msg, m, MSG_SIZE), false);
+ }
+ {
+ rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ jc.recover(txn_list);
+ for (int m=0; m<NUM_MSGS; m++)
{
- rhm::journal::jcntl jc(test_name, "jdata", test_name);
- jc.initialize();
- for (int m=0; m<NUM_MSGS; m++)
- enq_msg(&jc, create_msg(msg, m, MSG_SIZE), false);
+ read_msg(&jc);
+ CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
+ create_msg(msg, m, MSG_SIZE).compare(string((char*)mbuff, msize)) == 0);
+ cleanMessage();
}
+ jc.recover_complete();
+ for (int m=0; m<NUM_MSGS; m++)
{
- rhm::journal::jcntl jc(test_name, "jdata", test_name);
- jc.recover(txn_list);
- for (int m=0; m<NUM_MSGS; m++)
- {
- read_msg(&jc);
- CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
- create_msg(msg, m, MSG_SIZE).compare(string((char*)mbuff, msize)) == 0);
- cleanMessage();
- }
+ read_msg(&jc);
+ CPPUNIT_ASSERT_MESSAGE("Message corrupt after recovery.",
+ create_msg(msg, m, MSG_SIZE).compare(string((char*)mbuff, msize)) == 0);
+ cleanMessage();
}
}
- catch (const rhm::journal::jexception& e)
+
+ // Txn
+ test_name = "TxnRecoveredReadTest";
{
- std::stringstream ss;
- ss << e;
- CPPUNIT_FAIL(ss.str());
+ rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ jc.initialize();
+ create_xid(xid, 2, XID_SIZE);
+ txn_list.push_back(xid);
+ for (int m=0; m<NUM_MSGS; m++)
+ enq_txn_msg(&jc, create_msg(msg, m, MSG_SIZE), xid, false);
+ txn_commit(&jc, xid);
}
- // Heap
- test_name = "RecoverReadTest_Heap";
- rhm::journal::jcntl* jcp = NULL;
- try
{
+ rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ jc.recover(txn_list);
+ for (int m=0; m<NUM_MSGS; m++)
{
- jcp = new rhm::journal::jcntl(test_name, "jdata", test_name);
- CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp != NULL);
- jcp->initialize();
- for (int m=0; m<NUM_MSGS; m++)
- enq_msg(jcp, create_msg(msg, m, MSG_SIZE), false);
- delete jcp;
- jcp = NULL;
+ read_msg(&jc);
+ CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
+ create_msg(msg, m, MSG_SIZE).compare(string((char*)mbuff, msize)) == 0);
+ cleanMessage();
}
+ jc.recover_complete();
+ for (int m=0; m<NUM_MSGS; m++)
{
- rhm::journal::jcntl* jcp = new rhm::journal::jcntl(test_name, "jdata", test_name);
- CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp != NULL);
- jcp->recover(txn_list);
- for (int m=0; m<NUM_MSGS; m++)
- {
- read_msg(jcp);
- std::string msg((char*)mbuff, msize);
- CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
- create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
- cleanMessage();
- }
- delete jcp;
+ read_msg(&jc);
+ CPPUNIT_ASSERT_MESSAGE("Message corrupt after recovery.",
+ create_msg(msg, m, MSG_SIZE).compare(string((char*)mbuff, msize)) == 0);
+ cleanMessage();
}
}
- catch (const rhm::journal::jexception& e)
- {
- if (jcp)
- delete jcp;
- std::stringstream ss;
- ss << e;
- CPPUNIT_FAIL(ss.str());
- }
}
+ catch (const rhm::journal::jexception& e)
+ {
+ stringstream ss;
+ ss << e;
+ CPPUNIT_FAIL(ss.str());
+ }
+}
- void TxnRecoverReadTest()
+void
+JournalSystemTests::RecoveredDequeueTest()
+{
+ vector<string> txn_list;
+ try
{
- std::vector<std::string> txn_list;
- //Stack
- char* test_name = "TxnRecoverReadTest_Stack";
- try
+ // Non-txn
+ char* test_name = "RecoveredDequeueTest";
{
- {
- rhm::journal::jcntl jc(test_name, "jdata", test_name);
- jc.initialize();
- create_xid(xid, 1, XID_SIZE);
- txn_list.push_back(xid);
- for (int m=0; m<NUM_MSGS; m++)
- enq_txn_msg(&jc, create_msg(msg, m, MSG_SIZE), xid, false);
- txn_commit(&jc, xid);
- }
- {
- rhm::journal::jcntl jc(test_name, "jdata", test_name);
- jc.recover(txn_list);
- for (int m=0; m<NUM_MSGS; m++)
- {
- read_msg(&jc);
- CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
- create_msg(msg, m, MSG_SIZE).compare(string((char*)mbuff, msize)) == 0);
- cleanMessage();
- }
- }
+ rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ jc.initialize();
+ for (int m=0; m<NUM_MSGS; m++)
+ enq_msg(&jc, create_msg(msg, m, MSG_SIZE), false);
}
- catch (const rhm::journal::jexception& e)
{
- std::stringstream ss;
- ss << e;
- CPPUNIT_FAIL(ss.str());
- }
- // Heap
- test_name = "TxnRecoverReadTest_Heap";
- rhm::journal::jcntl* jcp = NULL;
- try
- {
+ rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ jc.recover(txn_list);
+ for (int m=0; m<NUM_MSGS; m++)
{
- jcp = new rhm::journal::jcntl(test_name, "jdata", test_name);
- CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp != NULL);
- jcp->initialize();
- for (int m=0; m<NUM_MSGS; m++)
- enq_txn_msg(jcp, create_msg(msg, m, MSG_SIZE), xid, false);
- txn_commit(jcp, xid);
- delete jcp;
- jcp = NULL;
+ read_msg(&jc);
+ CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
+ create_msg(msg, m, MSG_SIZE).compare(string((char*)mbuff, msize)) == 0);
+ cleanMessage();
}
+ jc.recover_complete();
+ for (int m=0; m<NUM_MSGS; m++)
{
- rhm::journal::jcntl* jcp = new rhm::journal::jcntl(test_name, "jdata", test_name);
- CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp != NULL);
- jcp->recover(txn_list);
- for (int m=0; m<NUM_MSGS; m++)
- {
- read_msg(jcp);
- std::string msg((char*)mbuff, msize);
- CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
- create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
- cleanMessage();
- }
- delete jcp;
+ read_msg(&jc);
+ CPPUNIT_ASSERT_MESSAGE("Message corrupt after recovery.",
+ create_msg(msg, m, MSG_SIZE).compare(string((char*)mbuff, msize)) == 0);
+ cleanMessage();
}
+ for (int m=0; m<NUM_MSGS; m++)
+ deq_msg(&jc, m);
}
- catch (const rhm::journal::jexception& e)
- {
- if (jcp)
- delete jcp;
- std::stringstream ss;
- ss << e;
- CPPUNIT_FAIL(ss.str());
- }
- }
- void RecoveredReadTest()
- {
- std::vector<std::string> txn_list;
- //Stack
- char* test_name = "RecoveredReadTest_Stack";
- try
+ // Txn
+ test_name = "TxnRecoveredDequeueTest";
{
- {
- rhm::journal::jcntl jc(test_name, "jdata", test_name);
- jc.initialize();
- for (int m=0; m<NUM_MSGS; m++)
- enq_msg(&jc, create_msg(msg, m, MSG_SIZE), false);
- }
- {
- rhm::journal::jcntl jc(test_name, "jdata", test_name);
- jc.recover(txn_list);
- for (int m=0; m<NUM_MSGS; m++)
- {
- read_msg(&jc);
- CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
- create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
- cleanMessage();
- }
- jc.recover_complete();
- for (int m=0; m<NUM_MSGS; m++)
- {
- read_msg(&jc);
- CPPUNIT_ASSERT_MESSAGE("Message corrupt after recovery.",
- create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
- cleanMessage();
- }
- }
+ rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ jc.initialize();
+ create_xid(xid, 3, XID_SIZE);
+ txn_list.push_back(xid);
+ for (int m=0; m<NUM_MSGS; m++)
+ enq_txn_msg(&jc, create_msg(msg, m, MSG_SIZE), xid, false);
+ txn_commit(&jc, xid);
}
- catch (const rhm::journal::jexception& e)
{
- std::stringstream ss;
- ss << e;
- CPPUNIT_FAIL(ss.str());
- }
- // Heap
- test_name = "RecoveredReadTest_Heap";
- rhm::journal::jcntl* jcp = NULL;
- try
- {
+ rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ jc.recover(txn_list);
+ for (int m=0; m<NUM_MSGS; m++)
{
- jcp = new rhm::journal::jcntl(test_name, "jdata", test_name);
- CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp != NULL);
- jcp->initialize();
- for (int m=0; m<NUM_MSGS; m++)
- enq_msg(jcp, create_msg(msg, m, MSG_SIZE), false);
- delete jcp;
- jcp = NULL;
+ read_msg(&jc);
+ CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
+ create_msg(msg, m, MSG_SIZE).compare(string((char*)mbuff, msize)) == 0);
+ cleanMessage();
}
+ jc.recover_complete();
+ for (int m=0; m<NUM_MSGS; m++)
{
- rhm::journal::jcntl* jcp = new rhm::journal::jcntl(test_name, "jdata", test_name);
- CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp != NULL);
- jcp->recover(txn_list);
- for (int m=0; m<NUM_MSGS; m++)
- {
- read_msg(jcp);
- CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
- create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
- cleanMessage();
- }
- jcp->recover_complete();
- for (int m=0; m<NUM_MSGS; m++)
- {
- read_msg(jcp);
- CPPUNIT_ASSERT_MESSAGE("Message corrupt after recovery.",
- create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
- cleanMessage();
- }
- delete jcp;
+ read_msg(&jc);
+ CPPUNIT_ASSERT_MESSAGE("Message corrupt after recovery.",
+ create_msg(msg, m, MSG_SIZE).compare(string((char*)mbuff, msize)) == 0);
+ cleanMessage();
}
+ for (int m=0; m<NUM_MSGS; m++)
+ deq_msg(&jc, m);
}
- catch (const rhm::journal::jexception& e)
- {
- if (jcp)
- delete jcp;
- std::stringstream ss;
- ss << e;
- CPPUNIT_FAIL(ss.str());
- }
}
+ catch (const rhm::journal::jexception& e)
+ {
+ stringstream ss;
+ ss << e;
+ CPPUNIT_FAIL(ss.str());
+ }
+}
- void TxnRecoveredReadTest()
+void
+JournalSystemTests::HeaderFlagsTest()
+{
+ vector<string> txn_list;
+ try
{
- std::vector<std::string> txn_list;
- //Stack
- char* test_name = "TxnRecoveredReadTest_Stack";
- try
+ // Non-txn
+ char* test_name = "FlagsRecoverdTest";
{
+ rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ jc.initialize();
+ // Transient msgs - should not recover
+ for (int m=0; m<NUM_MSGS; m++)
+ enq_msg(&jc, create_msg(msg, m, MSG_SIZE), true);
+ // Persistent msgs
+ for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
+ enq_msg(&jc, create_msg(msg, m, MSG_SIZE), false);
+ // Transient extern msgs - should not recover
+ for (int m=NUM_MSGS*2; m<NUM_MSGS*3; m++)
+ enq_extern_msg(&jc, true);
+ // Persistnet extern msgs
+ for (int m=NUM_MSGS*3; m<NUM_MSGS*4; m++)
+ enq_extern_msg(&jc, false);
+ }
+ {
+ rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ jc.recover(txn_list);
+ // Recover non-transient msgs
+ for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
{
- rhm::journal::jcntl jc(test_name, "jdata", test_name);
- jc.initialize();
- create_xid(xid, 2, XID_SIZE);
- txn_list.push_back(xid);
- for (int m=0; m<NUM_MSGS; m++)
- enq_txn_msg(&jc, create_msg(msg, m, MSG_SIZE), xid, false);
- txn_commit(&jc, xid);
+ read_msg(&jc);
+ CPPUNIT_ASSERT_MESSAGE("Transient message recovered.", transientFlag == false);
+ CPPUNIT_ASSERT_MESSAGE("External flag incorrect.", externalFlag == false);
+ CPPUNIT_ASSERT_MESSAGE("Non-transient message corrupt during recover.",
+ create_msg(msg, m, MSG_SIZE).compare(string((char*)mbuff, msize)) == 0);
+ cleanMessage();
}
+ // Recover non-transient extern msgs
+ for (int m=NUM_MSGS*3; m<NUM_MSGS*4; m++)
{
- rhm::journal::jcntl jc(test_name, "jdata", test_name);
- jc.recover(txn_list);
- for (int m=0; m<NUM_MSGS; m++)
- {
- read_msg(&jc);
- CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
- create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
- cleanMessage();
- }
- jc.recover_complete();
- for (int m=0; m<NUM_MSGS; m++)
- {
- read_msg(&jc);
- CPPUNIT_ASSERT_MESSAGE("Message corrupt after recovery.",
- create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
- cleanMessage();
- }
+ read_msg(&jc);
+ CPPUNIT_ASSERT_MESSAGE("Transient message recovered.", transientFlag == false);
+ CPPUNIT_ASSERT_MESSAGE("External flag incorrect.", externalFlag == true);
+ CPPUNIT_ASSERT_MESSAGE("External message returned non-null pointer.",
+ mbuff == NULL);
+ cleanMessage();
}
- }
- catch (const rhm::journal::jexception& e)
- {
- std::stringstream ss;
- ss << e;
- CPPUNIT_FAIL(ss.str());
- }
- // Heap
- test_name = "TxnRecoveredReadTest_Heap";
- rhm::journal::jcntl* jcp = NULL;
- try
- {
+ jc.recover_complete();
+ // Read recovered non-transient msgs
+ for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
{
- jcp = new rhm::journal::jcntl(test_name, "jdata", test_name);
- CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp != NULL);
- jcp->initialize();
- for (int m=0; m<NUM_MSGS; m++)
- enq_txn_msg(jcp, create_msg(msg, m, MSG_SIZE), xid, false);
- txn_commit(jcp, xid);
- delete jcp;
- jcp = NULL;
+ read_msg(&jc);
+ CPPUNIT_ASSERT_MESSAGE("Transient message recovered.", transientFlag == false);
+ CPPUNIT_ASSERT_MESSAGE("External flag incorrect.", externalFlag == false);
+ CPPUNIT_ASSERT_MESSAGE("Non-transient message corrupt during recover.",
+ create_msg(msg, m, MSG_SIZE).compare(string((char*)mbuff, msize)) == 0);
+ cleanMessage();
}
+ // Read recovered non-transient extern msgs
+ for (int m=NUM_MSGS*3; m<NUM_MSGS*4; m++)
{
- rhm::journal::jcntl* jcp = new rhm::journal::jcntl(test_name, "jdata", test_name);
- CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp != NULL);
- jcp->recover(txn_list);
- for (int m=0; m<NUM_MSGS; m++)
- {
- read_msg(jcp);
- CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
- create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
- cleanMessage();
- }
- jcp->recover_complete();
- for (int m=0; m<NUM_MSGS; m++)
- {
- read_msg(jcp);
- CPPUNIT_ASSERT_MESSAGE("Message corrupt after recovery.",
- create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
- cleanMessage();
- }
- delete jcp;
+ read_msg(&jc);
+ CPPUNIT_ASSERT_MESSAGE("Transient message recovered.", transientFlag == false);
+ CPPUNIT_ASSERT_MESSAGE("External flag incorrect.", externalFlag == true);
+ CPPUNIT_ASSERT_MESSAGE("External message returned non-null pointer.",
+ mbuff == NULL);
+ cleanMessage();
}
+ // Dequeue recovered messages
+ for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
+ deq_msg(&jc, m);
+ for (int m=NUM_MSGS*3; m<NUM_MSGS*4; m++)
+ deq_msg(&jc, m);
}
- catch (const rhm::journal::jexception& e)
+
+ // Txn
+ test_name = "TxnFlagsRecoverdTest";
{
- if (jcp)
- delete jcp;
- std::stringstream ss;
- ss << e;
- CPPUNIT_FAIL(ss.str());
+ rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ jc.initialize();
+ create_xid(xid, 4, XID_SIZE);
+ txn_list.push_back(xid);
+ // Transient msgs - should not recover
+ for (int m=0; m<NUM_MSGS; m++)
+ enq_txn_msg(&jc, create_msg(msg, m, MSG_SIZE), xid, true);
+ // Persistent msgs
+ for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
+ enq_txn_msg(&jc, create_msg(msg, m, MSG_SIZE), xid, false);
+ // Transient extern msgs - should not recover
+ for (int m=NUM_MSGS*2; m<NUM_MSGS*3; m++)
+ enq_extern_txn_msg(&jc, xid, true);
+ // Persistnet extern msgs
+ for (int m=NUM_MSGS*3; m<NUM_MSGS*4; m++)
+ enq_extern_txn_msg(&jc, xid, false);
+ txn_commit(&jc, xid);
}
- }
-
- void RecoveredDequeueTest()
- {
- std::vector<std::string> txn_list;
- //Stack
- char* test_name = "RecoveredDequeueTest_Stack";
- try
{
+ rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ jc.recover(txn_list);
+ // Recover non-transient msgs
+ for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
{
- rhm::journal::jcntl jc(test_name, "jdata", test_name);
- jc.initialize();
- for (int m=0; m<NUM_MSGS; m++)
- enq_msg(&jc, create_msg(msg, m, MSG_SIZE), false);
+ read_msg(&jc);
+ CPPUNIT_ASSERT_MESSAGE("Transient message recovered.", transientFlag == false);
+ CPPUNIT_ASSERT_MESSAGE("External flag incorrect.", externalFlag == false);
+ CPPUNIT_ASSERT_MESSAGE("Non-transient message corrupt during recover.",
+ create_msg(msg, m, MSG_SIZE).compare(string((char*)mbuff, msize)) == 0);
+ cleanMessage();
}
+ // Recover non-transient extern msgs
+ for (int m=NUM_MSGS*3; m<NUM_MSGS*4; m++)
{
- rhm::journal::jcntl jc(test_name, "jdata", test_name);
- jc.recover(txn_list);
- for (int m=0; m<NUM_MSGS; m++)
- {
- read_msg(&jc);
- CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
- create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
- cleanMessage();
- }
- jc.recover_complete();
- for (int m=0; m<NUM_MSGS; m++)
- {
- read_msg(&jc);
- CPPUNIT_ASSERT_MESSAGE("Message corrupt after recovery.",
- create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
- cleanMessage();
- }
- for (int m=0; m<NUM_MSGS; m++)
- deq_msg(&jc, m);
+ read_msg(&jc);
+ CPPUNIT_ASSERT_MESSAGE("Transient message recovered.", transientFlag == false);
+ CPPUNIT_ASSERT_MESSAGE("External flag incorrect.", externalFlag == true);
+ CPPUNIT_ASSERT_MESSAGE("External message returned non-null pointer.",
+ mbuff == NULL);
+ cleanMessage();
}
- }
- catch (const rhm::journal::jexception& e)
- {
- std::stringstream ss;
- ss << e;
- CPPUNIT_FAIL(ss.str());
- }
- // Heap
- test_name = "RecoveredDequeueTest_Heap";
- rhm::journal::jcntl* jcp = NULL;
- try
- {
+ jc.recover_complete();
+ // Read recovered non-transient msgs
+ for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
{
- jcp = new rhm::journal::jcntl(test_name, "jdata", test_name);
- CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp != NULL);
- jcp->initialize();
- for (int m=0; m<NUM_MSGS; m++)
- enq_msg(jcp, create_msg(msg, m, MSG_SIZE), false);
- delete jcp;
- jcp = NULL;
+ read_msg(&jc);
+ CPPUNIT_ASSERT_MESSAGE("Transient message recovered.", transientFlag == false);
+ CPPUNIT_ASSERT_MESSAGE("External flag incorrect.", externalFlag == false);
+ CPPUNIT_ASSERT_MESSAGE("Non-transient message corrupt during recover.",
+ create_msg(msg, m, MSG_SIZE).compare(string((char*)mbuff, msize)) == 0);
+ cleanMessage();
}
+ // Read recovered non-transient extern msgs
+ for (int m=NUM_MSGS*3; m<NUM_MSGS*4; m++)
{
- rhm::journal::jcntl* jcp = new rhm::journal::jcntl(test_name, "jdata", test_name);
- CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp != NULL);
- jcp->recover(txn_list);
- for (int m=0; m<NUM_MSGS; m++)
- {
- read_msg(jcp);
- CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
- create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
- cleanMessage();
- }
- jcp->recover_complete();
- for (int m=0; m<NUM_MSGS; m++)
- {
- read_msg(jcp);
- CPPUNIT_ASSERT_MESSAGE("Message corrupt after recovery.",
- create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
- cleanMessage();
- }
- for (int m=0; m<NUM_MSGS; m++)
- deq_msg(jcp, m);
- delete jcp;
+ read_msg(&jc);
+ CPPUNIT_ASSERT_MESSAGE("Transient message recovered.", transientFlag == false);
+ CPPUNIT_ASSERT_MESSAGE("External flag incorrect.", externalFlag == true);
+ CPPUNIT_ASSERT_MESSAGE("External message returned non-null pointer.",
+ mbuff == NULL);
+ cleanMessage();
}
+ // Dequeue recovered messages
+ for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
+ deq_msg(&jc, m);
+ for (int m=NUM_MSGS*3; m<NUM_MSGS*4; m++)
+ deq_msg(&jc, m);
}
- catch (const rhm::journal::jexception& e)
- {
- if (jcp)
- delete jcp;
- std::stringstream ss;
- ss << e;
- CPPUNIT_FAIL(ss.str());
- }
}
+ catch (const rhm::journal::jexception& e)
+ {
+ stringstream ss;
+ ss << e;
+ CPPUNIT_FAIL(ss.str());
+ }
+}
- void TxnRecoveredDequeueTest()
+void
+JournalSystemTests::ComplexRecoveryTest1()
+{
+ vector<string> txn_list;
+ try
{
- std::vector<std::string> txn_list;
- //Stack
- char* test_name = "TxnRecoveredDequeueTest_Stack";
- try
+ // Non-txn
+ char* test_name = "ComplexRecoveryTest1";
{
+ rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ jc.initialize();
+
+ // Enqueue 2n, then dequeue first n msgs; check that only last n readable
+ // rids: 0 to NUM_MSGS*2 - 1
+ for (int m=0; m<NUM_MSGS*2; m++)
+ enq_msg(&jc, create_msg(msg, m, MSG_SIZE), false);
+ // rids: NUM_MSGS*2 to NUM_MSGS*3 - 1
+ for (int m=0; m<NUM_MSGS; m++)
+ deq_msg(&jc, m);
+ jc.flush();
+ for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
{
- rhm::journal::jcntl jc(test_name, "jdata", test_name);
- jc.initialize();
- create_xid(xid, 3, XID_SIZE);
- txn_list.push_back(xid);
- for (int m=0; m<NUM_MSGS; m++)
- enq_txn_msg(&jc, create_msg(msg, m, MSG_SIZE), xid, false);
- txn_commit(&jc, xid);
+ read_msg(&jc);
+ CPPUNIT_ASSERT_MESSAGE("Message corrupt before recover.",
+ create_msg(msg, m, MSG_SIZE).compare(string((char*)mbuff, msize)) == 0);
+ cleanMessage();
}
- {
- rhm::journal::jcntl jc(test_name, "jdata", test_name);
- jc.recover(txn_list);
- for (int m=0; m<NUM_MSGS; m++)
- {
- read_msg(&jc);
- CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
- create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
- cleanMessage();
- }
- jc.recover_complete();
- for (int m=0; m<NUM_MSGS; m++)
- {
- read_msg(&jc);
- CPPUNIT_ASSERT_MESSAGE("Message corrupt after recovery.",
- create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
- cleanMessage();
- }
- for (int m=0; m<NUM_MSGS; m++)
- deq_msg(&jc, m);
- }
}
- catch (const rhm::journal::jexception& e)
{
- std::stringstream ss;
- ss << e;
- CPPUNIT_FAIL(ss.str());
- }
- // Heap
- test_name = "TxnRecoveredDequeueTest_Heap";
- rhm::journal::jcntl* jcp = NULL;
- try
- {
+ rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ jc.recover(txn_list);
+
+ // Check that only last n readable (as before)
+ for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
{
- jcp = new rhm::journal::jcntl(test_name, "jdata", test_name);
- CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp != NULL);
- jcp->initialize();
- for (int m=0; m<NUM_MSGS; m++)
- enq_txn_msg(jcp, create_msg(msg, m, MSG_SIZE), xid, false);
- txn_commit(jcp, xid);
- delete jcp;
- jcp = NULL;
+ read_msg(&jc);
+ CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
+ create_msg(msg, m, MSG_SIZE).compare(string((char*)mbuff, msize)) == 0);
+ cleanMessage();
}
+ jc.recover_complete();
+
+ // Enqueue another n msgs
+ // rids: NUM_MSGS*3 to NUM_MSGS*4 - 1
+ for (int m=NUM_MSGS*2; m<NUM_MSGS*3; m++)
+ enq_msg(&jc, create_msg(msg, m, MSG_SIZE), false);
+ jc.flush();
+
+ // Check that 2n messages are now readable
+ for (int m=NUM_MSGS; m<NUM_MSGS*3; m++)
{
- rhm::journal::jcntl* jcp = new rhm::journal::jcntl(test_name, "jdata", test_name);
- CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp != NULL);
- jcp->recover(txn_list);
- for (int m=0; m<NUM_MSGS; m++)
- {
- read_msg(jcp);
- CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
- create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
- cleanMessage();
- }
- jcp->recover_complete();
- for (int m=0; m<NUM_MSGS; m++)
- {
- read_msg(jcp);
- CPPUNIT_ASSERT_MESSAGE("Message corrupt after recovery.",
- create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
- cleanMessage();
- }
- for (int m=0; m<NUM_MSGS; m++)
- deq_msg(jcp, m);
- delete jcp;
+ read_msg(&jc);
+ CPPUNIT_ASSERT_MESSAGE("Message corrupt after recover.",
+ create_msg(msg, m, MSG_SIZE).compare(string((char*)mbuff, msize)) == 0);
+ cleanMessage();
}
+
+ // Dequeue all remaining messages
+ // rids: NUM_MSGS*4 to NUM_MSGS*6 - 1
+ for (int m=NUM_MSGS; m<NUM_MSGS*3; m++)
+ deq_msg(&jc, m);
}
- catch (const rhm::journal::jexception& e)
- {
- if (jcp)
- delete jcp;
- std::stringstream ss;
- ss << e;
- CPPUNIT_FAIL(ss.str());
- }
- }
- void ComplexRecoveryTest1()
- {
- std::vector<std::string> txn_list;
- //Stack
- char* test_name = "ComplexRecoveryTest1_Stack";
- try
+ // Txn
+ test_name = "TxnComplexRecoveryTest1";
{
+ rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ jc.initialize();
+
+ // Enqueue 2n, then dequeue first n msgs; check that only last n readable
+ // rids: 0 to NUM_MSGS - 1
+ for (int m=0; m<NUM_MSGS; m++)
+ enq_msg(&jc, create_msg(msg, m, MSG_SIZE), false);
+ // rids: NUM_MSGS to NUM_MSGS*2 - 1
+ create_xid(xid, 5, XID_SIZE);
+ txn_list.push_back(xid);
+ for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
+ enq_txn_msg(&jc, create_msg(msg, m, MSG_SIZE), xid, false);
+ // rids: NUM_MSGS*2 to NUM_MSGS*3 - 1
+ for (int m=0; m<NUM_MSGS; m++)
+ deq_msg(&jc, m);
+ // rid: NUM_MSGS*3
+ txn_commit(&jc, xid);
+ jc.flush();
+ for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
{
- rhm::journal::jcntl jc(test_name, "jdata", test_name);
- jc.initialize();
- // rids: 0 to NUM_MSGS*2 - 1
- for (int m=0; m<NUM_MSGS*2; m++)
- enq_msg(&jc, create_msg(msg, m, MSG_SIZE), false);
- // rids: NUM_MSGS*2 to NUM_MSGS*3 - 1
- for (int m=0; m<NUM_MSGS; m++)
- deq_msg(&jc, m);
- jc.flush();
- for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
- {
- read_msg(&jc);
- CPPUNIT_ASSERT_MESSAGE("Message corrupt before recover.",
- create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
- cleanMessage();
- }
+ read_msg(&jc);
+ CPPUNIT_ASSERT_MESSAGE("Message corrupt before recover.",
+ create_msg(msg, m, MSG_SIZE).compare(string((char*)mbuff, msize)) == 0);
+ cleanMessage();
}
- {
- rhm::journal::jcntl jc(test_name, "jdata", test_name);
- jc.recover(txn_list);
- for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
- {
- read_msg(&jc);
- CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
- create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
- cleanMessage();
- }
- jc.recover_complete();
- // rids: NUM_MSGS*3 to NUM_MSGS*4 - 1
- for (int m=NUM_MSGS*3; m<NUM_MSGS*4; m++)
- enq_msg(&jc, create_msg(msg, m, MSG_SIZE), false);
- jc.flush();
- for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
- {
- read_msg(&jc);
- CPPUNIT_ASSERT_MESSAGE("Message corrupt after recover.",
- create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
- cleanMessage();
- }
- for (int m=NUM_MSGS*3; m<NUM_MSGS*4; m++)
- {
- read_msg(&jc);
- CPPUNIT_ASSERT_MESSAGE("Message corrupt after recover.",
- create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
- cleanMessage();
- }
- // rids: NUM_MSGS*4 to NUM_MSGS*6 - 1
- for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
- deq_msg(&jc, m);
- for (int m=NUM_MSGS*3; m<NUM_MSGS*4; m++)
- deq_msg(&jc, m);
- }
}
- catch (const rhm::journal::jexception& e)
{
- std::stringstream ss;
- ss << e;
- CPPUNIT_FAIL(ss.str());
- }
- // Heap
- test_name = "RecoveredDequeueTest_Heap";
- rhm::journal::jcntl* jcp = NULL;
- try
- {
+ rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ jc.recover(txn_list);
+
+ // Check that only last n readable (as before)
+ for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
{
- jcp = new rhm::journal::jcntl(test_name, "jdata", test_name);
- CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp != NULL);
- jcp->initialize();
- // rids: 0 to NUM_MSGS*2 - 1
- for (int m=0; m<NUM_MSGS*2; m++)
- enq_msg(jcp, create_msg(msg, m, MSG_SIZE), false);
- // rids: NUM_MSGS*2 to NUM_MSGS*3 - 1
- for (int m=0; m<NUM_MSGS; m++)
- deq_msg(jcp, m);
- jcp->flush();
- for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
- {
- read_msg(jcp);
- CPPUNIT_ASSERT_MESSAGE("Message corrupt before recover.",
- create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
- cleanMessage();
- }
- delete jcp;
- jcp = NULL;
+ read_msg(&jc);
+ CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
+ create_msg(msg, m, MSG_SIZE).compare(string((char*)mbuff, msize)) == 0);
+ cleanMessage();
}
- {
- rhm::journal::jcntl* jcp = new rhm::journal::jcntl(test_name, "jdata", test_name);
- CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp != NULL);
- jcp->recover(txn_list);
- for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
- {
- read_msg(jcp);
- CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
- create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
- cleanMessage();
- }
- jcp->recover_complete();
- // rids: NUM_MSGS*3 to NUM_MSGS*4 - 1
- for (int m=NUM_MSGS*3; m<NUM_MSGS*4; m++)
- enq_msg(jcp, create_msg(msg, m, MSG_SIZE), false);
- jcp->flush();
- for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
- {
- read_msg(jcp);
- CPPUNIT_ASSERT_MESSAGE("Message corrupt after recovery.",
- create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
- cleanMessage();
- }
- for (int m=NUM_MSGS*3; m<NUM_MSGS*4; m++)
- {
- read_msg(jcp);
- CPPUNIT_ASSERT_MESSAGE("Message corrupt after recovery.",
- create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
- cleanMessage();
- }
- // rids: NUM_MSGS*4 to NUM_MSGS*6 - 1
- for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
- deq_msg(jcp, m);
- for (int m=NUM_MSGS*3; m<NUM_MSGS*4; m++)
- deq_msg(jcp, m);
- delete jcp;
- }
- }
- catch (const rhm::journal::jexception& e)
- {
- if (jcp)
- delete jcp;
- std::stringstream ss;
- ss << e;
- CPPUNIT_FAIL(ss.str());
- }
- }
+ jc.recover_complete();
- void TxnComplexRecoveryTest1()
- {
- std::vector<std::string> txn_list;
- //Stack
- char* test_name = "TxnComplexRecoveryTest1_Stack";
- try
- {
+ // Enqueue another n msgs
+ // rids: NUM_MSGS*3+1 to NUM_MSGS*4
+ for (int m=NUM_MSGS*3+1; m<NUM_MSGS*4+1; m++)
+ enq_msg(&jc, create_msg(msg, m, MSG_SIZE), false);
+ jc.flush();
+
+ // Check that 2n messages are now readable
+ for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
{
- rhm::journal::jcntl jc(test_name, "jdata", test_name);
- jc.initialize();
- // rids: 0 to NUM_MSGS - 1
- for (int m=0; m<NUM_MSGS; m++)
- enq_msg(&jc, create_msg(msg, m, MSG_SIZE), false);
- // rids: NUM_MSGS to NUM_MSGS*2 - 1
- create_xid(xid, 4, XID_SIZE);
- txn_list.push_back(xid);
- for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
- enq_txn_msg(&jc, create_msg(msg, m, MSG_SIZE), xid, false);
- // rids: NUM_MSGS*2 to NUM_MSGS*3 - 1
- for (int m=0; m<NUM_MSGS; m++)
- deq_msg(&jc, m);
- // rid: NUM_MSGS*3
- txn_commit(&jc, xid);
- jc.flush();
- for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
- {
- read_msg(&jc);
- CPPUNIT_ASSERT_MESSAGE("Message corrupt before recover.",
- create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
- cleanMessage();
- }
+ read_msg(&jc);
+ CPPUNIT_ASSERT_MESSAGE("Message corrupt after recover.",
+ create_msg(msg, m, MSG_SIZE).compare(string((char*)mbuff, msize)) == 0);
+ cleanMessage();
}
+ for (int m=NUM_MSGS*3+1; m<NUM_MSGS*4+1; m++)
{
- rhm::journal::jcntl jc(test_name, "jdata", test_name);
- jc.recover(txn_list);
- for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
- {
- read_msg(&jc);
- CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
- create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
- cleanMessage();
- }
- jc.recover_complete();
- // rids: NUM_MSGS*3+1 to NUM_MSGS*4
- for (int m=NUM_MSGS*3+1; m<NUM_MSGS*4+1; m++)
- enq_msg(&jc, create_msg(msg, m, MSG_SIZE), false);
- jc.flush();
- for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
- {
- read_msg(&jc);
- CPPUNIT_ASSERT_MESSAGE("Message corrupt after recover.",
- create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
- cleanMessage();
- }
- for (int m=NUM_MSGS*3+1; m<NUM_MSGS*4+1; m++)
- {
- read_msg(&jc);
- CPPUNIT_ASSERT_MESSAGE("Message corrupt after recover.",
- create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
- cleanMessage();
- }
- // rids: NUM_MSGS*4+1 to NUM_MSGS*6
- for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
- deq_msg(&jc, m);
- for (int m=NUM_MSGS*3+1; m<NUM_MSGS*4+1; m++)
- deq_msg(&jc, m);
+ read_msg(&jc);
+ CPPUNIT_ASSERT_MESSAGE("Message corrupt after recover.",
+ create_msg(msg, m, MSG_SIZE).compare(string((char*)mbuff, msize)) == 0);
+ cleanMessage();
}
+
+ // Dequeue all remaining messages
+ // rids: NUM_MSGS*4+1 to NUM_MSGS*6
+ for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
+ deq_msg(&jc, m);
+ for (int m=NUM_MSGS*3+1; m<NUM_MSGS*4+1; m++)
+ deq_msg(&jc, m);
}
- catch (const rhm::journal::jexception& e)
- {
- std::stringstream ss;
- ss << e;
- CPPUNIT_FAIL(ss.str());
- }
- // Heap
- test_name = "TxnComplexRecoveryTest1_Heap";
- rhm::journal::jcntl* jcp = NULL;
- try
- {
- {
- jcp = new rhm::journal::jcntl(test_name, "jdata", test_name);
- CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp != NULL);
- jcp->initialize();
- // rids: 0 to NUM_MSGS*2 - 1
- for (int m=0; m<NUM_MSGS; m++)
- enq_msg(jcp, create_msg(msg, m, MSG_SIZE), false);
- create_xid(xid, 4, XID_SIZE);
- // rids: NUM_MSGS to NUM_MSGS*2 - 1
- for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
- enq_txn_msg(jcp, create_msg(msg, m, MSG_SIZE), xid, false);
- // rids: NUM_MSGS*2 to NUM_MSGS*3 - 1
- for (int m=0; m<NUM_MSGS; m++)
- deq_msg(jcp, m);
- // rid: NUM_MSGS*3
- txn_commit(jcp, xid);
- jcp->flush();
- for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
- {
- read_msg(jcp);
- CPPUNIT_ASSERT_MESSAGE("Message corrupt before recover.",
- create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
- cleanMessage();
- }
- delete jcp;
- jcp = NULL;
- }
- {
- rhm::journal::jcntl* jcp = new rhm::journal::jcntl(test_name, "jdata", test_name);
- CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp != NULL);
- jcp->recover(txn_list);
- for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
- {
- read_msg(jcp);
- CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
- create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
- cleanMessage();
- }
- jcp->recover_complete();
- // rids: NUM_MSGS*3+1 to NUM_MSGS*4
- for (int m=NUM_MSGS*3+1; m<NUM_MSGS*4+1; m++)
- enq_msg(jcp, create_msg(msg, m, MSG_SIZE), false);
- jcp->flush();
- for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
- {
- read_msg(jcp);
- CPPUNIT_ASSERT_MESSAGE("Message corrupt after recovery.",
- create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
- cleanMessage();
- }
- for (int m=NUM_MSGS*3+1; m<NUM_MSGS*4+1; m++)
- {
- read_msg(jcp);
- CPPUNIT_ASSERT_MESSAGE("Message corrupt after recovery.",
- create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
- cleanMessage();
- }
- // rids: NUM_MSGS*4+1 to NUM_MSGS*6
- for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
- deq_msg(jcp, m);
- for (int m=NUM_MSGS*3+1; m<NUM_MSGS*4+1; m++)
- deq_msg(jcp, m);
- delete jcp;
- }
- }
- catch (const rhm::journal::jexception& e)
- {
- if (jcp)
- delete jcp;
- std::stringstream ss;
- ss << e;
- CPPUNIT_FAIL(ss.str());
- }
}
-
- void EncodeTest_000()
+ catch (const rhm::journal::jexception& e)
{
- runEncodeTest(0, 0, 0, false, 0, 0, false, false, 2, "Empty journal");
+ stringstream ss;
+ ss << e;
+ CPPUNIT_FAIL(ss.str());
}
+}
- void EncodeTest_001()
- {
- runEncodeTest(1, 10, 10, false, 0, 0, false, false, 2, "1*(10 bytes)");
- }
+// === Private helper functions ===
- void EncodeTest_002()
- {
- runEncodeTest(1, 10, 10, true, 0, 0, false, false, 2, "1*(10 bytes), auto-deq");
- }
+void
+JournalSystemTests::enq_msg(rhm::journal::jcntl* jc, const string msg, const bool transient)
+{
+ rhm::journal::data_tok* dtp = new rhm::journal::data_tok;
+ CPPUNIT_ASSERT_MESSAGE("Data Token heap intantiation failed.", dtp != NULL);
- void EncodeTest_003()
- {
- runEncodeTest(10, 10, 10, false, 0, 0, false, false, 2, "10*(10 bytes)");
- }
+ unsigned aio_sleep_cnt = 0;
+ while (handle_jcntl_response(jc->enqueue_data_record(msg.c_str(), msg.size(), msg.size(),
+ dtp, transient), jc, aio_sleep_cnt, dtp));
+}
- void EncodeTest_004()
- {
- runEncodeTest(10, 10, 10, true, 0, 0, false, false, 2, "10*(10 bytes), auto-deq");
- }
+void
+JournalSystemTests::enq_extern_msg(rhm::journal::jcntl* jc, const bool transient)
+{
+ rhm::journal::data_tok* dtp = new rhm::journal::data_tok;
+ CPPUNIT_ASSERT_MESSAGE("Data Token heap intantiation failed.", dtp != NULL);
- void EncodeTest_005()
- {
- runEncodeTest(10, 92, 92, false, 0, 0, false, false, 2, "10*(1 dblk exact fit)");
- }
+ unsigned aio_sleep_cnt = 0;
+ while (handle_jcntl_response(jc->enqueue_extern_data_record(msg.size(),
+ dtp, transient), jc, aio_sleep_cnt, dtp));
+}
- void EncodeTest_006()
- {
- runEncodeTest(10, 92, 92, true, 0, 0, false, false, 2, "10*(1 dblk exact fit), auto-deq");
- }
+void
+JournalSystemTests::enq_txn_msg(rhm::journal::jcntl* jc, const string msg,
+ const string xid, const bool transient)
+{
+ rhm::journal::data_tok* dtp = new rhm::journal::data_tok;
+ CPPUNIT_ASSERT_MESSAGE("Data Token heap intantiation failed.", dtp != NULL);
- void EncodeTest_007()
- {
- runEncodeTest(10, 93, 93, false, 0, 0, false, false, 2, "10*(1 dblk + 1 byte)");
- }
+ unsigned aio_sleep_cnt = 0;
+ while (handle_jcntl_response(jc->enqueue_txn_data_record(msg.c_str(), msg.size(),
+ msg.size(), dtp, xid, transient), jc, aio_sleep_cnt, dtp));
+}
- void EncodeTest_008()
- {
- runEncodeTest(10, 93, 93, true, 0, 0, false, false, 2, "10*(1 dblk + 1 byte), auto-deq");
- }
+void
+JournalSystemTests::enq_extern_txn_msg(rhm::journal::jcntl* jc, const string xid,
+ const bool transient)
+{
+ rhm::journal::data_tok* dtp = new rhm::journal::data_tok;
+ CPPUNIT_ASSERT_MESSAGE("Data Token heap intantiation failed.", dtp != NULL);
- void EncodeTest_009()
- {
- runEncodeTest(10, 476, 476, false, 0, 0, false, false, 2, "10*(1 sblk exact fit)");
- }
+ unsigned aio_sleep_cnt = 0;
+ while (handle_jcntl_response(jc->enqueue_extern_txn_data_record(msg.size(), dtp, xid,
+ transient), jc, aio_sleep_cnt, dtp));
+}
- void EncodeTest_010()
- {
- runEncodeTest(10, 476, 476, true, 0, 0, false, false, 2, "10*(1 sblk exact fit), auto-deq");
- }
+void
+JournalSystemTests::deq_msg(rhm::journal::jcntl* jc, u_int64_t rid)
+{
+ rhm::journal::data_tok* dtp = new rhm::journal::data_tok;
+ CPPUNIT_ASSERT_MESSAGE("Data Token heap intantiation failed.", dtp != NULL);
+ dtp->set_wstate(rhm::journal::data_tok::ENQ);
+ dtp->set_rid(rid);
- void EncodeTest_011()
- {
- runEncodeTest(10, 477, 477, false, 0, 0, false, false, 2, "10*(1 sblk + 1 byte)");
- }
+ unsigned aio_sleep_cnt = 0;
+ while (handle_jcntl_response(jc->dequeue_data_record(dtp), jc, aio_sleep_cnt, dtp));
+}
- void EncodeTest_012()
- {
- runEncodeTest(10, 477, 477, true, 0, 0, false, false, 2, "10*(1 sblk + 1 byte), auto-deq");
- }
+void
+JournalSystemTests::deq_txn_msg(rhm::journal::jcntl* jc, u_int64_t rid, const string xid)
+{
+ rhm::journal::data_tok* dtp = new rhm::journal::data_tok;
+ CPPUNIT_ASSERT_MESSAGE("Data Token heap intantiation failed.", dtp != NULL);
+ dtp->set_wstate(rhm::journal::data_tok::ENQ);
+ dtp->set_rid(rid);
- void EncodeTest_013()
- {
- runEncodeTest(8, 4060, 4060, false, 0, 0, false, false, 2, "8*(1/8 page)");
- }
+ unsigned aio_sleep_cnt = 0;
+ while (handle_jcntl_response(jc->dequeue_txn_data_record(dtp, xid),
+ jc, aio_sleep_cnt, dtp));
+}
- void EncodeTest_014()
- {
- runEncodeTest(9, 4060, 4060, false, 0, 0, false, false, 2, "9*(1/8 page)");
- }
+void
+JournalSystemTests::txn_abort(rhm::journal::jcntl* jc, const string xid)
+{
+ rhm::journal::data_tok* dtp = new rhm::journal::data_tok;
+ CPPUNIT_ASSERT_MESSAGE("Data Token heap intantiation failed.", dtp != NULL);
- void EncodeTest_015()
- {
- runEncodeTest(8, 4061, 4061, false, 0, 0, false, false, 2, "8*(1/8 page + 1 byte)");
- }
+ unsigned aio_sleep_cnt = 0;
+ while (handle_jcntl_response(jc->txn_abort(dtp, xid), jc, aio_sleep_cnt, dtp));
+}
- void EncodeTest_016()
- {
- runEncodeTest(8, 3932, 3932, true, 0, 0, false, false, 2,
- "8*(1/8 page - 1 dblk for deq record), auto-deq");
- }
+void
+JournalSystemTests::txn_commit(rhm::journal::jcntl* jc, const string xid)
+{
+ rhm::journal::data_tok* dtp = new rhm::journal::data_tok;
+ CPPUNIT_ASSERT_MESSAGE("Data Token heap intantiation failed.", dtp != NULL);
- void EncodeTest_017()
- {
- runEncodeTest(9, 3932, 3932, true, 0, 0, false, false, 2,
- "9*(1/8 page - 1 dblk for deq record), auto-deq");
- }
+ unsigned aio_sleep_cnt = 0;
+ while (handle_jcntl_response(jc->txn_commit(dtp, xid), jc, aio_sleep_cnt, dtp));
+}
- void EncodeTest_018()
- {
- runEncodeTest(8, 3933, 3933, true, 0, 0, false, false, 2,
- "8*(1/8 page - 1 dblk for deq record + 1 byte), auto-deq");
- }
+char*
+JournalSystemTests::read_msg(rhm::journal::jcntl* jc)
+{
+ rhm::journal::data_tok* dtp = new rhm::journal::data_tok;
+ CPPUNIT_ASSERT_MESSAGE("Data Token heap intantiation failed.", dtp != NULL);
+ dtp->set_wstate(rhm::journal::data_tok::ENQ);
- void EncodeTest_019()
- {
- runEncodeTest(32, 32732, 32732, false, 0, 0, false, false, 2, "32*(1 page exact fit)");
- }
+ unsigned aio_sleep_cnt = 0;
+ while (handle_jcntl_response(jc->read_data_record(&mbuff, msize, &xidbuff, xidsize,
+ transientFlag, externalFlag, dtp), jc, aio_sleep_cnt, dtp));
+ return (char*)mbuff;
+}
- void EncodeTest_020()
+bool
+JournalSystemTests::handle_jcntl_response(rhm::journal::iores res, rhm::journal::jcntl* jc,
+ unsigned& aio_sleep_cnt, rhm::journal::data_tok* dtp)
+{
+ switch (res)
{
- runEncodeTest(33, 32732, 32732, false, 0, 0, false, false, 2, "33*(1 page exact fit)");
+ case rhm::journal::RHM_IORES_SUCCESS:
+ //((char*)mbuff)[msize] = '\0';
+ return false;
+ case rhm::journal::RHM_IORES_AIO_WAIT:
+ if (++aio_sleep_cnt <= MAX_AIO_SLEEPS)
+ {
+ jc->get_wr_events();
+ usleep(AIO_SLEEP_TIME);
+ }
+ else
+ {
+ delete dtp;
+ CPPUNIT_FAIL("Timeout on RHM_IORES_AIO_WAIT.");
+ }
+ break;
+ case rhm::journal::RHM_IORES_EMPTY:
+ delete dtp;
+ CPPUNIT_FAIL("RHM_IORES_EMPTY");
+ case rhm::journal::RHM_IORES_FULL:
+ delete dtp;
+ CPPUNIT_FAIL("RHM_IORES_FULL");
+ case rhm::journal::RHM_IORES_BUSY:
+ delete dtp;
+ CPPUNIT_FAIL("RHM_IORES_BUSY");
+ case rhm::journal::RHM_IORES_TXPENDING:
+ delete dtp;
+ CPPUNIT_FAIL("RHM_IORES_TXPENDING");
+ default:
+ delete dtp;
+ CPPUNIT_FAIL("unknown return value");
}
+ return true;
+}
- void EncodeTest_021()
- {
- runEncodeTest(22, 49116, 49116, false, 0, 0, false, false, 2, "22*(1.5 pages)");
- }
+// static fn
+string&
+JournalSystemTests::create_msg(string& s, int msg_num, int len)
+{
+ stringstream ss;
+ ss << "Message_" << setfill('0') << setw(4) << msg_num << "_";
+ for (int i=14; i<=len; i++)
+ ss << (char)('0' + i%10);
+ s.assign(ss.str());
+ return s;
+}
- void EncodeTest_022()
- {
- runEncodeTest(22, 48988, 48988, true, 0, 0, false, false, 2,
- "22*(1.5 pages - 1 dblk for deq record), auto-deq");
- }
+// static fn
+string&
+JournalSystemTests::create_xid(string& s, int msg_num, int len)
+{
+ stringstream ss;
+ ss << "XID_" << setfill('0') << setw(4) << msg_num << "_";
+ for (int i=9; i<len; i++)
+ ss << (char)('a' + i%26);
+ s.assign(ss.str());
+ return s;
+}
- void EncodeTest_023()
+void JournalSystemTests::cleanMessage()
+{
+ if (xidbuff)
{
- runEncodeTest(48, 32732, 32732, false, 0, 0, false, false, 2, "48*(1 page exact fit)");
+ free(xidbuff);
+ xidbuff = NULL;
+ mbuff = NULL;
}
-
- void EncodeTest_024()
+ else if (mbuff)
{
- runEncodeTest(49, 32732, 32732, false, 0, 0, false, false, 2, "49*(1 page exact fit)");
+ free (mbuff);
+ mbuff = NULL;
}
-
- void EncodeTest_025()
- {
- runEncodeTest(20, 81884, 81884, false, 0, 0, false, false, 2, "20*(2.5 pages)");
- }
-
- void EncodeTest_026()
- {
- runEncodeTest(20, 81756, 81756, true, 0, 0, false, false, 2,
- "20*(2.5 pages - 1 dblk for deq record), auto-deq");
- }
-
- void EncodeTest_027()
- {
- runEncodeTest(16, 786268, 786268, true, 0, 0, false, false, 2,
- "16*(24 pages = 1/2 file); Total = 8 files exactly (full journal filespace)");
- }
-
- void EncodeTest_028()
- {
- runEncodeTest(17, 786268, 786268, true, 0, 0, false, false, 2,
- "17*(24 pages = 1/2 file); Total = 8 files + file 0 overwritten by 1/2 file");
- }
-
-private:
-
- void enq_msg(rhm::journal::jcntl* jc, const std::string msg, const bool transient)
- {
- rhm::journal::data_tok* dtp = new rhm::journal::data_tok;
- CPPUNIT_ASSERT_MESSAGE("Data Token heap intantiation failed.", dtp != NULL);
-
- unsigned aio_sleep_cnt = 0;
- while (handle_jcntl_response(jc->enqueue_data_record(msg.c_str(), msg.size(), msg.size(),
- dtp, transient), jc, aio_sleep_cnt, dtp));
- }
-
- void enq_extern_msg(rhm::journal::jcntl* jc, const bool transient)
- {
- rhm::journal::data_tok* dtp = new rhm::journal::data_tok;
- CPPUNIT_ASSERT_MESSAGE("Data Token heap intantiation failed.", dtp != NULL);
-
- unsigned aio_sleep_cnt = 0;
- while (handle_jcntl_response(jc->enqueue_extern_data_record(msg.size(),
- dtp, transient), jc, aio_sleep_cnt, dtp));
- }
-
- void enq_txn_msg(rhm::journal::jcntl* jc, const std::string msg, const std::string xid,
- const bool transient)
- {
- rhm::journal::data_tok* dtp = new rhm::journal::data_tok;
- CPPUNIT_ASSERT_MESSAGE("Data Token heap intantiation failed.", dtp != NULL);
-
- unsigned aio_sleep_cnt = 0;
- while (handle_jcntl_response(jc->enqueue_txn_data_record(msg.c_str(), msg.size(),
- msg.size(), dtp, xid, transient), jc, aio_sleep_cnt, dtp));
- }
-
- void enq_extern_txn_msg(rhm::journal::jcntl* jc, const std::string xid, const bool transient)
- {
- rhm::journal::data_tok* dtp = new rhm::journal::data_tok;
- CPPUNIT_ASSERT_MESSAGE("Data Token heap intantiation failed.", dtp != NULL);
-
- unsigned aio_sleep_cnt = 0;
- while (handle_jcntl_response(jc->enqueue_extern_txn_data_record(msg.size(), dtp, xid,
- transient), jc, aio_sleep_cnt, dtp));
- }
-
- void deq_msg(rhm::journal::jcntl* jc, u_int64_t rid)
- {
- rhm::journal::data_tok* dtp = new rhm::journal::data_tok;
- CPPUNIT_ASSERT_MESSAGE("Data Token heap intantiation failed.", dtp != NULL);
- dtp->set_wstate(rhm::journal::data_tok::ENQ);
- dtp->set_rid(rid);
-
- unsigned aio_sleep_cnt = 0;
- while (handle_jcntl_response(jc->dequeue_data_record(dtp), jc, aio_sleep_cnt, dtp));
- }
-
- void deq_txn_msg(rhm::journal::jcntl* jc, u_int64_t rid, const std::string xid)
- {
- rhm::journal::data_tok* dtp = new rhm::journal::data_tok;
- CPPUNIT_ASSERT_MESSAGE("Data Token heap intantiation failed.", dtp != NULL);
- dtp->set_wstate(rhm::journal::data_tok::ENQ);
- dtp->set_rid(rid);
-
- unsigned aio_sleep_cnt = 0;
- while (handle_jcntl_response(jc->dequeue_txn_data_record(dtp, xid),
- jc, aio_sleep_cnt, dtp));
- }
-
- void txn_abort(rhm::journal::jcntl* jc, const std::string xid)
- {
- rhm::journal::data_tok* dtp = new rhm::journal::data_tok;
- CPPUNIT_ASSERT_MESSAGE("Data Token heap intantiation failed.", dtp != NULL);
-
- unsigned aio_sleep_cnt = 0;
- while (handle_jcntl_response(jc->txn_abort(dtp, xid), jc, aio_sleep_cnt, dtp));
- }
-
- void txn_commit(rhm::journal::jcntl* jc, const std::string xid)
- {
- rhm::journal::data_tok* dtp = new rhm::journal::data_tok;
- CPPUNIT_ASSERT_MESSAGE("Data Token heap intantiation failed.", dtp != NULL);
-
- unsigned aio_sleep_cnt = 0;
- while (handle_jcntl_response(jc->txn_commit(dtp, xid), jc, aio_sleep_cnt, dtp));
- }
-
- char* read_msg(rhm::journal::jcntl* jc)
- {
- rhm::journal::data_tok* dtp = new rhm::journal::data_tok;
- CPPUNIT_ASSERT_MESSAGE("Data Token heap intantiation failed.", dtp != NULL);
- dtp->set_wstate(rhm::journal::data_tok::ENQ);
-
- unsigned aio_sleep_cnt = 0;
- while (handle_jcntl_response(jc->read_data_record(&mbuff, msize, &xidbuff, xidsize,
- transientFlag, externalFlag, dtp), jc, aio_sleep_cnt, dtp));
- return (char*)mbuff;
- }
-
- bool handle_jcntl_response(rhm::journal::iores res, rhm::journal::jcntl* jc,
- unsigned& aio_sleep_cnt, rhm::journal::data_tok* dtp)
- {
- switch (res)
- {
- case rhm::journal::RHM_IORES_SUCCESS:
- //((char*)mbuff)[msize] = '\0';
- return false;
- case rhm::journal::RHM_IORES_AIO_WAIT:
- if (++aio_sleep_cnt <= MAX_AIO_SLEEPS)
- {
- jc->get_wr_events();
- usleep(AIO_SLEEP_TIME);
- }
- else
- {
- delete dtp;
- CPPUNIT_FAIL("Timeout on RHM_IORES_AIO_WAIT.");
- }
- break;
- case rhm::journal::RHM_IORES_EMPTY:
- delete dtp;
- CPPUNIT_FAIL("RHM_IORES_EMPTY");
- case rhm::journal::RHM_IORES_FULL:
- delete dtp;
- CPPUNIT_FAIL("RHM_IORES_FULL");
- case rhm::journal::RHM_IORES_BUSY:
- delete dtp;
- CPPUNIT_FAIL("RHM_IORES_BUSY");
- case rhm::journal::RHM_IORES_TXPENDING:
- delete dtp;
- CPPUNIT_FAIL("RHM_IORES_TXPENDING");
- default:
- delete dtp;
- CPPUNIT_FAIL("unknown return value");
- }
- return true;
- }
-
- static std::string& create_msg(std::string& s, int msg_num, int len)
- {
- std::stringstream ss;
- ss << "Message_" << std::setfill('0') << std::setw(4) << msg_num << "_";
- for (int i=14; i<=len; i++)
- ss << (char)('0' + i%10);
- s.assign(ss.str());
- return s;
- }
-
- static std::string& create_xid(std::string& s, int msg_num, int len)
- {
- std::stringstream ss;
- ss << "XID_" << std::setfill('0') << std::setw(4) << msg_num << "_";
- for (int i=9; i<len; i++)
- ss << (char)('a' + i%26);
- s.assign(ss.str());
- return s;
- }
-
- void runEncodeTest(const unsigned num_msgs, const unsigned min_msg_size,
- const unsigned max_msg_szie, const bool auto_deq, const unsigned min_xid_size,
- const unsigned max_xid_size, const bool transient, const bool external,
- const unsigned iterations, char* test_descr)
- {
- std::cout << " [" << test_descr << "] " << std::flush;
- jtest::targs ta(num_msgs, min_msg_size, max_msg_szie, auto_deq, min_xid_size,
- max_xid_size, transient, external, test_descr);
- for (unsigned i=0; i<iterations; i++)
- {
- std::cout << "." << std::flush;
- try
- {
- t.initialize(ta);
- t.run();
- }
- catch (rhm::journal::jexception e)
- {
- t.finalize();
- CPPUNIT_FAIL(e.to_string());
- }
- t.finalize();
- }
- }
-
- void cleanMessage()
- {
- if (xidbuff)
- {
- free(xidbuff);
- xidbuff = NULL;
- mbuff = NULL;
- }
- else if (mbuff)
- {
- free (mbuff);
- mbuff = NULL;
- }
- }
-};
-
-// Make this test suite a plugin.
-CPPUNIT_PLUGIN_IMPLEMENT();
-CPPUNIT_TEST_SUITE_REGISTRATION(JournalSystemTests);
+}
17 years, 2 months
rhmessaging commits: r1261 - in mgmt: cumin/python/wooly and 1 other directories.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2007-11-07 12:19:21 -0500 (Wed, 07 Nov 2007)
New Revision: 1261
Modified:
mgmt/cumin/python/cumin/broker.py
mgmt/cumin/python/cumin/broker.strings
mgmt/cumin/python/cumin/brokergroup.py
mgmt/cumin/python/wooly/forms.py
mgmt/cumin/python/wooly/forms.strings
mgmt/notes/justin-todo.txt
Log:
Adds an initial group drop-down to the broker registration form.
Adds a reusable option input widget to the form widgets.
Modified: mgmt/cumin/python/cumin/broker.py
===================================================================
--- mgmt/cumin/python/cumin/broker.py 2007-11-07 16:11:13 UTC (rev 1260)
+++ mgmt/cumin/python/cumin/broker.py 2007-11-07 17:19:21 UTC (rev 1261)
@@ -468,11 +468,20 @@
self.add_parameter(self.addrs)
self.add_form_parameter(self.addrs)
+ self.group_param = BrokerGroupParameter(app, "group_param")
+ self.add_parameter(self.group_param)
+ self.add_form_parameter(self.group_param)
+
+ self.groups = ListParameter(app, "group", self.group_param)
+ self.add_parameter(self.groups)
+ self.add_form_parameter(self.groups)
+
self.fields = IntegerParameter(app, "fields")
self.fields.set_default(3)
self.add_parameter(self.fields)
self.field_tmpl = Template(self, "field_html")
+ self.group_tmpl = Template(self, "group_html")
self.more = self.MoreEntries(app, "more", self)
self.add_child(self.more)
@@ -500,11 +509,36 @@
def render_field_address_name(self, session, object):
return self.addrs.path()
+ def render_field_group_name(self, session, object):
+ return self.groups.path()
+
def render_field_address_value(self, session, index):
addrs = self.addrs.get(session)
if len(addrs) > index:
return addrs[index]
-
+
+ def render_groups(self, session, index):
+ writer = Writer()
+
+ for group in self.app.model.get_broker_groups():
+ self.group_tmpl.render(session, (index, group), writer)
+
+ return writer.to_string()
+
+ def render_group_value(self, session, args):
+ index, group = args
+ return group.id
+
+ def render_group_name(self, session, args):
+ index, group = args
+ return group.name
+
+ def render_group_selected_attr(self, session, args):
+ index, group = args
+ groups = self.groups.get(session)
+ if len(groups) > index and group.id == groups[index].id:
+ return "selected=\"selected\""
+
class MoreEntries(FormButton):
def render_content(self, session, model):
return "More Entries"
@@ -518,9 +552,10 @@
def process_submit(self, session, model):
names = self.names.get(session)
addrs = self.addrs.get(session)
+ groups = self.groups.get(session)
- for name, addr in zip(names, addrs):
- print name, addr
+ for name, addr, group in zip(names, addrs, groups):
+ print name, addr, group
self.process_cancel(session, model)
Modified: mgmt/cumin/python/cumin/broker.strings
===================================================================
--- mgmt/cumin/python/cumin/broker.strings 2007-11-07 16:11:13 UTC (rev 1260)
+++ mgmt/cumin/python/cumin/broker.strings 2007-11-07 17:19:21 UTC (rev 1261)
@@ -212,7 +212,8 @@
<table class="BrokerForm">
<tr>
<th>Name</th>
- <th>Address</th>
+ <th>Host Name or IP Address</th>
+ <th>Initial Group</th>
</tr>
{fields}
@@ -239,6 +240,10 @@
[BrokerForm.field_html]
<tr>
- <td><input type="text" name="{field_name_name}" value="{field_name_value}" size="20" tabindex="100"/></td>
- <td><input type="text" name="{field_address_name}" value="{field_address_value}" size="40" tabindex="100"/></td>
+ <td><input type="text" name="{field_name_name}" value="{field_name_value}" size="15" tabindex="100"/></td>
+ <td><input type="text" name="{field_address_name}" value="{field_address_value}" size="35" tabindex="100"/></td>
+ <td><select name="{field_group_name}" tabindex="100">{groups}</select></td>
</tr>
+
+[BrokerForm.group_html]
+<option value="{group_value}" {group_selected_attr}>{group_name}</option>
Modified: mgmt/cumin/python/cumin/brokergroup.py
===================================================================
--- mgmt/cumin/python/cumin/brokergroup.py 2007-11-07 16:11:13 UTC (rev 1260)
+++ mgmt/cumin/python/cumin/brokergroup.py 2007-11-07 17:19:21 UTC (rev 1261)
@@ -143,3 +143,13 @@
def render_title(self, session, group):
return "Edit Group '%s'" % group.name
+
+class BrokerGroupInput(OptionInputSet):
+ def get_items(self, session, model):
+ return model.get_broker_groups()
+
+ def render_item_value(self, session, group):
+ return group.id
+
+ def render_item_selected_attr(self, session, group):
+ return None
Modified: mgmt/cumin/python/wooly/forms.py
===================================================================
--- mgmt/cumin/python/wooly/forms.py 2007-11-07 16:11:13 UTC (rev 1260)
+++ mgmt/cumin/python/wooly/forms.py 2007-11-07 17:19:21 UTC (rev 1261)
@@ -205,9 +205,6 @@
return super(FormButton, self).render_value(branch, object)
class CheckboxInputSet(FormInput, ItemSet):
- def __init__(self, app, name, form):
- super(CheckboxInputSet, self).__init__(app, name, form)
-
def render_item_value(self, session, object):
return None
@@ -220,3 +217,10 @@
def render_item_checked_attr(self, session, object):
return None
+
+class OptionInputSet(FormInput, ItemSet):
+ def render_item_value(self, session, object):
+ return None
+
+ def render_item_selected_attr(self, session, object):
+ return None
Modified: mgmt/cumin/python/wooly/forms.strings
===================================================================
--- mgmt/cumin/python/wooly/forms.strings 2007-11-07 16:11:13 UTC (rev 1260)
+++ mgmt/cumin/python/wooly/forms.strings 2007-11-07 17:19:21 UTC (rev 1261)
@@ -30,3 +30,10 @@
[RadioInputSet.item_html]
<input type="radio" name="{name}" value="{item_value}" tabindex="{tab_index}" {item_checked_attr} {disabled_attr}/>
{item_content}
+
+[OptionInputSet.html]
+{errors}
+<select name="{name}" tabindex="{tab_index}" {disabled_attr}>{items}</select>
+
+[OptionInputSet.item_html]
+<option value="{item_value}" tabindex="{tab_index}" {item_selected_attr} {disabled_attr}>{item_content}</option>
Modified: mgmt/notes/justin-todo.txt
===================================================================
--- mgmt/notes/justin-todo.txt 2007-11-07 16:11:13 UTC (rev 1260)
+++ mgmt/notes/justin-todo.txt 2007-11-07 17:19:21 UTC (rev 1261)
@@ -13,8 +13,6 @@
in the render_title calls. Perhaps introduce a get_title to solve
this.
- * Add initial group drop down to broker add form
-
* Add totals to client msgs produced
* Add rolled up stats for objs other than queues
@@ -25,8 +23,12 @@
* Ajaxify charts
+ * Add a none option to the inital group select in broker register
+
Deferred
+ * The granularity of radio and checkbox disabling seems to be off
+
* Rename Widget.name to .__name
* Make the status lights also be links to an appropriate view
17 years, 2 months
rhmessaging commits: r1260 - in mgmt: cumin/python/wooly and 1 other directories.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2007-11-07 11:11:13 -0500 (Wed, 07 Nov 2007)
New Revision: 1260
Modified:
mgmt/cumin/python/cumin/exchange.py
mgmt/cumin/python/cumin/queue.py
mgmt/cumin/python/wooly/__init__.py
mgmt/cumin/python/wooly/forms.py
mgmt/cumin/python/wooly/forms.strings
mgmt/notes/justin-todo.txt
Log:
Overhauls error handling. Puts an errors attribute on each widget.
Introduces an Error class for subclassing to create reusable error
types.
Also renames render_tabindex methods to tab_index.
Modified: mgmt/cumin/python/cumin/exchange.py
===================================================================
--- mgmt/cumin/python/cumin/exchange.py 2007-11-07 15:33:34 UTC (rev 1259)
+++ mgmt/cumin/python/cumin/exchange.py 2007-11-07 16:11:13 UTC (rev 1260)
@@ -216,23 +216,21 @@
self.add_child(self.fanout)
def validate(self, session):
- valid = True
+ error = None
name = self.exchange_name.get(session)
if name == "":
- valid = False
- self.exchange_name.add_error(session, """
- The exchange name is empty; it is required
- """)
+ error = EmptyInputError(exchange_name)
+ self.exchange_name.add_error(session, error)
elif " " in name:
- valid = False
- self.exchange_name.add_error(session, """
+ error = Error("""
The exchange name is invalid; allowed characters are
letters, digits, ".", and "_"
""")
+ self.exchange_name.add_error(session, error)
- return valid
+ return error is None
class ExchangeAdd(ExchangeForm):
def process_cancel(self, session, vhost):
Modified: mgmt/cumin/python/cumin/queue.py
===================================================================
--- mgmt/cumin/python/cumin/queue.py 2007-11-07 15:33:34 UTC (rev 1259)
+++ mgmt/cumin/python/cumin/queue.py 2007-11-07 16:11:13 UTC (rev 1260)
@@ -238,23 +238,21 @@
self.add_child(self.throughput)
def validate(self, session):
- valid = True
+ error = None
name = self.queue_name.get(session)
if name == "":
- valid = False
- self.queue_name.add_error(session, """
- The queue name is empty; it is required
- """)
+ error = EmptyInputError(self.queue_name)
+ self.queue_name.add_error(session, error)
elif " " in name:
- valid = False
- self.queue_name.add_error(session, """
+ error = Error("""
The queue name is invalid; allowed characters are
letters, digits, ".", and "_"
""")
+ self.queue_name.add_error(session, error)
- return valid
+ return error is None
class QueueAdd(QueueForm):
def process_cancel(self, session, vhost):
Modified: mgmt/cumin/python/wooly/__init__.py
===================================================================
--- mgmt/cumin/python/wooly/__init__.py 2007-11-07 15:33:34 UTC (rev 1259)
+++ mgmt/cumin/python/wooly/__init__.py 2007-11-07 16:11:13 UTC (rev 1260)
@@ -17,8 +17,13 @@
self.children = list()
self.attributes = list()
self.parameters = list()
+
self.template = Template(self, "html")
+ self.errors = Attribute(app, "errors")
+ self.errors.set_default(list())
+ self.add_attribute(self.errors)
+
self.cached_ancestors = None
self.cached_path = None
self.cached_page = None
@@ -97,6 +102,12 @@
if str:
return str
+ def get_errors(self, session):
+ return self.errors.get(session)
+
+ def add_error(self, session, error):
+ self.errors.get(session).append(error)
+
def get_saved_parameters(self, session):
params = list()
params.extend(self.parameters)
@@ -491,17 +502,6 @@
if key in self.values:
del self.values[key]
- # XXX this is a little out of line with other session methods in
- # that it uses widget as a key rather than having a method through
- # widget to do that
- def get_errors(self, widget):
- return self.errors.get(widget)
-
- def add_error(self, widget, error):
- errors = self.errors.setdefault(widget, list())
-
- errors.append(error)
-
def marshal(self):
page = self.marshal_page()
vars = self.marshal_url_vars()
@@ -622,6 +622,10 @@
class Writer(StringIOWriter):
pass
+class Error(object):
+ def __init__(self, message):
+ self.message = message
+
class Template(object):
def __init__(self, widget, key):
self.widget = widget
Modified: mgmt/cumin/python/wooly/forms.py
===================================================================
--- mgmt/cumin/python/wooly/forms.py 2007-11-07 15:33:34 UTC (rev 1259)
+++ mgmt/cumin/python/wooly/forms.py 2007-11-07 16:11:13 UTC (rev 1260)
@@ -44,7 +44,6 @@
writer.write("<input type='hidden' name='%s' value='%s'/>" \
% (name, value))
-
class FormInput(Widget):
def __init__(self, app, name, form):
super(FormInput, self).__init__(app, name)
@@ -77,12 +76,6 @@
def set_default(self, default):
self.param.set_default(default)
- def add_error(self, session, error):
- session.add_error(self, error)
-
- def get_errors(self, session):
- return session.get_errors(self)
-
def set_tab_index(self, tab_index):
self.tab_index = tab_index
@@ -95,21 +88,37 @@
def render_value(self, session, object):
return self.param.marshal(self.param.get(session))
- # XXX do this proper
def render_errors(self, session, object):
- errors = self.get_errors(session)
+ writer = Writer()
+
+ if self.get_errors(session):
+ self.errors_tmp.render(session, object, writer)
- if errors:
- return "<ul class=\"errors\"><li>" + \
- "</li><li>".join(errors) + \
- "</li></ul>"
+ return writer.to_string()
- def render_tabindex(self, session, object):
+ def render_error_messages(self, session, object):
+ writer = Writer()
+
+ for error in self.get_errors(session):
+ self.errors_tmpl.render(session, error, writer)
+
+ return writer.to_string()
+
+ def render_error_message(self, session, error):
+ return error.message
+
+ def render_tab_index(self, session, object):
return self.tab_index
def render_disabled_attr(self, session, object):
return self.disabled and "disabled=\"disabled\"" or None
+class EmptyInputError(Error):
+ def __init__(self, input):
+ message = "Input '%s' is empty; it is required" % input.name
+
+ super(EmptyInputError, self).__init__(message)
+
class TextInput(FormInput):
def __init__(self, app, name, form):
super(TextInput, self).__init__(app, name, form)
Modified: mgmt/cumin/python/wooly/forms.strings
===================================================================
--- mgmt/cumin/python/wooly/forms.strings 2007-11-07 15:33:34 UTC (rev 1259)
+++ mgmt/cumin/python/wooly/forms.strings 2007-11-07 16:11:13 UTC (rev 1260)
@@ -1,26 +1,32 @@
[FormButton.html]
<button id="{id}" type="submit" name="{name}" value="{value}" tabindex="{tabindex}" {disabled_attr}>{content}</button>
+[FormInput.errors_html]
+<ul class="errors">{error_messages}</ul>
+
+[FormInput.error_message_html]
+<li>{error_message}</li>
+
[TextInput.html]
{errors}
-<input type="text" name="{name}" value="{value}" tabindex="{tabindex}" {disabled_attr} size="{size}"/>
+<input type="text" name="{name}" value="{value}" tabindex="{tab_index}" {disabled_attr} size="{size}"/>
[CheckboxInput.html]
-<input type="checkbox" name="{name}" value="{value}" tabindex="{tabindex}" {checked_attr} {disabled_attr}/>
+<input type="checkbox" name="{name}" value="{value}" tabindex="{tab_index}" {checked_attr} {disabled_attr}/>
[RadioInput.html]
-<input type="radio" name="{name}" value="{value}" tabindex="{tabindex}" {checked_attr} {disabled_attr}/>
+<input type="radio" name="{name}" value="{value}" tabindex="{tab_index}" {checked_attr} {disabled_attr}/>
[CheckboxInputSet.html]
{errors}{items}
[CheckboxInputSet.item_html]
-<input type="checkbox" name="{name}" value="{item_value}" tabindex="{tabindex}" {item_checked_attr} {disabled_attr}/>
+<input type="checkbox" name="{name}" value="{item_value}" tabindex="{tab_index}" {item_checked_attr} {disabled_attr}/>
{item_content}
[RadioInputSet.html]
{errors}{items}
[RadioInputSet.item_html]
-<input type="radio" name="{name}" value="{item_value}" tabindex="{tabindex}" {item_checked_attr} {disabled_attr}/>
+<input type="radio" name="{name}" value="{item_value}" tabindex="{tab_index}" {item_checked_attr} {disabled_attr}/>
{item_content}
Modified: mgmt/notes/justin-todo.txt
===================================================================
--- mgmt/notes/justin-todo.txt 2007-11-07 15:33:34 UTC (rev 1259)
+++ mgmt/notes/justin-todo.txt 2007-11-07 16:11:13 UTC (rev 1260)
@@ -2,8 +2,6 @@
* Add ability to send a test message to a queue
- * Use page attributes for session errors
-
* Queue: Add a msg enq rate msg deq rate chart
* Right now, non cumin pages don't print their stack traces in the
@@ -29,6 +27,8 @@
Deferred
+ * Rename Widget.name to .__name
+
* Make the status lights also be links to an appropriate view
- Defer until we know what we're going to link to
17 years, 2 months
rhmessaging commits: r1259 - mgmt/notes.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2007-11-07 10:33:34 -0500 (Wed, 07 Nov 2007)
New Revision: 1259
Modified:
mgmt/notes/justin-todo.txt
Log:
Adds todo items from UI review.
Modified: mgmt/notes/justin-todo.txt
===================================================================
--- mgmt/notes/justin-todo.txt 2007-11-07 15:05:52 UTC (rev 1258)
+++ mgmt/notes/justin-todo.txt 2007-11-07 15:33:34 UTC (rev 1259)
@@ -15,6 +15,18 @@
in the render_title calls. Perhaps introduce a get_title to solve
this.
+ * Add initial group drop down to broker add form
+
+ * Add totals to client msgs produced
+
+ * Add rolled up stats for objs other than queues
+
+ * Pagination and sort in tables
+
+ * Ajaxify status boxes
+
+ * Ajaxify charts
+
Deferred
* Make the status lights also be links to an appropriate view
17 years, 2 months
rhmessaging commits: r1258 - store/trunk/cpp/lib.
by rhmessaging-commits@lists.jboss.org
Author: cctrieloff
Date: 2007-11-07 10:05:52 -0500 (Wed, 07 Nov 2007)
New Revision: 1258
Modified:
store/trunk/cpp/lib/BdbMessageStore.cpp
store/trunk/cpp/lib/BdbMessageStore.h
Log:
- staging support
- no staged enqueue, content released, then loaded not yet supported on async
mode
Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp 2007-11-06 21:52:47 UTC (rev 1257)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp 2007-11-07 15:05:52 UTC (rev 1258)
@@ -492,16 +492,22 @@
{
case rhm::journal::RHM_IORES_SUCCESS:{
msg_count++;
- char* data = (char*)dbuff;
- unsigned headerSize = Buffer(data, preambleLength).getLong();
- Buffer headerBuff(data+ preambleLength, headerSize); /// do we want read size or header size ????
-
- RecoverableMessage::shared_ptr msg = recovery.recoverMessage(headerBuff);
+ RecoverableMessage::shared_ptr msg;
+ char* data = (char*)dbuff;
+
+ unsigned headerSize;
+ if (externalFlag){
+ msg = getExternMessage(recovery, dtokp.rid(), headerSize); // large message external to jrnl
+ } else {
+ headerSize = Buffer(data, preambleLength).getLong();
+ Buffer headerBuff(data+ preambleLength, headerSize); /// do we want read size or header size ????
+ msg = recovery.recoverMessage(headerBuff);
+ }
msg->setPersistenceId(dtokp.rid());
u_int32_t contentOffset = headerSize + preambleLength;
u_int64_t contentSize = readSize - contentOffset;
- if (msg->loadContent(contentSize)) {
+ if (msg->loadContent(contentSize) && !externalFlag) {
//now read the content
Buffer contentBuff(data + contentOffset, contentSize);
msg->decodeContent(contentBuff);
@@ -545,6 +551,29 @@
}
+RecoverableMessage::shared_ptr BdbMessageStore::getExternMessage(qpid::broker::RecoveryManager& recovery,
+ uint64_t messageId, unsigned& headerSize)
+{
+ Dbt key (&messageId, sizeof(messageId));
+ size_t preamble_length = sizeof(u_int32_t)/*header size*/;
+
+ BufferValue value(preamble_length, 0);
+ value.buffer.record();
+ if (messageDb.get(0, &key, &value, 0) == DB_NOTFOUND) {
+ THROW_STORE_EXCEPTION("Cannot load content. Message not known to store!");
+ }
+ //read header only to begin with
+ headerSize = value.buffer.getLong();
+
+ BufferValue header(headerSize, preamble_length);
+ if (messageDb.get(0, &key, &header, 0) == DB_NOTFOUND) {
+ THROW_STORE_EXCEPTION("Cannot load content. Message not known to store!");
+ }
+
+ return recovery.recoverMessage(header.buffer);
+}
+
+
// bdb version
void BdbMessageStore::recoverMessages(TxnCtxt& txn, RecoveryManager& recovery, queue_index& index,
txn_list& locked, message_index& prepared)
@@ -700,13 +729,13 @@
txn.begin(env);
u_int64_t messageId (msg.getPersistenceId());
- if (messageId == 0) {
+ if (messageId == 0 || !msg.isContentReleased()) {
try {
Dbt key (&messageId, sizeof(messageId));
messageId = messageIdSequence.next();
store(NULL, &txn, key, msg, true);
msg.setPersistenceId(messageId);
- txn.commit();
+ txn.commit();
} catch (std::exception& e) {
txn.abort();
throw e;
@@ -813,8 +842,6 @@
JournalImpl* jc = static_cast<JournalImpl*>(queue.getExternalQueueStore());
if (jc){
jc->flush();
-// ::usleep(10000); /////////////// hack ----------- FIX!!
-// jc->get_wr_events();
}
}catch ( journal::jexception& e) {
THROW_STORE_EXCEPTION("Flush failed: " + e.to_string() );
@@ -854,6 +881,7 @@
if (usingJrnl()){
// add queue* to the txn map..
if (ctxt) txn->addXidRecord(queue.getExternalQueueStore());
+ if (msg.isContentReleased()) put(mappingDb, txn->get(), key, value); // TODO - remove once jrnl is used for transient policy see **
}else{
msg.enqueueComplete(); // set enqueued for ack
put(mappingDb, txn->get(), key, value);
@@ -879,10 +907,14 @@
{
u_int32_t headerSize = message.encodedHeaderSize();
u_int64_t size = message.encodedSize() + sizeof(u_int32_t);
- char* buff = static_cast<char*>(::alloca(size)); // long + headers + content
- Buffer buffer(buff,size);
- buffer.putLong(headerSize);
- message.encode(buffer);
+ char* buff= 0;
+ if (!message.isContentReleased() )
+ {
+ buff = static_cast<char*>(::alloca(size)); // long + headers + content
+ Buffer buffer(buff,size);
+ buffer.putLong(headerSize);
+ message.encode(buffer);
+ }
try {
@@ -898,9 +930,17 @@
JournalImpl* jc = static_cast<JournalImpl*>(queue->getExternalQueueStore());
rhm::journal::iores eres;
if (txn->getXid().empty()){
- eres = jc->enqueue_data_record(buff, size, size, dtokp.get(), false);
+ if (message.isContentReleased()){
+ eres = jc->enqueue_extern_data_record(0, dtokp.get(), false);
+ }else {
+ eres = jc->enqueue_data_record(buff, size, size, dtokp.get(), false);
+ }
}else {
- eres = jc->enqueue_txn_data_record(buff, size, size, dtokp.get(), txn->getXid(), false);
+ if (message.isContentReleased()){
+ eres = jc->enqueue_extern_txn_data_record(0, dtokp.get(), txn->getXid(), false);
+ } else {
+ eres = jc->enqueue_txn_data_record(buff, size, size, dtokp.get(), txn->getXid(), false);
+ }
}
switch (eres)
{
@@ -963,6 +1003,13 @@
if (ctxt) txn->addXidRecord(queue.getExternalQueueStore());
async_dequeue(ctxt, msg, queue);
// added here as we are not doing it async on call back
+ if (msg.isContentReleased()) // TODO remove this code once jrnl is used for transient policy see **
+ {
+ Dbt key (&messageId, sizeof(messageId));
+ Dbt value (&queueId, sizeof(queueId));
+ dequeue(txn->get(), key, value);
+ }
+
msg.dequeueComplete();
if ( msg.isDequeueComplete() ) // clear id after last dequeue
msg.setPersistenceId(0);
Modified: store/trunk/cpp/lib/BdbMessageStore.h
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.h 2007-11-06 21:52:47 UTC (rev 1257)
+++ store/trunk/cpp/lib/BdbMessageStore.h 2007-11-07 15:05:52 UTC (rev 1258)
@@ -88,6 +88,8 @@
void recoverMessages(TxnCtxt& txn, qpid::broker::RecoveryManager& recovery,
qpid::broker::RecoverableQueue::shared_ptr& queue,
txn_list& locked, message_index& prepared);
+ qpid::broker::RecoverableMessage::shared_ptr getExternMessage(qpid::broker::RecoveryManager& recovery,
+ uint64_t mId, unsigned& headerSize);
void recoverExchanges(TxnCtxt& txn, qpid::broker::RecoveryManager& recovery, exchange_index& index);
void recoverBindings(TxnCtxt& txn, exchange_index& exchanges, queue_index& queues);
int enqueueMessage(TxnCtxt& txn, IdDbt& msgId, qpid::broker::RecoverableMessage::shared_ptr& msg,
17 years, 2 months