rhmessaging commits: r1088 - mgmt/cumin/python/cumin.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2007-10-16 12:11:12 -0400 (Tue, 16 Oct 2007)
New Revision: 1088
Modified:
mgmt/cumin/python/cumin/demo.py
mgmt/cumin/python/cumin/model.py
mgmt/cumin/python/cumin/server.py
mgmt/cumin/python/cumin/server.strings
Log:
Adds server values to the server config property demo data and model
and renders them in the UI.
Modified: mgmt/cumin/python/cumin/demo.py
===================================================================
--- mgmt/cumin/python/cumin/demo.py 2007-10-16 16:03:14 UTC (rev 1087)
+++ mgmt/cumin/python/cumin/demo.py 2007-10-16 16:11:12 UTC (rev 1088)
@@ -109,16 +109,19 @@
prop = ConfigProperty(self.model)
prop.name = "max_threads"
prop.value = 1000
+ prop.server_value = 1000
obj.add_config_property(prop)
prop = ConfigProperty(self.model)
prop.name = "max_memory"
prop.value = 1000000
+ prop.server_value = 2000000
obj.add_config_property(prop)
prop = ConfigProperty(self.model)
prop.name = "ssl_enabled"
prop.value = True
+ prop.server_value = True
obj.add_config_property(prop)
def load_vhost(self, vhost):
Modified: mgmt/cumin/python/cumin/model.py
===================================================================
--- mgmt/cumin/python/cumin/model.py 2007-10-16 16:03:14 UTC (rev 1087)
+++ mgmt/cumin/python/cumin/model.py 2007-10-16 16:11:12 UTC (rev 1088)
@@ -185,6 +185,7 @@
self.name = None
self.value = None
+ self.server_value = None
class ServerGroup(ModelObject):
def __init__(self, model):
Modified: mgmt/cumin/python/cumin/server.py
===================================================================
--- mgmt/cumin/python/cumin/server.py 2007-10-16 16:03:14 UTC (rev 1087)
+++ mgmt/cumin/python/cumin/server.py 2007-10-16 16:11:12 UTC (rev 1088)
@@ -165,6 +165,8 @@
self.process_cancel(session, prop)
def process_display(self, session, prop):
+ self.pvalue.set(session, get_profile_value(prop))
+ self.svalue.set(session, prop.server_value)
self.lvalue.set(session, prop.value)
def render_title(self, session, prop):
@@ -225,6 +227,9 @@
def get_items(self, session, server):
return sorted_by(server.config_property_items())
+ def render_item_server_value(self, session, prop):
+ return prop.server_value
+
def render_item_profile_value(self, session, prop):
return get_profile_value(prop)
Modified: mgmt/cumin/python/cumin/server.strings
===================================================================
--- mgmt/cumin/python/cumin/server.strings 2007-10-16 16:03:14 UTC (rev 1087)
+++ mgmt/cumin/python/cumin/server.strings 2007-10-16 16:11:12 UTC (rev 1088)
@@ -52,7 +52,7 @@
<tr>
<td>{item_name}</td>
<td>{item_value}</td>
- <td></td>
+ <td>{item_server_value}</td>
<td>{item_profile_value}</td>
<td><a class="action" href="{item_edit_href}">Edit</a></td>
</tr>
17 years, 2 months
rhmessaging commits: r1087 - mgmt/cumin/python/cumin.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2007-10-16 12:03:14 -0400 (Tue, 16 Oct 2007)
New Revision: 1087
Modified:
mgmt/cumin/python/cumin/server.py
mgmt/cumin/python/cumin/server.strings
Log:
Adds the profile value to the server config properties.
Modified: mgmt/cumin/python/cumin/server.py
===================================================================
--- mgmt/cumin/python/cumin/server.py 2007-10-16 15:39:36 UTC (rev 1086)
+++ mgmt/cumin/python/cumin/server.py 2007-10-16 16:03:14 UTC (rev 1087)
@@ -170,6 +170,17 @@
def render_title(self, session, prop):
return "Edit Property '%s'" % prop.name
+def get_profile_value(prop):
+ profile = prop.get_server().get_server_profile()
+ value = None
+
+ if profile:
+ for p in profile.config_property_items():
+ if p.name == prop.name:
+ value = p.value
+
+ return value
+
class ServerView(Widget):
def __init__(self, app, name):
super(ServerView, self).__init__(app, name)
@@ -214,6 +225,9 @@
def get_items(self, session, server):
return sorted_by(server.config_property_items())
+ def render_item_profile_value(self, session, prop):
+ return get_profile_value(prop)
+
def render_item_edit_href(self, session, prop):
branch = session.branch()
frame = self.page().show_server(branch, prop.get_server())
@@ -420,8 +434,8 @@
self.tabs = TabSet(app, "tabs")
self.add_child(self.tabs)
+ self.tabs.add_tab(self.ProfileConfigTab(app, "config"))
self.tabs.add_tab(self.ProfileServerTab(app, "servers"))
- self.tabs.add_tab(self.ProfileConfigTab(app, "config"))
def render_title(self, session, profile):
return "Server Profile '%s'" % profile.name
@@ -429,6 +443,13 @@
def render_name(self, session, profile):
return profile.name
+ class ProfileConfigTab(ConfigPropertySet):
+ def get_items(self, session, profile):
+ return sorted_by(profile.config_property_items())
+
+ def render_title(self, session, profile):
+ return "Configuration"
+
class ProfileServerTab(ServerSet):
def __init__(self, app, name):
super(ServerProfileView.ProfileServerTab, self).__init__(app, name)
@@ -438,10 +459,3 @@
def get_items(self, session, profile):
return sorted_by(profile.server_items())
-
- class ProfileConfigTab(ConfigPropertySet):
- def get_items(self, session, profile):
- return sorted_by(profile.config_property_items())
-
- def render_title(self, session, profile):
- return "Configuration"
Modified: mgmt/cumin/python/cumin/server.strings
===================================================================
--- mgmt/cumin/python/cumin/server.strings 2007-10-16 15:39:36 UTC (rev 1086)
+++ mgmt/cumin/python/cumin/server.strings 2007-10-16 16:03:14 UTC (rev 1087)
@@ -53,7 +53,7 @@
<td>{item_name}</td>
<td>{item_value}</td>
<td></td>
- <td></td>
+ <td>{item_profile_value}</td>
<td><a class="action" href="{item_edit_href}">Edit</a></td>
</tr>
17 years, 2 months
rhmessaging commits: r1086 - mgmt/cumin/python/cumin.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2007-10-16 11:39:36 -0400 (Tue, 16 Oct 2007)
New Revision: 1086
Modified:
mgmt/cumin/python/cumin/cluster.py
mgmt/cumin/python/cumin/exchange.py
mgmt/cumin/python/cumin/queue.py
mgmt/cumin/python/cumin/realm.py
mgmt/cumin/python/cumin/server.py
mgmt/cumin/python/cumin/virtualhost.py
Log:
Use sorted_by instead of custom sort functions everywhere.
Modified: mgmt/cumin/python/cumin/cluster.py
===================================================================
--- mgmt/cumin/python/cumin/cluster.py 2007-10-16 15:34:43 UTC (rev 1085)
+++ mgmt/cumin/python/cumin/cluster.py 2007-10-16 15:39:36 UTC (rev 1086)
@@ -4,6 +4,7 @@
from virtualhost import *
from server import *
from widgets import *
+from util import *
strings = StringCatalog(__file__)
@@ -12,7 +13,7 @@
return "Clusters (%i)" % len(model.get_clusters())
def get_items(self, session, model):
- return sorted(model.get_clusters(), cmp, lambda x: x.name)
+ return sorted_by(model.get_clusters())
def render_item_link(self, session, cluster):
branch = session.branch()
@@ -25,7 +26,7 @@
return "Servers (%i)" % len(cluster.server_items())
def get_items(self, session, cluster):
- return sorted(cluster.server_items(), cmp, lambda x: x.name)
+ return sorted_by(cluster.server_items())
def render_item_link(self, session, server):
branch = session.branch()
@@ -98,7 +99,7 @@
return "Functional Hosts (%i)" % len(cluster.virtual_host_items())
def get_items(self, session, cluster):
- return sorted(cluster.virtual_host_items(), cmp, lambda x: x.name)
+ return sorted_by(cluster.virtual_host_items())
class ConfigTab(Widget):
def render_title(self, session, cluster):
Modified: mgmt/cumin/python/cumin/exchange.py
===================================================================
--- mgmt/cumin/python/cumin/exchange.py 2007-10-16 15:34:43 UTC (rev 1085)
+++ mgmt/cumin/python/cumin/exchange.py 2007-10-16 15:39:36 UTC (rev 1086)
@@ -5,6 +5,7 @@
from model import *
from widgets import *
+from util import *
strings = StringCatalog(__file__)
@@ -24,7 +25,7 @@
self.set_parameter(param)
def get_items(self, session, vhost):
- return sorted(vhost.exchange_items(), cmp, lambda x: x.name)
+ return sorted_by(vhost.exchange_items())
def render_item_value(self, session, exchange):
return exchange.id
@@ -40,7 +41,7 @@
return "Exchanges (%s)" % len(vhost.exchange_items())
def get_items(self, session, vhost):
- return sorted(vhost.exchange_items(), cmp, lambda x: x.name)
+ return sorted_by(vhost.exchange_items())
def render_item_link(self, session, exchange):
branch = session.branch()
@@ -107,8 +108,7 @@
return "Bindings (%i)" % len(exchange.binding_items())
def get_items(self, session, exchange):
- return sorted(exchange.binding_items(), cmp,
- lambda x: x.get_queue().name)
+ return sorted_by(exchange.binding_items(), "id")
def render_item_href(self, session, binding):
branch = session.branch()
Modified: mgmt/cumin/python/cumin/queue.py
===================================================================
--- mgmt/cumin/python/cumin/queue.py 2007-10-16 15:34:43 UTC (rev 1085)
+++ mgmt/cumin/python/cumin/queue.py 2007-10-16 15:39:36 UTC (rev 1086)
@@ -6,6 +6,7 @@
from model import *
from widgets import *
from exchange import ExchangeInputSet
+from util import *
strings = StringCatalog(__file__)
@@ -32,7 +33,7 @@
return "Queues (%s)" % len(vhost.queue_items())
def get_items(self, session, vhost):
- return sorted(vhost.queue_items(), cmp, lambda x: x.name)
+ return sorted_by(vhost.queue_items())
def render_item_link(self, session, queue):
branch = session.branch()
@@ -137,8 +138,7 @@
return "Bindings (%i)" % len(queue.binding_items())
def get_items(self, session, queue):
- return sorted(queue.binding_items(), cmp,
- lambda x: x.get_exchange().name)
+ return sorted_by(queue.binding_items(), "id")
def render_item_href(self, session, binding):
branch = session.branch()
@@ -331,8 +331,7 @@
class Exchanges(ExchangeInputSet):
def get_items(self, session, queue):
- return sorted(queue.virtual_host.exchange_items(), cmp,
- lambda x: x.name)
+ return sorted_by(queue.virtual_host.exchange_items())
class QueueBindingRemove(CuminConfirmForm):
def process_confirm(self, session, binding):
Modified: mgmt/cumin/python/cumin/realm.py
===================================================================
--- mgmt/cumin/python/cumin/realm.py 2007-10-16 15:34:43 UTC (rev 1085)
+++ mgmt/cumin/python/cumin/realm.py 2007-10-16 15:39:36 UTC (rev 1086)
@@ -5,6 +5,7 @@
from model import *
from widgets import *
+from util import *
strings = StringCatalog(__file__)
@@ -13,7 +14,7 @@
return "Realms (%i)" % len(vhost.realm_items())
def get_items(self, session, vhost):
- return sorted(vhost.realm_items())
+ return sorted_by(vhost.realm_items())
def render_item_name(self, session, realm):
return realm.name
@@ -34,7 +35,7 @@
self.set_parameter(param)
def get_items(self, session, vhost):
- return sorted(vhost.realm_items())
+ return sorted_by(vhost.realm_items())
# XXX just parked here
def do_process(self, session, queue):
Modified: mgmt/cumin/python/cumin/server.py
===================================================================
--- mgmt/cumin/python/cumin/server.py 2007-10-16 15:34:43 UTC (rev 1085)
+++ mgmt/cumin/python/cumin/server.py 2007-10-16 15:39:36 UTC (rev 1086)
@@ -284,7 +284,7 @@
self.group_tmpl = Template(self, "group_html")
def get_items(self, session, model):
- return sorted(model.get_server_group_types(), cmp, lambda x: x.name)
+ return sorted_by(model.get_server_group_types())
def render_types(self, session, model):
writer = Writer()
Modified: mgmt/cumin/python/cumin/virtualhost.py
===================================================================
--- mgmt/cumin/python/cumin/virtualhost.py 2007-10-16 15:34:43 UTC (rev 1085)
+++ mgmt/cumin/python/cumin/virtualhost.py 2007-10-16 15:39:36 UTC (rev 1086)
@@ -5,6 +5,7 @@
from realm import *
from exchange import *
from widgets import *
+from util import *
strings = StringCatalog(__file__)
@@ -13,7 +14,7 @@
return "Functional Hosts (%i)" % len(model.get_virtual_hosts())
def get_items(self, session, model):
- return sorted(model.get_virtual_hosts(), cmp, lambda x: x.name)
+ return sorted_by(model.get_virtual_hosts())
def render_item_link(self, session, vhost):
branch = session.branch()
17 years, 2 months
rhmessaging commits: r1085 - mgmt/cumin/python/cumin.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2007-10-16 11:34:43 -0400 (Tue, 16 Oct 2007)
New Revision: 1085
Modified:
mgmt/cumin/python/cumin/page.py
mgmt/cumin/python/cumin/server.py
mgmt/cumin/python/cumin/server.strings
Log:
Fills in the profile server and configuration tabs.
Modified: mgmt/cumin/python/cumin/page.py
===================================================================
--- mgmt/cumin/python/cumin/page.py 2007-10-16 15:03:48 UTC (rev 1084)
+++ mgmt/cumin/python/cumin/page.py 2007-10-16 15:34:43 UTC (rev 1085)
@@ -39,6 +39,9 @@
self.group_edit = ServerGroupEdit(app, "groupedit")
self.add_mode(self.group_edit)
+ self.profile = ServerProfileFrame(app, "profile")
+ self.add_mode(self.profile)
+
def save_session(self, session):
if self.app.debug:
self.app.debug.sessions.append(session)
@@ -92,6 +95,11 @@
frame.set_object(session, cluster)
return self.page().set_current_frame(session, frame)
+ def show_server_profile(self, session, profile):
+ frame = self.show_mode(session, self.profile)
+ frame.set_object(session, profile)
+ return self.page().set_current_frame(session, frame)
+
def show_virtual_host(self, session, vhost):
server = vhost.get_server()
cluster = vhost.get_cluster()
Modified: mgmt/cumin/python/cumin/server.py
===================================================================
--- mgmt/cumin/python/cumin/server.py 2007-10-16 15:03:48 UTC (rev 1084)
+++ mgmt/cumin/python/cumin/server.py 2007-10-16 15:34:43 UTC (rev 1085)
@@ -12,7 +12,7 @@
return "Servers (%i)" % len(model.get_servers())
def get_items(self, session, model):
- return sorted(model.get_servers(), cmp, lambda x: x.name)
+ return sortedby(model.get_servers())
def render_item_link(self, session, server):
branch = session.branch()
@@ -89,30 +89,16 @@
def render_none(self, session, group):
return none()
-class ServerProfileSet(ItemSet):
+class ConfigPropertySet(ItemSet):
def get_items(self, session, model):
- return sorted_by(model.get_server_profiles())
+ return sorted_by(model.get_config_properties())
- def render_item_link(self, session, profile):
- branch = session.branch()
- return mlink(branch.marshal(), "ServerProfile", profile.name)
-
-class ServerConfigPropertySet(ItemSet):
- def get_items(self, session, server):
- return sorted_by(server.config_property_items())
-
def render_item_name(self, session, prop):
return prop.name
def render_item_value(self, session, prop):
return prop.value
- def render_item_edit_href(self, session, prop):
- branch = session.branch()
- frame = self.page().show_server(branch, prop.get_server())
- frame.show_config_property(branch, prop)
- return branch.marshal()
-
class ConfigPropertyParameter(Parameter):
def do_unmarshal(self, string):
return self.app.model.get_config_property(int(string))
@@ -191,10 +177,10 @@
self.tabs = TabSet(app, "tabs")
self.add_child(self.tabs)
- self.tabs.add_tab(self.VirtualHostTab(app, "vhosts"))
- self.tabs.add_tab(self.ConfigTab(app, "config"))
- self.tabs.add_tab(self.StatsTab(app, "stats"))
- self.tabs.add_tab(self.LogTab(app, "log"))
+ self.tabs.add_tab(self.ServerVirtualHostTab(app, "vhosts"))
+ self.tabs.add_tab(self.ServerConfigTab(app, "config"))
+ self.tabs.add_tab(self.ServerStatsTab(app, "stats"))
+ self.tabs.add_tab(self.ServerLogTab(app, "log"))
def render_title(self, session, server):
return "Server '%s'" % server.name
@@ -214,22 +200,31 @@
return html
- class VirtualHostTab(VirtualHostSet):
+ class ServerVirtualHostTab(VirtualHostSet):
def render_title(self, session, server):
return "Functional Hosts (%i)" % len(server.virtual_host_items())
def get_items(self, session, server):
- return sorted(server.virtual_host_items(), cmp, lambda x: x.name)
+ return sorted_by(server.virtual_host_items())
- class ConfigTab(ServerConfigPropertySet):
+ class ServerConfigTab(ConfigPropertySet):
def render_title(self, session, server):
return "Configuration"
- class StatsTab(Widget):
+ def get_items(self, session, server):
+ return sorted_by(server.config_property_items())
+
+ def render_item_edit_href(self, session, prop):
+ branch = session.branch()
+ frame = self.page().show_server(branch, prop.get_server())
+ frame.show_config_property(branch, prop)
+ return branch.marshal()
+
+ class ServerStatsTab(Widget):
def render_title(self, session, server):
return "Statistics"
- class LogTab(Widget):
+ class ServerLogTab(Widget):
def render_title(self, session, server):
return "Log Messages"
@@ -383,3 +378,70 @@
def render_title(self, session, group):
return "Edit Group '%s'" % group.name
+
+class ServerProfileSet(ItemSet):
+ def get_items(self, session, model):
+ return sorted_by(model.get_server_profiles())
+
+ def render_item_link(self, session, profile):
+ branch = session.branch()
+ frame = self.page().show_server_profile(branch, profile)
+ frame.show_view(branch)
+ return mlink(branch.marshal(), "ServerProfile", profile.name)
+
+class ServerProfileParameter(Parameter):
+ def do_unmarshal(self, string):
+ return self.app.model.get_server_profile(int(string))
+
+ def do_marshal(self, profile):
+ return str(profile.id)
+
+class ServerProfileFrame(CuminFrame):
+ def __init__(self, app, name):
+ super(ServerProfileFrame, self).__init__(app, name)
+
+ self.param = ServerProfileParameter(app, "id")
+ self.add_parameter(self.param)
+ self.set_object_parameter(self.param)
+
+ self.view = ServerProfileView(app, "view")
+ self.add_mode(self.view)
+
+ def show_view(self, session):
+ return self.show_mode(session, self.view)
+
+ def render_title(self, session, profile):
+ return "Server Profile '%s'" % profile.name
+
+class ServerProfileView(Widget):
+ def __init__(self, app, name):
+ super(ServerProfileView, self).__init__(app, name)
+
+ self.tabs = TabSet(app, "tabs")
+ self.add_child(self.tabs)
+
+ self.tabs.add_tab(self.ProfileServerTab(app, "servers"))
+ self.tabs.add_tab(self.ProfileConfigTab(app, "config"))
+
+ def render_title(self, session, profile):
+ return "Server Profile '%s'" % profile.name
+
+ def render_name(self, session, profile):
+ return profile.name
+
+ class ProfileServerTab(ServerSet):
+ def __init__(self, app, name):
+ super(ServerProfileView.ProfileServerTab, self).__init__(app, name)
+
+ def render_title(self, session, profile):
+ return "Servers (%i)" % len(profile.server_items())
+
+ def get_items(self, session, profile):
+ return sorted_by(profile.server_items())
+
+ class ProfileConfigTab(ConfigPropertySet):
+ def get_items(self, session, profile):
+ return sorted_by(profile.config_property_items())
+
+ def render_title(self, session, profile):
+ return "Configuration"
Modified: mgmt/cumin/python/cumin/server.strings
===================================================================
--- mgmt/cumin/python/cumin/server.strings 2007-10-16 15:03:48 UTC (rev 1084)
+++ mgmt/cumin/python/cumin/server.strings 2007-10-16 15:34:43 UTC (rev 1085)
@@ -1,5 +1,5 @@
[ServerSet.html]
-<table class="ServerSet mobjects">
+<table class="mobjects">
<tr>
<th>Server</th>
<th>Cluster</th>
@@ -30,9 +30,10 @@
<td>{item_link}</td>
</tr>
-[ServerConfigPropertySet.html]
+[ServerConfigTab.html]
<ul class="actions">
- <li><a href="">Apply Configuration to This Server</a></li>
+ <li><a href="">Apply This Configuration to Server</a></li>
+ <li><a href="">Add Property</a></li>
</ul>
<table class="mobjects">
@@ -47,7 +48,7 @@
{items}
</table>
-[ServerConfigPropertySet.item_html]
+[ServerConfigTab.item_html]
<tr>
<td>{item_name}</td>
<td>{item_value}</td>
@@ -208,3 +209,40 @@
elem.select();
}())
</script>
+
+[ServerProfileView.html]
+<div class="oblock">
+ <h1>{title}</h1>
+
+ <dl class="properties">
+ <dt>Name</dt><dd>{name}</dd>
+ </dl>
+
+ <ul class="actions">
+ <li><a href="">Edit This Profile</a></li>
+ </ul>
+
+ {tabs}
+</div>
+
+[ProfileConfigTab.html]
+<ul class="actions">
+ <li><a href="">Add Property</a></li>
+</ul>
+
+<table class="mobjects">
+ <tr>
+ <th>Property</th>
+ <th>Configured Value</th>
+ <th></th>
+ </tr>
+
+ {items}
+</table>
+
+[ProfileConfigTab.item_html]
+<tr>
+ <td>{item_name}</td>
+ <td>{item_value}</td>
+ <td><a class="action" href="">Edit</a></td>
+</tr>
17 years, 2 months
rhmessaging commits: r1084 - in store/trunk/cpp: tests and 1 other directory.
by rhmessaging-commits@lists.jboss.org
Author: astitcher
Date: 2007-10-16 11:03:48 -0400 (Tue, 16 Oct 2007)
New Revision: 1084
Modified:
store/trunk/cpp/lib/BdbMessageStore.cpp
store/trunk/cpp/lib/BindingDbt.cpp
store/trunk/cpp/tests/SimpleTest.cpp
Log:
Fixed for change in field table API
Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp 2007-10-16 14:49:55 UTC (rev 1083)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp 2007-10-16 15:03:48 UTC (rev 1084)
@@ -369,7 +369,7 @@
FieldTable args;
buffer.getShortString(queueName);
buffer.getShortString(routingkey);
- buffer.getFieldTable(args);
+ buffer.get(args);
exchange_index::iterator exchange = exchanges.find(key.id);
queue_index::iterator queue = queues.find(queueId);
if (exchange != exchanges.end() && queue != queues.end()) {
Modified: store/trunk/cpp/lib/BindingDbt.cpp
===================================================================
--- store/trunk/cpp/lib/BindingDbt.cpp 2007-10-16 14:49:55 UTC (rev 1083)
+++ store/trunk/cpp/lib/BindingDbt.cpp 2007-10-16 15:03:48 UTC (rev 1084)
@@ -37,7 +37,7 @@
buffer.putLongLong(q.getPersistenceId());
buffer.putShortString(q.getName());
buffer.putShortString(k);
- buffer.putFieldTable(a);
+ buffer.put(a);
set_data(data);
set_size(encodedSize(e, q, k, a));
Modified: store/trunk/cpp/tests/SimpleTest.cpp
===================================================================
--- store/trunk/cpp/tests/SimpleTest.cpp 2007-10-16 14:49:55 UTC (rev 1083)
+++ store/trunk/cpp/tests/SimpleTest.cpp 2007-10-16 15:03:48 UTC (rev 1084)
@@ -30,6 +30,7 @@
#include <qpid/framing/AMQHeaderBody.h>
#include <qpid/framing/AMQMethodBody.h>
#include <qpid/framing/ChannelAdapter.h>
+#include <qpid/framing/FieldValue.h>
#include <qpid/broker/Message.h>
#include <qpid/broker/Queue.h>
#include <qpid/broker/DirectExchange.h>
@@ -224,7 +225,7 @@
CPPUNIT_ASSERT_EQUAL(routingKey, msg->getRoutingKey());
CPPUNIT_ASSERT_EQUAL(messageId, msg->getProperties<MessageProperties>()->getMessageId());
CPPUNIT_ASSERT_EQUAL((uint8_t) PERSISTENT, msg->getProperties<DeliveryProperties>()->getDeliveryMode());
- CPPUNIT_ASSERT_EQUAL(string("xyz"), msg->getProperties<MessageProperties>()->getApplicationHeaders().getString("abc"));
+ CPPUNIT_ASSERT(StringValue("xyz") == *msg->getProperties<MessageProperties>()->getApplicationHeaders().get("abc"));
CPPUNIT_ASSERT_EQUAL((u_int64_t) 14, msg->contentSize());
DummyHandler handler;
@@ -339,7 +340,7 @@
CPPUNIT_ASSERT_EQUAL(routingKey, msg->getRoutingKey());
CPPUNIT_ASSERT_EQUAL(messageId, msg->getProperties<MessageProperties>()->getMessageId());
CPPUNIT_ASSERT_EQUAL((uint8_t) PERSISTENT, msg->getProperties<DeliveryProperties>()->getDeliveryMode());
- CPPUNIT_ASSERT_EQUAL(string("xyz"), msg->getProperties<MessageProperties>()->getApplicationHeaders().getString("abc"));
+ CPPUNIT_ASSERT(StringValue("xyz") == *msg->getProperties<MessageProperties>()->getApplicationHeaders().get("abc"));
CPPUNIT_ASSERT_EQUAL((u_int64_t) (data1.size() + data2.size()), msg->getFrames().getHeaders()->getContentLength());
CPPUNIT_ASSERT_EQUAL((u_int64_t) 0, msg->contentSize());//ensure it is being lazily loaded
@@ -432,7 +433,7 @@
CPPUNIT_ASSERT_EQUAL(id, exchange->getPersistenceId());
CPPUNIT_ASSERT_EQUAL(type, exchange->getType());
CPPUNIT_ASSERT(exchange->isDurable());
- CPPUNIT_ASSERT_EQUAL(args.getString("a"), exchange->getArgs().getString("a"));
+ CPPUNIT_ASSERT_EQUAL(*args.get("a"), *exchange->getArgs().get("a"));
store.destroy(*exchange);
}
{
17 years, 2 months
rhmessaging commits: r1083 - mgmt/cumin/python/cumin.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2007-10-16 10:49:55 -0400 (Tue, 16 Oct 2007)
New Revision: 1083
Modified:
mgmt/cumin/python/cumin/widgets.py
Log:
Rename CuminFrame's parameter so it doesn't get clobbered.
Modified: mgmt/cumin/python/cumin/widgets.py
===================================================================
--- mgmt/cumin/python/cumin/widgets.py 2007-10-16 14:38:51 UTC (rev 1082)
+++ mgmt/cumin/python/cumin/widgets.py 2007-10-16 14:49:55 UTC (rev 1083)
@@ -18,16 +18,16 @@
def __init__(self, app, name):
super(CuminFrame, self).__init__(app, name)
- self.param = None
+ self.__param = None
def set_object_parameter(self, param):
- self.param = param
+ self.__param = param
def get_object(self, session, object):
- return self.param.get(session)
+ return self.__param.get(session)
def set_object(self, session, object):
- return self.param.set(session, object)
+ return self.__param.set(session, object)
def show_view(self, session):
pass
17 years, 2 months
rhmessaging commits: r1082 - in mgmt: notes and 1 other directory.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2007-10-16 10:38:51 -0400 (Tue, 16 Oct 2007)
New Revision: 1082
Modified:
mgmt/cumin/python/cumin/cluster.py
mgmt/cumin/python/cumin/exchange.py
mgmt/cumin/python/cumin/page.py
mgmt/cumin/python/cumin/queue.py
mgmt/cumin/python/cumin/server.py
mgmt/cumin/python/cumin/virtualhost.py
mgmt/cumin/python/cumin/widgets.py
mgmt/notes/Todo
Log:
Adds a set_object to CuminFrame and uses it instead of specific
methods on every instance of CuminFrame.
Modified: mgmt/cumin/python/cumin/cluster.py
===================================================================
--- mgmt/cumin/python/cumin/cluster.py 2007-10-16 12:52:04 UTC (rev 1081)
+++ mgmt/cumin/python/cumin/cluster.py 2007-10-16 14:38:51 UTC (rev 1082)
@@ -46,6 +46,7 @@
self.param = ClusterParameter(app, "id")
self.add_parameter(self.param)
+ self.set_object_parameter(self.param)
self.view = ClusterView(app, "view")
self.add_mode(self.view)
@@ -56,21 +57,15 @@
self.vhost = VirtualHostFrame(app, "vhost")
self.add_mode(self.vhost)
- def get_object(self, session, object):
- return self.param.get(session)
-
- def set_cluster(self, session, cluster):
- self.param.set(session, cluster)
-
def show_view(self, session):
return self.show_mode(session, self.view)
def show_server(self, session, server):
- self.server.set_server(session, server)
+ self.server.set_object(session, server)
return self.show_mode(session, self.server)
def show_virtual_host(self, session, vhost):
- self.vhost.set_virtual_host(session, vhost)
+ self.vhost.set_object(session, vhost)
return self.show_mode(session, self.vhost)
def render_title(self, session, cluster):
Modified: mgmt/cumin/python/cumin/exchange.py
===================================================================
--- mgmt/cumin/python/cumin/exchange.py 2007-10-16 12:52:04 UTC (rev 1081)
+++ mgmt/cumin/python/cumin/exchange.py 2007-10-16 14:38:51 UTC (rev 1082)
@@ -61,16 +61,11 @@
self.param = ExchangeParameter(app, "id")
self.add_parameter(self.param)
+ self.set_object_parameter(self.param)
self.view = ExchangeView(app, "view")
self.add_mode(self.view)
- def get_object(self, session, object):
- return self.param.get(session)
-
- def set_exchange(self, session, exchange):
- return self.param.set(session, exchange)
-
def show_view(self, session):
return self.show_mode(session, self.view)
Modified: mgmt/cumin/python/cumin/page.py
===================================================================
--- mgmt/cumin/python/cumin/page.py 2007-10-16 12:52:04 UTC (rev 1081)
+++ mgmt/cumin/python/cumin/page.py 2007-10-16 14:38:51 UTC (rev 1082)
@@ -68,7 +68,7 @@
frame = frame.show_server(session, server)
else:
frame = self.show_mode(session, self.server)
- frame.set_server(session, server)
+ frame.set_object(session, server)
return self.page().set_current_frame(session, frame)
@@ -84,12 +84,12 @@
def show_server_group_edit(self, session, group):
frame = self.show_mode(session, self.group_edit)
- frame.set_server_group(session, group)
+ frame.set_object(session, group)
return self.page().set_current_frame(session, frame)
def show_cluster(self, session, cluster):
frame = self.show_mode(session, self.cluster)
- frame.set_cluster(session, cluster)
+ frame.set_object(session, cluster)
return self.page().set_current_frame(session, frame)
def show_virtual_host(self, session, vhost):
@@ -157,7 +157,7 @@
def show_server_group(self, session, group):
mode = self.show_mode(session, self.servers)
- mode.set_server_group(session, group)
+ mode.set_object(session, group)
return mode
def render_title(self, session, model):
Modified: mgmt/cumin/python/cumin/queue.py
===================================================================
--- mgmt/cumin/python/cumin/queue.py 2007-10-16 12:52:04 UTC (rev 1081)
+++ mgmt/cumin/python/cumin/queue.py 2007-10-16 14:38:51 UTC (rev 1082)
@@ -64,16 +64,11 @@
self.param = QueueParameter(app, "id")
self.add_parameter(self.param)
+ self.set_object_parameter(self.param)
self.view = QueueView(app, "view")
self.add_mode(self.view)
- def get_object(self, session, object):
- return self.param.get(session)
-
- def set_queue(self, session, queue):
- return self.param.set(session, queue)
-
def show_view(self, session):
return self.show_mode(session, self.view)
Modified: mgmt/cumin/python/cumin/server.py
===================================================================
--- mgmt/cumin/python/cumin/server.py 2007-10-16 12:52:04 UTC (rev 1081)
+++ mgmt/cumin/python/cumin/server.py 2007-10-16 14:38:51 UTC (rev 1082)
@@ -44,6 +44,7 @@
self.param = ServerParameter(app, "id")
self.add_parameter(self.param)
+ self.set_object_parameter(self.param)
self.view = ServerView(app, "view")
self.add_mode(self.view)
@@ -54,17 +55,11 @@
self.prop = ServerConfigPropertyForm(app, "prop")
self.add_mode(self.prop)
- def get_object(self, session, object):
- return self.param.get(session)
-
- def set_server(self, session, server):
- self.param.set(session, server)
-
def show_view(self, session):
return self.show_mode(session, self.view)
def show_virtual_host(self, session, vhost):
- self.vhost.set_virtual_host(session, vhost)
+ self.vhost.set_object(session, vhost)
return self.show_mode(session, self.vhost)
def show_config_property(self, session, prop):
@@ -254,7 +249,7 @@
def get_object(self, session, object):
return self.param.get(session)
- def set_server_group(self, session, group):
+ def set_object(self, session, group):
return self.param.set(session, group)
def render_title(self, session, group):
@@ -264,7 +259,7 @@
class_ = group is None and "selected"
branch = session.branch()
- self.set_server_group(branch, None)
+ self.set_object(branch, None)
return link(branch.marshal(), "All Servers", class_)
def render_add_group_href(self, session, group):
@@ -380,7 +375,7 @@
def get_object(self, session, object):
return self.param.get(session)
- def set_server_group(self, session, group):
+ def set_object(self, session, group):
self.param.set(session, group)
def process_display(self, session, group):
Modified: mgmt/cumin/python/cumin/virtualhost.py
===================================================================
--- mgmt/cumin/python/cumin/virtualhost.py 2007-10-16 12:52:04 UTC (rev 1081)
+++ mgmt/cumin/python/cumin/virtualhost.py 2007-10-16 14:38:51 UTC (rev 1082)
@@ -34,6 +34,7 @@
self.param = VirtualHostParameter(app, "id")
self.add_parameter(self.param)
+ self.set_object_parameter(self.param)
self.queue = QueueFrame(app, "queue")
self.add_mode(self.queue)
@@ -44,21 +45,15 @@
self.view = VirtualHostView(app, "view")
self.add_mode(self.view)
- def get_object(self, session, object):
- return self.param.get(session)
-
- def set_virtual_host(self, session, vhost):
- return self.param.set(session, vhost)
-
def show_view(self, session):
return self.show_mode(session, self.view)
def show_queue(self, session, queue):
- self.queue.set_queue(session, queue)
+ self.queue.set_object(session, queue)
return self.show_mode(session, self.queue)
def show_exchange(self, session, exchange):
- self.exchange.set_exchange(session, exchange)
+ self.exchange.set_object(session, exchange)
return self.show_mode(session, self.exchange)
def render_title(self, session, vhost):
Modified: mgmt/cumin/python/cumin/widgets.py
===================================================================
--- mgmt/cumin/python/cumin/widgets.py 2007-10-16 12:52:04 UTC (rev 1081)
+++ mgmt/cumin/python/cumin/widgets.py 2007-10-16 14:38:51 UTC (rev 1082)
@@ -15,6 +15,20 @@
return "<span class=\"none\">None</span>"
class CuminFrame(Frame, ModeSet):
+ def __init__(self, app, name):
+ super(CuminFrame, self).__init__(app, name)
+
+ self.param = None
+
+ def set_object_parameter(self, param):
+ self.param = param
+
+ def get_object(self, session, object):
+ return self.param.get(session)
+
+ def set_object(self, session, object):
+ return self.param.set(session, object)
+
def show_view(self, session):
pass
Modified: mgmt/notes/Todo
===================================================================
--- mgmt/notes/Todo 2007-10-16 12:52:04 UTC (rev 1081)
+++ mgmt/notes/Todo 2007-10-16 14:38:51 UTC (rev 1082)
@@ -48,9 +48,6 @@
* cumindev: add a cumin-test function and bind it to C-c C-c
- * Consider adding a set_object to CuminFrame, instead of having
- set_somethingspecific on each frame.
-
* Consider having a cssclass set on widgets
* Use page attributes for session errors, redirect
17 years, 2 months
rhmessaging commits: r1081 - in mgmt: notes and 1 other directory.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2007-10-16 08:52:04 -0400 (Tue, 16 Oct 2007)
New Revision: 1081
Modified:
mgmt/cumin/python/cumin/server.py
mgmt/cumin/python/cumin/util.py
mgmt/notes/Todo
Log:
Renames csorted to sorted_by.
Modified: mgmt/cumin/python/cumin/server.py
===================================================================
--- mgmt/cumin/python/cumin/server.py 2007-10-16 12:48:21 UTC (rev 1080)
+++ mgmt/cumin/python/cumin/server.py 2007-10-16 12:52:04 UTC (rev 1081)
@@ -96,7 +96,7 @@
class ServerProfileSet(ItemSet):
def get_items(self, session, model):
- return csorted(model.get_server_profiles())
+ return sorted_by(model.get_server_profiles())
def render_item_link(self, session, profile):
branch = session.branch()
@@ -104,7 +104,7 @@
class ServerConfigPropertySet(ItemSet):
def get_items(self, session, server):
- return csorted(server.config_property_items())
+ return sorted_by(server.config_property_items())
def render_item_name(self, session, prop):
return prop.name
@@ -310,7 +310,7 @@
def render_groups(self, session, type):
writer = Writer()
- for group in csorted(type.server_group_items()):
+ for group in sorted_by(type.server_group_items()):
self.group_tmpl.render(session, group, writer)
return writer.to_string()
@@ -326,9 +326,9 @@
class BrowserServers(ServerSet):
def get_items(self, session, group):
if group:
- return csorted(group.server_items())
+ return sorted_by(group.server_items())
else:
- return csorted(self.app.model.get_servers())
+ return sorted_by(self.app.model.get_servers())
class ServerGroupParameter(Parameter):
def do_unmarshal(self, string):
Modified: mgmt/cumin/python/cumin/util.py
===================================================================
--- mgmt/cumin/python/cumin/util.py 2007-10-16 12:48:21 UTC (rev 1080)
+++ mgmt/cumin/python/cumin/util.py 2007-10-16 12:52:04 UTC (rev 1081)
@@ -1,2 +1,2 @@
-def csorted(seq, attr="name"):
+def sorted_by(seq, attr="name"):
return sorted(seq, cmp, lambda x: getattr(x, attr))
Modified: mgmt/notes/Todo
===================================================================
--- mgmt/notes/Todo 2007-10-16 12:48:21 UTC (rev 1080)
+++ mgmt/notes/Todo 2007-10-16 12:52:04 UTC (rev 1081)
@@ -61,6 +61,4 @@
* Add link disabling; use it for the server groups edit and remove
links
- * Rename csorted to sorted_by, a niceer name
-
* Add a parameter that takes a modelclass
17 years, 2 months
rhmessaging commits: r1080 - mgmt/cumin/python/cumin.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2007-10-16 08:48:21 -0400 (Tue, 16 Oct 2007)
New Revision: 1080
Modified:
mgmt/cumin/python/cumin/page.py
mgmt/cumin/python/cumin/server.py
mgmt/cumin/python/cumin/server.strings
Log:
Adds a server profiles tab to the main view.
Modified: mgmt/cumin/python/cumin/page.py
===================================================================
--- mgmt/cumin/python/cumin/page.py 2007-10-16 12:29:00 UTC (rev 1079)
+++ mgmt/cumin/python/cumin/page.py 2007-10-16 12:48:21 UTC (rev 1080)
@@ -153,6 +153,7 @@
self.add_tab(self.servers)
self.add_tab(self.ClusterTab(app, "clusters"))
+ self.add_tab(self.ProfileTab(app, "profiles"))
def show_server_group(self, session, group):
mode = self.show_mode(session, self.servers)
@@ -174,3 +175,13 @@
def render_title(self, session, model):
return self.clusters.render_title(session, model)
+
+ class ProfileTab(Widget):
+ def __init__(self, app, name):
+ super(MainView.ProfileTab, self).__init__(app, name)
+
+ self.profiles = ServerProfileSet(app, "profiles")
+ self.add_child(self.profiles)
+
+ def render_title(self, session, model):
+ return "Server Profiles (%i)" % len(model.get_server_profiles())
Modified: mgmt/cumin/python/cumin/server.py
===================================================================
--- mgmt/cumin/python/cumin/server.py 2007-10-16 12:29:00 UTC (rev 1079)
+++ mgmt/cumin/python/cumin/server.py 2007-10-16 12:48:21 UTC (rev 1080)
@@ -94,6 +94,14 @@
def render_none(self, session, group):
return none()
+class ServerProfileSet(ItemSet):
+ def get_items(self, session, model):
+ return csorted(model.get_server_profiles())
+
+ def render_item_link(self, session, profile):
+ branch = session.branch()
+ return mlink(branch.marshal(), "ServerProfile", profile.name)
+
class ServerConfigPropertySet(ItemSet):
def get_items(self, session, server):
return csorted(server.config_property_items())
Modified: mgmt/cumin/python/cumin/server.strings
===================================================================
--- mgmt/cumin/python/cumin/server.strings 2007-10-16 12:29:00 UTC (rev 1079)
+++ mgmt/cumin/python/cumin/server.strings 2007-10-16 12:48:21 UTC (rev 1080)
@@ -5,7 +5,8 @@
<th>Cluster</th>
<th>Status</th>
</tr>
-{items}
+
+ {items}
</table>
[ServerSet.item_html]
@@ -15,6 +16,20 @@
<td>0 errors, 0 warnings</td>
</tr>
+[ServerProfileSet.html]
+<table class="ServerProfileSet mobjects">
+ <tr>
+ <th>Server Profile</th>
+ </tr>
+
+ {items}
+</table>
+
+[ServerProfileSet.item_html]
+<tr>
+ <td>{item_link}</td>
+</tr>
+
[ServerConfigPropertySet.html]
<ul class="actions">
<li><a href="">Apply Configuration to This Server</a></li>
17 years, 2 months
rhmessaging commits: r1079 - in store/trunk/cpp: lib/jrnl and 1 other directories.
by rhmessaging-commits@lists.jboss.org
Author: cctrieloff
Date: 2007-10-16 08:29:00 -0400 (Tue, 16 Oct 2007)
New Revision: 1079
Modified:
store/trunk/cpp/lib/BdbMessageStore.cpp
store/trunk/cpp/lib/BdbMessageStore.h
store/trunk/cpp/lib/JournalImpl.cpp
store/trunk/cpp/lib/TxnCtxt.h
store/trunk/cpp/lib/jrnl/jcntl.cpp
store/trunk/cpp/lib/jrnl/rmgr.cpp
store/trunk/cpp/lib/jrnl/txn_map.cpp
store/trunk/cpp/lib/jrnl/txn_map.hpp
store/trunk/cpp/lib/jrnl/wmgr.cpp
store/trunk/cpp/tests/TwoPhaseCommitTest.cpp
Log:
- Txn fixes
- TPC fixes
- All unit tests now pass
- still has sys tests issues, so jrnl not enabled yet
Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp 2007-10-15 21:13:51 UTC (rev 1078)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp 2007-10-16 12:29:00 UTC (rev 1079)
@@ -262,15 +262,18 @@
}
//recover transactions:
for (txn_list::iterator i = prepared.begin(); i != prepared.end(); i++) {
- RecoverableTransaction::shared_ptr dtx =
- registry.recoverTransaction(i->xid, std::auto_ptr<TPCTransactionContext>(new TPCTxnCtxt(i->xid, &messageIdSequence)));
+
+ TPCTxnCtxt* tpcc = new TPCTxnCtxt(i->xid, &messageIdSequence);
+ RecoverableTransaction::shared_ptr dtx = registry.recoverTransaction(i->xid, std::auto_ptr<TPCTransactionContext>(tpcc));
if (i->enqueues.get()) {
for (LockedMappings::iterator j = i->enqueues->begin(); j != i->enqueues->end(); j++) {
+ tpcc->addXidRecord(queues[j->first]->getExternalQueueStore());
dtx->enqueue(queues[j->first], messages[j->second]);
}
}
if (i->dequeues.get()) {
for (LockedMappings::iterator j = i->dequeues->begin(); j != i->dequeues->end(); j++) {
+ tpcc->addXidRecord(queues[j->first]->getExternalQueueStore());
dtx->dequeue(queues[j->first], messages[j->second]);
}
}
@@ -427,7 +430,8 @@
Buffer contentBuff(data + contentOffset, contentSize);
msg->decodeContent(contentBuff);
}
- if (xidbuffSize > 0 && PreparedTransaction::isLocked(locked, queue->getPersistenceId(), dtokp.rid()) ) {
+
+ if (PreparedTransaction::isLocked(locked, queue->getPersistenceId(), dtokp.rid()) ) {
prepared[dtokp.rid()] = msg;
} else {
queue->recover(msg);
@@ -450,8 +454,6 @@
break;
case rhm::journal::RHM_IORES_EMPTY:
read = false;
- // inline const u_int32_t get_enq_cnt() const { return _emap.size(); }
- assert (jc->get_enq_cnt() == msg_count);
break; // done with all messages. ((add call in jrnl to test that _emap is empty.
default:
assert( "Store Error: Unexpected msg state");
@@ -558,7 +560,7 @@
for (std::set<string>::iterator i = known.begin(); i != known.end(); i++) {
if (prepared.find(*i) == prepared.end()) {
TPCTxnCtxt txn(*i, NULL);
- completed(txn, dequeueXidDb, enqueueXidDb);
+ completed(txn, dequeueXidDb, enqueueXidDb, false);
}
}
readLockedMappings(enqueueXidDb, enqueues);
@@ -758,7 +760,7 @@
if (usingJrnl()){
// add queue* to the txn map..
- if (ctxt) txn->addXidRecord(queue);
+ if (ctxt) txn->addXidRecord(queue.getExternalQueueStore());
}else{
msg.enqueueComplete(); // set enqueued for ack
put(mappingDb, txn->get(), key, value);
@@ -876,7 +878,7 @@
if (usingJrnl()){
// add queue* to the txn map..
- if (ctxt) txn->addXidRecord(queue);
+ if (ctxt) txn->addXidRecord(queue.getExternalQueueStore());
async_dequeue(ctxt, msg, queue);
} else if (txn->isTPC()) {
@@ -997,7 +999,7 @@
}
}
-void BdbMessageStore::completed(TPCTxnCtxt& txn, Db& discard, Db& apply)
+void BdbMessageStore::completed(TPCTxnCtxt& txn, Db& discard, Db& apply, bool commit)
{
if (!txn.get()) txn.begin(env);
@@ -1022,7 +1024,7 @@
}
prepareXidDb.del(txn.get(), &key, 0);
- txn.commit();
+ txn.complete(commit);
} catch (std::exception& e) {
std::cout << "Error completing xid " << txn.getXid() << ": " << e.what() << std::endl;
txn.abort();
@@ -1061,7 +1063,7 @@
Dbt value(&dummy, sizeof(dummy));
// make sure all the data is written to disk before returning
- txn->sync();
+ txn->sync();
prepareXidDb.put(txn->get(), &key, &value, 0);
txn->commit();
@@ -1069,14 +1071,14 @@
txn->abort();
throw e;
}
-
}
void BdbMessageStore::commit(TransactionContext& ctxt)
{
- TxnCtxt* txn(check(&ctxt));
+std::cout << " commit1" << std::flush;
+ TxnCtxt* txn(check(&ctxt));
if (txn->isTPC()) {
- completed(*dynamic_cast<TPCTxnCtxt*>(txn), enqueueXidDb, dequeueXidDb);
+ completed(*dynamic_cast<TPCTxnCtxt*>(txn), enqueueXidDb, dequeueXidDb, true);
} else {
txn->commit();
}
@@ -1086,7 +1088,7 @@
{
TxnCtxt* txn(check(&ctxt));
if (txn->isTPC()) {
- completed(*dynamic_cast<TPCTxnCtxt*>(txn), dequeueXidDb, enqueueXidDb);
+ completed(*dynamic_cast<TPCTxnCtxt*>(txn), dequeueXidDb, enqueueXidDb, false);
} else {
txn->abort();
}
Modified: store/trunk/cpp/lib/BdbMessageStore.h
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.h 2007-10-15 21:13:51 UTC (rev 1078)
+++ store/trunk/cpp/lib/BdbMessageStore.h 2007-10-16 12:29:00 UTC (rev 1079)
@@ -100,7 +100,7 @@
bool deleteIfUnused(DbTxn* txn, Dbt& messageId);
void destroy(Db& db, const qpid::broker::Persistable& p);
bool create(Db& db, IdSequence& seq, const qpid::broker::Persistable& p);
- void completed(TPCTxnCtxt& txn, Db& discard, Db& apply);
+ void completed(TPCTxnCtxt& txn, Db& discard, Db& apply, bool commit);
void record2pcOp(Db& db, TPCTxnCtxt& txn, u_int64_t messageId, u_int64_t queueId);
u_int64_t getRecordSize(Db& db, Dbt& key);
Modified: store/trunk/cpp/lib/JournalImpl.cpp
===================================================================
--- store/trunk/cpp/lib/JournalImpl.cpp 2007-10-15 21:13:51 UTC (rev 1078)
+++ store/trunk/cpp/lib/JournalImpl.cpp 2007-10-16 12:29:00 UTC (rev 1079)
@@ -58,7 +58,7 @@
if (tdl_itr->_enq_flag) { // enqueue op
i->enqueues->add(queue_id, tdl_itr->_rid);
} else { // dequeue op
- i->dequeues->add(queue_id, tdl_itr->_rid);
+ i->dequeues->add(queue_id, tdl_itr->_drid);
}
}
}
Modified: store/trunk/cpp/lib/TxnCtxt.h
===================================================================
--- store/trunk/cpp/lib/TxnCtxt.h 2007-10-15 21:13:51 UTC (rev 1078)
+++ store/trunk/cpp/lib/TxnCtxt.h 2007-10-16 12:29:00 UTC (rev 1079)
@@ -43,8 +43,8 @@
class TxnCtxt : public qpid::broker::TransactionContext
{
-private:
- typedef std::set<const qpid::broker::PersistableQueue*> ipqdef;
+protected:
+ typedef std::set<qpid::broker::ExternalQueueStore*> ipqdef;
ipqdef impactedQueues; // list of Queues used in the txn
static unsigned int count;
mutable qpid::sys::Mutex Lock;
@@ -62,7 +62,7 @@
void completeTXN(bool commit){
for (TxnCtxt::ipqdef::iterator i = impactedQueues.begin(); i != impactedQueues.end(); i++) {
- JournalImpl* jc = static_cast<JournalImpl*>((*i)->getExternalQueueStore());
+ JournalImpl* jc = static_cast<JournalImpl*>(*i);
if (jc && loggedtx) { /* if using journal */
DataTokenImpl* dtokp = new DataTokenImpl;
dtokp->set_rid(loggedtx->next());
@@ -70,11 +70,12 @@
if (commit)
jc->txn_commit(dtokp, getXid());
else
+ {
jc->txn_abort(dtokp, getXid());
-
+ }
} catch (rhm::journal::jexception& e) {
std::string str;
-std::cout << "Error commit" << e << std::endl;
+//std::cout << "Error commit" << e << std::endl;
delete dtokp;
THROW_STORE_EXCEPTION("Error commit" + e.to_string(str));
}
@@ -103,7 +104,7 @@
if (!firstloop) ::usleep(AIO_SLEEP_TIME); // move this into the get events call aiolib..
allWritten = true;
for (TxnCtxt::ipqdef::iterator i = impactedQueues.begin(); i != impactedQueues.end(); i++) {
- JournalImpl* jc = static_cast<JournalImpl*>((*i)->getExternalQueueStore());
+ JournalImpl* jc = static_cast<JournalImpl*>(*i);
try
{
@@ -116,8 +117,7 @@
}
}catch (rhm::journal::jexception& e) {
std::string str;
-std::cout << "Error sync" << e << std::endl;
-
+//std::cout << " TxnCtxt: Error sync 3" << e << std::flush;
THROW_STORE_EXCEPTION("Error sync" + e.to_string(str));
}
}
@@ -134,8 +134,8 @@
virtual const std::string& getXid() { return tid; }
void deleteXidRecord(){ impactedQueues.clear(); }
- void addXidRecord(const qpid::broker::PersistableQueue& queue){
- impactedQueues.insert(&queue); }
+ void addXidRecord(qpid::broker::ExternalQueueStore* queue){
+ impactedQueues.insert(queue); }
};
@@ -146,6 +146,11 @@
TPCTxnCtxt(const std::string& _xid, IdSequence* _loggedtx) : TxnCtxt(_loggedtx), xid(_xid) {}
virtual bool isTPC() { return true; }
virtual const std::string& getXid() { return xid; }
+ // commit the BDB abort, abort commit the jnrl
+ void commit(){ txn->commit(0); txn = 0; }
+ void abort(){ txn->abort(); txn = 0; }
+ void complete(bool commit){
+ txn->commit(0); completeTXN(commit); txn = 0; }
};
}}
Modified: store/trunk/cpp/lib/jrnl/jcntl.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.cpp 2007-10-15 21:13:51 UTC (rev 1078)
+++ store/trunk/cpp/lib/jrnl/jcntl.cpp 2007-10-16 12:29:00 UTC (rev 1079)
@@ -375,7 +375,7 @@
std::vector<std::string>::const_iterator pitr = std::find(prep_txn_list.begin(), prep_txn_list.end(), *itr);
if (pitr == prep_txn_list.end())
_tmap.get_remove_tdata_list(*itr);
- }
+ }
}
}
@@ -392,22 +392,19 @@
{
case RHM_JDAT_ENQ_MAGIC:
{
-std::cout << " e" << h._rid << std::flush;
enq_rec er;
while (!done)
{
-std::cout << "*" << std::flush;
done = er.rcv_decode(h, ifsp, cum_size_read);
jfile_cycle(fid, ifsp, rd, false);
}
rd._enq_cnt_list[fid]++;
if (er.xid_size())
{
-std::cout << "$" << std::flush;
er.get_xid(&xidp);
assert(xidp != NULL);
std::string xid((char*)xidp, er.xid_size());
- _tmap.insert_txn_data(xid, txn_data(h._rid, fid, true));
+ _tmap.insert_txn_data(xid, txn_data(h._rid, 0, fid, true));
free(xidp);
}
else
@@ -418,24 +415,21 @@
break;
case RHM_JDAT_DEQ_MAGIC:
{
-std::cout << " d" << h._rid << std::flush;
deq_rec dr;
while (!done)
{
-std::cout << "*" << std::flush;
done = dr.rcv_decode(h, ifsp, cum_size_read);
jfile_cycle(fid, ifsp, rd, false);
}
if (dr.xid_size())
{
-std::cout << "$" << std::flush;
// If the enqueue is part of a pending txn, it will not yet be in emap
- try { _emap.lock(h._rid); }
+ try { _emap.lock(dr.deq_rid()); }
catch(jexception e) { if (e.err_code() != jerrno::JERR_MAP_NOTFOUND) throw e; }
dr.get_xid(&xidp);
assert(xidp != NULL);
std::string xid((char*)xidp, dr.xid_size());
- _tmap.insert_txn_data(xid, txn_data(dr.deq_rid(), fid, false));
+ _tmap.insert_txn_data(xid, txn_data(dr.rid(), dr.deq_rid(), fid, false));
free(xidp);
}
else
@@ -457,12 +451,10 @@
break;
case RHM_JDAT_TXA_MAGIC:
{
-std::cout << " a" << h._rid << std::flush;
txn_rec ar;
while (!done)
{
-std::cout << "*" << std::flush;
- done = ar.rcv_decode(h, ifsp, cum_size_read);
+ done = ar.rcv_decode(h, ifsp, cum_size_read);
jfile_cycle(fid, ifsp, rd, false);
}
// Delete this txn from tmap, unlock any locked records in emap
@@ -472,14 +464,18 @@
txn_data_list tdl = _tmap.get_remove_tdata_list(xid);
for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++)
{
- try { _emap.unlock(itr->_rid); }
+ try
+ {
+ if (!itr->_enq_flag)
+ _emap.unlock(itr->_drid);
+ }
catch(jexception e)
{
if (e.err_code() != jerrno::JERR_MAP_NOTFOUND)
throw e;
}
if (itr->_enq_flag)
- _wrfc.decr_enqcnt(itr->_fid);
+ rd._enq_cnt_list[fid]--;
}
free(xidp);
if (rd._h_rid < h._rid)
@@ -488,11 +484,9 @@
break;
case RHM_JDAT_TXC_MAGIC:
{
-std::cout << " c" << h._rid << std::flush;
txn_rec cr;
while (!done)
{
-std::cout << "*" << std::flush;
done = cr.rcv_decode(h, ifsp, cum_size_read);
jfile_cycle(fid, ifsp, rd, false);
}
@@ -500,17 +494,15 @@
cr.get_xid(&xidp);
assert(xidp != NULL);
std::string xid((char*)xidp, cr.xid_size());
-std::cout << "@" << std::flush;
txn_data_list tdl = _tmap.get_remove_tdata_list(xid);
for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++)
{
-std::cout << " enq_flag=" << itr->_enq_flag << std::flush;
if (itr->_enq_flag) // txn enqueue
_emap.insert_fid(itr->_rid, itr->_fid);
else // txn dequeue
{
- u_int16_t fid = _emap.get_remove_fid(itr->_rid, true);
- _wrfc.decr_enqcnt(fid);
+ u_int16_t fid = _emap.get_remove_fid(itr->_drid, true);
+ rd._enq_cnt_list[fid]--;
}
}
@@ -521,18 +513,15 @@
break;
case RHM_JDAT_EMPTY_MAGIC:
{
-std::cout << " x" << std::flush;
u_int32_t rec_dblks = jrec::size_dblks(sizeof(hdr));
ifsp->ignore(rec_dblks * JRNL_DBLK_SIZE - sizeof(hdr));
}
break;
case 0:
-std::cout << " z" << std::flush;
rd._lfid = fid;
rd._eo = ifsp->tellg();
return false;
default:
-std::cout << " ?" << std::flush;
std::stringstream ss;
ss << std::hex << std::setfill('0') << "Magic=0x" << std::setw(8) << h._magic;
throw jexception(jerrno::JERR_JCNTL_UNKNOWNMAGIC, ss.str(), "jcntl",
@@ -589,7 +578,6 @@
void
jcntl::aio_wr_callback(jcntl* journal, u_int32_t num_dtoks)
{
-
//kpvdr TODO -- this list needs to be mutexed...???
// need to delete the dtok's
std::deque<rhm::journal::data_tok*> this_dtok_list(journal->_aio_wr_cmpl_dtok_list.begin(),
@@ -601,28 +589,23 @@
data_tok*& dtokp = this_dtok_list.front();
if (!journal->is_stopped() && dtokp->getSourceMessage())
{
- data_tok::write_state st = dtokp->wstate();
- if (st == data_tok::ENQ)
- {
-//std::cout << "----- enqueueComplete rid=" << dtokp->rid() << std::endl;
-
- dtokp->getSourceMessage()->enqueueComplete();
- /// cct --- if TPC work out what to do !!!
- }
- else if (dtokp->wstate() == data_tok::DEQ)
+ switch (dtokp->wstate())
{
-//std::cout << "----- dequeueComplete rid=" << dtokp->rid() << std::endl;
-
- dtokp->getSourceMessage()->dequeueComplete();
-
- if ( dtokp->getSourceMessage()->isDequeueComplete() ) // clear id after last dequeue
- dtokp->getSourceMessage()->setPersistenceId(0);
+ case data_tok::ENQ:
+ dtokp->getSourceMessage()->enqueueComplete();
+ break;
+ case data_tok::DEQ:
+ dtokp->getSourceMessage()->dequeueComplete();
+ if ( dtokp->getSourceMessage()->isDequeueComplete() ) // clear id after last dequeue
+ dtokp->getSourceMessage()->setPersistenceId(0);
+ break;
+ default:
+ ;
}
}
this_dtok_list.pop_front();
delete dtokp;
}
-
}
void
Modified: store/trunk/cpp/lib/jrnl/rmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rmgr.cpp 2007-10-15 21:13:51 UTC (rev 1078)
+++ store/trunk/cpp/lib/jrnl/rmgr.cpp 2007-10-16 12:29:00 UTC (rev 1079)
@@ -32,6 +32,7 @@
#include <jrnl/rmgr.hpp>
+#include <jrnl/jcntl.hpp>
#include <assert.h>
#include <cerrno>
#include <sstream>
@@ -287,8 +288,24 @@
}
catch (jexception& e)
{
- if (e.err_code() != jerrno::JERR_MAP_NOTFOUND)
+ if (e.err_code() == jerrno::JERR_MAP_LOCKED && !_jc->is_read_only())
throw e;
+
+ // Ok, not in emap, now search tmap for recover
+ if (_jc->is_read_only())
+ {
+ std::vector<std::string> xid_list;
+ _tmap.xid_list(xid_list);
+ for (std::vector<std::string>::iterator itr = xid_list.begin(); itr != xid_list.end() && !is_enq; itr++)
+ {
+ txn_data_list tx_list = _tmap.get_tdata_list(*itr);
+ for (tdl_itr ditr = tx_list.begin(); ditr != tx_list.end() && !is_enq; ditr++)
+ {
+ if (ditr->_rid == _hdr._rid)
+ is_enq = true;
+ }
+ }
+ }
//std::cout << "-nf" << std::flush;
}
#endif
@@ -299,7 +316,7 @@
// Is this locked by a pending dequeue transaction?
try
{
- if (_emap.is_locked(_hdr._rid))
+ if (_emap.is_locked(_hdr._rid) && !_jc->is_read_only())
return RHM_IORES_TXPENDING;
}
catch (jexception e)
Modified: store/trunk/cpp/lib/jrnl/txn_map.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/txn_map.cpp 2007-10-15 21:13:51 UTC (rev 1078)
+++ store/trunk/cpp/lib/jrnl/txn_map.cpp 2007-10-16 12:29:00 UTC (rev 1079)
@@ -36,13 +36,17 @@
#include <sstream>
#include <jrnl/jerrno.hpp>
+#include <iostream> // for debug
+
namespace rhm
{
namespace journal
{
-txn_data_struct::txn_data_struct(const u_int64_t rid, const u_int16_t fid, const bool enq_flag):
+txn_data_struct::txn_data_struct(const u_int64_t rid, const u_int64_t drid, const u_int16_t fid,
+ const bool enq_flag):
_rid(rid),
+ _drid(drid),
_fid(fid),
_enq_flag(enq_flag),
_aio_compl(false)
@@ -139,10 +143,11 @@
ss << std::hex << "xid=\"" << xid << "\"";
throw jexception(jerrno::JERR_MAP_NOTFOUND, ss.str(), "txn_map", "is_txn_synced");
}
- txn_data_list list = itr->second;
+//std::cout << " its: found XID" << std::flush;
bool is_synced = true;
- for (tdl_itr litr = list.begin(); litr < list.end(); litr++)
+ for (tdl_itr litr = itr->second.begin(); litr < itr->second.end(); litr++)
{
+//std::cout << " rid=" << litr->_rid << " aioc=" << litr->_aio_compl << std::flush;
if (!litr->_aio_compl)
{
is_synced = false;
@@ -154,7 +159,7 @@
}
const bool
-txn_map::set_aio_compl(const std::string& xid, const u_int64_t rid)
+txn_map::set_aio_compl(const std::string& xid, const u_int64_t rid) throw (jexception)
{
bool ok = true;
bool found = false;
@@ -164,13 +169,19 @@
ok = false;
else
{
- txn_data_list list = itr->second;
- for (tdl_itr litr = list.begin(); litr < list.end(); litr++)
+//std::cout << " sac: found XID" << std::flush;
+// txn_data_list list = itr->second;
+ for (tdl_itr litr = itr->second.begin(); litr < itr->second.end(); litr++)
{
if (litr->_rid == rid)
{
+// txn_data_struct t(litr->_rid, litr->_drid, litr->_fid, litr->_enq_flag);
+// t._aio_compl = true;
+// itr->second.erase(litr);
+// itr->second.push_back(t);
found = true;
litr->_aio_compl = true;
+//std::cout << " rid=" << rid << " aioc=" << litr->_aio_compl << " ptr=" << std::flush;
break;
}
}
Modified: store/trunk/cpp/lib/jrnl/txn_map.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/txn_map.hpp 2007-10-15 21:13:51 UTC (rev 1078)
+++ store/trunk/cpp/lib/jrnl/txn_map.hpp 2007-10-16 12:29:00 UTC (rev 1079)
@@ -54,10 +54,11 @@
struct txn_data_struct
{
u_int64_t _rid; ///< Record id for this operation
+ u_int64_t _drid; ///< Dequeue record id for this operation
u_int16_t _fid; ///< File id, to be used when transferring to emap on commit
bool _enq_flag; ///< If true, enq op, otherwise deq op
bool _aio_compl; ///< Initially false, set to true when AIO returns
- txn_data_struct(const u_int64_t rid, const u_int16_t fid, const bool enq_flag);
+ txn_data_struct(const u_int64_t rid, const u_int64_t drid, const u_int16_t fid, const bool enq_flag);
};
typedef txn_data_struct txn_data;
typedef std::vector<txn_data> txn_data_list;
@@ -82,7 +83,7 @@
const txn_data_list get_remove_tdata_list(const std::string& xid) throw (jexception);
const u_int32_t get_rid_count(const std::string& xid) throw (jexception);
const bool is_txn_synced(const std::string& xid) throw (jexception);
- const bool set_aio_compl(const std::string& xid, const u_int64_t rid);
+ const bool set_aio_compl(const std::string& xid, const u_int64_t rid) throw (jexception);
inline void clear() { _map.clear(); }
inline const bool empty() const { return _map.empty(); }
inline const u_int16_t size() const { return (u_int16_t)_map.size(); }
Modified: store/trunk/cpp/lib/jrnl/wmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wmgr.cpp 2007-10-15 21:13:51 UTC (rev 1078)
+++ store/trunk/cpp/lib/jrnl/wmgr.cpp 2007-10-16 12:29:00 UTC (rev 1079)
@@ -167,7 +167,7 @@
if (xid_len) // If part of transaction, add to transaction map
{
std::string xid((char*)xid_ptr, xid_len);
- _tmap.insert_txn_data(xid, txn_data(rid, dtokp->fid(), true));
+ _tmap.insert_txn_data(xid, txn_data(rid, 0, dtokp->fid(), true));
}
else
_emap.insert_fid(rid, dtokp->fid());
@@ -307,10 +307,10 @@
if (xid_len) // If part of transaction, add to transaction map
{
// If the enqueue is part of a pending txn, it will not yet be in emap
- try { _emap.lock(rid); }
+ try { _emap.lock(dequeue_rid); }
catch(jexception e) { if (e.err_code() != jerrno::JERR_MAP_NOTFOUND) throw e; }
std::string xid((char*)xid_ptr, xid_len);
- _tmap.insert_txn_data(xid, txn_data(dequeue_rid, dtokp->fid(), false));
+ _tmap.insert_txn_data(xid, txn_data(rid, dequeue_rid, dtokp->fid(), false));
}
else
{
@@ -444,7 +444,11 @@
txn_data_list tdl = _tmap.get_remove_tdata_list(xid);
for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++)
{
- try { _emap.unlock(itr->_rid); }
+ try
+ {
+ if (!itr->_enq_flag)
+ _emap.unlock(itr->_drid);
+ }
catch(jexception e) { if (e.err_code() != jerrno::JERR_MAP_NOTFOUND) throw e; }
if (itr->_enq_flag)
_wrfc.decr_enqcnt(itr->_fid);
@@ -587,7 +591,7 @@
_emap.insert_fid(itr->_rid, itr->_fid);
else // txn dequeue
{
- u_int16_t fid = _emap.get_remove_fid(itr->_rid, true);
+ u_int16_t fid = _emap.get_remove_fid(itr->_drid, true);
_wrfc.decr_enqcnt(fid);
}
}
Modified: store/trunk/cpp/tests/TwoPhaseCommitTest.cpp
===================================================================
--- store/trunk/cpp/tests/TwoPhaseCommitTest.cpp 2007-10-15 21:13:51 UTC (rev 1078)
+++ store/trunk/cpp/tests/TwoPhaseCommitTest.cpp 2007-10-16 12:29:00 UTC (rev 1079)
@@ -41,7 +41,6 @@
class TwoPhaseCommitTest : public CppUnit::TestCase
{
CPPUNIT_TEST_SUITE(TwoPhaseCommitTest);
-
CPPUNIT_TEST(testCommitSwap);
CPPUNIT_TEST(testPrepareAndAbortSwap);
CPPUNIT_TEST(testAbortNoPrepareSwap);
@@ -72,9 +71,10 @@
{
TwoPhaseCommitTest* const test;
const string messageId;
+ Message::shared_ptr msg;
public:
Swap(TwoPhaseCommitTest* const test_, const string& messageId_): test(test_), messageId(messageId_) {}
- void init(){ test->deliver(messageId, test->queueA); }
+ void init(){ msg = test->deliver(messageId, test->queueA); }
void run(TPCTransactionContext* txn) { test->swap(txn); }
void check(bool committed) { test->swapCheck(committed, messageId); }
};
@@ -82,13 +82,16 @@
class Enqueue : public Strategy
{
TwoPhaseCommitTest* const test;
+ Message::shared_ptr msg1;
+ Message::shared_ptr msg2;
+ Message::shared_ptr msg3;
public:
Enqueue(TwoPhaseCommitTest* const test_): test(test_) {}
void init() {}
void run(TPCTransactionContext* txn) {
- test->enqueue(txn, "Enqueue1");
- test->enqueue(txn, "Enqueue2");
- test->enqueue(txn, "Enqueue3");
+ msg1 = test->enqueue(txn, "Enqueue1");
+ msg2 = test->enqueue(txn, "Enqueue2");
+ msg3 = test->enqueue(txn, "Enqueue3");
}
void check(bool committed) {
if (committed) {
@@ -103,12 +106,15 @@
class Dequeue : public Strategy
{
TwoPhaseCommitTest* const test;
+ Message::shared_ptr msg1;
+ Message::shared_ptr msg2;
+ Message::shared_ptr msg3;
public:
Dequeue(TwoPhaseCommitTest* const test_): test(test_) {}
void init() {
- test->deliver("Dequeue1", test->queueA);
- test->deliver("Dequeue2", test->queueA);
- test->deliver("Dequeue3", test->queueA);
+ msg1 = test->deliver("Dequeue1", test->queueA);
+ msg2 = test->deliver("Dequeue2", test->queueA);
+ msg3 = test->deliver("Dequeue3", test->queueA);
}
void run(TPCTransactionContext* txn) {
test->dequeue(txn);
@@ -132,7 +138,10 @@
QueueRegistry queues;
Queue::shared_ptr queueA;
Queue::shared_ptr queueB;
-
+ Message::shared_ptr msg1;
+ Message::shared_ptr msg2;
+ Message::shared_ptr msg4;
+
public:
TwoPhaseCommitTest() : nameA("queueA"), nameB("queueB") {}
@@ -214,7 +223,6 @@
std::auto_ptr<TPCTransactionContext> txn(store->begin("my-xid"));
swap.run(txn.get());
store->prepare(*txn);
-
restart();
//check that the message is not available from either queue
@@ -261,29 +269,31 @@
void swap(TPCTransactionContext* txn)
{
- Message::shared_ptr msg = queueA->dequeue().payload;//just dequeues in memory
+ msg1 = queueA->dequeue().payload;//just dequeues in memory
//move the message from one queue to the other as part of a
//distributed transaction
- queueB->enqueue(txn, msg);//note: need to enqueue it first to avoid message being deleted
- queueA->dequeue(txn, msg);
+ queueB->enqueue(txn, msg1);//note: need to enqueue it first to avoid message being deleted
+ queueA->dequeue(txn, msg1);
}
void dequeue(TPCTransactionContext* txn)
{
- Message::shared_ptr msg = queueA->dequeue().payload;//just dequeues in memory
- queueA->dequeue(txn, msg);
+ msg2 = queueA->dequeue().payload;//just dequeues in memory
+ queueA->dequeue(txn, msg2);
}
- void enqueue(TPCTransactionContext* txn, const string& msgid)
+ Message::shared_ptr enqueue(TPCTransactionContext* txn, const string& msgid)
{
Message::shared_ptr msg = createMessage(msgid);
queueA->enqueue(txn, msg);
+ return msg;
}
- void deliver(const string& msgid, Queue::shared_ptr& queue)
+ Message::shared_ptr deliver(const string& msgid, Queue::shared_ptr& queue)
{
- Message::shared_ptr msg = createMessage(msgid);
- queue->deliver(msg);
+ msg4 = createMessage(msgid);
+ queue->deliver(msg4);
+ return msg4;
}
void setup()
17 years, 2 months