rhmessaging commits: r4031 - mgmt/newdata/wooly/python/wooly.
by rhmessaging-commits@lists.jboss.org
Author: eallen
Date: 2010-06-16 15:08:47 -0400 (Wed, 16 Jun 2010)
New Revision: 4031
Modified:
mgmt/newdata/wooly/python/wooly/forms.py
Log:
Moved call to super to end of do_process method for SubmitForm so submit and cancel can be evaluated first
Modified: mgmt/newdata/wooly/python/wooly/forms.py
===================================================================
--- mgmt/newdata/wooly/python/wooly/forms.py 2010-06-16 19:06:24 UTC (rev 4030)
+++ mgmt/newdata/wooly/python/wooly/forms.py 2010-06-16 19:08:47 UTC (rev 4031)
@@ -588,8 +588,6 @@
self.cancel_button.set(session, True)
def do_process(self, session):
- super(SubmitForm, self).do_process(session)
-
if self.cancel_button.get(session):
self.cancel_button.set(session, False)
@@ -601,6 +599,8 @@
else:
self.process_display(session)
+ super(SubmitForm, self).do_process(session)
+
# XXX get rid of this?
def process_return(self, session):
url = self.return_url.get(session)
14 years, 6 months
rhmessaging commits: r4030 - in mgmt/newdata/cumin/python/cumin: grid and 1 other directory.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2010-06-16 15:06:24 -0400 (Wed, 16 Jun 2010)
New Revision: 4030
Modified:
mgmt/newdata/cumin/python/cumin/grid/scheduler.py
mgmt/newdata/cumin/python/cumin/parameters.py
Log:
Remove unused (sqlobject-based) parameters; fix random scheduler selection in submission form
Modified: mgmt/newdata/cumin/python/cumin/grid/scheduler.py
===================================================================
--- mgmt/newdata/cumin/python/cumin/grid/scheduler.py 2010-06-16 18:27:31 UTC (rev 4029)
+++ mgmt/newdata/cumin/python/cumin/grid/scheduler.py 2010-06-16 19:06:24 UTC (rev 4030)
@@ -73,33 +73,45 @@
def __init__(self, app, name):
super(SchedulerSelectField, self).__init__(app, name, None)
- self.param = SchedulerParameter(app, "param")
+ self.param = IntegerParameter(app, "param")
self.add_parameter(self.param)
+ cls = self.app.model.com_redhat_grid.Scheduler
+
+ self.object = ObjectAttribute(self, "object", cls, self.param)
+
self.input = self.SchedulerOptions(app, "input", self.param)
self.add_child(self.input)
def get(self, session):
- scheduler = self.param.get(session)
+ return self.object.get(session)
- if not scheduler:
- items = self.input.get_items(session)
+ def render_title(self, session):
+ return "Scheduler"
- if items:
- scheduler = choice(items)
+ class SchedulerOptions(OptionInputSet):
+ def do_process(self, session):
+ cls = self.app.model.com_redhat_grid.Scheduler
+ id = self.param.get(session)
- return scheduler
+ if id is None:
+ items = self.get_items(session)
- def render_title(self, session):
- return "Scheduler"
+ if items:
+ scheduler = choice(items)
+ self.param.set(session, scheduler._id)
- class SchedulerOptions(OptionInputSet):
+ super(SchedulerSelectField.SchedulerOptions, self).do_process \
+ (session)
+
def do_get_items(self, session):
collector = self.form.object.get(session)
cls = self.app.model.com_redhat_grid.Scheduler
+
if collector:
- schedulers = cls.get_selection(session.cursor, Pool=collector.Pool)
+ schedulers = cls.get_selection \
+ (session.cursor, Pool=collector.Pool)
else:
schedulers = cls.get_selection(session.cursor)
@@ -112,7 +124,7 @@
return item.Name
def render_item_selected_attr(self, session, item):
- if item is self.param.get(session):
+ if item._id == self.param.get(session):
return "selected=\"selected\""
class SchedulerGeneralStatSet(StatSet):
Modified: mgmt/newdata/cumin/python/cumin/parameters.py
===================================================================
--- mgmt/newdata/cumin/python/cumin/parameters.py 2010-06-16 18:27:31 UTC (rev 4029)
+++ mgmt/newdata/cumin/python/cumin/parameters.py 2010-06-16 19:06:24 UTC (rev 4030)
@@ -63,19 +63,6 @@
finally:
cursor.close()
-class VhostParameter(RosemaryObjectParameter):
- def __init__(self, app, name):
- cls = app.model.org_apache_qpid_broker.Vhost
-
- super(VhostParameter, self).__init__(app, name, cls)
-
-class BindingParameter(Parameter):
- def do_unmarshal(self, string):
- return Binding.get(int(string))
-
- def do_marshal(self, binding):
- return str(binding.id)
-
class BrokerGroupParameter(Parameter):
def do_unmarshal(self, string):
if string == "__none__":
@@ -111,20 +98,6 @@
def do_marshal(self, broker):
return str(broker.id)
-class ConnectionParameter(Parameter):
- def do_unmarshal(self, string):
- return ClientConnection.get(int(string))
-
- def do_marshal(self, conn):
- return str(conn.id)
-
-class ConfigPropertyParameter(Parameter):
- def do_unmarshal(self, string):
- return ConfigProperty.get(int(string))
-
- def do_marshal(self, prop):
- return str(prop.id)
-
class ExchangeParameter(Parameter):
def do_unmarshal(self, string):
return Exchange.get(int(string))
@@ -160,25 +133,12 @@
class PeerParameter(LinkParameter):
pass
-class PoolParameter(Parameter):
- def do_unmarshal(self, string):
- return Pool(string)
-
- def do_marshal(self, pool):
- return str(pool._id)
-
class CollectorGridAttribute(ObjectAssociateAttribute):
def get_associate(self, session, collector):
cls = self.app.model.com_redhat_grid.Grid
grid = cls.get_object(session.cursor, Pool=collector.Pool)
return grid
-class CollectorNegotiatorAttribute(ObjectAssociateAttribute):
- def get_associate(self, session, collector):
- cls = self.app.model.com_redhat_grid.Negotiator
- negotiator = cls.get_object(session.cursor, Pool=collector.Pool)
- return negotiator
-
class QueueParameter(Parameter):
def do_unmarshal(self, string):
return Queue.get(int(string))
@@ -193,58 +153,9 @@
def do_marshal(self, route):
return str(route.id)
-class SchedulerParameter(Parameter):
- def do_unmarshal(self, string):
- return Scheduler.get(int(string))
-
- def do_marshal(self, sched):
- return str(sched.id)
-
-class SessionParameter(Parameter):
- def do_unmarshal(self, string):
- return Session.get(int(string))
-
- def do_marshal(self, session):
- return str(session.id)
-
-class SlotParameter(Parameter):
- def do_unmarshal(self, string):
- return Slot.get(int(string))
-
- def do_marshal(self, slot):
- return str(slot.id)
-
-class SubmissionParameter(Parameter):
- def do_unmarshal(self, string):
- return Submission.get(int(string))
-
- def do_marshal(self, sub):
- return str(sub.id)
-
-class SubmitterParameter(Parameter):
- def do_unmarshal(self, string):
- return Submitter.get(int(string))
-
- def do_marshal(self, sub):
- return str(sub.id)
-
-class CollectorParameter(Parameter):
- def do_unmarshal(self, string):
- return Collector.get(int(string))
-
- def do_marshal(self, coll):
- return str(coll.id)
-
class NegotiatorParameter(Parameter):
def do_unmarshal(self, string):
return Negotiator.get(int(string))
def do_marshal(self, neg):
return str(neg.id)
-
-class SystemParameter(Parameter):
- def do_unmarshal(self, string):
- return Sysimage.get(int(string))
-
- def do_marshal(self, session):
- return str(session.id)
14 years, 6 months
rhmessaging commits: r4029 - mgmt/newdata/cumin/bin.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2010-06-16 14:27:31 -0400 (Wed, 16 Jun 2010)
New Revision: 4029
Modified:
mgmt/newdata/cumin/bin/cumin-smoke-test
Log:
Properly bracket section between cumin start and stop
Modified: mgmt/newdata/cumin/bin/cumin-smoke-test
===================================================================
--- mgmt/newdata/cumin/bin/cumin-smoke-test 2010-06-16 18:20:33 UTC (rev 4028)
+++ mgmt/newdata/cumin/bin/cumin-smoke-test 2010-06-16 18:27:31 UTC (rev 4029)
@@ -38,24 +38,24 @@
cumin.start()
- conn = cumin.database.get_connection()
- cursor = conn.cursor()
+ try:
+ conn = cumin.database.get_connection()
+ cursor = conn.cursor()
- # cls = cumin.model.org_apache_qpid_broker.Broker XXX fails
- cls = cumin.model.com_redhat_grid.Scheduler
- obj = cls.get_object(cursor)
+ # cls = cumin.model.org_apache_qpid_broker.Broker XXX fails
+ cls = cumin.model.com_redhat_grid.Scheduler
+ obj = cls.get_object(cursor)
- print "Calling echo on", obj
+ print "Calling echo on", obj
- completed = Event()
-
- def completion(x, y):
- print x, y
- completed.set()
+ completed = Event()
+
+ def completion(x, y):
+ print x, y
+ completed.set()
- cumin.session.call_method(completion, obj, "echo", (1, "Hello!"))
+ cumin.session.call_method(completion, obj, "echo", (1, "Hello!"))
- try:
completed.wait()
finally:
cumin.stop()
14 years, 6 months
rhmessaging commits: r4028 - in mgmt/newdata/cumin: python/cumin and 1 other directory.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2010-06-16 14:20:33 -0400 (Wed, 16 Jun 2010)
New Revision: 4028
Modified:
mgmt/newdata/cumin/bin/cumin-smoke-test
mgmt/newdata/cumin/python/cumin/session.py
Log:
A safer means of calling a qmf method; improve cumin-smoke-test
Modified: mgmt/newdata/cumin/bin/cumin-smoke-test
===================================================================
--- mgmt/newdata/cumin/bin/cumin-smoke-test 2010-06-15 18:47:09 UTC (rev 4027)
+++ mgmt/newdata/cumin/bin/cumin-smoke-test 2010-06-16 18:20:33 UTC (rev 4028)
@@ -3,6 +3,8 @@
import os
import sys
+from threading import Event
+
home = os.environ.get("CUMIN_HOME", os.path.normpath("/usr/share/cumin"))
sys.path.append(os.path.join(home, "python"))
@@ -36,24 +38,25 @@
cumin.start()
- sleep(5)
-
conn = cumin.database.get_connection()
cursor = conn.cursor()
- cls = cumin.model.org_apache_qpid_broker.Broker
- broker = cls.get_object(cursor)
+ # cls = cumin.model.org_apache_qpid_broker.Broker XXX fails
+ cls = cumin.model.com_redhat_grid.Scheduler
+ obj = cls.get_object(cursor)
+ print "Calling echo on", obj
+
+ completed = Event()
+
def completion(x, y):
- print "XXX", x, y
+ print x, y
+ completed.set()
- cumin.session.call_method(completion, broker, "echo", (1, "yeah"))
+ cumin.session.call_method(completion, obj, "echo", (1, "Hello!"))
try:
- while True:
- # print_threads()
-
- sleep(5)
+ completed.wait()
finally:
cumin.stop()
Modified: mgmt/newdata/cumin/python/cumin/session.py
===================================================================
--- mgmt/newdata/cumin/python/cumin/session.py 2010-06-15 18:47:09 UTC (rev 4027)
+++ mgmt/newdata/cumin/python/cumin/session.py 2010-06-16 18:20:33 UTC (rev 4028)
@@ -12,6 +12,7 @@
self.qmf_session = None
self.qmf_brokers = list()
+ self.qmf_agents = dict()
# int seq => callable
self.outstanding_method_calls = dict()
@@ -25,9 +26,6 @@
qmf_broker = self.qmf_session.addBroker(uri)
- name = qmf_broker.thread.__class__.__name__
- qmf_broker.thread.name = "%s(%s)" % (name, uri)
-
self.qmf_brokers.append(qmf_broker)
def check(self):
@@ -55,11 +53,24 @@
for qmf_broker in self.qmf_brokers:
self.qmf_session.delBroker(qmf_broker)
+ def get_agent(self, agent_id):
+ self.lock.acquire()
+ try:
+ return self.qmf_agents.get(agent_id)
+ finally:
+ self.lock.release()
+
def call_method(self, callback, obj, name, args):
assert isinstance(obj, RosemaryObject)
- agent = self.qmf_session._getAgentForAgentAddr(obj._qmf_agent_id)
+ for i in range(10):
+ agent = self.get_agent(obj._qmf_agent_id)
+ if agent:
+ break
+
+ sleep(1)
+
if not agent:
raise Exception("Agent '%s' is unknown" % obj._qmf_agent_id)
@@ -92,18 +103,30 @@
def __init__(self, session):
self.session = session
+ def newPackage(self, name):
+ log.info("New package %s", name)
+
+ def newClass(self, kind, classKey):
+ log.info("New class %s", classKey)
+
def newAgent(self, qmf_agent):
log.info("New agent %s", qmf_agent)
+ self.session.lock.acquire()
+ try:
+ self.session.qmf_agents[qmf_agent.getAgentBank()] = qmf_agent
+ finally:
+ self.session.lock.release()
+
def delAgent(self, qmf_agent):
log.info("Deleting agent %s", qmf_agent)
- def newPackage(self, name):
- log.info("New package %s", name)
+ self.session.lock.acquire()
+ try:
+ del self.session.qmf_agents[qmf_agent.getAgentBank()]
+ finally:
+ self.session.lock.release()
- def newClass(self, kind, classKey):
- log.info("New class %s", classKey)
-
def methodResponse(self, broker, seq, response):
log.info("Method response for request %i received from %s",
seq, broker)
14 years, 6 months
rhmessaging commits: r4027 - in store/trunk/cpp/lib: jrnl and 1 other directory.
by rhmessaging-commits@lists.jboss.org
Author: kpvdr
Date: 2010-06-15 14:47:09 -0400 (Tue, 15 Jun 2010)
New Revision: 4027
Modified:
store/trunk/cpp/lib/JournalImpl.cpp
store/trunk/cpp/lib/MessageStoreImpl.cpp
store/trunk/cpp/lib/jrnl/jcntl.cpp
store/trunk/cpp/lib/jrnl/txn_map.cpp
store/trunk/cpp/lib/jrnl/txn_map.hpp
store/trunk/cpp/lib/jrnl/wmgr.cpp
Log:
Refactor to remove exceptions from tmap execution path
Modified: store/trunk/cpp/lib/JournalImpl.cpp
===================================================================
--- store/trunk/cpp/lib/JournalImpl.cpp 2010-06-14 21:51:43 UTC (rev 4026)
+++ store/trunk/cpp/lib/JournalImpl.cpp 2010-06-15 18:47:09 UTC (rev 4027)
@@ -1,5 +1,5 @@
/*
- Copyright (c) 2007, 2008, 2009 Red Hat, Inc.
+ Copyright (c) 2007, 2008, 2009, 2010 Red Hat, Inc.
This file is part of the Qpid async store library msgstore.so.
@@ -216,21 +216,14 @@
if (prep_tx_list_ptr)
{
for (msgstore::PreparedTransaction::list::iterator i = prep_tx_list_ptr->begin(); i != prep_tx_list_ptr->end(); i++) {
- try {
- txn_data_list tdl = _tmap.get_tdata_list(i->xid);
- assert(tdl.size()); // should never be empty
- for (tdl_itr tdl_itr = tdl.begin(); tdl_itr < tdl.end(); tdl_itr++) {
- if (tdl_itr->_enq_flag) { // enqueue op
- i->enqueues->add(queue_id, tdl_itr->_rid);
- } else { // dequeue op
- i->dequeues->add(queue_id, tdl_itr->_drid);
- }
+ txn_data_list tdl = _tmap.get_tdata_list(i->xid); // tdl will be empty if xid not found
+ for (tdl_itr tdl_itr = tdl.begin(); tdl_itr < tdl.end(); tdl_itr++) {
+ if (tdl_itr->_enq_flag) { // enqueue op
+ i->enqueues->add(queue_id, tdl_itr->_rid);
+ } else { // dequeue op
+ i->dequeues->add(queue_id, tdl_itr->_drid);
}
}
- catch (const jexception& e) {
- if (e.err_code() != jerrno::JERR_MAP_NOTFOUND)
- throw;
- }
}
}
std::ostringstream oss2;
Modified: store/trunk/cpp/lib/MessageStoreImpl.cpp
===================================================================
--- store/trunk/cpp/lib/MessageStoreImpl.cpp 2010-06-14 21:51:43 UTC (rev 4026)
+++ store/trunk/cpp/lib/MessageStoreImpl.cpp 2010-06-15 18:47:09 UTC (rev 4027)
@@ -1,5 +1,5 @@
/*
- Copyright (c) 2007, 2008, 2009 Red Hat, Inc.
+ Copyright (c) 2007, 2008, 2009, 2010 Red Hat, Inc.
This file is part of the Qpid async store library msgstore.so.
@@ -971,7 +971,7 @@
} else {
// Enqueue and/or dequeue tx
journal::txn_map& tmap = jc->get_txn_map();
- journal::txn_data_list txnList = tmap.get_tdata_list(xid);
+ journal::txn_data_list txnList = tmap.get_tdata_list(xid); // txnList will be empty if xid not found
bool enq = false;
bool deq = false;
for (journal::tdl_itr j = txnList.begin(); j<txnList.end(); j++) {
@@ -1081,13 +1081,8 @@
bool is2PC = *(static_cast<char*>(dbuff)) != 0;
// Check transaction details; add to recover map
- // NOTE: There is a small but finite probability that the xid read above may have been removed by
- // another thread on one of the active queues by the time the get_tdata_list() call below is made.
- // Since reading the TPL is not considered a high-speed operation and is used for recovery and other
- // infrequent uses, the following try-catch will work as well as attempting to lock down the
- // entire transaction map for this operation - but with less complexity.
- try {
- journal::txn_data_list txnList = tmap.get_tdata_list(xid);
+ journal::txn_data_list txnList = tmap.get_tdata_list(xid); // txnList will be empty if xid not found
+ if (!txnList.empty()) { // xid found in tmap
unsigned enqCnt = 0;
unsigned deqCnt = 0;
u_int64_t rid = 0;
@@ -1109,12 +1104,6 @@
assert(deqCnt <= 1);
tplRecoverMap.insert(TplRecoverMapPair(xid, TplRecoverStruct(rid, deqCnt == 1, commitFlag, is2PC)));
}
- catch (const journal::jexception& e) {
- ::free(xidbuff);
- aio_sleep_cnt = 0;
- if (e.err_code() == journal::jerrno::JERR_MAP_NOTFOUND) break; // ignore this xid; move on
- throw;
- }
::free(xidbuff);
aio_sleep_cnt = 0;
Modified: store/trunk/cpp/lib/jrnl/jcntl.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.cpp 2010-06-14 21:51:43 UTC (rev 4026)
+++ store/trunk/cpp/lib/jrnl/jcntl.cpp 2010-06-15 18:47:09 UTC (rev 4027)
@@ -8,7 +8,7 @@
*
* \author Kim van der Riet
*
- * Copyright (c) 2007, 2008, 2009 Red Hat, Inc.
+ * Copyright (c) 2007, 2008, 2009, 2010 Red Hat, Inc.
*
* This file is part of the Qpid async store library msgstore.so.
*
@@ -623,7 +623,7 @@
std::find(prep_txn_list_ptr->begin(), prep_txn_list_ptr->end(), *itr);
if (pitr == prep_txn_list_ptr->end()) // not found in prepared list
{
- txn_data_list tdl = _tmap.get_remove_tdata_list(*itr);
+ txn_data_list tdl = _tmap.get_remove_tdata_list(*itr); // tdl will be empty if xid not found
// Unlock any affected enqueues in emap
for (tdl_itr i=tdl.begin(); i<tdl.end(); i++)
{
@@ -691,7 +691,13 @@
assert(xidp != 0);
std::string xid((char*)xidp, er.xid_size());
_tmap.insert_txn_data(xid, txn_data(h._rid, 0, start_fid, true));
- _tmap.set_aio_compl(xid, h._rid);
+ if (_tmap.set_aio_compl(xid, h._rid)) // xid or rid not found
+ {
+ std::ostringstream oss;
+ oss << std::hex << "_tmap.set_aio_compl: txn_enq xid=\"" << xid;
+ oss << "\" rid=0x" << h._rid;
+ throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "jcntl", "rcvr_get_next_record");
+ }
std::free(xidp);
}
else
@@ -718,7 +724,13 @@
std::string xid((char*)xidp, dr.xid_size());
_tmap.insert_txn_data(xid, txn_data(dr.rid(), dr.deq_rid(), start_fid, false,
dr.is_txn_coml_commit()));
- _tmap.set_aio_compl(xid, dr.rid());
+ if (_tmap.set_aio_compl(xid, dr.rid())) // xid or rid not found
+ {
+ std::ostringstream oss;
+ oss << std::hex << "_tmap.set_aio_compl: txn_deq xid=\"" << xid;
+ oss << "\" rid=0x" << dr.rid();
+ throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "jcntl", "rcvr_get_next_record");
+ }
std::free(xidp);
}
else
@@ -746,7 +758,7 @@
std::string xid((char*)xidp, ar.xid_size());
try
{
- txn_data_list tdl = _tmap.get_remove_tdata_list(xid);
+ txn_data_list tdl = _tmap.get_remove_tdata_list(xid); // tdl will be empty if xid not found
for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++)
{
if (itr->_enq_flag)
@@ -779,7 +791,7 @@
std::string xid((char*)xidp, cr.xid_size());
try
{
- txn_data_list tdl = _tmap.get_remove_tdata_list(xid);
+ txn_data_list tdl = _tmap.get_remove_tdata_list(xid); // tdl will be empty if xid not found
for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++)
{
if (itr->_enq_flag) // txn enqueue
Modified: store/trunk/cpp/lib/jrnl/txn_map.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/txn_map.cpp 2010-06-14 21:51:43 UTC (rev 4026)
+++ store/trunk/cpp/lib/jrnl/txn_map.cpp 2010-06-15 18:47:09 UTC (rev 4027)
@@ -8,7 +8,7 @@
*
* \author Kim van der Riet
*
- * Copyright (c) 2007, 2008, 2009 Red Hat Inc.
+ * Copyright (c) 2007, 2008, 2009, 2010 Red Hat Inc.
*
* This file is part of the Qpid async store library msgstore.so.
*
@@ -66,6 +66,12 @@
_pfid_txn_cnt.set_size(num_jfiles);
}
+u_int32_t
+txn_map::get_txn_pfid_cnt(const u_int16_t pfid) const
+{
+ return _pfid_txn_cnt.cnt(pfid);
+}
+
bool
txn_map::insert_txn_data(const std::string& xid, const txn_data& td)
{
@@ -98,11 +104,7 @@
{
xmap_itr itr = _map.find(xid);
if (itr == _map.end()) // not found in map
- {
- std::ostringstream oss;
- oss << std::hex << "xid=" << xid_format(xid);
- throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "txn_map", "get_tdata_list_nolock");
- }
+ return _empty_data_list;
return itr->second;
}
@@ -112,11 +114,7 @@
slock s(_mutex);
xmap_itr itr = _map.find(xid);
if (itr == _map.end()) // not found in map
- {
- std::ostringstream oss;
- oss << std::hex << "xid=" << xid_format(xid);
- throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "txn_map", "get_remove_tdata_list");
- }
+ return _empty_data_list;
txn_data_list list = itr->second;
_map.erase(itr);
for (tdl_itr i=list.begin(); i!=list.end(); i++)
@@ -129,26 +127,22 @@
{
slock s(_mutex);
xmap_itr itr= _map.find(xid);
- if (itr == _map.end()) // not found in map
- return false;
- return true;
+ return itr != _map.end();
}
u_int32_t
-txn_map::get_rid_count(const std::string& xid)
+txn_map::enq_cnt()
{
- slock s(_mutex);
- xmap_itr itr = _map.find(xid);
- if (itr == _map.end()) // not found in map
- {
- std::ostringstream oss;
- oss << std::hex << "xid=" << xid_format(xid);
- throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "txn_map", "get_rid_count");
- }
- return itr->second.size();
+ return cnt(true);
}
u_int32_t
+txn_map::deq_cnt()
+{
+ return cnt(true);
+}
+
+u_int32_t
txn_map::cnt(const bool enq_flag)
{
slock s(_mutex);
@@ -164,33 +158,13 @@
return c;
}
-u_int32_t
-txn_map::cnt(const std::string& xid, const bool enq_flag)
-{
- slock s(_mutex);
- u_int32_t c = 0;
- xmap_itr i = _map.find(xid);
- if (i == _map.end()) // not found in map
- return 0;
- for (tdl_itr j = i->second.begin(); j < i->second.end(); j++)
- {
- if (j->_enq_flag == enq_flag)
- c++;
- }
- return c;
-}
-
-bool
+int8_t
txn_map::is_txn_synced(const std::string& xid)
{
slock s(_mutex);
xmap_itr itr = _map.find(xid);
if (itr == _map.end()) // not found in map
- {
- std::ostringstream oss;
- oss << std::hex << "xid=" << xid_format(xid);
- throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "txn_map", "is_txn_synced");
- }
+ return -1;
bool is_synced = true;
for (tdl_itr litr = itr->second.begin(); litr < itr->second.end(); litr++)
{
@@ -200,43 +174,30 @@
break;
}
}
- return is_synced;
+ return is_synced ? 1 : 0;
}
-bool
+int8_t
txn_map::set_aio_compl(const std::string& xid, const u_int64_t rid)
{
- bool ok = true;
- bool found = false;
+ slock s(_mutex);
+ xmap_itr itr = _map.find(xid);
+ if (itr == _map.end()) // xid not found in map
+ return -1;
+ for (tdl_itr litr = itr->second.begin(); litr < itr->second.end(); litr++)
{
- slock s(_mutex);
- xmap_itr itr = _map.find(xid);
- if (itr == _map.end()) // not found in map
- ok = false;
- else
+ if (litr->_rid == rid)
{
- for (tdl_itr litr = itr->second.begin(); litr < itr->second.end(); litr++)
- {
- if (litr->_rid == rid)
- {
- found = true;
- litr->_aio_compl = true;
- break;
- }
- }
+ litr->_aio_compl = true;
+ return 0; // rid found
}
}
- if (ok && !found)
- {
- std::ostringstream oss;
- oss << std::hex << "xid=" << xid_format(xid) << " rid=0x" << rid;
- throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "txn_map", "set_aio_compl");
- }
- return ok;
+ // xid present, but rid not found
+ return -2;
}
-const txn_data&
-txn_map::get_data(const std::string& xid, const u_int64_t rid)
+bool
+txn_map::data_exists(const std::string& xid, const u_int64_t rid)
{
bool found = false;
{
@@ -248,14 +209,8 @@
found = itr->_rid == rid;
itr++;
}
- if (!found)
- {
- std::ostringstream oss;
- oss << std::hex << "xid=" << xid_format(xid) << " rid=0x" << rid;
- throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "txn_map", "get_data");
- }
- return *itr;
}
+ return found;
}
bool
@@ -290,17 +245,5 @@
}
}
-// static fn
-std::string
-txn_map::xid_format(const std::string& xid)
-{
- if (xid.size() < 100)
- return xid;
- std::ostringstream oss;
- oss << "\"" << xid.substr(0, 20) << " ... " << xid.substr(xid.size() - 20, 20);
- oss << "\" [size: " << xid.size() << "]";
- return oss.str();
-}
-
} // namespace journal
} // namespace mrg
Modified: store/trunk/cpp/lib/jrnl/txn_map.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/txn_map.hpp 2010-06-14 21:51:43 UTC (rev 4026)
+++ store/trunk/cpp/lib/jrnl/txn_map.hpp 2010-06-15 18:47:09 UTC (rev 4027)
@@ -8,7 +8,7 @@
*
* \author Kim van der Riet
*
- * Copyright (c) 2007, 2008, 2009 Red Hat Inc.
+ * Copyright (c) 2007, 2008, 2009, 2010 Red Hat Inc.
*
* This file is part of the Qpid async store library msgstore.so.
*
@@ -119,37 +119,30 @@
xmap _map;
smutex _mutex;
arr_cnt _pfid_txn_cnt;
+ const txn_data_list _empty_data_list;
public:
txn_map();
virtual ~txn_map();
void set_num_jfiles(const u_int16_t num_jfiles);
- inline u_int32_t get_txn_pfid_cnt(const u_int16_t pfid) const
- { return _pfid_txn_cnt.cnt(pfid); };
-
+ u_int32_t get_txn_pfid_cnt(const u_int16_t pfid) const;
bool insert_txn_data(const std::string& xid, const txn_data& td);
const txn_data_list get_tdata_list(const std::string& xid);
const txn_data_list get_remove_tdata_list(const std::string& xid);
bool in_map(const std::string& xid);
- u_int32_t get_rid_count(const std::string& xid);
- inline u_int32_t enq_cnt() { return cnt(true); }
- inline u_int32_t enq_cnt(const std::string& xid) { return cnt(xid, true); }
- inline u_int32_t deq_cnt() { return cnt(true); }
- inline u_int32_t deq_cnt(const std::string& xid) { return cnt(xid, false); }
- bool is_txn_synced(const std::string& xid);
- bool set_aio_compl(const std::string& xid, const u_int64_t rid);
- const txn_data& get_data(const std::string& xid, const u_int64_t rid);
+ u_int32_t enq_cnt();
+ u_int32_t deq_cnt();
+ int8_t is_txn_synced(const std::string& xid); // -1=xid not found; 0=not synced; 1=synced
+ int8_t set_aio_compl(const std::string& xid, const u_int64_t rid); // -2=rid not found; -1=xid not found; 0=done
+ bool data_exists(const std::string& xid, const u_int64_t rid);
bool is_enq(const u_int64_t rid);
inline void clear() { _map.clear(); }
inline bool empty() const { return _map.empty(); }
- inline u_int32_t size() const { return u_int32_t(_map.size()); }
+ inline size_t size() const { return _map.size(); }
void xid_list(std::vector<std::string>& xv);
private:
u_int32_t cnt(const bool enq_flag);
- u_int32_t cnt(const std::string& xid, const bool enq_flag);
- static std::string xid_format(const std::string& xid);
-
const txn_data_list get_tdata_list_nolock(const std::string& xid);
};
Modified: store/trunk/cpp/lib/jrnl/wmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wmgr.cpp 2010-06-14 21:51:43 UTC (rev 4026)
+++ store/trunk/cpp/lib/jrnl/wmgr.cpp 2010-06-15 18:47:09 UTC (rev 4027)
@@ -8,7 +8,7 @@
*
* \author Kim van der Riet
*
- * Copyright (c) 2007, 2008, 2009 Red Hat, Inc.
+ * Copyright (c) 2007, 2008, 2009, 2010 Red Hat, Inc.
*
* This file is part of the Qpid async store library msgstore.so.
*
@@ -372,7 +372,7 @@
// Delete this txn from tmap, unlock any locked records in emap
std::string xid((char*)xid_ptr, xid_len);
- txn_data_list tdl = _tmap.get_remove_tdata_list(xid);
+ txn_data_list tdl = _tmap.get_remove_tdata_list(xid); // tdl will be empty if xid not found
for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++)
{
try
@@ -469,7 +469,7 @@
// Delete this txn from tmap, process records into emap
std::string xid((char*)xid_ptr, xid_len);
- txn_data_list tdl = _tmap.get_remove_tdata_list(xid);
+ txn_data_list tdl = _tmap.get_remove_tdata_list(xid); // tdl will be empty if xid not found
for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++)
{
if (itr->_enq_flag) // txn enqueue
@@ -684,6 +684,8 @@
tot_data_toks++;
dtokp->set_wstate(data_tok::ENQ);
if (dtokp->has_xid())
+ // Ignoring return value here. A non-zero return can signify that the transaction
+ // has committed or aborted, and which was completed prior to the aio returning.
_tmap.set_aio_compl(dtokp->xid(), dtokp->rid());
break;
case data_tok::DEQ_SUBM:
@@ -691,6 +693,7 @@
tot_data_toks++;
dtokp->set_wstate(data_tok::DEQ);
if (dtokp->has_xid())
+ // Ignoring return value - see note above.
_tmap.set_aio_compl(dtokp->xid(), dtokp->rid());
break;
case data_tok::ABORT_SUBM:
@@ -772,14 +775,7 @@
bool
wmgr::is_txn_synced(const std::string& xid)
{
- bool is_synced = true;
- // Check for outstanding enqueues/dequeues
- try { is_synced = _tmap.is_txn_synced(xid); }
- catch (const jexception& e)
- {
- if (e.err_code() != jerrno::JERR_MAP_NOTFOUND) throw;
- }
- if (!is_synced)
+ if (_tmap.is_txn_synced(xid) == 0) // not synced
return false;
// Check for outstanding commit/aborts
std::set<std::string>::iterator it = _txn_pending_set.find(xid);
@@ -898,16 +894,7 @@
if (e.err_code() != jerrno::JERR_MAP_NOTFOUND)
throw;
if (xid.size())
- try
- {
- _tmap.get_data(xid, drid); // not in emap, try tmap
- found = true;
- }
- catch (const jexception& e)
- {
- if (e.err_code() != jerrno::JERR_MAP_NOTFOUND)
- throw;
- }
+ found = _tmap.data_exists(xid, drid);
}
if (!found)
{
14 years, 6 months
rhmessaging commits: r4026 - mgmt/newdata/mint/python/mint.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2010-06-14 17:51:43 -0400 (Mon, 14 Jun 2010)
New Revision: 4026
Modified:
mgmt/newdata/mint/python/mint/update.py
Log:
Combine multiple sql operations into one cursor execute call
Modified: mgmt/newdata/mint/python/mint/update.py
===================================================================
--- mgmt/newdata/mint/python/mint/update.py 2010-06-14 19:58:24 UTC (rev 4025)
+++ mgmt/newdata/mint/python/mint/update.py 2010-06-14 21:51:43 UTC (rev 4026)
@@ -207,44 +207,66 @@
self.process_properties(obj, object_columns)
self.process_statistics(obj, object_columns, sample_columns)
+ statements = list()
+
if object_columns:
object_columns.append(cls.sql_table._qmf_update_time)
- new = obj._sync_time is None
+ if obj._sync_time:
+ sql = cls.sql_update.emit(object_columns)
+ stats.updated += 1
+ else:
+ sql = cls.sql_insert.emit(object_columns)
+ stats.created += 1
- obj.save(cursor, object_columns)
+ statements.append(sql)
- if new:
- stats.created += 1
- else:
- stats.updated += 1
-
if sample_columns:
- drop = False
+ keep = True
if stats.enqueued - stats.dequeued > 100:
# There's some pressure, so consider dropping samples
now = datetime.now()
-
+
if update_time < now - minutes_ago:
# The sample is too old
- drop = True
+ keep = False
if last_update_time and last_update_time > now - seconds_ago:
# The samples are too fidelitous
- drop = True
+ keep = False
- if drop:
+ if keep:
+ sample_columns.append(cls.sql_samples_table._qmf_update_time)
+
+ sql = cls.sql_samples_insert.emit(sample_columns)
+ stats.sampled += 1
+
+ statements.append(sql)
+ else:
stats.dropped += 1
- else:
- col = cls.sql_samples_table._qmf_update_time
- sample_columns.append(col)
- obj.add_sample(cursor, sample_columns)
+ if statements:
+ text = "; ".join(statements)
- stats.sampled += 1
+ try:
+ cursor.execute(text, obj.__dict__)
+ except:
+ log.exception("%s failed", self)
+ log.info("Sql text: %s", text)
+ log.info("Sql values:")
+
+ for item in sorted(obj.__dict__.items()):
+ log.info(" %-34s %r", *item)
+
+ log.info("Sql row count: %i", cursor.rowcount)
+
+ raise
+
+ obj._sync_time = datetime.now()
+
def get_class(self):
class_key = self.object.getClassKey()
14 years, 6 months
rhmessaging commits: r4025 - mgmt/newdata/mint/python/mint.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2010-06-14 15:58:24 -0400 (Mon, 14 Jun 2010)
New Revision: 4025
Modified:
mgmt/newdata/mint/python/mint/expire.py
mgmt/newdata/mint/python/mint/session.py
mgmt/newdata/mint/python/mint/update.py
mgmt/newdata/mint/python/mint/vacuum.py
Log:
* Reuse a single dedicated write cursor
* Use RosemaryObject.delete
* Keep track of creations versus updates
* Track sql operation counts
Modified: mgmt/newdata/mint/python/mint/expire.py
===================================================================
--- mgmt/newdata/mint/python/mint/expire.py 2010-06-14 15:55:33 UTC (rev 4024)
+++ mgmt/newdata/mint/python/mint/expire.py 2010-06-14 19:58:24 UTC (rev 4025)
@@ -33,7 +33,7 @@
sleep(frequency)
class ExpireUpdate(Update):
- def do_process(self, conn, stats):
+ def do_process(self, cursor, stats):
seconds = self.model.app.expire_threshold
log.info("Expiring samples older than %i seconds", seconds)
@@ -42,24 +42,17 @@
for pkg in self.model._packages:
for cls in pkg._classes:
- count += self.delete_samples(conn, cls, seconds)
+ count += self.delete_samples(cursor, cls, seconds)
- conn.commit()
-
log.info("Expired %i samples", count)
- def delete_samples(self, conn, cls, seconds):
- cursor = conn.cursor()
+ def delete_samples(self, cursor, cls, seconds):
+ cls.sql_samples_delete.execute(cursor, (), {"seconds": seconds})
- try:
- cls.sql_samples_delete.execute(cursor, (), {"seconds": seconds})
+ log.debug("Deleted %i %s", cursor.rowcount, cls)
- log.debug("Deleted %i %s", cursor.rowcount, cls)
+ return cursor.rowcount
- return cursor.rowcount
- finally:
- cursor.close()
-
def convert_time_units(t):
if t / (24 * 3600) >= 1:
t_out = t / (24 * 3600)
Modified: mgmt/newdata/mint/python/mint/session.py
===================================================================
--- mgmt/newdata/mint/python/mint/session.py 2010-06-14 15:55:33 UTC (rev 4024)
+++ mgmt/newdata/mint/python/mint/session.py 2010-06-14 19:58:24 UTC (rev 4025)
@@ -99,9 +99,6 @@
def newClass(self, kind, classKey):
log.info("New class %s", classKey)
- # XXX I want to store class keys using this, but I can't,
- # because I don't get any agent info; instead
-
def objectProps(self, broker, obj):
agent = self.model.get_agent(obj.getAgent())
Modified: mgmt/newdata/mint/python/mint/update.py
===================================================================
--- mgmt/newdata/mint/python/mint/update.py 2010-06-14 15:55:33 UTC (rev 4024)
+++ mgmt/newdata/mint/python/mint/update.py 2010-06-14 19:58:24 UTC (rev 4025)
@@ -3,6 +3,7 @@
import pickle
from psycopg2 import IntegrityError, TimestampFromTicks
+from psycopg2.extensions import cursor as Cursor
from rosemary.model import *
from util import *
@@ -18,16 +19,21 @@
self.updates = ConcurrentQueue()
self.stats = UpdateStats(self.app)
- self.thread = None
self.conn = None
self.read_cursor = None
+ self.write_cursor = None
self.halt_on_error = False
def init(self):
self.conn = self.app.database.get_connection()
- self.read_cursor = self.conn.cursor()
+ self.read_cursor = self.conn.cursor(cursor_factory=UpdateCursor)
+ self.write_cursor = self.conn.cursor(cursor_factory=UpdateCursor)
+
+ self.read_cursor.stats = self.stats
+ self.write_cursor.stats = self.stats
+
def enqueue(self, update):
self.updates.put(update)
@@ -51,17 +57,15 @@
self.stats.dequeued += 1
- update.thread = self
+ update.process(self)
- update.process(self.conn, self.stats)
-
class UpdateStats(object):
names = ("*Enqueued", "*Dequeued", "Depth", "*Created", "*Updated",
- "*Sampled", "*Deleted", "*Dropped",
- "Errors", "CPU (%)", "Mem (M)")
- headings = ("%10s " * 11) % names
+ "*Sampled", "*Deleted", "*Dropped", "*Sql Ops",
+ "Errors", "Cpu (%)", "Mem (M)")
+ headings = ("%10s " * 12) % names
values_fmt = "%10.1f %10.1f %10i %10.1f %10.1f %10.1f %10.1f " + \
- "%10.1f %10i %10i %10.1f"
+ "%10.1f %10.1f %10i %10i %10.1f"
then = None
now = None
@@ -76,6 +80,8 @@
self.deleted = 0
self.dropped = 0
+ self.sql_ops = 0
+
self.errors = 0
self.time = None
@@ -96,7 +102,10 @@
UpdateStats.now = now
def get_resident_pages(self):
- line = open("/proc/%i/statm" % os.getpid()).read()
+ try:
+ line = open("/proc/%i/statm" % os.getpid()).read()
+ except:
+ return 0
return int(line.split()[1])
@@ -115,7 +124,8 @@
self.now.updated - self.then.updated,
self.now.sampled - self.then.sampled,
self.now.deleted - self.then.deleted,
- self.now.dropped - self.then.dropped]
+ self.now.dropped - self.then.dropped,
+ self.now.sql_ops - self.then.sql_ops]
secs = self.now.time - self.then.time
values = map(lambda x: x / secs, values)
@@ -128,32 +138,41 @@
print self.values_fmt % tuple(values)
+class UpdateCursor(Cursor):
+ def execute(self, sql, args=None):
+ super(UpdateCursor, self).execute(sql, args)
+
+ self.stats.sql_ops += 1
+
class Update(object):
def __init__(self, model):
self.model = model
+ self.thread = None
- def process(self, conn, stats):
+ def process(self, thread):
log.debug("Processing %s", self)
try:
- self.do_process(conn, stats)
+ self.do_process(thread.write_cursor, thread.stats)
- conn.commit()
+ thread.conn.commit()
except UpdateException, e:
log.info("Update could not be completed; %s", e)
- conn.rollback()
+ thread.conn.rollback()
except:
log.exception("Update failed")
- conn.rollback()
+ thread.conn.rollback()
- stats.errors += 1
+ thread.stats.errors += 1
- if self.thread.halt_on_error:
+ if thread.halt_on_error:
raise
- def do_process(self, conn, stats):
+ #print_exc()
+
+ def do_process(self, cursor, stats):
raise Exception("Not implemented")
def __repr__(self):
@@ -166,7 +185,7 @@
self.agent = agent
self.object = obj
- def do_process(self, conn, stats):
+ def do_process(self, cursor, stats):
cls = self.get_class()
obj = self.get_object(cls, self.object.getObjectId().objectName)
@@ -188,49 +207,44 @@
self.process_properties(obj, object_columns)
self.process_statistics(obj, object_columns, sample_columns)
- cursor = conn.cursor()
+ if object_columns:
+ object_columns.append(cls.sql_table._qmf_update_time)
- try:
- if object_columns:
- object_columns.append(cls.sql_table._qmf_update_time)
+ new = obj._sync_time is None
- new = obj._sync_time is None
+ obj.save(cursor, object_columns)
- obj.save(cursor, object_columns)
+ if new:
+ stats.created += 1
+ else:
+ stats.updated += 1
- if new:
- stats.created += 1
- else:
- stats.updated += 1
+ if sample_columns:
+ drop = False
- if sample_columns:
- drop = False
+ if stats.enqueued - stats.dequeued > 100:
+ # There's some pressure, so consider dropping samples
- if stats.enqueued - stats.dequeued > 100:
- # There's some pressure, so consider dropping samples
+ now = datetime.now()
- now = datetime.now()
+ if update_time < now - minutes_ago:
+ # The sample is too old
+ drop = True
- if update_time < now - minutes_ago:
- # The sample is too old
- drop = True
+ if last_update_time and last_update_time > now - seconds_ago:
+ # The samples are too fidelitous
+ drop = True
- if last_update_time and last_update_time > now - seconds_ago:
- # The samples are too fidelitous
- drop = True
+ if drop:
+ stats.dropped += 1
+ else:
+ col = cls.sql_samples_table._qmf_update_time
+ sample_columns.append(col)
- if drop:
- stats.dropped += 1
- else:
- col = cls.sql_samples_table._qmf_update_time
- sample_columns.append(col)
+ obj.add_sample(cursor, sample_columns)
- obj.add_sample(cursor, sample_columns)
+ stats.sampled += 1
- stats.sampled += 1
- finally:
- cursor.close()
-
def get_class(self):
class_key = self.object.getClassKey()
@@ -255,7 +269,7 @@
try:
return self.agent.objects_by_id[object_id]
except KeyError:
- cursor = self.thread.read_cursor
+ cursor = self.model.app.update_thread.read_cursor
obj = RosemaryObject(cls, None)
obj._qmf_agent_id = self.agent.id
@@ -413,18 +427,13 @@
return "%s(%s,%s,%s)" % (name, self.agent.id, cls, id)
class ObjectDelete(ObjectUpdate):
- def do_process(self, conn, stats):
+ def do_process(self, cursor, stats):
cls = self.get_class()
obj = self.get_object(cls, self.object.getObjectId().objectName)
- cursor = conn.cursor()
+ obj.delete(cursor)
try:
- cls.sql_delete.execute(cursor, (), obj.__dict__)
- finally:
- cursor.close()
-
- try:
del self.agent.objects_by_id[self.object.getObjectId().objectName]
except KeyError:
pass
@@ -437,18 +446,13 @@
self.agent = agent
- def do_process(self, conn, stats):
- cursor = conn.cursor()
-
+ def do_process(self, cursor, stats):
id = self.agent.id
- try:
- for pkg in self.model._packages:
- for cls in pkg._classes:
- for obj in cls.get_selection(cursor, _qmf_agent_id=id):
- obj.delete(cursor)
- finally:
- cursor.close()
+ for pkg in self.model._packages:
+ for cls in pkg._classes:
+ for obj in cls.get_selection(cursor, _qmf_agent_id=id):
+ obj.delete(cursor)
class UpdateException(Exception):
def __init__(self, name):
Modified: mgmt/newdata/mint/python/mint/vacuum.py
===================================================================
--- mgmt/newdata/mint/python/mint/vacuum.py 2010-06-14 15:55:33 UTC (rev 4024)
+++ mgmt/newdata/mint/python/mint/vacuum.py 2010-06-14 19:58:24 UTC (rev 4025)
@@ -15,26 +15,24 @@
sleep(60 * 60 * 10)
class VacuumUpdate(Update):
- def do_process(self, conn, stats):
+ def do_process(self, cursor, stats):
log.info("Vacumming tables")
+ conn = self.model.app.update_thread.conn
+
level = conn.isolation_level
conn.set_isolation_level(0)
for pkg in self.model._packages:
for cls in pkg._classes:
- self.vacuum(conn, cls)
+ self.vacuum(cursor, cls)
conn.set_isolation_level(level)
log.info("Vacuumed tables")
- def vacuum(self, conn, cls):
- cursor = conn.cursor()
+ def vacuum(self, cursor, cls):
sql = "vacuum verbose %s"
- try:
- cursor.execute(sql % cls.sql_table.identifier)
- cursor.execute(sql % cls.sql_samples_table.identifier)
- finally:
- cursor.close()
+ cursor.execute(sql % cls.sql_table.identifier)
+ cursor.execute(sql % cls.sql_samples_table.identifier)
14 years, 6 months
rhmessaging commits: r4024 - in mgmt/newdata: mint/python/mint and 1 other directory.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2010-06-14 11:55:33 -0400 (Mon, 14 Jun 2010)
New Revision: 4024
Modified:
mgmt/newdata/cumin/bin/cumin-data
mgmt/newdata/mint/python/mint/session.py
mgmt/newdata/mint/python/mint/update.py
mgmt/newdata/mint/python/mint/util.py
Log:
* Use a single update class to handle prop and stat updates
* Improve reporting from cumin-data --print-stats
* Reuse a read cursor for get_object, a notable scalability gain
* Put in a workaround for some unexpectedly null data
* Quiet the debug logging a little
Modified: mgmt/newdata/cumin/bin/cumin-data
===================================================================
--- mgmt/newdata/cumin/bin/cumin-data 2010-06-14 12:54:06 UTC (rev 4023)
+++ mgmt/newdata/cumin/bin/cumin-data 2010-06-14 15:55:33 UTC (rev 4024)
@@ -38,7 +38,7 @@
count = 0
if opts.print_stats:
- print "[Reported values are the number of events per second]"
+ print "[Starred columns are the number of events per second]"
while True:
if count % 24 == 0:
@@ -46,7 +46,7 @@
count += 1
- stats.print_rates()
+ stats.print_values()
sleep(5)
else:
Modified: mgmt/newdata/mint/python/mint/session.py
===================================================================
--- mgmt/newdata/mint/python/mint/session.py 2010-06-14 12:54:06 UTC (rev 4023)
+++ mgmt/newdata/mint/python/mint/session.py 2010-06-14 15:55:33 UTC (rev 4024)
@@ -108,12 +108,6 @@
if not self.model.app.update_thread.isAlive():
return
- # XXX objectProps is getting called even if no properties are
- # set
-
- if not obj.getProperties():
- return
-
if obj.getTimestamps()[2]:
up = ObjectDelete(self.model, agent, obj)
else:
@@ -127,7 +121,7 @@
if not self.model.app.update_thread.isAlive():
return
- up = ObjectAddSample(self.model, agent, obj)
+ up = ObjectUpdate(self.model, agent, obj)
self.model.app.update_thread.enqueue(up)
def event(self, broker, event):
Modified: mgmt/newdata/mint/python/mint/update.py
===================================================================
--- mgmt/newdata/mint/python/mint/update.py 2010-06-14 12:54:06 UTC (rev 4023)
+++ mgmt/newdata/mint/python/mint/update.py 2010-06-14 15:55:33 UTC (rev 4024)
@@ -1,4 +1,5 @@
import copy
+import resource
import pickle
from psycopg2 import IntegrityError, TimestampFromTicks
@@ -7,33 +8,36 @@
log = logging.getLogger("mint.update")
+minutes_ago = timedelta(minutes=5)
+seconds_ago = timedelta(seconds=60)
+
class UpdateThread(MintDaemonThread):
def __init__(self, app):
super(UpdateThread, self).__init__(app)
self.updates = ConcurrentQueue()
+ self.stats = UpdateStats(self.app)
- self.stats = UpdateStats(self.app)
+ self.thread = None
self.conn = None
+ self.read_cursor = None
- self.halt_on_error = True
+ self.halt_on_error = False
def init(self):
self.conn = self.app.database.get_connection()
+ self.read_cursor = self.conn.cursor()
def enqueue(self, update):
- update.thread = self
-
self.updates.put(update)
- if self.stats:
- self.stats.enqueued += 1
+ self.stats.enqueued += 1
# This is an attempt to yield from the enqueueing thread (this
# method's caller) to the update thread
if self.updates.qsize() > 1000:
- sleep(0.1)
+ sleep(0)
def run(self):
while True:
@@ -45,15 +49,19 @@
except Empty:
continue
- if self.stats:
- self.stats.dequeued += 1
+ self.stats.dequeued += 1
+ update.thread = self
+
update.process(self.conn, self.stats)
class UpdateStats(object):
- names = ("Enqueued", "Dequeued", "Updated", "Deleted", "Dropped")
- headings = ("%8s " * 5) % names
- rates_fmt = ("%8.1f " * 5)
+ names = ("*Enqueued", "*Dequeued", "Depth", "*Created", "*Updated",
+ "*Sampled", "*Deleted", "*Dropped",
+ "Errors", "CPU (%)", "Mem (M)")
+ headings = ("%10s " * 11) % names
+ values_fmt = "%10.1f %10.1f %10i %10.1f %10.1f %10.1f %10.1f " + \
+ "%10.1f %10i %10i %10.1f"
then = None
now = None
@@ -62,27 +70,40 @@
self.enqueued = 0
self.dequeued = 0
+ self.created = 0
self.updated = 0
+ self.sampled = 0
self.deleted = 0
self.dropped = 0
- self.samples_updated = 0
- self.samples_expired = 0
- self.samples_dropped = 0
+ self.errors = 0
self.time = None
+ self.cpu = 0
+ self.memory = 0
def capture(self):
now = copy.copy(self)
+
now.time = time.time()
+ rusage = resource.getrusage(resource.RUSAGE_SELF)
+
+ now.cpu = rusage[0] + rusage[1]
+ now.memory = self.get_resident_pages() * resource.getpagesize()
+
UpdateStats.then = UpdateStats.now
UpdateStats.now = now
+ def get_resident_pages(self):
+ line = open("/proc/%i/statm" % os.getpid()).read()
+
+ return int(line.split()[1])
+
def print_headings(self):
print self.headings
- def print_rates(self):
+ def print_values(self):
self.capture()
if not self.then:
@@ -90,18 +111,22 @@
values = [self.now.enqueued - self.then.enqueued,
self.now.dequeued - self.then.dequeued,
+ self.now.created - self.then.created,
self.now.updated - self.then.updated,
+ self.now.sampled - self.then.sampled,
self.now.deleted - self.then.deleted,
self.now.dropped - self.then.dropped]
- # XXX
- values[2] += self.now.samples_updated - self.then.samples_updated
- values[4] += self.now.samples_dropped - self.then.samples_dropped
-
secs = self.now.time - self.then.time
- rates = map(lambda x: x / secs, values)
+ values = map(lambda x: x / secs, values)
- print self.rates_fmt % tuple(rates)
+ values.insert(2, self.now.enqueued - self.now.dequeued)
+
+ values.append(self.errors)
+ values.append(int((self.now.cpu - self.then.cpu) / secs * 100))
+ values.append(self.now.memory / 1000000.0)
+
+ print self.values_fmt % tuple(values)
class Update(object):
def __init__(self, model):
@@ -123,7 +148,9 @@
conn.rollback()
- if self.model.app.update_thread.halt_on_error:
+ stats.errors += 1
+
+ if self.thread.halt_on_error:
raise
def do_process(self, conn, stats):
@@ -143,20 +170,67 @@
cls = self.get_class()
obj = self.get_object(cls, self.object.getObjectId().objectName)
- columns = list()
+ if not obj._sync_time and not self.object.getProperties():
+ # This is a sample for an object we don't have yet
+ stats.dropped += 1; return
- self.process_headers(obj, columns)
- self.process_properties(obj, columns)
+ update_time, create_time, delete_time = self.object.getTimestamps()
+ update_time = datetime.fromtimestamp(update_time / 1000000000)
+ last_update_time = obj._qmf_update_time
+
+ obj._qmf_update_time = update_time
+
+ object_columns = list()
+ sample_columns = list()
+
+ self.process_headers(obj, object_columns)
+ self.process_properties(obj, object_columns)
+ self.process_statistics(obj, object_columns, sample_columns)
+
cursor = conn.cursor()
try:
- obj.save(cursor, columns)
+ if object_columns:
+ object_columns.append(cls.sql_table._qmf_update_time)
+
+ new = obj._sync_time is None
+
+ obj.save(cursor, object_columns)
+
+ if new:
+ stats.created += 1
+ else:
+ stats.updated += 1
+
+ if sample_columns:
+ drop = False
+
+ if stats.enqueued - stats.dequeued > 100:
+ # There's some pressure, so consider dropping samples
+
+ now = datetime.now()
+
+ if update_time < now - minutes_ago:
+ # The sample is too old
+ drop = True
+
+ if last_update_time and last_update_time > now - seconds_ago:
+ # The samples are too fidelitous
+ drop = True
+
+ if drop:
+ stats.dropped += 1
+ else:
+ col = cls.sql_samples_table._qmf_update_time
+ sample_columns.append(col)
+
+ obj.add_sample(cursor, sample_columns)
+
+ stats.sampled += 1
finally:
cursor.close()
- stats.updated += 1
-
def get_class(self):
class_key = self.object.getClassKey()
@@ -181,20 +255,19 @@
try:
return self.agent.objects_by_id[object_id]
except KeyError:
- conn = self.model.app.database.get_connection()
- cursor = conn.cursor()
+ cursor = self.thread.read_cursor
obj = RosemaryObject(cls, None)
obj._qmf_agent_id = self.agent.id
obj._qmf_object_id = object_id
+ #try:
try:
- try:
- cls.load_object_by_qmf_id(cursor, obj)
- except RosemaryNotFound:
- obj._id = cls.get_new_id(cursor)
- finally:
- cursor.close()
+ cls.load_object_by_qmf_id(cursor, obj)
+ except RosemaryNotFound:
+ obj._id = cls.get_new_id(cursor)
+ #finally:
+ # cursor.close()
self.agent.objects_by_id[object_id] = obj
@@ -204,26 +277,21 @@
table = obj._class.sql_table
update_time, create_time, delete_time = self.object.getTimestamps()
-
- update_time = datetime.fromtimestamp(update_time / 1000000000)
create_time = datetime.fromtimestamp(create_time / 1000000000)
if delete_time:
delete_time = datetime.fromtimestamp(delete_time / 1000000000)
- if obj._sync_time:
- # This object is already in the database
+ obj._qmf_delete_time = delete_time
+ columns.append(table._qmf_delete_time)
- obj._qmf_update_time = update_time
- columns.append(table._qmf_update_time)
+ if not obj._sync_time:
+ # The object hasn't been written to the database yet
- # XXX session_id may have changed too?
- else:
obj._qmf_agent_id = self.agent.id
obj._qmf_object_id = self.object.getObjectId().objectName
obj._qmf_session_id = str(self.object.getObjectId().getSequence())
obj._qmf_class_key = str(self.object.getClassKey())
- obj._qmf_update_time = update_time
obj._qmf_create_time = create_time
columns.append(table._id)
@@ -231,7 +299,6 @@
columns.append(table._qmf_object_id)
columns.append(table._qmf_session_id)
columns.append(table._qmf_class_key)
- columns.append(table._qmf_update_time)
columns.append(table._qmf_create_time)
def process_properties(self, obj, columns):
@@ -244,7 +311,7 @@
else:
col, nvalue = self.process_value(cls, prop, value)
except MappingException, e:
- log.debug(e)
+ #log.debug(e)
continue
# XXX This optimization will be obsolete when QMF does it
@@ -310,6 +377,34 @@
return value
+ def process_statistics(self, obj, update_columns, insert_columns):
+ for stat, value in self.object.getStatistics():
+ try:
+ col = obj._class._statistics_by_name[stat.name].sql_column
+ except KeyError:
+ log.debug("Statistic %s is unknown", stat)
+
+ continue
+
+ if value is not None:
+ value = self.transform_value(stat, value)
+
+ # XXX hack workaround
+ if col.name == "MonitorSelfTime":
+ value = datetime.now()
+
+ # Don't write unchanged values
+ #
+ # XXX This optimization will be obsolete when QMF does it
+ # instead
+
+ if value != getattr(obj, col.name):
+ update_columns.append(col)
+
+ insert_columns.append(col)
+
+ setattr(obj, col.name, value)
+
def __repr__(self):
name = self.__class__.__name__
cls = self.object.getClassKey().getClassName()
@@ -336,71 +431,6 @@
stats.deleted += 1
-class ObjectAddSample(ObjectUpdate):
- def do_process(self, conn, stats):
- cls = self.get_class()
- obj = self.get_object(cls, self.object.getObjectId().objectName)
-
- if not cls._statistics:
- stats.samples_dropped += 1; return
-
- if not obj._sync_time:
- stats.samples_dropped += 1; return
-
- if stats.enqueued - stats.dequeued > 100:
- if obj._qmf_update_time > datetime.now() - timedelta(seconds=60):
- stats.samples_dropped += 1; return
-
- update_time, create_time, delete_time = self.object.getTimestamps()
-
- update_time = datetime.fromtimestamp(update_time / 1000000000)
-
- update_columns = list()
- update_columns.append(cls.sql_table._qmf_update_time)
-
- insert_columns = list()
- insert_columns.append(cls.sql_samples_table._qmf_update_time)
-
- obj._qmf_update_time = update_time
-
- self.process_samples(obj, update_columns, insert_columns)
-
- cursor = conn.cursor()
-
- try:
- obj.save(cursor, update_columns)
-
- cls.sql_samples_insert.execute \
- (cursor, insert_columns, obj.__dict__)
- finally:
- cursor.close()
-
- stats.samples_updated += 1
-
- def process_samples(self, obj, update_columns, insert_columns):
- for stat, value in self.object.getStatistics():
- try:
- col = obj._class._statistics_by_name[stat.name].sql_column
- except KeyError:
- log.debug("Statistic %s is unknown", stat)
-
- continue
-
- if value is not None:
- value = self.transform_value(stat, value)
-
- # Don't write unchanged values
- #
- # XXX This optimization will be obsolete when QMF does it
- # instead
-
- if value != getattr(obj, col.name):
- update_columns.append(col)
-
- insert_columns.append(col)
-
- setattr(obj, col.name, value)
-
class AgentDelete(Update):
def __init__(self, model, agent):
super(AgentDelete, self).__init__(model)
@@ -408,8 +438,6 @@
self.agent = agent
def do_process(self, conn, stats):
- return # XXX don't delete until we stop getting unexpected deletes
-
cursor = conn.cursor()
id = self.agent.id
Modified: mgmt/newdata/mint/python/mint/util.py
===================================================================
--- mgmt/newdata/mint/python/mint/util.py 2010-06-14 12:54:06 UTC (rev 4023)
+++ mgmt/newdata/mint/python/mint/util.py 2010-06-14 15:55:33 UTC (rev 4024)
@@ -7,6 +7,7 @@
from Queue import Queue as ConcurrentQueue, Full, Empty
from crypt import crypt
from datetime import datetime, timedelta
+from pprint import pprint
from getpass import getpass
from qmf.console import ObjectId
from random import sample
14 years, 6 months
rhmessaging commits: r4023 - store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb.
by rhmessaging-commits@lists.jboss.org
Author: ritchiem
Date: 2010-06-14 08:54:06 -0400 (Mon, 14 Jun 2010)
New Revision: 4023
Modified:
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
Log:
Removed old @param statements
Modified: store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
===================================================================
--- store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java 2010-06-14 12:52:38 UTC (rev 4022)
+++ store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java 2010-06-14 12:54:06 UTC (rev 4023)
@@ -235,7 +235,6 @@
* Called after instantiation in order to configure the message store.
*
* @param name The name of the virtual host using this store
- * @param vHostConfig The configuration for this virtualhost
* @return whether a new store environment was created or not (to indicate whether recovery is necessary)
*
* @throws Exception If any error occurs that means the store is unable to configure itself.
@@ -1347,7 +1346,6 @@
* @param messageId The message to store the data for.
* @param offset The offset of the data chunk in the message.
* @param contentBody The content of the data chunk.
- * @param lastContentBody Flag to indicate that this is the last such chunk for the message.
*
* @throws AMQException If the operation fails for any reason, or if the specified message does not exist.
*/
@@ -1427,7 +1425,6 @@
/**
* Retrieves message meta-data.
*
- * @param context The transactional context for the operation.
* @param messageId The message to get the meta-data for.
*
* @return The message meta data.
14 years, 6 months
rhmessaging commits: r4022 - in store/trunk/java/bdbstore/src: tools/java/org/apache/qpid/server/util and 1 other directory.
by rhmessaging-commits@lists.jboss.org
Author: ritchiem
Date: 2010-06-14 08:52:38 -0400 (Mon, 14 Jun 2010)
New Revision: 4022
Removed:
store/trunk/java/bdbstore/src/tools/java/org/apache/qpid/server/util/NullApplicationRegistry.java
Modified:
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
Log:
QPID-2652 : Logging Update
Modified: store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
===================================================================
--- store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java 2010-06-11 19:43:00 UTC (rev 4021)
+++ store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java 2010-06-14 12:52:38 UTC (rev 4022)
@@ -179,7 +179,7 @@
LogSubject logSubject) throws Exception
{
_logSubject = logSubject;
- CurrentActor.get().message(_logSubject, ConfigStoreMessages.CFG_1001(this.getClass().getName()));
+ CurrentActor.get().message(_logSubject, ConfigStoreMessages.CREATED(this.getClass().getName()));
if(_configured)
{
@@ -200,7 +200,7 @@
Configuration storeConfiguration,
LogSubject logSubject) throws Exception
{
- CurrentActor.get().message(_logSubject, MessageStoreMessages.MST_CREATED(this.getClass().getName()));
+ CurrentActor.get().message(_logSubject, MessageStoreMessages.CREATED(this.getClass().getName()));
if(!_configured)
{
@@ -213,7 +213,7 @@
public void configureTransactionLog(String name, TransactionLogRecoveryHandler recoveryHandler,
Configuration storeConfiguration, LogSubject logSubject) throws Exception
{
- CurrentActor.get().message(_logSubject, TransactionLogMessages.TXN_1001(this.getClass().getName()));
+ CurrentActor.get().message(_logSubject, TransactionLogMessages.CREATED(this.getClass().getName()));
if(!_configured)
{
@@ -253,7 +253,7 @@
}
}
- CurrentActor.get().message(_logSubject, MessageStoreMessages.MST_STORE_LOCATION(environmentPath.getAbsolutePath()));
+ CurrentActor.get().message(_logSubject, MessageStoreMessages.STORE_LOCATION(environmentPath.getAbsolutePath()));
_version = storeConfig.getInt(DATABASE_FORMAT_VERSION_PROPERTY, DATABASE_FORMAT_VERSION);
@@ -499,7 +499,7 @@
_state = State.CLOSED;
- CurrentActor.get().message(_logSubject,MessageStoreMessages.MST_CLOSED());
+ CurrentActor.get().message(_logSubject,MessageStoreMessages.CLOSED());
}
private void closeEnvironment() throws DatabaseException
@@ -519,7 +519,7 @@
{
stateTransition(State.CONFIGURED, State.RECOVERING);
- CurrentActor.get().message(_logSubject,MessageStoreMessages.MST_RECOVERY_START());
+ CurrentActor.get().message(_logSubject,MessageStoreMessages.RECOVERY_START());
try
{
Deleted: store/trunk/java/bdbstore/src/tools/java/org/apache/qpid/server/util/NullApplicationRegistry.java
===================================================================
--- store/trunk/java/bdbstore/src/tools/java/org/apache/qpid/server/util/NullApplicationRegistry.java 2010-06-11 19:43:00 UTC (rev 4021)
+++ store/trunk/java/bdbstore/src/tools/java/org/apache/qpid/server/util/NullApplicationRegistry.java 2010-06-14 12:52:38 UTC (rev 4022)
@@ -1,118 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.util;
-
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.configuration.PropertiesConfiguration;
-import org.apache.qpid.server.configuration.ServerConfiguration;
-import org.apache.qpid.server.configuration.VirtualHostConfiguration;
-import org.apache.qpid.server.logging.NullRootMessageLogger;
-import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.server.logging.actors.BrokerActor;
-import org.apache.qpid.server.management.NoopManagedObjectRegistry;
-import org.apache.qpid.server.plugins.PluginManager;
-import org.apache.qpid.server.registry.ApplicationRegistry;
-//import org.apache.qpid.server.security.access.ACLManager;
-import org.apache.qpid.server.security.access.plugins.AllowAll;
-import org.apache.qpid.server.security.auth.database.PropertiesPrincipalDatabaseManager;
-import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.virtualhost.VirtualHostImpl;
-import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
-
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Properties;
-import java.util.NoSuchElementException;
-
-public class NullApplicationRegistry extends ApplicationRegistry
-{
- public NullApplicationRegistry() throws ConfigurationException
- {
- super(new ServerConfiguration(new PropertiesConfiguration()));
- }
-
- public void initialise(int instanceID) throws Exception
- {
- _logger.info("Initialising NullApplicationRegistry");
-
- _rootMessageLogger = new NullRootMessageLogger();
-
- //We should use a Test Actor Here not the Broker Actor
- CurrentActor.set(new BrokerActor(_rootMessageLogger));
-
- _configuration.setHousekeepingExpiredMessageCheckPeriod(200);
-
- Properties users = new Properties();
-
- users.put("guest", "guest");
-
- _databaseManager = new PropertiesPrincipalDatabaseManager("default", users);
-
- // FIXME _accessManager = new ACLManager(_configuration.getSecurityConfiguration(), _pluginManager, AllowAll.FACTORY);
-
- _authenticationManager = new PrincipalDatabaseAuthenticationManager(null, null);
-
- _managedObjectRegistry = new NoopManagedObjectRegistry();
- _virtualHostRegistry = new VirtualHostRegistry(this);
- PropertiesConfiguration vhostProps = new PropertiesConfiguration();
- VirtualHostConfiguration hostConfig = new VirtualHostConfiguration("test", vhostProps);
- VirtualHost dummyHost = new VirtualHostImpl(hostConfig,null);
- _virtualHostRegistry.registerVirtualHost(dummyHost);
- _virtualHostRegistry.setDefaultVirtualHostName("test");
- // FIXME _pluginManager = new PluginManager("");
- _startup = new Exception("NAR");
-
- }
- private Exception _startup;
- public Collection<String> getVirtualHostNames()
- {
- String[] hosts = {"test"};
- return Arrays.asList(hosts);
- }
-
- @Override
- public void close()
- {
- CurrentActor.set(new BrokerActor(_rootMessageLogger));
-
- try
- {
- super.close();
- }
- finally
- {
- try
- {
- CurrentActor.remove();
- }
- catch (NoSuchElementException npe)
- {
- _startup.printStackTrace();
- _startup.printStackTrace(System.err);
- }
-
- }
- }
-}
-
-
-
14 years, 6 months