rhmessaging commits: r1446 - mgmt/cumin/python/cumin.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2007-12-10 09:57:57 -0500 (Mon, 10 Dec 2007)
New Revision: 1446
Modified:
mgmt/cumin/python/cumin/model.py
mgmt/cumin/python/cumin/queue.strings
Log:
Cosmetic fixups.
Modified: mgmt/cumin/python/cumin/model.py
===================================================================
--- mgmt/cumin/python/cumin/model.py 2007-12-10 01:46:31 UTC (rev 1445)
+++ mgmt/cumin/python/cumin/model.py 2007-12-10 14:57:57 UTC (rev 1446)
@@ -93,11 +93,11 @@
if curr is not None and prev is not None:
return (curr - prev) / float(1)
else:
- return -1
+ return 0
else:
- return -2
+ return 0
else:
- return -3
+ return 0
def write_xml(self, object, writer):
writer.write("<stat name=\"%s\" value=\"%i\" rate=\"%i\"/>" \
Modified: mgmt/cumin/python/cumin/queue.strings
===================================================================
--- mgmt/cumin/python/cumin/queue.strings 2007-12-10 01:46:31 UTC (rev 1445)
+++ mgmt/cumin/python/cumin/queue.strings 2007-12-10 14:57:57 UTC (rev 1446)
@@ -6,7 +6,7 @@
}
[QueueSet.html]
-<form action="{href}" method="get">
+<!-- <form action="{href}" method="get"> -->
<div class="rfloat">{page}</div>
{unit}
@@ -30,7 +30,7 @@
{items}
</table>
-</form>
+<!-- </form> -->
[QueueSet.item_html]
<tr>
18 years, 4 months
rhmessaging commits: r1445 - in mgmt: cumin/python/cumin and 1 other directory.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2007-12-09 20:46:31 -0500 (Sun, 09 Dec 2007)
New Revision: 1445
Modified:
mgmt/bin/quirk
mgmt/cumin/python/cumin/broker.py
mgmt/cumin/python/cumin/broker.strings
mgmt/cumin/python/cumin/brokercluster.py
mgmt/cumin/python/cumin/brokercluster.strings
mgmt/cumin/python/cumin/brokergroup.py
mgmt/cumin/python/cumin/brokergroup.strings
mgmt/cumin/python/cumin/exchange.py
mgmt/cumin/python/cumin/exchange.strings
mgmt/cumin/python/cumin/page.strings
mgmt/cumin/python/cumin/queue.py
mgmt/cumin/python/cumin/queue.strings
mgmt/cumin/python/cumin/widgets.py
Log:
Enhances the quirk AMQP client to generate some kinds of demo data.
Adds paginators to broker, queue, and exchange views.
Simplifies the broker group list.
Changes the way the Paginator gets its total item count set.
Modified: mgmt/bin/quirk
===================================================================
--- mgmt/bin/quirk 2007-12-10 01:43:59 UTC (rev 1444)
+++ mgmt/bin/quirk 2007-12-10 01:46:31 UTC (rev 1445)
@@ -1,177 +1,236 @@
#!/usr/bin/env python
-import sys, qpid
+import sys, qpid, time
class Exchange(object):
def __init__(self, session, name):
self.session = session
self.name = name
+ self.type = "direct"
+ def declare(self):
+ self.session.psession.exchange_declare(exchange=self.name,
+ type=self.type)
+
+ def exists(self):
+ return False #XXX figure out how to do this
+
class Queue(object):
def __init__(self, session, name):
self.session = session
self.name = name
+ def declare(self):
+ self.session.psession.queue_declare(queue=self.name)
+
+ def exists(self):
+ return False #XXX figure out how to do this
+
+ def bind(self, exchange, binding_key=None):
+ if binding_key is None:
+ binding_key = self.name
+
+ self.session.psession.queue_bind(exchange=exchange.name,
+ queue=self.name,
+ routing_key=binding_key)
+
class Subscription(object):
- def __init__(self, session, name, queue):
+ def __init__(self, session, queue):
self.session = session
- self.name = name
+ self.name = queue.name # XXX bad?
self.queue = queue
- # XXX what all does this do? it seems to declare things
-
- # XXX what is the destination arg for?
-
- # XXX from reading the spec, "destination" seems less
- # appropriate than "subscription name" (which is what the spec
- # ch. 25 docs say it is)
-
- session.csession.message_subscribe(queue="test",
+ session.psession.message_subscribe(queue=queue.name,
destination=self.name)
- session.csession.message_flow(self.name, 0, 0xFFFFFFFF)
- session.csession.message_flow(self.name, 1, 0xFFFFFFFF)
+ session.psession.message_flow(self.name, 0, 0xFFFFFFFF)
+ session.psession.message_flow(self.name, 1, 0xFFFFFFFF)
- self.client_queue = session.client.queue(self.name)
+ self.client_queue = session.client.pclient.queue(self.name)
def get(self):
- m = Message()
+ m = self.session.message()
m.content = self.client_queue.get(timeout=10).content
return m
class Message(object):
- def __init__(self, body=""):
- self.content = qpid.content.Content(body)
+ def __init__(self, session):
+ self.session = session
+
+ def set(self, payload):
+ self.content = qpid.content.Content(payload)
self.content["content_type"] = "text/plain"
+
+ def send(self, dest, routing_key=None):
+ if dest.__class__ is Queue:
+ self.content["routing_key"] = dest.name
+ self.session.psession.message_transfer(destination="",
+ content=self.content)
+ elif dest.__class__ is Exchange:
+ if routing_key is None:
+ raise Exception("Routing key not set")
- def set_routing_key(self, key):
- self.content["routing_key"] = key
+ self.session.psession.message_transfer(destination=dest.name,
+ content=self.content)
+ else:
+ raise Exception("Unknown destination object")
- def get_routing_key(self):
- try:
- return self.content["routing_key"]
- except KeyError:
- pass
-
def __str__(self):
return self.content.body
+class Client(object):
+ def __init__(self, host, port):
+ self.pclient = qpid.client.Client(host, port)
+
+ def session(self):
+ return Session(self)
+
+ def login(self, user, password):
+ self.pclient.start({"LOGIN": user, "PASSWORD": password})
+
class Session(object):
- def __init__(self, client, csession):
+ def __init__(self, client):
self.client = client
- self.csession = csession
+ self.psession = client.pclient.session()
def open(self):
- self.csession.open()
+ self.psession.open()
def close(self):
- self.csession.close()
+ self.psession.close()
- def declare(self, object):
- if object.__class__ is Queue:
- # XXX blows up without queue=
- self.csession.queue_declare(queue=object.name)
- elif object.__class__ is Exchange:
- self.csession.exchange_declare(exchange=object.name)
- else:
- raise Exception()
+ def exchange(self, name):
+ return Exchange(self, name)
- def bind(self, queue, exchange, binding_key=None):
- if binding_key is None:
- binding_key = queue.name
+ def queue(self, name):
+ return Queue(self, name)
- self.csession.queue_bind(exchange=exchange.name,
- queue=queue.name,
- routing_key=binding_key)
+ def subscribe(self, queue):
+ return Subscription(self, queue)
- def publish(self, message, object):
- if object.__class__ is Exchange:
- self.csession.message_transfer(destination=object.name,
- content=message.content)
- elif object.__class__ is Queue:
- # XXX maybe this shouldn't be conditional
- if message.get_routing_key() is None:
- message.set_routing_key(object.name)
+ def message(self):
+ return Message(self)
- self.csession.message_transfer(destination="",
- content=message.content)
- else:
- raise Exception()
+class TestCommand(object):
+ def __init__(self, name):
+ self.name = name
-def direct_with_explicit_exchange(host, port):
- client = qpid.client.Client(host, port)
- client.start({"LOGIN": "guest", "PASSWORD": "guest"})
+ def run(self, client):
+ session = client.session()
+ session.open()
- session = Session(client, client.session())
- session.open()
+ try:
+ q = session.queue("quirk.test")
- try:
- q = Queue(session, "test")
- e = Exchange(session, "amq.direct")
- s = Subscription(session, "s", q)
+ if not q.exists():
+ q.declare()
- session.declare(q)
- session.bind(q, e)
+ s = session.subscribe(q)
- for i in range(0, 10):
- print i,
+ for i in range(0, 10):
+ print i,
+ m = session.message()
+ m.set("message %i" % i)
+ m.send(q)
+ print "Sent", m
- m = Message("Test message " + str(i))
+ for i in range(0, 10):
+ print i,
+ m = s.get()
+ print "Received", m
+ finally:
+ session.close()
- # XXX make this an arg to publish, instead?
- m.set_routing_key(q.name)
+class BenchCommand(object):
+ def __init__(self, name):
+ self.name = name
- session.publish(m, e)
+ def run(self, client):
+ session = client.session()
+ session.open()
- print "."
+ try:
+ q = session.queue("quirk.bench")
- for i in range(0, 10):
- print i,
+ if not q.exists():
+ q.declare()
- m = s.get()
+ s = session.subscribe(q)
- print m
- finally:
- session.close()
+ i = 0
-def direct_with_implicit_exchange(host, port):
- client = qpid.client.Client(host, port)
- client.start({"LOGIN": "guest", "PASSWORD": "guest"})
+ while True:
+ m = session.message()
+ m.set(str(i))
+ m.send(q)
- # Now, simpler, using the default exchange:
+ if i % 1000 == 0:
+ print ".",
+
+ i += 1
+ finally:
+ session.close()
- session = Session(client, client.session())
- session.open()
+class DemoCommand(object):
+ def __init__(self, name):
+ self.name = name
- try:
- q = Queue(session, "test")
- s = Subscription(session, "s", q)
+ def run(self, client):
+ session = client.session()
+ session.open()
- session.declare(q)
+ qs = list()
+ es = list()
- for i in range(0, 10):
- print i,
- m = Message("m%i" % i)
- session.publish(m, q)
- print "Sent", m
+ try:
+ for i in range(0, 30):
+ name = "demo%02i" % (i + 1)
- for i in range(0, 10):
- print i,
- m = s.get()
- print "Received", m
- finally:
- session.close()
+ q = session.queue(name)
+ q.declare()
+ qs.append(q)
+
+ e = session.exchange(name)
+ e.declare()
+
+ es.append(e)
+ finally:
+ session.close()
+
+def usage():
+ print "Usage: quirk COMMAND [IP:PORT]"
+ sys.exit(2)
+
+commands = dict()
+commands["test"] = TestCommand("test")
+commands["bench"] = BenchCommand("bench")
+commands["demo"] = DemoCommand("demo")
+
if __name__ == "__main__":
- if len(sys.argv) != 2:
- print "Usage: quirk IP:PORT"
- sys.exit(2)
+ if len(sys.argv) < 2:
+ usage();
- addr = sys.argv[1].split(":")
+ command = sys.argv[1]
- if len(addr) > 1:
- host, port = (addr[0], int(addr[1]))
- else:
- host, port = (addr[0], 5672)
+ if command not in commands:
+ print "Unknown command '%s'" % command
+ usage()
+
+ try:
+ addr = sys.argv[2].split(":")
- direct_with_implicit_exchange(host, port)
+ if len(addr) > 1:
+ host, port = addr[0], int(addr[1])
+ else:
+ host, port = addr[0], 5672
+ except IndexError:
+ host, port = "localhost", 5672
+
+ client = Client(host, port)
+ client.login("guest", "guest")
+
+ try:
+ commands[command].run(client)
+ except KeyboardInterrupt:
+ pass
Modified: mgmt/cumin/python/cumin/broker.py
===================================================================
--- mgmt/cumin/python/cumin/broker.py 2007-12-10 01:43:59 UTC (rev 1444)
+++ mgmt/cumin/python/cumin/broker.py 2007-12-10 01:46:31 UTC (rev 1445)
@@ -38,7 +38,7 @@
self.add_parameter(self.submit)
self.add_form_parameter(self.submit)
- self.paginator = self.BrokerPaginator(app, "page")
+ self.paginator = Paginator(app, "page")
self.add_child(self.paginator)
def get_title(self, session, model):
@@ -69,6 +69,8 @@
broker.addBrokerGroup(group)
self.page().set_redirect_url(session, session.marshal())
+ else:
+ self.paginator.set_count(session, BrokerRegistration.select().count())
def render_action_param_name(self, session, broker):
return self.action.path()
@@ -128,10 +130,6 @@
class BrokerSetGroupInput(BrokerGroupInput):
pass
-
- class BrokerPaginator(Paginator):
- def get_object(self, session, model):
- return list(BrokerRegistration.select()) #XXX ugh
class BrokerFrame(CuminFrame):
def __init__(self, app, name):
Modified: mgmt/cumin/python/cumin/broker.strings
===================================================================
--- mgmt/cumin/python/cumin/broker.strings 2007-12-10 01:43:59 UTC (rev 1444)
+++ mgmt/cumin/python/cumin/broker.strings 2007-12-10 01:46:31 UTC (rev 1445)
@@ -2,6 +2,8 @@
<form id="{id}" method="post" action="?">
<!-- <select onchange="document.getElementById('{id}.submit').submit()"> -->
+ <div style="float: right; position: relative; top: -2em;">{page}</div>
+
<div class="sactions">
<h2>Act on Selected Brokers:</h2>
Modified: mgmt/cumin/python/cumin/brokercluster.py
===================================================================
--- mgmt/cumin/python/cumin/brokercluster.py 2007-12-10 01:43:59 UTC (rev 1444)
+++ mgmt/cumin/python/cumin/brokercluster.py 2007-12-10 01:46:31 UTC (rev 1445)
@@ -30,8 +30,7 @@
return fmt_olink(branch, cluster)
def render_item_config(self, session, cluster):
- count = len(cluster.brokers)
- return "%i broker%s" % (count, ess(count))
+ return len(cluster.brokers)
def render_item_status(self, session, cluster):
writer = Writer()
Modified: mgmt/cumin/python/cumin/brokercluster.strings
===================================================================
--- mgmt/cumin/python/cumin/brokercluster.strings 2007-12-10 01:43:59 UTC (rev 1444)
+++ mgmt/cumin/python/cumin/brokercluster.strings 2007-12-10 01:46:31 UTC (rev 1445)
@@ -20,7 +20,7 @@
<table class="mobjects">
<tr>
<th>Name</th>
- <th>Configuration</th>
+ <th>Brokers</th>
<th>Status</th>
</tr>
Modified: mgmt/cumin/python/cumin/brokergroup.py
===================================================================
--- mgmt/cumin/python/cumin/brokergroup.py 2007-12-10 01:43:59 UTC (rev 1444)
+++ mgmt/cumin/python/cumin/brokergroup.py 2007-12-10 01:46:31 UTC (rev 1445)
@@ -28,8 +28,7 @@
return fmt_olink(branch, group)
def render_item_config(self, session, group):
- count = len(group.brokers)
- return "%i broker%s" % (count, ess(count))
+ return len(group.brokers)
def render_item_status(self, session, group):
writer = Writer()
Modified: mgmt/cumin/python/cumin/brokergroup.strings
===================================================================
--- mgmt/cumin/python/cumin/brokergroup.strings 2007-12-10 01:43:59 UTC (rev 1444)
+++ mgmt/cumin/python/cumin/brokergroup.strings 2007-12-10 01:46:31 UTC (rev 1445)
@@ -3,17 +3,10 @@
<li><a class="nav" href="{group_add_href}">Add Broker Group</a></li>
</ul>
-<div class="sactions">
- <h2>Act on Selected Groups:</h2>
- <button>Shutdown</button>
- <button>Remove</button>
-</div>
-
<table class="mobjects">
<tr>
- <th><input type="checkbox"/></th>
<th>Name</th>
- <th>Configuration</th>
+ <th>Brokers</th>
<th>Status</th>
</tr>
@@ -22,7 +15,6 @@
[BrokerGroupSet.item_html]
<tr>
- <td><input type="checkbox"/></td>
<td>{item_link}</td>
<td>{item_config}</td>
<td>{item_status}</td>
Modified: mgmt/cumin/python/cumin/exchange.py
===================================================================
--- mgmt/cumin/python/cumin/exchange.py 2007-12-10 01:43:59 UTC (rev 1444)
+++ mgmt/cumin/python/cumin/exchange.py 2007-12-10 01:46:31 UTC (rev 1445)
@@ -39,15 +39,22 @@
self.unit = UnitSwitch(app, "unit")
self.add_child(self.unit)
+ self.paginator = Paginator(app, "page")
+ self.add_child(self.paginator)
+
def get_title(self, session, vhost):
return "Exchanges %s" % fmt_count(len(vhost.exchanges))
+ def do_process(self, session, vhost):
+ self.paginator.set_count(session, len(vhost.exchanges))
+
def render_unit_plural(self, session, vhost):
return self.unit.get(session) == "b" and "Bytes" or "Msgs."
def do_get_items(self, session, vhost):
if vhost:
- return sorted_by(vhost.exchanges)
+ start, end = self.paginator.get_bounds(session)
+ return vhost.exchanges[start:end]
def render_item_link(self, session, exchange):
branch = session.branch()
Modified: mgmt/cumin/python/cumin/exchange.strings
===================================================================
--- mgmt/cumin/python/cumin/exchange.strings 2007-12-10 01:43:59 UTC (rev 1444)
+++ mgmt/cumin/python/cumin/exchange.strings 2007-12-10 01:46:31 UTC (rev 1445)
@@ -12,6 +12,7 @@
}
[ExchangeSet.html]
+<div class="rfloat">{page}</div>
{unit}
<table class="mobjects">
Modified: mgmt/cumin/python/cumin/page.strings
===================================================================
--- mgmt/cumin/python/cumin/page.strings 2007-12-10 01:43:59 UTC (rev 1444)
+++ mgmt/cumin/python/cumin/page.strings 2007-12-10 01:46:31 UTC (rev 1445)
@@ -566,6 +566,10 @@
color: #999;
}
+.rfloat {
+ float: right;
+}
+
[CuminPage.javascript]
var cumin;
Modified: mgmt/cumin/python/cumin/queue.py
===================================================================
--- mgmt/cumin/python/cumin/queue.py 2007-12-10 01:43:59 UTC (rev 1444)
+++ mgmt/cumin/python/cumin/queue.py 2007-12-10 01:46:31 UTC (rev 1445)
@@ -21,9 +21,15 @@
self.unit = UnitSwitch(app, "unit")
self.add_child(self.unit)
+ self.paginator = Paginator(app, "page")
+ self.add_child(self.paginator)
+
def get_title(self, session, vhost):
return "Queues (%s)" % len(vhost.queues)
+ def do_process(self, session, vhost):
+ self.paginator.set_count(session, len(vhost.queues))
+
def render_unit_singular(self, session, vhost):
return self.unit.get(session) == "b" and "Byte" or "Msg."
@@ -32,7 +38,8 @@
def do_get_items(self, session, vhost):
if vhost:
- return sorted_by(vhost.queues)
+ start, end = self.paginator.get_bounds(session)
+ return vhost.queues[start:end]
def render_item_link(self, session, queue):
branch = session.branch()
@@ -43,8 +50,6 @@
return queue.name
def render_item_consumers(self, session, queue):
- return None # XXX
-
branch = session.branch()
frame = self.page().show_queue(branch, queue)
frame.show_view(branch).show_consumers(branch)
@@ -478,10 +483,10 @@
class QueueConsumerSet(ItemSet):
def get_title(self, session, queue):
- return "Consumers" #XXX %s" % fmt_count(len(queue.consumers))
+ return "Consumers %s" % fmt_count(len(queue.consumers))
def do_get_items(self, session, queue):
- return list() #XXX sorted_by(queue.consumers)
+ return queue.consumers
def render_item_name(self, session, consumer):
return consumer.name
Modified: mgmt/cumin/python/cumin/queue.strings
===================================================================
--- mgmt/cumin/python/cumin/queue.strings 2007-12-10 01:43:59 UTC (rev 1444)
+++ mgmt/cumin/python/cumin/queue.strings 2007-12-10 01:46:31 UTC (rev 1445)
@@ -7,6 +7,7 @@
[QueueSet.html]
<form action="{href}" method="get">
+ <div class="rfloat">{page}</div>
{unit}
<div class="sactions">
Modified: mgmt/cumin/python/cumin/widgets.py
===================================================================
--- mgmt/cumin/python/cumin/widgets.py 2007-12-10 01:43:59 UTC (rev 1444)
+++ mgmt/cumin/python/cumin/widgets.py 2007-12-10 01:46:31 UTC (rev 1445)
@@ -273,8 +273,11 @@
self.param.set_default(0)
self.add_parameter(self.param)
- self.page_size = 2
+ self.count = Attribute(app, "count")
+ self.add_attribute(self.count)
+ self.page_size = 15
+
def get(self, session):
return self.param.get(session)
@@ -285,8 +288,12 @@
page = self.get(session)
return (self.page_size * page, self.page_size * (page + 1))
+ def set_count(self, session, count):
+ self.count.set(session, count)
+
def do_get_items(self, session, object):
- return range(0, int(ceil(len(object) / float(self.page_size))))
+ count = self.count.get(session)
+ return range(0, int(ceil(count / float(self.page_size))))
def render_item_class_attr(self, session, page):
if self.get(session) == page:
18 years, 4 months
rhmessaging commits: r1444 - mgmt/cumin/bin.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2007-12-09 20:43:59 -0500 (Sun, 09 Dec 2007)
New Revision: 1444
Modified:
mgmt/cumin/bin/cumin-test
Log:
Fixes a bug in profiling.
Modified: mgmt/cumin/bin/cumin-test
===================================================================
--- mgmt/cumin/bin/cumin-test 2007-12-07 19:17:58 UTC (rev 1443)
+++ mgmt/cumin/bin/cumin-test 2007-12-10 01:43:59 UTC (rev 1444)
@@ -90,14 +90,14 @@
prof = Profile()
try:
- statement = "do_main(%i, %i, %r, %r)" % \
+ statement = "do_main(%i, %r, %r)" % \
(in_port, in_bench, in_debug)
prof.run(statement)
raise KeyboardInterrupt()
except KeyboardInterrupt:
- file = "/tmp/cumin-test-stats-%i" % time()
+ file = "/tmp/cumin-test-stats"
prof.dump_stats(file)
18 years, 4 months
rhmessaging commits: r1443 - in store/trunk/cpp: lib/jrnl and 1 other directory.
by rhmessaging-commits@lists.jboss.org
Author: kpvdr
Date: 2007-12-07 14:17:58 -0500 (Fri, 07 Dec 2007)
New Revision: 1443
Modified:
store/trunk/cpp/lib/BdbMessageStore.cpp
store/trunk/cpp/lib/JournalImpl.cpp
store/trunk/cpp/lib/JournalImpl.h
store/trunk/cpp/lib/TxnCtxt.h
store/trunk/cpp/lib/jrnl/data_tok.cpp
store/trunk/cpp/lib/jrnl/data_tok.hpp
store/trunk/cpp/lib/jrnl/deq_rec.cpp
store/trunk/cpp/lib/jrnl/deq_rec.hpp
store/trunk/cpp/lib/jrnl/enq_map.cpp
store/trunk/cpp/lib/jrnl/enq_map.hpp
store/trunk/cpp/lib/jrnl/enq_rec.cpp
store/trunk/cpp/lib/jrnl/enq_rec.hpp
store/trunk/cpp/lib/jrnl/file_hdr.cpp
store/trunk/cpp/lib/jrnl/file_hdr.hpp
store/trunk/cpp/lib/jrnl/jcfg.hpp
store/trunk/cpp/lib/jrnl/jcntl.cpp
store/trunk/cpp/lib/jrnl/jcntl.hpp
store/trunk/cpp/lib/jrnl/jdir.cpp
store/trunk/cpp/lib/jrnl/jdir.hpp
store/trunk/cpp/lib/jrnl/jexception.cpp
store/trunk/cpp/lib/jrnl/jinf.cpp
store/trunk/cpp/lib/jrnl/jinf.hpp
store/trunk/cpp/lib/jrnl/jrec.cpp
store/trunk/cpp/lib/jrnl/jrec.hpp
store/trunk/cpp/lib/jrnl/lfh.cpp
store/trunk/cpp/lib/jrnl/lfh.hpp
store/trunk/cpp/lib/jrnl/nlfh.cpp
store/trunk/cpp/lib/jrnl/nlfh.hpp
store/trunk/cpp/lib/jrnl/pmgr.cpp
store/trunk/cpp/lib/jrnl/pmgr.hpp
store/trunk/cpp/lib/jrnl/rcvdat.hpp
store/trunk/cpp/lib/jrnl/rmgr.cpp
store/trunk/cpp/lib/jrnl/rmgr.hpp
store/trunk/cpp/lib/jrnl/rrfc.cpp
store/trunk/cpp/lib/jrnl/rrfc.hpp
store/trunk/cpp/lib/jrnl/txn_map.cpp
store/trunk/cpp/lib/jrnl/txn_map.hpp
store/trunk/cpp/lib/jrnl/txn_rec.cpp
store/trunk/cpp/lib/jrnl/txn_rec.hpp
store/trunk/cpp/lib/jrnl/wmgr.cpp
store/trunk/cpp/lib/jrnl/wmgr.hpp
store/trunk/cpp/lib/jrnl/wrfc.cpp
store/trunk/cpp/lib/jrnl/wrfc.hpp
store/trunk/cpp/tests/jrnl/unit_test_file_hdr.cpp
store/trunk/cpp/tests/jrnl/unit_test_jinf.cpp
Log:
Tidy-up: Removed all explicit throw (jexception) exception specifications from functions that throw.
Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp 2007-12-07 17:09:35 UTC (rev 1442)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp 2007-12-07 19:17:58 UTC (rev 1443)
@@ -26,6 +26,7 @@
#include <qpid/broker/Message.h>
#include <qpid/framing/Buffer.h>
#include <algorithm>
+#include <iomanip>
#include <sstream>
#include "BindingDbt.h"
#include "IdPairDbt.h"
Modified: store/trunk/cpp/lib/JournalImpl.cpp
===================================================================
--- store/trunk/cpp/lib/JournalImpl.cpp 2007-12-07 17:09:35 UTC (rev 1442)
+++ store/trunk/cpp/lib/JournalImpl.cpp 2007-12-07 19:17:58 UTC (rev 1443)
@@ -24,6 +24,7 @@
#include "JournalImpl.h"
#include "jrnl/jerrno.hpp"
+#include "jrnl/jexception.hpp"
#include <qpid/sys/Monitor.h>
using namespace rhm::bdbstore;
@@ -77,7 +78,7 @@
JournalImpl::recover(std::deque<data_tok*>* rd_dtokl, const aio_cb rd_cb,
std::deque<data_tok*>* wr_dtokl, const aio_cb wr_cb,
boost::ptr_list<bdbstore::PreparedTransaction>& prep_tx_list, u_int64_t& highest_rid,
- u_int64_t queue_id) throw (jexception)
+ u_int64_t queue_id)
{
// Create list of prepared xids
std::vector<std::string> prep_xid_list;
@@ -113,7 +114,6 @@
#define AIO_SLEEP_TIME 1000000
const bool
JournalImpl::loadMsgContent(u_int64_t rid, std::string& data, size_t offset, size_t length)
- throw (journal::jexception)
{
if (_dtok.rid() != rid)
{
@@ -171,7 +171,6 @@
const iores
JournalImpl::enqueue_data_record(const void* const data_buff, const size_t tot_data_len,
const size_t this_data_len, data_tok* dtokp, const bool transient)
- throw (jexception)
{
//std::cout << " " << _jid << ":E" << std::flush;
return handleInactivityTimer(jcntl::enqueue_data_record(data_buff, tot_data_len,
@@ -180,7 +179,7 @@
const iores
JournalImpl::enqueue_extern_data_record(const size_t tot_data_len, data_tok* dtokp,
- const bool transient) throw (jexception)
+ const bool transient)
{
//std::cout << " " << _jid << ":E-ext" << std::flush;
return handleInactivityTimer(jcntl::enqueue_extern_data_record(tot_data_len, dtokp,
@@ -190,7 +189,7 @@
const iores
JournalImpl::enqueue_txn_data_record(const void* const data_buff, const size_t tot_data_len,
const size_t this_data_len, data_tok* dtokp, const std::string& xid,
- const bool transient) throw (jexception)
+ const bool transient)
{
//std::cout << " " << _jid << ":E-tx" << std::flush;
return handleInactivityTimer(jcntl::enqueue_txn_data_record(data_buff, tot_data_len,
@@ -199,7 +198,7 @@
const iores
JournalImpl::enqueue_extern_txn_data_record(const size_t tot_data_len, data_tok* dtokp,
- const std::string& xid, const bool transient) throw (jexception)
+ const std::string& xid, const bool transient)
{
//std::cout << " " << _jid << ":E-tx-ext" << std::flush;
return handleInactivityTimer(jcntl::enqueue_extern_txn_data_record(tot_data_len, dtokp, xid,
@@ -207,7 +206,7 @@
}
const iores
-JournalImpl::dequeue_data_record(data_tok* const dtokp) throw (jexception)
+JournalImpl::dequeue_data_record(data_tok* const dtokp)
{
//std::cout << " " << _jid << ":D" << std::flush;
return handleInactivityTimer(jcntl::dequeue_data_record(dtokp));
@@ -215,35 +214,34 @@
const iores
JournalImpl::dequeue_txn_data_record(data_tok* const dtokp, const std::string& xid)
- throw (jexception)
{
//std::cout << " " << _jid << ":D-tx" << std::flush;
return handleInactivityTimer(jcntl::dequeue_txn_data_record(dtokp, xid));
}
const iores
-JournalImpl::txn_abort(data_tok* const dtokp, const std::string& xid) throw (jexception)
+JournalImpl::txn_abort(data_tok* const dtokp, const std::string& xid)
{
//std::cout << " " << _jid << ":A-tx" << std::flush;
return handleInactivityTimer(jcntl::txn_abort(dtokp, xid));
}
const iores
-JournalImpl::txn_commit(data_tok* const dtokp, const std::string& xid) throw (jexception)
+JournalImpl::txn_commit(data_tok* const dtokp, const std::string& xid)
{
//std::cout << " " << _jid << ":C-tx" << std::flush;
return handleInactivityTimer(jcntl::txn_commit(dtokp, xid));
}
void
-JournalImpl::stop(bool block_till_aio_cmpl) throw (jexception)
+JournalImpl::stop(bool block_till_aio_cmpl)
{
(dynamic_cast<InactivityFireEvent*>(inactivityFireEventPtr.get()))->cancel();
jcntl::stop(block_till_aio_cmpl);
}
void
-JournalImpl::flush() throw (jexception)
+JournalImpl::flush()
{
jcntl::flush();
if (_wmgr.get_aio_evt_rem() && !getEventsTimerSetFlag) {
@@ -291,7 +289,7 @@
}
const iores
-JournalImpl::handleInactivityTimer(const iores r) throw (jexception)
+JournalImpl::handleInactivityTimer(const iores r)
{
writeActivityFlag = true;
return r;
Modified: store/trunk/cpp/lib/JournalImpl.h
===================================================================
--- store/trunk/cpp/lib/JournalImpl.h 2007-12-07 17:09:35 UTC (rev 1442)
+++ store/trunk/cpp/lib/JournalImpl.h 2007-12-07 19:17:58 UTC (rev 1443)
@@ -94,10 +94,10 @@
void recover(std::deque<journal::data_tok*>* rd_dtokl, const journal::aio_cb rd_cb,
std::deque<journal::data_tok*>* wr_dtokl, const journal::aio_cb wr_cb,
boost::ptr_list<bdbstore::PreparedTransaction>& prep_tx_list,
- u_int64_t& highest_rid, u_int64_t queue_id) throw (journal::jexception);
+ u_int64_t& highest_rid, u_int64_t queue_id);
void recover(boost::ptr_list<bdbstore::PreparedTransaction>& prep_tx_list,
- u_int64_t& highest_rid, u_int64_t queue_id) throw (journal::jexception)
+ u_int64_t& highest_rid, u_int64_t queue_id)
{
recover(&_aio_rd_cmpl_dtok_list, &aio_rd_callback, &_aio_wr_cmpl_dtok_list,
&aio_wr_callback, prep_tx_list, highest_rid, queue_id);
@@ -107,50 +107,43 @@
// in chunks. To be replaced when coding to do this direct from the journal is ready.
// Returns true if the record is extern, false if local.
const bool loadMsgContent(u_int64_t rid, std::string& data, size_t offset,
- size_t length) throw (journal::jexception);
+ size_t length);
// Overrides for write inactivity timer
const journal::iores enqueue_data_record(const void* const data_buff,
const size_t tot_data_len, const size_t this_data_len, journal::data_tok* dtokp,
- const bool transient = false) throw (journal::jexception);
+ const bool transient = false);
const journal::iores enqueue_extern_data_record(const size_t tot_data_len,
- journal::data_tok* dtokp, const bool transient = false)
- throw (journal::jexception);
+ journal::data_tok* dtokp, const bool transient = false);
const journal::iores enqueue_txn_data_record(const void* const data_buff,
const size_t tot_data_len, const size_t this_data_len, journal::data_tok* dtokp,
- const std::string& xid, const bool transient = false)
- throw (journal::jexception);
+ const std::string& xid, const bool transient = false);
const journal::iores enqueue_extern_txn_data_record(const size_t tot_data_len,
- journal::data_tok* dtokp, const std::string& xid, const bool transient = false)
- throw (journal::jexception);
+ journal::data_tok* dtokp, const std::string& xid, const bool transient = false);
- const journal::iores dequeue_data_record(journal::data_tok* const dtokp)
- throw (journal::jexception);
+ const journal::iores dequeue_data_record(journal::data_tok* const dtokp);
const journal::iores dequeue_txn_data_record(journal::data_tok* const dtokp,
- const std::string& xid) throw (journal::jexception);
+ const std::string& xid);
- const journal::iores txn_abort(journal::data_tok* const dtokp, const std::string& xid)
- throw (journal::jexception);
+ const journal::iores txn_abort(journal::data_tok* const dtokp, const std::string& xid);
- const journal::iores txn_commit(journal::data_tok* const dtokp, const std::string& xid)
- throw (journal::jexception);
+ const journal::iores txn_commit(journal::data_tok* const dtokp, const std::string& xid);
- void stop(bool block_till_aio_cmpl = false) throw (journal::jexception);
+ void stop(bool block_till_aio_cmpl = false);
// Overrides for get_events timer
- void flush() throw (journal::jexception);
+ void flush();
// TimerTask callback
void getEventsFire();
void flushFire();
private:
- const journal::iores handleInactivityTimer(const journal::iores r)
- throw (journal::jexception);
+ const journal::iores handleInactivityTimer(const journal::iores r);
}; // class JournalImpl
} // namespace bdbstore
Modified: store/trunk/cpp/lib/TxnCtxt.h
===================================================================
--- store/trunk/cpp/lib/TxnCtxt.h 2007-12-07 17:09:35 UTC (rev 1442)
+++ store/trunk/cpp/lib/TxnCtxt.h 2007-12-07 19:17:58 UTC (rev 1443)
@@ -34,6 +34,7 @@
#include "DataTokenImpl.h"
#include <boost/format.hpp>
#include <boost/intrusive_ptr.hpp>
+#include <jrnl/jexception.hpp>
namespace rhm{
namespace bdbstore{
Modified: store/trunk/cpp/lib/jrnl/data_tok.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/data_tok.cpp 2007-12-07 17:09:35 UTC (rev 1442)
+++ store/trunk/cpp/lib/jrnl/data_tok.cpp 2007-12-07 19:17:58 UTC (rev 1443)
@@ -33,8 +33,8 @@
#include <jrnl/data_tok.hpp>
#include <sstream>
-#include <iostream>
#include <jrnl/jerrno.hpp>
+#include <jrnl/jexception.hpp>
namespace rhm
{
Modified: store/trunk/cpp/lib/jrnl/data_tok.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/data_tok.hpp 2007-12-07 17:09:35 UTC (rev 1442)
+++ store/trunk/cpp/lib/jrnl/data_tok.hpp 2007-12-07 19:17:58 UTC (rev 1443)
@@ -54,7 +54,6 @@
#include <qpid/broker/PersistableMessage.h>
#include <qpid/RefCounted.h>
#include <sys/types.h>
-#include <jrnl/jexception.hpp>
namespace rhm
{
@@ -156,7 +155,7 @@
inline void set_fid(const u_int16_t fid) { _fid = fid; }
inline const u_int64_t rid() const { return _rid; }
inline void set_rid(const u_int64_t rid) { _rid = rid; }
- inline const u_int64_t dequeue_rid() const throw (jexception) {return _dequeue_rid; }
+ inline const u_int64_t dequeue_rid() const {return _dequeue_rid; }
inline void set_dequeue_rid(const u_int64_t rid) { _dequeue_rid = rid; }
inline const bool has_xid() const { return !_xid.empty(); }
Modified: store/trunk/cpp/lib/jrnl/deq_rec.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/deq_rec.cpp 2007-12-07 17:09:35 UTC (rev 1442)
+++ store/trunk/cpp/lib/jrnl/deq_rec.cpp 2007-12-07 19:17:58 UTC (rev 1443)
@@ -37,6 +37,7 @@
#include <iomanip>
#include <sstream>
#include <jrnl/jerrno.hpp>
+#include <jrnl/jexception.hpp>
namespace rhm
{
@@ -88,7 +89,7 @@
}
const u_int32_t
-deq_rec::encode(void* wptr, u_int32_t rec_offs_dblks, u_int32_t max_size_dblks) throw (jexception)
+deq_rec::encode(void* wptr, u_int32_t rec_offs_dblks, u_int32_t max_size_dblks)
{
assert(wptr != NULL);
assert(max_size_dblks > 0);
@@ -201,7 +202,6 @@
const u_int32_t
deq_rec::decode(hdr& h, void* rptr, u_int32_t rec_offs_dblks, u_int32_t max_size_dblks)
- throw (jexception)
{
assert(rptr != NULL);
assert(max_size_dblks > 0);
@@ -317,7 +317,7 @@
}
const bool
-deq_rec::rcv_decode(hdr h, std::ifstream* ifsp, size_t& rec_offs) throw (jexception)
+deq_rec::rcv_decode(hdr h, std::ifstream* ifsp, size_t& rec_offs)
{
if (rec_offs == 0)
{
@@ -410,7 +410,7 @@
}
void
-deq_rec::chk_hdr() const throw (jexception)
+deq_rec::chk_hdr() const
{
jrec::chk_hdr(_deq_hdr._hdr);
if (_deq_hdr._hdr._magic != RHM_JDAT_DEQ_MAGIC)
@@ -425,14 +425,14 @@
}
void
-deq_rec::chk_hdr(u_int64_t rid) const throw (jexception)
+deq_rec::chk_hdr(u_int64_t rid) const
{
chk_hdr();
jrec::chk_rid(_deq_hdr._hdr, rid);
}
void
-deq_rec::chk_tail() const throw (jexception)
+deq_rec::chk_tail() const
{
jrec::chk_tail(_deq_tail, _deq_hdr._hdr);
}
Modified: store/trunk/cpp/lib/jrnl/deq_rec.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/deq_rec.hpp 2007-12-07 17:09:35 UTC (rev 1442)
+++ store/trunk/cpp/lib/jrnl/deq_rec.hpp 2007-12-07 19:17:58 UTC (rev 1443)
@@ -73,12 +73,11 @@
// Prepare instance for use in writing data to journal
void reset(const u_int64_t rid, const u_int64_t drid, const void* const xidp,
const size_t xidlen, const bool owi);
- const u_int32_t encode(void* wptr, u_int32_t rec_offs_dblks, u_int32_t max_size_dblks)
- throw (jexception);
+ const u_int32_t encode(void* wptr, u_int32_t rec_offs_dblks, u_int32_t max_size_dblks);
const u_int32_t decode(hdr& h, void* rptr, u_int32_t rec_offs_dblks,
- u_int32_t max_size_dblks) throw (jexception);
+ u_int32_t max_size_dblks);
// Decode used for recover
- const bool rcv_decode(hdr h, std::ifstream* ifsp, size_t& rec_offs) throw (jexception);
+ const bool rcv_decode(hdr h, std::ifstream* ifsp, size_t& rec_offs);
inline const u_int64_t rid() const { return _deq_hdr._hdr._rid; }
inline const u_int64_t deq_rid() const { return _deq_hdr._deq_rid; }
@@ -89,9 +88,9 @@
const size_t rec_size() const;
private:
- virtual void chk_hdr() const throw (jexception);
- virtual void chk_hdr(u_int64_t rid) const throw (jexception);
- virtual void chk_tail() const throw (jexception);
+ virtual void chk_hdr() const;
+ virtual void chk_hdr(u_int64_t rid) const;
+ virtual void chk_tail() const;
virtual void clean();
}; // class deq_rec
Modified: store/trunk/cpp/lib/jrnl/enq_map.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/enq_map.cpp 2007-12-07 17:09:35 UTC (rev 1442)
+++ store/trunk/cpp/lib/jrnl/enq_map.cpp 2007-12-07 19:17:58 UTC (rev 1443)
@@ -53,13 +53,13 @@
}
void
-enq_map::insert_fid(const u_int64_t rid, const u_int16_t fid) throw (jexception)
+enq_map::insert_fid(const u_int64_t rid, const u_int16_t fid)
{
insert_fid(rid, fid, false);
}
void
-enq_map::insert_fid(const u_int64_t rid, const u_int16_t fid, const bool locked) throw (jexception)
+enq_map::insert_fid(const u_int64_t rid, const u_int16_t fid, const bool locked)
{
fid_lock_pair rec(fid, locked);
pthread_mutex_lock(&_mutex);
@@ -74,7 +74,7 @@
}
const u_int16_t
-enq_map::get_fid(const u_int64_t rid) throw (jexception)
+enq_map::get_fid(const u_int64_t rid)
{
pthread_mutex_lock(&_mutex);
emap_itr itr = _map.find(rid);
@@ -95,7 +95,7 @@
}
const u_int16_t
-enq_map::get_remove_fid(const u_int64_t rid, const bool txn_flag) throw (jexception)
+enq_map::get_remove_fid(const u_int64_t rid, const bool txn_flag)
{
pthread_mutex_lock(&_mutex);
emap_itr itr = _map.find(rid);
@@ -133,7 +133,7 @@
}
void
-enq_map::lock(const u_int64_t rid) throw (jexception)
+enq_map::lock(const u_int64_t rid)
{
pthread_mutex_lock(&_mutex);
emap_itr itr = _map.find(rid);
@@ -149,7 +149,7 @@
}
void
-enq_map::unlock(const u_int64_t rid) throw (jexception)
+enq_map::unlock(const u_int64_t rid)
{
pthread_mutex_lock(&_mutex);
emap_itr itr = _map.find(rid);
@@ -165,7 +165,7 @@
}
const bool
-enq_map::is_locked(const u_int64_t rid) throw (jexception)
+enq_map::is_locked(const u_int64_t rid)
{
pthread_mutex_lock(&_mutex);
emap_itr itr = _map.find(rid);
Modified: store/trunk/cpp/lib/jrnl/enq_map.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/enq_map.hpp 2007-12-07 17:09:35 UTC (rev 1442)
+++ store/trunk/cpp/lib/jrnl/enq_map.hpp 2007-12-07 19:17:58 UTC (rev 1443)
@@ -41,10 +41,10 @@
}
}
+#include <jrnl/jexception.hpp>
#include <map>
-#include <vector>
#include <pthread.h>
-#include <jrnl/jexception.hpp>
+#include <vector>
namespace rhm
{
@@ -71,16 +71,14 @@
enq_map();
virtual ~enq_map();
- void insert_fid(const u_int64_t rid, const u_int16_t fid) throw (jexception);
- void insert_fid(const u_int64_t rid, const u_int16_t fid, const bool locked)
- throw (jexception);
- const u_int16_t get_fid(const u_int64_t rid) throw (jexception);
- const u_int16_t get_remove_fid(const u_int64_t rid, const bool txn_flag = false)
- throw (jexception);
+ void insert_fid(const u_int64_t rid, const u_int16_t fid);
+ void insert_fid(const u_int64_t rid, const u_int16_t fid, const bool locked);
+ const u_int16_t get_fid(const u_int64_t rid);
+ const u_int16_t get_remove_fid(const u_int64_t rid, const bool txn_flag = false);
const bool is_enqueued(const u_int64_t rid);
- void lock(const u_int64_t rid) throw (jexception);
- void unlock(const u_int64_t rid) throw (jexception);
- const bool is_locked(const u_int64_t rid) throw (jexception);
+ void lock(const u_int64_t rid);
+ void unlock(const u_int64_t rid);
+ const bool is_locked(const u_int64_t rid);
inline void clear() { _map.clear(); }
inline const bool empty() const { return _map.empty(); }
inline const u_int16_t size() const { return (u_int16_t)_map.size(); }
Modified: store/trunk/cpp/lib/jrnl/enq_rec.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/enq_rec.cpp 2007-12-07 17:09:35 UTC (rev 1442)
+++ store/trunk/cpp/lib/jrnl/enq_rec.cpp 2007-12-07 19:17:58 UTC (rev 1443)
@@ -35,8 +35,9 @@
#include <assert.h>
#include <errno.h>
#include <iomanip>
+#include <jrnl/jerrno.hpp>
+#include <jrnl/jexception.hpp>
#include <sstream>
-#include <jrnl/jerrno.hpp>
namespace rhm
{
@@ -105,7 +106,7 @@
}
const u_int32_t
-enq_rec::encode(void* wptr, u_int32_t rec_offs_dblks, u_int32_t max_size_dblks) throw (jexception)
+enq_rec::encode(void* wptr, u_int32_t rec_offs_dblks, u_int32_t max_size_dblks)
{
assert(wptr != NULL);
assert(max_size_dblks > 0);
@@ -251,7 +252,6 @@
const u_int32_t
enq_rec::decode(hdr& h, void* rptr, u_int32_t rec_offs_dblks, u_int32_t max_size_dblks)
- throw (jexception)
{
assert(rptr != NULL);
assert(max_size_dblks > 0);
@@ -432,7 +432,7 @@
}
const bool
-enq_rec::rcv_decode(hdr h, std::ifstream* ifsp, size_t& rec_offs) throw (jexception)
+enq_rec::rcv_decode(hdr h, std::ifstream* ifsp, size_t& rec_offs)
{
if (rec_offs == 0)
{
@@ -578,7 +578,7 @@
}
void
-enq_rec::chk_hdr() const throw (jexception)
+enq_rec::chk_hdr() const
{
jrec::chk_hdr(_enq_hdr._hdr);
if (_enq_hdr._hdr._magic != RHM_JDAT_ENQ_MAGIC)
@@ -593,14 +593,14 @@
}
void
-enq_rec::chk_hdr(u_int64_t rid) const throw (jexception)
+enq_rec::chk_hdr(u_int64_t rid) const
{
chk_hdr();
jrec::chk_rid(_enq_hdr._hdr, rid);
}
void
-enq_rec::chk_tail() const throw (jexception)
+enq_rec::chk_tail() const
{
jrec::chk_tail(_enq_tail, _enq_hdr._hdr);
}
Modified: store/trunk/cpp/lib/jrnl/enq_rec.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/enq_rec.hpp 2007-12-07 17:09:35 UTC (rev 1442)
+++ store/trunk/cpp/lib/jrnl/enq_rec.hpp 2007-12-07 19:17:58 UTC (rev 1443)
@@ -85,12 +85,11 @@
const void* const xidp, const size_t xidlen, const bool owi, const bool transient,
const bool external);
- const u_int32_t encode(void* wptr, u_int32_t rec_offs_dblks, u_int32_t max_size_dblks)
- throw (jexception);
+ const u_int32_t encode(void* wptr, u_int32_t rec_offs_dblks, u_int32_t max_size_dblks);
const u_int32_t decode(hdr& h, void* rptr, u_int32_t rec_offs_dblks,
- u_int32_t max_size_dblks) throw (jexception);
+ u_int32_t max_size_dblks);
// Decode used for recover
- const bool rcv_decode(hdr h, std::ifstream* ifsp, size_t& rec_offs) throw (jexception);
+ const bool rcv_decode(hdr h, std::ifstream* ifsp, size_t& rec_offs);
const size_t get_xid(void** const xidpp);
const size_t get_data(void** const datapp);
@@ -104,9 +103,9 @@
void set_rid(const u_int64_t rid);
private:
- void chk_hdr() const throw (jexception);
- void chk_hdr(u_int64_t rid) const throw (jexception);
- void chk_tail() const throw (jexception);
+ void chk_hdr() const;
+ void chk_hdr(u_int64_t rid) const;
+ void chk_tail() const;
virtual void clean();
}; // class enq_rec
Modified: store/trunk/cpp/lib/jrnl/file_hdr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/file_hdr.cpp 2007-12-07 17:09:35 UTC (rev 1442)
+++ store/trunk/cpp/lib/jrnl/file_hdr.cpp 2007-12-07 19:17:58 UTC (rev 1443)
@@ -144,8 +144,7 @@
{}
file_hdr::file_hdr(const u_int32_t magic, const u_int8_t version, const u_int64_t rid,
- const u_int32_t fid, const size_t fro, const bool owi, const bool settime)
- throw (jexception):
+ const u_int32_t fid, const size_t fro, const bool owi, const bool settime):
_hdr(magic, version, rid, owi),
_fid(fid),
_res(0),
@@ -185,7 +184,7 @@
}
void
-file_hdr::set_time() throw (jexception)
+file_hdr::set_time()
{
// TODO: Standardize on a method for getting time that does not requrie a context switch.
timespec ts;
Modified: store/trunk/cpp/lib/jrnl/file_hdr.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/file_hdr.hpp 2007-12-07 17:09:35 UTC (rev 1442)
+++ store/trunk/cpp/lib/jrnl/file_hdr.hpp 2007-12-07 19:17:58 UTC (rev 1443)
@@ -43,6 +43,8 @@
}
#include <jrnl/jcfg.hpp>
+
+#include <sys/types.h>
#include <jrnl/jexception.hpp>
namespace rhm
@@ -238,8 +240,7 @@
* \brief Convenience constructor which initializes values during construction.
*/
file_hdr(const u_int32_t magic, const u_int8_t version, const u_int64_t rid,
- const u_int32_t fid, const size_t fro, const bool owi, const bool settime = false)
- throw (jexception);
+ const u_int32_t fid, const size_t fro, const bool owi, const bool settime = false);
inline const bool get_owi() const { return _hdr._uflag & hdr::HDR_OVERWRITE_INDICATOR_MASK; }
void set_owi(const bool owi);
@@ -247,7 +248,7 @@
/**
* \brief Gets the current time from the system clock and sets the timestamp in the struct.
*/
- void set_time() throw (jexception);
+ void set_time();
/**
* \brief Sets the timestamp in the struct to the provided value (in seconds and
Modified: store/trunk/cpp/lib/jrnl/jcfg.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcfg.hpp 2007-12-07 17:09:35 UTC (rev 1442)
+++ store/trunk/cpp/lib/jrnl/jcfg.hpp 2007-12-07 19:17:58 UTC (rev 1443)
@@ -33,8 +33,8 @@
#ifndef rhm_journal_jcfg_hpp
#define rhm_journal_jcfg_hpp
-#include <iomanip> // for debug only
-#include <iostream> // for debug only
+// #include <iomanip> // for debug only
+// #include <iostream> // for debug only
#if defined(__i386__) /* little endian, 32 bits */
#define JRNL_LITTLE_ENDIAN
Modified: store/trunk/cpp/lib/jrnl/jcntl.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.cpp 2007-12-07 17:09:35 UTC (rev 1442)
+++ store/trunk/cpp/lib/jrnl/jcntl.cpp 2007-12-07 19:17:58 UTC (rev 1443)
@@ -90,7 +90,7 @@
void
jcntl::initialize(std::deque<data_tok*>* rdtoklp, const aio_cb rd_cb,
- std::deque<data_tok*>* wdtoklp, const aio_cb wr_cb) throw (jexception)
+ std::deque<data_tok*>* wdtoklp, const aio_cb wr_cb)
{
// Prepare journal dir, journal files and file handles
_jdir.clear_dir();
@@ -133,7 +133,6 @@
void
jcntl::recover(std::deque<data_tok*>* rdtoklp, const aio_cb rd_cb, std::deque<data_tok*>* wdtoklp,
const aio_cb wr_cb, const std::vector<std::string>& prep_txn_list, u_int64_t& highest_rid)
- throw (jexception)
{
// Verify journal dir and journal files
_jdir.verify_dir();
@@ -182,7 +181,7 @@
}
void
-jcntl::recover_complete() throw (jexception)
+jcntl::recover_complete()
{
if (!_readonly_flag)
throw jexception(jerrno::JERR_JCNTL_NOTRECOVERED, "jcntl", "recover_complete");
@@ -196,7 +195,7 @@
}
void
-jcntl::delete_jrnl_files() throw (jexception)
+jcntl::delete_jrnl_files()
{
stop(true); // wait for AIO to complete
_jdir.delete_dir();
@@ -206,7 +205,6 @@
const iores
jcntl::enqueue_data_record(const void* const data_buff, const size_t tot_data_len,
const size_t this_data_len, data_tok* dtokp, const bool transient)
- throw (jexception)
{
iores res;
check_wstatus("enqueue_data_record");
@@ -223,7 +221,6 @@
const iores
jcntl::enqueue_extern_data_record(const size_t tot_data_len, data_tok* dtokp, const bool transient)
- throw (jexception)
{
iores res;
check_wstatus("enqueue_extern_data_record");
@@ -240,7 +237,7 @@
const iores
jcntl::enqueue_txn_data_record(const void* const data_buff, const size_t tot_data_len,
const size_t this_data_len, data_tok* dtokp, const std::string& xid,
- const bool transient) throw (jexception)
+ const bool transient)
{
iores res;
check_wstatus("enqueue_tx_data_record");
@@ -257,7 +254,7 @@
const iores
jcntl::enqueue_extern_txn_data_record(const size_t tot_data_len, data_tok* dtokp,
- const std::string& xid, const bool transient) throw (jexception)
+ const std::string& xid, const bool transient)
{
iores res;
check_wstatus("enqueue_extern_txn_data_record");
@@ -273,14 +270,14 @@
const iores
jcntl::get_data_record(const u_int64_t& rid, const size_t& dsize, const size_t& dsize_avail,
- const void** const data, bool auto_discard) throw (jexception)
+ const void** const data, bool auto_discard)
{
check_rstatus("get_data_record");
return _rmgr.get(rid, dsize, dsize_avail, data, auto_discard);
}
const iores
-jcntl::discard_data_record(data_tok* const dtokp) throw (jexception)
+jcntl::discard_data_record(data_tok* const dtokp)
{
check_rstatus("discard_data_record");
return _rmgr.discard(dtokp);
@@ -288,14 +285,14 @@
const iores
jcntl::read_data_record(void** const datapp, size_t& dsize, void** const xidpp, size_t& xidsize,
- bool& transient, bool& external, data_tok* const dtokp) throw (jexception)
+ bool& transient, bool& external, data_tok* const dtokp)
{
check_rstatus("read_data");
return _rmgr.read(datapp, dsize, xidpp, xidsize, transient, external, dtokp);
}
const iores
-jcntl::dequeue_data_record(data_tok* const dtokp) throw (jexception)
+jcntl::dequeue_data_record(data_tok* const dtokp)
{
iores res;
check_wstatus("dequeue_data");
@@ -307,7 +304,7 @@
}
const iores
-jcntl::dequeue_txn_data_record(data_tok* const dtokp, const std::string& xid) throw (jexception)
+jcntl::dequeue_txn_data_record(data_tok* const dtokp, const std::string& xid)
{
iores res;
check_wstatus("dequeue_data");
@@ -319,7 +316,7 @@
}
const iores
-jcntl::txn_abort(data_tok* const dtokp, const std::string& xid) throw (jexception)
+jcntl::txn_abort(data_tok* const dtokp, const std::string& xid)
{
iores res;
check_wstatus("txn_abort");
@@ -331,7 +328,7 @@
}
const iores
-jcntl::txn_commit(data_tok* const dtokp, const std::string& xid) throw (jexception)
+jcntl::txn_commit(data_tok* const dtokp, const std::string& xid)
{
iores res;
check_wstatus("txn_commit");
@@ -343,13 +340,13 @@
}
const bool
-jcntl::is_txn_synced(const std::string& xid) throw (jexception)
+jcntl::is_txn_synced(const std::string& xid)
{
return _wmgr.is_txn_synced(xid);
}
const u_int32_t
-jcntl::get_wr_events() throw (jexception)
+jcntl::get_wr_events()
{
u_int32_t res;
int ret = pthread_mutex_trylock(&_mutex);
@@ -370,13 +367,13 @@
}
const u_int32_t
-jcntl::get_rd_events() throw (jexception)
+jcntl::get_rd_events()
{
return _rmgr.get_events();
}
void
-jcntl::stop(bool block_till_aio_cmpl) throw (jexception)
+jcntl::stop(bool block_till_aio_cmpl)
{
if (_readonly_flag)
check_rstatus("stop");
@@ -392,7 +389,7 @@
}
void
-jcntl::flush() throw (jexception)
+jcntl::flush()
{
if (!_init_flag)
return;
@@ -407,7 +404,7 @@
// Private functions
void
-jcntl::check_wstatus(const char* fn_name) const throw (jexception)
+jcntl::check_wstatus(const char* fn_name) const
{
if (!_init_flag)
throw jexception(jerrno::JERR__NINIT, "jcntl", fn_name);
@@ -418,7 +415,7 @@
}
void
-jcntl::check_rstatus(const char* fn_name) const throw (jexception)
+jcntl::check_rstatus(const char* fn_name) const
{
if (!_init_flag)
throw jexception(jerrno::JERR__NINIT, "jcntl", fn_name);
@@ -427,7 +424,7 @@
}
void
-jcntl::write_infofile() const throw (jexception)
+jcntl::write_infofile() const
{
timespec ts;
if (::clock_gettime(CLOCK_REALTIME, &ts))
@@ -444,7 +441,7 @@
#define MAX_AIO_CMPL_SLEEPS 1000 // Total: 10 sec
void
-jcntl::aio_cmpl_wait() throw (jexception)
+jcntl::aio_cmpl_wait()
{
u_int32_t cnt = 0;
while (_wmgr.get_aio_evt_rem())
@@ -457,7 +454,7 @@
}
void
-jcntl::rcvr_janalyze(rcvdat& rd, const std::vector<std::string>& prep_txn_list) throw (jexception)
+jcntl::rcvr_janalyze(rcvdat& rd, const std::vector<std::string>& prep_txn_list)
{
jinf ji(_jdir.dirname() + "/" + _base_filename + "." + JRNL_INFO_EXTENSION, true);
@@ -515,7 +512,7 @@
}
const bool
-jcntl::rcvr_get_next_record(u_int16_t& fid, std::ifstream* ifsp, rcvdat& rd) throw (jexception)
+jcntl::rcvr_get_next_record(u_int16_t& fid, std::ifstream* ifsp, rcvdat& rd)
{
size_t cum_size_read = 0;
void* xidp = NULL;
@@ -732,7 +729,6 @@
const bool
jcntl::check_owi(const u_int16_t fid, hdr& h, rcvdat& rd, std::streampos& read_pos)
- throw (jexception)
{
if (rd._ffid ? h.get_owi() == rd._owi : h.get_owi() != rd._owi) // Overwrite indicator changed
{
@@ -760,7 +756,6 @@
void
jcntl::check_journal_alignment(const u_int16_t fid, std::streampos& rec_offset)
- throw (jexception)
{
unsigned sblk_offs = rec_offset % (JRNL_DBLK_SIZE * JRNL_SBLK_SIZE);
if (sblk_offs)
Modified: store/trunk/cpp/lib/jrnl/jcntl.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.hpp 2007-12-07 17:09:35 UTC (rev 1442)
+++ store/trunk/cpp/lib/jrnl/jcntl.hpp 2007-12-07 19:17:58 UTC (rev 1443)
@@ -190,7 +190,7 @@
* \exception TODO
*/
void initialize(std::deque<data_tok*>* rdtoklp, const aio_cb rd_cb,
- std::deque<data_tok*>* wdtoklp, const aio_cb wr_cb) throw (jexception);
+ std::deque<data_tok*>* wdtoklp, const aio_cb wr_cb);
/**
* \brief Initialize using internal default callbacks and data_tok lists.
@@ -199,7 +199,7 @@
*
* \exception TODO
*/
- void initialize() throw (jexception)
+ void initialize()
{
initialize(&_aio_rd_cmpl_dtok_list, &aio_rd_callback, &_aio_wr_cmpl_dtok_list,
&aio_wr_callback );
@@ -232,7 +232,7 @@
*/
void recover(std::deque<data_tok*>* rdtoklp, const aio_cb rd_cb,
std::deque<data_tok*>* wdtoklp, const aio_cb wr_cb,
- const std::vector<std::string>& prep_txn_list, u_int64_t& highest_rid) throw (jexception);
+ const std::vector<std::string>& prep_txn_list, u_int64_t& highest_rid);
/**
* \brief Recover using internal default callbacks and data_tok lists.
@@ -242,7 +242,6 @@
* \exception TODO
*/
void recover(const std::vector<std::string>& prep_txn_list, u_int64_t& highest_rid)
- throw (jexception)
{
recover(&_aio_rd_cmpl_dtok_list, &aio_rd_callback, &_aio_wr_cmpl_dtok_list,
&aio_wr_callback, prep_txn_list, highest_rid);
@@ -259,7 +258,7 @@
*
* \exception TODO
*/
- void recover_complete() throw (jexception);
+ void recover_complete();
/**
* \brief Stops journal and deletes all journal files.
@@ -268,7 +267,7 @@
*
* \exception TODO
*/
- void delete_jrnl_files() throw (jexception);
+ void delete_jrnl_files();
/**
* \brief Enqueue data.
@@ -308,10 +307,10 @@
* \exception TODO
*/
const iores enqueue_data_record(const void* const data_buff, const size_t tot_data_len,
- const size_t this_data_len, data_tok* dtokp, const bool transient = false)
- throw (jexception);
+ const size_t this_data_len, data_tok* dtokp, const bool transient = false);
+
const iores enqueue_extern_data_record(const size_t tot_data_len, data_tok* dtokp,
- const bool transient = false) throw (jexception);
+ const bool transient = false);
/**
* \brief Enqueue data.
@@ -328,9 +327,9 @@
*/
const iores enqueue_txn_data_record(const void* const data_buff, const size_t tot_data_len,
const size_t this_data_len, data_tok* dtokp, const std::string& xid,
- const bool transient = false) throw (jexception);
+ const bool transient = false);
const iores enqueue_extern_txn_data_record(const size_t tot_data_len, data_tok* dtokp,
- const std::string& xid, const bool transient = false) throw (jexception);
+ const std::string& xid, const bool transient = false);
/**
* \brief Retrieve details of next record to be read without consuming the record.
@@ -391,8 +390,7 @@
*/
// *** NOT YET IMPLEMENTED ***
const iores get_data_record(const u_int64_t& rid, const size_t& dsize,
- const size_t& dsize_avail, const void** const data, bool auto_discard = false)
- throw (jexception);
+ const size_t& dsize_avail, const void** const data, bool auto_discard = false);
/**
* \brief Discard (skip) next record to be read without reading or retrieving it.
@@ -400,7 +398,7 @@
* \exception TODO
*/
// *** NOT YET IMPLEMENTED ***
- const iores discard_data_record(data_tok* const dtokp) throw (jexception);
+ const iores discard_data_record(data_tok* const dtokp);
/**
* \brief Reads data from the journal. It is the responsibility of the reader to free
@@ -436,8 +434,7 @@
* \exception TODO
*/
const iores read_data_record(void** const datapp, size_t& dsize, void** const xidpp,
- size_t& xidsize, bool& transient, bool& external, data_tok* const dtokp)
- throw (jexception);
+ size_t& xidsize, bool& transient, bool& external, data_tok* const dtokp);
/**
* \brief Dequeues (marks as no longer needed) data record in journal.
@@ -453,7 +450,7 @@
*
* \exception TODO
*/
- const iores dequeue_data_record(data_tok* const dtokp) throw (jexception);
+ const iores dequeue_data_record(data_tok* const dtokp);
/**
* \brief Dequeues (marks as no longer needed) data record in journal.
@@ -471,8 +468,7 @@
*
* \exception TODO
*/
- const iores dequeue_txn_data_record(data_tok* const dtokp, const std::string& xid)
- throw (jexception);
+ const iores dequeue_txn_data_record(data_tok* const dtokp, const std::string& xid);
/**
* \brief Abort the transaction for all records enqueued or dequeued with the matching xid.
@@ -487,7 +483,7 @@
*
* \exception TODO
*/
- const iores txn_abort(data_tok* const dtokp, const std::string& xid) throw (jexception);
+ const iores txn_abort(data_tok* const dtokp, const std::string& xid);
/**
* \brief Commit the transaction for all records enqueued or dequeued with the matching xid.
@@ -502,7 +498,7 @@
*
* \exception TODO
*/
- const iores txn_commit(data_tok* const dtokp, const std::string& xid) throw (jexception);
+ const iores txn_commit(data_tok* const dtokp, const std::string& xid);
/**
* \brief Check whether all the enqueue records for the given xid have reached disk.
@@ -511,7 +507,7 @@
*
* \exception TODO
*/
- const bool is_txn_synced(const std::string& xid) throw (jexception);
+ const bool is_txn_synced(const std::string& xid);
/**
* \brief Forces a check for returned AIO write events.
@@ -520,7 +516,7 @@
* dequeue() operations, but if these operations cease, then this call needs to be made to
* force the processing of any outstanding AIO operations.
*/
- const u_int32_t get_wr_events() throw (jexception);
+ const u_int32_t get_wr_events();
/**
* \brief Forces a check for returned AIO read events.
@@ -529,7 +525,7 @@
* operations, but if these operations cease, then this call needs to be made to force the
* processing of any outstanding AIO operations.
*/
- const u_int32_t get_rd_events() throw (jexception);
+ const u_int32_t get_rd_events();
/**
* \brief Stop the journal from accepting any further requests to read or write data.
@@ -544,12 +540,12 @@
* \param block_till_aio_cmpl If true, will block the thread while waiting for all
* outstanding AIO operations to complete.
*/
- void stop(bool block_till_aio_cmpl = false) throw (jexception);
+ void stop(bool block_till_aio_cmpl = false);
/**
* \brief Force a flush of the write page cache, creating a single AIO write operation.
*/
- void flush() throw (jexception);
+ void flush();
inline const u_int32_t get_enq_cnt() const { return _emap.size(); }
inline const u_int32_t get_wr_outstanding_aio_dblks() const
@@ -621,31 +617,29 @@
/**
* \brief Check status of journal before allowing write operations.
*/
- void check_wstatus(const char* fn_name) const throw (jexception);
+ void check_wstatus(const char* fn_name) const;
/**
* \brief Check status of journal before allowing read operations.
*/
- void check_rstatus(const char* fn_name) const throw (jexception);
+ void check_rstatus(const char* fn_name) const;
/**
* \brief Write info file <basefilename>.jinf to disk
*/
- void write_infofile() const throw (jexception);
+ void write_infofile() const;
/**
* \brief Call that blocks while waiting for all outstanding AIOs to complete
*/
- void aio_cmpl_wait() throw (jexception);
+ void aio_cmpl_wait();
/**
* \brief Analyze journal for recovery.
*/
- void rcvr_janalyze(rcvdat& rd, const std::vector<std::string>& prep_txn_list)
- throw (jexception);
+ void rcvr_janalyze(rcvdat& rd, const std::vector<std::string>& prep_txn_list);
- const bool rcvr_get_next_record(u_int16_t& fid, std::ifstream* ifsp, rcvdat& rd)
- throw (jexception);
+ const bool rcvr_get_next_record(u_int16_t& fid, std::ifstream* ifsp, rcvdat& rd);
const bool decode(jrec& rec, u_int16_t& fid, std::ifstream* ifsp, size_t& cum_size_read,
hdr& h, rcvdat& rd, std::streampos& rec_offset);
@@ -653,32 +647,19 @@
const bool jfile_cycle(u_int16_t& fid, std::ifstream* ifsp, rcvdat& rd,
const bool jump_fro);
- const bool check_owi(const u_int16_t fid, hdr& h, rcvdat& rd, std::streampos& read_pos)
- throw (jexception);
+ const bool check_owi(const u_int16_t fid, hdr& h, rcvdat& rd, std::streampos& read_pos);
- void check_journal_alignment(const u_int16_t fid, std::streampos& rec_offset)
- throw (jexception);
+ void check_journal_alignment(const u_int16_t fid, std::streampos& rec_offset);
+ /**
+ * Intenal callback write
+ */
+ static void aio_wr_callback(jcntl* journal, u_int32_t num_dtoks);
- /**
- * \brief Analyze a particular journal file for recovery.
- *
- * \return <b><i>true</i></b> if end of journal (eoj) found; <b><i>false</i></b> otherwise.
- */
-// const bool rcvr_fanalyze(u_int16_t fid, rcvdat& jrs,
-// const std::vector<std::string>& prep_txn_list) throw (jexception);
-
- /**
- * Intenal callback write
- */
- static void aio_wr_callback(jcntl* journal, u_int32_t num_dtoks);
-
- /**
- * Intenal callback write
- */
- static void aio_rd_callback(jcntl* journal, u_int32_t num_dtoks);
-
-
+ /**
+ * Intenal callback write
+ */
+ static void aio_rd_callback(jcntl* journal, u_int32_t num_dtoks);
};
} // namespace journal
Modified: store/trunk/cpp/lib/jrnl/jdir.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jdir.cpp 2007-12-07 17:09:35 UTC (rev 1442)
+++ store/trunk/cpp/lib/jrnl/jdir.cpp 2007-12-07 19:17:58 UTC (rev 1443)
@@ -35,13 +35,13 @@
#include <dirent.h>
#include <cerrno>
-//#include <fstream>
#include <iomanip>
#include <sstream>
#include <sys/stat.h>
#include <jrnl/jcfg.hpp>
#include <jrnl/jerrno.hpp>
-#include <iostream>// debug only
+#include <jrnl/jexception.hpp>
+
namespace rhm
{
namespace journal
@@ -58,21 +58,21 @@
// === create_dir ===
void
-jdir::create_dir() throw (jexception)
+jdir::create_dir()
{
create_dir(_dirname);
}
void
-jdir::create_dir(const char* dirname) throw (jexception)
+jdir::create_dir(const char* dirname)
{
create_dir(std::string(dirname));
}
void
-jdir::create_dir(const std::string& dirname) throw (jexception)
+jdir::create_dir(const std::string& dirname)
{
size_t fdp = dirname.find_last_of('/');
if (fdp != std::string::npos)
@@ -96,14 +96,13 @@
// === clear_dir ===
void
-jdir::clear_dir(const bool create_flag) throw (jexception)
+jdir::clear_dir(const bool create_flag)
{
clear_dir(_dirname, _base_filename, create_flag);
}
void
jdir::clear_dir(const char* dirname, const char* base_filename, const bool create_flag)
- throw (jexception)
{
clear_dir(std::string(dirname), std::string(base_filename), create_flag);
}
@@ -114,7 +113,7 @@
#ifndef RHM_JOWRITE
base_filename
#endif
- , const bool create_flag) throw (jexception)
+ , const bool create_flag)
{
DIR* dir = ::opendir(dirname.c_str());
if (!dir)
@@ -184,20 +183,20 @@
// === verify_dir ===
void
-jdir::verify_dir() throw (jexception)
+jdir::verify_dir()
{
verify_dir(_dirname, _base_filename);
}
void
-jdir::verify_dir(const char* dirname, const char* base_filename) throw (jexception)
+jdir::verify_dir(const char* dirname, const char* base_filename)
{
verify_dir(std::string(dirname), std::string(base_filename));
}
void
-jdir::verify_dir(const std::string& dirname, const std::string& base_filename) throw (jexception)
+jdir::verify_dir(const std::string& dirname, const std::string& base_filename)
{
if (!is_dir(dirname))
{
@@ -223,19 +222,19 @@
// === delete_dir ===
void
-jdir::delete_dir(bool children_only) throw (jexception)
+jdir::delete_dir(bool children_only)
{
delete_dir(_dirname, children_only);
}
void
-jdir::delete_dir(const char* dirname, bool children_only) throw (jexception)
+jdir::delete_dir(const char* dirname, bool children_only)
{
delete_dir(std::string(dirname), children_only);
}
void
-jdir::delete_dir(const std::string& dirname, bool children_only) throw (jexception)
+jdir::delete_dir(const std::string& dirname, bool children_only)
{
struct dirent* entry;
struct stat s;
@@ -320,7 +319,6 @@
std::string
jdir::create_bak_dir(const std::string& dirname, const std::string& base_filename)
- throw (jexception)
{
DIR* dir = ::opendir(dirname.c_str());
long dir_num = 0L;
@@ -378,7 +376,7 @@
}
const bool
-jdir::is_dir(const char* name) throw (jexception)
+jdir::is_dir(const char* name)
{
struct stat s;
if (::stat(name, &s))
@@ -391,13 +389,13 @@
}
const bool
-jdir::is_dir(const std::string& name) throw (jexception)
+jdir::is_dir(const std::string& name)
{
return is_dir(name.c_str());
}
const bool
-jdir::exists(const char* name) throw (jexception)
+jdir::exists(const char* name)
{
struct stat s;
if (::stat(name, &s))
@@ -413,7 +411,7 @@
}
const bool
-jdir::exists(const std::string& name) throw (jexception)
+jdir::exists(const std::string& name)
{
return exists(name.c_str());
}
Modified: store/trunk/cpp/lib/jrnl/jdir.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jdir.hpp 2007-12-07 17:09:35 UTC (rev 1442)
+++ store/trunk/cpp/lib/jrnl/jdir.hpp 2007-12-07 19:17:58 UTC (rev 1443)
@@ -42,7 +42,6 @@
}
}
-#include <jrnl/jexception.hpp>
#include <jrnl/jinf.hpp>
namespace rhm
@@ -80,7 +79,7 @@
*
* \exception ??
*/
- void create_dir() throw (jexception);
+ void create_dir();
/**
* \brief Static function to create a directory. Recursive creation is supported.
@@ -89,7 +88,7 @@
*
* \exception ??
*/
- static void create_dir(const char* dirname) throw (jexception);
+ static void create_dir(const char* dirname);
/**
* \brief Static function to create a directory. Recursive creation is supported.
@@ -98,7 +97,7 @@
*
* \exception ??
*/
- static void create_dir(const std::string& dirname) throw (jexception);
+ static void create_dir(const std::string& dirname);
/**
@@ -114,7 +113,7 @@
* directory failed.
* \exception The directory handle could not be closed.
*/
- void clear_dir(const bool create_flag = true) throw (jexception);
+ void clear_dir(const bool create_flag = true);
/**
* \brief Clear the directory dirname of journal files matching base_filename
@@ -132,7 +131,7 @@
* \exception The directory handle could not be closed.
*/
static void clear_dir(const char* dirname, const char* base_filename,
- const bool create_flag = true) throw (jexception);
+ const bool create_flag = true);
/**
* \brief Clear the directory dirname of journal files matching base_filename
@@ -150,7 +149,7 @@
* \exception The directory handle could not be closed.
*/
static void clear_dir(const std::string& dirname, const std::string& base_filename,
- const bool create_flag = true) throw (jexception);
+ const bool create_flag = true);
/**
@@ -158,7 +157,7 @@
*
* \exception ??
*/
- void verify_dir() throw (jexception);
+ void verify_dir();
/**
* \brief ??
@@ -169,7 +168,7 @@
*
* \exception ??
*/
- static void verify_dir(const char* dirname, const char* base_filename) throw (jexception);
+ static void verify_dir(const char* dirname, const char* base_filename);
/**
* \brief ??
@@ -180,8 +179,7 @@
*
* \exception ??
*/
- static void verify_dir(const std::string& dirname, const std::string& base_filename)
- throw (jexception);
+ static void verify_dir(const std::string& dirname, const std::string& base_filename);
/**
* \brief Delete the journal directory and all files and sub--directories that it may
@@ -196,7 +194,7 @@
* directory failed.
* \exception The directory handle could not be closed.
*/
- void delete_dir(bool children_only = false ) throw (jexception);
+ void delete_dir(bool children_only = false );
/**
* \brief Delete the journal directory and all files and sub--directories that it may
@@ -212,7 +210,7 @@
* directory failed.
* \exception The directory handle could not be closed.
*/
- static void delete_dir(const char* dirname, bool children_only = false) throw (jexception);
+ static void delete_dir(const char* dirname, bool children_only = false);
/**
* \brief Delete the journal directory and all files and sub--directories that it may
@@ -228,8 +226,7 @@
* directory failed.
* \exception The directory handle could not be closed.
*/
- static void delete_dir(const std::string& dirname, bool children_only = false)
- throw (jexception);
+ static void delete_dir(const std::string& dirname, bool children_only = false);
/**
* \brief Create bakup directory that is next in sequence and move all journal files
@@ -247,7 +244,7 @@
* \exception ??
*/
static std::string create_bak_dir(const std::string& dirname,
- const std::string& base_filename) throw (jexception);
+ const std::string& base_filename);
/**
* \brief Return the directory name as a string.
@@ -267,7 +264,7 @@
* otherwise.
* \exception The stat() operation failed on the named file.
*/
- const static bool is_dir(const char* name) throw (jexception);
+ const static bool is_dir(const char* name);
/**
* \brief Test whether the named file is a directory.
@@ -277,7 +274,7 @@
* otherwise.
* \exception The stat() operation failed on the named file.
*/
- const static bool is_dir(const std::string& name) throw (jexception);
+ const static bool is_dir(const std::string& name);
/**
@@ -292,7 +289,7 @@
* otherwise.
* \exception The stat() operation failed on the named entity.
*/
- const static bool exists(const char* name) throw (jexception);
+ const static bool exists(const char* name);
/**
* \brief Test whether the named entity exists on the filesystem.
@@ -306,7 +303,7 @@
* otherwise.
* \exception The stat() operation failed on the named entity.
*/
- const static bool exists(const std::string& name) throw (jexception);
+ const static bool exists(const std::string& name);
public:
Modified: store/trunk/cpp/lib/jrnl/jexception.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jexception.cpp 2007-12-07 17:09:35 UTC (rev 1442)
+++ store/trunk/cpp/lib/jrnl/jexception.cpp 2007-12-07 19:17:58 UTC (rev 1443)
@@ -33,7 +33,6 @@
#include <jrnl/jexception.hpp>
#include <iomanip>
-#include <iostream> // debug only
#include <sstream>
#include <jrnl/jerrno.hpp>
Modified: store/trunk/cpp/lib/jrnl/jinf.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jinf.cpp 2007-12-07 17:09:35 UTC (rev 1442)
+++ store/trunk/cpp/lib/jrnl/jinf.cpp 2007-12-07 19:17:58 UTC (rev 1443)
@@ -43,7 +43,7 @@
namespace journal
{
-jinf::jinf(const std::string& jinf_filename, bool validate_flag) throw (jexception):
+jinf::jinf(const std::string& jinf_filename, bool validate_flag):
_jver(0),
_num_jfiles(0),
_jfsize_sblks(0),
@@ -90,7 +90,7 @@
{}
void
-jinf::validate() throw (jexception)
+jinf::validate()
{
bool err = false;
std::stringstream ss;
@@ -154,7 +154,7 @@
}
const u_int16_t
-jinf::analyze() throw (jexception)
+jinf::analyze()
{
u_int16_t fid = 0xffff;
bool owi = false;
@@ -208,7 +208,7 @@
}
const u_int16_t
-jinf::get_start_file() throw (jexception)
+jinf::get_start_file()
{
if (!_analyzed_flag)
analyze();
@@ -216,7 +216,7 @@
}
const bool
-jinf::get_initial_owi() throw (jexception)
+jinf::get_initial_owi()
{
if (!_analyzed_flag)
analyze();
@@ -287,7 +287,7 @@
}
void
-jinf::read(const std::string& jinf_filename) throw (jexception)
+jinf::read(const std::string& jinf_filename)
{
// FIXME: This is *not* an XML reader, rather for simplicity, it is a brute-force
// line reader which relies on string recognition.
@@ -335,26 +335,26 @@
}
const u_int16_t
-jinf::u_int16_value(char* line) const throw (jexception)
+jinf::u_int16_value(char* line) const
{
return ::atoi(find_value(line));
}
const u_int32_t
-jinf::u_int32_value(char* line) const throw (jexception)
+jinf::u_int32_value(char* line) const
{
return ::atol(find_value(line));
}
const std::string&
-jinf::string_value(std::string& str, char* line) const throw (jexception)
+jinf::string_value(std::string& str, char* line) const
{
str.assign(find_value(line));
return str;
}
const char*
-jinf::find_value(char* line) const throw (jexception)
+jinf::find_value(char* line) const
{
const char* target1_str = "value=\"";
int target2_char = '\"';
Modified: store/trunk/cpp/lib/jrnl/jinf.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jinf.hpp 2007-12-07 17:09:35 UTC (rev 1442)
+++ store/trunk/cpp/lib/jrnl/jinf.hpp 2007-12-07 19:17:58 UTC (rev 1443)
@@ -32,9 +32,7 @@
#ifndef rhm_journal_jinf_hpp
#define rhm_journal_jinf_hpp
-//#include <sstream>
-//#include <jrnl/file_hdr.hpp>
-#include <jrnl/jexception.hpp>
+#include <string>
namespace rhm
{
@@ -68,14 +66,14 @@
public:
// constructor for reading existing jinf file
- jinf(const std::string& jinf_filename, bool validate_flag) throw (jexception);
+ jinf(const std::string& jinf_filename, bool validate_flag);
// constructor for writing jinf file
jinf(const std::string& jid, const std::string& jdir, const std::string& base_filename,
const u_int16_t num_jfiles, const u_int32_t jfsize_sblks, const timespec& ts);
virtual ~jinf();
- void validate() throw (jexception);
- const u_int16_t analyze() throw (jexception);
+ void validate();
+ const u_int16_t analyze();
void write();
inline const u_int8_t jver() const { return _jver; }
@@ -91,18 +89,18 @@
inline const u_int32_t wmgr_num_pages() const { return _wmgr_num_pages; }
inline const u_int32_t rmgr_page_size_dblks() const { return _rmgr_page_size_dblks; }
inline const u_int32_t rmgr_num_pages() const { return _rmgr_num_pages; }
- const u_int16_t get_start_file() throw (jexception);
- const bool get_initial_owi() throw (jexception);
+ const u_int16_t get_start_file();
+ const bool get_initial_owi();
const std::string to_string() const;
const std::string xml_str() const;
private:
- void read(const std::string& jinf_filename) throw (jexception);
- const u_int16_t u_int16_value(char* line) const throw (jexception);
- const u_int32_t u_int32_value(char* line) const throw (jexception);
- const std::string& string_value(std::string& str, char* line) const throw (jexception);
- const char* find_value(char* line) const throw (jexception);
+ void read(const std::string& jinf_filename);
+ const u_int16_t u_int16_value(char* line) const;
+ const u_int32_t u_int32_value(char* line) const;
+ const std::string& string_value(std::string& str, char* line) const;
+ const char* find_value(char* line) const;
};
} // namespace journal
Modified: store/trunk/cpp/lib/jrnl/jrec.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jrec.cpp 2007-12-07 17:09:35 UTC (rev 1442)
+++ store/trunk/cpp/lib/jrnl/jrec.cpp 2007-12-07 19:17:58 UTC (rev 1443)
@@ -46,7 +46,7 @@
jrec::~jrec() {}
void
-jrec::chk_hdr(const hdr& hdr) throw (jexception)
+jrec::chk_hdr(const hdr& hdr)
{
if (hdr._magic == 0)
{
@@ -81,7 +81,7 @@
}
void
-jrec::chk_rid(const hdr& hdr, const u_int64_t rid) throw (jexception)
+jrec::chk_rid(const hdr& hdr, const u_int64_t rid)
{
if (hdr._rid != rid)
{
@@ -94,7 +94,7 @@
}
void
-jrec::chk_tail(const rec_tail& tail, const hdr& hdr) throw (jexception)
+jrec::chk_tail(const rec_tail& tail, const hdr& hdr)
{
if (tail._xmagic != ~hdr._magic)
{
Modified: store/trunk/cpp/lib/jrnl/jrec.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jrec.hpp 2007-12-07 17:09:35 UTC (rev 1442)
+++ store/trunk/cpp/lib/jrnl/jrec.hpp 2007-12-07 19:17:58 UTC (rev 1443)
@@ -115,7 +115,7 @@
* \returns Number of data-blocks encoded.
*/
virtual const u_int32_t encode(void* wptr, u_int32_t rec_offs_dblks,
- u_int32_t max_size_dblks) throw (jexception) = 0;
+ u_int32_t max_size_dblks) = 0;
/**
* \brief Decode into this instance of jrec from the read buffer at the disk-block-aligned
@@ -148,10 +148,9 @@
* \returns Number of data-blocks read (consumed).
*/
virtual const u_int32_t decode(hdr& h, void* rptr, u_int32_t rec_offs_dblks,
- u_int32_t max_size_dblks) throw (jexception) = 0;
+ u_int32_t max_size_dblks) = 0;
- virtual const bool rcv_decode(hdr h, std::ifstream* ifsp, size_t& rec_offs)
- throw (jexception) = 0;
+ virtual const bool rcv_decode(hdr h, std::ifstream* ifsp, size_t& rec_offs) = 0;
virtual std::string& str(std::string& str) const = 0;
virtual const size_t data_size() const = 0;
@@ -166,12 +165,12 @@
{ return (size + blksize - 1)/blksize; }
protected:
- virtual void chk_hdr() const throw (jexception) = 0;
- virtual void chk_hdr(u_int64_t rid) const throw (jexception) = 0;
- virtual void chk_tail() const throw (jexception) = 0;
- static void chk_hdr(const hdr& hdr) throw (jexception);
- static void chk_rid(const hdr& hdr, u_int64_t rid) throw (jexception);
- static void chk_tail(const rec_tail& tail, const hdr& hdr) throw (jexception);
+ virtual void chk_hdr() const = 0;
+ virtual void chk_hdr(u_int64_t rid) const = 0;
+ virtual void chk_tail() const = 0;
+ static void chk_hdr(const hdr& hdr);
+ static void chk_rid(const hdr& hdr, u_int64_t rid);
+ static void chk_tail(const rec_tail& tail, const hdr& hdr);
virtual void clean() = 0;
}; // class jrec
Modified: store/trunk/cpp/lib/jrnl/lfh.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/lfh.cpp 2007-12-07 17:09:35 UTC (rev 1442)
+++ store/trunk/cpp/lib/jrnl/lfh.cpp 2007-12-07 19:17:58 UTC (rev 1443)
@@ -43,7 +43,7 @@
{}
lfh::lfh(const std::string& fbasename, const u_int16_t fid, const u_int32_t jfsize_sblks,
- rcvdat const * const ro) throw (jexception):
+ rcvdat const * const ro):
nlfh(fbasename, fid, jfsize_sblks, ro)
{}
Modified: store/trunk/cpp/lib/jrnl/lfh.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/lfh.hpp 2007-12-07 17:09:35 UTC (rev 1442)
+++ store/trunk/cpp/lib/jrnl/lfh.hpp 2007-12-07 19:17:58 UTC (rev 1443)
@@ -57,7 +57,7 @@
public:
lfh(const u_int32_t jfsize_sblks);
lfh(const std::string& fbasename, const u_int16_t fid, const u_int32_t jfsize_sblks,
- rcvdat const * const ro) throw (jexception);
+ rcvdat const * const ro);
virtual ~lfh();
};
Modified: store/trunk/cpp/lib/jrnl/nlfh.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/nlfh.cpp 2007-12-07 17:09:35 UTC (rev 1442)
+++ store/trunk/cpp/lib/jrnl/nlfh.cpp 2007-12-07 19:17:58 UTC (rev 1443)
@@ -64,7 +64,7 @@
{}
nlfh::nlfh(const std::string& fbasename, const u_int16_t fid, const u_int32_t jfsize_sblks,
- const rcvdat* const ro) throw (jexception):
+ const rcvdat* const ro):
_fname(),
_fid(fid),
_ffull_dblks(JRNL_SBLK_SIZE * (jfsize_sblks + 1)),
@@ -92,7 +92,7 @@
void
nlfh::initialize(const std::string& fbasename, const u_int16_t fid,
- const u_int32_t jfsize_sblks, const rcvdat* const ro) throw (jexception)
+ const u_int32_t jfsize_sblks, const rcvdat* const ro)
{
std::stringstream ss;
@@ -235,28 +235,28 @@
}
const u_int32_t
-nlfh::decr_enqcnt() throw (jexception)
+nlfh::decr_enqcnt()
{
if (_rec_enqcnt == 0)
- throw jexception (jerrno::JERR__UNDERFLOW, "nlfh", "decr_enqcnt");
+ throw jexception(jerrno::JERR__UNDERFLOW, "nlfh", "decr_enqcnt");
return --_rec_enqcnt;
}
const u_int32_t
-nlfh::subtr_enqcnt(u_int32_t s) throw (jexception)
+nlfh::subtr_enqcnt(u_int32_t s)
{
if (_rec_enqcnt < s)
{
std::stringstream ss;
ss << "_rec_enqcnt=" << _rec_enqcnt << " decr=" << s;
- throw jexception (jerrno::JERR__UNDERFLOW, ss.str().c_str(), "nlfh", "subtr_enqcnt");
+ throw jexception(jerrno::JERR__UNDERFLOW, ss.str().c_str(), "nlfh", "subtr_enqcnt");
}
_rec_enqcnt -= s;
return _rec_enqcnt;
}
const u_int32_t
-nlfh::add_rd_subm_cnt_dblks(u_int32_t a) throw (jexception)
+nlfh::add_rd_subm_cnt_dblks(u_int32_t a)
{
if (_rd_subm_cnt_dblks + a > _wr_subm_cnt_dblks)
{
@@ -271,7 +271,7 @@
}
const u_int32_t
-nlfh::add_rd_cmpl_cnt_dblks(u_int32_t a) throw (jexception)
+nlfh::add_rd_cmpl_cnt_dblks(u_int32_t a)
{
if (_rd_cmpl_cnt_dblks + a > _rd_subm_cnt_dblks)
{
@@ -286,7 +286,7 @@
}
const u_int32_t
-nlfh::add_wr_subm_cnt_dblks(u_int32_t a) throw (jexception)
+nlfh::add_wr_subm_cnt_dblks(u_int32_t a)
{
if (_wr_subm_cnt_dblks + a > _ffull_dblks) // Allow for file header
{
@@ -301,7 +301,7 @@
}
const u_int32_t
-nlfh::add_wr_cmpl_cnt_dblks(u_int32_t a) throw (jexception)
+nlfh::add_wr_cmpl_cnt_dblks(u_int32_t a)
{
if (_wr_cmpl_cnt_dblks + a > _wr_subm_cnt_dblks)
{
@@ -329,7 +329,7 @@
// Private functions
void
-nlfh::open_fh() throw (jexception)
+nlfh::open_fh()
{
_rd_fh = ::open(_fname.c_str(), O_RDONLY | O_DIRECT);
if (_rd_fh < 0)
Modified: store/trunk/cpp/lib/jrnl/nlfh.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/nlfh.hpp 2007-12-07 17:09:35 UTC (rev 1442)
+++ store/trunk/cpp/lib/jrnl/nlfh.hpp 2007-12-07 19:17:58 UTC (rev 1443)
@@ -73,11 +73,11 @@
nlfh(const u_int32_t jfsize_sblks);
// Constructors with implicit initialize() and open()
nlfh(const std::string& fbasename, const u_int16_t fid, const u_int32_t jfsize_sblks,
- const rcvdat* const ro) throw (jexception);
+ const rcvdat* const ro);
virtual ~nlfh();
virtual void initialize(const std::string& fbasename, const u_int16_t fid,
- const u_int32_t jfsize_sblks, const rcvdat* const ro) throw (jexception);
+ const u_int32_t jfsize_sblks, const rcvdat* const ro);
virtual bool reset(const rcvdat* const ro = NULL);
inline const std::string& fname() const { return _fname; }
@@ -87,24 +87,24 @@
inline const u_int32_t enqcnt() const { return _rec_enqcnt; }
inline const u_int32_t incr_enqcnt() { return ++_rec_enqcnt; }
const u_int32_t add_enqcnt(u_int32_t a);
- const u_int32_t decr_enqcnt() throw (jexception);
- const u_int32_t subtr_enqcnt(u_int32_t s) throw (jexception);
+ const u_int32_t decr_enqcnt();
+ const u_int32_t subtr_enqcnt(u_int32_t s);
inline const u_int32_t rd_subm_cnt_dblks() const { return _rd_subm_cnt_dblks; }
inline const size_t rd_subm_offs() const { return _rd_subm_cnt_dblks * JRNL_DBLK_SIZE; }
- const u_int32_t add_rd_subm_cnt_dblks(u_int32_t a) throw (jexception);
+ const u_int32_t add_rd_subm_cnt_dblks(u_int32_t a);
inline const u_int32_t rd_cmpl_cnt_dblks() const { return _rd_cmpl_cnt_dblks; }
inline const size_t rd_cmpl_offs() const { return _rd_cmpl_cnt_dblks * JRNL_DBLK_SIZE; }
- const u_int32_t add_rd_cmpl_cnt_dblks(u_int32_t a) throw (jexception);
+ const u_int32_t add_rd_cmpl_cnt_dblks(u_int32_t a);
inline const u_int32_t wr_subm_cnt_dblks() const { return _wr_subm_cnt_dblks; }
inline const size_t wr_subm_offs() const { return _wr_subm_cnt_dblks * JRNL_DBLK_SIZE; }
- const u_int32_t add_wr_subm_cnt_dblks(u_int32_t a) throw (jexception);
+ const u_int32_t add_wr_subm_cnt_dblks(u_int32_t a);
inline const u_int32_t wr_cmpl_cnt_dblks() const { return _wr_cmpl_cnt_dblks; }
inline const size_t wr_cmpl_offs() const { return _wr_cmpl_cnt_dblks * JRNL_DBLK_SIZE; }
- const u_int32_t add_wr_cmpl_cnt_dblks(u_int32_t a) throw (jexception);
+ const u_int32_t add_wr_cmpl_cnt_dblks(u_int32_t a);
// Derived helper functions
@@ -131,7 +131,7 @@
const std::string& status_str(std::string& s) const;
protected:
- virtual void open_fh() throw (jexception);
+ virtual void open_fh();
virtual void close_fh();
};
Modified: store/trunk/cpp/lib/jrnl/pmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/pmgr.cpp 2007-12-07 17:09:35 UTC (rev 1442)
+++ store/trunk/cpp/lib/jrnl/pmgr.cpp 2007-12-07 19:17:58 UTC (rev 1443)
@@ -98,7 +98,7 @@
{}
pmgr::pmgr(jcntl* jc, enq_map& emap, txn_map& tmap, const u_int32_t pagesize, const u_int16_t pages,
- std::deque<data_tok*>* const dtoklp) throw (jexception):
+ std::deque<data_tok*>* const dtoklp):
_pagesize(pagesize),
_pages(pages),
_jc(jc),
@@ -129,7 +129,7 @@
}
void
-pmgr::initialize() throw (jexception)
+pmgr::initialize()
{
std::stringstream ss;
Modified: store/trunk/cpp/lib/jrnl/pmgr.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/pmgr.hpp 2007-12-07 17:09:35 UTC (rev 1442)
+++ store/trunk/cpp/lib/jrnl/pmgr.hpp 2007-12-07 19:17:58 UTC (rev 1443)
@@ -140,16 +140,16 @@
pmgr(jcntl* jc, enq_map& emap, txn_map& tmap, const u_int32_t pagesize,
const u_int16_t pages);
pmgr(jcntl* jc, enq_map& emap, txn_map& tmap, const u_int32_t pagesize,
- const u_int16_t pages, std::deque<data_tok*>* const dtoklp) throw (jexception);
+ const u_int16_t pages, std::deque<data_tok*>* const dtoklp);
virtual ~pmgr();
- virtual const u_int32_t get_events(page_state state) throw (jexception) = 0;
+ virtual const u_int32_t get_events(page_state state) = 0;
inline const u_int32_t get_aio_evt_rem() const { return _aio_evt_rem; }
static const char* page_state_str(page_state ps);
static const char* iores_str(iores res);
protected:
- virtual void initialize() throw (jexception);
+ virtual void initialize();
virtual void rotate_page() = 0;
virtual void clean();
};
Modified: store/trunk/cpp/lib/jrnl/rcvdat.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rcvdat.hpp 2007-12-07 17:09:35 UTC (rev 1442)
+++ store/trunk/cpp/lib/jrnl/rcvdat.hpp 2007-12-07 19:17:58 UTC (rev 1443)
@@ -32,6 +32,8 @@
#ifndef rhm_journal_rcvdat_hpp
#define rhm_journal_rcvdat_hpp
+#include <iostream>
+#include <iomanip>
#include <map>
#include <vector>
Modified: store/trunk/cpp/lib/jrnl/rmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rmgr.cpp 2007-12-07 17:09:35 UTC (rev 1442)
+++ store/trunk/cpp/lib/jrnl/rmgr.cpp 2007-12-07 19:17:58 UTC (rev 1443)
@@ -50,7 +50,7 @@
{}
rmgr::rmgr(jcntl* jc, enq_map& emap, txn_map& tmap, rrfc& rrfc,
- std::deque<data_tok*>* const dtokl) throw (jexception):
+ std::deque<data_tok*>* const dtokl):
pmgr(jc, emap, tmap, JRNL_RMGR_PAGE_SIZE, JRNL_RMGR_PAGES, dtokl),
_rrfc(rrfc),
_hdr()
@@ -61,7 +61,6 @@
void
rmgr::initialize(std::deque<data_tok*>* const dtoklp, const aio_cb rd_cb, size_t fro)
- throw (jexception)
{
_dtoklp = dtoklp;
_cb = rd_cb;
@@ -72,7 +71,7 @@
const iores
rmgr::get(const u_int64_t& /*rid*/, const size_t& /*dsize*/, const size_t& /*dsize_avail*/,
- const void** const /*data*/, bool /*auto_discard*/) throw (jexception)
+ const void** const /*data*/, bool /*auto_discard*/)
{
//std::cout << " rmgr::get()" << std::flush;
iores res = pre_read_check(NULL);
@@ -166,7 +165,7 @@
}
const iores
-rmgr::discard(data_tok* dtokp) throw (jexception)
+rmgr::discard(data_tok* dtokp)
{
//std::cout << " rmgr::get()" << std::flush;
iores res = pre_read_check(dtokp);
@@ -217,7 +216,7 @@
const iores
rmgr::read(void** const datapp, size_t& dsize, void** const xidpp, size_t& xidsize, bool& transient,
- bool& external, data_tok* dtokp) throw (jexception)
+ bool& external, data_tok* dtokp)
{
iores res = pre_read_check(dtokp);
if (res != RHM_IORES_SUCCESS)
@@ -383,7 +382,7 @@
}
const u_int32_t
-rmgr::get_events(page_state state) throw (jexception)
+rmgr::get_events(page_state state)
{
int ret = 0;
if ((ret = ::io_getevents(_ioctx, 0, JRNL_RMGR_PAGES + JRNL_WMGR_PAGES, _ioevt_arr, NULL)) < 0)
@@ -452,13 +451,13 @@
}
void
-rmgr::initialize() throw (jexception)
+rmgr::initialize()
{
pmgr::initialize();
}
const iores
-rmgr::pre_read_check(data_tok* dtokp) throw (jexception)
+rmgr::pre_read_check(data_tok* dtokp)
{
if (_aio_evt_rem)
get_events();
@@ -489,7 +488,6 @@
const iores
rmgr::read_enq(hdr& h, void* rptr, data_tok* dtokp)
- throw (jexception)
{
if (_page_cb_arr[_pg_index]._state != AIO_COMPLETE)
{
@@ -531,7 +529,7 @@
}
void
-rmgr::consume_xid_rec(hdr& h, void* rptr, data_tok* dtokp) throw (jexception)
+rmgr::consume_xid_rec(hdr& h, void* rptr, data_tok* dtokp)
{
if (h._magic == RHM_JDAT_ENQ_MAGIC)
{
@@ -568,7 +566,7 @@
}
void
-rmgr::consume_filler() throw (jexception)
+rmgr::consume_filler()
{
// Filler (Magic "RHMx") is one dblk by definition
_pg_offset_dblks++;
@@ -577,7 +575,7 @@
}
const iores
-rmgr::skip(data_tok* dtokp) throw (jexception)
+rmgr::skip(data_tok* dtokp)
{
u_int32_t dsize_dblks = jrec::size_dblks(dtokp->dsize());
u_int32_t tot_dblk_cnt = dtokp->dblocks_read();
@@ -621,7 +619,7 @@
}
void
-rmgr::aio_cycle() throw (jexception)
+rmgr::aio_cycle()
{
int16_t first_uninit = -1;
u_int16_t num_uninit = 0;
@@ -653,7 +651,7 @@
}
void
-rmgr::init_aio_reads(const int16_t first_uninit, const u_int16_t num_uninit) throw (jexception)
+rmgr::init_aio_reads(const int16_t first_uninit, const u_int16_t num_uninit)
{
for (int16_t i=0; i<num_uninit; i++)
{
@@ -697,7 +695,7 @@
}
void
-rmgr::consume_fhdr() throw (jexception)
+rmgr::consume_fhdr()
{
// If in the future it should become necessary to read each file header, this is where it would
// happen.
Modified: store/trunk/cpp/lib/jrnl/rmgr.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rmgr.hpp 2007-12-07 17:09:35 UTC (rev 1442)
+++ store/trunk/cpp/lib/jrnl/rmgr.hpp 2007-12-07 19:17:58 UTC (rev 1443)
@@ -64,30 +64,28 @@
public:
rmgr(jcntl* jc, enq_map& emap, txn_map& tmap, rrfc& rrfc);
rmgr(jcntl* jc, enq_map& emap, txn_map& tmap, rrfc& rrfc,
- std::deque<data_tok*>* const dtokl) throw (jexception);
+ std::deque<data_tok*>* const dtokl);
virtual ~rmgr();
- void initialize(std::deque<data_tok*>* const dtoklp, const aio_cb rd_cb, size_t fro)
- throw (jexception);
+ void initialize(std::deque<data_tok*>* const dtoklp, const aio_cb rd_cb, size_t fro);
const iores get(const u_int64_t& rid, const size_t& dsize, const size_t& dsize_avail,
- const void** const data, bool auto_discard) throw (jexception);
- const iores discard(data_tok* dtok) throw (jexception);
+ const void** const data, bool auto_discard);
+ const iores discard(data_tok* dtok);
const iores read(void** const datapp, size_t& dsize, void** const xidpp, size_t& xidsize,
- bool& transient, bool& external, data_tok* dtokp) throw (jexception);
- const u_int32_t get_events(page_state state = AIO_COMPLETE) throw (jexception);
+ bool& transient, bool& external, data_tok* dtokp);
+ const u_int32_t get_events(page_state state = AIO_COMPLETE);
void recover_complete(size_t fro);
private:
- void initialize() throw (jexception);
- const iores pre_read_check(data_tok* dtokp) throw (jexception);
- const iores read_enq(hdr& h, void* rptr, data_tok* dtokp) throw (jexception);
- void consume_xid_rec(hdr& h, void* rptr, data_tok* dtokp) throw (jexception);
- void consume_filler() throw (jexception);
- const iores skip(data_tok* dtokp) throw (jexception);
- void aio_cycle() throw (jexception);
- void init_aio_reads(const int16_t first_uninit, const u_int16_t num_uninit)
- throw (jexception);
- void consume_fhdr() throw (jexception);
+ void initialize();
+ const iores pre_read_check(data_tok* dtokp);
+ const iores read_enq(hdr& h, void* rptr, data_tok* dtokp);
+ void consume_xid_rec(hdr& h, void* rptr, data_tok* dtokp);
+ void consume_filler();
+ const iores skip(data_tok* dtokp);
+ void aio_cycle();
+ void init_aio_reads(const int16_t first_uninit, const u_int16_t num_uninit);
+ void consume_fhdr();
void rotate_page();
const u_int32_t dblks_rem() const;
void set_params_null(void** const datapp, size_t& dsize, void** const xidpp,
Modified: store/trunk/cpp/lib/jrnl/rrfc.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rrfc.cpp 2007-12-07 17:09:35 UTC (rev 1442)
+++ store/trunk/cpp/lib/jrnl/rrfc.cpp 2007-12-07 19:17:58 UTC (rev 1443)
@@ -50,7 +50,7 @@
rrfc::~rrfc() {}
void
-rrfc::initialize(u_int32_t nfiles, nlfh** fh_arr, u_int32_t fh_index) throw (jexception)
+rrfc::initialize(u_int32_t nfiles, nlfh** fh_arr, u_int32_t fh_index)
{
_nfiles = nfiles;
_fh_arr = fh_arr;
@@ -59,7 +59,7 @@
}
bool
-rrfc::rotate() throw (jexception)
+rrfc::rotate()
{
if (!_nfiles)
throw jexception(jerrno::JERR__NINIT, "rrfc", "rotate");
Modified: store/trunk/cpp/lib/jrnl/rrfc.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rrfc.hpp 2007-12-07 17:09:35 UTC (rev 1442)
+++ store/trunk/cpp/lib/jrnl/rrfc.hpp 2007-12-07 19:17:58 UTC (rev 1443)
@@ -71,14 +71,13 @@
* each of which correspond to one of the physical files.
* \param fh_index Initial index of journal file. Default = 0.
*/
- void initialize(u_int32_t nfiles, nlfh** fh_arr, u_int32_t fh_index = 0)
- throw (jexception);
+ void initialize(u_int32_t nfiles, nlfh** fh_arr, u_int32_t fh_index = 0);
/**
* \brief Rotate active file handle to next file in rotating file group.
* \exception jerrno::JERR__NINIT if called before calling initialize().
*/
- bool rotate() throw (jexception);
+ bool rotate();
/**
* \brief Returns the index of the currently active file within the rotating
@@ -114,12 +113,12 @@
inline const u_int32_t subm_cnt_dblks() const { return _curr_fh->rd_subm_cnt_dblks(); }
inline const size_t subm_offs() const { return _curr_fh->rd_subm_offs(); }
- inline const u_int32_t add_subm_cnt_dblks(u_int32_t a) throw (jexception)
+ inline const u_int32_t add_subm_cnt_dblks(u_int32_t a)
{ return _curr_fh->add_rd_subm_cnt_dblks(a); }
inline const u_int32_t cmpl_cnt_dblks() const { return _curr_fh->rd_cmpl_cnt_dblks(); }
inline const size_t cmpl_offs() const { return _curr_fh->rd_cmpl_offs(); }
- inline const u_int32_t add_cmpl_cnt_dblks(u_int32_t a) throw (jexception)
+ inline const u_int32_t add_cmpl_cnt_dblks(u_int32_t a)
{ return _curr_fh->add_rd_cmpl_cnt_dblks(a); }
inline const bool empty() const { return _curr_fh->rd_empty(); }
Modified: store/trunk/cpp/lib/jrnl/txn_map.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/txn_map.cpp 2007-12-07 17:09:35 UTC (rev 1442)
+++ store/trunk/cpp/lib/jrnl/txn_map.cpp 2007-12-07 19:17:58 UTC (rev 1443)
@@ -35,9 +35,8 @@
#include <iomanip>
#include <sstream>
#include <jrnl/jerrno.hpp>
+#include <jrnl/jexception.hpp>
-#include <iostream> // for debug
-
namespace rhm
{
namespace journal
@@ -64,7 +63,7 @@
}
const bool
-txn_map::insert_txn_data(const std::string& xid, const txn_data& td) throw (jexception)
+txn_map::insert_txn_data(const std::string& xid, const txn_data& td)
{
bool ok = true;
pthread_mutex_lock(&_mutex);
@@ -84,7 +83,7 @@
}
const txn_data_list
-txn_map::get_tdata_list(const std::string& xid) throw (jexception)
+txn_map::get_tdata_list(const std::string& xid)
{
pthread_mutex_lock(&_mutex);
xmap_itr itr = _map.find(xid);
@@ -99,7 +98,7 @@
}
const txn_data_list
-txn_map::get_remove_tdata_list(const std::string& xid) throw (jexception)
+txn_map::get_remove_tdata_list(const std::string& xid)
{
pthread_mutex_lock(&_mutex);
xmap_itr itr = _map.find(xid);
@@ -129,7 +128,7 @@
}
const u_int32_t
-txn_map::get_rid_count(const std::string& xid) throw (jexception)
+txn_map::get_rid_count(const std::string& xid)
{
pthread_mutex_lock(&_mutex);
xmap_itr itr = _map.find(xid);
@@ -144,7 +143,7 @@
}
const bool
-txn_map::is_txn_synced(const std::string& xid) throw (jexception)
+txn_map::is_txn_synced(const std::string& xid)
{
pthread_mutex_lock(&_mutex);
xmap_itr itr = _map.find(xid);
@@ -171,7 +170,7 @@
}
const bool
-txn_map::set_aio_compl(const std::string& xid, const u_int64_t rid) throw (jexception)
+txn_map::set_aio_compl(const std::string& xid, const u_int64_t rid)
{
bool ok = true;
bool found = false;
Modified: store/trunk/cpp/lib/jrnl/txn_map.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/txn_map.hpp 2007-12-07 17:09:35 UTC (rev 1442)
+++ store/trunk/cpp/lib/jrnl/txn_map.hpp 2007-12-07 19:17:58 UTC (rev 1443)
@@ -44,7 +44,6 @@
#include <map>
#include <pthread.h>
#include <vector>
-#include <jrnl/jexception.hpp>
namespace rhm
{
@@ -79,13 +78,13 @@
txn_map();
virtual ~txn_map();
- const bool insert_txn_data(const std::string& xid, const txn_data& td) throw (jexception);
- const txn_data_list get_tdata_list(const std::string& xid) throw (jexception);
- const txn_data_list get_remove_tdata_list(const std::string& xid) throw (jexception);
+ const bool insert_txn_data(const std::string& xid, const txn_data& td);
+ const txn_data_list get_tdata_list(const std::string& xid);
+ const txn_data_list get_remove_tdata_list(const std::string& xid);
const bool in_map(const std::string& xid);
- const u_int32_t get_rid_count(const std::string& xid) throw (jexception);
- const bool is_txn_synced(const std::string& xid) throw (jexception);
- const bool set_aio_compl(const std::string& xid, const u_int64_t rid) throw (jexception);
+ const u_int32_t get_rid_count(const std::string& xid);
+ const bool is_txn_synced(const std::string& xid);
+ const bool set_aio_compl(const std::string& xid, const u_int64_t rid);
inline void clear() { _map.clear(); }
inline const bool empty() const { return _map.empty(); }
inline const u_int16_t size() const { return (u_int16_t)_map.size(); }
Modified: store/trunk/cpp/lib/jrnl/txn_rec.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/txn_rec.cpp 2007-12-07 17:09:35 UTC (rev 1442)
+++ store/trunk/cpp/lib/jrnl/txn_rec.cpp 2007-12-07 19:17:58 UTC (rev 1443)
@@ -92,7 +92,7 @@
}
const u_int32_t
-txn_rec::encode(void* wptr, u_int32_t rec_offs_dblks, u_int32_t max_size_dblks) throw (jexception)
+txn_rec::encode(void* wptr, u_int32_t rec_offs_dblks, u_int32_t max_size_dblks)
{
assert(wptr != NULL);
assert(max_size_dblks > 0);
@@ -201,7 +201,6 @@
const u_int32_t
txn_rec::decode(hdr& h, void* rptr, u_int32_t rec_offs_dblks, u_int32_t max_size_dblks)
- throw (jexception)
{
assert(rptr != NULL);
assert(max_size_dblks > 0);
@@ -312,7 +311,7 @@
}
const bool
-txn_rec::rcv_decode(hdr h, std::ifstream* ifsp, size_t& rec_offs) throw (jexception)
+txn_rec::rcv_decode(hdr h, std::ifstream* ifsp, size_t& rec_offs)
{
if (rec_offs == 0)
{
@@ -400,7 +399,7 @@
}
void
-txn_rec::chk_hdr() const throw (jexception)
+txn_rec::chk_hdr() const
{
jrec::chk_hdr(_txn_hdr._hdr);
if (_txn_hdr._hdr._magic != RHM_JDAT_TXA_MAGIC && _txn_hdr._hdr._magic != RHM_JDAT_TXC_MAGIC)
@@ -416,14 +415,14 @@
}
void
-txn_rec::chk_hdr(u_int64_t rid) const throw (jexception)
+txn_rec::chk_hdr(u_int64_t rid) const
{
chk_hdr();
jrec::chk_rid(_txn_hdr._hdr, rid);
}
void
-txn_rec::chk_tail() const throw (jexception)
+txn_rec::chk_tail() const
{
jrec::chk_tail(_txn_tail, _txn_hdr._hdr);
}
Modified: store/trunk/cpp/lib/jrnl/txn_rec.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/txn_rec.hpp 2007-12-07 17:09:35 UTC (rev 1442)
+++ store/trunk/cpp/lib/jrnl/txn_rec.hpp 2007-12-07 19:17:58 UTC (rev 1443)
@@ -73,12 +73,11 @@
// Prepare instance for use in writing data to journal
void reset(const u_int32_t magic, const u_int64_t rid, const void* const xidp,
const size_t xidlen, const bool owi);
- const u_int32_t encode(void* wptr, u_int32_t rec_offs_dblks, u_int32_t max_size_dblks)
- throw (jexception);
+ const u_int32_t encode(void* wptr, u_int32_t rec_offs_dblks, u_int32_t max_size_dblks);
const u_int32_t decode(hdr& h, void* rptr, u_int32_t rec_offs_dblks,
- u_int32_t max_size_dblks) throw (jexception);
+ u_int32_t max_size_dblks);
// Decode used for recover
- const bool rcv_decode(hdr h, std::ifstream* ifsp, size_t& rec_offs) throw (jexception);
+ const bool rcv_decode(hdr h, std::ifstream* ifsp, size_t& rec_offs);
const size_t get_xid(void** const xidpp);
std::string& str(std::string& str) const;
@@ -87,9 +86,9 @@
const size_t rec_size() const;
private:
- void chk_hdr() const throw (jexception);
- void chk_hdr(u_int64_t rid) const throw (jexception);
- void chk_tail() const throw (jexception);
+ void chk_hdr() const;
+ void chk_hdr(u_int64_t rid) const;
+ void chk_tail() const;
virtual void clean();
}; // class txn_rec
Modified: store/trunk/cpp/lib/jrnl/wmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wmgr.cpp 2007-12-07 17:09:35 UTC (rev 1442)
+++ store/trunk/cpp/lib/jrnl/wmgr.cpp 2007-12-07 19:17:58 UTC (rev 1443)
@@ -64,7 +64,7 @@
}
wmgr::wmgr(jcntl* jc, enq_map& emap, txn_map& tmap, wrfc& wrfc, std::deque<data_tok*>* const dtoklp,
- const u_int32_t max_dtokpp, const u_int32_t max_iowait_us) throw (jexception):
+ const u_int32_t max_dtokpp, const u_int32_t max_iowait_us):
pmgr(jc, emap, tmap, JRNL_WMGR_PAGE_SIZE, JRNL_WMGR_PAGES, dtoklp),
_wrfc(wrfc),
_max_dtokpp(max_dtokpp),
@@ -91,7 +91,7 @@
void
wmgr::initialize(std::deque<data_tok*>* dtoklp, const aio_cb wr_cb, const u_int32_t max_dtokpp,
- const u_int32_t max_iowait_us, size_t eo) throw (jexception)
+ const u_int32_t max_iowait_us, size_t eo)
{
_dtoklp = dtoklp;
_max_dtokpp = max_dtokpp;
@@ -110,7 +110,7 @@
const iores
wmgr::enqueue(const void* const data_buff, const size_t tot_data_len, const size_t this_data_len,
data_tok* dtokp, const void* const xid_ptr, const size_t xid_len, const bool transient,
- const bool external) throw (jexception)
+ const bool external)
{
if (xid_len)
assert(xid_ptr != NULL);
@@ -249,7 +249,7 @@
}
const iores
-wmgr::dequeue(data_tok* dtokp, const void* const xid_ptr, const size_t xid_len) throw (jexception)
+wmgr::dequeue(data_tok* dtokp, const void* const xid_ptr, const size_t xid_len)
{
if (xid_len)
assert(xid_ptr != NULL);
@@ -400,7 +400,7 @@
}
const iores
-wmgr::abort(data_tok* dtokp, const void* const xid_ptr, const size_t xid_len) throw (jexception)
+wmgr::abort(data_tok* dtokp, const void* const xid_ptr, const size_t xid_len)
{
// commit and abort MUST have a valid xid
assert(xid_ptr != NULL && xid_len > 0);
@@ -546,7 +546,7 @@
}
const iores
-wmgr::commit(data_tok* dtokp, const void* const xid_ptr, const size_t xid_len) throw (jexception)
+wmgr::commit(data_tok* dtokp, const void* const xid_ptr, const size_t xid_len)
{
// commit and abort MUST have a valid xid
assert(xid_ptr != NULL && xid_len > 0);
@@ -760,7 +760,7 @@
}
const u_int32_t
-wmgr::get_events(page_state state) throw (jexception)
+wmgr::get_events(page_state state)
{
int ret = 0;
if ((ret = ::io_getevents(_ioctx, 0, JRNL_RMGR_PAGES + JRNL_WMGR_PAGES, _ioevt_arr, NULL)) < 0)
@@ -879,7 +879,7 @@
}
const bool
-wmgr::is_txn_synced(const std::string& xid) throw (jexception)
+wmgr::is_txn_synced(const std::string& xid)
{
bool is_synced = true;
// Check for outstanding enqueues/dequeues
@@ -896,7 +896,7 @@
}
void
-wmgr::initialize() throw (jexception)
+wmgr::initialize()
{
const u_int16_t num_jfiles = _jc->num_jfiles();
pmgr::initialize();
@@ -925,7 +925,7 @@
}
const iores
-wmgr::pre_write_check(_op_type op, data_tok* dtokp) throw (jexception)
+wmgr::pre_write_check(_op_type op, data_tok* dtokp)
{
// Check status of current file
if (!_wrfc.is_reset())
@@ -1013,7 +1013,7 @@
}
void
-wmgr::write_fhdr(u_int64_t rid, u_int32_t fid, size_t fro) throw (jexception)
+wmgr::write_fhdr(u_int64_t rid, u_int32_t fid, size_t fro)
{
file_hdr fhdr(RHM_JDAT_FILE_MAGIC, RHM_JDAT_VERSION, rid, fid, fro, _wrfc.owi(), true);
::memcpy(_fhdr_ptr_arr[fid], &fhdr, sizeof(fhdr));
Modified: store/trunk/cpp/lib/jrnl/wmgr.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wmgr.hpp 2007-12-07 17:09:35 UTC (rev 1442)
+++ store/trunk/cpp/lib/jrnl/wmgr.hpp 2007-12-07 19:17:58 UTC (rev 1443)
@@ -100,33 +100,29 @@
wmgr(jcntl* jc, enq_map& emap, txn_map& tmap, wrfc& wrfc);
wmgr(jcntl* jc, enq_map& emap, txn_map& tmap, wrfc& wrfc,
std::deque<data_tok*>* const dtoklp, const u_int32_t max_dtokpp,
- const u_int32_t max_iowait_us) throw (jexception);
+ const u_int32_t max_iowait_us);
virtual ~wmgr();
void initialize(std::deque<data_tok*>* const dtoklp, aio_cb wr_cb,
- const u_int32_t max_dtokpp, const u_int32_t max_iowait_us, size_t eo = 0)
- throw (jexception);
+ const u_int32_t max_dtokpp, const u_int32_t max_iowait_us, size_t eo = 0);
const iores enqueue(const void* const data_buff, const size_t tot_data_len,
const size_t this_data_len, data_tok* dtokp, const void* const xid_ptr,
- const size_t xid_len, const bool transient, const bool external) throw (jexception);
- const iores dequeue(data_tok* dtokp, const void* const xid_ptr, const size_t xid_len)
- throw (jexception);
- const iores abort(data_tok* dtokp, const void* const xid_ptr, const size_t xid_len)
- throw (jexception);
- const iores commit(data_tok* dtokp, const void* const xid_ptr, const size_t xid_len)
- throw (jexception);
+ const size_t xid_len, const bool transient, const bool external);
+ const iores dequeue(data_tok* dtokp, const void* const xid_ptr, const size_t xid_len);
+ const iores abort(data_tok* dtokp, const void* const xid_ptr, const size_t xid_len);
+ const iores commit(data_tok* dtokp, const void* const xid_ptr, const size_t xid_len);
const iores flush();
- const u_int32_t get_events(page_state state) throw (jexception);
- const bool is_txn_synced(const std::string& xid) throw (jexception);
+ const u_int32_t get_events(page_state state);
+ const bool is_txn_synced(const std::string& xid);
private:
- void initialize() throw (jexception);
- const iores pre_write_check(_op_type op, data_tok* dtokp) throw (jexception);
+ void initialize();
+ const iores pre_write_check(_op_type op, data_tok* dtokp);
const u_int64_t initialize_rid(const bool cont, data_tok* dtokp);
const iores write_flush();
const iores rotate_file();
void dblk_roundup();
- void write_fhdr(u_int64_t rid, u_int32_t fid, size_t fro) throw (jexception);
+ void write_fhdr(u_int64_t rid, u_int32_t fid, size_t fro);
void rotate_page();
void clean();
Modified: store/trunk/cpp/lib/jrnl/wrfc.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wrfc.cpp 2007-12-07 17:09:35 UTC (rev 1442)
+++ store/trunk/cpp/lib/jrnl/wrfc.cpp 2007-12-07 19:17:58 UTC (rev 1443)
@@ -54,7 +54,7 @@
wrfc::~wrfc() {}
void
-wrfc::initialize(u_int32_t nfiles, nlfh** fh_arr, rcvdat* rdp) throw (jexception)
+wrfc::initialize(u_int32_t nfiles, nlfh** fh_arr, rcvdat* rdp)
{
if (rdp)
{
@@ -75,7 +75,7 @@
}
bool
-wrfc::rotate() throw (jexception)
+wrfc::rotate()
{
if (!_nfiles)
throw jexception(jerrno::JERR__NINIT, "wrfc", "rotate");
Modified: store/trunk/cpp/lib/jrnl/wrfc.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wrfc.hpp 2007-12-07 17:09:35 UTC (rev 1442)
+++ store/trunk/cpp/lib/jrnl/wrfc.hpp 2007-12-07 19:17:58 UTC (rev 1443)
@@ -71,13 +71,13 @@
* \param rdp Struct carrying restore information. Optional for non-restore use, defaults to
* NULL.
*/
- void initialize(u_int32_t nfiles, nlfh** fh_arr, rcvdat* rdp = NULL) throw (jexception);
+ void initialize(u_int32_t nfiles, nlfh** fh_arr, rcvdat* rdp = NULL);
/**
* \brief Rotate active file handle to next file in rotating file group.
* \exception jerrno::JERR__NINIT if called before calling initialize().
*/
- bool rotate() throw (jexception);
+ bool rotate();
inline const u_int64_t rid() const { return _rid; }
inline const u_int64_t get_incr_rid() { return _rid++; }
@@ -91,12 +91,12 @@
inline const u_int32_t subm_cnt_dblks() const { return _curr_fh->wr_subm_cnt_dblks(); }
inline const size_t subm_offs() const { return _curr_fh->wr_subm_offs(); }
- inline const u_int32_t add_subm_cnt_dblks(u_int32_t a) throw (jexception)
+ inline const u_int32_t add_subm_cnt_dblks(u_int32_t a)
{ return _curr_fh->add_wr_subm_cnt_dblks(a); }
inline const u_int32_t cmpl_cnt_dblks() const { return _curr_fh->wr_cmpl_cnt_dblks(); }
inline const size_t cmpl_offs() const { return _curr_fh->wr_cmpl_offs(); }
- inline const u_int32_t add_cmpl_cnt_dblks(u_int32_t a) throw (jexception)
+ inline const u_int32_t add_cmpl_cnt_dblks(u_int32_t a)
{ return _curr_fh->add_wr_cmpl_cnt_dblks(a); }
inline const bool empty() const { return _curr_fh->wr_empty(); }
Modified: store/trunk/cpp/tests/jrnl/unit_test_file_hdr.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/unit_test_file_hdr.cpp 2007-12-07 17:09:35 UTC (rev 1442)
+++ store/trunk/cpp/tests/jrnl/unit_test_file_hdr.cpp 2007-12-07 19:17:58 UTC (rev 1443)
@@ -32,6 +32,7 @@
#include <boost/test/results_reporter.hpp>
#include <boost/test/unit_test.hpp>
#include <boost/test/unit_test_log.hpp>
+#include <iostream>
#include <jrnl/file_hdr.hpp>
#include <jrnl/jcfg.hpp>
Modified: store/trunk/cpp/tests/jrnl/unit_test_jinf.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/unit_test_jinf.cpp 2007-12-07 17:09:35 UTC (rev 1442)
+++ store/trunk/cpp/tests/jrnl/unit_test_jinf.cpp 2007-12-07 19:17:58 UTC (rev 1443)
@@ -33,6 +33,7 @@
#include <boost/test/unit_test.hpp>
#include <boost/test/unit_test_log.hpp>
#include <fstream>
+#include <iomanip>
#include <iostream>
#include <jrnl/file_hdr.hpp>
#include <jrnl/jcfg.hpp>
18 years, 4 months
rhmessaging commits: r1442 - store/trunk/cpp/lib/jrnl.
by rhmessaging-commits@lists.jboss.org
Author: kpvdr
Date: 2007-12-07 12:09:35 -0500 (Fri, 07 Dec 2007)
New Revision: 1442
Modified:
store/trunk/cpp/lib/jrnl/data_tok.cpp
store/trunk/cpp/lib/jrnl/data_tok.hpp
store/trunk/cpp/lib/jrnl/deq_rec.cpp
store/trunk/cpp/lib/jrnl/enq_rec.cpp
store/trunk/cpp/lib/jrnl/jcntl.cpp
store/trunk/cpp/lib/jrnl/jcntl.hpp
store/trunk/cpp/lib/jrnl/jrec.cpp
store/trunk/cpp/lib/jrnl/txn_rec.cpp
store/trunk/cpp/lib/jrnl/wmgr.cpp
Log:
Fix for BZ413021 - Journal recovery fails... This includes new decode functions that examine the record tails for incomplete and corrupt records, and a new check for sblk alignment of the last record. The recovery process now writes filler records to the journal to get it back to a recoverable and usable state.
Modified: store/trunk/cpp/lib/jrnl/data_tok.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/data_tok.cpp 2007-12-07 16:34:34 UTC (rev 1441)
+++ store/trunk/cpp/lib/jrnl/data_tok.cpp 2007-12-07 17:09:35 UTC (rev 1442)
@@ -135,8 +135,8 @@
return "SKIP_PART";
case READ:
return "READ";
+ // Not using default: forces compiler to ensure all cases are covered.
}
- // Not using default: forces compiler to ensure all cases are covered.
return "<rstate unknown>";
}
Modified: store/trunk/cpp/lib/jrnl/data_tok.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/data_tok.hpp 2007-12-07 16:34:34 UTC (rev 1441)
+++ store/trunk/cpp/lib/jrnl/data_tok.hpp 2007-12-07 17:09:35 UTC (rev 1442)
@@ -33,8 +33,6 @@
#ifndef rhm_journal_data_tok_hpp
#define rhm_journal_data_tok_hpp
-#include <qpid/RefCounted.h>
-
namespace rhm
{
namespace journal
@@ -54,6 +52,7 @@
#include <boost/intrusive_ptr.hpp>
#include <pthread.h>
#include <qpid/broker/PersistableMessage.h>
+#include <qpid/RefCounted.h>
#include <sys/types.h>
#include <jrnl/jexception.hpp>
@@ -68,7 +67,7 @@
* \brief Data block token (data_tok) used to track wstate of a data block through asynchronous
* I/O process
*/
-class data_tok : public qpid::RefCounted
+ class data_tok : public qpid::RefCounted
{
public:
// TODO: Fix this, separate write state from operation
Modified: store/trunk/cpp/lib/jrnl/deq_rec.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/deq_rec.cpp 2007-12-07 16:34:34 UTC (rev 1441)
+++ store/trunk/cpp/lib/jrnl/deq_rec.cpp 2007-12-07 17:09:35 UTC (rev 1442)
@@ -315,23 +315,12 @@
}
return size_dblks(rd_cnt);
}
-
+
const bool
deq_rec::rcv_decode(hdr h, std::ifstream* ifsp, size_t& rec_offs) throw (jexception)
{
- if (rec_offs) // Contunue decoding xid from previous decode call
+ if (rec_offs == 0)
{
- ifsp->read((char*)_buff + rec_offs, _deq_hdr._xidsize - rec_offs);
- size_t size_read = ifsp->gcount();
- if (size_read < _deq_hdr._xidsize - rec_offs)
- {
- assert(ifsp->eof());
- rec_offs += size_read;
- return false;
- }
- }
- else // Start at beginning of record
- {
_deq_hdr._hdr.copy(h);
ifsp->read((char*)&_deq_hdr._deq_rid, sizeof(u_int64_t));
#if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
@@ -341,22 +330,44 @@
#if defined(JRNL_LITTLE_ENDIAN) && defined(JRNL_32_BIT)
ifsp->ignore(sizeof(u_int32_t)); // _filler0
#endif
+ rec_offs = sizeof(_deq_hdr);
+ // Read header, allocate (if req'd) for xid
if (_deq_hdr._xidsize)
{
_buff = ::malloc(_deq_hdr._xidsize);
- MALLOC_CHK(_buff, "_buff", "deq_rec", "rcv_decode");
- // Decode xid
- ifsp->read((char*)_buff, _deq_hdr._xidsize);
- size_t size_read = ifsp->gcount();
- if (size_read < _deq_hdr._xidsize)
- {
- assert(ifsp->eof());
- rec_offs = size_read;
- return false;
- }
+ MALLOC_CHK(_buff, "_buff", "enq_rec", "rcv_decode");
}
}
- ifsp->ignore(rec_size_dblks() * JRNL_DBLK_SIZE - sizeof(_deq_hdr) - _deq_hdr._xidsize);
+ if (rec_offs < sizeof(_deq_hdr) + _deq_hdr._xidsize)
+ {
+ // Read xid (or continue reading xid)
+ size_t offs = rec_offs - sizeof(_deq_hdr);
+ ifsp->read((char*)_buff + offs, _deq_hdr._xidsize - offs);
+ size_t size_read = ifsp->gcount();
+ rec_offs += size_read;
+ if (size_read < _deq_hdr._xidsize - offs)
+ {
+ assert(ifsp->eof());
+ return false;
+ }
+ }
+ if (rec_offs < sizeof(_deq_hdr) +
+ (_deq_hdr._xidsize ? _deq_hdr._xidsize + sizeof(rec_tail) : 0))
+ {
+ // Read tail (or continue reading tail)
+ size_t offs = rec_offs - sizeof(_deq_hdr) - _deq_hdr._xidsize;
+ ifsp->read((char*)&_deq_tail + offs, sizeof(rec_tail) - offs);
+ size_t size_read = ifsp->gcount();
+ rec_offs += size_read;
+ if (size_read < sizeof(rec_tail) - offs)
+ {
+ assert(ifsp->eof());
+ return false;
+ }
+ }
+ ifsp->ignore(rec_size_dblks() * JRNL_DBLK_SIZE - rec_size());
+ if (_deq_hdr._xidsize)
+ chk_tail(); // Throws if tail invalid or record incomplete
return true;
}
Modified: store/trunk/cpp/lib/jrnl/enq_rec.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/enq_rec.cpp 2007-12-07 16:34:34 UTC (rev 1441)
+++ store/trunk/cpp/lib/jrnl/enq_rec.cpp 2007-12-07 17:09:35 UTC (rev 1442)
@@ -434,19 +434,9 @@
const bool
enq_rec::rcv_decode(hdr h, std::ifstream* ifsp, size_t& rec_offs) throw (jexception)
{
- if (rec_offs) // Contunue decoding xid from previous decode call
+ if (rec_offs == 0)
{
- ifsp->read((char*)_buff + rec_offs, _enq_hdr._xidsize - rec_offs);
- size_t size_read = ifsp->gcount();
- if (size_read < _enq_hdr._xidsize - rec_offs)
- {
- assert(ifsp->eof());
- rec_offs += size_read;
- return false;
- }
- }
- else // Start at beginning of record
- {
+ // Read header, allocate (if req'd) for xid
_enq_hdr._hdr.copy(h);
#if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
ifsp->ignore(sizeof(u_int32_t)); // _filler0
@@ -462,22 +452,60 @@
#if defined(JRNL_LITTLE_ENDIAN) && defined(JRNL_32_BIT)
ifsp->ignore(sizeof(u_int32_t)); // _filler1
#endif
+ rec_offs = sizeof(_enq_hdr);
if (_enq_hdr._xidsize)
{
_buff = ::malloc(_enq_hdr._xidsize);
MALLOC_CHK(_buff, "_buff", "enq_rec", "rcv_decode");
- // Decode xid
- ifsp->read((char*)_buff, _enq_hdr._xidsize);
+ }
+ }
+ if (rec_offs < sizeof(_enq_hdr) + _enq_hdr._xidsize)
+ {
+ // Read xid (or continue reading xid)
+ size_t offs = rec_offs - sizeof(_enq_hdr);
+ ifsp->read((char*)_buff + offs, _enq_hdr._xidsize - offs);
+ size_t size_read = ifsp->gcount();
+ rec_offs += size_read;
+ if (size_read < _enq_hdr._xidsize - offs)
+ {
+ assert(ifsp->eof());
+ return false;
+ }
+ }
+ if (!_enq_hdr.is_external())
+ {
+ if (rec_offs < sizeof(hdr) + _enq_hdr._xidsize + _enq_hdr._dsize)
+ {
+ // Ignore data (or continue ignoring data)
+ size_t offs = rec_offs - sizeof(_enq_hdr) - _enq_hdr._xidsize;
+ ifsp->ignore(_enq_hdr._dsize - offs);
size_t size_read = ifsp->gcount();
- if (size_read < _enq_hdr._xidsize)
+ rec_offs += size_read;
+ if (size_read < _enq_hdr._dsize - offs)
{
assert(ifsp->eof());
- rec_offs = size_read;
return false;
}
}
}
- ifsp->ignore(rec_size_dblks() * JRNL_DBLK_SIZE - sizeof(_enq_hdr) - _enq_hdr._xidsize);
+ if (rec_offs < sizeof(_enq_hdr) + _enq_hdr._xidsize +
+ (_enq_hdr.is_external() ? 0 : _enq_hdr._dsize) + sizeof(rec_tail))
+ {
+ // Read tail (or continue reading tail)
+ size_t offs = rec_offs - sizeof(_enq_hdr) - _enq_hdr._xidsize;
+ if (!_enq_hdr.is_external())
+ offs -= _enq_hdr._dsize;
+ ifsp->read((char*)&_enq_tail + offs, sizeof(rec_tail) - offs);
+ size_t size_read = ifsp->gcount();
+ rec_offs += size_read;
+ if (size_read < sizeof(rec_tail) - offs)
+ {
+ assert(ifsp->eof());
+ return false;
+ }
+ }
+ ifsp->ignore(rec_size_dblks() * JRNL_DBLK_SIZE - rec_size());
+ chk_tail(); // Throws if tail invalid or record incomplete
return true;
}
Modified: store/trunk/cpp/lib/jrnl/jcntl.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.cpp 2007-12-07 16:34:34 UTC (rev 1441)
+++ store/trunk/cpp/lib/jrnl/jcntl.cpp 2007-12-07 17:09:35 UTC (rev 1442)
@@ -1,34 +1,34 @@
/**
- * \file jcntl.cpp
- *
- * Red Hat Messaging - Message Journal
- *
- * Messaging journal top-level control and interface class
- * rhm::journal::jcntl. See comments in file jcntl.hpp for details.
- *
- * \author Kim van der Riet
- *
- * Copyright 2007 Red Hat, Inc.
- *
- * This file is part of Red Hat Messaging.
- *
- * Red Hat Messaging is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2.1 of the License, or (at your option) any later version.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
- * USA
- *
- * The GNU Lesser General Public License is available in the file COPYING.
- */
+* \file jcntl.cpp
+*
+* Red Hat Messaging - Message Journal
+*
+* Messaging journal top-level control and interface class
+* rhm::journal::jcntl. See comments in file jcntl.hpp for details.
+*
+* \author Kim van der Riet
+*
+* Copyright 2007 Red Hat, Inc.
+*
+* This file is part of Red Hat Messaging.
+*
+* Red Hat Messaging is free software; you can redistribute it and/or
+* modify it under the terms of the GNU Lesser General Public
+* License as published by the Free Software Foundation; either
+* version 2.1 of the License, or (at your option) any later version.
+*
+* This library is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this library; if not, write to the Free Software
+* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
+* USA
+*
+* The GNU Lesser General Public License is available in the file COPYING.
+*/
#include <jrnl/jcntl.hpp>
@@ -51,7 +51,7 @@
// Functions
jcntl::jcntl(const std::string& jid, const std::string& jdir, const std::string& base_filename,
- const u_int16_t num_jfiles, const u_int32_t jfsize_sblks):
+ const u_int16_t num_jfiles, const u_int32_t jfsize_sblks):
_jid(jid),
_jdir(jdir, base_filename),
_base_filename(base_filename),
@@ -90,7 +90,7 @@
void
jcntl::initialize(std::deque<data_tok*>* rdtoklp, const aio_cb rd_cb,
- std::deque<data_tok*>* wdtoklp, const aio_cb wr_cb) throw (jexception)
+ std::deque<data_tok*>* wdtoklp, const aio_cb wr_cb) throw (jexception)
{
// Prepare journal dir, journal files and file handles
_jdir.clear_dir();
@@ -132,21 +132,22 @@
void
jcntl::recover(std::deque<data_tok*>* rdtoklp, const aio_cb rd_cb, std::deque<data_tok*>* wdtoklp,
- const aio_cb wr_cb, const std::vector<std::string>& prep_txn_list, u_int64_t& highest_rid)
- throw (jexception)
+ const aio_cb wr_cb, const std::vector<std::string>& prep_txn_list, u_int64_t& highest_rid)
+ throw (jexception)
{
// Verify journal dir and journal files
_jdir.verify_dir();
_rcvdat.reset();
_emap.clear();
_tmap.clear();
+//std::cout << "Starting journal analysis..." << std::endl;
rcvr_janalyze(_rcvdat, prep_txn_list);
highest_rid = _rcvdat._h_rid;
if (_rcvdat._full)
throw jexception(jerrno::JERR_JCNTL_RECOVERJFULL, "jcntl", "recover_complete");
- // Debug info, but may be useful to print with a flag
- //_rcvdat.print(_jid);
+// Debug info, but may be useful to print with a flag
+//_rcvdat.print(_jid);
if (_datafh)
{
@@ -177,6 +178,7 @@
_readonly_flag = true;
_init_flag = true;
+//std::cout << "Journal analysis complete." << std::endl;
}
void
@@ -190,7 +192,7 @@
_rrfc.initialize(_num_jfiles, (nlfh**)_datafh, _rcvdat._ffid);
_rmgr.recover_complete(_rcvdat._fro);
_readonly_flag = false;
- //std::cout << "Journal revovery complete." << std::endl;
+//std::cout << "Journal revovery complete." << std::endl;
}
void
@@ -203,8 +205,8 @@
const iores
jcntl::enqueue_data_record(const void* const data_buff, const size_t tot_data_len,
- const size_t this_data_len, data_tok* dtokp, const bool transient)
- throw (jexception)
+ const size_t this_data_len, data_tok* dtokp, const bool transient)
+ throw (jexception)
{
iores res;
check_wstatus("enqueue_data_record");
@@ -212,7 +214,7 @@
try
{
res = _wmgr.enqueue(data_buff, tot_data_len, this_data_len, dtokp, NULL, 0, transient,
- false);
+ false);
}
catch(const std::exception& e) { pthread_mutex_unlock(&_mutex); throw; }
pthread_mutex_unlock(&_mutex);
@@ -221,7 +223,7 @@
const iores
jcntl::enqueue_extern_data_record(const size_t tot_data_len, data_tok* dtokp, const bool transient)
- throw (jexception)
+ throw (jexception)
{
iores res;
check_wstatus("enqueue_extern_data_record");
@@ -237,8 +239,8 @@
const iores
jcntl::enqueue_txn_data_record(const void* const data_buff, const size_t tot_data_len,
- const size_t this_data_len, data_tok* dtokp, const std::string& xid,
- const bool transient) throw (jexception)
+ const size_t this_data_len, data_tok* dtokp, const std::string& xid,
+ const bool transient) throw (jexception)
{
iores res;
check_wstatus("enqueue_tx_data_record");
@@ -246,7 +248,7 @@
try
{
res = _wmgr.enqueue(data_buff, tot_data_len, this_data_len, dtokp, xid.data(), xid.size(),
- transient, false);
+ transient, false);
}
catch(const std::exception& e) { pthread_mutex_unlock(&_mutex); throw; }
pthread_mutex_unlock(&_mutex);
@@ -255,7 +257,7 @@
const iores
jcntl::enqueue_extern_txn_data_record(const size_t tot_data_len, data_tok* dtokp,
- const std::string& xid, const bool transient) throw (jexception)
+ const std::string& xid, const bool transient) throw (jexception)
{
iores res;
check_wstatus("enqueue_extern_txn_data_record");
@@ -271,7 +273,7 @@
const iores
jcntl::get_data_record(const u_int64_t& rid, const size_t& dsize, const size_t& dsize_avail,
- const void** const data, bool auto_discard) throw (jexception)
+ const void** const data, bool auto_discard) throw (jexception)
{
check_rstatus("get_data_record");
return _rmgr.get(rid, dsize, dsize_avail, data, auto_discard);
@@ -286,7 +288,7 @@
const iores
jcntl::read_data_record(void** const datapp, size_t& dsize, void** const xidpp, size_t& xidsize,
- bool& transient, bool& external, data_tok* const dtokp) throw (jexception)
+ bool& transient, bool& external, data_tok* const dtokp) throw (jexception)
{
check_rstatus("read_data");
return _rmgr.read(datapp, dsize, xidpp, xidsize, transient, external, dtokp);
@@ -466,14 +468,14 @@
_num_jfiles = ji.num_jfiles();
_rcvdat._enq_cnt_list.resize(_num_jfiles);
std::cout << "WARNING: Recovery found " << _num_jfiles <<
- " files (different from --num-jfiles parameter value)." << std::endl;
+ " files (different from --num-jfiles parameter value)." << std::endl;
}
if (_jfsize_sblks != ji.jfsize_sblks())
{
_jfsize_sblks = ji.jfsize_sblks();
std::cout << "WARNING: Recovery found file size = " <<
- (_jfsize_sblks / JRNL_RMGR_PAGE_SIZE) <<
- " (different from --jfile-size-pgs parameter value)." << std::endl;
+ (_jfsize_sblks / JRNL_RMGR_PAGE_SIZE) <<
+ " (different from --jfile-size-pgs parameter value)." << std::endl;
}
try
@@ -499,16 +501,16 @@
if (rd._ffid == next_wr_fid && rd._enq_cnt_list[next_wr_fid])
rd._full = true;
- std::vector<std::string> xid_list;
- _tmap.xid_list(xid_list);
- for (std::vector<std::string>::iterator itr = xid_list.begin(); itr != xid_list.end();
- itr++)
- {
- std::vector<std::string>::const_iterator pitr = std::find(prep_txn_list.begin(),
- prep_txn_list.end(), *itr);
- if (pitr == prep_txn_list.end())
- _tmap.get_remove_tdata_list(*itr);
- }
+ std::vector<std::string> xid_list;
+ _tmap.xid_list(xid_list);
+ for (std::vector<std::string>::iterator itr = xid_list.begin(); itr != xid_list.end();
+ itr++)
+ {
+ std::vector<std::string>::const_iterator pitr = std::find(prep_txn_list.begin(),
+ prep_txn_list.end(), *itr);
+ if (pitr == prep_txn_list.end())
+ _tmap.get_remove_tdata_list(*itr);
+ }
}
}
@@ -516,7 +518,6 @@
jcntl::rcvr_get_next_record(u_int16_t& fid, std::ifstream* ifsp, rcvdat& rd) throw (jexception)
{
size_t cum_size_read = 0;
- bool done = false;
void* xidp = NULL;
hdr h;
if (!jfile_cycle(fid, ifsp, rd, true))
@@ -525,159 +526,159 @@
ifsp->read((char*)&h, sizeof(hdr));
switch(h._magic)
{
- case RHM_JDAT_ENQ_MAGIC:
- {
- if (!check_owi(fid, h, rd, read_pos))
- return false;
- enq_rec er;
- while (!done)
+ case RHM_JDAT_ENQ_MAGIC:
{
- done = er.rcv_decode(h, ifsp, cum_size_read);
- if (!jfile_cycle(fid, ifsp, rd, true))
+ enq_rec er;
+ if (!decode(er, fid, ifsp, cum_size_read, h, rd, read_pos))
return false;
+ if (!er.is_transient()) // Ignore transient msgs
+ {
+ rd._enq_cnt_list[fid]++;
+ if (er.xid_size())
+ {
+ er.get_xid(&xidp);
+ assert(xidp != NULL);
+ std::string xid((char*)xidp, er.xid_size());
+ _tmap.insert_txn_data(xid, txn_data(h._rid, 0, fid, true));
+ ::free(xidp);
+ }
+ else
+ _emap.insert_fid(h._rid, fid);
+ }
}
- if (!er.is_transient()) // Ignore transient msgs
+ break;
+ case RHM_JDAT_DEQ_MAGIC:
{
- rd._enq_cnt_list[fid]++;
- if (er.xid_size())
+ deq_rec dr;
+ if (!decode(dr, fid, ifsp, cum_size_read, h, rd, read_pos))
+ return false;
+ if (dr.xid_size())
{
- er.get_xid(&xidp);
+ // If the enqueue is part of a pending txn, it will not yet be in emap
+ try { _emap.lock(dr.deq_rid()); }
+ catch(const jexception& e)
+ {
+ if (e.err_code() != jerrno::JERR_MAP_NOTFOUND) throw;
+ }
+ dr.get_xid(&xidp);
assert(xidp != NULL);
- std::string xid((char*)xidp, er.xid_size());
- _tmap.insert_txn_data(xid, txn_data(h._rid, 0, fid, true));
+ std::string xid((char*)xidp, dr.xid_size());
+ _tmap.insert_txn_data(xid, txn_data(dr.rid(), dr.deq_rid(), fid, false));
::free(xidp);
}
else
- _emap.insert_fid(h._rid, fid);
+ {
+ try
+ {
+ u_int16_t enq_fid = _emap.get_remove_fid(dr.deq_rid());
+ rd._enq_cnt_list[enq_fid]--;
+ }
+ catch(const jexception& e)
+ {
+ if (e.err_code() != jerrno::JERR_MAP_NOTFOUND) throw;
+ }
+ }
}
- }
- break;
- case RHM_JDAT_DEQ_MAGIC:
- {
- if (!check_owi(fid, h, rd, read_pos))
- return false;
- deq_rec dr;
- while (!done)
+ break;
+ case RHM_JDAT_TXA_MAGIC:
{
- done = dr.rcv_decode(h, ifsp, cum_size_read);
- if (!jfile_cycle(fid, ifsp, rd, true))
+ txn_rec ar;
+ if (!decode(ar, fid, ifsp, cum_size_read, h, rd, read_pos))
return false;
- }
- if (dr.xid_size())
- {
- // If the enqueue is part of a pending txn, it will not yet be in emap
- try { _emap.lock(dr.deq_rid()); }
- catch(const jexception& e)
+ // Delete this txn from tmap, unlock any locked records in emap
+ ar.get_xid(&xidp);
+ assert(xidp != NULL);
+ std::string xid((char*)xidp, ar.xid_size());
+ txn_data_list tdl = _tmap.get_remove_tdata_list(xid);
+ for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++)
{
- if (e.err_code() != jerrno::JERR_MAP_NOTFOUND) throw;
+ try
+ {
+ if (!itr->_enq_flag)
+ _emap.unlock(itr->_drid);
+ }
+ catch(const jexception& e)
+ {
+ if (e.err_code() != jerrno::JERR_MAP_NOTFOUND) throw;
+ }
+ if (itr->_enq_flag)
+ rd._enq_cnt_list[itr->_fid]--;
}
- dr.get_xid(&xidp);
- assert(xidp != NULL);
- std::string xid((char*)xidp, dr.xid_size());
- _tmap.insert_txn_data(xid, txn_data(dr.rid(), dr.deq_rid(), fid, false));
::free(xidp);
}
- else
+ break;
+ case RHM_JDAT_TXC_MAGIC:
{
- try
+ txn_rec cr;
+ if (!decode(cr, fid, ifsp, cum_size_read, h, rd, read_pos))
+ return false;
+ // Delete this txn from tmap, process records into emap
+ cr.get_xid(&xidp);
+ assert(xidp != NULL);
+ std::string xid((char*)xidp, cr.xid_size());
+ txn_data_list tdl = _tmap.get_remove_tdata_list(xid);
+ for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++)
{
- u_int16_t enq_fid = _emap.get_remove_fid(dr.deq_rid());
- rd._enq_cnt_list[enq_fid]--;
+ if (itr->_enq_flag) // txn enqueue
+ _emap.insert_fid(itr->_rid, itr->_fid);
+ else // txn dequeue
+ {
+ u_int16_t fid = _emap.get_remove_fid(itr->_drid, true);
+ rd._enq_cnt_list[fid]--;
+ }
}
- catch(const jexception& e)
- {
- if (e.err_code() != jerrno::JERR_MAP_NOTFOUND) throw;
- }
+ ::free(xidp);
}
- }
- break;
- case RHM_JDAT_TXA_MAGIC:
- {
- if (!check_owi(fid, h, rd, read_pos))
- return false;
- txn_rec ar;
- while (!done)
+ break;
+ case RHM_JDAT_EMPTY_MAGIC:
{
- done = ar.rcv_decode(h, ifsp, cum_size_read);
- if (!jfile_cycle(fid, ifsp, rd, true))
- return false;
+ u_int32_t rec_dblks = jrec::size_dblks(sizeof(hdr));
+ ifsp->ignore(rec_dblks * JRNL_DBLK_SIZE - sizeof(hdr));
}
- // Delete this txn from tmap, unlock any locked records in emap
- ar.get_xid(&xidp);
- assert(xidp != NULL);
- std::string xid((char*)xidp, ar.xid_size());
- txn_data_list tdl = _tmap.get_remove_tdata_list(xid);
- for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++)
+ break;
+ case 0:
+ rd._lfid = fid;
+ rd._eo = ifsp->tellg();
+ return false;
+ default:
+ // Is this the last file, if so, stop as this is the overwrite boundary.
+ if (fid == (rd._ffid ? rd._ffid - 1 : _num_jfiles - 1))
{
- try
- {
- if (!itr->_enq_flag)
- _emap.unlock(itr->_drid);
- }
- catch(const jexception& e)
- {
- if (e.err_code() != jerrno::JERR_MAP_NOTFOUND) throw;
- }
- if (itr->_enq_flag)
- rd._enq_cnt_list[itr->_fid]--;
- }
- ::free(xidp);
- }
- break;
- case RHM_JDAT_TXC_MAGIC:
- {
- if (!check_owi(fid, h, rd, read_pos))
+ rd._lfid = fid;
+ rd._eo = read_pos;
return false;
- txn_rec cr;
- while (!done)
- {
- done = cr.rcv_decode(h, ifsp, cum_size_read);
- if (!jfile_cycle(fid, ifsp, rd, true))
- return false;
}
- // Delete this txn from tmap, process records into emap
- cr.get_xid(&xidp);
- assert(xidp != NULL);
- std::string xid((char*)xidp, cr.xid_size());
- txn_data_list tdl = _tmap.get_remove_tdata_list(xid);
- for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++)
- {
- if (itr->_enq_flag) // txn enqueue
- _emap.insert_fid(itr->_rid, itr->_fid);
- else // txn dequeue
- {
- u_int16_t fid = _emap.get_remove_fid(itr->_drid, true);
- rd._enq_cnt_list[fid]--;
- }
- }
- ::free(xidp);
- }
- break;
- case RHM_JDAT_EMPTY_MAGIC:
- {
- u_int32_t rec_dblks = jrec::size_dblks(sizeof(hdr));
- ifsp->ignore(rec_dblks * JRNL_DBLK_SIZE - sizeof(hdr));
- }
- break;
- case 0:
- rd._lfid = fid;
- rd._eo = ifsp->tellg();
+ std::stringstream ss;
+ ss << std::hex << std::setfill('0') << "Magic=0x" << std::setw(8) << h._magic;
+ ss << " fid=" << fid << " foffs=0x" << std::setw(8) << read_pos;
+ throw jexception(jerrno::JERR_JCNTL_UNKNOWNMAGIC, ss.str().c_str(), "jcntl",
+ "rcvr_get_next_record");
+ }
+ return true;
+}
+
+const bool
+jcntl::decode(jrec& rec, u_int16_t& fid, std::ifstream* ifsp, size_t& cum_size_read, hdr& h,
+ rcvdat& rd, std::streampos& rec_offset)
+{
+ if (!check_owi(fid, h, rd, rec_offset))
return false;
- default:
- // Is this the last file, if so, stop as this is the overwrite boundary.
- if (fid == (rd._ffid ? rd._ffid - 1 : _num_jfiles - 1))
+ bool done = false;
+ while (!done)
+ {
+ try { done = rec.rcv_decode(h, ifsp, cum_size_read); }
+ catch (const jexception& e)
{
+ if (e.err_code() != jerrno::JERR_JREC_BADRECTAIL ||
+ fid != (rd._ffid ? rd._ffid - 1 : _num_jfiles - 1)) throw;
+ check_journal_alignment(fid, rec_offset);
rd._lfid = fid;
- rd._eo = read_pos;
+ rd._eo = rec_offset;
return false;
}
- std::stringstream ss;
- ss << std::hex << std::setfill('0') << "Magic=0x" << std::setw(8) << h._magic;
- ss << " fid=" << fid << " foffs=0x" << std::setw(8) << read_pos;
- throw jexception(jerrno::JERR_JCNTL_UNKNOWNMAGIC, ss.str().c_str(), "jcntl",
- "rcvr_get_next_record");
+ if (!jfile_cycle(fid, ifsp, rd, false))
+ return false;
}
-
return true;
}
@@ -705,7 +706,7 @@
std::stringstream ss;
ss << _jdir.dirname() << "/" << _base_filename << ".";
ss << std::hex << std::setfill('0') << std::setw(4) << fid << "." << JRNL_DATA_EXTENSION;
- ifsp->open(ss.str().c_str());
+ ifsp->open(ss.str().c_str(), std::ios_base::in | std::ios_base::binary);
if (!ifsp->good())
throw jexception(jerrno::JERR__FILEIO, ss.str().c_str(), "jcntl", "jfile_cycle");
@@ -730,13 +731,15 @@
}
const bool
-jcntl::check_owi(u_int16_t fid, hdr& h, rcvdat& rd, std::streampos read_pos) throw (jexception)
+jcntl::check_owi(const u_int16_t fid, hdr& h, rcvdat& rd, std::streampos& read_pos)
+ throw (jexception)
{
if (rd._ffid ? h.get_owi() == rd._owi : h.get_owi() != rd._owi) // Overwrite indicator changed
{
u_int16_t expected_fid = rd._ffid ? rd._ffid - 1 : _num_jfiles - 1;
if (fid == expected_fid)
{
+ check_journal_alignment(fid, read_pos);
rd._lfid = fid;
rd._eo = read_pos;
return false;
@@ -747,43 +750,90 @@
ss << " foffs=0x" << std::setw(8) << read_pos;
ss << " expected_fid=0x" << std::setw(4) << expected_fid;
throw jexception(jerrno::JERR_JCNTL_OWIMISMATCH, ss.str().c_str(), "jcntl",
- "check_owi");
+ "check_owi");
}
if (rd._h_rid < h._rid)
rd._h_rid = h._rid;
return true;
}
+
void
+jcntl::check_journal_alignment(const u_int16_t fid, std::streampos& rec_offset)
+ throw (jexception)
+{
+ unsigned sblk_offs = rec_offset % (JRNL_DBLK_SIZE * JRNL_SBLK_SIZE);
+ if (sblk_offs)
+ {
+ // TODO: Connect the following with logger:
+ std::cout << std::hex << "INFO: Bad record alignment found at fid=0x" << fid <<
+ " offs=0x" << rec_offset << " (likely journal overwrite boundary); " <<
+ (JRNL_SBLK_SIZE - (sblk_offs/JRNL_DBLK_SIZE)) <<
+ " filler record(s) required." << std::endl;
+ const u_int32_t xmagic = RHM_JDAT_EMPTY_MAGIC;
+ std::stringstream ss;
+ ss << _jdir.dirname() << "/" << _base_filename << ".";
+ ss << std::hex << std::setfill('0') << std::setw(4) << fid << "." << JRNL_DATA_EXTENSION;
+ std::ofstream ofsp(ss.str().c_str(),
+ std::ios_base::in | std::ios_base::out | std::ios_base::binary);
+ if (!ofsp.good())
+ throw jexception(jerrno::JERR__FILEIO, ss.str().c_str(), "jcntl",
+ "check_journal_alignment");
+ ofsp.seekp(rec_offset);
+ void* buff = ::malloc(JRNL_DBLK_SIZE);
+ assert(buff != NULL);
+ ::memcpy(buff, (void*)&xmagic, sizeof(xmagic));
+ // Normally, RHM_CLEAN must be set before these fills are done, but this is a recover
+ // situation (i.e. performance is not an issue), and it makes the location of the write
+ // clear should inspection of the file be required.
+ ::memset((char*)buff + sizeof(xmagic), RHM_CLEAN_CHAR, JRNL_DBLK_SIZE - sizeof(xmagic));
+
+ while (rec_offset % (JRNL_DBLK_SIZE * JRNL_SBLK_SIZE))
+ {
+ ofsp.write((const char*)buff, JRNL_DBLK_SIZE);
+ assert(!ofsp.fail());
+ // TODO: Connect the following with logger:
+ std::cout << "INFO: * Wrote filler record at offs=0x" << rec_offset << std::endl;
+ rec_offset = ofsp.tellp();
+ }
+ ofsp.close();
+ ::free(buff);
+ // TODO: Connect the following with logger:
+ std::cout << "INFO: Bad record alignment fixed." << std::endl;
+ }
+}
+
+
+void
jcntl::aio_wr_callback(jcntl* journal, u_int32_t num_dtoks)
{
- //kpvdr TODO -- this list needs to be mutexed...???
+//kpvdr TODO -- this list needs to be mutexed...???
std::deque<rhm::journal::data_tok*> this_dtok_list(journal->_aio_wr_cmpl_dtok_list.begin(),
- journal->_aio_wr_cmpl_dtok_list.end());
+ journal->_aio_wr_cmpl_dtok_list.end());
journal->_aio_wr_cmpl_dtok_list.clear();
for (u_int32_t i=0; i<num_dtoks; i++)
{
data_tok*& dtokp = this_dtok_list.front();
- if (!journal->is_stopped() && dtokp->getSourceMessage())
- {
- switch (dtokp->wstate())
- {
- case data_tok::ENQ:
- dtokp->getSourceMessage()->enqueueComplete();
- break;
- case data_tok::DEQ:
- /* Don't need to signal until we have a way to ack completion of dequeue in AMQP
- dtokp->getSourceMessage()->dequeueComplete();
- if ( dtokp->getSourceMessage()->isDequeueComplete() ) // clear id after last dequeue
- dtokp->getSourceMessage()->setPersistenceId(0);
- */
- break;
- default:
- ;
- }
- }
- dtokp->release();
+ if (!journal->is_stopped() && dtokp->getSourceMessage())
+ {
+ switch (dtokp->wstate())
+ {
+ case data_tok::ENQ:
+ dtokp->getSourceMessage()->enqueueComplete();
+ break;
+ case data_tok::DEQ:
+/* Don't need to signal until we have a way to ack completion of dequeue in AMQP
+ dtokp->getSourceMessage()->dequeueComplete();
+ if ( dtokp->getSourceMessage()->isDequeueComplete() ) // clear id after last dequeue
+ dtokp->getSourceMessage()->setPersistenceId(0);
+*/
+ break;
+ default:
+ ;
+ }
+ }
+ dtokp->release();
this_dtok_list.pop_front();
}
}
@@ -792,21 +842,21 @@
jcntl::aio_rd_callback(jcntl* journal, u_int32_t num_dtoks)
{
- //kpvdr TODO -- can we get rid of the copy???
+//kpvdr TODO -- can we get rid of the copy???
std::deque<rhm::journal::data_tok*> this_dtok_list(journal->_aio_rd_cmpl_dtok_list.begin(),
- journal->_aio_rd_cmpl_dtok_list.end());
+ journal->_aio_rd_cmpl_dtok_list.end());
journal->_aio_rd_cmpl_dtok_list.clear();
for (u_int32_t i=0; i<num_dtoks; i++)
{
data_tok*& dtokp = this_dtok_list.front();
- if (!journal->is_stopped() && dtokp->getSourceMessage())
- {
- if (dtokp->wstate() == data_tok::ENQ && dtokp->rstate() == data_tok::READ)
- {
+ if (!journal->is_stopped() && dtokp->getSourceMessage())
+ {
+ if (dtokp->wstate() == data_tok::ENQ && dtokp->rstate() == data_tok::READ)
+ {
// cct call the recovery manager. / lazyload..
- }
- }
- dtokp->release();
+ }
+ }
+ dtokp->release();
this_dtok_list.pop_front();
}
Modified: store/trunk/cpp/lib/jrnl/jcntl.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.hpp 2007-12-07 16:34:34 UTC (rev 1441)
+++ store/trunk/cpp/lib/jrnl/jcntl.hpp 2007-12-07 17:09:35 UTC (rev 1442)
@@ -644,14 +644,22 @@
void rcvr_janalyze(rcvdat& rd, const std::vector<std::string>& prep_txn_list)
throw (jexception);
- const bool rcvr_get_next_record(u_int16_t& fid, std::ifstream* ifsp, rcvdat& rd) throw (jexception);
+ const bool rcvr_get_next_record(u_int16_t& fid, std::ifstream* ifsp, rcvdat& rd)
+ throw (jexception);
+ const bool decode(jrec& rec, u_int16_t& fid, std::ifstream* ifsp, size_t& cum_size_read,
+ hdr& h, rcvdat& rd, std::streampos& rec_offset);
+
const bool jfile_cycle(u_int16_t& fid, std::ifstream* ifsp, rcvdat& rd,
const bool jump_fro);
- const bool check_owi(u_int16_t fid, hdr& h, rcvdat& rd, std::streampos read_pos)
+ const bool check_owi(const u_int16_t fid, hdr& h, rcvdat& rd, std::streampos& read_pos)
throw (jexception);
+
+ void check_journal_alignment(const u_int16_t fid, std::streampos& rec_offset)
+ throw (jexception);
+
/**
* \brief Analyze a particular journal file for recovery.
*
Modified: store/trunk/cpp/lib/jrnl/jrec.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jrec.cpp 2007-12-07 16:34:34 UTC (rev 1441)
+++ store/trunk/cpp/lib/jrnl/jrec.cpp 2007-12-07 17:09:35 UTC (rev 1442)
@@ -52,14 +52,14 @@
{
std::stringstream ss;
ss << std::hex << std::setfill('0');
- ss << "enq magic NULL: rid=0x" << std::setw(16) << hdr._rid;
+ ss << "enq magic NULL: rid=0x" << hdr._rid;
throw jexception(jerrno::JERR_JREC_BADRECHDR, ss.str().c_str(), "jrec", "chk_hdr");
}
if (hdr._version != RHM_JDAT_VERSION)
{
std::stringstream ss;
ss << std::hex << std::setfill('0');
- ss << "version: rid=0x" << std::setw(16) << hdr._rid;
+ ss << "version: rid=0x" << hdr._rid;
ss << ": expected=0x" << std::setw(2) << (int)RHM_JDAT_VERSION;
ss << " read=0x" << std::setw(2) << (int)hdr._version;
throw jexception(jerrno::JERR_JREC_BADRECHDR, ss.str().c_str(), "jrec", "chk_hdr");
@@ -73,7 +73,7 @@
{
std::stringstream ss;
ss << std::hex << std::setfill('0');
- ss << "endian_flag: rid=" << std::setw(16) << hdr._rid;
+ ss << "endian_flag: rid=" << hdr._rid;
ss << ": expected=0x" << std::setw(2) << (int)endian_flag;
ss << " read=0x" << std::setw(2) << (int)hdr._eflag;
throw jexception(jerrno::JERR_JREC_BADRECHDR, ss.str().c_str(), "jrec", "chk_hdr");
@@ -87,8 +87,8 @@
{
std::stringstream ss;
ss << std::hex << std::setfill('0');
- ss << "rid mismatch: expected=0x" << std::setw(16) << rid;
- ss << " read=0x" << std::setw(16) << hdr._rid;
+ ss << "rid mismatch: expected=0x" << rid;
+ ss << " read=0x" << hdr._rid;
throw jexception(jerrno::JERR_JREC_BADRECHDR, ss.str().c_str(), "jrec", "chk_hdr");
}
}
@@ -100,17 +100,17 @@
{
std::stringstream ss;
ss << std::hex << std::setfill('0');
- ss << "magic: rid=0x" << std::setw(16) << hdr._rid;
- ss << ": expected=0x" << std::setw(8) << ~hdr._magic;
- ss << " read=0x" << std::setw(8) << tail._xmagic;
+ ss << "magic: rid=0x" << hdr._rid;
+ ss << ": expected=0x" << ~hdr._magic;
+ ss << " read=0x" << tail._xmagic;
throw jexception(jerrno::JERR_JREC_BADRECTAIL, ss.str().c_str(), "jrec", "chk_tail");
}
if (tail._rid != hdr._rid)
{
std::stringstream ss;
ss << std::hex << std::setfill('0');
- ss << "rid: rid=0x" << std::setw(16) << hdr._rid;
- ss << ": read=0x" << std::setw(16) << tail._rid;
+ ss << "rid: rid=0x" << hdr._rid;
+ ss << ": read=0x" << tail._rid;
throw jexception(jerrno::JERR_JREC_BADRECTAIL, ss.str().c_str(), "jrec", "chk_tail");
}
}
Modified: store/trunk/cpp/lib/jrnl/txn_rec.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/txn_rec.cpp 2007-12-07 16:34:34 UTC (rev 1441)
+++ store/trunk/cpp/lib/jrnl/txn_rec.cpp 2007-12-07 17:09:35 UTC (rev 1442)
@@ -314,19 +314,9 @@
const bool
txn_rec::rcv_decode(hdr h, std::ifstream* ifsp, size_t& rec_offs) throw (jexception)
{
- if (rec_offs) // Contunue decoding xid from previous decode call
+ if (rec_offs == 0)
{
- ifsp->read((char*)_buff + rec_offs, _txn_hdr._xidsize - rec_offs);
- size_t size_read = ifsp->gcount();
- if (size_read < _txn_hdr._xidsize - rec_offs)
- {
- assert(ifsp->eof());
- rec_offs += size_read;
- return false;
- }
- }
- else // Start at beginning of record
- {
+ // Read header, allocate for xid
_txn_hdr._hdr.copy(h);
#if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
ifsp->ignore(sizeof(u_int32_t)); // _filler0
@@ -335,19 +325,38 @@
#if defined(JRNL_LITTLE_ENDIAN) && defined(JRNL_32_BIT)
ifsp->ignore(sizeof(u_int32_t)); // _filler0
#endif
+ rec_offs = sizeof(_txn_hdr);
_buff = ::malloc(_txn_hdr._xidsize);
MALLOC_CHK(_buff, "_buff", "txn_rec", "rcv_decode");
- // Decode xid
- ifsp->read((char*)_buff, _txn_hdr._xidsize);
+ }
+ if (rec_offs < sizeof(_txn_hdr) + _txn_hdr._xidsize)
+ {
+ // Read xid (or continue reading xid)
+ size_t offs = rec_offs - sizeof(_txn_hdr);
+ ifsp->read((char*)_buff + offs, _txn_hdr._xidsize - offs);
size_t size_read = ifsp->gcount();
- if (size_read < _txn_hdr._xidsize)
+ rec_offs += size_read;
+ if (size_read < _txn_hdr._xidsize - offs)
{
assert(ifsp->eof());
- rec_offs = size_read;
return false;
}
}
- ifsp->ignore(rec_size_dblks() * JRNL_DBLK_SIZE - sizeof(_txn_hdr) - _txn_hdr._xidsize);
+ if (rec_offs < sizeof(_txn_hdr) + _txn_hdr._xidsize + sizeof(rec_tail))
+ {
+ // Read tail (or continue reading tail)
+ size_t offs = rec_offs - sizeof(_txn_hdr) - _txn_hdr._xidsize;
+ ifsp->read((char*)&_txn_tail + offs, sizeof(rec_tail) - offs);
+ size_t size_read = ifsp->gcount();
+ rec_offs += size_read;
+ if (size_read < sizeof(rec_tail) - offs)
+ {
+ assert(ifsp->eof());
+ return false;
+ }
+ }
+ ifsp->ignore(rec_size_dblks() * JRNL_DBLK_SIZE - rec_size());
+ chk_tail(); // Throws if tail invalid or record incomplete
return true;
}
@@ -387,7 +396,7 @@
const size_t
txn_rec::rec_size() const
{
- return deq_hdr::size() + _txn_hdr._xidsize + rec_tail::size();
+ return txn_hdr::size() + _txn_hdr._xidsize + rec_tail::size();
}
void
Modified: store/trunk/cpp/lib/jrnl/wmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wmgr.cpp 2007-12-07 16:34:34 UTC (rev 1441)
+++ store/trunk/cpp/lib/jrnl/wmgr.cpp 2007-12-07 17:09:35 UTC (rev 1442)
@@ -1003,9 +1003,9 @@
while (_cached_offset_dblks < wdblks)
{
void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks * JRNL_DBLK_SIZE);
- ::memcpy(wptr, (void*)&xmagic, 4);
+ ::memcpy(wptr, (void*)&xmagic, sizeof(xmagic));
#ifdef RHM_CLEAN
- ::memset((char*)wptr + 4, RHM_CLEAN_CHAR, JRNL_DBLK_SIZE - 4);
+ ::memset((char*)wptr + sizeof(xmagic), RHM_CLEAN_CHAR, JRNL_DBLK_SIZE - sizeof(xmagic));
#endif
_pg_offset_dblks++;
_cached_offset_dblks++;
18 years, 4 months
rhmessaging commits: r1441 - in mgmt: notes and 1 other directory.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2007-12-07 11:34:34 -0500 (Fri, 07 Dec 2007)
New Revision: 1441
Modified:
mgmt/cumin/python/cumin/charts.py
mgmt/cumin/python/cumin/formats.py
mgmt/cumin/python/cumin/model.py
mgmt/notes/justin-todo.txt
Log:
It's not as nice as I'd like, but it's a start. Adds x-axis
timestamps to charts.
Also adds a fmt_duration_brief function for compact rendering of
durations.
Modified: mgmt/cumin/python/cumin/charts.py
===================================================================
--- mgmt/cumin/python/cumin/charts.py 2007-12-06 19:27:01 UTC (rev 1440)
+++ mgmt/cumin/python/cumin/charts.py 2007-12-07 16:34:34 UTC (rev 1441)
@@ -1,6 +1,9 @@
from cairo import *
from random import random
+from time import mktime
+from formats import *
+
class LineChart(object):
def __init__(self, width, height):
self.width = width
@@ -37,25 +40,29 @@
cr.stroke()
- def plot_x_axis(self, values, interval=40):
+ def plot_x_axis(self, values, interval=50):
if values:
cr = Context(self.surface)
cr.set_line_width(0.2)
cr.set_source_rgb(0.8, 0.8, 0.8)
- zero = values[0]
- xs = range(self.width, 0 - interval, -interval)
+ if values:
+ tzero = mktime(values[0].timetuple())
- for x in xs:
- cr.move_to(x, 0)
- cr.line_to(x, self.height + 10)
+ xs = range(self.width, 0 - interval, -interval)
- # XXX
- #index = 120 - x // self.value_interval
- #value = values[index] - zero
- #print x, index, value
- #cr.show_text(value.strftime("%S"))
+ for x, i in zip(xs, range(0, 120)):
+ cr.move_to(x, 0)
+ cr.line_to(x, self.height + 10)
+ if i % 4 == 0:
+ index = 120 - x // self.value_interval
+
+ if len(values) > index:
+ value = mktime(values[index].timetuple()) - tzero
+
+ cr.show_text(fmt_duration_brief(value))
+
cr.stroke()
def plot_y_axis(self):
Modified: mgmt/cumin/python/cumin/formats.py
===================================================================
--- mgmt/cumin/python/cumin/formats.py 2007-12-06 19:27:01 UTC (rev 1440)
+++ mgmt/cumin/python/cumin/formats.py 2007-12-07 16:34:34 UTC (rev 1441)
@@ -12,6 +12,9 @@
def fmt_duration(secs):
"""Takes a duration in seconds, which can be a float"""
+ sign = secs < 0 and " ago" or ""
+ secs = abs(secs)
+
elems = list()
periods = (86400, 3600, 60, 1)
units = ("day", "hour", "min", "sec")
@@ -22,12 +25,34 @@
elems.append("%i %s%s" % (count, unit, ess(count)))
if len(elems) == 2:
- return ", ".join(elems)
+ return ", ".join(elems) + sign
secs = secs % period
- return ", ".join(elems)
+ return ", ".join(elems) + sign
+def fmt_duration_brief(secs):
+ """Takes a duration in seconds, which can be a float"""
+
+ sign = secs < 0 and "-" or ""
+ secs = abs(secs)
+
+ elems = list()
+ periods = (86400, 3600, 60, 1)
+ units = ("d", "h", "m", "s")
+
+ for period, unit in zip(periods, units):
+ if secs > period:
+ count = secs // period
+ elems.append("%i%s" % (count, unit))
+
+ if len(elems) == 2:
+ return sign + "".join(elems)
+
+ secs = secs % period
+
+ return sign + "".join(elems)
+
def fmt_rate(value, unit1, unit2):
#return "%i <small>%s/%s</small>" % (value, unit1, unit2)
return "%i<small>/%s</small>" % (nvl(value, 0), unit2)
Modified: mgmt/cumin/python/cumin/model.py
===================================================================
--- mgmt/cumin/python/cumin/model.py 2007-12-06 19:27:01 UTC (rev 1440)
+++ mgmt/cumin/python/cumin/model.py 2007-12-07 16:34:34 UTC (rev 1441)
@@ -70,6 +70,7 @@
name = self.cumin_class.name
cls = self.cumin_class.mint_stats_class
+ # XXX get rid of this
stats = cls.select("%s_id = %i" % (name, object.id),
orderBy="-id")[:limit]
Modified: mgmt/notes/justin-todo.txt
===================================================================
--- mgmt/notes/justin-todo.txt 2007-12-06 19:27:01 UTC (rev 1440)
+++ mgmt/notes/justin-todo.txt 2007-12-07 16:34:34 UTC (rev 1441)
@@ -10,8 +10,6 @@
* Add legends to charts
- * Add x and y axis values to charts
-
* Prepare for journal stats on queue
* Make sure queue accel. a proper rate value
18 years, 4 months
rhmessaging commits: r1440 - mgmt/notes.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2007-12-06 14:27:01 -0500 (Thu, 06 Dec 2007)
New Revision: 1440
Modified:
mgmt/notes/justin-todo.txt
Log:
Reorder todos for Monday demo.
Modified: mgmt/notes/justin-todo.txt
===================================================================
--- mgmt/notes/justin-todo.txt 2007-12-06 19:26:46 UTC (rev 1439)
+++ mgmt/notes/justin-todo.txt 2007-12-06 19:27:01 UTC (rev 1440)
@@ -1,15 +1,5 @@
Current
- * Add ability to send a test message to a queue
-
- * Queue: Add a msg enq rate msg deq rate chart
-
- * Sort in tables
-
- * Restore high-low
-
- * Restore the consumer, producer, and bindings stat links
-
* Add queue journal stats
* "purge messages from queues"
@@ -26,8 +16,26 @@
* Make sure queue accel. a proper rate value
+ * Paginate brokers
+
+ * Paginate queues
+
+ * Better demo data
+
+ * Make group slider in broker browser work
+
Deferred
+ * Queue: Add a msg enq rate msg deq rate chart
+
+ * Sort in tables
+
+ * Restore the consumer, producer, and bindings stat links
+
+ * Add ability to send a test message to a queue
+
+ * Restore high-low
+
* Fix sqlobject init: make it happen at cumin init
* "remove broker groups"
18 years, 4 months
rhmessaging commits: r1439 - mgmt/mint/python/mint.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2007-12-06 14:26:46 -0500 (Thu, 06 Dec 2007)
New Revision: 1439
Modified:
mgmt/mint/python/mint/__init__.py
Log:
Reduces debug printing.
Modified: mgmt/mint/python/mint/__init__.py
===================================================================
--- mgmt/mint/python/mint/__init__.py 2007-12-06 19:26:26 UTC (rev 1438)
+++ mgmt/mint/python/mint/__init__.py 2007-12-06 19:26:46 UTC (rev 1439)
@@ -85,8 +85,6 @@
regs = BrokerRegistration.selectBy(host=host, port=port)
- print "regs", regs
-
for reg in regs:
print "Attaching broker to reg", reg
18 years, 4 months
rhmessaging commits: r1438 - mgmt/cumin/python/cumin.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2007-12-06 14:26:26 -0500 (Thu, 06 Dec 2007)
New Revision: 1438
Modified:
mgmt/cumin/python/cumin/broker.py
Log:
Don't use get_brokers. It doesn't exist anymore.
Modified: mgmt/cumin/python/cumin/broker.py
===================================================================
--- mgmt/cumin/python/cumin/broker.py 2007-12-06 18:24:12 UTC (rev 1437)
+++ mgmt/cumin/python/cumin/broker.py 2007-12-06 19:26:26 UTC (rev 1438)
@@ -42,7 +42,7 @@
self.add_child(self.paginator)
def get_title(self, session, model):
- return "Brokers %s" % fmt_count(len(model.get_brokers()))
+ return "Brokers %s" % fmt_count(BrokerRegistration.select().count())
def do_get_items(self, session, model):
start, end = self.paginator.get_bounds(session)
18 years, 4 months
rhmessaging commits: r1437 - in store/trunk/cpp/lib: jrnl and 1 other directory.
by rhmessaging-commits@lists.jboss.org
Author: aconway
Date: 2007-12-06 13:24:12 -0500 (Thu, 06 Dec 2007)
New Revision: 1437
Modified:
store/trunk/cpp/lib/BdbMessageStore.cpp
store/trunk/cpp/lib/JournalImpl.cpp
store/trunk/cpp/lib/JournalImpl.h
store/trunk/cpp/lib/TxnCtxt.h
store/trunk/cpp/lib/jrnl/data_tok.cpp
store/trunk/cpp/lib/jrnl/data_tok.hpp
store/trunk/cpp/lib/jrnl/jcntl.cpp
Log:
Replaced TimerA with Timer, use intrusive_ptr for Timer.
Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp 2007-12-06 17:59:43 UTC (rev 1436)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp 2007-12-06 18:24:12 UTC (rev 1437)
@@ -1,24 +1,24 @@
/*
- Copyright (C) 2007 Red Hat Software
+ Copyright (C) 2007 Red Hat Software
- This file is part of Red Hat Messaging.
+ This file is part of Red Hat Messaging.
- Red Hat Messaging is free software; you can redistribute it and/or
- modify it under the terms of the GNU Lesser General Public
- License as published by the Free Software Foundation; either
- version 2.1 of the License, or (at your option) any later version.
+ Red Hat Messaging is free software; you can redistribute it and/or
+ modify it under the terms of the GNU Lesser General Public
+ License as published by the Free Software Foundation; either
+ version 2.1 of the License, or (at your option) any later version.
- This library is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- Lesser General Public License for more details.
+ This library is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ Lesser General Public License for more details.
- You should have received a copy of the GNU Lesser General Public
- License along with this library; if not, write to the Free Software
- Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
- USA
+ You should have received a copy of the GNU Lesser General Public
+ License along with this library; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
+ USA
- The GNU Lesser General Public License is available in the file COPYING.
+ The GNU Lesser General Public License is available in the file COPYING.
*/
#include "BdbMessageStore.h"
@@ -62,8 +62,8 @@
prepareXidDb(&env, 0),
numJrnlFiles(8),
jrnlFsizePgs(24),
- isInit(false),
- envPath(envpath)
+ isInit(false),
+ envPath(envpath)
{
@@ -72,15 +72,15 @@
bool BdbMessageStore::init(const std::string& dir, const bool async, const bool force, u_int16_t jfiles, u_int32_t jfileSizePgs)
{
- if (isInit) return true;
+ if (isInit) return true;
numJrnlFiles = jfiles;
jrnlFsizePgs = jfileSizePgs;
- useAsync = async;
- if (dir.size()>0) storeDir = dir;
+ useAsync = async;
+ if (dir.size()>0) storeDir = dir;
- string bdbdir = storeDir + "/rhm/dat/";
- journal::jdir::create_dir(bdbdir);
+ string bdbdir = storeDir + "/rhm/dat/";
+ journal::jdir::create_dir(bdbdir);
bool ret = false;
@@ -111,11 +111,11 @@
txn.abort();
throw;
}
- ret = mode(useAsync, force);
- if (!ret) return false;
+ ret = mode(useAsync, force);
+ if (!ret) return false;
- isInit = true;
- return true;
+ isInit = true;
+ return true;
}
bool BdbMessageStore::init(const qpid::Options* options)
@@ -154,17 +154,17 @@
bool BdbMessageStore::mode(const bool async, const bool force)
{
- u_int32_t id (1); // key one in config is mode
+ u_int32_t id (1); // key one in config is mode
Dbt key(&id, sizeof(id));
size_t preamble_length = sizeof(u_int32_t);
BufferValue value(preamble_length, 0);
- u_int32_t avalue = async ? 1 : 2;
- value.buffer.putLong( avalue );
- bool same = false;
- bool hasMode = false;
+ u_int32_t avalue = async ? 1 : 2;
+ value.buffer.putLong( avalue );
+ bool same = false;
+ bool hasMode = false;
{
- Cursor config;
+ Cursor config;
config.open(configDb, 0);
IdDbt rkey;
BufferValue rvalue(preamble_length, 0);
@@ -172,21 +172,21 @@
while (config.next(rkey, rvalue)) {
if (rkey.id == 1)
- {
- hasMode = true;
- u_int32_t valueL = rvalue.buffer.getLong();
- if (avalue == valueL){
- same = true;
- }else {
- break;
- }
+ {
+ hasMode = true;
+ u_int32_t valueL = rvalue.buffer.getLong();
+ if (avalue == valueL){
+ same = true;
+ }else {
+ break;
+ }
}
- }
+ }
}
if (same) return true;
- if (!same && !force && hasMode) return false;
- if (!same && force && hasMode) {
- truncate();
+ if (!same && !force && hasMode) return false;
+ if (!same && force && hasMode) {
+ truncate();
}
int status = configDb.put(0, &key, &value, DB_NOOVERWRITE | DB_AUTO_COMMIT );
@@ -195,12 +195,12 @@
} else {
return true;
}
- return false;
+ return false;
}
void BdbMessageStore::open(Db& db, DbTxn* txn, const char* file, bool dupKey)
{
- if(dupKey) db.set_flags(DB_DUPSORT);
+ if(dupKey) db.set_flags(DB_DUPSORT);
db.open(txn, file, 0, DB_BTREE, DB_CREATE | DB_THREAD, 0);
dbs.push_back(&db);
}
@@ -230,43 +230,43 @@
}
txn->commit(0);
- try{
+ try{
journal::jdir::delete_dir(getJrnlBaseDir(),true);
}
- catch (const journal::jexception& e) {
+ catch (const journal::jexception& e) {
THROW_STORE_EXCEPTION(std::string("truncate() failed: ") + e.what() );
}
}
void BdbMessageStore::create(PersistableQueue& queue)
{
- checkInit();
+ checkInit();
if (queue.getPersistenceId()) {
THROW_STORE_EXCEPTION("Queue already created: " + queue.getName());
}
if (usingJrnl()) {
JournalImpl* jQueue = new JournalImpl(queue.getName(), getJrnlDir(queue), string("JournalData"), numJrnlFiles, jrnlFsizePgs * JRNL_RMGR_PAGE_SIZE, defJournalGetEventsTimeout, defJournalFlushTimeout);
queue.setExternalQueueStore(dynamic_cast<ExternalQueueStore*>(jQueue));
- try {
- // init will create the deque's for the init...
- jQueue->initialize();
- } catch (const journal::jexception& e) {
+ try {
+ // init will create the deque's for the init...
+ jQueue->initialize();
+ } catch (const journal::jexception& e) {
THROW_STORE_EXCEPTION(std::string("Queue ") + queue.getName() + ": create() failed: " + e.what());
}
}
try {
if (!create(queueDb, queueIdSequence, queue)) {
- THROW_STORE_EXCEPTION("Queue already exists: " + queue.getName());
+ THROW_STORE_EXCEPTION("Queue already exists: " + queue.getName());
}
} catch (const DbException& e) {
- THROW_STORE_EXCEPTION_2("Error creating queue named " + queue.getName(), e);
+ THROW_STORE_EXCEPTION_2("Error creating queue named " + queue.getName(), e);
}
}
void BdbMessageStore::destroy(PersistableQueue& queue)
{
- checkInit();
+ checkInit();
destroy(queueDb, queue);
qpid::broker::ExternalQueueStore* eqs = queue.getExternalQueueStore();
if (eqs)
@@ -280,7 +280,7 @@
void BdbMessageStore::create(const PersistableExchange& exchange)
{
- checkInit();
+ checkInit();
if (exchange.getPersistenceId()) {
THROW_STORE_EXCEPTION("Exchange already created: " + exchange.getName());
}
@@ -296,7 +296,7 @@
void BdbMessageStore::destroy(const PersistableExchange& exchange)
{
- checkInit();
+ checkInit();
destroy(exchangeDb, exchange);
//need to also delete bindings
IdDbt key(exchange.getPersistenceId());
@@ -326,9 +326,9 @@
void BdbMessageStore::bind(const PersistableExchange& e, const PersistableQueue& q,
- const std::string& k, const FieldTable& a)
+ const std::string& k, const FieldTable& a)
{
- checkInit();
+ checkInit();
IdDbt key(e.getPersistenceId());
BindingDbt value(e, q, k, a);
TxnCtxt txn;
@@ -338,9 +338,9 @@
}
void BdbMessageStore::unbind(const PersistableExchange& e, const PersistableQueue& q,
- const std::string& k, const FieldTable& a)
+ const std::string& k, const FieldTable& a)
{
- checkInit();
+ checkInit();
IdDbt key(e.getPersistenceId());
BindingDbt value(e, q, k, a);
@@ -357,7 +357,7 @@
void BdbMessageStore::recover(RecoveryManager& registry)
{
- checkInit();
+ checkInit();
txn_list prepared;
recoverXids(prepared);
@@ -384,17 +384,17 @@
//recover transactions:
for (txn_list::iterator i = prepared.begin(); i != prepared.end(); i++) {
- TPCTxnCtxt* tpcc = new TPCTxnCtxt(i->xid, &messageIdSequence);
- RecoverableTransaction::shared_ptr dtx = registry.recoverTransaction(i->xid, std::auto_ptr<TPCTransactionContext>(tpcc));
+ TPCTxnCtxt* tpcc = new TPCTxnCtxt(i->xid, &messageIdSequence);
+ RecoverableTransaction::shared_ptr dtx = registry.recoverTransaction(i->xid, std::auto_ptr<TPCTransactionContext>(tpcc));
if (i->enqueues.get()) {
for (LockedMappings::iterator j = i->enqueues->begin(); j != i->enqueues->end(); j++) {
- tpcc->addXidRecord(queues[j->first]->getExternalQueueStore());
+ tpcc->addXidRecord(queues[j->first]->getExternalQueueStore());
dtx->enqueue(queues[j->first], messages[j->second]);
}
}
if (i->dequeues.get()) {
for (LockedMappings::iterator j = i->dequeues->begin(); j != i->dequeues->end(); j++) {
- tpcc->addXidRecord(queues[j->first]->getExternalQueueStore());
+ tpcc->addXidRecord(queues[j->first]->getExternalQueueStore());
dtx->dequeue(queues[j->first], messages[j->second]);
}
}
@@ -403,7 +403,7 @@
}
void BdbMessageStore::recoverQueues(TxnCtxt& txn, RecoveryManager& registry, queue_index& queue_index, txn_list&
-prepared, message_index& messages)
+ prepared, message_index& messages)
{
Cursor queues;
queues.open(queueDb, txn.get());
@@ -423,22 +423,22 @@
if (usingJrnl())
{
- const char* queueName = queue->getName().c_str();
- JournalImpl* jQueue = new JournalImpl(queueName, getJrnlDir(queueName), string("JournalData"), numJrnlFiles, jrnlFsizePgs * JRNL_RMGR_PAGE_SIZE, defJournalGetEventsTimeout, defJournalFlushTimeout);
- queue->setExternalQueueStore(dynamic_cast<ExternalQueueStore*>(jQueue));
+ const char* queueName = queue->getName().c_str();
+ JournalImpl* jQueue = new JournalImpl(queueName, getJrnlDir(queueName), string("JournalData"), numJrnlFiles, jrnlFsizePgs * JRNL_RMGR_PAGE_SIZE, defJournalGetEventsTimeout, defJournalFlushTimeout);
+ queue->setExternalQueueStore(dynamic_cast<ExternalQueueStore*>(jQueue));
- try
- {
- u_int64_t thisHighestRid = 0;
- jQueue->recover(prepared, thisHighestRid, key.id); // start recovery
- if (thisHighestRid > highestRid)
- highestRid = thisHighestRid;
- recoverMessages(txn, registry, queue, prepared, messages);
- jQueue->recover_complete(); // start journal.
- } catch (const journal::jexception& e) {
- THROW_STORE_EXCEPTION(std::string("Queue ") + queueName + ": recoverQueues() failed: " + e.what());
- }
- //read all messages: done on a per queue basis if using Journal
+ try
+ {
+ u_int64_t thisHighestRid = 0;
+ jQueue->recover(prepared, thisHighestRid, key.id); // start recovery
+ if (thisHighestRid > highestRid)
+ highestRid = thisHighestRid;
+ recoverMessages(txn, registry, queue, prepared, messages);
+ jQueue->recover_complete(); // start journal.
+ } catch (const journal::jexception& e) {
+ THROW_STORE_EXCEPTION(std::string("Queue ") + queueName + ": recoverQueues() failed: " + e.what());
+ }
+ //read all messages: done on a per queue basis if using Journal
}
queue_index[key.id] = queue;
@@ -447,7 +447,7 @@
messageIdSequence.reset(highestRid + 1);
queueIdSequence.reset(maxQueueId + 1);
- if (!usingJrnl()) //read all messages:
+ if (!usingJrnl()) //read all messages:
recoverMessages(txn, registry, queue_index, prepared, messages);
}
@@ -508,15 +508,15 @@
// async IO version.
void BdbMessageStore::recoverMessages(TxnCtxt& /*txn*/, qpid::broker::RecoveryManager& recovery,
- qpid::broker::RecoverableQueue::shared_ptr& queue, txn_list& locked, message_index& prepared)
+ qpid::broker::RecoverableQueue::shared_ptr& queue, txn_list& locked, message_index& prepared)
{
size_t preambleLength = sizeof(u_int32_t)/*header size*/;
JournalImpl* jc = static_cast<JournalImpl*>(queue->getExternalQueueStore());
- DataTokenImpl dtokp;
- size_t readSize = 0;
- unsigned msg_count=0;
+ DataTokenImpl dtokp;
+ size_t readSize = 0;
+ unsigned msg_count=0;
bool read = true;
void* dbuff = NULL; size_t dbuffSize = 0;
@@ -526,77 +526,77 @@
dtokp.set_wstate(DataTokenImpl::ENQ);
- // read the message from the Journal.
+ // read the message from the Journal.
try {
-//std::cout << jc->dirname() <<"-queueName:" << queue->getName() << "-enq count:" << jc->get_enq_cnt() << std::endl;
+ //std::cout << jc->dirname() <<"-queueName:" << queue->getName() << "-enq count:" << jc->get_enq_cnt() << std::endl;
unsigned aio_sleep_cnt = 0;
- while (read) {
+ while (read) {
rhm::journal::iores res = jc->read_data_record(&dbuff, dbuffSize, &xidbuff, xidbuffSize, transientFlag, externalFlag, &dtokp);
- readSize = dtokp.dsize();
+ readSize = dtokp.dsize();
switch (res)
{
- case rhm::journal::RHM_IORES_SUCCESS:{
- msg_count++;
- RecoverableMessage::shared_ptr msg;
- char* data = (char*)dbuff;
+ case rhm::journal::RHM_IORES_SUCCESS:{
+ msg_count++;
+ RecoverableMessage::shared_ptr msg;
+ char* data = (char*)dbuff;
- unsigned headerSize;
- if (externalFlag){
- msg = getExternMessage(recovery, dtokp.rid(), headerSize); // large message external to jrnl
- } else {
- headerSize = Buffer(data, preambleLength).getLong();
- Buffer headerBuff(data+ preambleLength, headerSize); /// do we want read size or header size ????
- msg = recovery.recoverMessage(headerBuff);
- }
- msg->setPersistenceId(dtokp.rid());
+ unsigned headerSize;
+ if (externalFlag){
+ msg = getExternMessage(recovery, dtokp.rid(), headerSize); // large message external to jrnl
+ } else {
+ headerSize = Buffer(data, preambleLength).getLong();
+ Buffer headerBuff(data+ preambleLength, headerSize); /// do we want read size or header size ????
+ msg = recovery.recoverMessage(headerBuff);
+ }
+ msg->setPersistenceId(dtokp.rid());
- u_int32_t contentOffset = headerSize + preambleLength;
- u_int64_t contentSize = readSize - contentOffset;
- if (msg->loadContent(contentSize) && !externalFlag) {
- //now read the content
- Buffer contentBuff(data + contentOffset, contentSize);
- msg->decodeContent(contentBuff);
- }
+ u_int32_t contentOffset = headerSize + preambleLength;
+ u_int64_t contentSize = readSize - contentOffset;
+ if (msg->loadContent(contentSize) && !externalFlag) {
+ //now read the content
+ Buffer contentBuff(data + contentOffset, contentSize);
+ msg->decodeContent(contentBuff);
+ }
- if (PreparedTransaction::isLocked(locked, queue->getPersistenceId(), dtokp.rid()) ) {
- prepared[dtokp.rid()] = msg;
- } else {
- queue->recover(msg);
- }
+ if (PreparedTransaction::isLocked(locked, queue->getPersistenceId(), dtokp.rid()) ) {
+ prepared[dtokp.rid()] = msg;
+ } else {
+ queue->recover(msg);
+ }
- dtokp.reset();
- dtokp.set_wstate(DataTokenImpl::ENQ);
+ dtokp.reset();
+ dtokp.set_wstate(DataTokenImpl::ENQ);
- if (xidbuff)
- ::free(xidbuff);
- else if (dbuff)
- ::free(dbuff);
- aio_sleep_cnt = 0;
- break;
- }
- case rhm::journal::RHM_IORES_AIO_WAIT:
- if (++aio_sleep_cnt > MAX_AIO_SLEEPS)
- THROW_STORE_EXCEPTION("Timeout waiting for AIO");
- ::usleep(AIO_SLEEP_TIME);
- break;
- case rhm::journal::RHM_IORES_EMPTY:
- read = false;
- break; // done with all messages. ((add call in jrnl to test that _emap is empty.
- default:
- assert( "Store Error: Unexpected msg state");
+ if (xidbuff)
+ ::free(xidbuff);
+ else if (dbuff)
+ ::free(dbuff);
+ aio_sleep_cnt = 0;
+ break;
+ }
+ case rhm::journal::RHM_IORES_AIO_WAIT:
+ if (++aio_sleep_cnt > MAX_AIO_SLEEPS)
+ THROW_STORE_EXCEPTION("Timeout waiting for AIO");
+ ::usleep(AIO_SLEEP_TIME);
+ break;
+ case rhm::journal::RHM_IORES_EMPTY:
+ read = false;
+ break; // done with all messages. ((add call in jrnl to test that _emap is empty.
+ default:
+ assert( "Store Error: Unexpected msg state");
} // switch
} // while
} catch (const journal::jexception& e) {
- THROW_STORE_EXCEPTION(std::string("Queue ") + queue->getName() +
- ": recoverMessages() failed: " + e.what());
- }
+ THROW_STORE_EXCEPTION(std::string("Queue ") + queue->getName() +
+ ": recoverMessages() failed: " + e.what());
+ }
}
RecoverableMessage::shared_ptr BdbMessageStore::getExternMessage(qpid::broker::RecoveryManager& recovery,
- uint64_t messageId, unsigned& headerSize)
+ uint64_t messageId, unsigned& headerSize)
{
Dbt key (&messageId, sizeof(messageId));
size_t preamble_length = sizeof(u_int32_t)/*header size*/;
@@ -604,17 +604,17 @@
BufferValue value(preamble_length, 0);
value.buffer.record();
if (messageDb.get(0, &key, &value, 0) == DB_NOTFOUND) {
- THROW_STORE_EXCEPTION("Cannot load content. Message not known to store!");
+ THROW_STORE_EXCEPTION("Cannot load content. Message not known to store!");
}
- //read header only to begin with
+ //read header only to begin with
headerSize = value.buffer.getLong();
BufferValue header(headerSize, preamble_length);
if (messageDb.get(0, &key, &header, 0) == DB_NOTFOUND) {
- THROW_STORE_EXCEPTION("Cannot load content. Message not known to store!");
+ THROW_STORE_EXCEPTION("Cannot load content. Message not known to store!");
}
- return recovery.recoverMessage(header.buffer);
+ return recovery.recoverMessage(header.buffer);
}
@@ -695,34 +695,34 @@
std::set<string> prepared;
collectPreparedXids(prepared);
- //when using the async journal, it will abort unprepaired xids and populate the locked maps
- if (!usingJrnl()){
- txn_lock_map enqueues;
- txn_lock_map dequeues;
+ //when using the async journal, it will abort unprepaired xids and populate the locked maps
+ if (!usingJrnl()){
+ txn_lock_map enqueues;
+ txn_lock_map dequeues;
std::set<string> known;
readXids(enqueueXidDb, known);
readXids(dequeueXidDb, known);
//abort all known but unprepared xids:
for (std::set<string>::iterator i = known.begin(); i != known.end(); i++) {
- if (prepared.find(*i) == prepared.end()) {
+ if (prepared.find(*i) == prepared.end()) {
TPCTxnCtxt txn(*i, NULL);
- completed(txn, dequeueXidDb, enqueueXidDb, false);
- }
+ completed(txn, dequeueXidDb, enqueueXidDb, false);
+ }
}
- readLockedMappings(enqueueXidDb, enqueues);
- readLockedMappings(dequeueXidDb, dequeues);
- for (std::set<string>::iterator i = prepared.begin(); i != prepared.end(); i++) {
- txns.push_back(new PreparedTransaction(*i, enqueues[*i], dequeues[*i]));
- }
- } else {
- for (std::set<string>::iterator i = prepared.begin(); i != prepared.end(); i++) {
+ readLockedMappings(enqueueXidDb, enqueues);
+ readLockedMappings(dequeueXidDb, dequeues);
+ for (std::set<string>::iterator i = prepared.begin(); i != prepared.end(); i++) {
+ txns.push_back(new PreparedTransaction(*i, enqueues[*i], dequeues[*i]));
+ }
+ } else {
+ for (std::set<string>::iterator i = prepared.begin(); i != prepared.end(); i++) {
LockedMappings::shared_ptr enq_ptr;
enq_ptr.reset(new LockedMappings);
LockedMappings::shared_ptr deq_ptr;
deq_ptr.reset(new LockedMappings);
- txns.push_back(new PreparedTransaction(*i, enq_ptr, deq_ptr));
- }
+ txns.push_back(new PreparedTransaction(*i, enq_ptr, deq_ptr));
+ }
}
}
@@ -751,12 +751,12 @@
std::string xid(reinterpret_cast<char*>(key.get_data()), key.get_size());
LockedMappings::add(mappings, xid, value.queueId(), value.messageId());
/*
- txn_lock_map::iterator i = mappings.find(xid);
- if (i == mappings.end()) {
- LockedMappings::shared_ptr ptr(new LockedMappings());
- i = mappings.insert(std::make_pair(xid, ptr)).first;
- }
- i->second->add(value.queueId(), value.messageId());
+ txn_lock_map::iterator i = mappings.find(xid);
+ if (i == mappings.end()) {
+ LockedMappings::shared_ptr ptr(new LockedMappings());
+ i = mappings.insert(std::make_pair(xid, ptr)).first;
+ }
+ i->second->add(value.queueId(), value.messageId());
*/
}
}
@@ -768,7 +768,7 @@
void BdbMessageStore::stage( intrusive_ptr<PersistableMessage>& msg)
{
- checkInit();
+ checkInit();
TxnCtxt txn;
txn.begin(env, true);
@@ -779,7 +779,7 @@
messageId = messageIdSequence.next();
store(NULL, &txn, key, msg, true);
msg->setPersistenceId(messageId);
- txn.commit();
+ txn.commit();
} catch (const std::exception& e) {
txn.abort();
throw;
@@ -788,7 +788,7 @@
}
void BdbMessageStore::destroy(intrusive_ptr<PersistableMessage>& msg)
{
- checkInit();
+ checkInit();
u_int64_t messageId (msg->getPersistenceId());
if (messageId) {
Dbt key (&messageId, sizeof(messageId));
@@ -822,7 +822,7 @@
void BdbMessageStore::appendContent(intrusive_ptr<const PersistableMessage>& msg, const std::string& data)
{
- checkInit();
+ checkInit();
u_int64_t messageId (msg->getPersistenceId());
if (messageId != 0) {
try {
@@ -850,9 +850,9 @@
}
void BdbMessageStore::loadContent(const qpid::broker::PersistableQueue& queue,
- intrusive_ptr<const PersistableMessage>& msg, std::string& data, u_int64_t offset, u_int32_t length)
+ intrusive_ptr<const PersistableMessage>& msg, std::string& data, u_int64_t offset, u_int32_t length)
{
- checkInit();
+ checkInit();
u_int64_t realOffset = offset + sizeof(u_int32_t)/*header length*/+ msg->encodedHeaderSize();
u_int64_t messageId (msg->getPersistenceId());
@@ -860,10 +860,10 @@
try {
JournalImpl* jc = static_cast<JournalImpl*>(queue.getExternalQueueStore());
if (jc && jc->is_enqueued(messageId) ){
- if (jc->loadMsgContent(messageId, data, realOffset, length)){
- return;
- }
- }
+ if (jc->loadMsgContent(messageId, data, realOffset, length)){
+ return;
+ }
+ }
Dbt key (&messageId, sizeof(messageId));
char *buffer = new char[length];
Dbt value(buffer, length);
@@ -873,17 +873,17 @@
value.set_dlen(length);
int status = messageDb.get(0, &key, &value, 0);
if (status == DB_NOTFOUND) {
- delete [] buffer;
+ delete [] buffer;
THROW_STORE_EXCEPTION("Cannot load content. Message not known to store!");
} else {
data.assign(buffer, value.get_size());
- delete [] buffer;
+ delete [] buffer;
}
} catch (const DbException& e) {
THROW_STORE_EXCEPTION_2("Error loading content", e);
} catch (const journal::jexception& e) {
THROW_STORE_EXCEPTION(std::string("Queue ") + queue.getName() +
- ": loadContent() failed: " + e.what());
+ ": loadContent() failed: " + e.what());
}
} else {
THROW_STORE_EXCEPTION("Cannot load content. Message not known to store!");
@@ -893,21 +893,21 @@
void BdbMessageStore::flush(const qpid::broker::PersistableQueue& queue)
{
if (!usingJrnl()) return;
- checkInit();
+ checkInit();
try {
- JournalImpl* jc = static_cast<JournalImpl*>(queue.getExternalQueueStore());
- if (jc){
- jc->flush();
- }
+ JournalImpl* jc = static_cast<JournalImpl*>(queue.getExternalQueueStore());
+ if (jc){
+ jc->flush();
+ }
}catch (const journal::jexception& e) {
- THROW_STORE_EXCEPTION(std::string("Queue ") + queue.getName() + ": flush() failed: " + e.what() );
- }
+ THROW_STORE_EXCEPTION(std::string("Queue ") + queue.getName() + ": flush() failed: " + e.what() );
+ }
}
void BdbMessageStore::enqueue(TransactionContext* ctxt, intrusive_ptr<PersistableMessage>& msg,
- const PersistableQueue& queue)
+ const PersistableQueue& queue)
{
- checkInit();
+ checkInit();
u_int64_t queueId (queue.getPersistenceId());
u_int64_t messageId (msg->getPersistenceId());
if (queueId == 0) {
@@ -931,24 +931,24 @@
if (messageId == 0) {
messageId = messageIdSequence.next();
msg->setPersistenceId(messageId);
- newId = true;
- }
+ newId = true;
+ }
store(&queue, txn, key, msg, newId);
- if (usingJrnl()){
- // add queue* to the txn map..
- if (ctxt) txn->addXidRecord(queue.getExternalQueueStore());
- if (msg->isContentReleased()) put(mappingDb, txn->get(), key, value); // TODO - remove once jrnl is used for transient policy see **
- }else{
+ if (usingJrnl()){
+ // add queue* to the txn map..
+ if (ctxt) txn->addXidRecord(queue.getExternalQueueStore());
+ if (msg->isContentReleased()) put(mappingDb, txn->get(), key, value); // TODO - remove once jrnl is used for transient policy see **
+ }else{
msg->enqueueComplete(); // set enqueued for ack
put(mappingDb, txn->get(), key, value);
- // cct if using Journal do we need to wait for IO to complete before calling thus???
- // set enqueue comple on callback msg.enqueueComplete();
- if (txn->isTPC()) {
- record2pcOp(enqueueXidDb, dynamic_cast<TPCTxnCtxt&>(*txn), messageId, queueId);
- }
- }
+ // cct if using Journal do we need to wait for IO to complete before calling thus???
+ // set enqueue comple on callback msg.enqueueComplete();
+ if (txn->isTPC()) {
+ record2pcOp(enqueueXidDb, dynamic_cast<TPCTxnCtxt&>(*txn), messageId, queueId);
+ }
+ }
if (!ctxt) txn->commit();
} catch (const std::exception& e) {
@@ -958,27 +958,27 @@
}
void BdbMessageStore::store(const PersistableQueue* queue,
- TxnCtxt* txn, Dbt& messageId,
- intrusive_ptr<PersistableMessage>& message,
- bool newId)
+ TxnCtxt* txn, Dbt& messageId,
+ intrusive_ptr<PersistableMessage>& message,
+ bool newId)
{
u_int32_t headerSize = message->encodedHeaderSize();
u_int64_t size = message->encodedSize() + sizeof(u_int32_t);
char* buff= 0;
- if (!message->isContentReleased() )
- {
- buff = static_cast<char*>(::alloca(size)); // long + headers + content
+ if (!message->isContentReleased() )
+ {
+ buff = static_cast<char*>(::alloca(size)); // long + headers + content
Buffer buffer(buff,size);
buffer.putLong(headerSize);
message->encode(buffer);
- }
+ }
try {
- if ( queue && usingJrnl()){
-//std::cout << "E" << std::flush;
- boost::intrusive_ptr<DataTokenImpl> dtokp(new DataTokenImpl);
- dtokp->ref();
+ if ( queue && usingJrnl()){
+ //std::cout << "E" << std::flush;
+ boost::intrusive_ptr<DataTokenImpl> dtokp(new DataTokenImpl);
+ dtokp->addRef();
dtokp->setSourceMessage(message);
dtokp->set_rid(message->getPersistenceId()); // set the messageID into the Journal header (record-id)
@@ -987,71 +987,71 @@
unsigned busy_sleep_cnt = 0;
while (!written)
{
- JournalImpl* jc = static_cast<JournalImpl*>(queue->getExternalQueueStore());
- rhm::journal::iores eres;
- if (txn->getXid().empty()){
- if (message->isContentReleased()){
- eres = jc->enqueue_extern_data_record(size, dtokp.get(), false);
- }else {
- eres = jc->enqueue_data_record(buff, size, size, dtokp.get(), false);
- }
- }else {
- if (message->isContentReleased()){
- eres = jc->enqueue_extern_txn_data_record(size, dtokp.get(), txn->getXid(), false);
- } else {
- eres = jc->enqueue_txn_data_record(buff, size, size, dtokp.get(), txn->getXid(), false);
- }
- }
+ JournalImpl* jc = static_cast<JournalImpl*>(queue->getExternalQueueStore());
+ rhm::journal::iores eres;
+ if (txn->getXid().empty()){
+ if (message->isContentReleased()){
+ eres = jc->enqueue_extern_data_record(size, dtokp.get(), false);
+ }else {
+ eres = jc->enqueue_data_record(buff, size, size, dtokp.get(), false);
+ }
+ }else {
+ if (message->isContentReleased()){
+ eres = jc->enqueue_extern_txn_data_record(size, dtokp.get(), txn->getXid(), false);
+ } else {
+ eres = jc->enqueue_txn_data_record(buff, size, size, dtokp.get(), txn->getXid(), false);
+ }
+ }
switch (eres)
{
- case rhm::journal::RHM_IORES_SUCCESS:
-//std::cout << "." << std::flush;
- if (dtokp.get()->wstate() >= DataTokenImpl::ENQ_SUBM)
- written = true;
- aio_sleep_cnt = 0;
- busy_sleep_cnt = 0;
- break;
- case rhm::journal::RHM_IORES_AIO_WAIT:
-//std::cout << "w" << std::flush;
- if (++aio_sleep_cnt > MAX_AIO_SLEEPS)
- THROW_STORE_EXCEPTION("Timeout waiting for AIO: RHM_IORES_AIO_WAIT");
- usleep(AIO_SLEEP_TIME); // TODO move sleep to wait for IO in get events
- jc->get_wr_events();
- break;
- case rhm::journal::RHM_IORES_BUSY:
-//std::cout << "b" << std::flush;
- if (++busy_sleep_cnt > MAX_AIO_SLEEPS)
- THROW_STORE_EXCEPTION("Timeout waiting for mutex: RHM_IORES_BUSY");
- usleep(AIO_SLEEP_TIME); // TODO add sleep time to get events call as option
- break;
- case rhm::journal::RHM_IORES_FULL:
- std::cerr << "Error storing message -- Journal full on queue \"" << queue->getName() << "\"." << std::endl << std::flush;
- THROW_STORE_FULL_EXCEPTION("Error storing message -- Journal full :" + queue->getName());
- break;
- default:
- assert( "Store Error: Unexpected msg state");
+ case rhm::journal::RHM_IORES_SUCCESS:
+ //std::cout << "." << std::flush;
+ if (dtokp.get()->wstate() >= DataTokenImpl::ENQ_SUBM)
+ written = true;
+ aio_sleep_cnt = 0;
+ busy_sleep_cnt = 0;
+ break;
+ case rhm::journal::RHM_IORES_AIO_WAIT:
+ //std::cout << "w" << std::flush;
+ if (++aio_sleep_cnt > MAX_AIO_SLEEPS)
+ THROW_STORE_EXCEPTION("Timeout waiting for AIO: RHM_IORES_AIO_WAIT");
+ usleep(AIO_SLEEP_TIME); // TODO move sleep to wait for IO in get events
+ jc->get_wr_events();
+ break;
+ case rhm::journal::RHM_IORES_BUSY:
+ //std::cout << "b" << std::flush;
+ if (++busy_sleep_cnt > MAX_AIO_SLEEPS)
+ THROW_STORE_EXCEPTION("Timeout waiting for mutex: RHM_IORES_BUSY");
+ usleep(AIO_SLEEP_TIME); // TODO add sleep time to get events call as option
+ break;
+ case rhm::journal::RHM_IORES_FULL:
+ std::cerr << "Error storing message -- Journal full on queue \"" << queue->getName() << "\"." << std::endl << std::flush;
+ THROW_STORE_FULL_EXCEPTION("Error storing message -- Journal full :" + queue->getName());
+ break;
+ default:
+ assert( "Store Error: Unexpected msg state");
}
}
- } else {
- /// cct message db
- if (newId){ // only store in Bd if first time message is stored
- Dbt data(buff,size);
- messageDb.put(txn->get(), &messageId, &data, DB_NOOVERWRITE);
- }
- }
+ } else {
+ /// cct message db
+ if (newId){ // only store in Bd if first time message is stored
+ Dbt data(buff,size);
+ messageDb.put(txn->get(), &messageId, &data, DB_NOOVERWRITE);
+ }
+ }
} catch (const journal::jexception& e) {
- THROW_STORE_EXCEPTION(std::string("Queue ") + queue->getName() + ": store() failed: " +
- e.what());
+ THROW_STORE_EXCEPTION(std::string("Queue ") + queue->getName() + ": store() failed: " +
+ e.what());
} catch (const DbException& e) {
THROW_STORE_EXCEPTION_2("Error storing message", e);
}
}
void BdbMessageStore::dequeue(TransactionContext* ctxt, intrusive_ptr<PersistableMessage>& msg,
- const PersistableQueue& queue)
+ const PersistableQueue& queue)
{
- checkInit();
+ checkInit();
u_int64_t queueId (queue.getPersistenceId());
u_int64_t messageId (msg->getPersistenceId());
if (messageId == 0) {
@@ -1072,23 +1072,23 @@
try {
- if (usingJrnl()){
- // add queue* to the txn map..
- if (ctxt) txn->addXidRecord(queue.getExternalQueueStore());
- async_dequeue(ctxt, msg, queue);
+ if (usingJrnl()){
+ // add queue* to the txn map..
+ if (ctxt) txn->addXidRecord(queue.getExternalQueueStore());
+ async_dequeue(ctxt, msg, queue);
// added here as we are not doing it async on call back
- if (msg->isContentReleased()) // TODO remove this code once jrnl is used for transient policy see **
- {
- Dbt key (&messageId, sizeof(messageId));
+ if (msg->isContentReleased()) // TODO remove this code once jrnl is used for transient policy see **
+ {
+ Dbt key (&messageId, sizeof(messageId));
Dbt value (&queueId, sizeof(queueId));
dequeue(txn->get(), key, value);
- }
+ }
- msg->dequeueComplete();
-// if ( msg->isDequeueComplete() ) // clear id after last dequeue
-// msg->setPersistenceId(0);
+ msg->dequeueComplete();
+ // if ( msg->isDequeueComplete() ) // clear id after last dequeue
+ // msg->setPersistenceId(0);
- } else if (txn->isTPC()) {
+ } else if (txn->isTPC()) {
//if this is part of a 2pc transaction, then only record the dequeue now,
//it will be applied on commit
record2pcOp(dequeueXidDb, dynamic_cast<TPCTxnCtxt&>(*txn), messageId, queueId);
@@ -1098,7 +1098,7 @@
if (dequeue(txn->get(), key, value)) {
msg->setPersistenceId(0);//clear id as we have now removed the message from the store
msg->dequeueComplete(); // set dequeued for ack
- }
+ }
}
if (!ctxt) txn->commit();
@@ -1111,62 +1111,64 @@
}
}
-void BdbMessageStore::async_dequeue(TransactionContext* ctxt, intrusive_ptr<PersistableMessage>& msg,
- const PersistableQueue& queue)
+void BdbMessageStore::async_dequeue(
+ TransactionContext* ctxt,
+ intrusive_ptr<PersistableMessage>& msg,
+ const PersistableQueue& queue)
{
-//std::cout << "D" << std::flush;
+ //std::cout << "D" << std::flush;
bool written = false;
boost::intrusive_ptr<DataTokenImpl> ddtokp(new DataTokenImpl);
- ddtokp->ref();
- ddtokp->setSourceMessage(msg);
- ddtokp->set_rid(messageIdSequence.next());
- ddtokp->set_dequeue_rid(msg->getPersistenceId());
- ddtokp->set_wstate(DataTokenImpl::ENQ);
- JournalImpl* jc = static_cast<JournalImpl*>(queue.getExternalQueueStore());
- string tid;
+ ddtokp->addRef();
+ ddtokp->setSourceMessage(msg);
+ ddtokp->set_rid(messageIdSequence.next());
+ ddtokp->set_dequeue_rid(msg->getPersistenceId());
+ ddtokp->set_wstate(DataTokenImpl::ENQ);
+ JournalImpl* jc = static_cast<JournalImpl*>(queue.getExternalQueueStore());
+ string tid;
if (ctxt){
- TxnCtxt* txn = check(ctxt);
- tid = txn->getXid();
- }
+ TxnCtxt* txn = check(ctxt);
+ tid = txn->getXid();
+ }
unsigned aio_sleep_cnt = 0;
unsigned busy_sleep_cnt = 0;
while (!written)
{
- rhm::journal::iores dres;
- try {
- if (tid.empty()){
- dres = jc->dequeue_data_record(ddtokp.get());
- } else {
- dres = jc->dequeue_txn_data_record(ddtokp.get(), tid);
- }
- } catch (const journal::jexception& e) {
- THROW_STORE_EXCEPTION(std::string("Queue ") + queue.getName() + ": async_dequeue() failed: " + e.what());
- }
- switch (dres)
- {
- case rhm::journal::RHM_IORES_SUCCESS:
-//std::cout << "." << std::flush;
- aio_sleep_cnt = 0;
- busy_sleep_cnt = 0;
- written = true;
- break;
- case rhm::journal::RHM_IORES_AIO_WAIT:
-//std::cout << "w" << std::flush;
- if (++aio_sleep_cnt > MAX_AIO_SLEEPS)
- THROW_STORE_EXCEPTION("Timeout waiting for AIO: RHM_IORES_AIO_WAIT");
- usleep(AIO_SLEEP_TIME); // TODO add sleep time to get events call as option
- jc->get_wr_events();
- break;
- case rhm::journal::RHM_IORES_BUSY:
-//std::cout << "b" << std::flush;
- if (++busy_sleep_cnt > MAX_AIO_SLEEPS)
- THROW_STORE_EXCEPTION("Timeout waiting for mutex: RHM_IORES_BUSY");
- usleep(AIO_SLEEP_TIME); // TODO add sleep time to get events call as option
- break;
- default:
- assert( "Store Error: Unexpected msg state");
- }
+ rhm::journal::iores dres;
+ try {
+ if (tid.empty()){
+ dres = jc->dequeue_data_record(ddtokp.get());
+ } else {
+ dres = jc->dequeue_txn_data_record(ddtokp.get(), tid);
+ }
+ } catch (const journal::jexception& e) {
+ THROW_STORE_EXCEPTION(std::string("Queue ") + queue.getName() + ": async_dequeue() failed: " + e.what());
+ }
+ switch (dres)
+ {
+ case rhm::journal::RHM_IORES_SUCCESS:
+ //std::cout << "." << std::flush;
+ aio_sleep_cnt = 0;
+ busy_sleep_cnt = 0;
+ written = true;
+ break;
+ case rhm::journal::RHM_IORES_AIO_WAIT:
+ //std::cout << "w" << std::flush;
+ if (++aio_sleep_cnt > MAX_AIO_SLEEPS)
+ THROW_STORE_EXCEPTION("Timeout waiting for AIO: RHM_IORES_AIO_WAIT");
+ usleep(AIO_SLEEP_TIME); // TODO add sleep time to get events call as option
+ jc->get_wr_events();
+ break;
+ case rhm::journal::RHM_IORES_BUSY:
+ //std::cout << "b" << std::flush;
+ if (++busy_sleep_cnt > MAX_AIO_SLEEPS)
+ THROW_STORE_EXCEPTION("Timeout waiting for mutex: RHM_IORES_BUSY");
+ usleep(AIO_SLEEP_TIME); // TODO add sleep time to get events call as option
+ break;
+ default:
+ assert( "Store Error: Unexpected msg state");
+ }
}
}
@@ -1215,7 +1217,7 @@
u_int32_t BdbMessageStore::outstandingQueueAIO(const qpid::broker::PersistableQueue& /*queue*/)
{
- checkInit();
+ checkInit();
return 0;
}
@@ -1285,8 +1287,8 @@
auto_ptr<TransactionContext> BdbMessageStore::begin()
{
- checkInit();
- // pass sequence number for c/a when using jrnl
+ checkInit();
+ // pass sequence number for c/a when using jrnl
TxnCtxt* txn(new TxnCtxt(&messageIdSequence ));
txn->begin(env, !usingJrnl());
return auto_ptr<TransactionContext>(txn);
@@ -1294,11 +1296,11 @@
std::auto_ptr<qpid::broker::TPCTransactionContext> BdbMessageStore::begin(const std::string& xid)
{
- checkInit();
- IdSequence* jtx = NULL;
- if (usingJrnl()) jtx = &messageIdSequence;
+ checkInit();
+ IdSequence* jtx = NULL;
+ if (usingJrnl()) jtx = &messageIdSequence;
- // pass sequence number for c/a when using jrnl
+ // pass sequence number for c/a when using jrnl
TPCTxnCtxt* txn(new TPCTxnCtxt(xid, jtx));
txn->begin(env, !usingJrnl());
return auto_ptr<TPCTransactionContext>(txn);
@@ -1306,7 +1308,7 @@
void BdbMessageStore::prepare(qpid::broker::TPCTransactionContext& ctxt)
{
- checkInit();
+ checkInit();
TPCTxnCtxt* txn = dynamic_cast<TPCTxnCtxt*>(&ctxt);
if(!txn) throw InvalidTransactionContextException();
@@ -1316,8 +1318,8 @@
Dbt key ((void*) xid.data(), xid.length());
Dbt value(&dummy, sizeof(dummy));
- // make sure all the data is written to disk before returning
- txn->sync();
+ // make sure all the data is written to disk before returning
+ txn->sync();
prepareXidDb.put(txn->get(), &key, &value, 0);
txn->commit();
@@ -1329,8 +1331,8 @@
void BdbMessageStore::commit(TransactionContext& ctxt)
{
- checkInit();
- TxnCtxt* txn(check(&ctxt));
+ checkInit();
+ TxnCtxt* txn(check(&ctxt));
if (txn->isTPC()) {
completed(*dynamic_cast<TPCTxnCtxt*>(txn), enqueueXidDb, dequeueXidDb, true);
} else {
@@ -1340,7 +1342,7 @@
void BdbMessageStore::abort(TransactionContext& ctxt)
{
- checkInit();
+ checkInit();
TxnCtxt* txn(check(&ctxt));
if (txn->isTPC()) {
completed(*dynamic_cast<TPCTxnCtxt*>(txn), dequeueXidDb, enqueueXidDb, false);
Modified: store/trunk/cpp/lib/JournalImpl.cpp
===================================================================
--- store/trunk/cpp/lib/JournalImpl.cpp 2007-12-06 17:59:43 UTC (rev 1436)
+++ store/trunk/cpp/lib/JournalImpl.cpp 2007-12-06 18:24:12 UTC (rev 1437)
@@ -29,10 +29,13 @@
using namespace rhm::bdbstore;
using namespace rhm::journal;
-qpid::broker::TimerA JournalImpl::journalTimer;
+qpid::broker::Timer JournalImpl::journalTimer;
void InactivityFireEvent::fire() { if (parent) parent->flushFire(); }
-void GetEventsFireEvent::fire() { if (parent) parent->getEventsFire(); unref(); }
+void GetEventsFireEvent::fire() {
+ if (parent) parent->getEventsFire();
+ release();
+}
JournalImpl::JournalImpl(const std::string& journalId,
const std::string& journalDirectory,
@@ -244,7 +247,7 @@
{
jcntl::flush();
if (_wmgr.get_aio_evt_rem() && !getEventsTimerSetFlag) {
- intrusive_ptr_add_ref(getEventsFireEventsPtr.get());
+ getEventsFireEventsPtr->addRef();
journalTimer.add(getEventsFireEventsPtr);
getEventsTimerSetFlag = true;
}
@@ -265,7 +268,7 @@
}
getEventsTimerSetFlag = false;
if (_wmgr.get_aio_evt_rem()) {
- intrusive_ptr_add_ref(getEventsFireEventsPtr.get());
+ getEventsFireEventsPtr->addRef();
journalTimer.add(getEventsFireEventsPtr);
getEventsTimerSetFlag = true;
}
Modified: store/trunk/cpp/lib/JournalImpl.h
===================================================================
--- store/trunk/cpp/lib/JournalImpl.h 2007-12-06 17:59:43 UTC (rev 1436)
+++ store/trunk/cpp/lib/JournalImpl.h 2007-12-06 18:24:12 UTC (rev 1437)
@@ -38,25 +38,25 @@
class JournalImpl;
- class InactivityFireEvent : public virtual qpid::broker::TimerTaskA
+ class InactivityFireEvent : public virtual qpid::broker::TimerTask
{
JournalImpl* parent;
public:
InactivityFireEvent(JournalImpl* p, const qpid::sys::Duration timeout):
- qpid::broker::TimerTaskA(timeout), parent(p) {}
+ qpid::broker::TimerTask(timeout), parent(p) {}
virtual ~InactivityFireEvent() {}
void fire();
inline void cancel() { parent=0; }
};
- class GetEventsFireEvent : public virtual qpid::broker::TimerTaskA
+ class GetEventsFireEvent : public virtual qpid::broker::TimerTask
{
JournalImpl* parent;
public:
GetEventsFireEvent(JournalImpl* p, const qpid::sys::Duration timeout):
- qpid::broker::TimerTaskA(timeout), parent(p) {}
+ qpid::broker::TimerTask(timeout), parent(p) {}
virtual ~GetEventsFireEvent() {}
void fire();
inline void cancel() { parent=0; }
@@ -65,14 +65,14 @@
class JournalImpl : public journal::jcntl
{
private:
- static qpid::broker::TimerA journalTimer;
+ static qpid::broker::Timer journalTimer;
bool getEventsTimerSetFlag;
- qpid::broker::TimerTaskA::intrusive_ptr getEventsFireEventsPtr;
+ qpid::intrusive_ptr<qpid::broker::TimerTask> getEventsFireEventsPtr;
bool writeActivityFlag;
bool flushTriggeredFlag;
- qpid::broker::TimerTaskA::intrusive_ptr inactivityFireEventPtr;
+ qpid::intrusive_ptr<qpid::broker::TimerTask> inactivityFireEventPtr;
// temp local vars for loadMsgContent below
void* _xidp;
Modified: store/trunk/cpp/lib/TxnCtxt.h
===================================================================
--- store/trunk/cpp/lib/TxnCtxt.h 2007-12-06 17:59:43 UTC (rev 1436)
+++ store/trunk/cpp/lib/TxnCtxt.h 2007-12-06 18:24:12 UTC (rev 1437)
@@ -72,7 +72,7 @@
JournalImpl* jc = static_cast<JournalImpl*>(*i);
if (jc && loggedtx) { /* if using journal */
boost::intrusive_ptr<DataTokenImpl> dtokp(new DataTokenImpl);
- dtokp->ref();
+ dtokp->addRef();
dtokp->set_rid(loggedtx->next());
try{
if (commit)
Modified: store/trunk/cpp/lib/jrnl/data_tok.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/data_tok.cpp 2007-12-06 17:59:43 UTC (rev 1436)
+++ store/trunk/cpp/lib/jrnl/data_tok.cpp 2007-12-06 18:24:12 UTC (rev 1437)
@@ -41,25 +41,11 @@
namespace journal
{
-void intrusive_ptr_add_ref(data_tok* tok)
-{
- tok->ref();
-}
-
-void intrusive_ptr_release(data_tok* tok)
-{
- tok->unref();
- if (tok->refcnt() == 0)
- delete tok;
-}
-
-
// Static members
u_int64_t data_tok::_cnt = 0;
data_tok::data_tok():
- _ref_cnt(0),
_wstate(NONE),
_rstate(UNREAD),
_dsize(0),
Modified: store/trunk/cpp/lib/jrnl/data_tok.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/data_tok.hpp 2007-12-06 17:59:43 UTC (rev 1436)
+++ store/trunk/cpp/lib/jrnl/data_tok.hpp 2007-12-06 18:24:12 UTC (rev 1437)
@@ -33,6 +33,8 @@
#ifndef rhm_journal_data_tok_hpp
#define rhm_journal_data_tok_hpp
+#include <qpid/RefCounted.h>
+
namespace rhm
{
namespace journal
@@ -66,7 +68,7 @@
* \brief Data block token (data_tok) used to track wstate of a data block through asynchronous
* I/O process
*/
- class data_tok
+class data_tok : public qpid::RefCounted
{
public:
// TODO: Fix this, separate write state from operation
@@ -102,7 +104,6 @@
};
private:
- size_t _ref_cnt; ///< Ref count for auto cleanup
pthread_mutex_t _mutex;
static u_int64_t _cnt;
u_int64_t _icnt;
@@ -121,9 +122,6 @@
data_tok();
virtual ~data_tok();
- inline size_t refcnt(void) { return _ref_cnt;}
- inline void ref(void) { _ref_cnt++; }
- inline void unref(void) { _ref_cnt--; }
inline boost::intrusive_ptr<qpid::broker::PersistableMessage>& getSourceMessage()
{ return _sourceMsg; }
inline void setSourceMessage(boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg)
@@ -171,10 +169,6 @@
void reset();
};
-
- void intrusive_ptr_add_ref(data_tok* r);
- void intrusive_ptr_release(data_tok* r);
-
} // namespace journal
} // namespace rhm
Modified: store/trunk/cpp/lib/jrnl/jcntl.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.cpp 2007-12-06 17:59:43 UTC (rev 1436)
+++ store/trunk/cpp/lib/jrnl/jcntl.cpp 2007-12-06 18:24:12 UTC (rev 1437)
@@ -1,34 +1,34 @@
/**
-* \file jcntl.cpp
-*
-* Red Hat Messaging - Message Journal
-*
-* Messaging journal top-level control and interface class
-* rhm::journal::jcntl. See comments in file jcntl.hpp for details.
-*
-* \author Kim van der Riet
-*
-* Copyright 2007 Red Hat, Inc.
-*
-* This file is part of Red Hat Messaging.
-*
-* Red Hat Messaging is free software; you can redistribute it and/or
-* modify it under the terms of the GNU Lesser General Public
-* License as published by the Free Software Foundation; either
-* version 2.1 of the License, or (at your option) any later version.
-*
-* This library is distributed in the hope that it will be useful,
-* but WITHOUT ANY WARRANTY; without even the implied warranty of
-* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-* Lesser General Public License for more details.
-*
-* You should have received a copy of the GNU Lesser General Public
-* License along with this library; if not, write to the Free Software
-* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
-* USA
-*
-* The GNU Lesser General Public License is available in the file COPYING.
-*/
+ * \file jcntl.cpp
+ *
+ * Red Hat Messaging - Message Journal
+ *
+ * Messaging journal top-level control and interface class
+ * rhm::journal::jcntl. See comments in file jcntl.hpp for details.
+ *
+ * \author Kim van der Riet
+ *
+ * Copyright 2007 Red Hat, Inc.
+ *
+ * This file is part of Red Hat Messaging.
+ *
+ * Red Hat Messaging is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
+ * USA
+ *
+ * The GNU Lesser General Public License is available in the file COPYING.
+ */
#include <jrnl/jcntl.hpp>
@@ -51,7 +51,7 @@
// Functions
jcntl::jcntl(const std::string& jid, const std::string& jdir, const std::string& base_filename,
- const u_int16_t num_jfiles, const u_int32_t jfsize_sblks):
+ const u_int16_t num_jfiles, const u_int32_t jfsize_sblks):
_jid(jid),
_jdir(jdir, base_filename),
_base_filename(base_filename),
@@ -90,7 +90,7 @@
void
jcntl::initialize(std::deque<data_tok*>* rdtoklp, const aio_cb rd_cb,
- std::deque<data_tok*>* wdtoklp, const aio_cb wr_cb) throw (jexception)
+ std::deque<data_tok*>* wdtoklp, const aio_cb wr_cb) throw (jexception)
{
// Prepare journal dir, journal files and file handles
_jdir.clear_dir();
@@ -132,8 +132,8 @@
void
jcntl::recover(std::deque<data_tok*>* rdtoklp, const aio_cb rd_cb, std::deque<data_tok*>* wdtoklp,
- const aio_cb wr_cb, const std::vector<std::string>& prep_txn_list, u_int64_t& highest_rid)
- throw (jexception)
+ const aio_cb wr_cb, const std::vector<std::string>& prep_txn_list, u_int64_t& highest_rid)
+ throw (jexception)
{
// Verify journal dir and journal files
_jdir.verify_dir();
@@ -145,8 +145,8 @@
if (_rcvdat._full)
throw jexception(jerrno::JERR_JCNTL_RECOVERJFULL, "jcntl", "recover_complete");
-// Debug info, but may be useful to print with a flag
-//_rcvdat.print(_jid);
+ // Debug info, but may be useful to print with a flag
+ //_rcvdat.print(_jid);
if (_datafh)
{
@@ -190,7 +190,7 @@
_rrfc.initialize(_num_jfiles, (nlfh**)_datafh, _rcvdat._ffid);
_rmgr.recover_complete(_rcvdat._fro);
_readonly_flag = false;
-//std::cout << "Journal revovery complete." << std::endl;
+ //std::cout << "Journal revovery complete." << std::endl;
}
void
@@ -203,8 +203,8 @@
const iores
jcntl::enqueue_data_record(const void* const data_buff, const size_t tot_data_len,
- const size_t this_data_len, data_tok* dtokp, const bool transient)
- throw (jexception)
+ const size_t this_data_len, data_tok* dtokp, const bool transient)
+ throw (jexception)
{
iores res;
check_wstatus("enqueue_data_record");
@@ -212,7 +212,7 @@
try
{
res = _wmgr.enqueue(data_buff, tot_data_len, this_data_len, dtokp, NULL, 0, transient,
- false);
+ false);
}
catch(const std::exception& e) { pthread_mutex_unlock(&_mutex); throw; }
pthread_mutex_unlock(&_mutex);
@@ -221,7 +221,7 @@
const iores
jcntl::enqueue_extern_data_record(const size_t tot_data_len, data_tok* dtokp, const bool transient)
- throw (jexception)
+ throw (jexception)
{
iores res;
check_wstatus("enqueue_extern_data_record");
@@ -237,8 +237,8 @@
const iores
jcntl::enqueue_txn_data_record(const void* const data_buff, const size_t tot_data_len,
- const size_t this_data_len, data_tok* dtokp, const std::string& xid,
- const bool transient) throw (jexception)
+ const size_t this_data_len, data_tok* dtokp, const std::string& xid,
+ const bool transient) throw (jexception)
{
iores res;
check_wstatus("enqueue_tx_data_record");
@@ -246,7 +246,7 @@
try
{
res = _wmgr.enqueue(data_buff, tot_data_len, this_data_len, dtokp, xid.data(), xid.size(),
- transient, false);
+ transient, false);
}
catch(const std::exception& e) { pthread_mutex_unlock(&_mutex); throw; }
pthread_mutex_unlock(&_mutex);
@@ -255,7 +255,7 @@
const iores
jcntl::enqueue_extern_txn_data_record(const size_t tot_data_len, data_tok* dtokp,
- const std::string& xid, const bool transient) throw (jexception)
+ const std::string& xid, const bool transient) throw (jexception)
{
iores res;
check_wstatus("enqueue_extern_txn_data_record");
@@ -271,7 +271,7 @@
const iores
jcntl::get_data_record(const u_int64_t& rid, const size_t& dsize, const size_t& dsize_avail,
- const void** const data, bool auto_discard) throw (jexception)
+ const void** const data, bool auto_discard) throw (jexception)
{
check_rstatus("get_data_record");
return _rmgr.get(rid, dsize, dsize_avail, data, auto_discard);
@@ -286,7 +286,7 @@
const iores
jcntl::read_data_record(void** const datapp, size_t& dsize, void** const xidpp, size_t& xidsize,
- bool& transient, bool& external, data_tok* const dtokp) throw (jexception)
+ bool& transient, bool& external, data_tok* const dtokp) throw (jexception)
{
check_rstatus("read_data");
return _rmgr.read(datapp, dsize, xidpp, xidsize, transient, external, dtokp);
@@ -466,14 +466,14 @@
_num_jfiles = ji.num_jfiles();
_rcvdat._enq_cnt_list.resize(_num_jfiles);
std::cout << "WARNING: Recovery found " << _num_jfiles <<
- " files (different from --num-jfiles parameter value)." << std::endl;
+ " files (different from --num-jfiles parameter value)." << std::endl;
}
if (_jfsize_sblks != ji.jfsize_sblks())
{
_jfsize_sblks = ji.jfsize_sblks();
std::cout << "WARNING: Recovery found file size = " <<
- (_jfsize_sblks / JRNL_RMGR_PAGE_SIZE) <<
- " (different from --jfile-size-pgs parameter value)." << std::endl;
+ (_jfsize_sblks / JRNL_RMGR_PAGE_SIZE) <<
+ " (different from --jfile-size-pgs parameter value)." << std::endl;
}
try
@@ -499,16 +499,16 @@
if (rd._ffid == next_wr_fid && rd._enq_cnt_list[next_wr_fid])
rd._full = true;
- std::vector<std::string> xid_list;
- _tmap.xid_list(xid_list);
- for (std::vector<std::string>::iterator itr = xid_list.begin(); itr != xid_list.end();
- itr++)
- {
- std::vector<std::string>::const_iterator pitr = std::find(prep_txn_list.begin(),
- prep_txn_list.end(), *itr);
- if (pitr == prep_txn_list.end())
- _tmap.get_remove_tdata_list(*itr);
- }
+ std::vector<std::string> xid_list;
+ _tmap.xid_list(xid_list);
+ for (std::vector<std::string>::iterator itr = xid_list.begin(); itr != xid_list.end();
+ itr++)
+ {
+ std::vector<std::string>::const_iterator pitr = std::find(prep_txn_list.begin(),
+ prep_txn_list.end(), *itr);
+ if (pitr == prep_txn_list.end())
+ _tmap.get_remove_tdata_list(*itr);
+ }
}
}
@@ -525,157 +525,157 @@
ifsp->read((char*)&h, sizeof(hdr));
switch(h._magic)
{
- case RHM_JDAT_ENQ_MAGIC:
+ case RHM_JDAT_ENQ_MAGIC:
+ {
+ if (!check_owi(fid, h, rd, read_pos))
+ return false;
+ enq_rec er;
+ while (!done)
{
- if (!check_owi(fid, h, rd, read_pos))
+ done = er.rcv_decode(h, ifsp, cum_size_read);
+ if (!jfile_cycle(fid, ifsp, rd, true))
return false;
- enq_rec er;
- while (!done)
- {
- done = er.rcv_decode(h, ifsp, cum_size_read);
- if (!jfile_cycle(fid, ifsp, rd, true))
- return false;
- }
- if (!er.is_transient()) // Ignore transient msgs
- {
- rd._enq_cnt_list[fid]++;
- if (er.xid_size())
- {
- er.get_xid(&xidp);
- assert(xidp != NULL);
- std::string xid((char*)xidp, er.xid_size());
- _tmap.insert_txn_data(xid, txn_data(h._rid, 0, fid, true));
- ::free(xidp);
- }
- else
- _emap.insert_fid(h._rid, fid);
- }
}
- break;
- case RHM_JDAT_DEQ_MAGIC:
+ if (!er.is_transient()) // Ignore transient msgs
{
- if (!check_owi(fid, h, rd, read_pos))
- return false;
- deq_rec dr;
- while (!done)
+ rd._enq_cnt_list[fid]++;
+ if (er.xid_size())
{
- done = dr.rcv_decode(h, ifsp, cum_size_read);
- if (!jfile_cycle(fid, ifsp, rd, true))
- return false;
- }
- if (dr.xid_size())
- {
- // If the enqueue is part of a pending txn, it will not yet be in emap
- try { _emap.lock(dr.deq_rid()); }
- catch(const jexception& e)
- {
- if (e.err_code() != jerrno::JERR_MAP_NOTFOUND) throw;
- }
- dr.get_xid(&xidp);
+ er.get_xid(&xidp);
assert(xidp != NULL);
- std::string xid((char*)xidp, dr.xid_size());
- _tmap.insert_txn_data(xid, txn_data(dr.rid(), dr.deq_rid(), fid, false));
+ std::string xid((char*)xidp, er.xid_size());
+ _tmap.insert_txn_data(xid, txn_data(h._rid, 0, fid, true));
::free(xidp);
}
else
- {
- try
- {
- u_int16_t enq_fid = _emap.get_remove_fid(dr.deq_rid());
- rd._enq_cnt_list[enq_fid]--;
- }
- catch(const jexception& e)
- {
- if (e.err_code() != jerrno::JERR_MAP_NOTFOUND) throw;
- }
- }
+ _emap.insert_fid(h._rid, fid);
}
- break;
- case RHM_JDAT_TXA_MAGIC:
+ }
+ break;
+ case RHM_JDAT_DEQ_MAGIC:
+ {
+ if (!check_owi(fid, h, rd, read_pos))
+ return false;
+ deq_rec dr;
+ while (!done)
{
- if (!check_owi(fid, h, rd, read_pos))
+ done = dr.rcv_decode(h, ifsp, cum_size_read);
+ if (!jfile_cycle(fid, ifsp, rd, true))
return false;
- txn_rec ar;
- while (!done)
+ }
+ if (dr.xid_size())
+ {
+ // If the enqueue is part of a pending txn, it will not yet be in emap
+ try { _emap.lock(dr.deq_rid()); }
+ catch(const jexception& e)
{
- done = ar.rcv_decode(h, ifsp, cum_size_read);
- if (!jfile_cycle(fid, ifsp, rd, true))
- return false;
+ if (e.err_code() != jerrno::JERR_MAP_NOTFOUND) throw;
}
- // Delete this txn from tmap, unlock any locked records in emap
- ar.get_xid(&xidp);
+ dr.get_xid(&xidp);
assert(xidp != NULL);
- std::string xid((char*)xidp, ar.xid_size());
- txn_data_list tdl = _tmap.get_remove_tdata_list(xid);
- for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++)
+ std::string xid((char*)xidp, dr.xid_size());
+ _tmap.insert_txn_data(xid, txn_data(dr.rid(), dr.deq_rid(), fid, false));
+ ::free(xidp);
+ }
+ else
+ {
+ try
{
- try
- {
- if (!itr->_enq_flag)
- _emap.unlock(itr->_drid);
- }
- catch(const jexception& e)
- {
- if (e.err_code() != jerrno::JERR_MAP_NOTFOUND) throw;
- }
- if (itr->_enq_flag)
- rd._enq_cnt_list[itr->_fid]--;
+ u_int16_t enq_fid = _emap.get_remove_fid(dr.deq_rid());
+ rd._enq_cnt_list[enq_fid]--;
}
- ::free(xidp);
+ catch(const jexception& e)
+ {
+ if (e.err_code() != jerrno::JERR_MAP_NOTFOUND) throw;
+ }
}
- break;
- case RHM_JDAT_TXC_MAGIC:
+ }
+ break;
+ case RHM_JDAT_TXA_MAGIC:
+ {
+ if (!check_owi(fid, h, rd, read_pos))
+ return false;
+ txn_rec ar;
+ while (!done)
{
- if (!check_owi(fid, h, rd, read_pos))
+ done = ar.rcv_decode(h, ifsp, cum_size_read);
+ if (!jfile_cycle(fid, ifsp, rd, true))
return false;
- txn_rec cr;
- while (!done)
- {
- done = cr.rcv_decode(h, ifsp, cum_size_read);
- if (!jfile_cycle(fid, ifsp, rd, true))
- return false;
+ }
+ // Delete this txn from tmap, unlock any locked records in emap
+ ar.get_xid(&xidp);
+ assert(xidp != NULL);
+ std::string xid((char*)xidp, ar.xid_size());
+ txn_data_list tdl = _tmap.get_remove_tdata_list(xid);
+ for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++)
+ {
+ try
+ {
+ if (!itr->_enq_flag)
+ _emap.unlock(itr->_drid);
}
- // Delete this txn from tmap, process records into emap
- cr.get_xid(&xidp);
- assert(xidp != NULL);
- std::string xid((char*)xidp, cr.xid_size());
- txn_data_list tdl = _tmap.get_remove_tdata_list(xid);
- for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++)
+ catch(const jexception& e)
{
- if (itr->_enq_flag) // txn enqueue
- _emap.insert_fid(itr->_rid, itr->_fid);
- else // txn dequeue
- {
- u_int16_t fid = _emap.get_remove_fid(itr->_drid, true);
- rd._enq_cnt_list[fid]--;
- }
+ if (e.err_code() != jerrno::JERR_MAP_NOTFOUND) throw;
}
- ::free(xidp);
+ if (itr->_enq_flag)
+ rd._enq_cnt_list[itr->_fid]--;
}
- break;
- case RHM_JDAT_EMPTY_MAGIC:
+ ::free(xidp);
+ }
+ break;
+ case RHM_JDAT_TXC_MAGIC:
+ {
+ if (!check_owi(fid, h, rd, read_pos))
+ return false;
+ txn_rec cr;
+ while (!done)
{
- u_int32_t rec_dblks = jrec::size_dblks(sizeof(hdr));
- ifsp->ignore(rec_dblks * JRNL_DBLK_SIZE - sizeof(hdr));
+ done = cr.rcv_decode(h, ifsp, cum_size_read);
+ if (!jfile_cycle(fid, ifsp, rd, true))
+ return false;
}
+ // Delete this txn from tmap, process records into emap
+ cr.get_xid(&xidp);
+ assert(xidp != NULL);
+ std::string xid((char*)xidp, cr.xid_size());
+ txn_data_list tdl = _tmap.get_remove_tdata_list(xid);
+ for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++)
+ {
+ if (itr->_enq_flag) // txn enqueue
+ _emap.insert_fid(itr->_rid, itr->_fid);
+ else // txn dequeue
+ {
+ u_int16_t fid = _emap.get_remove_fid(itr->_drid, true);
+ rd._enq_cnt_list[fid]--;
+ }
+ }
+ ::free(xidp);
+ }
break;
- case 0:
+ case RHM_JDAT_EMPTY_MAGIC:
+ {
+ u_int32_t rec_dblks = jrec::size_dblks(sizeof(hdr));
+ ifsp->ignore(rec_dblks * JRNL_DBLK_SIZE - sizeof(hdr));
+ }
+ break;
+ case 0:
+ rd._lfid = fid;
+ rd._eo = ifsp->tellg();
+ return false;
+ default:
+ // Is this the last file, if so, stop as this is the overwrite boundary.
+ if (fid == (rd._ffid ? rd._ffid - 1 : _num_jfiles - 1))
+ {
rd._lfid = fid;
- rd._eo = ifsp->tellg();
+ rd._eo = read_pos;
return false;
- default:
- // Is this the last file, if so, stop as this is the overwrite boundary.
- if (fid == (rd._ffid ? rd._ffid - 1 : _num_jfiles - 1))
- {
- rd._lfid = fid;
- rd._eo = read_pos;
- return false;
- }
- std::stringstream ss;
- ss << std::hex << std::setfill('0') << "Magic=0x" << std::setw(8) << h._magic;
- ss << " fid=" << fid << " foffs=0x" << std::setw(8) << read_pos;
- throw jexception(jerrno::JERR_JCNTL_UNKNOWNMAGIC, ss.str().c_str(), "jcntl",
- "rcvr_get_next_record");
+ }
+ std::stringstream ss;
+ ss << std::hex << std::setfill('0') << "Magic=0x" << std::setw(8) << h._magic;
+ ss << " fid=" << fid << " foffs=0x" << std::setw(8) << read_pos;
+ throw jexception(jerrno::JERR_JCNTL_UNKNOWNMAGIC, ss.str().c_str(), "jcntl",
+ "rcvr_get_next_record");
}
return true;
@@ -747,7 +747,7 @@
ss << " foffs=0x" << std::setw(8) << read_pos;
ss << " expected_fid=0x" << std::setw(4) << expected_fid;
throw jexception(jerrno::JERR_JCNTL_OWIMISMATCH, ss.str().c_str(), "jcntl",
- "check_owi");
+ "check_owi");
}
if (rd._h_rid < h._rid)
rd._h_rid = h._rid;
@@ -757,33 +757,33 @@
void
jcntl::aio_wr_callback(jcntl* journal, u_int32_t num_dtoks)
{
-//kpvdr TODO -- this list needs to be mutexed...???
+ //kpvdr TODO -- this list needs to be mutexed...???
std::deque<rhm::journal::data_tok*> this_dtok_list(journal->_aio_wr_cmpl_dtok_list.begin(),
- journal->_aio_wr_cmpl_dtok_list.end());
+ journal->_aio_wr_cmpl_dtok_list.end());
journal->_aio_wr_cmpl_dtok_list.clear();
for (u_int32_t i=0; i<num_dtoks; i++)
{
data_tok*& dtokp = this_dtok_list.front();
- if (!journal->is_stopped() && dtokp->getSourceMessage())
- {
- switch (dtokp->wstate())
- {
- case data_tok::ENQ:
- dtokp->getSourceMessage()->enqueueComplete();
- break;
- case data_tok::DEQ:
-/* Don't need to signal until we have a way to ack completion of dequeue in AMQP
- dtokp->getSourceMessage()->dequeueComplete();
- if ( dtokp->getSourceMessage()->isDequeueComplete() ) // clear id after last dequeue
- dtokp->getSourceMessage()->setPersistenceId(0);
-*/
- break;
- default:
- ;
- }
- }
- intrusive_ptr_release(dtokp);
+ if (!journal->is_stopped() && dtokp->getSourceMessage())
+ {
+ switch (dtokp->wstate())
+ {
+ case data_tok::ENQ:
+ dtokp->getSourceMessage()->enqueueComplete();
+ break;
+ case data_tok::DEQ:
+ /* Don't need to signal until we have a way to ack completion of dequeue in AMQP
+ dtokp->getSourceMessage()->dequeueComplete();
+ if ( dtokp->getSourceMessage()->isDequeueComplete() ) // clear id after last dequeue
+ dtokp->getSourceMessage()->setPersistenceId(0);
+ */
+ break;
+ default:
+ ;
+ }
+ }
+ dtokp->release();
this_dtok_list.pop_front();
}
}
@@ -792,21 +792,21 @@
jcntl::aio_rd_callback(jcntl* journal, u_int32_t num_dtoks)
{
-//kpvdr TODO -- can we get rid of the copy???
+ //kpvdr TODO -- can we get rid of the copy???
std::deque<rhm::journal::data_tok*> this_dtok_list(journal->_aio_rd_cmpl_dtok_list.begin(),
- journal->_aio_rd_cmpl_dtok_list.end());
+ journal->_aio_rd_cmpl_dtok_list.end());
journal->_aio_rd_cmpl_dtok_list.clear();
for (u_int32_t i=0; i<num_dtoks; i++)
{
data_tok*& dtokp = this_dtok_list.front();
- if (!journal->is_stopped() && dtokp->getSourceMessage())
- {
- if (dtokp->wstate() == data_tok::ENQ && dtokp->rstate() == data_tok::READ)
- {
+ if (!journal->is_stopped() && dtokp->getSourceMessage())
+ {
+ if (dtokp->wstate() == data_tok::ENQ && dtokp->rstate() == data_tok::READ)
+ {
// cct call the recovery manager. / lazyload..
- }
- }
- intrusive_ptr_release( dtokp);
+ }
+ }
+ dtokp->release();
this_dtok_list.pop_front();
}
18 years, 4 months