rhmessaging commits: r4156 - store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb.
by rhmessaging-commits@lists.jboss.org
Author: rgemmell
Date: 2010-07-28 09:01:29 -0400 (Wed, 28 Jul 2010)
New Revision: 4156
Modified:
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
Log:
Remove redundant TODOs
Modified: store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
===================================================================
--- store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java 2010-07-27 21:00:08 UTC (rev 4155)
+++ store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java 2010-07-28 13:01:29 UTC (rev 4156)
@@ -2002,12 +2002,10 @@
}
catch (DatabaseException e)
{
- //TODO
throw new RuntimeException(e);
}
catch (AMQStoreException e)
{
- //TODO
throw new RuntimeException(e);
}
@@ -2024,7 +2022,6 @@
}
catch (AMQStoreException e)
{
- //TODO
throw new RuntimeException(e);
}
_metaDataRef = new SoftReference<StorableMessageMetaData>(metaData);
@@ -2046,7 +2043,6 @@
}
catch (AMQStoreException e)
{
- //TODO
throw new RuntimeException(e);
}
}
@@ -2059,7 +2055,6 @@
}
catch (AMQStoreException e)
{
- // TODO
throw new RuntimeException(e);
}
}
@@ -2079,7 +2074,6 @@
}
catch (AMQStoreException e)
{
- //TODO
throw new RuntimeException(e);
}
finally
@@ -2099,7 +2093,6 @@
}
catch (AMQStoreException e)
{
- // TODO
throw new RuntimeException(e);
}
}
@@ -2119,7 +2112,6 @@
}
catch (DatabaseException e)
{
- //TODO
throw new RuntimeException(e);
}
_ctx.setPayload(_txn);
rhmessaging commits: r4155 - mgmt/newdata/cumin/python/cumin/messaging.
by rhmessaging-commits@lists.jboss.org
Author: eallen
Date: 2010-07-27 17:00:08 -0400 (Tue, 27 Jul 2010)
New Revision: 4155
Modified:
mgmt/newdata/cumin/python/cumin/messaging/broker.py
mgmt/newdata/cumin/python/cumin/messaging/broker.strings
Log:
Untested enabling of cluster and acl modules
Modified: mgmt/newdata/cumin/python/cumin/messaging/broker.py
===================================================================
--- mgmt/newdata/cumin/python/cumin/messaging/broker.py 2010-07-27 20:31:59 UTC (rev 4154)
+++ mgmt/newdata/cumin/python/cumin/messaging/broker.py 2010-07-27 21:00:08 UTC (rev 4155)
@@ -128,13 +128,14 @@
self.view.add_tab(ExchangeSelector(app, "exchanges", self.object))
self.view.add_tab(ConnectionSelector(app, "connections", self.object))
self.view.add_tab(BrokerLinkSelector(app, "brokerlinks", self.object))
+ self.view.add_tab(BrokerAccessControl(app, "accessControl", self.object))
+ self.view.add_tab(BrokerClustering(app, "clustering", self.object))
self.queue_add = QueueAdd(app, self)
self.exchange_add = ExchangeAdd(app, self)
self.brokerlink_add = BrokerLinkAdd(app, self)
self.move_messages = MoveMessages(app, self)
self.engroup = BrokerEngroup(app, self)
- #self.add_selection_task(app.messaging.BrokerEngroup)
def do_process(self, session):
super(BrokerFrame, self).do_process(session)
@@ -164,41 +165,24 @@
mode = ModuleNotEnabled(app, "notenabled")
self.add_mode(mode)
- self.__view = BrokerAccessControlView(app, "view", self.acl)
- self.add_mode(self.__view)
+ self.view = ObjectView(app, "view", self.acl)
+ self.add_mode(self.view)
class AclModuleAttribute(Attribute):
def get_default(self, session):
- broker = self.widget.vhost.get(session).broker
+ vhost = self.widget.vhost.get(session)
- for acl in Acl.selectBy(broker=broker):
- return acl
+ cls = self.widget.app.model.org_apache_qpid_acl.Acl
+ acl = cls.get_object(session.cursor, _brokerRef_id=vhost._brokerRef_id)
+ return acl
def do_process(self, session):
if self.acl.get(session):
- self.__view.show(session)
+ self.view.show(session)
def render_title(self, session):
return "Access Control"
-class AccessControlGeneralStatSet(StatSet):
- def __init__(self, app, name, object):
- super(AccessControlGeneralStatSet, self).__init__(app, name, object)
-
- self.attrs = ("aclDenyCount",)
-
-class BrokerAccessControlView(Widget):
- def __init__(self, app, name, acl):
- super(BrokerAccessControlView, self).__init__(app, name)
-
- self.acl = acl
-
- self.props = CuminProperties(app, "props", self.acl)
- self.add_child(self.props)
-
- self.stats = AccessControlGeneralStatSet(app, "stats", self.acl)
- self.add_child(self.stats)
-
class BrokerClustering(ModeSet):
def __init__(self, app, name, vhost):
super(BrokerClustering, self).__init__(app, name)
@@ -211,15 +195,16 @@
self.notenabled = ModuleNotEnabled(app, "notenabled")
self.add_mode(self.notenabled)
- self.view = BrokerClusteringView(app, "view", self.cluster)
+ self.view = ObjectView(app, "view", self.cluster)
self.add_mode(self.view)
class ClusteringModuleAttribute(Attribute):
def get_default(self, session):
- broker = self.widget.vhost.get(session).broker
+ vhost = self.widget.vhost.get(session)
- for cluster in Cluster.selectBy(broker=broker):
- return cluster
+ cls = self.widget.app.model.org_apache_qpid_cluster.Cluster
+ cluster = cls.get_object(session.cursor, _brokerRef_id=vhost._brokerRef_id)
+ return cluster
def do_process(self, session):
if self.cluster.get(session):
@@ -228,22 +213,6 @@
def render_title(self, session):
return "Clustering"
-class ClusterGeneralStatSet(StatSet):
- def __init__(self, app, name, object):
- super(ClusterGeneralStatSet, self).__init__(app, name, object)
-
- self.attrs = ()
-
-class BrokerClusteringView(Widget):
- def __init__(self, app, name, cluster):
- super(BrokerClusteringView, self).__init__(app, name)
-
- props = CuminProperties(app, "props", cluster)
- self.add_child(props)
-
- stats = ClusterGeneralStatSet(app, "stats", cluster)
- self.add_child(stats)
-
class BrokerBrowser(Widget):
def __init__(self, app, name):
super(BrokerBrowser, self).__init__(app, name)
Modified: mgmt/newdata/cumin/python/cumin/messaging/broker.strings
===================================================================
--- mgmt/newdata/cumin/python/cumin/messaging/broker.strings 2010-07-27 20:31:59 UTC (rev 4154)
+++ mgmt/newdata/cumin/python/cumin/messaging/broker.strings 2010-07-27 21:00:08 UTC (rev 4155)
@@ -106,40 +106,6 @@
font-style: italic;
}
-[BrokerAccessControlView.html]
-<table class="twocol">
- <tbody>
- <tr>
- <td>
- <h2>Properties</h2>
- {props}
-
- <h2>Statistics</h2>
- {stats}
- </td>
- <td>
- </td>
- </tr>
- </tbody>
-</table>
-
-[BrokerClusteringView.html]
-<table class="twocol">
- <tbody>
- <tr>
- <td>
- <h2>Properties</h2>
- {props}
-
- <h2>Statistics</h2>
- {stats}
- </td>
- <td>
- </td>
- </tr>
- </tbody>
-</table>
-
[BrokerSetEngroupForm.css]
div.content ul {
list-style: none;
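Both module attributes above now resolve their management object the same way: query the rosemary model by broker reference instead of going through SQLObject's selectBy(). A minimal sketch of that idiom; get_broker_module_object is a purely hypothetical helper name, and the session and vhost objects are assumed to behave as in the diff:

    def get_broker_module_object(session, vhost, cls):
        # cls is a rosemary class such as app.model.org_apache_qpid_acl.Acl or
        # app.model.org_apache_qpid_cluster.Cluster; the result is whatever row
        # matches this broker, which the mode sets above treat as "module enabled"
        return cls.get_object(session.cursor, _brokerRef_id=vhost._brokerRef_id)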
rhmessaging commits: r4154 - mgmt/newdata/mint/python/mint.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2010-07-27 16:31:59 -0400 (Tue, 27 Jul 2010)
New Revision: 4154
Modified:
mgmt/newdata/mint/python/mint/model.py
mgmt/newdata/mint/python/mint/session.py
mgmt/newdata/mint/python/mint/update.py
Log:
* Add delete policies based on new agent state
* Change all agent callbacks into queued agent updates; this fixes
some data errors that arose from ordering problems
Modified: mgmt/newdata/mint/python/mint/model.py
===================================================================
--- mgmt/newdata/mint/python/mint/model.py 2010-07-27 19:11:04 UTC (rev 4153)
+++ mgmt/newdata/mint/python/mint/model.py 2010-07-27 20:31:59 UTC (rev 4154)
@@ -1,6 +1,5 @@
from rosemary.model import *
-from update import *
from util import *
log = logging.getLogger("mint.model")
Modified: mgmt/newdata/mint/python/mint/session.py
===================================================================
--- mgmt/newdata/mint/python/mint/session.py 2010-07-27 19:11:04 UTC (rev 4153)
+++ mgmt/newdata/mint/python/mint/session.py 2010-07-27 20:31:59 UTC (rev 4154)
@@ -1,4 +1,4 @@
-from model import *
+from update import *
from util import *
from qmf.console import Console, Session
@@ -73,36 +73,27 @@
def newAgent(self, qmf_agent):
self.model.print_event(3, "Creating %s", qmf_agent)
- MintAgent(self.model, qmf_agent)
+ up = AgentUpdate(self.model, qmf_agent)
+ self.model.app.update_thread.enqueue(up)
def delAgent(self, qmf_agent):
self.model.print_event(3, "Deleting %s", qmf_agent)
- try:
- agent = self.model.get_agent(qmf_agent)
- except KeyError:
- return
-
- agent.delete()
-
if not self.model.app.update_thread.isAlive():
return
- up = AgentDelete(self.model, agent)
+ up = AgentDelete(self.model, qmf_agent)
self.model.app.update_thread.enqueue(up)
def heartbeat(self, qmf_agent, timestamp):
message = "Heartbeat from %s at %s"
self.model.print_event(5, message, qmf_agent, timestamp)
- timestamp = timestamp / 1000000000
-
- try:
- agent = self.model.get_agent(qmf_agent)
- except KeyError:
+ if not self.model.app.update_thread.isAlive():
return
- agent.last_heartbeat = datetime.fromtimestamp(timestamp)
+ up = AgentUpdate(self.model, qmf_agent)
+ self.model.app.update_thread.enqueue(up)
def newPackage(self, name):
self.model.print_event(2, "New package %s", name)
@@ -110,22 +101,18 @@
def newClass(self, kind, classKey):
self.model.print_event(2, "New class %s", classKey)
- def objectProps(self, broker, obj):
- agent = self.model.get_agent(obj.getAgent())
-
+ def objectProps(self, broker, qmf_object):
if not self.model.app.update_thread.isAlive():
return
- up = ObjectUpdate(self.model, agent, obj)
+ up = ObjectUpdate(self.model, qmf_object)
self.model.app.update_thread.enqueue(up)
- def objectStats(self, broker, obj):
- agent = self.model.get_agent(obj.getAgent())
-
+ def objectStats(self, broker, qmf_object):
if not self.model.app.update_thread.isAlive():
return
- up = ObjectUpdate(self.model, agent, obj)
+ up = ObjectUpdate(self.model, qmf_object)
self.model.app.update_thread.enqueue(up)
def event(self, broker, event):
Modified: mgmt/newdata/mint/python/mint/update.py
===================================================================
--- mgmt/newdata/mint/python/mint/update.py 2010-07-27 19:11:04 UTC (rev 4153)
+++ mgmt/newdata/mint/python/mint/update.py 2010-07-27 20:31:59 UTC (rev 4154)
@@ -5,6 +5,8 @@
from psycopg2 import IntegrityError, TimestampFromTicks
from psycopg2.extensions import cursor as Cursor
from rosemary.model import *
+
+from model import *
from util import *
log = logging.getLogger("mint.update")
@@ -156,11 +158,11 @@
thread.stats.errors += 1
+ #print_exc()
+
if thread.halt_on_error:
raise
- #print_exc()
-
def do_process(self, cursor, stats):
raise Exception("Not implemented")
@@ -168,13 +170,18 @@
return self.__class__.__name__
class ObjectUpdate(Update):
- def __init__(self, model, agent, object):
+ def __init__(self, model, qmf_object):
super(ObjectUpdate, self).__init__(model)
- self.agent = agent
- self.object = object
+ self.qmf_object = qmf_object
+ self.agent = None
def do_process(self, cursor, stats):
+ try:
+ self.agent = self.model.get_agent(self.qmf_object.getAgent())
+ except KeyError:
+ raise UpdateDropped()
+
cls = self.get_class()
obj_id = self.get_object_id()
obj = self.agent.get_object_by_id(obj_id)
@@ -185,14 +192,17 @@
except RosemaryNotFound:
pass
- update_time, create_time, delete_time = self.object.getTimestamps()
+ update_time, create_time, delete_time = self.qmf_object.getTimestamps()
if obj:
if delete_time != 0:
self.delete_object(cursor, stats, obj)
return
- if not self.object.getProperties() and self.object.getStatistics():
+ properties = self.qmf_object.getProperties()
+ statistics = self.qmf_object.getStatistics()
+
+ if not properties and statistics:
# Just stats; do we want it?
# if stats.enqueued - stats.dequeued > 500:
@@ -210,7 +220,7 @@
self.update_object(cursor, stats, obj)
else:
- if not self.object.getProperties():
+ if not self.qmf_object.getProperties():
raise UpdateDropped()
if delete_time != 0:
@@ -223,7 +233,7 @@
self.agent.add_object(obj)
def get_class(self):
- class_key = self.object.getClassKey()
+ class_key = self.qmf_object.getClassKey()
name = class_key.getPackageName()
try:
@@ -241,17 +251,17 @@
return cls
def get_object_id(self):
- return self.object.getObjectId().objectName
+ return self.qmf_object.getObjectId().objectName
def create_object(self, cursor, stats, cls):
- update_time, create_time, delete_time = self.object.getTimestamps()
+ update_time, create_time, delete_time = self.qmf_object.getTimestamps()
create_time = datetime.fromtimestamp(create_time / 1000000000)
update_time = datetime.fromtimestamp(update_time / 1000000000)
obj = cls.create_object(cursor)
obj._qmf_agent_id = self.agent.id
obj._qmf_object_id = self.get_object_id()
- obj._qmf_session_id = str(self.object.getObjectId().getSequence())
+ obj._qmf_session_id = str(self.qmf_object.getObjectId().getSequence())
obj._qmf_create_time = create_time
obj._qmf_update_time = update_time
@@ -294,7 +304,7 @@
return obj
def update_object(self, cursor, stats, obj):
- update_time, create_time, delete_time = self.object.getTimestamps()
+ update_time, create_time, delete_time = self.qmf_object.getTimestamps()
update_time = datetime.fromtimestamp(update_time / 1000000000)
obj._qmf_update_time = update_time
@@ -345,7 +355,7 @@
def process_properties(self, obj, columns, cursor):
cls = obj._class
- for prop, value in self.object.getProperties():
+ for prop, value in self.qmf_object.getProperties():
try:
if prop.type == 10:
col, nvalue = self.process_reference \
@@ -409,7 +419,7 @@
return col, value
def process_statistics(self, obj, update_columns, insert_columns):
- for stat, value in self.object.getStatistics():
+ for stat, value in self.qmf_object.getStatistics():
try:
col = obj._class._statistics_by_name[stat.name].sql_column
except KeyError:
@@ -452,43 +462,71 @@
log.error("Qmf properties:")
- for item in sorted(self.object.getProperties()):
+ for item in sorted(self.qmf_object.getProperties()):
log.error(" %-34s %r", *item)
log.error("Qmf statistics:")
- for item in sorted(self.object.getStatistics()):
+ for item in sorted(self.qmf_object.getStatistics()):
log.error(" %-34s %r", *item)
raise
def __repr__(self):
name = self.__class__.__name__
- cls = self.object.getClassKey().getClassName()
- id = self.object.getObjectId().objectName
+ cls = self.qmf_object.getClassKey().getClassName()
+ id = self.qmf_object.getObjectId().objectName
return "%s(%s,%s,%s)" % (name, self.agent.id, cls, id)
-class AgentDelete(Update):
- def __init__(self, model, agent):
- super(AgentDelete, self).__init__(model)
+class AgentUpdate(Update):
+ def __init__(self, model, qmf_agent):
+ super(AgentUpdate, self).__init__(model)
- self.agent = agent
+ self.qmf_agent = qmf_agent
def do_process(self, cursor, stats):
- id = self.agent.id
+ try:
+ agent = self.model.get_agent(self.qmf_agent)
+ except KeyError:
+ agent_id = self.qmf_agent.getAgentBank()
+ self.delete_agent_objects(cursor, stats, agent_id)
+
+ agent = MintAgent(self.model, self.qmf_agent)
+
+ #timestamp = timestamp / 1000000000
+ #agent.last_heartbeat = datetime.fromtimestamp(timestamp)
+
+ agent.last_heartbeat = datetime.now()
+
+ stats.updated += 1
+
+ # XXX Add periodic update of update_time
+
+ def delete_agent_objects(self, cursor, stats, agent_id):
for pkg in self.model._packages:
for cls in pkg._classes:
if cls._storage == "none":
continue
- for obj in cls.get_selection(cursor, _qmf_agent_id=id):
+ for obj in cls.get_selection(cursor, _qmf_agent_id=agent_id):
obj.delete(cursor)
self.model.print_event(3, "Deleted %s", obj)
stats.deleted += 1
+class AgentDelete(AgentUpdate):
+ def do_process(self, cursor, stats):
+ try:
+ agent = self.model.get_agent(qmf_agent)
+ except KeyError:
+ raise UpdateDropped()
+
+ agent.delete()
+
+ self.delete_agent_objects(cursor, stats, agent)
+
class UpdateDropped(Exception):
pass
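The shape of this change is that the QMF console callbacks in session.py no longer touch the model directly; they only enqueue Update objects, and a single update thread drains the queue, so agent creation, heartbeats, object updates and deletes are applied in one ordered stream. A bare-bones sketch of that producer/consumer split; the Queue-based thread below is an assumption standing in for mint's real update thread, and only the enqueue-then-process ordering mirrors the diffs:

    import Queue
    import threading

    class UpdateThread(threading.Thread):
        def __init__(self):
            threading.Thread.__init__(self)
            self.updates = Queue.Queue()
            self.setDaemon(True)

        def enqueue(self, update):
            # called from the console callbacks (newAgent, heartbeat, objectProps, ...)
            self.updates.put(update)

        def run(self):
            while True:
                update = self.updates.get()    # single consumer => serialized order
                try:
                    update.process()           # stand-in for do_process(cursor, stats)
                except Exception:
                    pass                       # the real thread counts errors instead

Mint's Update subclasses take a database cursor and a stats object via do_process(cursor, stats); the no-argument process() above is only there to keep the sketch self-contained.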
rhmessaging commits: r4153 - mgmt/newdata/wooly/python/wooly.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2010-07-27 15:11:04 -0400 (Tue, 27 Jul 2010)
New Revision: 4153
Modified:
mgmt/newdata/wooly/python/wooly/datatable.py
Log:
Get data at the start of the render pass so deferred updates can work
Modified: mgmt/newdata/wooly/python/wooly/datatable.py
===================================================================
--- mgmt/newdata/wooly/python/wooly/datatable.py 2010-07-27 18:22:43 UTC (rev 4152)
+++ mgmt/newdata/wooly/python/wooly/datatable.py 2010-07-27 19:11:04 UTC (rev 4153)
@@ -117,9 +117,7 @@
values = self.get_data_values(session)
return self.adapter.get_count(values)
- def do_process(self, session):
- super(DataTable, self).do_process(session)
-
+ def do_render(self, session):
start = time.time()
data = self.get_data(session)
@@ -130,6 +128,8 @@
self.summary.set(session, (len(data), seconds))
self.count.set(session, self.get_count(session))
+ return super(DataTable, self).do_render(session)
+
def render_font_size(self, session):
return "%.1fem" % self.header.font.get(session)
rhmessaging commits: r4152 - in store/trunk/cpp/lib: jrnl and 1 other directory.
by rhmessaging-commits@lists.jboss.org
Author: kpvdr
Date: 2010-07-27 14:22:43 -0400 (Tue, 27 Jul 2010)
New Revision: 4152
Modified:
store/trunk/cpp/lib/JournalImpl.cpp
store/trunk/cpp/lib/TxnCtxt.cpp
store/trunk/cpp/lib/jrnl/jcntl.cpp
store/trunk/cpp/lib/jrnl/jcntl.hpp
store/trunk/cpp/lib/jrnl/jerrno.cpp
store/trunk/cpp/lib/jrnl/jerrno.hpp
store/trunk/cpp/lib/jrnl/rmgr.cpp
store/trunk/cpp/lib/jrnl/wmgr.cpp
Log:
Tidied up the definition of the error constants from r4148 that were in jcntl; moved them to jerrno, where they belong.
Modified: store/trunk/cpp/lib/JournalImpl.cpp
===================================================================
--- store/trunk/cpp/lib/JournalImpl.cpp 2010-07-26 21:37:19 UTC (rev 4151)
+++ store/trunk/cpp/lib/JournalImpl.cpp 2010-07-27 18:22:43 UTC (rev 4152)
@@ -294,7 +294,7 @@
}
break;
case mrg::journal::RHM_IORES_PAGE_AIOWAIT:
- if (get_wr_events(&_aio_cmpl_timeout) == AIO_TIMEOUT) {
+ if (get_wr_events(&_aio_cmpl_timeout) == journal::jerrno::AIO_TIMEOUT) {
std::stringstream ss;
ss << "read_data_record() returned " << mrg::journal::iores_str(res);
ss << "; timed out waiting for page to be processed.";
Modified: store/trunk/cpp/lib/TxnCtxt.cpp
===================================================================
--- store/trunk/cpp/lib/TxnCtxt.cpp 2010-07-26 21:37:19 UTC (rev 4151)
+++ store/trunk/cpp/lib/TxnCtxt.cpp 2010-07-27 18:22:43 UTC (rev 4152)
@@ -100,7 +100,7 @@
void TxnCtxt::jrnl_sync(JournalImpl* jc, timespec* timeout) {
if (!jc || jc->is_txn_synced(getXid()))
return;
- if (jc->get_wr_events(timeout) == journal::jcntl::AIO_TIMEOUT && timeout)
+ if (jc->get_wr_events(timeout) == journal::jerrno::AIO_TIMEOUT && timeout)
THROW_STORE_EXCEPTION(std::string("Error: timeout waiting for TxnCtxt::jrnl_sync()"));
}
Modified: store/trunk/cpp/lib/jrnl/jcntl.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.cpp 2010-07-26 21:37:19 UTC (rev 4151)
+++ store/trunk/cpp/lib/jrnl/jcntl.cpp 2010-07-27 18:22:43 UTC (rev 4152)
@@ -71,8 +71,6 @@
_final_aio_cmpl_timeout.tv_nsec = FINAL_AIO_CMPL_TIMEOUT_NSEC;
return true;
}
-int32_t jcntl::AIO_TIMEOUT = int32_t(-1);
-int32_t jcntl::THREAD_BLOCKED = int32_t(-2);
// Functions
@@ -362,7 +360,7 @@
{
stlock t(_wr_mutex);
if (!t.locked())
- return THREAD_BLOCKED;
+ return jerrno::LOCK_TAKEN;
return _wmgr.get_events(pmgr::UNUSED, timeout);
}
@@ -446,7 +444,7 @@
fcntl* fcntlp = _lpmgr.get_fcntlp(lid);
while (fcntlp->wr_fhdr_aio_outstanding())
{
- if (get_wr_events(&_aio_cmpl_timeout) == AIO_TIMEOUT)
+ if (get_wr_events(&_aio_cmpl_timeout) == jerrno::AIO_TIMEOUT)
throw jexception(jerrno::JERR_JCNTL_AIOCMPLWAIT, "jcntl", "fhdr_wr_sync");
}
}
@@ -502,7 +500,7 @@
{
while (_wmgr.get_aio_evt_rem())
{
- if (get_wr_events(&_aio_cmpl_timeout) == AIO_TIMEOUT)
+ if (get_wr_events(&_aio_cmpl_timeout) == jerrno::AIO_TIMEOUT)
throw jexception(jerrno::JERR_JCNTL_AIOCMPLWAIT, "jcntl", "aio_cmpl_wait");
}
}
@@ -515,7 +513,7 @@
{
while (_wmgr.curr_pg_blocked())
{
- if (_wmgr.get_events(pmgr::UNUSED, &_aio_cmpl_timeout) == AIO_TIMEOUT)
+ if (_wmgr.get_events(pmgr::UNUSED, &_aio_cmpl_timeout) == jerrno::AIO_TIMEOUT)
{
std::ostringstream oss;
oss << "get_events() returned JERR_JCNTL_AIOCMPLWAIT; wmgr_status: " << _wmgr.status_str();
@@ -529,7 +527,7 @@
{
while (_wmgr.curr_file_blocked())
{
- if (_wmgr.get_events(pmgr::UNUSED, &_aio_cmpl_timeout) == AIO_TIMEOUT)
+ if (_wmgr.get_events(pmgr::UNUSED, &_aio_cmpl_timeout) == jerrno::AIO_TIMEOUT)
{
std::ostringstream oss;
oss << "get_events() returned JERR_JCNTL_AIOCMPLWAIT; wmgr_status: " << _wmgr.status_str();
Modified: store/trunk/cpp/lib/jrnl/jcntl.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.hpp 2010-07-26 21:37:19 UTC (rev 4151)
+++ store/trunk/cpp/lib/jrnl/jcntl.hpp 2010-07-27 18:22:43 UTC (rev 4152)
@@ -149,8 +149,6 @@
public:
static timespec _aio_cmpl_timeout; ///< Timeout for blocking libaio returns
static timespec _final_aio_cmpl_timeout; ///< Timeout for blocking libaio returns when stopping or finalizing
- static int32_t AIO_TIMEOUT;
- static int32_t THREAD_BLOCKED;
/**
* \brief Journal constructor.
Modified: store/trunk/cpp/lib/jrnl/jerrno.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jerrno.cpp 2010-07-26 21:37:19 UTC (rev 4151)
+++ store/trunk/cpp/lib/jrnl/jerrno.cpp 2010-07-27 18:22:43 UTC (rev 4152)
@@ -128,7 +128,11 @@
const u_int32_t jerrno::JERR_JINF_JDATEMPTY = 0x0c03;
const u_int32_t jerrno::JERR_JINF_TOOMANYFILES = 0x0c04;
+// Negative returns for some functions
+const int32_t jerrno::AIO_TIMEOUT = -1;
+const int32_t jerrno::LOCK_TAKEN = -2;
+
// static initialization fn
bool
Modified: store/trunk/cpp/lib/jrnl/jerrno.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jerrno.hpp 2010-07-26 21:37:19 UTC (rev 4151)
+++ store/trunk/cpp/lib/jrnl/jerrno.hpp 2010-07-27 18:22:43 UTC (rev 4152)
@@ -146,6 +146,9 @@
static const u_int32_t JERR_JINF_JDATEMPTY; ///< Journal data files empty
static const u_int32_t JERR_JINF_TOOMANYFILES; ///< Too many journal data files
+ // Negative returns for some functions
+ static const int32_t AIO_TIMEOUT; ///< Timeout waiting for AIO return
+ static const int32_t LOCK_TAKEN; ///< Attempted to take lock, but it was taken by another thread
/**
* \brief Method to access error message from known error number.
*/
Modified: store/trunk/cpp/lib/jrnl/rmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rmgr.cpp 2010-07-26 21:37:19 UTC (rev 4151)
+++ store/trunk/cpp/lib/jrnl/rmgr.cpp 2010-07-27 18:22:43 UTC (rev 4152)
@@ -245,7 +245,7 @@
throw jexception(jerrno::JERR__AIO, oss.str(), "rmgr", "get_events");
}
if (ret == 0 && timeout)
- return _jc->AIO_TIMEOUT;
+ return jerrno::AIO_TIMEOUT;
std::vector<u_int16_t> pil;
pil.reserve(ret);
@@ -326,7 +326,7 @@
// Wait for any outstanding AIO read operations to complete before synchronizing
while (_aio_evt_rem)
{
- if (get_events(AIO_COMPLETE, timeout) == _jc->AIO_TIMEOUT) // timed out, nothing returned
+ if (get_events(AIO_COMPLETE, timeout) == jerrno::AIO_TIMEOUT) // timed out, nothing returned
{
throw jexception(jerrno::JERR__TIMEOUT,
"Timed out waiting for outstanding read aio to return", "rmgr", "init_validation");
@@ -347,7 +347,7 @@
bool timed_out = false;
while (!_rrfc.is_valid() && !timed_out)
{
- timed_out = get_events(AIO_COMPLETE, timeout) == _jc->AIO_TIMEOUT;
+ timed_out = get_events(AIO_COMPLETE, timeout) == jerrno::AIO_TIMEOUT;
if (timed_out && throw_on_timeout)
throw jexception(jerrno::JERR__TIMEOUT, "Timed out waiting for read validity", "rmgr", "wait_for_validity");
}
Modified: store/trunk/cpp/lib/jrnl/wmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wmgr.cpp 2010-07-26 21:37:19 UTC (rev 4151)
+++ store/trunk/cpp/lib/jrnl/wmgr.cpp 2010-07-27 18:22:43 UTC (rev 4152)
@@ -677,7 +677,7 @@
}
if (ret == 0 && timeout)
- return _jc->AIO_TIMEOUT;
+ return jerrno::AIO_TIMEOUT;
int32_t tot_data_toks = 0;
for (int i=0; i<ret; i++) // Index of returned AIOs
rhmessaging commits: r4151 - in mgmt/newdata: cumin/python/cumin and 1 other directories.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2010-07-26 17:37:19 -0400 (Mon, 26 Jul 2010)
New Revision: 4151
Modified:
mgmt/newdata/cumin/bin/cumin-data
mgmt/newdata/cumin/python/cumin/config.py
mgmt/newdata/mint/python/mint/session.py
Log:
Add configuration for binding to a defined set of qmf packages
Modified: mgmt/newdata/cumin/bin/cumin-data
===================================================================
--- mgmt/newdata/cumin/bin/cumin-data 2010-07-26 20:34:27 UTC (rev 4150)
+++ mgmt/newdata/cumin/bin/cumin-data 2010-07-26 21:37:19 UTC (rev 4151)
@@ -25,12 +25,26 @@
model_dir = os.path.join(config.home, "model")
mint = Mint(model_dir, opts.broker, opts.database)
-
mint.print_event_level = opts.print_events
mint.check()
mint.init()
+ if values.data.packages:
+ packages = list()
+
+ for name in values.data.packages.split(","):
+ name = name.strip()
+
+ try:
+ pkg = mint.model._packages_by_name[name]
+ except KeyError:
+ print "No package found for '%s'" % name
+
+ packages.append(pkg)
+
+ mint.session.qmf_packages = packages
+
if opts.init_only:
return
Modified: mgmt/newdata/cumin/python/cumin/config.py
===================================================================
--- mgmt/newdata/cumin/python/cumin/config.py 2010-07-26 20:34:27 UTC (rev 4150)
+++ mgmt/newdata/cumin/python/cumin/config.py 2010-07-26 21:37:19 UTC (rev 4151)
@@ -33,6 +33,8 @@
data = CuminConfigSection(self, "data")
data.log_file.default = os.path.join(self.home, "log", "data.log")
+ param = ConfigParameter(data, "packages", str)
+
param = ConfigParameter(data, "expire-frequency", int)
param.default = 600 # 10 minutes
Modified: mgmt/newdata/mint/python/mint/session.py
===================================================================
--- mgmt/newdata/mint/python/mint/session.py 2010-07-26 20:34:27 UTC (rev 4150)
+++ mgmt/newdata/mint/python/mint/session.py 2010-07-26 21:37:19 UTC (rev 4151)
@@ -12,6 +12,7 @@
self.qmf_session = None
self.qmf_brokers = list()
+ self.qmf_packages = self.app.model._packages
def add_broker(self, uri):
log.info("Adding QMF broker at %s", uri)
@@ -36,8 +37,12 @@
manageConnections=True,
rcvObjects=self.app.update_enabled,
rcvEvents=False,
- rcvHeartbeats=True)
+ rcvHeartbeats=True,
+ userBindings=True)
+ for pkg in self.app.model._packages:
+ self.qmf_session.bindPackage(pkg._name)
+
self.add_broker(self.broker_uri)
def stop(self):
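Taken together, the two hunks let cumin-data restrict the session to an explicit package list: configured names are resolved against the model, and because the Session is created with userBindings=True, only packages passed to bindPackage() are delivered. A compacted sketch of that flow, under the assumption that the bound set is the configured one and that the Session has already been created as in the diff; the helper name and example package names are illustrative:

    def bind_configured_packages(mint, package_names):
        # package_names: comma-separated, e.g. "org.apache.qpid.broker,com.redhat.grid"
        packages = list()
        for name in package_names.split(","):
            name = name.strip()
            try:
                packages.append(mint.model._packages_by_name[name])
            except KeyError:
                print "No package found for '%s'" % name
                continue

        mint.session.qmf_packages = packages

        # with userBindings=True, the broker only sends what is bound here
        for pkg in packages:
            mint.session.qmf_session.bindPackage(pkg._name)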
rhmessaging commits: r4150 - in mgmt/newdata: cumin/python/cumin/grid and 2 other directories.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2010-07-26 16:34:27 -0400 (Mon, 26 Jul 2010)
New Revision: 4150
Modified:
mgmt/newdata/cumin/python/cumin/grid/job.py
mgmt/newdata/cumin/python/cumin/grid/submission.py
mgmt/newdata/cumin/python/cumin/main.py
mgmt/newdata/cumin/python/cumin/main.strings
mgmt/newdata/cumin/python/cumin/messaging/binding.py
mgmt/newdata/cumin/python/cumin/messaging/broker.py
mgmt/newdata/cumin/python/cumin/messaging/connection.py
mgmt/newdata/cumin/python/cumin/objectselector.py
mgmt/newdata/cumin/python/cumin/sqladapter.py
mgmt/newdata/cumin/python/cumin/widgets.py
mgmt/newdata/wooly/python/wooly/datatable.py
Log:
* For the sake of simpler queries that are also faster, use message
depth instead of recent enqueues for the queue top table
* Use existing sort facilities on objecttable instead of custom ones
on TopTables
* Fix an object-initialization ordering problem that prevented table
columns from picking up formats; generally try to make this less
confusing
* Correct some naming in objecttable
* Only set the default sort if it hasn't already been set
* In DataTable, apply field formats if present; format floats with
"%0.02f" by default
Modified: mgmt/newdata/cumin/python/cumin/grid/job.py
===================================================================
--- mgmt/newdata/cumin/python/cumin/grid/job.py 2010-07-26 20:12:01 UTC (rev 4149)
+++ mgmt/newdata/cumin/python/cumin/grid/job.py 2010-07-26 20:34:27 UTC (rev 4150)
@@ -130,9 +130,11 @@
class JobSelector(ObjectSelector):
def __init__(self, app, name, submission):
cls = app.model.com_redhat_cumin_grid.JobSummary
- adapter = JobSummariesAdapter(app, cls)
- super(JobSelector, self).__init__(app, name, cls, adapter)
+ super(JobSelector, self).__init__(app, name, cls)
+
+ self.adapter = JobSummariesAdapter(app, cls)
+
self.submission = submission
frame = "main.grid.pool.submission.job"
self.job_id_col = self.JobIdColumn(app, "job", cls.GlobalJobId, cls.JobId, frame)
@@ -143,7 +145,7 @@
self.add_attribute_column(cls.Cmd)
- self.job_id_column = ObjectAttributeColumn(app, cls.JobId.name, cls.JobId)
+ self.job_id_column = ObjectTableColumn(app, cls.JobId.name, cls.JobId)
self.job_id_column.visible = False
self.add_column(self.job_id_column)
@@ -183,7 +185,7 @@
submission = self.parent.submission.get(session)
return frame.get_href(session, submission._id, job_id)
- class Status(ObjectAttributeColumn):
+ class Status(ObjectTableColumn):
def render_cell_content(self, session, record):
status = self.field.get_content(session, record)
return JobStatusInfo.get_status_string(status)
Modified: mgmt/newdata/cumin/python/cumin/grid/submission.py
===================================================================
--- mgmt/newdata/cumin/python/cumin/grid/submission.py 2010-07-26 20:12:01 UTC (rev 4149)
+++ mgmt/newdata/cumin/python/cumin/grid/submission.py 2010-07-26 20:34:27 UTC (rev 4150)
@@ -56,10 +56,11 @@
class SubmissionSelector(ObjectSelector):
def __init__(self, app, name):
cls = app.model.com_redhat_grid.Submission
- data = SubmissionData(app)
- super(SubmissionSelector, self).__init__(app, name, cls, data)
+ super(SubmissionSelector, self).__init__(app, name, cls)
+ self.adapter = SubmissionData(app)
+
self.add_attribute_column(cls.Idle)
self.add_attribute_column(cls.Running)
self.add_attribute_column(cls.Completed)
@@ -79,7 +80,7 @@
self.insert_column(1, col)
attr = self.cls.Owner
- col = ObjectAttributeColumn(app, attr.name, attr)
+ col = ObjectTableColumn(app, attr.name, attr)
self.insert_column(2, col)
class SubmissionAdd(ObjectTask):
Modified: mgmt/newdata/cumin/python/cumin/main.py
===================================================================
--- mgmt/newdata/cumin/python/cumin/main.py 2010-07-26 20:12:01 UTC (rev 4149)
+++ mgmt/newdata/cumin/python/cumin/main.py 2010-07-26 20:34:27 UTC (rev 4150)
@@ -280,44 +280,41 @@
class TopQueueTable(TopTable):
def __init__(self, app, name):
cls = app.model.org_apache_qpid_broker.Queue
- adapter = TopQueueAdapter(app, cls)
+ super(TopQueueTable, self).__init__(app, name, cls)
- super(TopQueueTable, self).__init__(app, name, adapter)
-
- col = self.Name(app, "name")
+ frame = "main.messaging.broker.queue"
+ col = self.NameColumn(app, "name", cls.name, cls._id, frame)
self.add_column(col)
- col = self.MsgEnqueuesColumn(app, cls.msgTotalEnqueues.name,
- cls.msgTotalEnqueues)
- self.add_column(col)
+ col = self.add_attribute_column(cls.msgDepth)
- class MsgEnqueuesColumn(DataTableColumn):
- def render_header_content(self, session):
- return "Recent Enqueues / sec"
+ self.sort.default = col.name
- def render_cell_content(self, session, record):
- return "%.1f" % float(record[1])
+ def init(self):
+ super(TopQueueTable, self).init()
- def render_text_align(self, session):
- # type is str: "count64"
- return "right"
+ self.adapter.vhost_id_field = ObjectSqlField \
+ (self.adapter, self.cls.vhostRef)
- class Name(LinkColumn):
- def render_header_content(self, session):
- return "Name"
+ SqlComparisonFilter(self.adapter.query,
+ self.cls.msgDepth.sql_column,
+ "null",
+ "is not")
+ class NameColumn(ObjectLinkColumn):
def render_cell_href(self, session, record):
branch = session.branch()
- self.page.main.messaging.broker.id.set(branch, record[2])
- self.page.main.messaging.broker.queue.id.set(branch, record[3])
- self.page.main.messaging.broker.queue.view.show(branch)
+ frame = self.page.main.messaging.broker
+ adapter = self.table.adapter
+
+ frame.id.set(branch, record[adapter.vhost_id_field.index])
+ frame.queue.id.set(branch, record[adapter.id_field.index])
+ frame.queue.view.show(branch)
+
return branch.marshal()
- def render_cell_content(self, session, record):
- return record[0]
-
-class TopSystemTable(TopObjectTable):
+class TopSystemTable(TopTable):
def __init__(self, app, name):
cls = app.model.com_redhat_sesame.Sysimage
@@ -327,15 +324,19 @@
col = ObjectLinkColumn(app, "name", cls.nodeName, cls._id, frame)
self.add_column(col)
- attr = cls.loadAverage1Min
- col = TopObjectAttributeColumn(self.app, attr.name, attr)
- self.add_column(col)
- self.sort_col = attr.name
+ col = self.add_attribute_column(cls.loadAverage1Min)
- self.header = TopTableHeader(app, "header")
- self.replace_child(self.header)
+ self.sort.default = col.name
-class TopSubmissionTable(TopObjectTable):
+ def init(self):
+ super(TopSystemTable, self).init()
+
+ SqlComparisonFilter(self.adapter.query,
+ self.cls.loadAverage1Min.sql_column,
+ "null",
+ "is not")
+
+class TopSubmissionTable(TopTable):
def __init__(self, app, name):
cls = app.model.com_redhat_grid.Submission
@@ -347,8 +348,9 @@
col = self.DurationColumn(app, cls._qmf_create_time.name,
cls._qmf_create_time)
self.add_column(col)
- self.sort_col = cls._qmf_create_time.name
+ self.sort.default = col.name
+
def init(self):
super(TopSubmissionTable, self).init()
@@ -370,7 +372,7 @@
self.page.main.grid.pool.submission.view.show(session)
return branch.marshal()
- class DurationColumn(TopObjectAttributeColumn):
+ class DurationColumn(TopTableColumn):
def render_header_content(self, session):
return "Duration"
Modified: mgmt/newdata/cumin/python/cumin/main.strings
===================================================================
--- mgmt/newdata/cumin/python/cumin/main.strings 2010-07-26 20:12:01 UTC (rev 4149)
+++ mgmt/newdata/cumin/python/cumin/main.strings 2010-07-26 20:34:27 UTC (rev 4150)
@@ -24,7 +24,7 @@
<tr>
<td>
<div class="fullpageable">
- <h2><img src="resource?name=queue-20.png"/> Busiest Message Queues</h2>
+ <h2><img src="resource?name=queue-20.png"/> Deepest Message Queues</h2>
<div class="iblock">{queues}</div>
</div>
Modified: mgmt/newdata/cumin/python/cumin/messaging/binding.py
===================================================================
--- mgmt/newdata/cumin/python/cumin/messaging/binding.py 2010-07-26 20:12:01 UTC (rev 4149)
+++ mgmt/newdata/cumin/python/cumin/messaging/binding.py 2010-07-26 20:34:27 UTC (rev 4150)
@@ -87,9 +87,9 @@
exchange = app.model.org_apache_qpid_broker.Exchange
queue = app.model.org_apache_qpid_broker.Queue
- data = BindingData(app)
+ super(BindingSelector, self).__init__(app, name, binding)
- super(BindingSelector, self).__init__(app, name, binding, data)
+ self.adapter = BindingData(app)
frame = "main.messaging.broker.binding"
col = ObjectLinkColumn \
Modified: mgmt/newdata/cumin/python/cumin/messaging/broker.py
===================================================================
--- mgmt/newdata/cumin/python/cumin/messaging/broker.py 2010-07-26 20:12:01 UTC (rev 4149)
+++ mgmt/newdata/cumin/python/cumin/messaging/broker.py 2010-07-26 20:34:27 UTC (rev 4150)
@@ -62,9 +62,9 @@
system = app.model.org_apache_qpid_broker.System
cluster = app.model.org_apache_qpid_cluster.Cluster
- data = BrokerData(app)
+ super(BrokerSelector, self).__init__(app, name, broker)
- super(BrokerSelector, self).__init__(app, name, broker, data)
+ self.adapter = BrokerData(app)
self.group = SessionAttribute(self, "group")
Modified: mgmt/newdata/cumin/python/cumin/messaging/connection.py
===================================================================
--- mgmt/newdata/cumin/python/cumin/messaging/connection.py 2010-07-26 20:12:01 UTC (rev 4149)
+++ mgmt/newdata/cumin/python/cumin/messaging/connection.py 2010-07-26 20:34:27 UTC (rev 4150)
@@ -99,7 +99,7 @@
args = (item.remoteProcessName, item.remotePid)
return "%s (%i)" % args
-class ConnectionProcessColumn(ObjectAttributeColumn):
+class ConnectionProcessColumn(ObjectTableColumn):
def __init__(self, app, name, attr, pid_attr):
super(ConnectionProcessColumn, self).__init__(app, name, attr)
Modified: mgmt/newdata/cumin/python/cumin/objectselector.py
===================================================================
--- mgmt/newdata/cumin/python/cumin/objectselector.py 2010-07-26 20:12:01 UTC (rev 4149)
+++ mgmt/newdata/cumin/python/cumin/objectselector.py 2010-07-26 20:34:27 UTC (rev 4150)
@@ -12,44 +12,43 @@
strings = StringCatalog(__file__)
class ObjectTable(DataTable):
- def __init__(self, app, name, cls, adapter=None):
+ def __init__(self, app, name, cls):
+ super(ObjectTable, self).__init__(app, name)
+
assert isinstance(cls, RosemaryClass), cls
- if not adapter:
- adapter = ObjectSqlAdapter(app, cls)
-
- assert isinstance(adapter, DataAdapter), adapter
-
- super(ObjectTable, self).__init__(app, name, adapter)
-
self.cls = cls
self.update_enabled = True
-
self.ascending.default = True
# (RosemaryAttribute this, RosemaryAttribute that, Attribute object)
self.filter_specs = list()
def init(self):
+ if not self.adapter:
+ self.adapter = ObjectSqlAdapter(self.app, self.cls)
+
super(ObjectTable, self).init()
- assert self.cls, self
- assert self.adapter, self
- #assert self.adapter.id_field, self
+ for this, that, fobj in self.filter_specs:
+ self.adapter.add_value_filter(this)
- for col in self.columns:
- if col.sortable:
- self.sort.default = col.name
+ if self.sort.default is None:
+ for col in self.columns:
+ if col.sortable:
+ self.sort.default = col.name
+ break
- break
-
def add_attribute_column(self, attr):
assert isinstance(attr, RosemaryAttribute), attr
- col = ObjectAttributeColumn(self.app, attr.name, attr)
+ col = ObjectTableColumn(self.app, attr.name, attr)
+
self.add_column(col)
+ return col
+
def add_filter(self, attribute, this, that=None):
if not that:
that = this
@@ -59,7 +58,6 @@
assert isinstance(that, RosemaryAttribute), that
self.filter_specs.append((this, that, attribute))
- self.adapter.add_value_filter(this)
def add_reference_filter(self, attribute, ref):
assert isinstance(ref, RosemaryReference), ref
@@ -74,9 +72,6 @@
for this, that, fobj in self.filter_specs:
obj = fobj.get(session)
-
- session.log("%r %r %r" % (this, obj, that))
-
values[this.name] = getattr(obj, that.name)
return values
@@ -84,9 +79,32 @@
def render_title(self, session):
return "%ss" % self.cls._title
+class ObjectTableColumn(DataTableColumn):
+ def __init__(self, app, name, attr):
+ super(ObjectTableColumn, self).__init__(app, name)
+
+ self.attr = attr
+
+ def init(self):
+ super(ObjectTableColumn, self).init()
+
+ assert self.table.adapter, self.table
+
+ try:
+ self.field = self.table.adapter.fields_by_attr[self.attr]
+ except KeyError:
+ # XXX do this instead:
+ #cls = self.table.adapter.default_field_cls
+ #self.field = cls(self.table.adapter, self.attr)
+
+ if isinstance(self.table.adapter, ObjectQmfAdapter):
+ self.field = ObjectQmfField(self.table.adapter, self.attr)
+ else:
+ self.field = ObjectSqlField(self.table.adapter, self.attr)
+
class ObjectSelector(ObjectTable, Form):
- def __init__(self, app, name, cls, adapter=None):
- super(ObjectSelector, self).__init__(app, name, cls, adapter)
+ def __init__(self, app, name, cls):
+ super(ObjectSelector, self).__init__(app, name, cls)
self.init_ids(app, cls)
@@ -125,24 +143,7 @@
(app, "id", cls._id, self.ids)
self.add_column(self.checkbox_column)
-class ObjectAttributeColumn(DataTableColumn):
- def __init__(self, app, name, attr):
- super(ObjectAttributeColumn, self).__init__(app, name, None)
-
- self.attr = attr
-
- def init(self):
- super(ObjectAttributeColumn, self).init()
-
- try:
- self.field = self.table.adapter.fields_by_attr[self.attr]
- except KeyError:
- if isinstance(self.table.adapter, ObjectQmfAdapter):
- self.field = ObjectQmfField(self.table.adapter, self.attr)
- else:
- self.field = ObjectSqlField(self.table.adapter, self.attr)
-
-class ObjectCheckboxColumn(ObjectAttributeColumn):
+class ObjectCheckboxColumn(ObjectTableColumn):
def __init__(self, app, name, attr, selection):
super(ObjectCheckboxColumn, self).__init__(app, name, attr)
@@ -175,7 +176,7 @@
return record[self.parent.parent.field.index] in checks \
and "checked=\"checked\"" or ""
-class ObjectLinkColumn(ObjectAttributeColumn, LinkColumn):
+class ObjectLinkColumn(ObjectTableColumn, LinkColumn):
def __init__(self, app, name, attr, id_attr, frame_path):
super(ObjectLinkColumn, self).__init__(app, name, attr)
Modified: mgmt/newdata/cumin/python/cumin/sqladapter.py
===================================================================
--- mgmt/newdata/cumin/python/cumin/sqladapter.py 2010-07-26 20:12:01 UTC (rev 4149)
+++ mgmt/newdata/cumin/python/cumin/sqladapter.py 2010-07-26 20:34:27 UTC (rev 4150)
@@ -13,10 +13,6 @@
self.query = SqlQuery(self.table)
self.columns = list()
- def init(self):
- for field in self.fields:
- field.init()
-
def get_count(self, values):
# XXX urgh. I want session in here
Modified: mgmt/newdata/cumin/python/cumin/widgets.py
===================================================================
--- mgmt/newdata/cumin/python/cumin/widgets.py 2010-07-26 20:12:01 UTC (rev 4149)
+++ mgmt/newdata/cumin/python/cumin/widgets.py 2010-07-26 20:34:27 UTC (rev 4150)
@@ -1542,35 +1542,11 @@
exchanges = cls.get_selection(session.cursor, _vhostRef_id=vhostid)
return self.base_get_items(session, exchanges)
-class TopTable(DataTable):
- def __init__(self, app, name, adapter):
- super(TopTable, self).__init__(app, name, adapter)
-
- self.header = TopTableHeader(app, "header")
- self.replace_child(self.header)
-
- self.footer = TopTableFooter(app, "footer")
- self.replace_child(self.footer)
-
- self.update_enabled = True
-
- def get_count(self, session):
- # avoid extra sql call since we don't show the record count
- return 0
-
- def get_data_options(self, session):
- options = SqlQueryOptions()
-
- options.limit = 5
- options.offset = 0
-
- return options
-
-class TopObjectTable(ObjectTable):
+class TopTable(ObjectTable):
def __init__(self, app, name, cls):
- super(TopObjectTable, self).__init__(app, name, cls)
+ super(TopTable, self).__init__(app, name, cls)
- col = ObjectAttributeColumn(app, cls._id.name, cls._id)
+ col = ObjectTableColumn(app, cls._id.name, cls._id)
col.visible = False
self.add_column(col)
@@ -1580,18 +1556,13 @@
self.footer = TopTableFooter(app, "footer")
self.replace_child(self.footer)
- self.sort_col = None
+ self.header.limit.default = 5
+ self.ascending.default = False
def get_count(self, session):
# avoid extra sql call since we don't show the record count
return 0
- def get_data_options(self, session):
- sort_col = self.sort_col or self.cls._id.name
- self.sort.set(session, sort_col)
- self.ascending.set(session, False)
- return super(TopObjectTable, self).get_data_options(session)
-
class TopTableHeader(TableHeader):
def __init__(self, app, name):
super(TopTableHeader, self).__init__(app, name)
@@ -1620,9 +1591,9 @@
def render_class(self, session):
return self.parent.name
-class TopObjectAttributeColumn(ObjectAttributeColumn):
- def render_class(self, session):
- return self.name
+class TopTableColumn(ObjectTableColumn):
+ def render_class(self, session):
+ return self.name
class TopTableFooter(Widget):
def render(self, session):
Modified: mgmt/newdata/wooly/python/wooly/datatable.py
===================================================================
--- mgmt/newdata/wooly/python/wooly/datatable.py 2010-07-26 20:12:01 UTC (rev 4149)
+++ mgmt/newdata/wooly/python/wooly/datatable.py 2010-07-26 20:34:27 UTC (rev 4150)
@@ -26,20 +26,31 @@
self.adapter = adapter
self.name = name
self.type = type
+ self.format = None
self.index = len(self.adapter.fields)
self.adapter.fields.append(self)
self.adapter.fields_by_name[self.name] = self
def init(self):
- pass
+ if self.format is None and self.type is float:
+ self.format = "%0.02f"
def get_title(self, session):
pass
def get_content(self, session, record):
- return record[self.index]
+ value = record[self.index]
+ if self.format is not None:
+ value = self.format % value
+
+ return value
+
+ def __repr__(self):
+ args = (self.__class__.__name__, self.name, self.type.__name__)
+ return "%s(%s,%s)" % args
+
class DataAdapterOptions(object):
def __init__(self):
self.sort_field = None
@@ -50,10 +61,10 @@
self.attributes = dict()
class DataTable(Table):
- def __init__(self, app, name, adapter):
+ def __init__(self, app, name):
super(DataTable, self).__init__(app, name)
- self.adapter = adapter
+ self.adapter = None
self.data = Attribute(app, "data")
self.add_attribute(self.data)
@@ -71,12 +82,11 @@
self.replace_child(self.footer)
def init(self):
- assert self.adapter
+ super(DataTable, self).init()
+ assert isinstance(self.adapter, DataAdapter), self.adapter
self.adapter.init()
- super(DataTable, self).init()
-
def get_data(self, session):
values = self.get_data_values(session)
options = self.get_data_options(session)
@@ -145,10 +155,10 @@
return writer.to_string()
class DataTableColumn(TableColumn):
- def __init__(self, app, name, field):
+ def __init__(self, app, name):
super(DataTableColumn, self).__init__(app, name)
- self.field = field
+ self.field = None
def render_text_align(self, session):
if self.field.type in (long, int, float, complex):
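The datatable.py hunks are what the last log bullet refers to: a field can carry a printf-style format, get_content() applies it, and float fields pick up "%0.02f" by default. A small self-contained illustration; MiniAdapter and MiniField are hypothetical stand-ins for the real DataAdapter/DataAdapterField, with the format logic mirroring the diff:

    class MiniAdapter(object):
        def __init__(self):
            self.fields = list()
            self.fields_by_name = dict()

    class MiniField(object):
        def __init__(self, adapter, name, type):
            self.name, self.type, self.format = name, type, None
            self.index = len(adapter.fields)
            adapter.fields.append(self)
            adapter.fields_by_name[name] = self

        def init(self):
            if self.format is None and self.type is float:
                self.format = "%0.02f"            # default float rendering

        def get_content(self, record):
            value = record[self.index]
            if self.format is not None:
                value = self.format % value
            return value

    adapter = MiniAdapter()
    load = MiniField(adapter, "loadAverage1Min", float)
    load.init()
    print load.get_content((0.4166,))             # prints "0.42"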
rhmessaging commits: r4149 - store/trunk/cpp/tests.
by rhmessaging-commits@lists.jboss.org
Author: kpvdr
Date: 2010-07-26 16:12:01 -0400 (Mon, 26 Jul 2010)
New Revision: 4149
Modified:
store/trunk/cpp/tests/Makefile.am
Log:
Restored the persistent cluster tests which were accidentally removed in the last checkin.
Modified: store/trunk/cpp/tests/Makefile.am
===================================================================
--- store/trunk/cpp/tests/Makefile.am 2010-07-26 19:48:20 UTC (rev 4148)
+++ store/trunk/cpp/tests/Makefile.am 2010-07-26 20:12:01 UTC (rev 4149)
@@ -29,7 +29,11 @@
TMP_DATA_DIR=$(abs_srcdir)/tmp_data_dir
TMP_PYTHON_TEST_DIR=$(abs_srcdir)/python_tests.tmp
+if DO_CLUSTER_TESTS
+SUBDIRS = jrnl . cluster
+else
SUBDIRS = jrnl .
+endif
TESTS = \
SimpleTest \
rhmessaging commits: r4148 - in store/trunk/cpp: lib/jrnl and 3 other directories.
by rhmessaging-commits@lists.jboss.org
Author: kpvdr
Date: 2010-07-26 15:48:20 -0400 (Mon, 26 Jul 2010)
New Revision: 4148
Modified:
store/trunk/cpp/lib/JournalImpl.cpp
store/trunk/cpp/lib/JournalImpl.h
store/trunk/cpp/lib/TxnCtxt.cpp
store/trunk/cpp/lib/TxnCtxt.h
store/trunk/cpp/lib/jrnl/aio.hpp
store/trunk/cpp/lib/jrnl/jcntl.cpp
store/trunk/cpp/lib/jrnl/jcntl.hpp
store/trunk/cpp/lib/jrnl/pmgr.hpp
store/trunk/cpp/lib/jrnl/rmgr.cpp
store/trunk/cpp/lib/jrnl/rmgr.hpp
store/trunk/cpp/lib/jrnl/wmgr.cpp
store/trunk/cpp/lib/jrnl/wmgr.hpp
store/trunk/cpp/tests/Makefile.am
store/trunk/cpp/tests/jrnl/_st_helper_fns.hpp
store/trunk/cpp/tests/jrnl/jtt/jrnl_instance.cpp
Log:
Performance and efficiency improvement: removed the loops that used usleep() to poll for blocking AIO getevents() completions and used the built-in getevents() timeout instead.
Modified: store/trunk/cpp/lib/JournalImpl.cpp
===================================================================
--- store/trunk/cpp/lib/JournalImpl.cpp 2010-07-26 15:58:14 UTC (rev 4147)
+++ store/trunk/cpp/lib/JournalImpl.cpp 2010-07-26 19:48:20 UTC (rev 4148)
@@ -237,8 +237,8 @@
_emap.size(), _tmap.size(), _tmap.enq_cnt(), _tmap.deq_cnt()), qpid::management::ManagementAgent::SEV_NOTE);
}
-#define MAX_AIO_SLEEPS 1000000 // tot: ~10 sec
-#define AIO_SLEEP_TIME_US 10 // 0.01 ms
+//#define MAX_AIO_SLEEPS 1000000 // tot: ~10 sec
+//#define AIO_SLEEP_TIME_US 10 // 0.01 ms
// Return true if content is recovered from store; false if content is external and must be recovered from an external store.
// Throw exception for all errors.
bool
@@ -272,7 +272,6 @@
bool transient = false;
bool done = false;
bool rid_found = false;
- unsigned aio_sleep_cnt = 0;
oooRidList.clear();
while (!done) {
iores res = read_data_record(&_datap, _dlen, &_xidp, xlen, transient, _external, &_dtok);
@@ -295,13 +294,10 @@
}
break;
case mrg::journal::RHM_IORES_PAGE_AIOWAIT:
- if (++aio_sleep_cnt <= MAX_AIO_SLEEPS) {
- get_wr_events();
- usleep(AIO_SLEEP_TIME_US);
- } else {
+ if (get_wr_events(&_aio_cmpl_timeout) == AIO_TIMEOUT) {
std::stringstream ss;
ss << "read_data_record() returned " << mrg::journal::iores_str(res);
- ss << "; exceeded maximum wait time";
+ ss << "; timed out waiting for page to be processed.";
throw jexception(mrg::journal::jerrno::JERR__TIMEOUT, ss.str().c_str(), "JournalImpl",
"loadMsgContent");
}
@@ -425,30 +421,6 @@
}
}
-#define MAX_INVALID_RETRYS 5 // This will yield a total wait time of 10 sec with 5 log messages at 2 sec intervals
-iores
-JournalImpl::read_data_record(void** const data_buff, size_t& tot_data_len, void** const xid_buff, size_t& xid_len,
- bool& transient, bool& external, mrg::journal::data_tok* const dtokp, bool ignore_pending_txns)
-{
- int retry_cnt = 0;
- iores res;
- do {
- res = jcntl::read_data_record(data_buff, tot_data_len, xid_buff, xid_len, transient, external, dtokp, ignore_pending_txns);
- if (res == mrg::journal::RHM_IORES_RCINVALID) {
- retry_cnt++;
- std::ostringstream oss;
- if (retry_cnt < MAX_INVALID_RETRYS) {
- oss << "Store read pipeline on queue " << _jid << " timed out waiting for journal header file read, retrying...";
- log(LOG_WARN, oss.str());
- } else {
- oss << "Store read pipeline on queue " << _jid << " timed out waiting for journal header file read, aborting read with RHM_IORES_RCINVALID";
- log(LOG_ERROR, oss.str());
- }
- }
- } while (res == mrg::journal::RHM_IORES_RCINVALID && retry_cnt < MAX_INVALID_RETRYS);
- return res;
-}
-
void
JournalImpl::txn_abort(data_tok* const dtokp, const std::string& xid)
{
@@ -524,7 +496,7 @@
{
qpid::sys::Mutex::ScopedLock sl(_getf_lock);
getEventsTimerSetFlag = false;
- if (_wmgr.get_aio_evt_rem()) { jcntl::get_wr_events(); }
+ if (_wmgr.get_aio_evt_rem()) { jcntl::get_wr_events(0); }
if (_wmgr.get_aio_evt_rem()) { setGetEventTimer(); }
}
Modified: store/trunk/cpp/lib/JournalImpl.h
===================================================================
--- store/trunk/cpp/lib/JournalImpl.h 2010-07-26 15:58:14 UTC (rev 4147)
+++ store/trunk/cpp/lib/JournalImpl.h 2010-07-26 19:48:20 UTC (rev 4148)
@@ -185,10 +185,6 @@
void dequeue_txn_data_record(mrg::journal::data_tok* const dtokp, const std::string& xid, const bool txn_coml_commit = false);
- mrg::journal::iores read_data_record(void** const data_buff, size_t& tot_data_len, void** const xid_buff,
- size_t& xid_len, bool& transient, bool& external, mrg::journal::data_tok* const dtokp,
- bool ignore_pending_txns = false);
-
void txn_abort(mrg::journal::data_tok* const dtokp, const std::string& xid);
void txn_commit(mrg::journal::data_tok* const dtokp, const std::string& xid);
Modified: store/trunk/cpp/lib/TxnCtxt.cpp
===================================================================
--- store/trunk/cpp/lib/TxnCtxt.cpp 2010-07-26 15:58:14 UTC (rev 4147)
+++ store/trunk/cpp/lib/TxnCtxt.cpp 2010-07-26 19:48:20 UTC (rev 4148)
@@ -1,7 +1,6 @@
#include "TxnCtxt.h"
#include <sstream>
-#include <unistd.h> // ::usleep()
#include "jrnl/jexception.hpp"
#include "StoreException.h"
@@ -76,42 +75,35 @@
TxnCtxt::~TxnCtxt() { if(txn) abort(); }
-#define MAX_SYNC_SLEEPS 100000 // tot: ~1 sec
-#define SYNC_SLEEP_TIME_US 10 // 0.01 ms
-
void TxnCtxt::sync() {
- bool allWritten = false;
- bool firstloop = true;
- long sleep_cnt = 0L;
- while (loggedtx && !allWritten) {
- if (sleep_cnt > MAX_SYNC_SLEEPS) THROW_STORE_EXCEPTION(std::string("Error: timeout waiting for TxnCtxt::sync()"));
- if (!firstloop) {
- ::usleep(SYNC_SLEEP_TIME_US);
- sleep_cnt++;
- } // move this into the get events call aiolib..
- allWritten = true;
- for (ipqItr i = impactedQueues.begin(); i != impactedQueues.end(); i++) {
- sync_jrnl(static_cast<JournalImpl*>(*i), firstloop, allWritten);
+ if (loggedtx) {
+ try {
+ for (ipqItr i = impactedQueues.begin(); i != impactedQueues.end(); i++)
+ jrnl_flush(static_cast<JournalImpl*>(*i));
+ if (preparedXidStorePtr)
+ jrnl_flush(preparedXidStorePtr);
+ for (ipqItr i = impactedQueues.begin(); i != impactedQueues.end(); i++)
+ jrnl_sync(static_cast<JournalImpl*>(*i), &journal::jcntl::_aio_cmpl_timeout);
+ if (preparedXidStorePtr)
+ jrnl_sync(preparedXidStorePtr, &journal::jcntl::_aio_cmpl_timeout);
+ } catch (const journal::jexception& e) {
+ THROW_STORE_EXCEPTION(std::string("Error during txn sync: ") + e.what());
}
- if (preparedXidStorePtr)
- sync_jrnl(preparedXidStorePtr, firstloop, allWritten);
- firstloop = false;
}
}
-void TxnCtxt::sync_jrnl(JournalImpl* jc, bool firstloop, bool& allWritten) {
- try {
- if (jc && !(jc->is_txn_synced(getXid()))) {
- if (firstloop)
- jc->flush();
- allWritten = false;
- jc->get_wr_events();
- }
- } catch (const journal::jexception& e) {
- THROW_STORE_EXCEPTION(std::string("Error sync") + e.what());
- }
+void TxnCtxt::jrnl_flush(JournalImpl* jc) {
+ if (jc && !(jc->is_txn_synced(getXid())))
+ jc->flush();
}
+void TxnCtxt::jrnl_sync(JournalImpl* jc, timespec* timeout) {
+ if (!jc || jc->is_txn_synced(getXid()))
+ return;
+ if (jc->get_wr_events(timeout) == journal::jcntl::AIO_TIMEOUT && timeout)
+ THROW_STORE_EXCEPTION(std::string("Error: timeout waiting for TxnCtxt::jrnl_sync()"));
+}
+
void TxnCtxt::begin(DbEnv* env, bool sync) {
env->txn_begin(0, &txn, 0);
if (sync)
Modified: store/trunk/cpp/lib/TxnCtxt.h
===================================================================
--- store/trunk/cpp/lib/TxnCtxt.h 2010-07-26 15:58:14 UTC (rev 4147)
+++ store/trunk/cpp/lib/TxnCtxt.h 2010-07-26 19:48:20 UTC (rev 4148)
@@ -69,6 +69,8 @@
virtual void completeTxn(bool commit);
void commitTxn(JournalImpl* jc, bool commit);
+ void jrnl_flush(JournalImpl* jc);
+ void jrnl_sync(JournalImpl* jc, timespec* timeout);
public:
TxnCtxt(IdSequence* _loggedtx=NULL);
@@ -81,7 +83,6 @@
*@return if the data successfully synced.
*/
void sync();
- void sync_jrnl(JournalImpl* jc, bool firstloop, bool& allWritten);
void begin(DbEnv* env, bool sync = false);
void commit();
void abort();
Modified: store/trunk/cpp/lib/jrnl/aio.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/aio.hpp 2010-07-26 15:58:14 UTC (rev 4147)
+++ store/trunk/cpp/lib/jrnl/aio.hpp 2010-07-26 19:48:20 UTC (rev 4148)
@@ -68,7 +68,7 @@
return ::io_submit(ctx, nr, aios);
}
- static inline int getevents(io_context_t ctx, long min_nr, long nr, aio_event* events, timespec* timeout)
+ static inline int getevents(io_context_t ctx, long min_nr, long nr, aio_event* events, timespec* const timeout)
{
return ::io_getevents(ctx, min_nr, nr, events, timeout);
}
Modified: store/trunk/cpp/lib/jrnl/jcntl.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.cpp 2010-07-26 15:58:14 UTC (rev 4147)
+++ store/trunk/cpp/lib/jrnl/jcntl.cpp 2010-07-26 19:48:20 UTC (rev 4148)
@@ -54,6 +54,27 @@
namespace journal
{
+#define AIO_CMPL_TIMEOUT_SEC 5
+#define AIO_CMPL_TIMEOUT_NSEC 0
+#define FINAL_AIO_CMPL_TIMEOUT_SEC 15
+#define FINAL_AIO_CMPL_TIMEOUT_NSEC 0
+
+// Static
+timespec jcntl::_aio_cmpl_timeout; ///< Timeout for blocking libaio returns
+timespec jcntl::_final_aio_cmpl_timeout; ///< Timeout for blocking libaio returns when stopping or finalizing
+bool jcntl::_init = init_statics();
+bool jcntl::init_statics()
+{
+ _aio_cmpl_timeout.tv_sec = AIO_CMPL_TIMEOUT_SEC;
+ _aio_cmpl_timeout.tv_nsec = AIO_CMPL_TIMEOUT_NSEC;
+ _final_aio_cmpl_timeout.tv_sec = FINAL_AIO_CMPL_TIMEOUT_SEC;
+ _final_aio_cmpl_timeout.tv_nsec = FINAL_AIO_CMPL_TIMEOUT_NSEC;
+ return true;
+}
+int32_t jcntl::AIO_TIMEOUT = int32_t(-1);
+int32_t jcntl::THREAD_BLOCKED = int32_t(-2);
+
+
// Functions
jcntl::jcntl(const std::string& jid, const std::string& jdir, const std::string& base_filename):
@@ -263,31 +284,21 @@
return _rmgr.discard(dtokp);
} */
-// These two combined make a wait time of approx. 2 sec.
-#define MAX_RCINVALID_CNT 20000 // tot: ~ 2 sec
-#define RCINVALID_SLEEP_TIME_US 100 // 0.1 ms
iores
jcntl::read_data_record(void** const datapp, std::size_t& dsize, void** const xidpp, std::size_t& xidsize,
bool& transient, bool& external, data_tok* const dtokp, bool ignore_pending_txns)
{
check_rstatus("read_data");
- unsigned cnt = 0;
- iores res;
- do
+ iores res = _rmgr.read(datapp, dsize, xidpp, xidsize, transient, external, dtokp, ignore_pending_txns);
+ if (res == RHM_IORES_RCINVALID)
{
+ get_wr_events(0); // check for outstanding write events
+ iores sres = _rmgr.synchronize(); // flushes all outstanding read events
+ if (sres != RHM_IORES_SUCCESS)
+ return sres;
+ _rmgr.wait_for_validity(&_aio_cmpl_timeout, true); // throw if timeout occurs
res = _rmgr.read(datapp, dsize, xidpp, xidsize, transient, external, dtokp, ignore_pending_txns);
- if (res == RHM_IORES_RCINVALID)
- {
- get_wr_events(); // check for outstanding write events
- iores sres = _rmgr.synchronize();
- if (sres != RHM_IORES_SUCCESS)
- return sres;
- if (cnt > 0)
- ::usleep(RCINVALID_SLEEP_TIME_US);
- }
- cnt++;
}
- while (cnt < MAX_RCINVALID_CNT && res == RHM_IORES_RCINVALID);
return res;
}
@@ -346,19 +357,19 @@
return _wmgr.is_txn_synced(xid);
}
-u_int32_t
-jcntl::get_wr_events()
+int32_t
+jcntl::get_wr_events(timespec* const timeout)
{
stlock t(_wr_mutex);
- if (t.locked())
- return _wmgr.get_events(pmgr::UNUSED);
- return 0;
+ if (!t.locked())
+ return THREAD_BLOCKED;
+ return _wmgr.get_events(pmgr::UNUSED, timeout);
}
-u_int32_t
-jcntl::get_rd_events()
+int32_t
+jcntl::get_rd_events(timespec* const timeout)
{
- return _rmgr.get_events();
+ return _rmgr.get_events(pmgr::AIO_COMPLETE, timeout);
}
void
@@ -429,21 +440,14 @@
_rmgr.invalidate();
}
-#define MAX_AIO_CMPL_SLEEPS 1000000 // tot: ~10 sec
-#define AIO_CMPL_SLEEP_US 10 // 0.01 ms
-
void
jcntl::fhdr_wr_sync(const u_int16_t lid)
{
- long cnt = 0;
fcntl* fcntlp = _lpmgr.get_fcntlp(lid);
- get_wr_events();
while (fcntlp->wr_fhdr_aio_outstanding())
{
- if (++cnt > MAX_AIO_CMPL_SLEEPS)
+ if (get_wr_events(&_aio_cmpl_timeout) == AIO_TIMEOUT)
throw jexception(jerrno::JERR_JCNTL_AIOCMPLWAIT, "jcntl", "fhdr_wr_sync");
- ::usleep(AIO_CMPL_SLEEP_US);
- get_wr_events();
}
}
@@ -496,36 +500,28 @@
void
jcntl::aio_cmpl_wait()
{
- u_int32_t cnt = 0;
while (_wmgr.get_aio_evt_rem())
{
- get_wr_events();
- if (cnt++ > MAX_AIO_CMPL_SLEEPS)
+ if (get_wr_events(&_aio_cmpl_timeout) == AIO_TIMEOUT)
throw jexception(jerrno::JERR_JCNTL_AIOCMPLWAIT, "jcntl", "aio_cmpl_wait");
- ::usleep(AIO_CMPL_SLEEP_US);
}
}
bool
jcntl::handle_aio_wait(const iores res, iores& resout, const data_tok* dtp)
{
- // TODO: factor out the common while loops below into a common fn
- u_int32_t cnt = 0;
resout = res;
if (res == RHM_IORES_PAGE_AIOWAIT)
{
while (_wmgr.curr_pg_blocked())
{
- _wmgr.get_events(pmgr::UNUSED);
- if (cnt++ > MAX_AIO_CMPL_SLEEPS)
+ if (_wmgr.get_events(pmgr::UNUSED, &_aio_cmpl_timeout) == AIO_TIMEOUT)
{
std::ostringstream oss;
- oss << "get_events() returned JERR_JCNTL_AIOCMPLWAIT; wmgr_status: ";
- oss << _wmgr.status_str();
+ oss << "get_events() returned JERR_JCNTL_AIOCMPLWAIT; wmgr_status: " << _wmgr.status_str();
this->log(LOG_CRITICAL, oss.str());
throw jexception(jerrno::JERR_JCNTL_AIOCMPLWAIT, "jcntl", "handle_aio_wait");
}
- ::usleep(AIO_CMPL_SLEEP_US);
}
return true;
}
@@ -533,16 +529,13 @@
{
while (_wmgr.curr_file_blocked())
{
- _wmgr.get_events(pmgr::UNUSED);
- if (cnt++ > MAX_AIO_CMPL_SLEEPS)
+ if (_wmgr.get_events(pmgr::UNUSED, &_aio_cmpl_timeout) == AIO_TIMEOUT)
{
std::ostringstream oss;
- oss << "get_events() returned JERR_JCNTL_AIOCMPLWAIT; wmgr_status: ";
- oss << _wmgr.status_str();
+ oss << "get_events() returned JERR_JCNTL_AIOCMPLWAIT; wmgr_status: " << _wmgr.status_str();
this->log(LOG_CRITICAL, oss.str());
throw jexception(jerrno::JERR_JCNTL_AIOCMPLWAIT, "jcntl", "handle_aio_wait");
}
- ::usleep(AIO_CMPL_SLEEP_US);
}
_wrfc.wr_reset();
resout = RHM_IORES_SUCCESS;
Modified: store/trunk/cpp/lib/jrnl/jcntl.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.hpp 2010-07-26 15:58:14 UTC (rev 4147)
+++ store/trunk/cpp/lib/jrnl/jcntl.hpp 2010-07-26 19:48:20 UTC (rev 4148)
@@ -147,6 +147,11 @@
smutex _wr_mutex; ///< Mutex for journal writes
public:
+ static timespec _aio_cmpl_timeout; ///< Timeout for blocking libaio returns
+ static timespec _final_aio_cmpl_timeout; ///< Timeout for blocking libaio returns when stopping or finalizing
+ static int32_t AIO_TIMEOUT;
+ static int32_t THREAD_BLOCKED;
+
/**
* \brief Journal constructor.
*
@@ -527,7 +532,7 @@
* dequeue() operations, but if these operations cease, then this call needs to be made to
* force the processing of any outstanding AIO operations.
*/
- u_int32_t get_wr_events();
+ int32_t get_wr_events(timespec* const timeout);
/**
* \brief Forces a check for returned AIO read events.
@@ -536,7 +541,7 @@
* operations, but if these operations cease, then this call needs to be made to force the
* processing of any outstanding AIO operations.
*/
- u_int32_t get_rd_events();
+ int32_t get_rd_events(timespec* const timeout);
/**
* \brief Stop the journal from accepting any further requests to read or write data.
@@ -659,6 +664,9 @@
static fcntl* new_fcntl(jcntl* const jcp, const u_int16_t lid, const u_int16_t fid, const rcvdat* const rdp);
protected:
+ static bool _init;
+ static bool init_statics();
+
/**
* \brief Check status of journal before allowing write operations.
*/
Modified: store/trunk/cpp/lib/jrnl/pmgr.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/pmgr.hpp 2010-07-26 15:58:14 UTC (rev 4147)
+++ store/trunk/cpp/lib/jrnl/pmgr.hpp 2010-07-26 19:48:20 UTC (rev 4148)
@@ -123,7 +123,7 @@
pmgr(jcntl* jc, enq_map& emap, txn_map& tmap);
virtual ~pmgr();
- virtual u_int32_t get_events(page_state state) = 0;
+ virtual int32_t get_events(page_state state, timespec* const timeout, bool flush = false) = 0;
inline u_int32_t get_aio_evt_rem() const { return _aio_evt_rem; }
static const char* page_state_str(page_state ps);
inline u_int32_t cache_pgsize_sblks() const { return _cache_pgsize_sblks; }
Modified: store/trunk/cpp/lib/jrnl/rmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rmgr.cpp 2010-07-26 15:58:14 UTC (rev 4147)
+++ store/trunk/cpp/lib/jrnl/rmgr.cpp 2010-07-26 19:48:20 UTC (rev 4148)
@@ -229,18 +229,23 @@
}
}
-u_int32_t
-rmgr::get_events(page_state state)
+int32_t
+rmgr::get_events(page_state state, timespec* const timeout, bool flush)
{
- int ret = 0;
- if ((ret = aio::getevents(_ioctx, 0, _cache_num_pages + _jc->num_jfiles(), _aio_event_arr, 0)) < 0)
+ if (_aio_evt_rem == 0) // no events to get
+ return 0;
+
+ int32_t ret;
+ if ((ret = aio::getevents(_ioctx, flush ? _aio_evt_rem : 1, _aio_evt_rem/*_cache_num_pages + _jc->num_jfiles()*/, _aio_event_arr, timeout)) < 0)
{
- if (ret == -EINTR) // No events
+ if (ret == -EINTR) // Interrupted by signal
return 0;
std::ostringstream oss;
oss << "io_getevents() failed: " << std::strerror(-ret) << " (" << ret << ")";
throw jexception(jerrno::JERR__AIO, oss.str(), "rmgr", "get_events");
}
+ if (ret == 0 && timeout)
+ return _jc->AIO_TIMEOUT;
std::vector<u_int16_t> pil;
pil.reserve(ret);
@@ -315,26 +320,16 @@
_rrfc.set_invalid();
}
-#define MAX_AIO_SLEEPS 1000 // 10 sec
-#define AIO_SLEEP_TIME 10000 // 10 ms
void
-rmgr::init_validation()
+rmgr::flush(timespec* timeout)
{
// Wait for any outstanding AIO read operations to complete before synchronizing
- int aio_sleep_cnt = 0;
while (_aio_evt_rem)
{
- get_events();
- if (_aio_evt_rem)
+ if (get_events(AIO_COMPLETE, timeout) == _jc->AIO_TIMEOUT) // timed out, nothing returned
{
- if (++aio_sleep_cnt <= MAX_AIO_SLEEPS)
- {
- get_events();
- usleep(AIO_SLEEP_TIME);
- }
- else
- throw jexception(jerrno::JERR__TIMEOUT,
- "Invalidate timed out waiting for outstanding read aio to return", "rmgr", "invalidate");
+ throw jexception(jerrno::JERR__TIMEOUT,
+ "Timed out waiting for outstanding read aio to return", "rmgr", "init_validation");
}
}
@@ -346,11 +341,24 @@
_pg_offset_dblks = 0;
}
+bool
+rmgr::wait_for_validity(timespec* timeout, const bool throw_on_timeout)
+{
+ bool timed_out = false;
+ while (!_rrfc.is_valid() && !timed_out)
+ {
+ timed_out = get_events(AIO_COMPLETE, timeout) == _jc->AIO_TIMEOUT;
+ if (timed_out && throw_on_timeout)
+ throw jexception(jerrno::JERR__TIMEOUT, "Timed out waiting for read validity", "rmgr", "wait_for_validity");
+ }
+ return _rrfc.is_valid();
+}
+
iores
rmgr::pre_read_check(data_tok* dtokp)
{
if (_aio_evt_rem)
- get_events();
+ get_events(AIO_COMPLETE, 0);
if (!_rrfc.is_valid())
return RHM_IORES_RCINVALID;
@@ -524,11 +532,13 @@
rmgr::aio_cycle()
{
// Perform validity checks
- if (_fhdr_rd_outstanding)
+ if (_fhdr_rd_outstanding) // read of file header still outstanding in aio
return RHM_IORES_SUCCESS;
if (!_rrfc.is_valid())
{
- init_validation(); // flush outstanding read aio ops (if any), set all pages to UNUSED state, reset counters.
+ // Flush and reset all read states and pointers
+ flush(&jcntl::_aio_cmpl_timeout);
+
_jc->get_earliest_fid(); // determine initial file to read; calls _rrfc.set_findex() to set value
// If this file has not yet been written to, return RHM_IORES_EMPTY
if (_rrfc.is_void() && !_rrfc.is_wr_aio_outstanding())
@@ -570,7 +580,7 @@
else if (num_compl == _cache_num_pages) // This condition exists after invalidation
res = init_aio_reads(0, _cache_num_pages);
if (outstanding)
- get_events();
+ get_events(AIO_COMPLETE, 0);
return res;
}
Modified: store/trunk/cpp/lib/jrnl/rmgr.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rmgr.hpp 2010-07-26 15:58:14 UTC (rev 4147)
+++ store/trunk/cpp/lib/jrnl/rmgr.hpp 2010-07-26 19:48:20 UTC (rev 4148)
@@ -78,10 +78,11 @@
iores read(void** const datapp, std::size_t& dsize, void** const xidpp,
std::size_t& xidsize, bool& transient, bool& external, data_tok* dtokp,
bool ignore_pending_txns);
- u_int32_t get_events(page_state state = AIO_COMPLETE);
+ int32_t get_events(page_state state, timespec* const timeout, bool flush = false);
void recover_complete();
inline iores synchronize() { if (_rrfc.is_valid()) return RHM_IORES_SUCCESS; return aio_cycle(); }
void invalidate();
+ bool wait_for_validity(timespec* const timeout, const bool throw_on_timeout = false);
/* TODO (if required)
const iores get(const u_int64_t& rid, const std::size_t& dsize, const std::size_t& dsize_avail,
@@ -91,7 +92,7 @@
private:
void clean();
- void init_validation();
+ void flush(timespec* timeout);
iores pre_read_check(data_tok* dtokp);
iores read_enq(rec_hdr& h, void* rptr, data_tok* dtokp);
void consume_xid_rec(rec_hdr& h, void* rptr, data_tok* dtokp);
Modified: store/trunk/cpp/lib/jrnl/wmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wmgr.cpp 2010-07-26 15:58:14 UTC (rev 4147)
+++ store/trunk/cpp/lib/jrnl/wmgr.cpp 2010-07-26 19:48:20 UTC (rev 4148)
@@ -647,7 +647,7 @@
_page_cb_arr[_pg_index]._state = IN_USE;
}
}
- get_events(UNUSED);
+ get_events(UNUSED, 0);
if (_page_cb_arr[_pg_index]._state == UNUSED)
_page_cb_arr[_pg_index]._state = IN_USE;
return res;
@@ -662,18 +662,24 @@
return res;
}
-u_int32_t
-wmgr::get_events(page_state state)
+int32_t
+wmgr::get_events(page_state state, timespec* const timeout, bool flush)
{
+ if (_aio_evt_rem == 0) // no events to get
+ return 0;
+
int ret = 0;
- if ((ret = aio::getevents(_ioctx, 0, _cache_num_pages + _jc->num_jfiles(), _aio_event_arr, 0)) < 0)
+ if ((ret = aio::getevents(_ioctx, flush ? _aio_evt_rem : 1, _aio_evt_rem/*_cache_num_pages + _jc->num_jfiles()*/, _aio_event_arr, timeout)) < 0)
{
std::ostringstream oss;
oss << "io_getevents() failed: " << std::strerror(-ret) << " (" << ret << ")";
throw jexception(jerrno::JERR__AIO, oss.str(), "wmgr", "get_events");
}
- u_int32_t tot_data_toks = 0;
+ if (ret == 0 && timeout)
+ return _jc->AIO_TIMEOUT;
+
+ int32_t tot_data_toks = 0;
for (int i=0; i<ret; i++) // Index of returned AIOs
{
if (_aio_evt_rem == 0)
Modified: store/trunk/cpp/lib/jrnl/wmgr.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wmgr.hpp 2010-07-26 15:58:14 UTC (rev 4147)
+++ store/trunk/cpp/lib/jrnl/wmgr.hpp 2010-07-26 19:48:20 UTC (rev 4148)
@@ -115,7 +115,7 @@
iores abort(data_tok* dtokp, const void* const xid_ptr, const std::size_t xid_len);
iores commit(data_tok* dtokp, const void* const xid_ptr, const std::size_t xid_len);
iores flush();
- u_int32_t get_events(page_state state);
+ int32_t get_events(page_state state, timespec* const timeout, bool flush = false);
bool is_txn_synced(const std::string& xid);
inline bool curr_pg_blocked() const { return _page_cb_arr[_pg_index]._state != UNUSED; }
inline bool curr_file_blocked() const { return _wrfc.aio_cnt() > 0; }
Modified: store/trunk/cpp/tests/Makefile.am
===================================================================
--- store/trunk/cpp/tests/Makefile.am 2010-07-26 15:58:14 UTC (rev 4147)
+++ store/trunk/cpp/tests/Makefile.am 2010-07-26 19:48:20 UTC (rev 4148)
@@ -29,11 +29,7 @@
TMP_DATA_DIR=$(abs_srcdir)/tmp_data_dir
TMP_PYTHON_TEST_DIR=$(abs_srcdir)/python_tests.tmp
-if DO_CLUSTER_TESTS
-SUBDIRS = jrnl . cluster
-else
SUBDIRS = jrnl .
-endif
TESTS = \
SimpleTest \
Modified: store/trunk/cpp/tests/jrnl/_st_helper_fns.hpp
===================================================================
--- store/trunk/cpp/tests/jrnl/_st_helper_fns.hpp 2010-07-26 15:58:14 UTC (rev 4147)
+++ store/trunk/cpp/tests/jrnl/_st_helper_fns.hpp 2010-07-26 19:48:20 UTC (rev 4148)
@@ -547,7 +547,7 @@
{
if (++aio_sleep_cnt <= MAX_AIO_SLEEPS)
{
- jc.get_wr_events();
+ jc.get_wr_events(0); // *** GEV2
usleep(AIO_SLEEP_TIME);
}
else
Modified: store/trunk/cpp/tests/jrnl/jtt/jrnl_instance.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/jtt/jrnl_instance.cpp 2010-07-26 15:58:14 UTC (rev 4147)
+++ store/trunk/cpp/tests/jrnl/jtt/jrnl_instance.cpp 2010-07-26 19:48:20 UTC (rev 4148)
@@ -191,7 +191,7 @@
_tcrp->add_exception("Timeout waiting for RHM_IORES_ENQCAPTHRESH to clear.");
panic();
}
- else if (get_wr_events() == 0)
+ else if (get_wr_events(0) == 0) // *** GEV2
{
mrg::journal::slock sl(_wr_full_mutex);
_wr_full_cv.waitintvl(MAX_WR_WAIT * 1000000); // MAX_WR_WAIT in ms
@@ -273,7 +273,7 @@
xptr = 0;
break;
case mrg::journal::RHM_IORES_PAGE_AIOWAIT:
- if (get_rd_events() == 0)
+ if (get_rd_events(0) == 0)
{
mrg::journal::slock sl(_rd_aio_mutex);
_rd_aio_cv.waitintvl(MAX_RD_WAIT * 1000000); // MAX_RD_WAIT in ms
rhmessaging commits: r4147 - in store/trunk/java/bdbstore: src/test/java/org/apache/qpid/server/store/berkeleydb and 1 other directory.
by rhmessaging-commits@lists.jboss.org
Author: rgemmell
Date: 2010-07-26 11:58:14 -0400 (Mon, 26 Jul 2010)
New Revision: 4147
Modified:
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBBackup.java
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgrade.java
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/DatabaseVisitor.java
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/FieldTableEncoding.java
store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java
store/trunk/java/bdbstore/test-profiles/java-bdb.0.10.testprofile
Log:
Update the BDB store to throw AMQStoreException instead of AMQException, and fix compilation failures caused by changes to the main codebase. Applied patch from Andrew Kennedy.
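The recurring change in the patch below replaces the generic AMQException with the store-specific AMQStoreException, usually by catching the BDB DatabaseException and rethrowing it with its message and cause preserved. As a rough illustration of that pattern only (the class, field, and method names here are invented for the sketch and are not part of the store), a minimal Java example:

import org.apache.qpid.AMQStoreException;

import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseEntry;
import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.OperationStatus;

/** Illustrative sketch of the exception-translation pattern; not part of the BDB store. */
public class ExampleStoreOperation
{
    private final Database _exampleDb; // hypothetical database handle, for illustration only

    public ExampleStoreOperation(Database exampleDb)
    {
        _exampleDb = exampleDb;
    }

    /** Deletes a record, translating BDB failures into store-level exceptions. */
    public void deleteRecord(DatabaseEntry key, String name) throws AMQStoreException
    {
        try
        {
            OperationStatus status = _exampleDb.delete(null, key);
            if (status == OperationStatus.NOTFOUND)
            {
                // A missing record is reported as a store error rather than a BDB failure
                throw new AMQStoreException("Record " + name + " not found");
            }
        }
        catch (DatabaseException e)
        {
            // Wrap the underlying BDB exception, keeping its message and cause
            throw new AMQStoreException("Error deleting record " + name + ": " + e.getMessage(), e);
        }
    }
}

The catch blocks changed throughout BDBMessageStore.java below follow the same shape, propagating e.getMessage() and the original exception as the cause.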
Modified: store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBBackup.java
===================================================================
--- store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBBackup.java 2010-07-26 13:39:22 UTC (rev 4146)
+++ store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBBackup.java 2010-07-26 15:58:14 UTC (rev 4147)
@@ -29,7 +29,6 @@
import org.apache.qpid.util.CommandLineParser;
import org.apache.qpid.util.FileUtils;
-import org.apache.qpid.util.PrettyPrintingUtils;
import java.io.*;
import java.util.LinkedList;
@@ -127,8 +126,7 @@
if (log.isInfoEnabled())
{
- log.info("BDBBackup Utility: Hot Backup Completed. Files backed up: "
- + PrettyPrintingUtils.printArray(backedUpFiles));
+ log.info("BDBBackup Utility: Hot Backup Completed. Files backed up: " + backedUpFiles);
}
}
catch (Exception e)
Modified: store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
===================================================================
--- store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java 2010-07-26 13:39:22 UTC (rev 4146)
+++ store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java 2010-07-26 15:58:14 UTC (rev 4147)
@@ -34,7 +34,7 @@
import org.apache.commons.configuration.Configuration;
import org.apache.log4j.Logger;
-import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQStoreException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.exchange.Exchange;
@@ -45,6 +45,7 @@
import org.apache.qpid.server.logging.messages.TransactionLogMessages;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.store.ConfigurationRecoveryHandler;
+import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.MessageStoreRecoveryHandler;
import org.apache.qpid.server.store.StorableMessageMetaData;
@@ -90,7 +91,7 @@
* exchanges. <tr><td> Store and remove messages. <tr><td> Bind and unbind queues to exchanges. <tr><td> Enqueue and
* dequeue messages to queues. <tr><td> Generate message identifiers. </table>
*/
-@SuppressWarnings({"unchecked","deprecation"})
+@SuppressWarnings({"unchecked"})
public class BDBMessageStore implements MessageStore
{
private static final Logger _log = Logger.getLogger(BDBMessageStore.class);
@@ -284,16 +285,14 @@
return configure(environmentPath, false);
}
-
/**
- *
* @param environmentPath location for the store to be created in/recovered from
* @param readonly if true then don't allow modifications to an existing store, and don't create a new store if none exists
* @return whether or not a new store environment was created
- * @throws AMQException
+ * @throws AMQStoreException
* @throws DatabaseException
*/
- protected boolean configure(File environmentPath, boolean readonly) throws AMQException, DatabaseException
+ protected boolean configure(File environmentPath, boolean readonly) throws AMQStoreException, DatabaseException
{
stateTransition(State.INITIAL, State.CONFIGURING);
@@ -318,14 +317,14 @@
*
* This is required if you do not want to perform recovery of the store data
*
- * @throws AMQException if the store is not in the correct state
+ * @throws AMQStoreException if the store is not in the correct state
*/
- public void start() throws AMQException
+ public void start() throws AMQStoreException
{
stateTransition(State.CONFIGURING, State.STARTED);
}
- private boolean setupStore(File storePath, boolean readonly) throws DatabaseException, AMQException
+ private boolean setupStore(File storePath, boolean readonly) throws DatabaseException, AMQStoreException
{
checkState(State.CONFIGURING);
@@ -364,8 +363,8 @@
continue;
}
}
- // Otherwise Check Versions
+ // Otherwise Check Versions
int version = Integer.parseInt(s.substring(versionIndex + 2));
if (version != _version)
@@ -377,22 +376,22 @@
}
}
- private synchronized void stateTransition(State requiredState, State newState) throws AMQException
+ private synchronized void stateTransition(State requiredState, State newState) throws AMQStoreException
{
if (_state != requiredState)
{
- throw new AMQException("Cannot transition to the state: " + newState + "; need to be in state: " + requiredState
+ throw new AMQStoreException("Cannot transition to the state: " + newState + "; need to be in state: " + requiredState
+ "; currently in state: " + _state);
}
_state = newState;
}
- private void checkState(State requiredState) throws AMQException
+ private void checkState(State requiredState) throws AMQStoreException
{
if (_state != requiredState)
{
- throw new AMQException("Unexpected state: " + _state + "; required state: " + requiredState);
+ throw new AMQStoreException("Unexpected state: " + _state + "; required state: " + requiredState);
}
}
@@ -539,7 +538,7 @@
}
- public void recover(ConfigurationRecoveryHandler recoveryHandler) throws AMQException
+ public void recover(ConfigurationRecoveryHandler recoveryHandler) throws AMQStoreException
{
stateTransition(State.CONFIGURED, State.RECOVERING);
@@ -560,7 +559,7 @@
}
catch (DatabaseException e)
{
- throw new AMQException("Error recovering persistent state: " + e, e);
+ throw new AMQStoreException("Error recovering persistent state: " + e.getMessage(), e);
}
}
@@ -601,7 +600,7 @@
}
- private void loadExchanges(ExchangeRecoveryHandler erh) throws AMQException, DatabaseException
+ private void loadExchanges(ExchangeRecoveryHandler erh) throws DatabaseException
{
Cursor cursor = null;
@@ -704,7 +703,7 @@
}
catch (DatabaseException e)
{
- _log.error("Database Error: " + e, e);
+ _log.error("Database Error: " + e.getMessage(), e);
throw e;
}
finally
@@ -717,7 +716,7 @@
}
private void recoverQueueEntries(TransactionLogRecoveryHandler recoveryHandler)
- throws DatabaseException, AMQException
+ throws DatabaseException
{
QueueEntryRecoveryHandler qerh = recoveryHandler.begin(this);
@@ -758,7 +757,7 @@
}
catch (DatabaseException e)
{
- _log.error("Database Error: " + e, e);
+ _log.error("Database Error: " + e.getMessage(), e);
throw e;
}
finally
@@ -777,10 +776,9 @@
*
* @param messageId Identifies the message to remove.
*
- * @throws AMQException If the operation fails for any reason.
- * @throws DatabaseException
+ * @throws AMQInternalException If the operation fails for any reason.
*/
- public void removeMessage(Long messageId) throws AMQException
+ public void removeMessage(Long messageId) throws AMQStoreException
{
// _log.debug("public void removeMessage(StoreContext context = " + context + ", Long messageId = " + messageId
// + "): called");
@@ -808,7 +806,7 @@
{
tx.abort();
- throw new AMQException("Message metadata not found for message id " + messageId);
+ throw new AMQStoreException("Message metadata not found for message id " + messageId);
}
if (_log.isDebugEnabled())
@@ -851,7 +849,7 @@
cursor = null;
tx.abort();
- throw new AMQException("Content chunk offset" + mck.getOffset() + " not found for message " + messageId);
+ throw new AMQStoreException("Content chunk offset" + mck.getOffset() + " not found for message " + messageId);
}
if (_log.isDebugEnabled())
@@ -886,11 +884,11 @@
}
catch (DatabaseException e1)
{
- throw new AMQException("Error aborting transaction " + e1, e1);
+ throw new AMQStoreException("Error aborting transaction " + e1, e1);
}
}
- throw new AMQException("Error removing message with id " + messageId + " from database: " + e, e);
+ throw new AMQStoreException("Error removing message with id " + messageId + " from database: " + e.getMessage(), e);
}
finally
{
@@ -902,21 +900,16 @@
}
catch (DatabaseException e)
{
- //TODO
- throw new RuntimeException(e);
+ throw new AMQStoreException("Error closing database connection: " + e.getMessage(), e);
}
}
}
}
/**
- * Makes the specified exchange persistent.
- *
- * @param exchange The exchange to persist.
- *
- * @throws AMQException If the operation fails for any reason.
+ * @see DurableConfigurationStore#createExchange(Exchange)
*/
- public void createExchange(Exchange exchange) throws AMQException
+ public void createExchange(Exchange exchange) throws AMQStoreException
{
if (_state != State.RECOVERING)
{
@@ -937,20 +930,15 @@
}
catch (DatabaseException e)
{
- throw new AMQException("Error writing Exchange with name " + exchange.getName() + " to database: " + e, e);
+ throw new AMQStoreException("Error writing Exchange with name " + exchange.getName() + " to database: " + e.getMessage(), e);
}
}
}
/**
- * Removes the specified persistent exchange.
- * Internal method that is package scoped to allow testing.
- *
- * @param exchange The exchange to remove.
- *
- * @throws org.apache.qpid.AMQException If the operation fails for any reason.
+ * @see DurableConfigurationStore#removeExchange(Exchange)
*/
- public void removeExchange(Exchange exchange) throws AMQException
+ public void removeExchange(Exchange exchange) throws AMQStoreException
{
DatabaseEntry key = new DatabaseEntry();
EntryBinding keyBinding = new AMQShortStringTB();
@@ -960,12 +948,12 @@
OperationStatus status = _exchangeDb.delete(null, key);
if (status == OperationStatus.NOTFOUND)
{
- throw new AMQException("Exchange " + exchange.getName() + " not found");
+ throw new AMQStoreException("Exchange " + exchange.getName() + " not found");
}
}
catch (DatabaseException e)
{
- throw new AMQException("Error writing deleting with name " + exchange.getName() + " from database: " + e, e);
+ throw new AMQStoreException("Error writing deleting with name " + exchange.getName() + " from database: " + e.getMessage(), e);
}
}
@@ -973,16 +961,9 @@
/**
- * Binds the specified queue to an exchange with a routing key.
- *
- * @param exchange The exchange to bind to.
- * @param routingKey The routing key to bind by.
- * @param queue The queue to bind.
- * @param args Additional parameters.
- *
- * @throws AMQException If the operation fails for any reason.
+ * @see DurableConfigurationStore#bindQueue(Exchange, AMQShortString, AMQQueue, FieldTable)
*/
- public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException
+ public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQStoreException
{
// _log.debug("public void bindQueue(Exchange exchange = " + exchange + ", AMQShortString routingKey = " + routingKey
// + ", AMQQueue queue = " + queue + ", FieldTable args = " + args + "): called");
@@ -1009,24 +990,17 @@
}
catch (DatabaseException e)
{
- throw new AMQException("Error writing binding for AMQQueue with name " + queue.getName() + " to exchange "
- + exchange.getName() + " to database: " + e, e);
+ throw new AMQStoreException("Error writing binding for AMQQueue with name " + queue.getName() + " to exchange "
+ + exchange.getName() + " to database: " + e.getMessage(), e);
}
}
}
/**
- * Unbinds the specified from an exchange under a particular routing key.
- *
- * @param exchange The exchange to unbind from.
- * @param routingKey The routing key to unbind.
- * @param queue The queue to unbind.
- * @param args Additonal parameters.
- *
- * @throws AMQException If the operation fails for any reason.
+ * @see DurableConfigurationStore#unbindQueue(Exchange, AMQShortString, AMQQueue, FieldTable)
*/
public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args)
- throws AMQException
+ throws AMQStoreException
{
DatabaseEntry key = new DatabaseEntry();
EntryBinding keyBinding = _bindingTupleBindingFactory.getInstance();
@@ -1037,31 +1011,29 @@
OperationStatus status = _queueBindingsDb.delete(null, key);
if (status == OperationStatus.NOTFOUND)
{
- throw new AMQException("Queue binding for queue with name " + queue.getName() + " to exchange "
+ throw new AMQStoreException("Queue binding for queue with name " + queue.getName() + " to exchange "
+ exchange.getName() + " not found");
}
}
catch (DatabaseException e)
{
- throw new AMQException("Error deleting queue binding for queue with name " + queue.getName() + " to exchange "
- + exchange.getName() + " from database: " + e, e);
+ throw new AMQStoreException("Error deleting queue binding for queue with name " + queue.getName() + " to exchange "
+ + exchange.getName() + " from database: " + e.getMessage(), e);
}
}
- public void createQueue(AMQQueue queue) throws AMQException
+ /**
+ * @see DurableConfigurationStore#createQueue(AMQQueue)
+ */
+ public void createQueue(AMQQueue queue) throws AMQStoreException
{
createQueue(queue, null);
}
/**
- * Makes the specified queue persistent.
- *
- * @param queue The queue to store.
- * @param arguments
- *
- * @throws AMQException If the operation fails for any reason.
+ * @see DurableConfigurationStore#createQueue(AMQQueue, FieldTable)
*/
- public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQException
+ public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQStoreException
{
if (_log.isDebugEnabled())
{
@@ -1081,9 +1053,9 @@
*
* @param queueRecord Details of the queue to store.
*
- * @throws AMQException If the operation fails for any reason.
+ * @throws AMQStoreException If the operation fails for any reason.
*/
- protected void createQueue(QueueRecord queueRecord) throws AMQException
+ protected void createQueue(QueueRecord queueRecord) throws AMQStoreException
{
if (_state != State.RECOVERING)
{
@@ -1101,8 +1073,8 @@
}
catch (DatabaseException e)
{
- throw new AMQException("Error writing AMQQueue with name " +
- queueRecord.getNameShortString().toString() + " to database: " + e, e);
+ throw new AMQStoreException("Error writing AMQQueue with name " + queueRecord.getNameShortString().asString()
+ + " to database: " + e.getMessage(), e);
}
}
}
@@ -1114,9 +1086,9 @@
* NOTE: Currently only updates the exclusivity.
*
* @param queue The queue to update the entry for.
- * @throws org.apache.qpid.AMQException If the operation fails for any reason.
+ * @throws AMQStoreException If the operation fails for any reason.
*/
- public void updateQueue(final AMQQueue queue) throws AMQException
+ public void updateQueue(final AMQQueue queue) throws AMQStoreException
{
if (_log.isDebugEnabled())
{
@@ -1147,12 +1119,12 @@
}
else if(status != OperationStatus.NOTFOUND)
{
- throw new AMQException("Error updating queue details within the store: " + status);
+ throw new AMQStoreException("Error updating queue details within the store: " + status);
}
}
catch (DatabaseException e)
{
- throw new AMQException("Error updating queue details within the store: " + e,e);
+ throw new AMQStoreException("Error updating queue details within the store: " + e,e);
}
}
@@ -1161,9 +1133,9 @@
*
* @param queue The queue to remove.
*
- * @throws AMQException If the operation fails for any reason.
+ * @throws AMQStoreException If the operation fails for any reason.
*/
- public void removeQueue(final AMQQueue queue) throws AMQException
+ public void removeQueue(final AMQQueue queue) throws AMQStoreException
{
AMQShortString name = queue.getNameShortString();
@@ -1180,12 +1152,12 @@
OperationStatus status = _queueDb.delete(null, key);
if (status == OperationStatus.NOTFOUND)
{
- throw new AMQException("Queue " + name + " not found");
+ throw new AMQStoreException("Queue " + name + " not found");
}
}
catch (DatabaseException e)
{
- throw new AMQException("Error writing deleting with name " + name + " from database: " + e, e);
+ throw new AMQStoreException("Error writing deleting with name " + name + " from database: " + e.getMessage(), e);
}
}
@@ -1196,9 +1168,9 @@
* @param queue The the queue to place the message on.
* @param messageId The message to enqueue.
*
- * @throws AMQException If the operation fails for any reason.
+ * @throws AMQStoreException If the operation fails for any reason.
*/
- public void enqueueMessage(StoreContext context, final TransactionLogResource queue, Long messageId) throws AMQException
+ public void enqueueMessage(StoreContext context, final TransactionLogResource queue, Long messageId) throws AMQStoreException
{
// _log.debug("public void enqueueMessage(StoreContext context = " + context + ", AMQShortString name = " + name
// + ", Long messageId): called");
@@ -1223,8 +1195,8 @@
}
catch (DatabaseException e)
{
- _log.error("Failed to enqueue: " + e, e);
- throw new AMQException("Error writing enqueued message with id " + messageId + " for queue " + name
+ _log.error("Failed to enqueue: " + e.getMessage(), e);
+ throw new AMQStoreException("Error writing enqueued message with id " + messageId + " for queue " + name
+ " to database", e);
}
}
@@ -1236,9 +1208,9 @@
* @param queue The name queue to take the message from.
* @param messageId The message to dequeue.
*
- * @throws AMQException If the operation fails for any reason, or if the specified message does not exist.
+ * @throws AMQStoreException If the operation fails for any reason, or if the specified message does not exist.
*/
- public void dequeueMessage(StoreContext context, final TransactionLogResource queue, Long messageId) throws AMQException
+ public void dequeueMessage(StoreContext context, final TransactionLogResource queue, Long messageId) throws AMQStoreException
{
AMQShortString name = new AMQShortString(queue.getResourceName());
@@ -1261,11 +1233,11 @@
OperationStatus status = _deliveryDb.delete(tx, key);
if (status == OperationStatus.NOTFOUND)
{
- throw new AMQException("Unable to find message with id " + messageId + " on queue " + name);
+ throw new AMQStoreException("Unable to find message with id " + messageId + " on queue " + name);
}
else if (status != OperationStatus.SUCCESS)
{
- throw new AMQException("Unable to remove message with id " + messageId + " on queue " + name);
+ throw new AMQStoreException("Unable to remove message with id " + messageId + " on queue " + name);
}
if (_log.isDebugEnabled())
@@ -1277,10 +1249,10 @@
catch (DatabaseException e)
{
- _log.error("Failed to dequeue message " + messageId + ": " + e, e);
+ _log.error("Failed to dequeue message " + messageId + ": " + e.getMessage(), e);
_log.error(tx);
- throw new AMQException("Error accessing database while dequeuing message: " + e, e);
+ throw new AMQStoreException("Error accessing database while dequeuing message: " + e.getMessage(), e);
}
}
@@ -1289,9 +1261,9 @@
*
* @param context The transactional context to commit all operations for.
*
- * @throws AMQException If the operation fails for any reason.
+ * @throws AMQStoreException If the operation fails for any reason.
*/
- private StoreFuture commitTranImpl(StoreContext context, boolean syncCommit) throws AMQException
+ private StoreFuture commitTranImpl(StoreContext context, boolean syncCommit) throws AMQStoreException
{
com.sleepycat.je.Transaction tx = (com.sleepycat.je.Transaction) context.getPayload();
@@ -1302,7 +1274,7 @@
if (tx == null)
{
- throw new AMQException("Fatal internal error: transactional context is empty at commitTran");
+ throw new AMQStoreException("Fatal internal error: transactional context is empty at commitTran");
}
StoreFuture result;
@@ -1317,7 +1289,7 @@
}
catch (DatabaseException e)
{
- throw new AMQException("Error commit tx: " + e, e);
+ throw new AMQStoreException("Error commit tx: " + e.getMessage(), e);
}
finally
{
@@ -1332,9 +1304,9 @@
*
* @param context The transactional context to abandon.
*
- * @throws AMQException If the operation fails for any reason.
+ * @throws AMQStoreException If the operation fails for any reason.
*/
- public void abortTran(StoreContext context) throws AMQException
+ public void abortTran(StoreContext context) throws AMQStoreException
{
com.sleepycat.je.Transaction tx = (com.sleepycat.je.Transaction) context.getPayload();
@@ -1349,7 +1321,7 @@
}
catch (DatabaseException e)
{
- throw new AMQException("Error aborting transaction: " + e, e);
+ throw new AMQStoreException("Error aborting transaction: " + e.getMessage(), e);
}
finally
{
@@ -1364,7 +1336,7 @@
*
* @return a list of message ids for messages enqueued for a particular queue
*/
- List<Long> getEnqueuedMessages(AMQShortString queueName) throws AMQException
+ List<Long> getEnqueuedMessages(AMQShortString queueName) throws AMQStoreException
{
Cursor cursor = null;
try
@@ -1400,7 +1372,7 @@
}
catch (DatabaseException e)
{
- throw new AMQException("Database error: " + e, e);
+ throw new AMQStoreException("Database error: " + e.getMessage(), e);
}
finally
{
@@ -1412,7 +1384,7 @@
}
catch (DatabaseException e)
{
- throw new AMQException("Error closing cursor: " + e, e);
+ throw new AMQStoreException("Error closing cursor: " + e.getMessage(), e);
}
}
}
@@ -1436,10 +1408,10 @@
* @param offset The offset of the data chunk in the message.
* @param contentBody The content of the data chunk.
*
- * @throws AMQException If the operation fails for any reason, or if the specified message does not exist.
+ * @throws AMQStoreException If the operation fails for any reason, or if the specified message does not exist.
*/
protected void addContent(StoreContext context, Long messageId, int offset,
- ByteBuffer contentBody) throws AMQException
+ ByteBuffer contentBody) throws AMQStoreException
{
com.sleepycat.je.Transaction tx = (com.sleepycat.je.Transaction) context.getPayload();
@@ -1455,7 +1427,7 @@
OperationStatus status = _messageContentDb.put(tx, key, value);
if (status != OperationStatus.SUCCESS)
{
- throw new AMQException("Error adding content chunk offset" + offset + " for message id " + messageId + ": "
+ throw new AMQStoreException("Error adding content chunk offset" + offset + " for message id " + messageId + ": "
+ status);
}
@@ -1466,7 +1438,7 @@
}
catch (DatabaseException e)
{
- throw new AMQException("Error writing AMQMessage with id " + messageId + " to database: " + e, e);
+ throw new AMQStoreException("Error writing AMQMessage with id " + messageId + " to database: " + e.getMessage(), e);
}
}
@@ -1477,10 +1449,10 @@
* @param messageId The message to store the data for.
* @param messageMetaData The message meta data to store.
*
- * @throws AMQException If the operation fails for any reason, or if the specified message does not exist.
+ * @throws AMQStoreException If the operation fails for any reason, or if the specified message does not exist.
*/
private void storeMetaData(StoreContext context, Long messageId, StorableMessageMetaData messageMetaData)
- throws AMQException
+ throws AMQStoreException
{
if (_log.isDebugEnabled())
{
@@ -1507,7 +1479,7 @@
}
catch (DatabaseException e)
{
- throw new AMQException("Error writing message metadata with id " + messageId + " to database: " + e, e);
+ throw new AMQStoreException("Error writing message metadata with id " + messageId + " to database: " + e.getMessage(), e);
}
}
@@ -1518,9 +1490,9 @@
*
* @return The message meta data.
*
- * @throws AMQException If the operation fails for any reason, or if the specified message does not exist.
+ * @throws AMQStoreException If the operation fails for any reason, or if the specified message does not exist.
*/
- public StorableMessageMetaData getMessageMetaData(Long messageId) throws AMQException
+ public StorableMessageMetaData getMessageMetaData(Long messageId) throws AMQStoreException
{
if (_log.isDebugEnabled())
{
@@ -1539,7 +1511,7 @@
OperationStatus status = _messageMetaDataDb.get(null, key, value, LockMode.READ_UNCOMMITTED);
if (status != OperationStatus.SUCCESS)
{
- throw new AMQException("Metadata not found for message with id " + messageId);
+ throw new AMQStoreException("Metadata not found for message with id " + messageId);
}
StorableMessageMetaData mdd = (StorableMessageMetaData) messageBinding.entryToObject(value);
@@ -1548,8 +1520,7 @@
}
catch (DatabaseException e)
{
-
- throw new AMQException("Error reading message metadata for message with id " + messageId + ": " + e, e);
+ throw new AMQStoreException("Error reading message metadata for message with id " + messageId + ": " + e.getMessage(), e);
}
}
@@ -1563,9 +1534,9 @@
*
* @return The number of bytes inserted into the destination
*
- * @throws AMQException If the operation fails for any reason, or if the specified message does not exist.
+ * @throws AMQStoreException If the operation fails for any reason, or if the specified message does not exist.
*/
- public int getContent(Long messageId, int offset, ByteBuffer dst) throws AMQException
+ public int getContent(Long messageId, int offset, ByteBuffer dst) throws AMQStoreException
{
DatabaseEntry contentKeyEntry = new DatabaseEntry();
@@ -1634,8 +1605,7 @@
}
catch (DatabaseException e)
{
-
- throw new AMQException("Error writing AMQMessage with id " + messageId + " to database: " + e, e);
+ throw new AMQStoreException("Error writing AMQMessage with id " + messageId + " to database: " + e.getMessage(), e);
}
finally
{
@@ -1647,8 +1617,7 @@
}
catch (DatabaseException e)
{
- // TODO
- throw new RuntimeException(e);
+ throw new AMQStoreException("Error writing AMQMessage with id " + messageId + " to database: " + e.getMessage(), e);
}
}
}
@@ -1721,32 +1690,32 @@
return _queueBindingsDb;
}
- void visitMetaDataDb(DatabaseVisitor visitor) throws DatabaseException, AMQException
+ void visitMetaDataDb(DatabaseVisitor visitor) throws DatabaseException, AMQStoreException
{
visitDatabase(_messageMetaDataDb, visitor);
}
- void visitContentDb(DatabaseVisitor visitor) throws DatabaseException, AMQException
+ void visitContentDb(DatabaseVisitor visitor) throws DatabaseException, AMQStoreException
{
visitDatabase(_messageContentDb, visitor);
}
- void visitQueues(DatabaseVisitor visitor) throws DatabaseException, AMQException
+ void visitQueues(DatabaseVisitor visitor) throws DatabaseException, AMQStoreException
{
visitDatabase(_queueDb, visitor);
}
- void visitDelivery(DatabaseVisitor visitor) throws DatabaseException, AMQException
+ void visitDelivery(DatabaseVisitor visitor) throws DatabaseException, AMQStoreException
{
visitDatabase(_deliveryDb, visitor);
}
- void visitExchanges(DatabaseVisitor visitor) throws DatabaseException, AMQException
+ void visitExchanges(DatabaseVisitor visitor) throws DatabaseException, AMQStoreException
{
visitDatabase(_exchangeDb, visitor);
}
- void visitBindings(DatabaseVisitor visitor) throws DatabaseException, AMQException
+ void visitBindings(DatabaseVisitor visitor) throws DatabaseException, AMQStoreException
{
visitDatabase(_queueBindingsDb, visitor);
}
@@ -1758,9 +1727,9 @@
* @param visitor The visitor to give each entry to.
*
* @throws DatabaseException If there is a problem with the Database structure
- * @throws AMQException If there is a programming error
+ * @throws AMQStoreException If there is a problem with the Database contents
*/
- void visitDatabase(Database database, DatabaseVisitor visitor) throws DatabaseException, AMQException
+ void visitDatabase(Database database, DatabaseVisitor visitor) throws DatabaseException, AMQStoreException
{
Cursor cursor = database.openCursor(null, null);
@@ -2036,7 +2005,7 @@
//TODO
throw new RuntimeException(e);
}
- catch (AMQException e)
+ catch (AMQStoreException e)
{
//TODO
throw new RuntimeException(e);
@@ -2053,7 +2022,7 @@
{
metaData = BDBMessageStore.this.getMessageMetaData(_messageId);
}
- catch (AMQException e)
+ catch (AMQStoreException e)
{
//TODO
throw new RuntimeException(e);
@@ -2075,7 +2044,7 @@
{
BDBMessageStore.this.addContent(_ctx, _messageId, offsetInMessage, src);
}
- catch (AMQException e)
+ catch (AMQStoreException e)
{
//TODO
throw new RuntimeException(e);
@@ -2088,7 +2057,7 @@
{
return BDBMessageStore.this.getContent(_messageId, offsetInMessage, dst);
}
- catch (AMQException e)
+ catch (AMQStoreException e)
{
// TODO
throw new RuntimeException(e);
@@ -2108,7 +2077,7 @@
BDBMessageStore.this.commitTranImpl(_ctx, true);
}
}
- catch (AMQException e)
+ catch (AMQStoreException e)
{
//TODO
throw new RuntimeException(e);
@@ -2128,7 +2097,7 @@
{
BDBMessageStore.this.removeMessage(_messageId);
}
- catch (AMQException e)
+ catch (AMQStoreException e)
{
// TODO
throw new RuntimeException(e);
@@ -2156,28 +2125,28 @@
_ctx.setPayload(_txn);
}
- public void enqueueMessage(TransactionLogResource queue, Long messageId) throws AMQException
+ public void enqueueMessage(TransactionLogResource queue, Long messageId) throws AMQStoreException
{
BDBMessageStore.this.enqueueMessage(_ctx, queue, messageId);
}
- public void dequeueMessage(TransactionLogResource queue, Long messageId) throws AMQException
+ public void dequeueMessage(TransactionLogResource queue, Long messageId) throws AMQStoreException
{
BDBMessageStore.this.dequeueMessage(_ctx, queue, messageId);
}
- public void commitTran() throws AMQException
+ public void commitTran() throws AMQStoreException
{
BDBMessageStore.this.commitTranImpl(_ctx, true);
}
- public StoreFuture commitTranAsync() throws AMQException
+ public StoreFuture commitTranAsync() throws AMQStoreException
{
return BDBMessageStore.this.commitTranImpl(_ctx, false);
}
- public void abortTran() throws AMQException
+ public void abortTran() throws AMQStoreException
{
BDBMessageStore.this.abortTran(_ctx);
}
Modified: store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgrade.java
===================================================================
--- store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgrade.java 2010-07-26 13:39:22 UTC (rev 4146)
+++ store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgrade.java 2010-07-26 15:58:14 UTC (rev 4147)
@@ -40,6 +40,7 @@
import org.apache.qpid.server.logging.NullRootMessageLogger;
import org.apache.qpid.server.message.MessageMetaData;
import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQStoreException;
import org.apache.qpid.util.FileUtils;
import org.apache.commons.cli.PosixParser;
import org.apache.commons.cli.Options;
@@ -451,7 +452,7 @@
DatabaseVisitor queueVisitor = new DatabaseVisitor()
{
- public void visit(DatabaseEntry key, DatabaseEntry value) throws AMQException
+ public void visit(DatabaseEntry key, DatabaseEntry value) throws AMQStoreException
{
QueueRecord queueRec = (QueueRecord) queueTupleBinding.entryToObject(value);
AMQShortString queueName = queueRec.getNameShortString();
@@ -807,6 +808,7 @@
}
+ @SuppressWarnings("static-access")
private static void setOptions(Options options)
{
Option input =
Modified: store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/DatabaseVisitor.java
===================================================================
--- store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/DatabaseVisitor.java 2010-07-26 13:39:22 UTC (rev 4146)
+++ store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/DatabaseVisitor.java 2010-07-26 15:58:14 UTC (rev 4147)
@@ -20,16 +20,17 @@
*/
package org.apache.qpid.server.store.berkeleydb;
+import org.apache.qpid.AMQStoreException;
+
import com.sleepycat.je.DatabaseEntry;
import com.sleepycat.je.DatabaseException;
-import org.apache.qpid.AMQException;
/** Visitor Interface so that each DatabaseEntry for a database can easily be processed. */
public abstract class DatabaseVisitor
{
protected int _count;
- abstract public void visit(DatabaseEntry entry, DatabaseEntry value) throws AMQException, DatabaseException;
+ abstract public void visit(DatabaseEntry entry, DatabaseEntry value) throws AMQStoreException, DatabaseException;
public int getVisitedCount()
{
Modified: store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/FieldTableEncoding.java
===================================================================
--- store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/FieldTableEncoding.java 2010-07-26 13:39:22 UTC (rev 4146)
+++ store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/FieldTableEncoding.java 2010-07-26 15:58:14 UTC (rev 4147)
@@ -26,14 +26,7 @@
tupleInput.readFast(data);
ByteBuffer buffer = ByteBuffer.wrap(data);
- try
- {
- return new FieldTable(buffer,length);
- }
- catch (AMQFrameDecodingException e)
- {
- throw new DatabaseException(e);
- }
+ return new FieldTable(buffer,length);
}
Modified: store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java
===================================================================
--- store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java 2010-07-26 13:39:22 UTC (rev 4146)
+++ store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java 2010-07-26 15:58:14 UTC (rev 4147)
@@ -331,4 +331,4 @@
{
new BDBStoreUpgrade(_fromDir, _toDir, null, false, true).upgradeFromVersion(VERSION_2);
}
-}
\ No newline at end of file
+}
Modified: store/trunk/java/bdbstore/test-profiles/java-bdb.0.10.testprofile
===================================================================
--- store/trunk/java/bdbstore/test-profiles/java-bdb.0.10.testprofile 2010-07-26 13:39:22 UTC (rev 4146)
+++ store/trunk/java/bdbstore/test-profiles/java-bdb.0.10.testprofile 2010-07-26 15:58:14 UTC (rev 4147)
@@ -5,7 +5,7 @@
broker.ready=BRK-1004
broker.stopped=Exception
broker.config=${project.root}/build/etc/config-systests-bdb.xml
-profile.excludes=JavaStandaloneExcludes JavaPersistentExcludes CPPExcludes CPPTransientExcludes
+profile.excludes=JavaStandaloneExcludes JavaPersistentExcludes Java010Excludes 08StandaloneExcludes
broker.clean.between.tests=true
broker.persistent=true