rhmessaging commits: r1387 - store/trunk/cpp/lib.
by rhmessaging-commits@lists.jboss.org
Author: gordonsim
Date: 2007-11-29 07:38:22 -0500 (Thu, 29 Nov 2007)
New Revision: 1387
Modified:
store/trunk/cpp/lib/BdbMessageStore.cpp
store/trunk/cpp/lib/BdbMessageStore.h
Log:
Slight improvement to the previous fix for deadlock avoidance: set cursor to message, then remove mapping, then if needed delete message from position of first cursor.
Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp 2007-11-29 10:19:28 UTC (rev 1386)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp 2007-11-29 12:38:22 UTC (rev 1387)
@@ -1130,22 +1130,27 @@
bool BdbMessageStore::dequeue(DbTxn* txn, Dbt& messageId, Dbt& queueId)
{
- //need to get a lock on the messageDb in case we want to delete
- //(to avoid deadlocks with enqueue that acquires locks in this
- //order)
- Dbt peek;
- peek.set_flags(DB_DBT_USERMEM);
- peek.set_ulen(0);
+ //First look up the message, this gets a lock on that table in
+ //case we need to delete it (avoiding deadlocks with enqueue where
+ //the locking order is messageDb then mappingDb)
+ Cursor msgCursor;
+ msgCursor.open(messageDb, txn);
+
try {
- int status = messageDb.get(txn, &messageId, &peek, DB_RMW);
- if (status != DB_BUFFER_SMALL) {
- THROW_STORE_EXCEPTION("Unexpected status code when peeking at message: " + string(DbEnv::strerror(status)));
+ Dbt peek;
+ peek.set_flags(DB_DBT_USERMEM | DB_DBT_PARTIAL);
+ peek.set_ulen(0);
+ int status = msgCursor->get(&messageId, &peek, DB_SET | DB_RMW);
+ if (status == DB_NOTFOUND ) {
+ THROW_STORE_EXCEPTION("Can't find record for message");
+ } else if (status != 0 && status != DB_BUFFER_SMALL) {
+ string e = "Dequeue failed (while seeking message) with unexpected status = ";
+ e += DbEnv::strerror(status);
+ THROW_STORE_EXCEPTION(e);
}
} catch (DbMemoryException& expected) {
- //api doc indicates may throw exception instead of status = DB_BUFFER_SMALL;
}
-
Cursor cursor;
cursor.open(mappingDb, txn);
@@ -1158,7 +1163,12 @@
THROW_STORE_EXCEPTION("Dequeue failed with status = " + status);
}
- return deleteIfUnused(cursor, txn, messageId);
+ if (isUnused(cursor, messageId)) {
+ msgCursor->del(0);
+ return true;
+ } else {
+ return false;
+ }
}
u_int32_t BdbMessageStore::outstandingQueueAIO(const qpid::broker::PersistableQueue& /*queue*/)
@@ -1177,15 +1187,24 @@
bool BdbMessageStore::deleteIfUnused(Cursor& cursor, DbTxn* txn, Dbt& messageId)
{
+ if (isUnused(cursor, messageId)) {
+ messageDb.del(txn, &messageId, 0);
+ return true;
+ } else {
+ return false;
+ }
+}
+
+bool BdbMessageStore::isUnused(Cursor& cursor, Dbt& messageId)
+{
Dbt empty;
int status = cursor->get(&messageId, &empty, DB_SET);
if (status == DB_NOTFOUND) {
- messageDb.del(txn, &messageId, 0);
return true;
} else if (status == 0) {
return false;
} else {
- THROW_STORE_EXCEPTION("Dequeue failed with status = " + status);
+ THROW_STORE_EXCEPTION("Dequeue failed (in isUnused()) with status = " + status);
}
}
Modified: store/trunk/cpp/lib/BdbMessageStore.h
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.h 2007-11-29 10:19:28 UTC (rev 1386)
+++ store/trunk/cpp/lib/BdbMessageStore.h 2007-11-29 12:38:22 UTC (rev 1387)
@@ -110,6 +110,7 @@
const qpid::broker::PersistableQueue& queue);
bool deleteIfUnused(Cursor& cursor, DbTxn* txn, Dbt& messageId);
bool deleteIfUnused(DbTxn* txn, Dbt& messageId);
+ bool isUnused(Cursor& cursor, Dbt& messageId);
void destroy(Db& db, const qpid::broker::Persistable& p);
bool create(Db& db, IdSequence& seq, const qpid::broker::Persistable& p);
void completed(TPCTxnCtxt& txn, Db& discard, Db& apply, bool commit);
17 years
rhmessaging commits: r1386 - store/trunk/cpp/lib.
by rhmessaging-commits@lists.jboss.org
Author: gordonsim
Date: 2007-11-29 05:19:28 -0500 (Thu, 29 Nov 2007)
New Revision: 1386
Modified:
store/trunk/cpp/lib/BdbMessageStore.cpp
Log:
Initial (temporary?) fix for deadlocking in bdbstore.
Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp 2007-11-29 09:57:11 UTC (rev 1385)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp 2007-11-29 10:19:28 UTC (rev 1386)
@@ -1130,6 +1130,22 @@
bool BdbMessageStore::dequeue(DbTxn* txn, Dbt& messageId, Dbt& queueId)
{
+ //need to get a lock on the messageDb in case we want to delete
+ //(to avoid deadlocks with enqueue that acquires locks in this
+ //order)
+ Dbt peek;
+ peek.set_flags(DB_DBT_USERMEM);
+ peek.set_ulen(0);
+ try {
+ int status = messageDb.get(txn, &messageId, &peek, DB_RMW);
+ if (status != DB_BUFFER_SMALL) {
+ THROW_STORE_EXCEPTION("Unexpected status code when peeking at message: " + string(DbEnv::strerror(status)));
+ }
+ } catch (DbMemoryException& expected) {
+ //api doc indicates may throw exception instead of status = DB_BUFFER_SMALL;
+ }
+
+
Cursor cursor;
cursor.open(mappingDb, txn);
17 years
rhmessaging commits: r1385 - in store/branches/java/M2.1.1/java: bdbstorebackup and 1 other directory.
by rhmessaging-commits@lists.jboss.org
Author: rupertlssmith
Date: 2007-11-29 04:57:11 -0500 (Thu, 29 Nov 2007)
New Revision: 1385
Modified:
store/branches/java/M2.1.1/java/bdbstore/pom.xml
store/branches/java/M2.1.1/java/bdbstorebackup/pom.xml
Log:
Updated poms for version M2.1.1
Modified: store/branches/java/M2.1.1/java/bdbstore/pom.xml
===================================================================
--- store/branches/java/M2.1.1/java/bdbstore/pom.xml 2007-11-29 09:44:52 UTC (rev 1384)
+++ store/branches/java/M2.1.1/java/bdbstore/pom.xml 2007-11-29 09:57:11 UTC (rev 1385)
@@ -22,14 +22,14 @@
<groupId>org.etp.qpid</groupId>
<artifactId>qpid-bdbstore</artifactId>
<packaging>jar</packaging>
- <version>1.0-incubating-M2-SNAPSHOT</version>
+ <version>1.0-incubating-M2.1.1-SNAPSHOT</version>
<name>Qpid BDB Store</name>
<url>http://cwiki.apache.org/confluence/display/qpid</url>
<parent>
<groupId>org.apache.qpid</groupId>
<artifactId>qpid</artifactId>
- <version>1.0-incubating-M2-SNAPSHOT</version>
+ <version>1.0-incubating-M2.1.1-SNAPSHOT</version>
</parent>
<!-- Local repository for the BerkelyDB-je so we don't have to use the installer script -->
Modified: store/branches/java/M2.1.1/java/bdbstorebackup/pom.xml
===================================================================
--- store/branches/java/M2.1.1/java/bdbstorebackup/pom.xml 2007-11-29 09:44:52 UTC (rev 1384)
+++ store/branches/java/M2.1.1/java/bdbstorebackup/pom.xml 2007-11-29 09:57:11 UTC (rev 1385)
@@ -22,14 +22,14 @@
<groupId>org.etp.qpid</groupId>
<artifactId>qpid-bdbstorebackup</artifactId>
<packaging>jar</packaging>
- <version>1.0-incubating-M2-SNAPSHOT</version>
+ <version>1.0-incubating-M2.1.1-SNAPSHOT</version>
<name>Qpid BDB Store Backup</name>
<url>http://cwiki.apache.org/confluence/display/qpid</url>
<parent>
<groupId>org.apache.qpid</groupId>
<artifactId>qpid</artifactId>
- <version>1.0-incubating-M2-SNAPSHOT</version>
+ <version>1.0-incubating-M2.1.1-SNAPSHOT</version>
</parent>
<!-- Local repository for the BerkelyDB-je so we don't have to use the installer script -->
17 years
rhmessaging commits: r1383 - store/branches/java.
by rhmessaging-commits@lists.jboss.org
Author: rupertlssmith
Date: 2007-11-29 04:38:43 -0500 (Thu, 29 Nov 2007)
New Revision: 1383
Added:
store/branches/java/M2.1.1/
Log:
Created new M2.1.1 branch to match M2.1.1 branch on Apache svn.
Copied: store/branches/java/M2.1.1 (from rev 1382, store/branches/java/M2.1)
17 years
rhmessaging commits: r1382 - store/trunk/cpp.
by rhmessaging-commits@lists.jboss.org
Author: nunofsantos
Date: 2007-11-28 17:57:17 -0500 (Wed, 28 Nov 2007)
New Revision: 1382
Added:
store/trunk/cpp/mrg.spec
store/trunk/cpp/rhg.spec
Log:
specfiles for top-level packages mrg and rhg
Added: store/trunk/cpp/mrg.spec
===================================================================
--- store/trunk/cpp/mrg.spec (rev 0)
+++ store/trunk/cpp/mrg.spec 2007-11-28 22:57:17 UTC (rev 1382)
@@ -0,0 +1,35 @@
+Summary: MRG - Messaging, Real-Time, and Grid components
+Name: mrg
+Version: 1.0
+Release: 1%{?dist}
+License: LGPL
+Group: System Environment/Libraries
+URL: http://redhat.com/mrg
+BuildRoot: %{_tmppath}/%{name}-%{version}-%{release}-root
+
+Requires: rhm
+Requires: rhrt
+Requires: rhg
+
+%description
+This is the top-level package that includes all the MRG components: Messaging,
+Reat-Time, and Grid.
+
+%install
+rm -rf %{buildroot}
+
+%clean
+rm -rf $RPM_BUILD_ROOT
+
+%prep
+
+%build
+
+%files
+%defattr(-,root,root,-)
+
+
+%changelog
+* Wed Nov 28 2007 Nuno Santos <nsantos(a)redhat.com> - 1.0-1
+- Initial build.
+
Added: store/trunk/cpp/rhg.spec
===================================================================
--- store/trunk/cpp/rhg.spec (rev 0)
+++ store/trunk/cpp/rhg.spec 2007-11-28 22:57:17 UTC (rev 1382)
@@ -0,0 +1,32 @@
+Summary: MRG - Grid component
+Name: rhg
+Version: 1.0
+Release: 1%{?dist}
+License: LGPL
+Group: System Environment/Libraries
+URL: http://redhat.com/mrg
+BuildRoot: %{_tmppath}/%{name}-%{version}-%{release}-root
+
+Requires: condor
+
+%description
+This is the top-level package that includes the MRG Grid component.
+
+%install
+rm -rf %{buildroot}
+
+%clean
+rm -rf $RPM_BUILD_ROOT
+
+%prep
+
+%build
+
+%files
+%defattr(-,root,root,-)
+
+
+%changelog
+* Wed Nov 28 2007 Nuno Santos <nsantos(a)redhat.com> - 1.0-1
+- Initial build.
+
17 years
rhmessaging commits: r1381 - mgmt/bin.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2007-11-28 12:08:54 -0500 (Wed, 28 Nov 2007)
New Revision: 1381
Modified:
mgmt/bin/quirk
Log:
Updates the example client with a simpler api concept.
Modified: mgmt/bin/quirk
===================================================================
--- mgmt/bin/quirk 2007-11-27 23:52:36 UTC (rev 1380)
+++ mgmt/bin/quirk 2007-11-28 17:08:54 UTC (rev 1381)
@@ -5,76 +5,164 @@
from qpid.content import Content
class Exchange(object):
- def __init__(self, name):
+ def __init__(self, session, name):
+ self.session = session
self.name = name
- def declare(self, session):
- session.exchange_declare(exchange=self.name)
- session.message_flow("amq.direct", 0, 0xFFFFFFFF)
- session.message_flow("amq.direct", 1, 0xFFFFFFFF)
-
class Queue(object):
- def __init__(self, name):
+ def __init__(self, session, name):
+ self.session = session
self.name = name
- def declare(self, session):
- session.queue_declare(queue=self.name) #XXX blows up without queue=
+class Subscription(object):
+ def __init__(self, session, queue, name):
+ self.session = session
+ self.queue = queue
+ self.name = name
- def bind(self, session, exchange, binding_key=None):
- if binding_key is None:
- binding_key = self.name
+ # XXX what all does this do? it seems to declare things
- session.queue_bind(exchange=exchange.name, queue=self.name,
- routing_key=binding_key)
+ # XXX what is the destination arg for?
+ # XXX from reading the spec, "destination" seems less
+ # appropriate than "subscription name" (which is what the spec
+ # ch. 25 docs say it is)
+ session.csession.message_subscribe(queue="test", destination=self.name)
+
+ session.csession.message_flow(self.name, 0, 0xFFFFFFFF)
+ session.csession.message_flow(self.name, 1, 0xFFFFFFFF)
+
+ self.client_queue = session.client.queue(self.name)
+
+ def get(self):
+ m = Message()
+ m.content = self.client_queue.get(timeout=10).content
+ return m
+
class Message(object):
def __init__(self, body=""):
self.content = Content(body)
self.content["content_type"] = "text/plain"
- def set_content_type(self, type):
- self.content["content_type"] = type
-
def set_routing_key(self, key):
- self.content["routing_key"] = key # XXX why here and not an arg to message_transfer?
+ self.content["routing_key"] = key
- def send(self, session, exchange):
- session.message_transfer(destination=exchange.name, content=self.content)
+ def get_routing_key(self):
+ try:
+ return self.content["routing_key"]
+ except KeyError:
+ pass
-def main(host, port):
+ def __str__(self):
+ return self.content.body
+
+class Session(object):
+ def __init__(self, client, csession):
+ self.client = client
+ self.csession = csession
+
+ def open(self):
+ self.csession.open()
+
+ def close(self):
+ self.csession.close()
+
+ def declare(self, object):
+ if object.__class__ is Queue:
+ #XXX blows up without queue=
+ self.csession.queue_declare(queue=object.name)
+ elif object.__class__ is Exchange:
+ self.csession.exchange_declare(exchange=object.name)
+ else:
+ raise Exception()
+
+ def bind(self, queue, exchange, binding_key=None):
+ if binding_key is None:
+ binding_key = queue.name
+
+ self.csession.queue_bind(exchange=exchange.name,
+ queue=queue.name,
+ routing_key=binding_key)
+
+ def publish(self, message, object):
+ if object.__class__ is Exchange:
+ self.csession.message_transfer(destination=object.name,
+ content=message.content)
+ elif object.__class__ is Queue:
+ # XXX maybe this shouldn't be conditional
+ if message.get_routing_key() is None:
+ message.set_routing_key(object.name)
+
+ self.csession.message_transfer(destination="",
+ content=message.content)
+
+def direct_with_explicit_exchange(host, port):
client = Client(host, port)
client.start({"LOGIN": "guest", "PASSWORD": "guest"})
- session = client.session()
+ session = Session(client, client.session())
session.open()
- # XXX what all does this do? it seems to declare things
- session.message_subscribe(queue="test", destination="amq.direct") # XXX what is the destination arg for?
+ try:
+ q = Queue(session, "test")
+ e = Exchange(session, "amq.direct")
+ s = Subscription(session, q, "s")
- exchange = Exchange("amq.direct")
- exchange.declare(session)
+ session.declare(q)
+ session.bind(q, e)
- queue = Queue("test")
- queue.declare(session)
- queue.bind(session, exchange)
+ for i in range(0, 10):
+ print i,
- # XXX this can't go here, because flow stuff barfs
- #session.message_subscribe(queue="test", destination="amq.direct") # XXX what is the destination arg for?
+ m = Message("Test message " + str(i))
- for n in range(0, 10):
- print n,
- message = Message("Test message " + str(n))
- message.set_routing_key(queue.name)
- message.send(session, exchange)
+ # XXX make this an arg publish, instead?
+ m.set_routing_key(q.name)
- print
+ session.publish(m, e)
- q = client.queue("amq.direct") # XXX huh?
- msg = q.get(timeout=10)
+ print "."
+ for i in range(0, 10):
+ print i,
+
+ m = s.get()
+
+ print m
+ finally:
+ session.close()
+
+def direct_with_implicit_exchange(host, port):
+ client = Client(host, port)
+ client.start({"LOGIN": "guest", "PASSWORD": "guest"})
+
+ # Now, simpler, using the default exchange:
+
+ session = Session(client, client.session())
+ session.open()
+
+ try:
+ q = Queue(session, "test")
+ s = Subscription(session, q, "s")
+
+ session.declare(q)
+
+ for i in range(0, 10):
+ print i,
+ m = Message("m%i" % i)
+ session.publish(m, q)
+ print "Sent", m
+
+ for i in range(0, 10):
+ print i,
+ m = s.get()
+ print "Received", m
+ finally:
+ session.close()
+
if __name__ == "__main__":
if len(sys.argv) != 2:
- print "Usage: qbench IP:PORT"
+ print "Usage: quirk IP:PORT"
sys.exit(2)
addr = sys.argv[1].split(":")
@@ -84,22 +172,4 @@
else:
host, port = (addr[0], 5672)
- main(host, port)
-
-"""
-# XXX what do these do?
-session.message_subscribe(queue="test", destination="amq.direct") # esp what is the destination arg for?
-session.message_flow("amq.direct", 0, 0xFFFFFFFF)
-session.message_flow("amq.direct", 1, 0xFFFFFFFF)
-
-for n in range(0, 100):
- msg = Content("hello world " + str(n))
- msg["content_type"] = "text/plain"
- msg["reply_to"] = client.structs.reply_to("asdf", "fdsa") # XXX structs.reply_to garbage ?
- msg["application_headers"] = {"x": 1, "y": 2, "z": "zee"}
- session.message_transfer(destination="amq.direct", content=msg)
- q = client.queue("amq.direct") #XXX huh?
- msg = q.get(timeout=10)
-
-session.close()
-"""
+ direct_with_implicit_exchange(host, port)
17 years
rhmessaging commits: r1380 - mgmt/bin.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2007-11-27 18:52:36 -0500 (Tue, 27 Nov 2007)
New Revision: 1380
Added:
mgmt/bin/quirk
Log:
A preliminary broker client, mostly so I can ask Rafi questions about
the API.
Added: mgmt/bin/quirk
===================================================================
--- mgmt/bin/quirk (rev 0)
+++ mgmt/bin/quirk 2007-11-27 23:52:36 UTC (rev 1380)
@@ -0,0 +1,105 @@
+#!/usr/bin/env python
+
+import sys, qpid
+from qpid.client import Client
+from qpid.content import Content
+
+class Exchange(object):
+ def __init__(self, name):
+ self.name = name
+
+ def declare(self, session):
+ session.exchange_declare(exchange=self.name)
+ session.message_flow("amq.direct", 0, 0xFFFFFFFF)
+ session.message_flow("amq.direct", 1, 0xFFFFFFFF)
+
+class Queue(object):
+ def __init__(self, name):
+ self.name = name
+
+ def declare(self, session):
+ session.queue_declare(queue=self.name) #XXX blows up without queue=
+
+ def bind(self, session, exchange, binding_key=None):
+ if binding_key is None:
+ binding_key = self.name
+
+ session.queue_bind(exchange=exchange.name, queue=self.name,
+ routing_key=binding_key)
+
+class Message(object):
+ def __init__(self, body=""):
+ self.content = Content(body)
+ self.content["content_type"] = "text/plain"
+
+ def set_content_type(self, type):
+ self.content["content_type"] = type
+
+ def set_routing_key(self, key):
+ self.content["routing_key"] = key # XXX why here and not an arg to message_transfer?
+
+ def send(self, session, exchange):
+ session.message_transfer(destination=exchange.name, content=self.content)
+
+def main(host, port):
+ client = Client(host, port)
+ client.start({"LOGIN": "guest", "PASSWORD": "guest"})
+
+ session = client.session()
+ session.open()
+
+ # XXX what all does this do? it seems to declare things
+ session.message_subscribe(queue="test", destination="amq.direct") # XXX what is the destination arg for?
+
+ exchange = Exchange("amq.direct")
+ exchange.declare(session)
+
+ queue = Queue("test")
+ queue.declare(session)
+ queue.bind(session, exchange)
+
+ # XXX this can't go here, because flow stuff barfs
+ #session.message_subscribe(queue="test", destination="amq.direct") # XXX what is the destination arg for?
+
+ for n in range(0, 10):
+ print n,
+ message = Message("Test message " + str(n))
+ message.set_routing_key(queue.name)
+ message.send(session, exchange)
+
+ print
+
+ q = client.queue("amq.direct") # XXX huh?
+ msg = q.get(timeout=10)
+
+if __name__ == "__main__":
+ if len(sys.argv) != 2:
+ print "Usage: qbench IP:PORT"
+ sys.exit(2)
+
+ addr = sys.argv[1].split(":")
+
+ if len(addr) > 1:
+ host, port = (addr[0], int(addr[1]))
+ else:
+ host, port = (addr[0], 5672)
+
+ main(host, port)
+
+"""
+# XXX what do these do?
+session.message_subscribe(queue="test", destination="amq.direct") # esp what is the destination arg for?
+session.message_flow("amq.direct", 0, 0xFFFFFFFF)
+session.message_flow("amq.direct", 1, 0xFFFFFFFF)
+
+for n in range(0, 100):
+ msg = Content("hello world " + str(n))
+ msg["content_type"] = "text/plain"
+ msg["reply_to"] = client.structs.reply_to("asdf", "fdsa") # XXX structs.reply_to garbage ?
+ msg["application_headers"] = {"x": 1, "y": 2, "z": "zee"}
+ session.message_transfer(destination="amq.direct", content=msg)
+ q = client.queue("amq.direct") #XXX huh?
+ msg = q.get(timeout=10)
+
+session.close()
+"""
Property changes on: mgmt/bin/quirk
___________________________________________________________________
Name: svn:executable
+ *
17 years
rhmessaging commits: r1379 - mgmt/mint/python/mint.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2007-11-27 18:31:31 -0500 (Tue, 27 Nov 2007)
New Revision: 1379
Modified:
mgmt/mint/python/mint/__init__.py
Log:
One more little fix for the statsCurr attr rename.
Modified: mgmt/mint/python/mint/__init__.py
===================================================================
--- mgmt/mint/python/mint/__init__.py 2007-11-27 23:19:28 UTC (rev 1378)
+++ mgmt/mint/python/mint/__init__.py 2007-11-27 23:31:31 UTC (rev 1379)
@@ -113,7 +113,7 @@
d = dict()
if (timestamps[2] != 0):
d["deletionTime"] = datetime.fromtimestamp(timestamps[2]/1000000000)
- d["statsPrev"] = obj.stats
+ d["statsPrev"] = obj.statsCurr
d["statsCurr"] = objStats
obj.set(**d)
self.log("END INST---------------------------------------------------\n")
17 years
rhmessaging commits: r1378 - mgmt/cumin/python/cumin.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2007-11-27 18:19:28 -0500 (Tue, 27 Nov 2007)
New Revision: 1378
Modified:
mgmt/cumin/python/cumin/model.py
mgmt/cumin/python/cumin/queue.py
Log:
Update the stat metadata for the renamed statsCurr.
Make more places use the stat metadata instead of directly accessing
it.
Modified: mgmt/cumin/python/cumin/model.py
===================================================================
--- mgmt/cumin/python/cumin/model.py 2007-11-27 22:49:31 UTC (rev 1377)
+++ mgmt/cumin/python/cumin/model.py 2007-11-27 23:19:28 UTC (rev 1378)
@@ -63,7 +63,7 @@
self.cumin_class.add_stat(self)
def value(self, object):
- return nvl(getattr(object.stats, self.name), -1)
+ return nvl(getattr(object.statsCurr, self.name, -1), -1)
def samples(self, object, limit=None):
name = self.cumin_class.name
@@ -82,8 +82,8 @@
return samples
def rate(self, object):
- if object.stats:
- curr = getattr(object.stats, self.name)
+ if object.statsCurr:
+ curr = getattr(object.statsCurr, self.name)
if object.statsPrev:
prev = getattr(object.statsPrev, self.name)
@@ -93,9 +93,9 @@
else:
return -1
else:
- return -1
+ return -2
else:
- return -1
+ return -3
def write_xml(self, object, writer):
writer.write("<stat name=\"%s\" value=\"%i\" rate=\"%i\"/>" \
Modified: mgmt/cumin/python/cumin/queue.py
===================================================================
--- mgmt/cumin/python/cumin/queue.py 2007-11-27 22:49:31 UTC (rev 1377)
+++ mgmt/cumin/python/cumin/queue.py 2007-11-27 23:19:28 UTC (rev 1378)
@@ -117,7 +117,7 @@
class QueueStatus(CuminStatus):
def render_consumers(self, session, queue):
- return queue.stats.consumers
+ return self.app.cmodel.queue.get_stat("consumers").value(queue)
def render_messages_enqueued(self, session, queue):
stat = self.app.cmodel.queue.get_stat("msgTotalEnqueues")
@@ -128,7 +128,7 @@
return fmt_rate(stat.rate(queue), "msg", "sec")
def render_message_depth(self, session, queue):
- return queue.stats.msgDepth
+ return self.app.cmodel.queue.get_stat("msgDepth").value(queue)
def render_message_depth_accel(self, session, queue):
stat = self.app.cmodel.queue.get_stat("msgDepth")
@@ -143,7 +143,7 @@
return fmt_rate(stat.rate(queue), "byte", "sec")
def render_byte_depth(self, session, queue):
- return queue.stats.byteDepth
+ return self.app.cmodel.queue.get_stat("byteDepth").value(queue)
def render_byte_depth_accel(self, session, queue):
stat = self.app.cmodel.queue.get_stat("byteDepth")
17 years