rhmessaging commits: r1486 - in store/trunk/cpp/lib: jrnl and 1 other directory.
by rhmessaging-commits@lists.jboss.org
Author: kpvdr
Date: 2007-12-14 09:05:58 -0500 (Fri, 14 Dec 2007)
New Revision: 1486
Modified:
store/trunk/cpp/lib/BdbMessageStore.cpp
store/trunk/cpp/lib/BdbMessageStore.h
store/trunk/cpp/lib/jrnl/jexception.cpp
Log:
Minor bugfix for jexception, and possible fix for BZ 423981 (subject to testing)
Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp 2007-12-13 22:12:30 UTC (rev 1485)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp 2007-12-14 14:05:58 UTC (rev 1486)
@@ -992,6 +992,7 @@
try {
if ( queue && usingJrnl()) {
+ qpid::sys::Mutex::ScopedLock s(jrnlWriteLock);
boost::intrusive_ptr<DataTokenImpl> dtokp(new DataTokenImpl);
dtokp->addRef();
dtokp->setSourceMessage(message);
@@ -1135,7 +1136,7 @@
intrusive_ptr<PersistableMessage>& msg,
const PersistableQueue& queue)
{
- //std::cout << "D" << std::flush;
+ qpid::sys::Mutex::ScopedLock s(jrnlWriteLock);
bool written = false;
boost::intrusive_ptr<DataTokenImpl> ddtokp(new DataTokenImpl);
ddtokp->addRef();
Modified: store/trunk/cpp/lib/BdbMessageStore.h
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.h 2007-12-13 22:12:30 UTC (rev 1485)
+++ store/trunk/cpp/lib/BdbMessageStore.h 2007-12-14 14:05:58 UTC (rev 1486)
@@ -84,6 +84,7 @@
u_int16_t numJrnlFiles;
u_int32_t jrnlFsizePgs;
bool isInit;
+ mutable qpid::sys::Mutex jrnlWriteLock;
const char* envPath;
static qpid::sys::Duration defJournalGetEventsTimeout;
static qpid::sys::Duration defJournalFlushTimeout;
Modified: store/trunk/cpp/lib/jrnl/jexception.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jexception.cpp 2007-12-13 22:12:30 UTC (rev 1485)
+++ store/trunk/cpp/lib/jrnl/jexception.cpp 2007-12-14 14:05:58 UTC (rev 1486)
@@ -131,7 +131,7 @@
oss << " ";
}
if (tf)
- oss << _throwing_class << "() ";
+ oss << _throwing_fn << "() ";
if (tc || tf)
oss << "threw " << jerrno::err_msg(_err_code);
if (ai)
18 years, 4 months
rhmessaging commits: r1485 - in mgmt: notes and 1 other directory.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2007-12-13 17:12:30 -0500 (Thu, 13 Dec 2007)
New Revision: 1485
Modified:
mgmt/cumin/python/cumin/exchange.py
mgmt/cumin/python/cumin/formats.py
mgmt/notes/justin-todo.txt
Log:
Renders the default exchange as "Default" instead of the empty string.
Fixes some misbehavior in fmt_olink.
Modified: mgmt/cumin/python/cumin/exchange.py
===================================================================
--- mgmt/cumin/python/cumin/exchange.py 2007-12-13 21:13:44 UTC (rev 1484)
+++ mgmt/cumin/python/cumin/exchange.py 2007-12-13 22:12:30 UTC (rev 1485)
@@ -63,7 +63,8 @@
def render_item_link(self, session, exchange):
branch = session.branch()
self.page().show_exchange(branch, exchange).show_view(branch)
- return fmt_olink(branch, exchange)
+ name = exchange.name or "<em>Default</em>"
+ return fmt_olink(branch, exchange, name=name)
def render_item_producers(self, session, exchange):
branch = session.branch()
@@ -126,7 +127,8 @@
return exchange
def get_title(self, session, exchange):
- return "Exchange '%s'" % exchange.name
+ return exchange.name and "Exchange '%s'" % exchange.name \
+ or "Default Exchange"
class ExchangeStatus(CuminStatus):
def render_data_url(self, session, exchange):
@@ -173,13 +175,13 @@
self.tabs.add_tab(self.bindings)
def show_producers(self, session):
- self.tabs.show_mode(session, self.producers);
+ return self.tabs.show_mode(session, self.producers);
def show_bindings(self, session):
- self.tabs.show_mode(session, self.bindings);
+ return self.tabs.show_mode(session, self.bindings);
def get_title(self, session, exchange):
- return "Exchange '%s'" % exchange.name
+ return self.parent().get_title(session, exchange)
def render_data_url(self, session, exchange):
return "exchange.xml?id=%i" % exchange.id
Modified: mgmt/cumin/python/cumin/formats.py
===================================================================
--- mgmt/cumin/python/cumin/formats.py 2007-12-13 21:13:44 UTC (rev 1484)
+++ mgmt/cumin/python/cumin/formats.py 2007-12-13 22:12:30 UTC (rev 1485)
@@ -111,9 +111,10 @@
(href, class_ and " class=\"%s\" " % class_ or " ", content)
def fmt_olink(session, object, selected=False, name=None):
- n = getattr(object, "name", name)
+ if name is None:
+ name = getattr(object, "name", fmt_none())
- if isinstance(n, basestring):
- n = fmt_shorten(n)
+ if isinstance(name, basestring):
+ name = fmt_shorten(name)
- return fmt_link(session.marshal(), n, selected and "selected")
+ return fmt_link(session.marshal(), name, selected and "selected")
Modified: mgmt/notes/justin-todo.txt
===================================================================
--- mgmt/notes/justin-todo.txt 2007-12-13 21:13:44 UTC (rev 1484)
+++ mgmt/notes/justin-todo.txt 2007-12-13 22:12:30 UTC (rev 1485)
@@ -10,8 +10,6 @@
* Render stats without values as something other than 0, say a --
- * Render the "" exchange as "Default"
-
* Add javascript for the check-all behavior
* Deal with problem of calling method on broker that is not there
@@ -30,6 +28,12 @@
* Group form submit has different behaviors between hitting enter and
clicking submit
+ * Fix client ajax
+
+ * Remove auth id for now
+
+ * Unit for clients should be frames vs. bytes
+
Deferred
* Add inactive state to some status lights
18 years, 4 months
rhmessaging commits: r1484 - in mgmt: notes and 1 other directory.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2007-12-13 16:13:44 -0500 (Thu, 13 Dec 2007)
New Revision: 1484
Modified:
mgmt/cumin/python/cumin/client.py
mgmt/cumin/python/cumin/client.strings
mgmt/notes/justin-todo.txt
Log:
Paginates clients.
Modified: mgmt/cumin/python/cumin/client.py
===================================================================
--- mgmt/cumin/python/cumin/client.py 2007-12-13 21:09:41 UTC (rev 1483)
+++ mgmt/cumin/python/cumin/client.py 2007-12-13 21:13:44 UTC (rev 1484)
@@ -9,22 +9,16 @@
strings = StringCatalog(__file__)
-class ClientSet(ItemSet):
+class ClientSet(PaginatedItemSet):
def __init__(self, app, name):
super(ClientSet, self).__init__(app, name)
self.unit = UnitSwitch(app, "unit")
self.add_child(self.unit)
-
- self.paginator = Paginator(app, "page")
- self.add_child(self.paginator)
def get_title(self, session, vhost):
return "Clients %s" % fmt_count(self.get_item_count(session, vhost))
- def do_process(self, session, vhost):
- self.paginator.set_count(session, self.get_item_count(session, vhost))
-
def render_unit_plural(self, session, vhost):
return self.unit.get(session) == "b" and "Bytes" or "Msgs."
@@ -33,7 +27,7 @@
def do_get_items(self, session, vhost):
if vhost:
- start, end = self.paginator.get_bounds(session)
+ start, end = self.get_bounds(session)
return Client.select(Client.q.vhostID == vhost.id,
orderBy="address")[start:end]
Modified: mgmt/cumin/python/cumin/client.strings
===================================================================
--- mgmt/cumin/python/cumin/client.strings 2007-12-13 21:09:41 UTC (rev 1483)
+++ mgmt/cumin/python/cumin/client.strings 2007-12-13 21:13:44 UTC (rev 1484)
@@ -1,5 +1,7 @@
[ClientSet.html]
<form>
+ <div class="rfloat">{page}</div>
+
{unit}
<div class="sactions">
Modified: mgmt/notes/justin-todo.txt
===================================================================
--- mgmt/notes/justin-todo.txt 2007-12-13 21:09:41 UTC (rev 1483)
+++ mgmt/notes/justin-todo.txt 2007-12-13 21:13:44 UTC (rev 1484)
@@ -14,8 +14,6 @@
* Add javascript for the check-all behavior
- * Paginate clients
-
* Deal with problem of calling method on broker that is not there
* Only put something in pending actions if the call succeeds
18 years, 4 months
rhmessaging commits: r1483 - in mgmt: notes and 1 other directory.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2007-12-13 16:09:41 -0500 (Thu, 13 Dec 2007)
New Revision: 1483
Modified:
mgmt/cumin/python/cumin/client.py
mgmt/cumin/python/cumin/client.strings
mgmt/cumin/python/cumin/formats.py
mgmt/cumin/python/cumin/model.py
mgmt/notes/justin-todo.txt
Log:
Changes to stats to reflect broker side changes.
Fixes the client page icon.
Modified: mgmt/cumin/python/cumin/client.py
===================================================================
--- mgmt/cumin/python/cumin/client.py 2007-12-13 20:25:38 UTC (rev 1482)
+++ mgmt/cumin/python/cumin/client.py 2007-12-13 21:09:41 UTC (rev 1483)
@@ -48,17 +48,17 @@
frame.show_view(branch).show_sessions(branch)
return fmt_link(branch.marshal(), len(client.sessions))
- def render_item_produced(self, session, client):
+ def render_item_from(self, session, client):
unit = self.unit.get(session)
- key = unit == "b" and "bytesProduced" or "msgsProduced"
+ key = unit == "b" and "bytesFromClient" or "framesFromClient"
value = self.app.model.client.get_stat(key).rate(client)
- return fmt_rate(value, unit == "b" and "byte" or "msg", "sec")
+ return fmt_rate(value, unit == "b" and "byte" or "frame", "sec")
- def render_item_consumed(self, session, client):
+ def render_item_to(self, session, client):
unit = self.unit.get(session)
- key = unit == "b" and "bytesConsumed" or "msgsConsumed"
+ key = unit == "b" and "bytesToClient" or "framesToClient"
value = self.app.model.client.get_stat(key).rate(client)
- return fmt_rate(value, unit == "b" and "byte" or "msg", "sec")
+ return fmt_rate(value, unit == "b" and "byte" or "frame", "sec")
def render_item_status(self, session, client):
return fmt_ostatus(client)
@@ -115,20 +115,20 @@
return "No, Cancel"
class ClientStatus(CuminStatus):
- def render_messages_produced(self, session, client):
- stat = self.app.model.client.get_stat("msgsProduced")
- return fmt_rate(stat.rate(client), "msg", "sec")
+ def render_frames_from(self, session, client):
+ stat = self.app.model.client.get_stat("framesFromClient")
+ return fmt_rate(stat.rate(client), "frame", "sec")
- def render_messages_consumed(self, session, client):
- stat = self.app.model.client.get_stat("msgsConsumed")
- return fmt_rate(stat.rate(client), "msg", "sec")
+ def render_frames_to(self, session, client):
+ stat = self.app.model.client.get_stat("framesToClient")
+ return fmt_rate(stat.rate(client), "frame", "sec")
- def render_bytes_produced(self, session, client):
- stat = self.app.model.client.get_stat("bytesProduced")
+ def render_bytes_from(self, session, client):
+ stat = self.app.model.client.get_stat("bytesFromClient")
return fmt_rate(stat.rate(client), "byte", "sec")
- def render_bytes_consumed(self, session, client):
- stat = self.app.model.client.get_stat("bytesConsumed")
+ def render_bytes_to(self, session, client):
+ stat = self.app.model.client.get_stat("bytesToClient")
return fmt_rate(stat.rate(client), "byte", "sec")
class ClientView(Widget):
@@ -197,11 +197,11 @@
return "History"
def render_produced_chart_url(self, session, client):
- return "client.png?id=%i;s=msgsProduced;s=bytesProduced" \
+ return "client.png?id=%i;s=framesFromClient;s=bytesFromClient" \
% client.id
def render_consumed_chart_url(self, session, client):
- return "client.png?id=%i;s=msgsConsumed;s=bytesConsumed" \
+ return "client.png?id=%i;s=framesToClient;s=bytesToClient" \
% client.id
class ClientSessionSet(ItemSet):
Modified: mgmt/cumin/python/cumin/client.strings
===================================================================
--- mgmt/cumin/python/cumin/client.strings 2007-12-13 20:25:38 UTC (rev 1482)
+++ mgmt/cumin/python/cumin/client.strings 2007-12-13 21:09:41 UTC (rev 1483)
@@ -13,8 +13,8 @@
<th><input type="checkbox"/></th>
<th>Address</th>
<th class="ralign">Sessions</th>
- <th class="ralign">{unit_plural} Produced</th>
- <th class="ralign">{unit_plural} Consumed</th>
+ <th class="ralign">{unit_plural} Sent</th>
+ <th class="ralign">{unit_plural} Received</th>
<th>Status</th>
</tr>
@@ -27,8 +27,8 @@
<td><input type="checkbox"/></td>
<td>{item_link}</td>
<td class="ralign">{item_sessions}</td>
- <td class="ralign">{item_produced}</td>
- <td class="ralign">{item_consumed}</td>
+ <td class="ralign">{item_from}</td>
+ <td class="ralign">{item_to}</td>
<td>{item_status}</td>
</tr>
@@ -40,8 +40,8 @@
var sdata = {
"tr": [
null,
- {"td": [s.msgsProduced.rate, s.bytesProduced.rate]},
- {"td": [s.msgsConsumed.rate, s.bytesConsumed.rate]}
+ {"td": [s.framesFromClient.rate, s.bytesFromClient.rate]},
+ {"td": [s.framesToClient.rate, s.bytesToClient.rate]}
]
};
@@ -59,18 +59,18 @@
<table>
<tr>
<th></th>
- <th style="width: 35%;" class="ralign">Messages</th>
+ <th style="width: 35%;" class="ralign">Frames</th>
<th style="width: 35%;" class="ralign">Bytes</th>
</tr>
<tr>
- <th>Produced</th>
- <td class="ralign">{messages_produced}</td>
- <td class="ralign">{bytes_produced}</td>
+ <th>Sent</th>
+ <td class="ralign">{frames_from}</td>
+ <td class="ralign">{bytes_from}</td>
</tr>
<tr>
- <th>Consumed</th>
- <td class="ralign">{messages_consumed}</td>
- <td class="ralign">{bytes_consumed}</td>
+ <th>Received</th>
+ <td class="ralign">{frames_to}</td>
+ <td class="ralign">{bytes_to}</td>
</tr>
</table>
</div>
@@ -100,7 +100,7 @@
{status}
-<h1><img src="client-36.png"/>{title}</h1>
+<h1><img src="resource?name=client-36.png"/>{title}</h1>
<table class="props">
<tr><th>Address</th><td>{address}</td></tr>
Modified: mgmt/cumin/python/cumin/formats.py
===================================================================
--- mgmt/cumin/python/cumin/formats.py 2007-12-13 20:25:38 UTC (rev 1482)
+++ mgmt/cumin/python/cumin/formats.py 2007-12-13 21:09:41 UTC (rev 1483)
@@ -89,11 +89,11 @@
def fmt_ostatus(object):
errs, warns = 0, 0
- if random() < 0.075:
- errs = 1
+ #if random() < 0.075:
+ # errs = 1
- if random() < 0.10:
- warns = 1
+ #if random() < 0.10:
+ # warns = 1
return fmt_status(errs, warns)
Modified: mgmt/cumin/python/cumin/model.py
===================================================================
--- mgmt/cumin/python/cumin/model.py 2007-12-13 20:25:38 UTC (rev 1482)
+++ mgmt/cumin/python/cumin/model.py 2007-12-13 21:09:41 UTC (rev 1483)
@@ -354,24 +354,24 @@
self.mint_stats_class = ClientStats
- stat = CuminStat(self, "bytesProduced", "int")
- stat.title = "Bytes Produced"
+ stat = CuminStat(self, "bytesFromClient", "int")
+ stat.title = "Bytes Sent"
stat.unit = "byte"
stat.categories = ("general")
- stat = CuminStat(self, "bytesConsumed", "int")
- stat.title = "Bytes Consumed"
+ stat = CuminStat(self, "bytesToClient", "int")
+ stat.title = "Bytes Received"
stat.unit = "byte"
stat.categories = ("general")
- stat = CuminStat(self, "msgsProduced", "int")
- stat.title = "Msgs. Produced"
- stat.unit = "message"
+ stat = CuminStat(self, "framesFromClient", "int")
+ stat.title = "Frames Sent"
+ stat.unit = "frame"
stat.categories = ("general")
- stat = CuminStat(self, "msgsConsumed", "int")
- stat.title = "Msgs. Produced"
- stat.unit = "message"
+ stat = CuminStat(self, "framesToClient", "int")
+ stat.title = "Frames Received"
+ stat.unit = "frame"
stat.categories = ("general")
class CuminSession(CuminClass):
Modified: mgmt/notes/justin-todo.txt
===================================================================
--- mgmt/notes/justin-todo.txt 2007-12-13 20:25:38 UTC (rev 1482)
+++ mgmt/notes/justin-todo.txt 2007-12-13 21:09:41 UTC (rev 1483)
@@ -14,16 +14,8 @@
* Add javascript for the check-all behavior
- * Email amqp-list, Jonathan, and Lana with doc requirements for mgmt
-
- * Paginate producers
-
- * Paginate consumers
-
* Paginate clients
- * Paginate sessions
-
* Deal with problem of calling method on broker that is not there
* Only put something in pending actions if the call succeeds
@@ -35,17 +27,23 @@
* Need to handle exceptions in broker connect thread, so it doesn't
stop trying
- * Add inactive state to some status lights
-
* Fix session and client naming
* Group form submit has different behaviors between hitting enter and
clicking submit
- * Add client icon
-
Deferred
+ * Add inactive state to some status lights
+
+ * Paginate producers
+
+ * Paginate consumers
+
+ * Paginate sessions
+
+ * Email amqp-list, Jonathan, and Lana with doc requirements for mgmt
+
* Ask tross to take some prints out of ManagedBroker.start
* Get rid of CuminClass.mint_stats_class
18 years, 4 months
rhmessaging commits: r1482 - in mgmt: notes and 1 other directory.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2007-12-13 15:25:38 -0500 (Thu, 13 Dec 2007)
New Revision: 1482
Modified:
mgmt/cumin/python/cumin/client.py
mgmt/cumin/python/cumin/client.strings
mgmt/notes/justin-todo.txt
Log:
Adds a client close form and hooks it up.
Modified: mgmt/cumin/python/cumin/client.py
===================================================================
--- mgmt/cumin/python/cumin/client.py 2007-12-13 20:17:34 UTC (rev 1481)
+++ mgmt/cumin/python/cumin/client.py 2007-12-13 20:25:38 UTC (rev 1482)
@@ -75,9 +75,45 @@
self.add_mode(self.view)
self.set_view_mode(self.view)
+ self.close = ClientClose(app, "close")
+ self.add_mode(self.close)
+
+ def show_close(self, session):
+ return self.show_mode(session, self.close)
+
def get_title(self, session, client):
return "Client %s" % client.address
+def doit(error, args):
+ print error, args
+ print "did it!"
+
+class ClientClose(CuminConfirmForm):
+ def get_title(self, session, client):
+ return "Close Client '%s'" % client.address
+
+ def process_cancel(self, session, client):
+ branch = session.branch()
+ self.page().show_client(branch, client).show_view(branch)
+ self.page().set_redirect_url(session, branch.marshal())
+
+ def process_submit(self, session, client):
+ print "open close"
+
+ print "client.managedBroker", client.managedBroker
+
+ client.close(self.app.model.data, client.managedBroker, doit)
+
+ print "close close"
+
+ self.process_cancel(session, client)
+
+ def render_submit_content(self, session, client):
+ return "Yes, Close Client '%s'" % client.address
+
+ def render_cancel_content(self, session, client):
+ return "No, Cancel"
+
class ClientStatus(CuminStatus):
def render_messages_produced(self, session, client):
stat = self.app.model.client.get_stat("msgsProduced")
@@ -132,6 +168,11 @@
def render_updated(self, session, client):
return fmt_datetime(client.recTime)
+ def render_close_href(self, session, client):
+ branch = session.branch()
+ self.parent().show_close(branch)
+ return branch.marshal()
+
class ClientStatistics(TabSet):
def __init__(self, app, name):
super(ClientStatistics, self).__init__(app, name)
Modified: mgmt/cumin/python/cumin/client.strings
===================================================================
--- mgmt/cumin/python/cumin/client.strings 2007-12-13 20:17:34 UTC (rev 1481)
+++ mgmt/cumin/python/cumin/client.strings 2007-12-13 20:25:38 UTC (rev 1482)
@@ -110,8 +110,7 @@
<tr>
<th class="actions" colspan="2">
<h2>Act on This Client:</h2>
- <a href="{href}">Detach</a>
- <a href="{href}">Close</a>
+ <a href="{close_href}">Close</a>
</th>
</tr>
</table>
Modified: mgmt/notes/justin-todo.txt
===================================================================
--- mgmt/notes/justin-todo.txt 2007-12-13 20:17:34 UTC (rev 1481)
+++ mgmt/notes/justin-todo.txt 2007-12-13 20:25:38 UTC (rev 1482)
@@ -41,6 +41,8 @@
* Group form submit has different behaviors between hitting enter and
clicking submit
+
+ * Add client icon
Deferred
18 years, 4 months
rhmessaging commits: r1481 - in store/trunk/cpp/lib: jrnl and 1 other directory.
by rhmessaging-commits@lists.jboss.org
Author: kpvdr
Date: 2007-12-13 15:17:34 -0500 (Thu, 13 Dec 2007)
New Revision: 1481
Modified:
store/trunk/cpp/lib/BdbMessageStore.cpp
store/trunk/cpp/lib/BdbMessageStore.h
store/trunk/cpp/lib/DataTokenImpl.h
store/trunk/cpp/lib/JournalImpl.cpp
store/trunk/cpp/lib/JournalImpl.h
store/trunk/cpp/lib/jrnl/data_tok.cpp
store/trunk/cpp/lib/jrnl/data_tok.hpp
store/trunk/cpp/lib/jrnl/jcntl.cpp
store/trunk/cpp/lib/jrnl/jcntl.hpp
store/trunk/cpp/lib/jrnl/slock.hpp
store/trunk/cpp/lib/jrnl/wmgr.cpp
store/trunk/cpp/lib/jrnl/wmgr.hpp
Log:
Further tidy-up: additional decoupling between the journal and qpid.
Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp 2007-12-13 19:24:38 UTC (rev 1480)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp 2007-12-13 20:17:34 UTC (rev 1481)
@@ -31,6 +31,8 @@
#include "BindingDbt.h"
#include "IdPairDbt.h"
#include "StringDbt.h"
+#include "JournalImpl.h"
+#include "DataTokenImpl.h"
#include <boost/intrusive_ptr.hpp>
using namespace rhm::bdbstore;
@@ -993,6 +995,7 @@
boost::intrusive_ptr<DataTokenImpl> dtokp(new DataTokenImpl);
dtokp->addRef();
dtokp->setSourceMessage(message);
+ dtokp->set_external_rid(true);
dtokp->set_rid(message->getPersistenceId()); // set the messageID into the Journal header (record-id)
bool written = false;
@@ -1137,6 +1140,7 @@
boost::intrusive_ptr<DataTokenImpl> ddtokp(new DataTokenImpl);
ddtokp->addRef();
ddtokp->setSourceMessage(msg);
+ ddtokp->set_external_rid(true);
ddtokp->set_rid(messageIdSequence.next());
ddtokp->set_dequeue_rid(msg->getPersistenceId());
ddtokp->set_wstate(DataTokenImpl::ENQ);
Modified: store/trunk/cpp/lib/BdbMessageStore.h
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.h 2007-12-13 19:24:38 UTC (rev 1480)
+++ store/trunk/cpp/lib/BdbMessageStore.h 2007-12-13 20:17:34 UTC (rev 1481)
@@ -42,8 +42,6 @@
#include <boost/format.hpp>
#include <boost/intrusive_ptr.hpp>
#include <boost/ptr_container/ptr_list.hpp>
-#include "JournalImpl.h"
-#include "DataTokenImpl.h"
namespace rhm {
namespace bdbstore {
Modified: store/trunk/cpp/lib/DataTokenImpl.h
===================================================================
--- store/trunk/cpp/lib/DataTokenImpl.h 2007-12-13 19:24:38 UTC (rev 1480)
+++ store/trunk/cpp/lib/DataTokenImpl.h 2007-12-13 20:17:34 UTC (rev 1481)
@@ -25,15 +25,24 @@
#define _DataTokenImpl_
#include "jrnl/data_tok.hpp"
+#include <boost/intrusive_ptr.hpp>
+#include <qpid/broker/PersistableMessage.h>
namespace rhm {
namespace bdbstore {
- class DataTokenImpl : public journal::data_tok
+ class DataTokenImpl : public journal::data_tok, public qpid::RefCounted
{
+ private:
+ boost::intrusive_ptr<qpid::broker::PersistableMessage> sourceMsg;
public:
DataTokenImpl();
virtual ~DataTokenImpl();
+
+ inline boost::intrusive_ptr<qpid::broker::PersistableMessage>& getSourceMessage()
+ { return sourceMsg; }
+ inline void setSourceMessage(boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg)
+ { sourceMsg = msg; }
};
} // namespace bdbstore
Modified: store/trunk/cpp/lib/JournalImpl.cpp
===================================================================
--- store/trunk/cpp/lib/JournalImpl.cpp 2007-12-13 19:24:38 UTC (rev 1480)
+++ store/trunk/cpp/lib/JournalImpl.cpp 2007-12-13 20:17:34 UTC (rev 1481)
@@ -128,7 +128,7 @@
_dlen = 0;
_dtok.reset();
_dtok.set_rid(rid);
- _dtok.set_wstate(journal::data_tok::ENQ);
+ _dtok.set_wstate(DataTokenImpl::ENQ);
_external = false;
size_t xlen = 0;
bool transient = false;
@@ -307,7 +307,7 @@
jip->_aio_wr_cmpl_dtok_list.clear();
for (u_int32_t i=0; i<num_dtoks; i++)
{
- data_tok*& dtokp = this_dtok_list.front();
+ DataTokenImpl* dtokp = static_cast<DataTokenImpl*>(this_dtok_list.front());
if (!journal->is_stopped() && dtokp->getSourceMessage())
{
switch (dtokp->wstate())
@@ -341,7 +341,7 @@
jip->_aio_rd_cmpl_dtok_list.clear();
for (u_int32_t i=0; i<num_dtoks; i++)
{
- data_tok*& dtokp = this_dtok_list.front();
+ DataTokenImpl* dtokp = static_cast<DataTokenImpl*>(this_dtok_list.front());
if (!journal->is_stopped() && dtokp->getSourceMessage())
{
if (dtokp->wstate() == data_tok::ENQ && dtokp->rstate() == data_tok::READ)
Modified: store/trunk/cpp/lib/JournalImpl.h
===================================================================
--- store/trunk/cpp/lib/JournalImpl.h 2007-12-13 19:24:38 UTC (rev 1480)
+++ store/trunk/cpp/lib/JournalImpl.h 2007-12-13 20:17:34 UTC (rev 1481)
@@ -26,9 +26,10 @@
#include <set>
#include "jrnl/jcntl.hpp"
-#include "jrnl/data_tok.hpp"
+#include "DataTokenImpl.h"
#include "PreparedTransaction.h"
#include <qpid/broker/Timer.h>
+#include <qpid/broker/PersistableQueue.h>
#include <qpid/sys/Time.h>
#include <boost/ptr_container/ptr_list.hpp>
#include <boost/intrusive_ptr.hpp>
@@ -62,7 +63,7 @@
inline void cancel() { parent=0; }
};
- class JournalImpl : public journal::jcntl
+ class JournalImpl : public qpid::broker::ExternalQueueStore, public journal::jcntl
{
private:
static qpid::broker::Timer journalTimer;
Modified: store/trunk/cpp/lib/jrnl/data_tok.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/data_tok.cpp 2007-12-13 19:24:38 UTC (rev 1480)
+++ store/trunk/cpp/lib/jrnl/data_tok.cpp 2007-12-13 20:17:34 UTC (rev 1481)
@@ -53,7 +53,7 @@
_dblks_read(0),
_rid(0),
_xid(),
- _sourceMsg(NULL)
+ _external_rid(false)
{
pthread_mutex_init(&_mutex, NULL);
pthread_mutex_lock(&_mutex);
Modified: store/trunk/cpp/lib/jrnl/data_tok.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/data_tok.hpp 2007-12-13 19:24:38 UTC (rev 1480)
+++ store/trunk/cpp/lib/jrnl/data_tok.hpp 2007-12-13 20:17:34 UTC (rev 1481)
@@ -41,18 +41,8 @@
}
}
-namespace qpid
-{
-namespace broker
-{
-class PersistableMessage;
-}
-}
-
-#include <boost/intrusive_ptr.hpp>
#include <pthread.h>
-#include <qpid/broker/PersistableMessage.h>
-#include <qpid/RefCounted.h>
+#include <string>
#include <sys/types.h>
namespace rhm
@@ -66,7 +56,7 @@
* \brief Data block token (data_tok) used to track wstate of a data block through asynchronous
* I/O process
*/
- class data_tok : public qpid::RefCounted
+ class data_tok// : public qpid::RefCounted
{
public:
// TODO: Fix this, separate write state from operation
@@ -101,7 +91,7 @@
READ ///< Data block is fully read
};
- private:
+ protected:
pthread_mutex_t _mutex;
static u_int64_t _cnt;
u_int64_t _icnt;
@@ -114,17 +104,12 @@
u_int64_t _rid; ///< RID of data set by enqueue operation
std::string _xid; ///< XID set by enqueue operation
u_int64_t _dequeue_rid; ///< RID of data set by dequeue operation
- boost::intrusive_ptr<qpid::broker::PersistableMessage> _sourceMsg; ///< Pointer back to source Message in Broker
+ bool _external_rid; ///< Flag to indicate external setting of rid
public:
data_tok();
virtual ~data_tok();
- inline boost::intrusive_ptr<qpid::broker::PersistableMessage>& getSourceMessage()
- { return _sourceMsg; }
- inline void setSourceMessage(boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg)
- { _sourceMsg = msg; }
-
inline const u_int64_t id() const { return _icnt; }
inline const write_state wstate() const { return _wstate; }
const char* wstate_str() const;
@@ -157,6 +142,8 @@
inline void set_rid(const u_int64_t rid) { _rid = rid; }
inline const u_int64_t dequeue_rid() const {return _dequeue_rid; }
inline void set_dequeue_rid(const u_int64_t rid) { _dequeue_rid = rid; }
+ inline const bool external_rid() const { return _external_rid; }
+ inline void set_external_rid(const bool external_rid) { _external_rid = external_rid; }
inline const bool has_xid() const { return !_xid.empty(); }
inline const std::string& xid() const { return _xid; }
Modified: store/trunk/cpp/lib/jrnl/jcntl.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.cpp 2007-12-13 19:24:38 UTC (rev 1480)
+++ store/trunk/cpp/lib/jrnl/jcntl.cpp 2007-12-13 20:17:34 UTC (rev 1481)
@@ -34,9 +34,11 @@
#include <jrnl/jcntl.hpp>
#include <algorithm>
+#include <assert.h>
#include <cerrno>
#include <fstream>
#include <iomanip>
+#include <iostream>
#include <jrnl/file_hdr.hpp>
#include <jrnl/jerrno.hpp>
#include <jrnl/jinf.hpp>
Modified: store/trunk/cpp/lib/jrnl/jcntl.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.hpp 2007-12-13 19:24:38 UTC (rev 1480)
+++ store/trunk/cpp/lib/jrnl/jcntl.hpp 2007-12-13 20:17:34 UTC (rev 1481)
@@ -65,7 +65,7 @@
* which is used per data block written to the journal, and is used to track its status through
* the AIO enqueue, read and dequeue process.
*/
- class jcntl : public qpid::broker::ExternalQueueStore
+ class jcntl
{
protected:
/**
Modified: store/trunk/cpp/lib/jrnl/slock.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/slock.hpp 2007-12-13 19:24:38 UTC (rev 1480)
+++ store/trunk/cpp/lib/jrnl/slock.hpp 2007-12-13 20:17:34 UTC (rev 1481)
@@ -34,6 +34,7 @@
#define rhm_journal_slock_hpp
#include <pthread.h>
+#include <errno.h>
#include <jrnl/jerrno.hpp>
#include <jrnl/jexception.hpp>
Modified: store/trunk/cpp/lib/jrnl/wmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wmgr.cpp 2007-12-13 19:24:38 UTC (rev 1480)
+++ store/trunk/cpp/lib/jrnl/wmgr.cpp 2007-12-13 20:17:34 UTC (rev 1481)
@@ -141,7 +141,8 @@
else
_enq_busy = true;
- u_int64_t rid = initialize_rid(cont, dtokp);
+ u_int64_t rid = dtokp->external_rid() ? dtokp->rid() :
+ (cont ? _wrfc.rid() - 1 : _wrfc.get_incr_rid());
_enq_rec.reset(rid, data_buff, tot_data_len, xid_ptr, xid_len, _wrfc.owi(), transient,
external);
if (!cont)
@@ -280,13 +281,10 @@
dtokp->set_dblocks_written(0); // Reset dblks_written from previous op
}
- // TODO: Tidy this up!
-// u_int64_t rid = initialize_rid(cont, dtokp);
-// _deq_rec.reset(rid, dtokp->rid(), xid_ptr, xid_len);
- u_int64_t rid = dtokp->getSourceMessage() ? dtokp->rid() :
- (cont ? _wrfc.rid() - 1 : _wrfc.get_incr_rid());
- u_int64_t dequeue_rid = dtokp->getSourceMessage() ? dtokp->dequeue_rid() : dtokp->rid();
- if (!dtokp->getSourceMessage())
+ const bool ext_rid = dtokp->external_rid();
+ u_int64_t rid = ext_rid ? dtokp->rid() : (cont ? _wrfc.rid() - 1 : _wrfc.get_incr_rid());
+ u_int64_t dequeue_rid = ext_rid ? dtokp->dequeue_rid() : dtokp->rid();
+ if (!ext_rid)
{
dtokp->set_rid(rid);
dtokp->set_dequeue_rid(dequeue_rid);
@@ -991,18 +989,6 @@
return RHM_IORES_SUCCESS;
}
-const u_int64_t
-wmgr::initialize_rid(const bool cont, data_tok* dtokp)
-{
- if (dtokp->getSourceMessage())
- {
- u_int64_t rid = dtokp->rid();
- assert(rid != 0);
- return rid;
- }
- return cont ? _wrfc.rid() - 1 : _wrfc.get_incr_rid();
-}
-
void
wmgr::dblk_roundup()
{
Modified: store/trunk/cpp/lib/jrnl/wmgr.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wmgr.hpp 2007-12-13 19:24:38 UTC (rev 1480)
+++ store/trunk/cpp/lib/jrnl/wmgr.hpp 2007-12-13 20:17:34 UTC (rev 1481)
@@ -121,7 +121,6 @@
const iores pre_write_check(const _op_type op, const data_tok* const dtokp,
const size_t xidsize = 0, const size_t dsize = 0, const bool external = false)
const;
- const u_int64_t initialize_rid(const bool cont, data_tok* dtokp);
const iores write_flush();
const iores rotate_file();
void dblk_roundup();
18 years, 4 months
rhmessaging commits: r1480 - in mgmt: mint/python/mint and 1 other directory.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2007-12-13 14:24:38 -0500 (Thu, 13 Dec 2007)
New Revision: 1480
Modified:
mgmt/cumin/python/cumin/broker.py
mgmt/cumin/python/cumin/brokergroup.py
mgmt/cumin/python/cumin/client.py
mgmt/cumin/python/cumin/client.strings
mgmt/cumin/python/cumin/exchange.py
mgmt/cumin/python/cumin/formats.py
mgmt/cumin/python/cumin/queue.py
mgmt/mint/python/mint/__init__.py
mgmt/mint/python/mint/schema.py
mgmt/mint/python/mint/schema.sql
mgmt/notes/justin-todo.txt
Log:
Fixes group filtering in the broker browser.
Handles duplicate group-broker mappings correctly.
Updates schema.
Raises the max length before fmt_shorten shortens strings.
Fixes the pending actions placeholder in client.strings.
Modified: mgmt/cumin/python/cumin/broker.py
===================================================================
--- mgmt/cumin/python/cumin/broker.py 2007-12-13 18:09:05 UTC (rev 1479)
+++ mgmt/cumin/python/cumin/broker.py 2007-12-13 19:24:38 UTC (rev 1480)
@@ -2,6 +2,7 @@
from wooly import *
from wooly.widgets import *
from random import random
+from psycopg2 import IntegrityError
from configproperty import *
from virtualhost import *
@@ -15,7 +16,7 @@
strings = StringCatalog(__file__)
-class BrokerSetForm(SQLObjectItemSet, Form):
+class BrokerSetForm(PaginatedItemSet, Form):
def __init__(self, app, name):
super(BrokerSetForm, self).__init__(app, name)
@@ -37,9 +38,6 @@
self.submit = BooleanParameter(app, "submit")
self.add_parameter(self.submit)
self.add_form_parameter(self.submit)
-
- self.paginator = Paginator(app, "page")
- self.add_child(self.paginator)
def get_title(self, session, model):
return "Brokers %s" % fmt_count(self.get_item_count(session, model))
@@ -48,10 +46,12 @@
return BrokerRegistration.select().count()
def do_get_items(self, session, model):
- start, end = self.paginator.get_bounds(session)
+ start, end = self.get_bounds(session)
return BrokerRegistration.select(orderBy="name")[start:end]
def do_process(self, session, model):
+ super(BrokerSetForm, self).do_process(session, model)
+
if self.submit.get(session):
self.submit.set(session, False)
@@ -69,12 +69,12 @@
if group:
for broker in brokers:
- broker.addBrokerGroup(group)
+ try:
+ broker.addBrokerGroup(group)
+ except IntegrityError:
+ pass
self.page().set_redirect_url(session, session.marshal())
- else:
- count = self.get_item_count(session, model)
- self.paginator.set_count(session, count)
def render_action_param_name(self, session, broker):
return self.action.path()
@@ -464,11 +464,23 @@
profile = self.parent().profile.get(session)
cluster = self.parent().cluster.get(session)
- brokers = BrokerRegistration.selectBy \
- (profile = profile, cluster = cluster)
+ brokers = BrokerRegistration.select()
- # XXX add group filtering
+ if profile:
+ brokers = brokers.filter \
+ (BrokerRegistration.q.profileID == profile.id)
+ if cluster:
+ brokers = brokers.filter \
+ (BrokerRegistration.q.clusterID == cluster.id)
+
+ if group:
+ brokers = brokers.filter \
+ (BrokerGroupMapping.q.brokerRegistrationID \
+ == BrokerRegistration.q.id)
+ brokers = brokers.filter \
+ (BrokerGroupMapping.q.brokerGroupID == group.id)
+
return brokers
def render_none(self, session, model):
Modified: mgmt/cumin/python/cumin/brokergroup.py
===================================================================
--- mgmt/cumin/python/cumin/brokergroup.py 2007-12-13 18:09:05 UTC (rev 1479)
+++ mgmt/cumin/python/cumin/brokergroup.py 2007-12-13 19:24:38 UTC (rev 1480)
@@ -28,12 +28,12 @@
return fmt_olink(branch, group)
def render_item_config(self, session, group):
- return len(group.brokers)
+ return group.brokers.count()
def render_item_status(self, session, group):
writer = Writer()
- for broker in sorted_by(group.brokers):
+ for broker in group.brokers:
writer.write(fmt_ostatus(broker))
return writer.to_string()
@@ -87,8 +87,12 @@
class GroupBrokerTab(BrokerSetForm):
def get_title(self, session, group):
- return "Brokers %s" % fmt_count(len(group.brokers))
+ return "Brokers %s" % \
+ fmt_count(self.get_item_count(session, group))
+ def get_item_count(self, session, group):
+ return group.brokers.count()
+
def do_get_items(self, session, group):
return group.brokers
Modified: mgmt/cumin/python/cumin/client.py
===================================================================
--- mgmt/cumin/python/cumin/client.py 2007-12-13 18:09:05 UTC (rev 1479)
+++ mgmt/cumin/python/cumin/client.py 2007-12-13 19:24:38 UTC (rev 1480)
@@ -35,12 +35,12 @@
if vhost:
start, end = self.paginator.get_bounds(session)
return Client.select(Client.q.vhostID == vhost.id,
- orderBy="ipAddr")[start:end]
+ orderBy="address")[start:end]
def render_item_link(self, session, client):
branch = session.branch()
self.page().show_client(branch, client).show_view(branch)
- return fmt_olink(branch, client, name=client.ipAddr)
+ return fmt_olink(branch, client, name=client.address)
def render_item_sessions(self, session, client):
branch = session.branch()
@@ -76,7 +76,7 @@
self.set_view_mode(self.view)
def get_title(self, session, client):
- return "Client %s" % client.ipAddr
+ return "Client %s" % client.address
class ClientStatus(CuminStatus):
def render_messages_produced(self, session, client):
@@ -114,13 +114,13 @@
return self.tabs.show_mode(session, self.sessions)
def get_title(self, session, client):
- return "Client '%s'" % client.ipAddr
+ return "Client '%s'" % client.address
def render_data_url(self, session, client):
return "client.xml?id=%i" % client.id
def render_address(self, session, client):
- return client.ipAddr
+ return client.address
def render_auth_id(self, session, client):
return "e50e7dcaa8d6a039a" #XXX get rid of this
Modified: mgmt/cumin/python/cumin/client.strings
===================================================================
--- mgmt/cumin/python/cumin/client.strings 2007-12-13 18:09:05 UTC (rev 1479)
+++ mgmt/cumin/python/cumin/client.strings 2007-12-13 19:24:38 UTC (rev 1480)
@@ -54,7 +54,7 @@
<!-- <div>{status_info}</div> -->
- <div>{pending_actions}</div>
+ <div>{actions_pending}</div>
<table>
<tr>
Modified: mgmt/cumin/python/cumin/exchange.py
===================================================================
--- mgmt/cumin/python/cumin/exchange.py 2007-12-13 18:09:05 UTC (rev 1479)
+++ mgmt/cumin/python/cumin/exchange.py 2007-12-13 19:24:38 UTC (rev 1480)
@@ -370,12 +370,15 @@
% queue.id
class ExchangeProducerSet(ItemSet):
- def get_title(self, session, queue):
- return "Producers %s" % fmt_count(len(queue.producers))
+ def get_title(self, session, exchange):
+ return "Producers %s" % fmt_count(len(exchange.producers))
- def do_get_items(self, session, queue):
- return sorted_by(queue.producers)
+ def get_item_count(self, session, exchange):
+ return Producer.select(Producer.q.exchangeID == exchange.id).count()
+ def do_get_items(self, session, exchange):
+ return Producer.select(Producer.q.exchangeID == exchange.id)
+
def render_item_name(self, session, producer):
return producer.name
Modified: mgmt/cumin/python/cumin/formats.py
===================================================================
--- mgmt/cumin/python/cumin/formats.py 2007-12-13 18:09:05 UTC (rev 1479)
+++ mgmt/cumin/python/cumin/formats.py 2007-12-13 19:24:38 UTC (rev 1480)
@@ -101,8 +101,8 @@
return "<span class=\"none\">None</span>"
def fmt_shorten(string):
- if len(string) > 15:
- return string[:10] + "..." + string[-5:]
+ if len(string) > 20:
+ return string[:13] + "..." + string[-6:]
else:
return string
Modified: mgmt/cumin/python/cumin/queue.py
===================================================================
--- mgmt/cumin/python/cumin/queue.py 2007-12-13 18:09:05 UTC (rev 1479)
+++ mgmt/cumin/python/cumin/queue.py 2007-12-13 19:24:38 UTC (rev 1480)
@@ -479,32 +479,39 @@
return "queue.png?id=%i;s=enqueueTxnCount;s=dequeueTxnCount" \
% queue.id
-class QueueConsumerSet(ItemSet):
+class QueueConsumerSet(PaginatedItemSet):
def get_title(self, session, queue):
- return "Consumers %s" % fmt_count(len(queue.consumers))
+ return "Consumers %s" % fmt_count(self.get_item_count(session, queue))
+ def get_item_count(self, session, queue):
+ return Consumer.select(Consumer.q.queueID == queue.id).count()
+
def do_get_items(self, session, queue):
- return queue.consumers
+ start, end = self.get_bounds(session)
+ return Consumer.select(Consumer.q.queueID == queue.id)[start:end]
def render_item_name(self, session, consumer):
return consumer.name
def render_item_messages_consumed(self, session, consumer):
- return consumer.mintConsumerStats.msgsConsumed
+ stat = self.app.model.consumer.get_stat("msgsConsumed")
+ return stat.value(consumer)
def render_item_messages_consumed_rate(self, session, consumer):
- value = consumer.mintConsumerStats.msgsConsumed
- return fmt_rate(value, "msg", "sec")
+ stat = self.app.model.consumer.get_stat("msgsConsumed")
+ return fmt_rate(stat.rate(consumer), "msg", "sec")
def render_item_bytes_consumed(self, session, consumer):
- return consumer.mintConsumerStats.bytesConsumed
+ stat = self.app.model.consumer.get_stat("bytesConsumed")
+ return stat.value(consumer)
def render_item_bytes_consumed_rate(self, session, consumer):
- value = consumer.mintConsumerStats.bytesConsumed
- return fmt_rate(value, "byte", "sec")
+ stat = self.app.model.consumer.get_stat("bytesConsumed")
+ return fmt_rate(stat.rate(consumer), "byte", "sec")
def render_item_unacked_messages(self, session, consumer):
- return consumer.mintConsumerStats.unackedMessages
+ stat = self.app.model.consumer.get_stat("unackedMessages")
+ return stat.value(consumer)
class QueueXmlPage(CuminXmlPage):
def __init__(self, app, name):
Modified: mgmt/mint/python/mint/__init__.py
===================================================================
--- mgmt/mint/python/mint/__init__.py 2007-12-13 18:09:05 UTC (rev 1479)
+++ mgmt/mint/python/mint/__init__.py 2007-12-13 19:24:38 UTC (rev 1480)
@@ -25,14 +25,24 @@
host = StringCol(length=1000, default=None)
port = SmallIntCol(default=None)
broker = ForeignKey("Broker", cascade="null", default=None)
- groups = RelatedJoin("BrokerGroup")
+ groups = SQLRelatedJoin("BrokerGroup",
+ intermediateTable="broker_group_mapping",
+ createRelatedTable=False)
cluster = ForeignKey("BrokerCluster", cascade="null", default=None)
profile = ForeignKey("BrokerProfile", cascade="null", default=None)
class BrokerGroup(SQLObject):
name = StringCol(length=1000, default=None)
- brokers = RelatedJoin("BrokerRegistration")
+ brokers = SQLRelatedJoin("BrokerRegistration",
+ intermediateTable="broker_group_mapping",
+ createRelatedTable=False)
+class BrokerGroupMapping(SQLObject):
+ brokerRegistration = ForeignKey("BrokerRegistration", notNull=True,
+ cascade=True)
+ brokerGroup = ForeignKey("BrokerGroup", notNull=True, cascade=True)
+ unique = index.DatabaseIndex(brokerRegistration, brokerGroup, unique=True)
+
class BrokerCluster(SQLObject):
name = StringCol(length=1000, default=None)
brokers = MultipleJoin("BrokerRegistration", joinColumn="cluster_id")
Modified: mgmt/mint/python/mint/schema.py
===================================================================
--- mgmt/mint/python/mint/schema.py 2007-12-13 18:09:05 UTC (rev 1479)
+++ mgmt/mint/python/mint/schema.py 2007-12-13 19:24:38 UTC (rev 1480)
@@ -62,13 +62,6 @@
model.getConnectedBroker(managedBrokerLabel).method(methodId, self.idOriginal, \
classToSchemaNameMap[self.__class__.__name__], "echo", args=actualArgs, packageName="qpid")
- def crash(self, model, managedBrokerLabel, callbackMethod):
- """Temporary test method to crash the broker"""
- actualArgs = dict()
- methodId = model.registerCallback(callbackMethod)
- model.getConnectedBroker(managedBrokerLabel).method(methodId, self.idOriginal, \
- classToSchemaNameMap[self.__class__.__name__], "crash", args=actualArgs, packageName="qpid")
-
System.sqlmeta.addJoin(MultipleJoin('Broker', joinMethodName='brokers'))
@@ -115,7 +108,6 @@
durable = BoolCol(default=None)
autoDelete = BoolCol(default=None)
exclusive = BoolCol(default=None)
- pageMemoryLimit = IntCol(default=None)
def purge(self, model, managedBrokerLabel, callbackMethod):
"""Discard all messages on queue"""
@@ -124,14 +116,6 @@
model.getConnectedBroker(managedBrokerLabel).method(methodId, self.idOriginal, \
classToSchemaNameMap[self.__class__.__name__], "purge", args=actualArgs, packageName="qpid")
- def increaseJournalSize(self, model, managedBrokerLabel, callbackMethod, pages):
- """Increase number of disk pages allocated for this queue"""
- actualArgs = dict()
- actualArgs["pages"] = pages
- methodId = model.registerCallback(callbackMethod)
- model.getConnectedBroker(managedBrokerLabel).method(methodId, self.idOriginal, \
- classToSchemaNameMap[self.__class__.__name__], "increaseJournalSize", args=actualArgs, packageName="qpid")
-
Vhost.sqlmeta.addJoin(MultipleJoin('Queue', joinMethodName='queues'))
@@ -139,34 +123,6 @@
idOriginal = BigIntCol(default=None)
recTime = TimestampCol(default=None)
queue = ForeignKey('Queue', cascade='null', default=None)
- journalLocation = StringCol(length=1000, default=None)
- journalBaseFileName = StringCol(length=1000, default=None)
- journalInitialFileCount = IntCol(default=None)
- journalCurrentFileCount = IntCol(default=None)
- journalDataFileSize = IntCol(default=None)
- journalFreeFileCount = IntCol(default=None)
- journalFreeFileCountLow = IntCol(default=None)
- journalFreeFileCountHigh = IntCol(default=None)
- journalAvailableFileCount = IntCol(default=None)
- journalAvailableFileCountLow = IntCol(default=None)
- journalAvailableFileCountHigh = IntCol(default=None)
- journalRecordDepth = IntCol(default=None)
- journalRecordDepthLow = IntCol(default=None)
- journalRecordDepthHigh = IntCol(default=None)
- journalRecordEnqueues = BigIntCol(default=None)
- journalRecordDequeues = BigIntCol(default=None)
- journalWriteWaitFailures = BigIntCol(default=None)
- journalWriteBusyFailures = BigIntCol(default=None)
- journalReadRecordCount = BigIntCol(default=None)
- journalReadBusyFailures = BigIntCol(default=None)
- journalWritePageCacheDepth = IntCol(default=None)
- journalWritePageCacheDepthLow = IntCol(default=None)
- journalWritePageCacheDepthHigh = IntCol(default=None)
- journalWritePageSize = IntCol(default=None)
- journalReadPageCacheDepth = IntCol(default=None)
- journalReadPageCacheDepthLow = IntCol(default=None)
- journalReadPageCacheDepthHigh = IntCol(default=None)
- journalReadPageSize = IntCol(default=None)
msgTotalEnqueues = BigIntCol(default=None)
msgTotalDequeues = BigIntCol(default=None)
msgTxnEnqueues = BigIntCol(default=None)
@@ -206,6 +162,9 @@
unackedMessages = IntCol(default=None)
unackedMessagesLow = IntCol(default=None)
unackedMessagesHigh = IntCol(default=None)
+ messageLatencyMin = BigIntCol(default=None)
+ messageLatencyMax = BigIntCol(default=None)
+ messageLatencyAvg = BigIntCol(default=None)
Queue.sqlmeta.addJoin(MultipleJoin('QueueStats', joinMethodName='stats'))
@@ -253,15 +212,15 @@
managedBroker = StringCol(length=1000, default=None)
statsCurr = ForeignKey('BindingStats', cascade='null', default=None)
statsPrev = ForeignKey('BindingStats', cascade='null', default=None)
- queue = ForeignKey('Queue', cascade='null', default=None)
exchange = ForeignKey('Exchange', cascade='null', default=None)
+ queue = ForeignKey('Queue', cascade='null', default=None)
bindingKey = StringCol(length=1000, default=None)
-Queue.sqlmeta.addJoin(MultipleJoin('Binding', joinMethodName='bindings'))
-
Exchange.sqlmeta.addJoin(MultipleJoin('Binding', joinMethodName='bindings'))
+Queue.sqlmeta.addJoin(MultipleJoin('Binding', joinMethodName='bindings'))
+
class BindingStats(SQLObject):
idOriginal = BigIntCol(default=None)
recTime = TimestampCol(default=None)
@@ -280,8 +239,7 @@
statsCurr = ForeignKey('ClientStats', cascade='null', default=None)
statsPrev = ForeignKey('ClientStats', cascade='null', default=None)
vhost = ForeignKey('Vhost', cascade='null', default=None)
- ipAddr = IntCol(default=None)
- port = SmallIntCol(default=None)
+ address = StringCol(length=1000, default=None)
def close(self, model, managedBrokerLabel, callbackMethod):
actualArgs = dict()
@@ -303,9 +261,9 @@
recTime = TimestampCol(default=None)
client = ForeignKey('Client', cascade='null', default=None)
authIdentity = StringCol(length=1000, default=None)
- msgsProduced = BigIntCol(default=None)
+ framesFromClient = BigIntCol(default=None)
msgsConsumed = BigIntCol(default=None)
- bytesProduced = BigIntCol(default=None)
+ bytesFromClient = BigIntCol(default=None)
bytesConsumed = BigIntCol(default=None)
Client.sqlmeta.addJoin(MultipleJoin('ClientStats', joinMethodName='stats'))
Modified: mgmt/mint/python/mint/schema.sql
===================================================================
--- mgmt/mint/python/mint/schema.sql 2007-12-13 18:09:05 UTC (rev 1479)
+++ mgmt/mint/python/mint/schema.sql 2007-12-13 19:24:38 UTC (rev 1480)
@@ -7,10 +7,13 @@
id SERIAL PRIMARY KEY,
name VARCHAR(1000)
);
-CREATE TABLE broker_group_broker_registration (
-broker_group_id INT NOT NULL,
-broker_registration_id INT NOT NULL
+
+CREATE TABLE broker_group_mapping (
+ id SERIAL PRIMARY KEY,
+ broker_registration_id INT NOT NULL,
+ broker_group_id INT NOT NULL
);
+CREATE UNIQUE INDEX broker_group_mapping_unique ON broker_group_mapping (broker_registration_id, broker_group_id);
CREATE TABLE broker_profile (
id SERIAL PRIMARY KEY,
@@ -31,8 +34,7 @@
id SERIAL PRIMARY KEY,
name VARCHAR(1000),
value VARCHAR(1000),
- type VARCHAR(1),
- profile_id INT
+ type VARCHAR(1)
);
CREATE TABLE binding (
@@ -44,8 +46,8 @@
managed_broker VARCHAR(1000),
stats_curr_id INT,
stats_prev_id INT,
- queue_id INT,
exchange_id INT,
+ queue_id INT,
binding_key VARCHAR(1000)
);
@@ -99,8 +101,7 @@
stats_curr_id INT,
stats_prev_id INT,
vhost_id INT,
- ip_addr INT,
- port SMALLINT
+ address VARCHAR(1000)
);
CREATE TABLE client_stats (
@@ -109,9 +110,9 @@
rec_time TIMESTAMP,
client_id INT,
auth_identity VARCHAR(1000),
- msgs_produced BIGINT,
+ frames_from_client BIGINT,
msgs_consumed BIGINT,
- bytes_produced BIGINT,
+ bytes_from_client BIGINT,
bytes_consumed BIGINT
);
@@ -233,8 +234,7 @@
name VARCHAR(1000),
durable BOOL,
auto_delete BOOL,
- exclusive BOOL,
- page_memory_limit INT
+ exclusive BOOL
);
CREATE TABLE queue_stats (
@@ -242,34 +242,6 @@
id_original BIGINT,
rec_time TIMESTAMP,
queue_id INT,
- journal_location VARCHAR(1000),
- journal_base_file_name VARCHAR(1000),
- journal_initial_file_count INT,
- journal_current_file_count INT,
- journal_data_file_size INT,
- journal_free_file_count INT,
- journal_free_file_count_low INT,
- journal_free_file_count_high INT,
- journal_available_file_count INT,
- journal_available_file_count_low INT,
- journal_available_file_count_high INT,
- journal_record_depth INT,
- journal_record_depth_low INT,
- journal_record_depth_high INT,
- journal_record_enqueues BIGINT,
- journal_record_dequeues BIGINT,
- journal_write_wait_failures BIGINT,
- journal_write_busy_failures BIGINT,
- journal_read_record_count BIGINT,
- journal_read_busy_failures BIGINT,
- journal_write_page_cache_depth INT,
- journal_write_page_cache_depth_low INT,
- journal_write_page_cache_depth_high INT,
- journal_write_page_size INT,
- journal_read_page_cache_depth INT,
- journal_read_page_cache_depth_low INT,
- journal_read_page_cache_depth_high INT,
- journal_read_page_size INT,
msg_total_enqueues BIGINT,
msg_total_dequeues BIGINT,
msg_txn_enqueues BIGINT,
@@ -308,7 +280,10 @@
bindings_high INT,
unacked_messages INT,
unacked_messages_low INT,
- unacked_messages_high INT
+ unacked_messages_high INT,
+ message_latency_min BIGINT,
+ message_latency_max BIGINT,
+ message_latency_avg BIGINT
);
CREATE TABLE session (
@@ -375,22 +350,24 @@
vhost_id INT
);
+ALTER TABLE broker_group_mapping ADD CONSTRAINT broker_registration_id_exists FOREIGN KEY (broker_registration_id) REFERENCES broker_registration (id) ON DELETE CASCADE;
+
+ALTER TABLE broker_group_mapping ADD CONSTRAINT broker_group_id_exists FOREIGN KEY (broker_group_id) REFERENCES broker_group (id) ON DELETE CASCADE;
+
ALTER TABLE broker_registration ADD CONSTRAINT broker_id_exists FOREIGN KEY (broker_id) REFERENCES broker (id) ON DELETE SET NULL;
ALTER TABLE broker_registration ADD CONSTRAINT cluster_id_exists FOREIGN KEY (cluster_id) REFERENCES broker_cluster (id) ON DELETE SET NULL;
ALTER TABLE broker_registration ADD CONSTRAINT profile_id_exists FOREIGN KEY (profile_id) REFERENCES broker_profile (id) ON DELETE SET NULL;
-ALTER TABLE config_property ADD CONSTRAINT profile_id_exists FOREIGN KEY (profile_id) REFERENCES broker_profile (id) ON DELETE SET NULL;
-
ALTER TABLE binding ADD CONSTRAINT stats_curr_id_exists FOREIGN KEY (stats_curr_id) REFERENCES binding_stats (id) ON DELETE SET NULL;
ALTER TABLE binding ADD CONSTRAINT stats_prev_id_exists FOREIGN KEY (stats_prev_id) REFERENCES binding_stats (id) ON DELETE SET NULL;
-ALTER TABLE binding ADD CONSTRAINT queue_id_exists FOREIGN KEY (queue_id) REFERENCES queue (id) ON DELETE SET NULL;
-
ALTER TABLE binding ADD CONSTRAINT exchange_id_exists FOREIGN KEY (exchange_id) REFERENCES exchange (id) ON DELETE SET NULL;
+ALTER TABLE binding ADD CONSTRAINT queue_id_exists FOREIGN KEY (queue_id) REFERENCES queue (id) ON DELETE SET NULL;
+
ALTER TABLE binding_stats ADD CONSTRAINT binding_id_exists FOREIGN KEY (binding_id) REFERENCES binding (id) ON DELETE SET NULL;
ALTER TABLE broker ADD CONSTRAINT stats_curr_id_exists FOREIGN KEY (stats_curr_id) REFERENCES broker_stats (id) ON DELETE SET NULL;
Modified: mgmt/notes/justin-todo.txt
===================================================================
--- mgmt/notes/justin-todo.txt 2007-12-13 18:09:05 UTC (rev 1479)
+++ mgmt/notes/justin-todo.txt 2007-12-13 19:24:38 UTC (rev 1480)
@@ -4,8 +4,6 @@
* Add legends to charts
- * Make group slider in broker browser work
-
* Add a groups column to the browser broker list
* Sort in tables
@@ -26,8 +24,6 @@
* Paginate sessions
- * Prevent users from mapping a broker to a group more than once
-
* Deal with problem of calling method on broker that is not there
* Only put something in pending actions if the call succeeds
@@ -39,10 +35,6 @@
* Need to handle exceptions in broker connect thread, so it doesn't
stop trying
- * Ask tross to take some prints out of ManagedBroker.start
-
- * Get rid of CuminClass.mint_stats_class
-
* Add inactive state to some status lights
* Fix session and client naming
@@ -52,6 +44,10 @@
Deferred
+ * Ask tross to take some prints out of ManagedBroker.start
+
+ * Get rid of CuminClass.mint_stats_class
+
* Add a do_get_item_count, and cache result for use by get_item_count
* Add a ~3 second (or use broker update interval, if we can get that)
18 years, 4 months
rhmessaging commits: r1479 - store/trunk/cpp/lib.
by rhmessaging-commits@lists.jboss.org
Author: gordonsim
Date: 2007-12-13 13:09:05 -0500 (Thu, 13 Dec 2007)
New Revision: 1479
Modified:
store/trunk/cpp/lib/BdbMessageStore.cpp
store/trunk/cpp/lib/BdbMessageStore.h
Log:
Allow getRecordSize() to be called within a txn to avoid deadlocks
Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp 2007-12-13 17:55:25 UTC (rev 1478)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp 2007-12-13 18:09:05 UTC (rev 1479)
@@ -644,7 +644,7 @@
msg->setPersistenceId(key.id);
u_int32_t contentOffset = headerSize + preamble_length;
- u_int64_t contentSize = getRecordSize(messageDb, key) - contentOffset;
+ u_int64_t contentSize = getRecordSize(txn.get(), messageDb, key) - contentOffset;
if (msg->loadContent(contentSize)) {
//now read the content
BufferValue content(contentSize, contentOffset);
@@ -805,13 +805,19 @@
}
}
+
u_int64_t BdbMessageStore::getRecordSize(Db& db, Dbt& key)
{
+ return getRecordSize(0, db, key);
+}
+
+u_int64_t BdbMessageStore::getRecordSize(DbTxn* txn, Db& db, Dbt& key)
+{
Dbt peek;
peek.set_flags(DB_DBT_USERMEM);
peek.set_ulen(0);
try {
- int status = db.get(0, &key, &peek, 0);
+ int status = db.get(txn, &key, &peek, 0);
if (status != DB_BUFFER_SMALL) {
THROW_STORE_EXCEPTION("Unexpected status code when determining record length: " + string(DbEnv::strerror(status)));
}
Modified: store/trunk/cpp/lib/BdbMessageStore.h
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.h 2007-12-13 17:55:25 UTC (rev 1478)
+++ store/trunk/cpp/lib/BdbMessageStore.h 2007-12-13 18:09:05 UTC (rev 1479)
@@ -126,6 +126,7 @@
void record2pcOp(Db& db, TPCTxnCtxt& txn, u_int64_t messageId, u_int64_t queueId);
u_int64_t getRecordSize(Db& db, Dbt& key);
+ u_int64_t getRecordSize(DbTxn* txn, Db& db, Dbt& key);
void put(Db& db, DbTxn* txn, Dbt& key, Dbt& value);
bool deleteKeyValuePair(Db& db, DbTxn* txn, Dbt& key, Dbt& value);
void open(Db& db, DbTxn* txn, const char* file, bool dupKey);
18 years, 4 months
rhmessaging commits: r1478 - store/trunk/cpp/lib.
by rhmessaging-commits@lists.jboss.org
Author: gordonsim
Date: 2007-12-13 12:55:25 -0500 (Thu, 13 Dec 2007)
New Revision: 1478
Modified:
store/trunk/cpp/lib/BdbMessageStore.cpp
Log:
Put bdb ops for loadContent in a transaction to avoid deadlocking
Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp 2007-12-13 17:19:52 UTC (rev 1477)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp 2007-12-13 17:55:25 UTC (rev 1478)
@@ -865,6 +865,13 @@
return;
}
}
+ } catch (const journal::jexception& e) {
+ THROW_STORE_EXCEPTION(std::string("Queue ") + queue.getName() +
+ ": loadContent() failed: " + e.what());
+ }
+ TxnCtxt txn;
+ txn.begin(env, true);
+ try {
Dbt key (&messageId, sizeof(messageId));
char *buffer = new char[length];
Dbt value(buffer, length);
@@ -872,20 +879,19 @@
value.set_ulen(length);
value.set_doff(realOffset);
value.set_dlen(length);
- int status = messageDb.get(0, &key, &value, 0);
+ int status = messageDb.get(txn.get(), &key, &value, 0);
if (status == DB_NOTFOUND) {
+ txn.abort();
delete [] buffer;
THROW_STORE_EXCEPTION("Cannot load content. Message not known to store!");
} else {
+ txn.commit();
data.assign(buffer, value.get_size());
delete [] buffer;
}
} catch (const DbException& e) {
THROW_STORE_EXCEPTION_2("Error loading content", e);
- } catch (const journal::jexception& e) {
- THROW_STORE_EXCEPTION(std::string("Queue ") + queue.getName() +
- ": loadContent() failed: " + e.what());
- }
+ }
} else {
THROW_STORE_EXCEPTION("Cannot load content. Message not known to store!");
}
18 years, 4 months
rhmessaging commits: r1477 - in store/trunk/cpp: lib/jrnl and 1 other directory.
by rhmessaging-commits@lists.jboss.org
Author: kpvdr
Date: 2007-12-13 12:19:52 -0500 (Thu, 13 Dec 2007)
New Revision: 1477
Modified:
store/trunk/cpp/lib/JournalImpl.cpp
store/trunk/cpp/lib/JournalImpl.h
store/trunk/cpp/lib/jrnl/jcntl.cpp
store/trunk/cpp/lib/jrnl/jcntl.hpp
store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp
store/trunk/cpp/tests/jrnl/JournalSystemTests.hpp
Log:
Code tidy-up: separation of qpid dependencies from class jcntl by moving them to subclass JournalImpl.
Modified: store/trunk/cpp/lib/JournalImpl.cpp
===================================================================
--- store/trunk/cpp/lib/JournalImpl.cpp 2007-12-13 13:32:57 UTC (rev 1476)
+++ store/trunk/cpp/lib/JournalImpl.cpp 2007-12-13 17:19:52 UTC (rev 1477)
@@ -296,3 +296,62 @@
return r;
}
+void
+JournalImpl::aio_wr_callback(jcntl* journal, u_int32_t num_dtoks)
+{
+ JournalImpl* jip = static_cast<JournalImpl*>(journal);
+//kpvdr TODO -- this list needs to be mutexed...???
+ std::deque<rhm::journal::data_tok*> this_dtok_list(jip->_aio_wr_cmpl_dtok_list.begin(),
+ jip->_aio_wr_cmpl_dtok_list.end());
+
+ jip->_aio_wr_cmpl_dtok_list.clear();
+ for (u_int32_t i=0; i<num_dtoks; i++)
+ {
+ data_tok*& dtokp = this_dtok_list.front();
+ if (!journal->is_stopped() && dtokp->getSourceMessage())
+ {
+ switch (dtokp->wstate())
+ {
+ case data_tok::ENQ:
+ dtokp->getSourceMessage()->enqueueComplete();
+ break;
+ case data_tok::DEQ:
+/* Don't need to signal until we have a way to ack completion of dequeue in AMQP
+ dtokp->getSourceMessage()->dequeueComplete();
+ if ( dtokp->getSourceMessage()->isDequeueComplete() ) // clear id after last dequeue
+ dtokp->getSourceMessage()->setPersistenceId(0);
+*/
+ break;
+ default:
+ ;
+ }
+ }
+ dtokp->release();
+ this_dtok_list.pop_front();
+ }
+}
+
+void
+JournalImpl::aio_rd_callback(jcntl* journal, u_int32_t num_dtoks)
+{
+ JournalImpl* jip = static_cast<JournalImpl*>(journal);
+//kpvdr TODO -- can we get rid of the copy???
+ std::deque<rhm::journal::data_tok*> this_dtok_list(jip->_aio_rd_cmpl_dtok_list.begin(),
+ jip->_aio_rd_cmpl_dtok_list.end());
+ jip->_aio_rd_cmpl_dtok_list.clear();
+ for (u_int32_t i=0; i<num_dtoks; i++)
+ {
+ data_tok*& dtokp = this_dtok_list.front();
+ if (!journal->is_stopped() && dtokp->getSourceMessage())
+ {
+ if (dtokp->wstate() == data_tok::ENQ && dtokp->rstate() == data_tok::READ)
+ {
+ // cct call the recovery manager. / lazyload..
+ }
+ }
+ dtokp->release();
+ this_dtok_list.pop_front();
+ }
+
+}
+
Modified: store/trunk/cpp/lib/JournalImpl.h
===================================================================
--- store/trunk/cpp/lib/JournalImpl.h 2007-12-13 13:32:57 UTC (rev 1476)
+++ store/trunk/cpp/lib/JournalImpl.h 2007-12-13 17:19:52 UTC (rev 1477)
@@ -81,6 +81,9 @@
journal::data_tok _dtok;
bool _external;
+ std::deque<rhm::journal::data_tok*> _aio_rd_cmpl_dtok_list; ///< Internally mamanged deque
+ std::deque<rhm::journal::data_tok*> _aio_wr_cmpl_dtok_list; ///< Internally mamanged deque
+
public:
JournalImpl(const std::string& journalId,
const std::string& journalDirectory,
@@ -91,14 +94,18 @@
const qpid::sys::Duration flushTimeout);
virtual ~JournalImpl();
+ inline void initialize() {
+ jcntl::initialize(&_aio_rd_cmpl_dtok_list, &aio_rd_callback, &_aio_wr_cmpl_dtok_list,
+ &aio_wr_callback );
+ }
+
void recover(std::deque<journal::data_tok*>* rd_dtokl, const journal::aio_cb rd_cb,
std::deque<journal::data_tok*>* wr_dtokl, const journal::aio_cb wr_cb,
boost::ptr_list<bdbstore::PreparedTransaction>& prep_tx_list,
u_int64_t& highest_rid, u_int64_t queue_id);
- void recover(boost::ptr_list<bdbstore::PreparedTransaction>& prep_tx_list,
- u_int64_t& highest_rid, u_int64_t queue_id)
- {
+ inline void recover(boost::ptr_list<bdbstore::PreparedTransaction>& prep_tx_list,
+ u_int64_t& highest_rid, u_int64_t queue_id) {
recover(&_aio_rd_cmpl_dtok_list, &aio_rd_callback, &_aio_wr_cmpl_dtok_list,
&aio_wr_callback, prep_tx_list, highest_rid, queue_id);
}
@@ -144,6 +151,8 @@
private:
const journal::iores handleInactivityTimer(const journal::iores r);
+ static void aio_wr_callback(jcntl* journal, u_int32_t num_dtoks);
+ static void aio_rd_callback(jcntl* journal, u_int32_t num_dtoks);
}; // class JournalImpl
} // namespace bdbstore
Modified: store/trunk/cpp/lib/jrnl/jcntl.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.cpp 2007-12-13 13:32:57 UTC (rev 1476)
+++ store/trunk/cpp/lib/jrnl/jcntl.cpp 2007-12-13 17:19:52 UTC (rev 1477)
@@ -41,7 +41,6 @@
#include <jrnl/jerrno.hpp>
#include <jrnl/jinf.hpp>
#include <sstream>
-#include <qpid/broker/PersistableMessage.h>
#include <unistd.h>
namespace rhm
@@ -738,64 +737,5 @@
}
}
-
-void
-jcntl::aio_wr_callback(jcntl* journal, u_int32_t num_dtoks)
-{
-//kpvdr TODO -- this list needs to be mutexed...???
- std::deque<rhm::journal::data_tok*> this_dtok_list(journal->_aio_wr_cmpl_dtok_list.begin(),
- journal->_aio_wr_cmpl_dtok_list.end());
-
- journal->_aio_wr_cmpl_dtok_list.clear();
- for (u_int32_t i=0; i<num_dtoks; i++)
- {
- data_tok*& dtokp = this_dtok_list.front();
- if (!journal->is_stopped() && dtokp->getSourceMessage())
- {
- switch (dtokp->wstate())
- {
- case data_tok::ENQ:
- dtokp->getSourceMessage()->enqueueComplete();
- break;
- case data_tok::DEQ:
-/* Don't need to signal until we have a way to ack completion of dequeue in AMQP
- dtokp->getSourceMessage()->dequeueComplete();
- if ( dtokp->getSourceMessage()->isDequeueComplete() ) // clear id after last dequeue
- dtokp->getSourceMessage()->setPersistenceId(0);
-*/
- break;
- default:
- ;
- }
- }
- dtokp->release();
- this_dtok_list.pop_front();
- }
-}
-
-void
-jcntl::aio_rd_callback(jcntl* journal, u_int32_t num_dtoks)
-{
-
-//kpvdr TODO -- can we get rid of the copy???
- std::deque<rhm::journal::data_tok*> this_dtok_list(journal->_aio_rd_cmpl_dtok_list.begin(),
- journal->_aio_rd_cmpl_dtok_list.end());
- journal->_aio_rd_cmpl_dtok_list.clear();
- for (u_int32_t i=0; i<num_dtoks; i++)
- {
- data_tok*& dtokp = this_dtok_list.front();
- if (!journal->is_stopped() && dtokp->getSourceMessage())
- {
- if (dtokp->wstate() == data_tok::ENQ && dtokp->rstate() == data_tok::READ)
- {
- // cct call the recovery manager. / lazyload..
- }
- }
- dtokp->release();
- this_dtok_list.pop_front();
- }
-
-}
-
} // namespace journal
} // namespace rhm
Modified: store/trunk/cpp/lib/jrnl/jcntl.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.hpp 2007-12-13 13:32:57 UTC (rev 1476)
+++ store/trunk/cpp/lib/jrnl/jcntl.hpp 2007-12-13 17:19:52 UTC (rev 1477)
@@ -49,7 +49,6 @@
#include <jrnl/slock.hpp>
#include <jrnl/wmgr.hpp>
#include <jrnl/wrfc.hpp>
-#include <qpid/broker/PersistableQueue.h>
namespace rhm
{
@@ -143,9 +142,6 @@
rcvdat _rcvdat; ///< Recovery data used for recovery
pthread_mutex_t _mutex; ///< Mutex for thread safety
- std::deque<rhm::journal::data_tok*> _aio_rd_cmpl_dtok_list; ///< Internally mamanged deque
- std::deque<rhm::journal::data_tok*> _aio_wr_cmpl_dtok_list; ///< Internally mamanged deque
-
public:
/**
* \brief Journal constructor.
@@ -193,19 +189,6 @@
std::deque<data_tok*>* wdtoklp, const aio_cb wr_cb);
/**
- * \brief Initialize using internal default callbacks and data_tok lists.
- *
- * TODO: Move to JournalImpl later
- *
- * \exception TODO
- */
- void initialize()
- {
- initialize(&_aio_rd_cmpl_dtok_list, &aio_rd_callback, &_aio_wr_cmpl_dtok_list,
- &aio_wr_callback );
- }
-
- /**
* /brief Initialize journal by recovering state from previously written journal.
*
* Initialize journal by recovering state from previously written journal. The journal files
@@ -235,19 +218,6 @@
const std::vector<std::string>& prep_txn_list, u_int64_t& highest_rid);
/**
- * \brief Recover using internal default callbacks and data_tok lists.
- *
- * TODO: Move to JournalImpl later
- *
- * \exception TODO
- */
- void recover(const std::vector<std::string>& prep_txn_list, u_int64_t& highest_rid)
- {
- recover(&_aio_rd_cmpl_dtok_list, &aio_rd_callback, &_aio_wr_cmpl_dtok_list,
- &aio_wr_callback, prep_txn_list, highest_rid);
- }
-
- /**
* \brief Notification to the journal that recovery is complete and that normal operation
* may resume.
*
@@ -656,16 +626,6 @@
std::streampos& read_pos);
void check_journal_alignment(const u_int16_t fid, std::streampos& rec_offset);
-
- /**
- * Intenal callback write
- */
- static void aio_wr_callback(jcntl* journal, u_int32_t num_dtoks);
-
- /**
- * Intenal callback write
- */
- static void aio_rd_callback(jcntl* journal, u_int32_t num_dtoks);
};
} // namespace journal
Modified: store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp 2007-12-13 13:32:57 UTC (rev 1476)
+++ store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp 2007-12-13 17:19:52 UTC (rev 1477)
@@ -30,9 +30,6 @@
*/
#include "JournalSystemTests.hpp"
-//#include "msg_producer.hpp"
-//#include "msg_consumer.hpp"
-#include <vector>
#define NUM_MSGS 5
#define MAX_AIO_SLEEPS 500
@@ -67,7 +64,7 @@
{
char* test_name = "InitializationTest";
rhm::journal::jcntl jc(test_name, "jdata", test_name, NUM_JFILES, JFSIZE_SBLKS);
- jc.initialize();
+ jrnl_init(&jc);
}
catch (const rhm::journal::jexception& e)
{
@@ -87,15 +84,15 @@
char* test_name = "EmptyRecoverTest";
{
rhm::journal::jcntl jc(test_name, "jdata", test_name, NUM_JFILES, JFSIZE_SBLKS);
- jc.initialize();
+ jrnl_init(&jc);
}
{
rhm::journal::jcntl jc(test_name, "jdata", test_name, NUM_JFILES, JFSIZE_SBLKS);
- jc.recover(txn_list, highest_rid);
+ jrnl_recover(&jc, txn_list, highest_rid);
}
{
rhm::journal::jcntl jc(test_name, "jdata", test_name, NUM_JFILES, JFSIZE_SBLKS);
- jc.recover(txn_list, highest_rid);
+ jrnl_recover(&jc, txn_list, highest_rid);
jc.recover_complete();
}
}
@@ -114,7 +111,7 @@
{
char* test_name = "EnqueueTest";
rhm::journal::jcntl jc(test_name, "jdata", test_name, NUM_JFILES, JFSIZE_SBLKS);
- jc.initialize();
+ jrnl_init(&jc);
// Non-txn
for (int m=0; m<NUM_MSGS; m++)
@@ -145,13 +142,13 @@
char* test_name = "RecoverReadTest";
{
rhm::journal::jcntl jc(test_name, "jdata", test_name, NUM_JFILES, JFSIZE_SBLKS);
- jc.initialize();
+ jrnl_init(&jc);
for (int m=0; m<NUM_MSGS; m++)
enq_msg(&jc, create_msg(msg, m, MSG_SIZE), false);
}
{
rhm::journal::jcntl jc(test_name, "jdata", test_name, NUM_JFILES, JFSIZE_SBLKS);
- jc.recover(txn_list, highest_rid);
+ jrnl_recover(&jc, txn_list, highest_rid);
for (int m=0; m<NUM_MSGS; m++)
{
read_msg(&jc);
@@ -165,7 +162,7 @@
test_name = "TxnRecoverReadTest";
{
rhm::journal::jcntl jc(test_name, "jdata", test_name, NUM_JFILES, JFSIZE_SBLKS);
- jc.initialize();
+ jrnl_init(&jc);
create_xid(xid, 1, XID_SIZE);
txn_list.push_back(xid);
for (int m=0; m<NUM_MSGS; m++)
@@ -174,7 +171,7 @@
}
{
rhm::journal::jcntl jc(test_name, "jdata", test_name, NUM_JFILES, JFSIZE_SBLKS);
- jc.recover(txn_list, highest_rid);
+ jrnl_recover(&jc, txn_list, highest_rid);
for (int m=0; m<NUM_MSGS; m++)
{
read_msg(&jc);
@@ -203,13 +200,13 @@
char* test_name = "RecoveredReadTest";
{
rhm::journal::jcntl jc(test_name, "jdata", test_name, NUM_JFILES, JFSIZE_SBLKS);
- jc.initialize();
+ jrnl_init(&jc);
for (int m=0; m<NUM_MSGS; m++)
enq_msg(&jc, create_msg(msg, m, MSG_SIZE), false);
}
{
rhm::journal::jcntl jc(test_name, "jdata", test_name, NUM_JFILES, JFSIZE_SBLKS);
- jc.recover(txn_list, highest_rid);
+ jrnl_recover(&jc, txn_list, highest_rid);
for (int m=0; m<NUM_MSGS; m++)
{
read_msg(&jc);
@@ -231,7 +228,7 @@
test_name = "TxnRecoveredReadTest";
{
rhm::journal::jcntl jc(test_name, "jdata", test_name, NUM_JFILES, JFSIZE_SBLKS);
- jc.initialize();
+ jrnl_init(&jc);
create_xid(xid, 2, XID_SIZE);
txn_list.push_back(xid);
for (int m=0; m<NUM_MSGS; m++)
@@ -240,7 +237,7 @@
}
{
rhm::journal::jcntl jc(test_name, "jdata", test_name, NUM_JFILES, JFSIZE_SBLKS);
- jc.recover(txn_list, highest_rid);
+ jrnl_recover(&jc, txn_list, highest_rid);
for (int m=0; m<NUM_MSGS; m++)
{
read_msg(&jc);
@@ -277,13 +274,13 @@
char* test_name = "RecoveredDequeueTest";
{
rhm::journal::jcntl jc(test_name, "jdata", test_name, NUM_JFILES, JFSIZE_SBLKS);
- jc.initialize();
+ jrnl_init(&jc);
for (int m=0; m<NUM_MSGS; m++)
enq_msg(&jc, create_msg(msg, m, MSG_SIZE), false);
}
{
rhm::journal::jcntl jc(test_name, "jdata", test_name, NUM_JFILES, JFSIZE_SBLKS);
- jc.recover(txn_list, highest_rid);
+ jrnl_recover(&jc, txn_list, highest_rid);
for (int m=0; m<NUM_MSGS; m++)
{
read_msg(&jc);
@@ -307,7 +304,7 @@
test_name = "TxnRecoveredDequeueTest";
{
rhm::journal::jcntl jc(test_name, "jdata", test_name, NUM_JFILES, JFSIZE_SBLKS);
- jc.initialize();
+ jrnl_init(&jc);
create_xid(xid, 3, XID_SIZE);
txn_list.push_back(xid);
for (int m=0; m<NUM_MSGS; m++)
@@ -316,7 +313,7 @@
}
{
rhm::journal::jcntl jc(test_name, "jdata", test_name, NUM_JFILES, JFSIZE_SBLKS);
- jc.recover(txn_list, highest_rid);
+ jrnl_recover(&jc, txn_list, highest_rid);
for (int m=0; m<NUM_MSGS; m++)
{
read_msg(&jc);
@@ -355,7 +352,7 @@
char* test_name = "FlagsRecoverdTest";
{
rhm::journal::jcntl jc(test_name, "jdata", test_name, NUM_JFILES, JFSIZE_SBLKS);
- jc.initialize();
+ jrnl_init(&jc);
// Transient msgs - should not recover
for (int m=0; m<NUM_MSGS; m++)
enq_msg(&jc, create_msg(msg, m, MSG_SIZE), true);
@@ -371,7 +368,7 @@
}
{
rhm::journal::jcntl jc(test_name, "jdata", test_name, NUM_JFILES, JFSIZE_SBLKS);
- jc.recover(txn_list, highest_rid);
+ jrnl_recover(&jc, txn_list, highest_rid);
// Recover non-transient msgs
for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
{
@@ -424,7 +421,7 @@
test_name = "TxnFlagsRecoverdTest";
{
rhm::journal::jcntl jc(test_name, "jdata", test_name, NUM_JFILES, JFSIZE_SBLKS);
- jc.initialize();
+ jrnl_init(&jc);
create_xid(xid, 4, XID_SIZE);
txn_list.push_back(xid);
// Transient msgs - should not recover
@@ -443,7 +440,7 @@
}
{
rhm::journal::jcntl jc(test_name, "jdata", test_name, NUM_JFILES, JFSIZE_SBLKS);
- jc.recover(txn_list, highest_rid);
+ jrnl_recover(&jc, txn_list, highest_rid);
// Recover non-transient msgs
for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
{
@@ -511,7 +508,7 @@
char* test_name = "ComplexRecoveryTest1";
{
rhm::journal::jcntl jc(test_name, "jdata", test_name, NUM_JFILES, JFSIZE_SBLKS);
- jc.initialize();
+ jrnl_init(&jc);
// Enqueue 2n, then dequeue first n msgs; check that only last n readable
// rids: 0 to NUM_MSGS*2 - 1
@@ -531,7 +528,7 @@
}
{
rhm::journal::jcntl jc(test_name, "jdata", test_name, NUM_JFILES, JFSIZE_SBLKS);
- jc.recover(txn_list, highest_rid);
+ jrnl_recover(&jc, txn_list, highest_rid);
// Check that only last n readable (as before)
for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
@@ -568,7 +565,7 @@
test_name = "TxnComplexRecoveryTest1";
{
rhm::journal::jcntl jc(test_name, "jdata", test_name, NUM_JFILES, JFSIZE_SBLKS);
- jc.initialize();
+ jrnl_init(&jc);
// Enqueue 2n, then dequeue first n msgs; check that only last n readable
// rids: 0 to NUM_MSGS - 1
@@ -595,7 +592,7 @@
}
{
rhm::journal::jcntl jc(test_name, "jdata", test_name, NUM_JFILES, JFSIZE_SBLKS);
- jc.recover(txn_list, highest_rid);
+ jrnl_recover(&jc, txn_list, highest_rid);
// Check that only last n readable (as before)
for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
@@ -648,6 +645,19 @@
// === Private helper functions ===
void
+JournalSystemTests::jrnl_init(rhm::journal::jcntl* jc)
+{
+ jc->initialize(&aioRdCmplList, NULL, &aioWrCmplList, NULL);
+}
+
+void
+JournalSystemTests::jrnl_recover(rhm::journal::jcntl* jc, vector<string> txn_list,
+ u_int64_t& highest_rid)
+{
+ jc->recover(&aioRdCmplList, NULL, &aioWrCmplList, NULL, txn_list, highest_rid);
+}
+
+void
JournalSystemTests::enq_msg(rhm::journal::jcntl* jc, const string msg, const bool transient)
{
rhm::journal::data_tok* dtp = new rhm::journal::data_tok;
Modified: store/trunk/cpp/tests/jrnl/JournalSystemTests.hpp
===================================================================
--- store/trunk/cpp/tests/jrnl/JournalSystemTests.hpp 2007-12-13 13:32:57 UTC (rev 1476)
+++ store/trunk/cpp/tests/jrnl/JournalSystemTests.hpp 2007-12-13 17:19:52 UTC (rev 1477)
@@ -32,6 +32,7 @@
#include "../test_plugin.h"
#include <jrnl/jcntl.hpp>
+#include <vector>
class JournalSystemTests : public CppUnit::TestCase
{
@@ -55,6 +56,8 @@
size_t xidsize;
bool transientFlag;
bool externalFlag;
+ std::deque<rhm::journal::data_tok*> aioRdCmplList;
+ std::deque<rhm::journal::data_tok*> aioWrCmplList;
public:
void InstantiationTest();
@@ -68,6 +71,9 @@
void ComplexRecoveryTest1();
private:
+ void jrnl_init(rhm::journal::jcntl* jc);
+ void jrnl_recover(rhm::journal::jcntl* jc, std::vector<std::string> txn_list,
+ u_int64_t& highest_rid);
void enq_msg(rhm::journal::jcntl* jc, const std::string msg, const bool transient);
void enq_extern_msg(rhm::journal::jcntl* jc, const bool transient);
void enq_txn_msg(rhm::journal::jcntl* jc, const std::string msg, const std::string xid,
18 years, 4 months