rhmessaging commits: r2162 - mgmt/basil/bin.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2008-06-25 14:26:12 -0400 (Wed, 25 Jun 2008)
New Revision: 2162
Modified:
mgmt/basil/bin/basil
Log:
Point basil at the canonical 0-10 amqp spec
Modified: mgmt/basil/bin/basil
===================================================================
--- mgmt/basil/bin/basil 2008-06-25 17:59:00 UTC (rev 2161)
+++ mgmt/basil/bin/basil 2008-06-25 18:26:12 UTC (rev 2162)
@@ -31,7 +31,7 @@
host, port = addr.split(":")
- spec = os.environ.get("AMQP_SPEC", "/usr/share/amqp/amqp.0-10-preview.xml")
+ spec = os.environ.get("AMQP_SPEC", "/usr/share/amqp/amqp.0-10.xml")
try:
do_main(spec, host, int(port))
[View Less]
16 years, 6 months
rhmessaging commits: r2161 - mgmt/basil/python/basil.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2008-06-25 13:59:00 -0400 (Wed, 25 Jun 2008)
New Revision: 2161
Modified:
mgmt/basil/python/basil/__init__.py
Log:
Update some of the naming in basil
Modified: mgmt/basil/python/basil/__init__.py
===================================================================
--- mgmt/basil/python/basil/__init__.py 2008-06-25 09:10:23 UTC (rev 2160)
+++ mgmt/basil/python/basil/__init__.py 2008-06-25 17:59:00 UTC (rev 2161)
@@ -20,33 +20,33 @@
self.lock = Lock()
- def on_schema(self, context_id, class_info,
+ def on_schema(self, broker_id, class_info,
configs, metrics, methods, events):
self.lock.acquire()
try:
package = self.get_package(class_info)
- package.on_schema(context_id, class_info,
+ package.on_schema(broker_id, class_info,
configs, metrics, methods, events)
finally:
self.lock.release()
- def on_config(self, context_id, class_info, values, timestamps):
+ def on_props(self, broker_id, class_info, values, timestamps):
self.lock.acquire()
try:
package = self.get_package(class_info)
- package.on_config(context_id, class_info, values, timestamps)
+ package.on_props(broker_id, class_info, values, timestamps)
finally:
self.lock.release()
- def on_metric(self, context_id, class_info, values, timestamps):
+ def on_stats(self, broker_id, class_info, values, timestamps):
self.lock.acquire()
try:
package = self.get_package(class_info)
- package.on_metric(context_id, class_info, values, timestamps)
+ package.on_stats(broker_id, class_info, values, timestamps)
finally:
self.lock.release()
- def on_callback(self, context_id, seq, status_code, status_text, args):
+ def on_callback(self, broker_id, seq, status_code, status_text, args):
self.lock.acquire()
try:
object, name, callback, kwargs = self.method_calls.pop(seq)
@@ -77,19 +77,19 @@
self.classes = list()
- def on_schema(self, context_id, class_info,
+ def on_schema(self, broker_id, class_info,
configs, metrics, methods, events):
cls = self.get_class(class_info)
- cls.on_schema(context_id, class_info,
+ cls.on_schema(broker_id, class_info,
configs, metrics, methods, events)
- def on_config(self, context_id, class_info, values, timestamps):
+ def on_props(self, broker_id, class_info, values, timestamps):
cls = self.get_class(class_info)
- cls.on_config(context_id, class_info, values, timestamps)
+ cls.on_props(broker_id, class_info, values, timestamps)
- def on_metric(self, context_id, class_info, values, timestamps):
+ def on_stats(self, broker_id, class_info, values, timestamps):
cls = self.get_class(class_info)
- cls.on_metric(context_id, class_info, values, timestamps)
+ cls.on_stats(broker_id, class_info, values, timestamps)
def get_class(self, class_info):
name = class_info[1]
@@ -120,7 +120,7 @@
attrs["basil_class"] = self
self.python_class = type(str(name), (BasilObject,), attrs)
- def on_schema(self, context_id, class_info,
+ def on_schema(self, broker_id, class_info,
configs, metrics, methods, events):
for spec in configs:
setattr(self.python_class, spec[0], None)
@@ -136,19 +136,19 @@
for spec in events:
pass
- def on_config(self, context_id, class_info, values, timestamps):
- object = self.get_object(class_info, context_id, values)
+ def on_props(self, broker_id, class_info, values, timestamps):
+ object = self.get_object(class_info, broker_id, values)
for name, value in values:
setattr(object, name, value)
- def on_metric(self, context_id, class_info, values, timestamps):
- object = self.get_object(class_info, context_id, values)
+ def on_stats(self, broker_id, class_info, values, timestamps):
+ object = self.get_object(class_info, broker_id, values)
for name, value in values:
setattr(object, name, value)
- def get_object(self, class_info, context_id, values):
+ def get_object(self, class_info, broker_id, values):
for name, value in values:
if name == "id":
id = value
@@ -157,11 +157,11 @@
assert id is not None
try:
- object = self.objects_by_composite_id[(context_id, id)]
+ object = self.objects_by_composite_id[(broker_id, id)]
except KeyError:
- object = self.python_class(class_info, context_id, id)
+ object = self.python_class(class_info, broker_id, id)
self.objects.append(object)
- self.objects_by_composite_id[(context_id, id)] = object
+ self.objects_by_composite_id[(broker_id, id)] = object
return object
@@ -169,9 +169,9 @@
return self.name
class BasilObject(object):
- def __init__(self, class_info, context_id, object_id):
+ def __init__(self, class_info, broker_id, object_id):
self.basil_class_info = class_info
- self.basil_context_id = context_id
+ self.basil_broker_id = broker_id
self.basil_object_id = object_id
def basil_call(self, name, callback, **kwargs):
@@ -183,7 +183,7 @@
finally:
model.lock.release()
- conn = model.connections[self.basil_context_id]
+ conn = model.connections[self.basil_broker_id]
client, chan = conn.mclient, conn.chan
model.method_calls[seq] = (self, name, callback, kwargs)
@@ -192,32 +192,32 @@
name, kwargs)
def __repr__(self):
- return "%r-%r" % (self.basil_context_id, self.basil_object_id)
+ return "%r-%r" % (self.basil_broker_id, self.basil_object_id)
class BasilConnection(object):
def __init__(self, model, host, port):
self.model = model
self.host = host
self.port = port
- self.context_id = "%s:%i" % (self.host, self.port)
+ self.broker_id = "%s:%i" % (self.host, self.port)
self.conn = Connection(connect(host, port), self.model.spec)
self.mclient = managementClient(self.model.spec,
None,
- self.model.on_config,
- self.model.on_metric,
+ self.model.on_props,
+ self.model.on_stats,
self.model.on_callback)
self.mclient.schemaListener(self.model.on_schema)
self.chan = None
- self.model.connections[self.context_id] = self
+ self.model.connections[self.broker_id] = self
def open(self):
self.conn.start()
self.chan = self.mclient.addChannel \
- (self.conn.session(str(uuid4())), self.context_id)
+ (self.conn.session(str(uuid4())), self.broker_id)
def close(self):
self.mclient.removeChannel(self.chan)
[View Less]
16 years, 6 months
rhmessaging commits: r2160 - store/trunk/cpp/lib.
by rhmessaging-commits@lists.jboss.org
Author: gordonsim
Date: 2008-06-25 05:10:23 -0400 (Wed, 25 Jun 2008)
New Revision: 2160
Modified:
store/trunk/cpp/lib/BdbMessageStore.cpp
store/trunk/cpp/lib/BdbMessageStore.h
Log:
Made corresponding const changes to keep inline with changes to qpid trunk.
Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp 2008-06-24 20:28:59 UTC (rev 2159)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp 2008-06-25 09:10:23 UTC (rev 2160)
@@ -999,7 +999,7 @@
return peek.get_size();
}
-void BdbMessageStore::appendContent(intrusive_ptr<const PersistableMessage>& msg, const std::string& data)
+void BdbMessageStore::appendContent(const intrusive_ptr<const PersistableMessage>& msg, const std::string& data)
{
checkInit();
u_int64_t messageId (msg->getPersistenceId());
@@ -1029,7 +1029,7 @@
}
void BdbMessageStore::loadContent(const qpid::broker::PersistableQueue& queue,
- intrusive_ptr<const PersistableMessage>& msg, std::string& data, u_int64_t offset, u_int32_t length)
+ const intrusive_ptr<const PersistableMessage>& msg, std::string& data, u_int64_t offset, u_int32_t length)
{
checkInit();
u_int64_t realOffset = offset + sizeof(u_int32_t)/*header length*/+ msg->encodedHeaderSize();
Modified: store/trunk/cpp/lib/BdbMessageStore.h
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.h 2008-06-24 20:28:59 UTC (rev 2159)
+++ store/trunk/cpp/lib/BdbMessageStore.h 2008-06-25 09:10:23 UTC (rev 2160)
@@ -204,9 +204,9 @@
void stage(const boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg);
void destroy(qpid::broker::PersistableMessage& msg);
- void appendContent(boost::intrusive_ptr<const qpid::broker::PersistableMessage>& msg, const std::string& data);
+ void appendContent(const boost::intrusive_ptr<const qpid::broker::PersistableMessage>& msg, const std::string& data);
void loadContent(const qpid::broker::PersistableQueue& queue,
- boost::intrusive_ptr<const qpid::broker::PersistableMessage>& msg,
+ const boost::intrusive_ptr<const qpid::broker::PersistableMessage>& msg,
std::string& data, u_int64_t offset, u_int32_t length);
void enqueue(qpid::broker::TransactionContext* ctxt,
[View Less]
16 years, 6 months
rhmessaging commits: r2159 - mgmt/cumin/python/cumin.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2008-06-24 16:28:59 -0400 (Tue, 24 Jun 2008)
New Revision: 2159
Modified:
mgmt/cumin/python/cumin/__init__.py
mgmt/cumin/python/cumin/util.py
Log:
Add a log-level command line option.
Modified: mgmt/cumin/python/cumin/__init__.py
===================================================================
--- mgmt/cumin/python/cumin/__init__.py 2008-06-24 19:44:41 UTC (rev 2158)
+++ mgmt/cumin/python/cumin/__init__.py 2008-06-24 20:28:59 UTC (rev 2159)
@@ -144,6 +144,13 @@
return False
class CuminConfig(Config):
+ __log_levels = {
+ "debug": logging.DEBUG,
+ "info": logging.INFO,
+ "warning": logging.WARNING,
+ "error": logging.ERROR
+ }
+
def __init__(self):
super(CuminConfig, self).__init__()
@@ -167,9 +174,13 @@
lpath = os.path.join(self.home, "log", "cumin.log")
summ = ("PATH", "Log to file at PATH")
- self.add_param("log-file", str, lpath, summ)
+ self.add_param("log-file", str, lpath, summ)
- summ = "Enable debug mode"
+ summ = ("LEVEL", "Log messages at or above LEVEL " +
+ "('debug', 'info', 'warning', 'error')")
+ self.add_param("log-level", str, "warning", summ)
+
+ summ = "Enable debug mode; print debug logging to console"
self.add_param("debug", bool, False, summ)
def init(self):
@@ -185,6 +196,24 @@
mlog = logging.getLogger("mint")
+ level = self.get_log_level()
+ formatter = logging.Formatter \
+ ("%(asctime)s %(levelname)-8s %(message)s")
+
+ log.setLevel(level)
+ mlog.setLevel(level)
+
+ try:
+ handler = logging.FileHandler(self.log_file)
+ handler.setLevel(level)
+ handler.setFormatter(formatter)
+
+ log.addHandler(handler)
+ mlog.addHandler(handler)
+ except IOError, e:
+ clog.warn("Can't write to log file '%s': %s" % \
+ (self.log_file, e))
+
if self.debug:
level = logging.DEBUG
formatter = logging.Formatter("%(name)-12s - %(message)s")
@@ -198,21 +227,13 @@
log.addHandler(handler)
mlog.addHandler(handler)
- else:
- level = logging.WARN
- formatter = logging.Formatter \
- ("%(asctime)s %(levelname)-8s %(message)s")
- log.setLevel(level)
- mlog.setLevel(level)
-
- try:
- handler = logging.FileHandler(self.log_file)
- handler.setLevel(level)
- handler.setFormatter(formatter)
-
- log.addHandler(handler)
- mlog.addHandler(handler)
- except IOError, e:
- clog.warn("Can't write to log file '%s': %s" % \
- (self.log_file, e))
+ def get_log_level(self):
+ try:
+ level = self.__log_levels[self.log_level.lower()]
+ except KeyError:
+ log.warn("Level '%s' is unknown; setting level to 'warning'" % \
+ self.log_level)
+ level = logging.WARNING
+
+ return level
Modified: mgmt/cumin/python/cumin/util.py
===================================================================
--- mgmt/cumin/python/cumin/util.py 2008-06-24 19:44:41 UTC (rev 2158)
+++ mgmt/cumin/python/cumin/util.py 2008-06-24 20:28:59 UTC (rev 2159)
@@ -126,7 +126,7 @@
opt = "--%s %s" % (param.name, param.summary[0])
text = param.summary[1]
- main = " %-15s %s" % (opt, text)
+ main = " %-18s %s" % (opt, text)
extra = ""
if param.default not in (None, False):
@@ -134,7 +134,7 @@
if len(main) + len(extra) > 79:
print main
- print " %15s %s" % ("", extra)
+ print " %18s %s" % ("", extra)
else:
print main, extra
[View Less]
16 years, 6 months
rhmessaging commits: r2158 - mgmt/cumin/python/cumin.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2008-06-24 15:44:41 -0400 (Tue, 24 Jun 2008)
New Revision: 2158
Modified:
mgmt/cumin/python/cumin/__init__.py
Log:
Change the logging defaults:
- Make --debug mode produce DEBUG level output to the console
- Make non debug mode only log to file, at WARN level
- Introduce formatters
Modified: mgmt/cumin/python/cumin/__init__.py
===================================================================
--- mgmt/cumin/python/cumin/__init__.py 2008-06-24 15:56:25 UTC (rev 2157)
+++ mgmt/cumin/python/cumin/__init__.py 2008-06-24 19:44:41 UTC (rev 2158)
@@ -173,33 +173,46 @@
self.add_param("debug", bool, False, summ)
def init(self):
- root = logging.getLogger()
-
self.load_defaults()
self.load_args(sys.argv)
- h = logging.StreamHandler()
- root.addHandler(h)
+ handler = logging.StreamHandler()
+ log.addHandler(handler)
self.load_file(os.path.join(self.home, "etc", "cumin.conf"))
self.load_file(os.path.join(os.path.expanduser("~"), ".cumin.conf"))
self.load_args(sys.argv)
- root.removeHandler(h)
+ log.removeHandler(handler)
+ mlog = logging.getLogger("mint")
+
if self.debug:
- h = logging.StreamHandler()
- h.setLevel(logging.DEBUG)
- root.addHandler(h)
+ level = logging.DEBUG
+ formatter = logging.Formatter("%(name)-12s - %(message)s")
+
+ log.setLevel(level)
+ mlog.setLevel(level)
- try:
- h = logging.FileHandler(self.log_file)
- except IOError, e:
- root.warn("Can't write to log file '%s': %s" % (self.log_file, e))
+ handler = logging.StreamHandler()
+ handler.setLevel(level)
+ handler.setFormatter(formatter)
- h.setLevel(self.debug and logging.DEBUG or logging.INFO)
- root.addHandler(h)
+ log.addHandler(handler)
+ mlog.addHandler(handler)
+ else:
+ level = logging.WARN
+ formatter = logging.Formatter \
+ ("%(asctime)s %(levelname)-8s %(message)s")
- log.setLevel(self.debug and logging.DEBUG or logging.INFO)
+ log.setLevel(level)
+ mlog.setLevel(level)
- def get_console_handler(self):
- h = logging.StreamHandler()
- return h
+ try:
+ handler = logging.FileHandler(self.log_file)
+ handler.setLevel(level)
+ handler.setFormatter(formatter)
+
+ log.addHandler(handler)
+ mlog.addHandler(handler)
+ except IOError, e:
+ clog.warn("Can't write to log file '%s': %s" % \
+ (self.log_file, e))
[View Less]
16 years, 6 months
rhmessaging commits: r2157 - in mgmt: cumin/python/cumin and 2 other directories.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2008-06-24 11:56:25 -0400 (Tue, 24 Jun 2008)
New Revision: 2157
Added:
mgmt/mint/python/mint/update.py
Modified:
mgmt/cumin/bin/cumin
mgmt/cumin/bin/cumin-bench
mgmt/cumin/bin/cumin-test
mgmt/cumin/python/cumin/__init__.py
mgmt/cumin/python/cumin/model.py
mgmt/mint/bin/mint-test
mgmt/mint/python/mint/__init__.py
Log:
Moved all database updates to a distinct update queue and worker
thread, to simplify locking.
Changed mint-test to work on multiple broker connections.
Now that we have worker threads, introduce app start/stop methods to
the command line entry points.
Modified: mgmt/cumin/bin/cumin
===================================================================
--- mgmt/cumin/bin/cumin 2008-06-23 19:44:16 UTC (rev 2156)
+++ mgmt/cumin/bin/cumin 2008-06-24 15:56:25 UTC (rev 2157)
@@ -42,11 +42,15 @@
sys.exit(1)
try:
- server.start()
- except:
- server.stop()
- raise
+ app.start()
+ try:
+ server.start()
+ finally:
+ server.stop()
+ finally:
+ app.stop()
+
def main():
config = CuminConfig()
Modified: mgmt/cumin/bin/cumin-bench
===================================================================
--- mgmt/cumin/bin/cumin-bench 2008-06-23 19:44:16 UTC (rev 2156)
+++ mgmt/cumin/bin/cumin-bench 2008-06-24 15:56:25 UTC (rev 2157)
@@ -26,7 +26,12 @@
harness = BenchmarkHarness(app)
try:
- harness.run(hits)
+ try:
+ app.start()
+
+ harness.run(hits)
+ finally:
+ app.stop()
except KeyboardInterrupt:
pass
Modified: mgmt/cumin/bin/cumin-test
===================================================================
--- mgmt/cumin/bin/cumin-test 2008-06-23 19:44:16 UTC (rev 2156)
+++ mgmt/cumin/bin/cumin-test 2008-06-24 15:56:25 UTC (rev 2157)
@@ -28,8 +28,12 @@
env = TestEnvironment(app, host, port, config.spec)
env.init();
- session = env.run_test(MainTest(env))
- session.report(sys.stdout)
+ app.start()
+ try:
+ session = env.run_test(MainTest(env))
+ session.report(sys.stdout)
+ finally:
+ app.stop()
def main():
config = CuminConfig()
Modified: mgmt/cumin/python/cumin/__init__.py
===================================================================
--- mgmt/cumin/python/cumin/__init__.py 2008-06-23 19:44:16 UTC (rev 2156)
+++ mgmt/cumin/python/cumin/__init__.py 2008-06-24 15:56:25 UTC (rev 2157)
@@ -58,8 +58,14 @@
def init(self):
self.model.init()
+
+ def start(self):
+ self.model.start()
self.broker_connect_thread.start()
+ def stop(self):
+ self.model.stop()
+
class BrokerConnectThread(Thread):
log = logging.getLogger("cumin.mgmt.conn")
Modified: mgmt/cumin/python/cumin/model.py
===================================================================
--- mgmt/cumin/python/cumin/model.py 2008-06-23 19:44:16 UTC (rev 2156)
+++ mgmt/cumin/python/cumin/model.py 2008-06-24 15:56:25 UTC (rev 2157)
@@ -40,6 +40,12 @@
def init(self):
self.data.init()
+
+ def start(self):
+ self.data.start()
+
+ def stop(self):
+ self.data.stop()
def add_class(self, cls):
self.classes.append(cls)
Modified: mgmt/mint/bin/mint-test
===================================================================
--- mgmt/mint/bin/mint-test 2008-06-23 19:44:16 UTC (rev 2156)
+++ mgmt/mint/bin/mint-test 2008-06-24 15:56:25 UTC (rev 2157)
@@ -1,30 +1,51 @@
#!/usr/bin/env python
-import sys, os
+import sys, os, logging
from time import sleep
from mint import *
def usage():
- print "Usage: mint-test [DATABASE-URI] [BROKER-ADDRESS]"
+ print "Usage: mint-test [DATABASE-URI] [BROKER-ADDRESSES...]"
print "Example: mint-test postgresql://cumin@localhost/cumin localhost:5672"
sys.exit(1)
-def do_main(uri, spec, host, port):
- model = MintModel(uri, spec, debug=True)
+def do_main(uri, spec, hostports):
+ model = MintModel(uri, spec, debug=False)
model.check()
model.init()
- conn = BrokerConnection(model, host, port)
- conn.open()
+ conns = list()
+ for host, port in hostports:
+ conn = BrokerConnection(model, host, port)
+ conns.append(conn)
+
+ model.start()
try:
- while (True):
- sleep(5)
+ try:
+ for conn in conns:
+ conn.open()
+
+ while True:
+ sleep(5)
+ finally:
+ for conn in conns:
+ try:
+ conn.close()
+ except:
+ pass
finally:
- conn.close()
+ model.stop()
def main():
+ root = logging.getLogger("mint")
+ root.setLevel(logging.DEBUG)
+
+ h = logging.StreamHandler()
+ h.setLevel(logging.DEBUG)
+ root.addHandler(h)
+
if "-h" in sys.argv or "--help" in sys.argv:
usage()
@@ -33,17 +54,29 @@
except IndexError:
uri = "postgresql://cumin@localhost/cumin"
- try:
- addr = sys.argv[2]
- except IndexError:
- addr = "localhost:5672"
+ if uri == "-":
+ uri = "postgresql://cumin@localhost/cumin"
+ addrs = sys.argv[2:]
+
+ if not addrs:
+ addrs = ["localhost:5672"]
+
+ hostports = list()
+
+ for addr in addrs:
+ try:
+ host, port = addr.split(":")
+ hostports.append((host, int(port)))
+ except ValueError, e:
+ print "Error: Cannot parse '%s': %s" % (addr, e)
+ sys.exit(1)
+
spec = os.environ.get("AMQP_SPEC",
- os.path.normpath("/usr/share/amqp/amqp.0-10.xml"))
- host, port = addr.split(":")
+ os.path.normpath("/usr/share/amqp/amqp.0-10.xml"))
try:
- do_main(uri, spec, host, int(port))
+ do_main(uri, spec, hostports)
except KeyboardInterrupt:
pass
Modified: mgmt/mint/python/mint/__init__.py
===================================================================
--- mgmt/mint/python/mint/__init__.py 2008-06-23 19:44:16 UTC (rev 2156)
+++ mgmt/mint/python/mint/__init__.py 2008-06-24 15:56:25 UTC (rev 2157)
@@ -7,7 +7,9 @@
from sqlobject import *
from threading import Lock, RLock
from traceback import print_exc, print_stack
+
from mint import schema
+from mint import update
log = logging.getLogger("mint")
@@ -117,87 +119,19 @@
lastChallenged = TimestampCol(default=None)
lastLoggedOut = TimestampCol(default=None)
-class OriginalIdDict:
- def __init__(self):
- self.idMap = dict()
- self.lock = Lock()
+class ObjectNotFound(Exception):
+ pass
- def set(self, idOriginal, obj):
- self.lock.acquire()
- try:
- self.idMap[idOriginal] = obj
- finally:
- self.lock.release()
-
- def getByOriginalId(self, objType, idOriginal, managedBroker, create=False, args={}):
- obj = None
-
- self.lock.acquire()
- try:
- obj = self.doGetByOriginalId(objType, idOriginal, managedBroker, create, args)
- finally:
- self.lock.release()
-
- return obj
-
- def doGetByOriginalId(self, objType, idOriginal, managedBroker, create=False, args={}):
- obj = None
- key = (managedBroker, idOriginal)
- if (key in self.idMap):
- #print "\n\n=============== %s %d found\n\n" % (objType.__name__, idOriginal)
- obj = self.idMap[key]
- else:
- try:
- obj = eval("objType.selectBy(idOriginal=idOriginal, managedBroker=managedBroker)[:1][0]")
- self.idMap[key] = obj
- except:
- if (create):
- #print "\n\n=============== %s %d NOT found, creating\n\n" % (objType.__name__, idOriginal)
- obj = objType.__new__(objType)
- obj.__init__()
- obj.syncUpdate()
- self.idMap[key] = obj
- else:
- #print "\n\n=============== %s %d NOT found, NOT creating\n\n" % (objType.__name__, idOriginal)
- pass
- else:
- #print "\n\n=============== %s %d found AFTER QUERY\n\n" % (objType.__name__, idOriginal)
- pass
-
- if isinstance(obj, Broker) and obj.managedBroker:
- host, port = obj.managedBroker.split(":")
- port = int(port)
-
- if not obj.registration:
- try:
- reg = BrokerRegistration.selectBy(host=host, port=port)[0]
- except IndexError:
- reg = None
-
- if reg:
- reg.broker = obj
- obj.registration = reg
-
- reg.syncUpdate()
- obj.syncUpdate()
-
- return obj
-
- def getByIndexAttrib(self, objType, indexAttrib, indexValue, create=False, args={}):
- ###FIX
- return None
-
# Not thread safe
class BrokerConnection(object):
def __init__(self, model, host, port):
self.model = model
self.host = host
self.port = port
- self.key = "%s:%i" % (host, port)
- self.objs = OriginalIdDict()
+ self.id = "%s:%i" % (host, port)
+ self.objectsById = dict()
- # state in (None, "opening", "opened", "open_failed",
- # "closing", "closed", "close_failed")
+ # state in (None, "opening", "opened", "closing", "closed")
self.state = None
self.exception = None
@@ -205,6 +139,18 @@
self.mclient = None
self.mchan = None
+ def getObject(self, cls, id):
+ try:
+ obj = self.objectsById[id]
+ except KeyError:
+ try:
+ obj = cls.selectBy(idOriginal=id, managedBroker=self.id)[0]
+ self.objectsById[id] = obj
+ except IndexError:
+ raise ObjectNotFound()
+
+ return obj
+
def isOpen(self):
return self.state == "opened"
@@ -220,9 +166,8 @@
try:
sock = connect(self.host, self.port)
except Exception, e:
- self.state = "open_failed"
self.exception = e
- return
+ raise e
self.conn = QpidConnection(sock, spec)
self.mclient = managementClient(spec,
@@ -239,11 +184,11 @@
# XXX I want this to happen after broker start, but the
# callbacks rely on the broker being in the connectedBrokers
# dict
- self.model.connections[self.key] = self
+ self.model.connections[self.id] = self
self.conn.start()
self.mchan = self.mclient.addChannel(self.conn.session(str(uuid4())),
- self.key)
+ self.id)
self.state = "opened"
except Exception, e:
@@ -281,7 +226,7 @@
try:
self.mclient.removeChannel(self.mchan)
- del self.model.connections[self.key]
+ del self.model.connections[self.id]
self.state = "closed"
except Exception, e:
@@ -297,21 +242,6 @@
self.mclient = None
self.mchan = None
- def getByOriginalId(self, objType, idOriginal, managedBroker, create=False, args={}):
- self.model.lock()
- try:
- return self.objs.getByOriginalId(objType, idOriginal, managedBroker, create, args)
- finally:
- self.model.unlock()
-
- def getByIndexAttrib(self, objType, indexAttrib, indexValue, parent,
- create=False, args={}):
- self.model.lock()
- try:
- return self.objs.getByIndexAttrib(objType, indexAttrib, indexValue, create, args)
- finally:
- self.model.unlock()
-
class MintModel:
staticInstance = None
@@ -326,6 +256,8 @@
self.connCloseListener = None
self.__lock = RLock()
+ self.updateThread = update.ModelUpdateThread(self)
+
assert MintModel.staticInstance is None
MintModel.staticInstance = self
@@ -352,6 +284,12 @@
conn = connectionForURI(self.dataUri)
sqlhub.processConnection = conn
+ def start(self):
+ self.updateThread.start()
+
+ def stop(self):
+ self.updateThread.stop()
+
def setDebug(self, debug=True):
self.debug = debug
@@ -359,219 +297,34 @@
if (self.debug):
print message
- def sanitizeDict(self, d):
- if ("id" in d):
- d[self.convertIdKey("id")] = d.pop("id")
- if ("connectionRef" in d):
- d["clientConnectionRef"] = d.pop("connectionRef")
- #XXX FIX -- fix handling of field tables
- if ("arguments" in d):
- d.pop("arguments")
- #XXX FIX -- fix handling of field tables
- return d
-
- def convertIdKey(self, k):
- return "idOriginal"
-
- def convertRefKey(self, k):
- result = k.replace("Ref", "")
- result = result[0].lower() + result[1:]
- return result
-
- def findParentKeys(self, d):
- keys = []
- for key in d.keys():
- if (key.endswith("Ref")):
- keys.append(key)
- return keys
-
- def fixClassInfo(self, classInfo):
- objectName = self.initialCapital(classInfo[1])
- if (objectName == "Connection"):
- objectName = "ClientConnection"
- return objectName
-
- def initialCapital(self, string):
- return string[0].upper() + string[1:]
-
def setCloseListener(self, connCloseListener):
self.connCloseListener = connCloseListener
- def schemaCallback(self, brokerId, classInfo, configs, metric, methods, events):
- self.log("\nSCHEMA---------------------------------------------------")
- objectName = self.fixClassInfo(classInfo)
- self.log("BrokerId=%s , objectName=%s" % (brokerId, objectName))
-
+ def getConnection(self, id):
self.lock()
try:
- cls = schema.schemaNameToClassMap.get(objectName)
- if cls:
- cls.classInfos[brokerId] = classInfo
- finally:
- self.unlock()
-
- self.log("\nEND SCHEMA---------------------------------------------------")
-
- def configCallback(self, brokerId, classInfo, list, timestamps):
- self.log("\nCONFIG---------------------------------------------------")
- objectName = self.fixClassInfo(classInfo)
- brokerUUID = classInfo[2]
- self.log(objectName)
- d = self.sanitizeDict(dict(list))
-
- self.lock()
- try:
- conn = self.connections[brokerId]
- finally:
- self.unlock()
-
- d["managedBroker"] = brokerId
- d["recTime"] = datetime.fromtimestamp(timestamps[0]/1000000000)
- d["creationTime"] = datetime.fromtimestamp(timestamps[1]/1000000000)
- self.log(d)
- try:
- for parentKey in self.findParentKeys(d):
- convertedKey = self.convertRefKey(parentKey)
-
- self.lock()
- try:
- cls = schema.schemaNameToClassMap.get(self.initialCapital(convertedKey))
- if cls:
- d[convertedKey] = conn.getByOriginalId(cls, d.pop(parentKey), brokerId)
- else:
- self.log("Error: referenced class not found: %s" % convertedKey)
- finally:
- self.unlock()
-
- obj = conn.getByOriginalId(schema.schemaNameToClassMap[objectName], d["idOriginal"], brokerId, create=True)
- if (not obj):
- self.log("Couldn't find type %s id %s" % (objectName, d["idOriginal"]))
- return
-
- self.lock()
try:
- obj = self.updateObjWithDict(obj, d)
- if (not obj):
- return
- finally:
- self.unlock()
-
- except TypeError, detail:
- self.log("TypeError: Schema mismatch: %s" % detail)
- return
- except KeyError, detail:
- self.log("KeyError: Schema mismatch: %s" % detail)
- return
-
- self.log("END CONFIG---------------------------------------------------\n")
- return obj
-
- def instCallback(self, brokerId, classInfo, list, timestamps):
- self.log("\nINST---------------------------------------------------")
- objectName = self.fixClassInfo(classInfo)
- brokerUUID = classInfo[2]
- self.log(objectName)
- d = self.sanitizeDict(dict(list))
-
- self.lock()
- try:
- conn = self.connections[brokerId]
+ return self.connections[id]
+ except KeyError:
+ log.error("Connection '%s' not found" % id)
finally:
self.unlock()
- d["recTime"] = datetime.fromtimestamp(timestamps[0]/1000000000)
- self.log(d)
+ def schemaCallback(self, brokerId, classInfo, configs, metric, methods, events):
+ conn = self.getConnection(brokerId)
+ up = update.SchemaUpdate(conn, classInfo, configs, metric, methods, events)
+ self.updateThread.enqueue(up)
- try:
- obj = conn.getByOriginalId(schema.schemaNameToClassMap[objectName], d[self.convertIdKey("id")], brokerId)
- if (not obj):
- self.log("Couldn't find type %s id %s" % (objectName, d[self.convertIdKey("id")]))
- print "lion", classInfo, list
- return
-
- origObjName = objectName[0].lower() + objectName[1:]
- d[origObjName] = obj
+ def configCallback(self, brokerId, classInfo, props, timestamps):
+ conn = self.getConnection(brokerId)
+ up = update.PropertyUpdate(conn, classInfo, props, timestamps)
+ self.updateThread.enqueue(up)
- self.lock()
- try:
- objNameStats = eval("schema.%sStats" % (schema.schemaNameToClassMap[objectName].__name__))
- objStats = objNameStats.__new__(objNameStats)
- objStats.__init__()
- finally:
- self.unlock()
-
- if (not objStats):
- self.log("Couldn't find type %s id %s" % (objNameStats, d[self.convertIdKey("id")]))
- return
+ def instCallback(self, brokerId, classInfo, stats, timestamps):
+ conn = self.getConnection(brokerId)
+ up = update.StatisticUpdate(conn, classInfo, stats, timestamps)
+ self.updateThread.enqueue(up)
- self.lock()
- try:
- objStats = self.updateObjWithDict(objStats, d)
- if (not objStats):
- return
- finally:
- self.unlock()
-
- d = dict()
- d["statsPrev"] = obj.statsCurr
- d["statsCurr"] = objStats
- if (timestamps[2] != 0):
- d["deletionTime"] = datetime.fromtimestamp(timestamps[2]/1000000000)
-
- self.lock()
- try:
- obj = self.updateObjWithDict(obj, d)
- if (not obj):
- return
- finally:
- self.unlock()
-
- except TypeError, detail:
- self.log("TypeError: Schema mismatch: %s" % detail)
- return
- except KeyError, detail:
- self.log("KeyError: Schema mismatch: %s" % detail)
- return
-
- self.log("END INST---------------------------------------------------\n")
- return objStats
-
- def updateObjWithDict(self, obj, d):
- updateDone = False
- reattemptCount = 0
- while not updateDone and len(d) > 0:
- try:
- obj.set(**d)
- obj.syncUpdate()
- updateDone = True
- if (reattemptCount > 0):
- self.log("Reattempts successful")
- except TypeError, detail:
- self.log("TypeError: Schema mismatch: %s" % detail)
- detailString = detail.__str__()
- errorString = "got an unexpected keyword argument "
- index = detailString.index(errorString)
- if (index >= 0):
- # argument in dict is not in schema, so remove it and re-attempt
- index += len(errorString)
- missingAttrib = detailString[index:]
- self.log("Reattempting without %s attribute" % missingAttrib)
- d.pop(missingAttrib)
- reattemptCount += 1
- else:
- # can't recover
- self.log("Non-recoverable schema mismatch, information lost")
- return None
- except KeyError, detail:
- self.log("KeyError: Schema mismatch: %s" % detail)
- return None
- except:
- #TODO: better exception handling here
- self.log("Unexpected Error: %s" % sys.exc_info()[0])
- print "Unexpected Error: %s" % sys.exc_info()[0]
- return obj
- return obj
-
def methodCallback(self, brokerId, methodId, errorNo, errorText, args):
self.log("\nMETHOD---------------------------------------------------")
self.log("MethodId=%d" % (methodId))
Added: mgmt/mint/python/mint/update.py
===================================================================
--- mgmt/mint/python/mint/update.py (rev 0)
+++ mgmt/mint/python/mint/update.py 2008-06-24 15:56:25 UTC (rev 2157)
@@ -0,0 +1,246 @@
+import logging
+
+from Queue import Queue as ConcurrentQueue, Full, Empty
+from threading import Thread
+from datetime import datetime
+
+import mint
+
+log = logging.getLogger("mint.update")
+
+class ModelUpdateThread(Thread):
+ def __init__(self, model):
+ super(ModelUpdateThread, self).__init__()
+
+ self.model = model
+ self.updates = ConcurrentQueue()
+ self.stopRequested = False
+ self.setDaemon(False)
+
+ def enqueue(self, update):
+ try:
+ self.updates.put(update)
+ except Full:
+ log.warn("Update queue is full")
+
+ def run(self):
+ while True:
+ try:
+ update = self.updates.get(True, 1)
+ except Empty:
+ if self.stopRequested:
+ break
+ else:
+ log.info("Update queue is empty")
+ continue
+
+ try:
+ update.process()
+ except Exception, e:
+ log.error(e)
+
+ def stop(self):
+ self.stopRequested = True
+
+class UnknownClassException(Exception):
+ pass
+
+def unmarshalClassInfo(classInfo):
+ package = classInfo[0]
+ name = classInfo[1].capitalize()
+
+ if name == "Connection":
+ name = "ClientConnection"
+
+ cls = getattr(mint, name)
+
+ if cls is None:
+ raise UnknownClassException("Class '%s' is unknown" % name)
+
+ return package, cls
+
+def processAttributes(conn, attrs, cls):
+ if "id" in attrs:
+ attrs["idOriginal"] = attrs.pop("id")
+
+ if "connectionRef" in attrs:
+ attrs["clientConnectionRef"] = attrs.pop("connectionRef")
+
+ #XXX FIX -- fix handling of field tables
+ if "arguments" in attrs:
+ del attrs["arguments"]
+
+ for name in attrs.keys():
+ if len(name) > 3 and name.endswith("Ref"):
+ # Navigate to referenced objects
+
+ clsname = name[0].upper() + name[1:-3]
+ id = attrs.pop(name)
+
+ othercls = getattr(mint, clsname, None)
+
+ if othercls:
+ attrname = name[0:-3]
+
+ try:
+ attrs[attrname] = conn.getObject(othercls, id)
+ except KeyError:
+ log.info("Referenced object %s '%s' not found" % \
+ (clsname, id))
+ else:
+ log.error("Class '%s' not found" % clsname)
+ elif not hasattr(cls, name):
+ # Remove attrs that we don't have in our schema
+
+ log.debug("Class '%s' has no field '%s'" % (cls.__name__, name))
+
+ del attrs[name]
+
+ return attrs
+
+def getStatsClass(cls):
+ return getattr(mint, cls.__name__ + "Stats")
+
+class SchemaUpdate(object):
+ def __init__(self, conn, classInfo, props, stats, methods, events):
+ self.conn = conn
+ self.classInfo = classInfo
+ self.props = props
+ self.stats = stats
+ self.methods = methods
+ self.events = events
+
+ def process(self):
+ args = ("schema", self.conn.id, self.classInfo[0], self.classInfo[1])
+ log.info("%-8s %-16s %-8s %-12s" % args)
+
+ try:
+ pkg, cls = unmarshalClassInfo(self.classInfo)
+ except UnknownClassException, e:
+ log.warn(e)
+ return
+
+ if cls:
+ cls.classInfos[self.conn.id] = self.classInfo
+
+class PropertyUpdate(object):
+ def __init__(self, conn, classInfo, props, timestamps):
+ self.conn = conn
+ self.classInfo = classInfo
+ self.props = props
+ self.timestamps = timestamps
+
+ def process(self):
+ args = ("props", self.conn.id, self.classInfo[0], self.classInfo[1],
+ len(self.props))
+ log.info("%-8s %-16s %-8s %-12s %i" % args)
+
+ try:
+ pkg, cls = unmarshalClassInfo(self.classInfo)
+ except UnknownClassException, e:
+ log.warn(e)
+ return
+
+ attrs = dict(self.props)
+
+ id = attrs["id"]
+
+ processAttributes(self.conn, attrs, cls)
+
+ attrs["managedBroker"] = self.conn.id
+ attrs["recTime"] = datetime.fromtimestamp(self.timestamps[0]/1000000000)
+ attrs["creationTime"] = datetime.fromtimestamp \
+ (self.timestamps[1]/1000000000)
+
+ try:
+ obj = self.conn.getObject(cls, id)
+ except mint.ObjectNotFound:
+ obj = cls()
+ obj.set(**attrs)
+ obj.syncUpdate()
+
+ # XXX refactor this to take advantage of the get/create logic
+ # above
+ if isinstance(obj, mint.Broker) and obj.managedBroker:
+ host, port = obj.managedBroker.split(":")
+ port = int(port)
+
+ if not obj.registration:
+ try:
+ reg = mint.BrokerRegistration.selectBy(host=host, port=port)[0]
+ except IndexError:
+ reg = None
+
+ if reg:
+ reg.broker = obj
+ obj.registration = reg
+
+ reg.syncUpdate()
+ obj.syncUpdate()
+
+class StatisticUpdate(object):
+ def __init__(self, conn, classInfo, stats, timestamps):
+ self.conn = conn
+ self.classInfo = classInfo
+ self.stats = stats
+ self.timestamps = timestamps
+
+ def process(self):
+ args = ("stats", self.conn.id, self.classInfo[0], self.classInfo[1],
+ len(self.stats))
+ log.info("%-8s %-16s %-8s %-12s %i" % args)
+
+ try:
+ pkg, cls = unmarshalClassInfo(self.classInfo)
+ except UnknownClassException, e:
+ log.warn(e)
+ return
+
+ attrs = dict(self.stats)
+
+ id = attrs["id"]
+
+ try:
+ obj = self.conn.getObject(cls, id)
+ except ObjectNotFound, e:
+ log.warn(e)
+ return
+
+ statscls = getStatsClass(cls)
+ processAttributes(self.conn, attrs, statscls)
+
+ attrs["recTime"] = datetime.fromtimestamp(self.timestamps[0]/1000000000)
+
+ # Set the stat->obj reference
+ attrs[cls.__name__[0].lower() + cls.__name__[1:]] = obj
+
+ statsobj = statscls()
+ statsobj.set(**attrs)
+ statsobj.syncUpdate()
+
+ if self.timestamps[2] != 0:
+ obj.deletionTime = datetime.fromtimestamp(self.timestamps[2]/1000000000)
+
+ obj.statsPrev = obj.statsCurr
+ obj.statsCurr = statsobj
+ obj.syncUpdate()
+
+class MethodUpdate(object):
+ def __init__(self, conn, methodId, errorId, errorText, args):
+ self.conn = conn
+ self.methodId = methodId
+ self.errorId = errorId
+ self.errorText = errorText
+ self.args = args
+
+ def process(self):
+ args = ("stats", self.conn.id, self.methodId, self.errorId,
+ self.errorText)
+ log.info("%-8s %-16s %-8s %-8s %s" % args)
+
+class CloseUpdate(object):
+ def __init__(self, brokerId, data):
+ self.brokerId = brokerId
+
+ def process(self):
+ log.info("%-8s %-16s" % ("close", self.brokerId))
[View Less]
16 years, 6 months
rhmessaging commits: r2156 - store/trunk/cpp/lib.
by rhmessaging-commits@lists.jboss.org
Author: aconway
Date: 2008-06-23 15:44:16 -0400 (Mon, 23 Jun 2008)
New Revision: 2156
Modified:
store/trunk/cpp/lib/BdbMessageStore.cpp
store/trunk/cpp/lib/BdbMessageStore.h
store/trunk/cpp/lib/DataTokenImpl.h
Log:
Const-correctness fixes in MessageStore.h
Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp 2008-06-17 03:47:04 UTC (rev 2155)
+++ store/trunk/cpp/lib/…
[View More]BdbMessageStore.cpp 2008-06-23 19:44:16 UTC (rev 2156)
@@ -109,20 +109,20 @@
u_int32_t defTotWCacheSize = JRNL_WMGR_DEF_PAGE_SIZE * JRNL_WMGR_DEF_PAGES; // in sblks. Currently 2014 sblks (1 MiB).
switch (wCachePageSize)
{
- case 1:
- case 2:
- case 4:
- // 256 KiB total cache
- wcache_num_pages = defTotWCacheSize / wcache_pgsize_sblks / 4;
- break;
- case 8:
- case 16:
- // 512 KiB total cache
- wcache_num_pages = defTotWCacheSize / wcache_pgsize_sblks / 2;
- break;
- default: // 32, 64, 128
- // 1 MiB total cache
- wcache_num_pages = defTotWCacheSize / wcache_pgsize_sblks;
+ case 1:
+ case 2:
+ case 4:
+ // 256 KiB total cache
+ wcache_num_pages = defTotWCacheSize / wcache_pgsize_sblks / 4;
+ break;
+ case 8:
+ case 16:
+ // 512 KiB total cache
+ wcache_num_pages = defTotWCacheSize / wcache_pgsize_sblks / 2;
+ break;
+ default: // 32, 64, 128
+ // 1 MiB total cache
+ wcache_num_pages = defTotWCacheSize / wcache_pgsize_sblks;
}
useAsync = async;
@@ -138,8 +138,8 @@
} catch (const DbException& e) {
if (e.get_errno() == DB_VERSION_MISMATCH)
THROW_STORE_EXCEPTION_2("Database environment mismatch: This version of bd4 does not match that which created the store database. "
- "(If recovery is not important, delete the contents of the store directory. Otherwise, try upgrading the database using "
- "db_upgrade or using db_recover - but the db4-utils package must also be installed to use these utilities.)", e);
+ "(If recovery is not important, delete the contents of the store directory. Otherwise, try upgrading the database using "
+ "db_upgrade or using db_recover - but the db4-utils package must also be installed to use these utilities.)", e);
THROW_STORE_EXCEPTION_2("Error opening environment", e);
}
@@ -206,40 +206,40 @@
u_int32_t jrnlWrCachePageSize = opts->wCachePageSize;
switch (jrnlWrCachePageSize)
{
- case 1:
- case 2:
- case 4:
- case 8:
- case 16:
- case 32:
- case 64:
- case 128:
- break;
- default:
- u_int32_t oldJrnlWrCachePageSize = jrnlWrCachePageSize;
- if (oldJrnlWrCachePageSize == 0)
- {
- // For zero value, use default
- jrnlWrCachePageSize = JRNL_WMGR_DEF_PAGE_SIZE * JRNL_DBLK_SIZE * JRNL_SBLK_SIZE / 1024;
- QPID_LOG(warning, "parameter wcache-page-size (" << oldJrnlWrCachePageSize << ") must be a power of 2 between 1 and 128; changing this parameter to default value (" << jrnlWrCachePageSize << ")");
- }
- else
- {
- // For any positive value, use closest value
- if (oldJrnlWrCachePageSize < 6)
- jrnlWrCachePageSize = 4;
- else if (oldJrnlWrCachePageSize < 12)
- jrnlWrCachePageSize = 8;
- else if (oldJrnlWrCachePageSize < 24)
- jrnlWrCachePageSize = 16;
- else if (oldJrnlWrCachePageSize < 48)
- jrnlWrCachePageSize = 32;
- else if (oldJrnlWrCachePageSize < 96)
- jrnlWrCachePageSize = 64;
- else if (oldJrnlWrCachePageSize > 128)
- jrnlWrCachePageSize = 128;
- QPID_LOG(warning, "parameter wcache-page-size (" << oldJrnlWrCachePageSize << ") must be a power of 2 between 1 and 128; changing this parameter to closest allowable value (" << jrnlWrCachePageSize << ")");
- }
+ case 1:
+ case 2:
+ case 4:
+ case 8:
+ case 16:
+ case 32:
+ case 64:
+ case 128:
+ break;
+ default:
+ u_int32_t oldJrnlWrCachePageSize = jrnlWrCachePageSize;
+ if (oldJrnlWrCachePageSize == 0)
+ {
+ // For zero value, use default
+ jrnlWrCachePageSize = JRNL_WMGR_DEF_PAGE_SIZE * JRNL_DBLK_SIZE * JRNL_SBLK_SIZE / 1024;
+ QPID_LOG(warning, "parameter wcache-page-size (" << oldJrnlWrCachePageSize << ") must be a power of 2 between 1 and 128; changing this parameter to default value (" << jrnlWrCachePageSize << ")");
+ }
+ else
+ {
+ // For any positive value, use closest value
+ if (oldJrnlWrCachePageSize < 6)
+ jrnlWrCachePageSize = 4;
+ else if (oldJrnlWrCachePageSize < 12)
+ jrnlWrCachePageSize = 8;
+ else if (oldJrnlWrCachePageSize < 24)
+ jrnlWrCachePageSize = 16;
+ else if (oldJrnlWrCachePageSize < 48)
+ jrnlWrCachePageSize = 32;
+ else if (oldJrnlWrCachePageSize < 96)
+ jrnlWrCachePageSize = 64;
+ else if (oldJrnlWrCachePageSize > 128)
+ jrnlWrCachePageSize = 128;
+ QPID_LOG(warning, "parameter wcache-page-size (" << oldJrnlWrCachePageSize << ") must be a power of 2 between 1 and 128; changing this parameter to closest allowable value (" << jrnlWrCachePageSize << ")");
+ }
}
return init(opts->storeDir, opts->storeAsync, opts->storeForce, numJrnlFiles, jrnlFsizePgs, jrnlWrCachePageSize);
@@ -740,15 +740,15 @@
break;
}
case rhm::journal::RHM_IORES_PAGE_AIOWAIT:
- if (++aio_sleep_cnt > MAX_AIO_SLEEPS)
- THROW_STORE_EXCEPTION("Timeout waiting for AIO");
- ::usleep(AIO_SLEEP_TIME);
- break;
+ if (++aio_sleep_cnt > MAX_AIO_SLEEPS)
+ THROW_STORE_EXCEPTION("Timeout waiting for AIO");
+ ::usleep(AIO_SLEEP_TIME);
+ break;
case rhm::journal::RHM_IORES_EMPTY:
- read = false;
- break; // done with all messages. ((add call in jrnl to test that _emap is empty.
+ read = false;
+ break; // done with all messages. ((add call in jrnl to test that _emap is empty.
default:
- assert( "Store Error: Unexpected msg state");
+ assert( "Store Error: Unexpected msg state");
} // switch
} // while
} catch (const journal::jexception& e) {
@@ -939,7 +939,7 @@
readXids(prepareXidDb, xids);
}
-void BdbMessageStore::stage( intrusive_ptr<PersistableMessage>& msg)
+void BdbMessageStore::stage(const intrusive_ptr<PersistableMessage>& msg)
{
checkInit();
TxnCtxt txn;
@@ -1092,7 +1092,8 @@
}
}
-void BdbMessageStore::enqueue(TransactionContext* ctxt, intrusive_ptr<PersistableMessage>& msg,
+void BdbMessageStore::enqueue(TransactionContext* ctxt,
+ const intrusive_ptr<PersistableMessage>& msg,
const PersistableQueue& queue)
{
checkInit();
@@ -1146,7 +1147,7 @@
void BdbMessageStore::store(const PersistableQueue* queue,
TxnCtxt* txn, Dbt& messageId,
- intrusive_ptr<PersistableMessage>& message,
+ const intrusive_ptr<PersistableMessage>& message,
bool newId)
{
u_int32_t headerSize = message->encodedHeaderSize();
@@ -1165,9 +1166,9 @@
if ( queue && usingJrnl()) {
boost::intrusive_ptr<DataTokenImpl> dtokp(new DataTokenImpl);
dtokp->addRef();
- dtokp->setSourceMessage(message);
+ dtokp->setSourceMessage(message);
dtokp->set_external_rid(true);
- dtokp->set_rid(message->getPersistenceId()); // set the messageID into the Journal header (record-id)
+ dtokp->set_rid(message->getPersistenceId()); // set the messageID into the Journal header (record-id)
JournalImpl* jc = static_cast<JournalImpl*>(queue->getExternalQueueStore());
if (txn->getXid().empty()){
@@ -1198,7 +1199,8 @@
}
}
-void BdbMessageStore::dequeue(TransactionContext* ctxt, intrusive_ptr<PersistableMessage>& msg,
+void BdbMessageStore::dequeue(TransactionContext* ctxt,
+ const intrusive_ptr<PersistableMessage>& msg,
const PersistableQueue& queue)
{
checkInit();
@@ -1256,7 +1258,7 @@
void BdbMessageStore::async_dequeue(
TransactionContext* ctxt,
- intrusive_ptr<PersistableMessage>& msg,
+ const intrusive_ptr<PersistableMessage>& msg,
const PersistableQueue& queue)
{
boost::intrusive_ptr<DataTokenImpl> ddtokp(new DataTokenImpl);
Modified: store/trunk/cpp/lib/BdbMessageStore.h
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.h 2008-06-17 03:47:04 UTC (rev 2155)
+++ store/trunk/cpp/lib/BdbMessageStore.h 2008-06-23 19:44:16 UTC (rev 2156)
@@ -1,24 +1,24 @@
/*
- Copyright (C) 2007 Red Hat Software
+ Copyright (C) 2007 Red Hat Software
- This file is part of Red Hat Messaging.
+ This file is part of Red Hat Messaging.
- Red Hat Messaging is free software; you can redistribute it and/or
- modify it under the terms of the GNU Lesser General Public
- License as published by the Free Software Foundation; either
- version 2.1 of the License, or (at your option) any later version.
+ Red Hat Messaging is free software; you can redistribute it and/or
+ modify it under the terms of the GNU Lesser General Public
+ License as published by the Free Software Foundation; either
+ version 2.1 of the License, or (at your option) any later version.
- This library is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- Lesser General Public License for more details.
+ This library is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ Lesser General Public License for more details.
- You should have received a copy of the GNU Lesser General Public
- License along with this library; if not, write to the Free Software
- Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
- USA
+ You should have received a copy of the GNU Lesser General Public
+ License along with this library; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
+ USA
- The GNU Lesser General Public License is available in the file COPYING.
+ The GNU Lesser General Public License is available in the file COPYING.
*/
#ifndef _BdbMessageStore_
@@ -52,189 +52,189 @@
#endif
namespace rhm {
- namespace bdbstore {
- using std::string;
+namespace bdbstore {
+using std::string;
- /**
- * An implementation of the MessageStore interface based on Berkeley DB
- */
- class BdbMessageStore : public qpid::broker::MessageStore, public qpid::management::Manageable
- {
- typedef std::map<u_int64_t, qpid::broker::RecoverableQueue::shared_ptr> queue_index;
- typedef std::map<u_int64_t, qpid::broker::RecoverableExchange::shared_ptr> exchange_index;
- typedef std::map<u_int64_t, qpid::broker::RecoverableMessage::shared_ptr> message_index;
+/**
+ * An implementation of the MessageStore interface based on Berkeley DB
+ */
+class BdbMessageStore : public qpid::broker::MessageStore, public qpid::management::Manageable
+{
+ typedef std::map<u_int64_t, qpid::broker::RecoverableQueue::shared_ptr> queue_index;
+ typedef std::map<u_int64_t, qpid::broker::RecoverableExchange::shared_ptr> exchange_index;
+ typedef std::map<u_int64_t, qpid::broker::RecoverableMessage::shared_ptr> message_index;
- typedef LockedMappings::map txn_lock_map;
- typedef boost::ptr_list<PreparedTransaction> txn_list;
+ typedef LockedMappings::map txn_lock_map;
+ typedef boost::ptr_list<PreparedTransaction> txn_list;
- // Default store settings
- static const bool defUseAsync = false;
- static const bool defForceStoreConversion = false;
- static const u_int16_t defNumJrnlFiles = 8; // TODO: make configurable
- static const u_int32_t defJrnlFileSizePgs = 24; // TODO: make configurable
- static const u_int32_t defWCachePageSize = JRNL_WMGR_DEF_PAGE_SIZE * JRNL_DBLK_SIZE * JRNL_SBLK_SIZE / 1024; // TODO: make configurable
+ // Default store settings
+ static const bool defUseAsync = false;
+ static const bool defForceStoreConversion = false;
+ static const u_int16_t defNumJrnlFiles = 8; // TODO: make configurable
+ static const u_int32_t defJrnlFileSizePgs = 24; // TODO: make configurable
+ static const u_int32_t defWCachePageSize = JRNL_WMGR_DEF_PAGE_SIZE * JRNL_DBLK_SIZE * JRNL_SBLK_SIZE / 1024; // TODO: make configurable
- std::list<Db*> dbs;
- DbEnv env;
- Db queueDb;
- Db configDb;
- Db exchangeDb;
- Db messageDb;
- Db mappingDb;
- Db bindingDb;
- Db generalDb;
- Db enqueueXidDb;
- Db dequeueXidDb;
- Db prepareXidDb;
- IdSequence queueIdSequence;
- IdSequence exchangeIdSequence;
- IdSequence generalIdSequence;
- IdSequence messageIdSequence;
- static bool useAsync;
- std::string storeDir;
- u_int16_t numJrnlFiles;
- u_int32_t jrnlFsizePgs;
- u_int32_t wcache_pgsize_sblks;
- u_int16_t wcache_num_pages;
- bool isInit;
- const char* envPath;
- static qpid::sys::Duration defJournalGetEventsTimeout;
- static qpid::sys::Duration defJournalFlushTimeout;
- qpid::management::Store::shared_ptr mgmtObject;
- qpid::sys::Mutex jrnlCreateLock;
+ std::list<Db*> dbs;
+ DbEnv env;
+ Db queueDb;
+ Db configDb;
+ Db exchangeDb;
+ Db messageDb;
+ Db mappingDb;
+ Db bindingDb;
+ Db generalDb;
+ Db enqueueXidDb;
+ Db dequeueXidDb;
+ Db prepareXidDb;
+ IdSequence queueIdSequence;
+ IdSequence exchangeIdSequence;
+ IdSequence generalIdSequence;
+ IdSequence messageIdSequence;
+ static bool useAsync;
+ std::string storeDir;
+ u_int16_t numJrnlFiles;
+ u_int32_t jrnlFsizePgs;
+ u_int32_t wcache_pgsize_sblks;
+ u_int16_t wcache_num_pages;
+ bool isInit;
+ const char* envPath;
+ static qpid::sys::Duration defJournalGetEventsTimeout;
+ static qpid::sys::Duration defJournalFlushTimeout;
+ qpid::management::Store::shared_ptr mgmtObject;
+ qpid::sys::Mutex jrnlCreateLock;
- bool mode(const bool mode, const bool force);
- void recoverQueues(TxnCtxt& txn, qpid::broker::RecoveryManager& recovery, queue_index& index,
- txn_list& locked, message_index& messages);
- void recoverMessages(TxnCtxt& txn, qpid::broker::RecoveryManager& recovery, queue_index& index,
- txn_list& locked, message_index& prepared);
- void recoverMessages(TxnCtxt& txn, qpid::broker::RecoveryManager& recovery,
- qpid::broker::RecoverableQueue::shared_ptr& queue,
- txn_list& locked, message_index& prepared);
- qpid::broker::RecoverableMessage::shared_ptr getExternMessage(qpid::broker::RecoveryManager& recovery,
- uint64_t mId, unsigned& headerSize);
- void recoverExchanges(TxnCtxt& txn, qpid::broker::RecoveryManager& recovery, exchange_index& index);
- void recoverBindings(TxnCtxt& txn, exchange_index& exchanges, queue_index& queues);
- void recoverGeneral(TxnCtxt& txn, qpid::broker::RecoveryManager& recovery);
- int enqueueMessage(TxnCtxt& txn, IdDbt& msgId, qpid::broker::RecoverableMessage::shared_ptr& msg,
- queue_index& index, txn_list& locked, message_index& prepared);
- void recoverXids(txn_list& txns);
- void readXids(Db& db, std::set<string>& xids);
- void readLockedMappings(Db& db, txn_lock_map& mappings);
- TxnCtxt* check(qpid::broker::TransactionContext* ctxt);
- void store(const qpid::broker::PersistableQueue* queue, TxnCtxt* txn,
- Dbt& messageId,
- boost::intrusive_ptr<qpid::broker::PersistableMessage>& message,
- bool newId);
- void enqueue(DbTxn* txn, Dbt& messageId, Dbt& queueId);
- bool dequeue(DbTxn* txn, Dbt& messageId, Dbt& queueId);
- void async_dequeue(qpid::broker::TransactionContext* ctxt,
- boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg,
- const qpid::broker::PersistableQueue& queue);
- bool deleteIfUnused(Cursor& cursor, DbTxn* txn, Dbt& messageId);
- bool deleteIfUnused(DbTxn* txn, Dbt& messageId);
- bool isUnused(Cursor& cursor, Dbt& messageId);
- void destroy(Db& db, const qpid::broker::Persistable& p);
- bool create(Db& db, IdSequence& seq, const qpid::broker::Persistable& p);
- void completed(TPCTxnCtxt& txn, Db& discard, Db& apply, bool commit);
- void record2pcOp(Db& db, TPCTxnCtxt& txn, u_int64_t messageId, u_int64_t queueId);
- void deleteBindingsForQueue(const qpid::broker::PersistableQueue& queue);
- void deleteBinding(const qpid::broker::PersistableExchange& exchange,
- const qpid::broker::PersistableQueue& queue,
- const std::string& key);
+ bool mode(const bool mode, const bool force);
+ void recoverQueues(TxnCtxt& txn, qpid::broker::RecoveryManager& recovery, queue_index& index,
+ txn_list& locked, message_index& messages);
+ void recoverMessages(TxnCtxt& txn, qpid::broker::RecoveryManager& recovery, queue_index& index,
+ txn_list& locked, message_index& prepared);
+ void recoverMessages(TxnCtxt& txn, qpid::broker::RecoveryManager& recovery,
+ qpid::broker::RecoverableQueue::shared_ptr& queue,
+ txn_list& locked, message_index& prepared);
+ qpid::broker::RecoverableMessage::shared_ptr getExternMessage(qpid::broker::RecoveryManager& recovery,
+ uint64_t mId, unsigned& headerSize);
+ void recoverExchanges(TxnCtxt& txn, qpid::broker::RecoveryManager& recovery, exchange_index& index);
+ void recoverBindings(TxnCtxt& txn, exchange_index& exchanges, queue_index& queues);
+ void recoverGeneral(TxnCtxt& txn, qpid::broker::RecoveryManager& recovery);
+ int enqueueMessage(TxnCtxt& txn, IdDbt& msgId, qpid::broker::RecoverableMessage::shared_ptr& msg,
+ queue_index& index, txn_list& locked, message_index& prepared);
+ void recoverXids(txn_list& txns);
+ void readXids(Db& db, std::set<string>& xids);
+ void readLockedMappings(Db& db, txn_lock_map& mappings);
+ TxnCtxt* check(qpid::broker::TransactionContext* ctxt);
+ void store(const qpid::broker::PersistableQueue* queue, TxnCtxt* txn,
+ Dbt& messageId,
+ const boost::intrusive_ptr<qpid::broker::PersistableMessage>& message,
+ bool newId);
+ void enqueue(DbTxn* txn, Dbt& messageId, Dbt& queueId);
+ bool dequeue(DbTxn* txn, Dbt& messageId, Dbt& queueId);
+ void async_dequeue(qpid::broker::TransactionContext* ctxt,
+ const boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg,
+ const qpid::broker::PersistableQueue& queue);
+ bool deleteIfUnused(Cursor& cursor, DbTxn* txn, Dbt& messageId);
+ bool deleteIfUnused(DbTxn* txn, Dbt& messageId);
+ bool isUnused(Cursor& cursor, Dbt& messageId);
+ void destroy(Db& db, const qpid::broker::Persistable& p);
+ bool create(Db& db, IdSequence& seq, const qpid::broker::Persistable& p);
+ void completed(TPCTxnCtxt& txn, Db& discard, Db& apply, bool commit);
+ void record2pcOp(Db& db, TPCTxnCtxt& txn, u_int64_t messageId, u_int64_t queueId);
+ void deleteBindingsForQueue(const qpid::broker::PersistableQueue& queue);
+ void deleteBinding(const qpid::broker::PersistableExchange& exchange,
+ const qpid::broker::PersistableQueue& queue,
+ const std::string& key);
- u_int64_t getRecordSize(Db& db, Dbt& key);
- u_int64_t getRecordSize(DbTxn* txn, Db& db, Dbt& key);
- void put(Db& db, DbTxn* txn, Dbt& key, Dbt& value);
- bool deleteKeyValuePair(Db& db, DbTxn* txn, Dbt& key, Dbt& value);
- void open(Db& db, DbTxn* txn, const char* file, bool dupKey);
+ u_int64_t getRecordSize(Db& db, Dbt& key);
+ u_int64_t getRecordSize(DbTxn* txn, Db& db, Dbt& key);
+ void put(Db& db, DbTxn* txn, Dbt& key, Dbt& value);
+ bool deleteKeyValuePair(Db& db, DbTxn* txn, Dbt& key, Dbt& value);
+ void open(Db& db, DbTxn* txn, const char* file, bool dupKey);
- // journal functions
- void createJrnlQueue(const qpid::broker::PersistableQueue& queue);
- string getJrnlDir(const qpid::broker::PersistableQueue& queue); //for exmaple /var/rhm/ + queueDir/
- string getJrnlDir(const char* queueName);
- static inline bool usingJrnl() {return useAsync;}
- string getJrnlBaseDir();
- inline void checkInit() {
- if (!isInit) init("/var", defUseAsync, defForceStoreConversion, defNumJrnlFiles, defJrnlFileSizePgs, defWCachePageSize); isInit = true;
- }
+ // journal functions
+ void createJrnlQueue(const qpid::broker::PersistableQueue& queue);
+ string getJrnlDir(const qpid::broker::PersistableQueue& queue); //for exmaple /var/rhm/ + queueDir/
+ string getJrnlDir(const char* queueName);
+ static inline bool usingJrnl() {return useAsync;}
+ string getJrnlBaseDir();
+ inline void checkInit() {
+ if (!isInit) init("/var", defUseAsync, defForceStoreConversion, defNumJrnlFiles, defJrnlFileSizePgs, defWCachePageSize); isInit = true;
+ }
- public:
- struct Options : public qpid::Options {
- Options(const std::string& name="Store Options");
- string clusterName;
- string storeDir;
- bool storeAsync;
- bool storeForce;
- uint16_t numJrnlFiles;
- uint32_t jrnlFsizePgs;
- uint32_t wCachePageSize;
- };
+ public:
+ struct Options : public qpid::Options {
+ Options(const std::string& name="Store Options");
+ string clusterName;
+ string storeDir;
+ bool storeAsync;
+ bool storeForce;
+ uint16_t numJrnlFiles;
+ uint32_t jrnlFsizePgs;
+ uint32_t wCachePageSize;
+ };
- typedef boost::shared_ptr<BdbMessageStore> shared_ptr;
+ typedef boost::shared_ptr<BdbMessageStore> shared_ptr;
- BdbMessageStore(const char* envpath = 0);
- virtual ~BdbMessageStore();
- bool init(const qpid::Options* options);
- bool init(const std::string& dir, const bool async, const bool force, u_int16_t jfiles, u_int32_t jfileSizePgs, uint32_t wCachePageSize);
- void initManagement (qpid::broker::Broker* broker);
+ BdbMessageStore(const char* envpath = 0);
+ virtual ~BdbMessageStore();
+ bool init(const qpid::Options* options);
+ bool init(const std::string& dir, const bool async, const bool force, u_int16_t jfiles, u_int32_t jfileSizePgs, uint32_t wCachePageSize);
+ void initManagement (qpid::broker::Broker* broker);
- void truncate();
+ void truncate();
- void create(qpid::broker::PersistableQueue& queue,
- const qpid::framing::FieldTable& args);
- void destroy(qpid::broker::PersistableQueue& queue);
+ void create(qpid::broker::PersistableQueue& queue,
+ const qpid::framing::FieldTable& args);
+ void destroy(qpid::broker::PersistableQueue& queue);
- void create(const qpid::broker::PersistableExchange& queue,
- const qpid::framing::FieldTable& args);
- void destroy(const qpid::broker::PersistableExchange& queue);
+ void create(const qpid::broker::PersistableExchange& queue,
+ const qpid::framing::FieldTable& args);
+ void destroy(const qpid::broker::PersistableExchange& queue);
- void bind(const qpid::broker::PersistableExchange& exchange,
- const qpid::broker::PersistableQueue& queue,
- const std::string& key, const qpid::framing::FieldTable& args);
- void unbind(const qpid::broker::PersistableExchange& exchange,
- const qpid::broker::PersistableQueue& queue,
- const std::string& key, const qpid::framing::FieldTable& args);
+ void bind(const qpid::broker::PersistableExchange& exchange,
+ const qpid::broker::PersistableQueue& queue,
+ const std::string& key, const qpid::framing::FieldTable& args);
+ void unbind(const qpid::broker::PersistableExchange& exchange,
+ const qpid::broker::PersistableQueue& queue,
+ const std::string& key, const qpid::framing::FieldTable& args);
- void create(const qpid::broker::PersistableConfig& config);
- void destroy(const qpid::broker::PersistableConfig& config);
+ void create(const qpid::broker::PersistableConfig& config);
+ void destroy(const qpid::broker::PersistableConfig& config);
- void recover(qpid::broker::RecoveryManager& queues);
+ void recover(qpid::broker::RecoveryManager& queues);
- void stage(boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg);
- void destroy(qpid::broker::PersistableMessage& msg);
- void appendContent(boost::intrusive_ptr<const qpid::broker::PersistableMessage>& msg, const std::string& data);
- void loadContent(const qpid::broker::PersistableQueue& queue,
- boost::intrusive_ptr<const qpid::broker::PersistableMessage>& msg,
- std::string& data, u_int64_t offset, u_int32_t length);
+ void stage(const boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg);
+ void destroy(qpid::broker::PersistableMessage& msg);
+ void appendContent(boost::intrusive_ptr<const qpid::broker::PersistableMessage>& msg, const std::string& data);
+ void loadContent(const qpid::broker::PersistableQueue& queue,
+ boost::intrusive_ptr<const qpid::broker::PersistableMessage>& msg,
+ std::string& data, u_int64_t offset, u_int32_t length);
- void enqueue(qpid::broker::TransactionContext* ctxt,
- boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg,
- const qpid::broker::PersistableQueue& queue);
- void dequeue(qpid::broker::TransactionContext* ctxt,
- boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg,
- const qpid::broker::PersistableQueue& queue);
- void flush(const qpid::broker::PersistableQueue& queue);
+ void enqueue(qpid::broker::TransactionContext* ctxt,
+ const boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg,
+ const qpid::broker::PersistableQueue& queue);
+ void dequeue(qpid::broker::TransactionContext* ctxt,
+ const boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg,
+ const qpid::broker::PersistableQueue& queue);
+ void flush(const qpid::broker::PersistableQueue& queue);
- u_int32_t outstandingQueueAIO(const qpid::broker::PersistableQueue& queue);
+ u_int32_t outstandingQueueAIO(const qpid::broker::PersistableQueue& queue);
- void collectPreparedXids(std::set<string>& xids);
+ void collectPreparedXids(std::set<string>& xids);
- std::auto_ptr<qpid::broker::TransactionContext> begin();
- std::auto_ptr<qpid::broker::TPCTransactionContext> begin(const std::string& xid);
- void prepare(qpid::broker::TPCTransactionContext& ctxt);
- void commit(qpid::broker::TransactionContext& ctxt);
- void abort(qpid::broker::TransactionContext& ctxt);
+ std::auto_ptr<qpid::broker::TransactionContext> begin();
+ std::auto_ptr<qpid::broker::TPCTransactionContext> begin(const std::string& xid);
+ void prepare(qpid::broker::TPCTransactionContext& ctxt);
+ void commit(qpid::broker::TransactionContext& ctxt);
+ void abort(qpid::broker::TransactionContext& ctxt);
- qpid::management::ManagementObject::shared_ptr GetManagementObject (void) const
- { return mgmtObject; }
+ qpid::management::ManagementObject::shared_ptr GetManagementObject (void) const
+ { return mgmtObject; }
- qpid::management::Manageable::status_t ManagementMethod (uint32_t, qpid::management::Args&)
- { return qpid::management::Manageable::STATUS_OK; }
- };
- }
+ qpid::management::Manageable::status_t ManagementMethod (uint32_t, qpid::management::Args&)
+ { return qpid::management::Manageable::STATUS_OK; }
+};
}
+}
#endif
Modified: store/trunk/cpp/lib/DataTokenImpl.h
===================================================================
--- store/trunk/cpp/lib/DataTokenImpl.h 2008-06-17 03:47:04 UTC (rev 2155)
+++ store/trunk/cpp/lib/DataTokenImpl.h 2008-06-23 19:44:16 UTC (rev 2156)
@@ -1,24 +1,24 @@
/*
- Copyright (C) 2007 Red Hat Software
+ Copyright (C) 2007 Red Hat Software
- This file is part of Red Hat Messaging.
+ This file is part of Red Hat Messaging.
- Red Hat Messaging is free software; you can redistribute it and/or
- modify it under the terms of the GNU Lesser General Public
- License as published by the Free Software Foundation; either
- version 2.1 of the License, or (at your option) any later version.
+ Red Hat Messaging is free software; you can redistribute it and/or
+ modify it under the terms of the GNU Lesser General Public
+ License as published by the Free Software Foundation; either
+ version 2.1 of the License, or (at your option) any later version.
- This library is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- Lesser General Public License for more details.
+ This library is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ Lesser General Public License for more details.
- You should have received a copy of the GNU Lesser General Public
- License along with this library; if not, write to the Free Software
- Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
- USA
+ You should have received a copy of the GNU Lesser General Public
+ License along with this library; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
+ USA
- The GNU Lesser General Public License is available in the file COPYING.
+ The GNU Lesser General Public License is available in the file COPYING.
*/
#ifndef _DataTokenImpl_
@@ -29,23 +29,23 @@
#include <qpid/broker/PersistableMessage.h>
namespace rhm {
- namespace bdbstore {
+namespace bdbstore {
- class DataTokenImpl : public journal::data_tok, public qpid::RefCounted
- {
- private:
- boost::intrusive_ptr<qpid::broker::PersistableMessage> sourceMsg;
- public:
- DataTokenImpl();
- virtual ~DataTokenImpl();
+class DataTokenImpl : public journal::data_tok, public qpid::RefCounted
+{
+ private:
+ boost::intrusive_ptr<qpid::broker::PersistableMessage> sourceMsg;
+ public:
+ DataTokenImpl();
+ virtual ~DataTokenImpl();
- inline boost::intrusive_ptr<qpid::broker::PersistableMessage>& getSourceMessage()
- { return sourceMsg; }
- inline void setSourceMessage(boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg)
- { sourceMsg = msg; }
- };
+ inline boost::intrusive_ptr<qpid::broker::PersistableMessage>& getSourceMessage()
+ { return sourceMsg; }
+ inline void setSourceMessage(const boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg)
+ { sourceMsg = msg; }
+};
- } // namespace bdbstore
- } // namespace rhm
+} // namespace bdbstore
+} // namespace rhm
#endif
[View Less]
16 years, 6 months
rhmessaging commits: r2155 - mgmt/mint/python/mint.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2008-06-16 23:47:04 -0400 (Mon, 16 Jun 2008)
New Revision: 2155
Modified:
mgmt/mint/python/mint/__init__.py
Log:
Protect shared model data
Modified: mgmt/mint/python/mint/__init__.py
===================================================================
--- mgmt/mint/python/mint/__init__.py 2008-06-17 02:27:33 UTC (rev 2154)
+++ mgmt/mint/python/mint/__init__.py 2008-06-17 03:47:04 UTC (rev 2155)
@@ -5,8 +5,8 @@
from qpid.management import managementChannel, …
[View More]managementClient
from datetime import *
from sqlobject import *
-from threading import Lock
-from traceback import print_exc
+from threading import Lock, RLock
+from traceback import print_exc, print_stack
from mint import schema
log = logging.getLogger("mint")
@@ -233,7 +233,7 @@
self.model.closeCallback)
self.mclient.schemaListener(self.model.schemaCallback)
- self.model.lock.acquire()
+ self.model.lock()
try:
try:
# XXX I want this to happen after broker start, but the
@@ -250,7 +250,7 @@
self.exception = e
raise e
finally:
- self.model.lock.release()
+ self.model.unlock()
def getSessionId(self):
if not self.isOpen():
@@ -262,13 +262,13 @@
if not self.isOpen():
raise Exception("Connection not open")
- self.model.lock.acquire()
+ self.model.lock()
try:
self.model.currentMethodId += 1
seq = self.model.currentMethodId
self.model.outstandingMethodCalls[seq] = callback
finally:
- self.model.lock.release()
+ self.model.unlock()
self.mclient.callMethod(self.mchan, seq, objId,
className, methodName, args)
@@ -276,7 +276,7 @@
def close(self):
self.state = "closing"
- self.model.lock.acquire()
+ self.model.lock()
try:
try:
self.mclient.removeChannel(self.mchan)
@@ -288,7 +288,7 @@
self.exception = e
raise e
finally:
- self.model.lock.release()
+ self.model.unlock()
self.conn.close()
# XXX What else do I need to try to shutdown here?
@@ -298,23 +298,19 @@
self.mchan = None
def getByOriginalId(self, objType, idOriginal, managedBroker, create=False, args={}):
- result = None
- self.model.lock.acquire()
+ self.model.lock()
try:
- result = self.objs.getByOriginalId(objType, idOriginal, managedBroker, create, args)
+ return self.objs.getByOriginalId(objType, idOriginal, managedBroker, create, args)
finally:
- self.model.lock.release()
- return result
+ self.model.unlock()
def getByIndexAttrib(self, objType, indexAttrib, indexValue, parent,
create=False, args={}):
- result = None
- self.model.lock.acquire()
+ self.model.lock()
try:
- result = self.objs.getByIndexAttrib(objType, indexAttrib, indexValue, create, args)
+ return self.objs.getByIndexAttrib(objType, indexAttrib, indexValue, create, args)
finally:
- self.model.lock.release()
- return result
+ self.model.unlock()
class MintModel:
staticInstance = None
@@ -328,7 +324,7 @@
self.outstandingMethodCalls = dict()
self.connections = dict()
self.connCloseListener = None
- self.lock = Lock()
+ self.__lock = RLock()
assert MintModel.staticInstance is None
MintModel.staticInstance = self
@@ -336,6 +332,13 @@
if self.debug:
log.setLevel(logging.DEBUG)
+ def lock(self):
+ #print_stack()
+ self.__lock.acquire()
+
+ def unlock(self):
+ self.__lock.release()
+
def check(self):
try:
connectionForURI(self.dataUri)
@@ -398,9 +401,15 @@
self.log("\nSCHEMA---------------------------------------------------")
objectName = self.fixClassInfo(classInfo)
self.log("BrokerId=%s , objectName=%s" % (brokerId, objectName))
- cls = schema.schemaNameToClassMap.get(objectName)
- if cls:
- cls.classInfos[brokerId] = classInfo
+
+ self.lock()
+ try:
+ cls = schema.schemaNameToClassMap.get(objectName)
+ if cls:
+ cls.classInfos[brokerId] = classInfo
+ finally:
+ self.unlock()
+
self.log("\nEND SCHEMA---------------------------------------------------")
def configCallback(self, brokerId, classInfo, list, timestamps):
@@ -410,11 +419,11 @@
self.log(objectName)
d = self.sanitizeDict(dict(list))
- self.lock.acquire()
+ self.lock()
try:
conn = self.connections[brokerId]
finally:
- self.lock.release()
+ self.unlock()
d["managedBroker"] = brokerId
d["recTime"] = datetime.fromtimestamp(timestamps[0]/1000000000)
@@ -423,20 +432,29 @@
try:
for parentKey in self.findParentKeys(d):
convertedKey = self.convertRefKey(parentKey)
- cls = schema.schemaNameToClassMap.get(self.initialCapital(convertedKey))
- if cls:
- d[convertedKey] = conn.getByOriginalId(cls, d.pop(parentKey), brokerId)
- else:
- self.log("Error: referenced class not found: %s" % convertedKey)
+ self.lock()
+ try:
+ cls = schema.schemaNameToClassMap.get(self.initialCapital(convertedKey))
+ if cls:
+ d[convertedKey] = conn.getByOriginalId(cls, d.pop(parentKey), brokerId)
+ else:
+ self.log("Error: referenced class not found: %s" % convertedKey)
+ finally:
+ self.unlock()
+
obj = conn.getByOriginalId(schema.schemaNameToClassMap[objectName], d["idOriginal"], brokerId, create=True)
if (not obj):
self.log("Couldn't find type %s id %s" % (objectName, d["idOriginal"]))
return
- obj = self.updateObjWithDict(obj, d)
- if (not obj):
- return
+ self.lock()
+ try:
+ obj = self.updateObjWithDict(obj, d)
+ if (not obj):
+ return
+ finally:
+ self.unlock()
except TypeError, detail:
self.log("TypeError: Schema mismatch: %s" % detail)
@@ -455,11 +473,11 @@
self.log(objectName)
d = self.sanitizeDict(dict(list))
- self.lock.acquire()
+ self.lock()
try:
conn = self.connections[brokerId]
finally:
- self.lock.release()
+ self.unlock()
d["recTime"] = datetime.fromtimestamp(timestamps[0]/1000000000)
self.log(d)
@@ -473,27 +491,40 @@
origObjName = objectName[0].lower() + objectName[1:]
d[origObjName] = obj
- objNameStats = eval("schema.%sStats" % (schema.schemaNameToClassMap[objectName].__name__))
- objStats = objNameStats.__new__(objNameStats)
- objStats.__init__()
+
+ self.lock()
+ try:
+ objNameStats = eval("schema.%sStats" % (schema.schemaNameToClassMap[objectName].__name__))
+ objStats = objNameStats.__new__(objNameStats)
+ objStats.__init__()
+ finally:
+ self.unlock()
if (not objStats):
self.log("Couldn't find type %s id %s" % (objNameStats, d[self.convertIdKey("id")]))
return
-
- objStats = self.updateObjWithDict(objStats, d)
- if (not objStats):
- return
+ self.lock()
+ try:
+ objStats = self.updateObjWithDict(objStats, d)
+ if (not objStats):
+ return
+ finally:
+ self.unlock()
+
d = dict()
d["statsPrev"] = obj.statsCurr
d["statsCurr"] = objStats
if (timestamps[2] != 0):
d["deletionTime"] = datetime.fromtimestamp(timestamps[2]/1000000000)
- obj = self.updateObjWithDict(obj, d)
- if (not obj):
- return
+ self.lock()
+ try:
+ obj = self.updateObjWithDict(obj, d)
+ if (not obj):
+ return
+ finally:
+ self.unlock()
except TypeError, detail:
self.log("TypeError: Schema mismatch: %s" % detail)
@@ -547,8 +578,14 @@
self.log("Error: %d %s" % (errorNo, errorText))
self.log("Args: ")
self.log(args)
- method = self.outstandingMethodCalls.pop(methodId)
- result = method(errorText, args)
+
+ self.lock()
+ try:
+ method = self.outstandingMethodCalls.pop(methodId)
+ result = method(errorText, args)
+ finally:
+ self.unlock()
+
self.log("END METHOD---------------------------------------------------\n")
return result
@@ -556,14 +593,15 @@
self.log("\nCLOSE---------------------------------------------------")
self.log("BrokerId=%s , Data=%s" % (brokerId, data))
- self.lock.acquire()
+ self.lock()
try:
del self.connections[brokerId]
+
+ if (self.connCloseListener != None):
+ self.connCloseListener(brokerId, data)
finally:
- self.lock.release()
+ self.unlock()
- if (self.connCloseListener != None):
- self.connCloseListener(brokerId, data)
self.log("END CLOSE---------------------------------------------------\n")
return
@@ -583,19 +621,21 @@
return
def registerCallback(self, callback):
- self.currentMethodId += 1
- methodId = self.currentMethodId
- self.outstandingMethodCalls[methodId] = callback
- return methodId
+ self.lock()
+ try:
+ self.currentMethodId += 1
+ methodId = self.currentMethodId
+ self.outstandingMethodCalls[methodId] = callback
+ return methodId
+ finally:
+ self.unlock()
def getConnectionByRegistration(self, reg):
- result = None
- self.lock.acquire()
+ self.lock()
try:
- result = self.connections.get("%s:%i" % (reg.host, reg.port))
+ return self.connections.get("%s:%i" % (reg.host, reg.port))
finally:
- self.lock.release()
- return result
+ self.unlock()
class MintDatabase(object):
def __init__(self, uri):
[View Less]
16 years, 6 months
rhmessaging commits: r2154 - mgmt/cumin/python/cumin.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2008-06-16 22:27:33 -0400 (Mon, 16 Jun 2008)
New Revision: 2154
Modified:
mgmt/cumin/python/cumin/stat.py
Log:
Protect against nulls
Modified: mgmt/cumin/python/cumin/stat.py
===================================================================
--- mgmt/cumin/python/cumin/stat.py 2008-06-16 23:03:37 UTC (rev 2153)
+++ mgmt/cumin/python/cumin/stat.py 2008-06-17 02:27:33 UTC (rev 2154)
@@ -211,8 +211,8 @@
for stat in stats:
vals = values[stat]
…
[View More] if vals:
- max_value = max(max(vals), max_value)
- min_value = min(min(vals), min_value)
+ max_value = max(nvl(max(vals), 0), max_value)
+ min_value = min(nvl(min(vals), 0), min_value)
max_value = round(max_value * 1.1 + 1)
[View Less]
16 years, 6 months
rhmessaging commits: r2153 - in store/trunk/cpp/lib: jrnl and 1 other directory.
by rhmessaging-commits@lists.jboss.org
Author: kpvdr
Date: 2008-06-16 19:03:37 -0400 (Mon, 16 Jun 2008)
New Revision: 2153
Modified:
store/trunk/cpp/lib/JournalImpl.cpp
store/trunk/cpp/lib/TxnCtxt.h
store/trunk/cpp/lib/jrnl/jcntl.cpp
Log:
Fix for BZ "451432: jexception 0x0b01 txn_map::set_aio_compl() threw JERR_MAP_NOTFOUND: Key not found in map". Reduced sleep time while waiting for AIO returns from 10ms to 1ms, which improves txn performance. Also set aio_complete flags for recovered transactions.
Modified: store/trunk/…
[View More]cpp/lib/JournalImpl.cpp
===================================================================
--- store/trunk/cpp/lib/JournalImpl.cpp 2008-06-13 16:00:39 UTC (rev 2152)
+++ store/trunk/cpp/lib/JournalImpl.cpp 2008-06-16 23:03:37 UTC (rev 2153)
@@ -215,8 +215,8 @@
log(LOG_DEBUG, "Recover phase II complete; journal now writable.");
}
-#define MAX_AIO_SLEEPS 500
-#define AIO_SLEEP_TIME 1000000
+#define MAX_AIO_SLEEPS 1000 // 10 sec
+#define AIO_SLEEP_TIME 10000 // 10 ms
bool
JournalImpl::loadMsgContent(u_int64_t rid, std::string& data, size_t offset, size_t length)
{
Modified: store/trunk/cpp/lib/TxnCtxt.h
===================================================================
--- store/trunk/cpp/lib/TxnCtxt.h 2008-06-13 16:00:39 UTC (rev 2152)
+++ store/trunk/cpp/lib/TxnCtxt.h 2008-06-16 23:03:37 UTC (rev 2153)
@@ -65,6 +65,7 @@
DbTxn* txn;
void completeTXN(bool commit){
+ sync();
for (TxnCtxt::ipqdef::iterator i = impactedQueues.begin(); i != impactedQueues.end(); i++) {
JournalImpl* jc = static_cast<JournalImpl*>(*i);
if (jc && loggedtx) { /* if using journal */
@@ -73,12 +74,12 @@
dtokp->set_external_rid(true);
dtokp->set_rid(loggedtx->next());
try{
- if (commit)
+ if (commit) {
jc->txn_commit(dtokp.get(), getXid());
- else
- {
- jc->txn_abort(dtokp.get(), getXid());
- }
+ jc->flush(true);
+ } else {
+ jc->txn_abort(dtokp.get(), getXid());
+ }
} catch (const journal::jexception& e) {
//std::cout << "Error commit" << e << std::endl;
THROW_STORE_EXCEPTION(std::string("Error commit") + e.what());
@@ -86,7 +87,6 @@
}
}
- sync();
deleteXidRecord();
}
Modified: store/trunk/cpp/lib/jrnl/jcntl.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.cpp 2008-06-13 16:00:39 UTC (rev 2152)
+++ store/trunk/cpp/lib/jrnl/jcntl.cpp 2008-06-16 23:03:37 UTC (rev 2153)
@@ -510,8 +510,8 @@
ji.write();
}
-#define AIO_CMPL_SLEEP 10000 // 10 ms
-#define MAX_AIO_CMPL_SLEEPS 1000 // Total: 10 sec
+#define AIO_CMPL_SLEEP 1000 // 1 ms
+#define MAX_AIO_CMPL_SLEEPS 10000 // Total: 10 sec
void
jcntl::aio_cmpl_wait()
@@ -687,6 +687,7 @@
assert(xidp != 0);
std::string xid((char*)xidp, er.xid_size());
_tmap.insert_txn_data(xid, txn_data(h._rid, 0, fid, true));
+ _tmap.set_aio_compl(xid, h._rid);
std::free(xidp);
}
else
@@ -711,6 +712,7 @@
assert(xidp != 0);
std::string xid((char*)xidp, dr.xid_size());
_tmap.insert_txn_data(xid, txn_data(dr.rid(), dr.deq_rid(), fid, false));
+ _tmap.set_aio_compl(xid, dr.rid());
std::free(xidp);
}
else
[View Less]
16 years, 6 months