rhmessaging commits: r2287 - mgmt/trunk/cumin/python/cumin.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2008-08-12 17:37:06 -0400 (Tue, 12 Aug 2008)
New Revision: 2287
Added:
mgmt/trunk/cumin/python/cumin/pool.py
mgmt/trunk/cumin/python/cumin/pool.strings
Modified:
mgmt/trunk/cumin/python/cumin/model.py
mgmt/trunk/cumin/python/cumin/page.py
mgmt/trunk/cumin/python/cumin/parameters.py
Log:
Add basic navigation for grid pools
Modified: mgmt/trunk/cumin/python/cumin/model.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/model.py 2008-08-12 21:35:11 UTC (rev 2286)
+++ mgmt/trunk/cumin/python/cumin/model.py 2008-08-12 21:37:06 UTC (rev 2287)
@@ -36,6 +36,7 @@
CuminBrokerRegistration(self)
CuminBrokerGroup(self)
+ CuminPool(self)
def check(self):
self.data.check()
@@ -1353,6 +1354,16 @@
def get_icon_href(self, session):
return "resource?name=group-36.png"
+class CuminPool(CuminClass):
+ def __init__(self, model):
+ super(CuminPool, self).__init__(model, "pool", Pool)
+
+ def get_title(self, session):
+ return "Pool"
+
+ def show_object(self, session, pool):
+ return self.cumin_model.show_main(session).show_pool(session, pool)
+
class ModelPage(Page):
def __init__(self, app, name):
super(ModelPage, self).__init__(app, name)
Modified: mgmt/trunk/cumin/python/cumin/page.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/page.py 2008-08-12 21:35:11 UTC (rev 2286)
+++ mgmt/trunk/cumin/python/cumin/page.py 2008-08-12 21:37:06 UTC (rev 2287)
@@ -7,6 +7,7 @@
from brokergroup import *
from brokerprofile import *
from brokercluster import *
+from pool import *
from system import *
from action import *
from widgets import *
@@ -89,6 +90,9 @@
self.__cluster = BrokerClusterFrame(app, "cluster")
self.add_mode(self.__cluster)
+ self.__pool = PoolFrame(app, "pool")
+ self.add_mode(self.__pool)
+
self.__system = SystemFrame(app, "system")
self.add_mode(self.__system)
@@ -159,6 +163,11 @@
frame.set_object(session, cluster)
return self.page.set_current_frame(session, frame)
+ def show_pool(self, session, pool):
+ frame = self.show_mode(session, self.__pool)
+ frame.set_object(session, pool)
+ return self.page.set_current_frame(session, frame)
+
def show_system(self, session, system):
frame = self.show_mode(session, self.__system)
frame.set_object(session, system)
@@ -198,7 +207,7 @@
self.add_parameter(self.selection)
self.add_link(self.MessagingTab(app, "mtab"))
- #self.add_link(self.GridTab(app, "gtab"))
+ self.add_link(self.GridTab(app, "gtab"))
self.add_link(self.SystemsTab(app, "stab"))
def render_class(self, session):
@@ -278,6 +287,11 @@
return "Tags"
class GridView(TabbedModeSet):
+ def __init__(self, app, name):
+ super(GridView, self).__init__(app, name)
+
+ self.add_tab(PoolSet(app, "pools"))
+
def render_title(self, session):
return "Grid"
Modified: mgmt/trunk/cumin/python/cumin/parameters.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/parameters.py 2008-08-12 21:35:11 UTC (rev 2286)
+++ mgmt/trunk/cumin/python/cumin/parameters.py 2008-08-12 21:37:06 UTC (rev 2287)
@@ -72,6 +72,13 @@
def do_marshal(self, peer):
return str(peer.id)
+class PoolParameter(Parameter):
+ def do_unmarshal(self, string):
+ return Pool.get(int(string))
+
+ def do_marshal(self, pool):
+ return str(pool.id)
+
class QueueParameter(Parameter):
def do_unmarshal(self, string):
return Queue.get(int(string))
Added: mgmt/trunk/cumin/python/cumin/pool.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/pool.py (rev 0)
+++ mgmt/trunk/cumin/python/cumin/pool.py 2008-08-12 21:37:06 UTC (rev 2287)
@@ -0,0 +1,63 @@
+import logging
+
+from wooly import *
+from wooly.widgets import *
+from wooly.forms import *
+from wooly.resources import *
+from wooly.tables import *
+
+from stat import *
+from widgets import *
+from parameters import *
+from formats import *
+from util import *
+
+strings = StringCatalog(__file__)
+log = logging.getLogger("cumin.pool")
+
+class PoolSet(CuminTable):
+ def __init__(self, app, name):
+ super(PoolSet, self).__init__(app, name)
+
+ col = self.NameColumn(app, "name")
+ self.add_column(col)
+
+ self.set_default_column(col)
+
+ def render_title(self, session):
+ return "Pools %s" % fmt_count(Pool.select().count())
+
+ class NameColumn(SqlTableColumn):
+ def render_title(self, session, data):
+ return "Name"
+
+ def render_content(self, session, data):
+ pool = Identifiable(data["id"])
+ branch = session.branch()
+ self.frame.show_pool(branch, pool).show_view(branch)
+ return fmt_olink(branch, pool, name=data["name"])
+
+class PoolFrame(CuminFrame):
+ def __init__(self, app, name):
+ super(PoolFrame, self).__init__(app, name)
+
+ self.object = PoolParameter(app, "id")
+ self.add_parameter(self.object)
+
+ view = PoolView(app, "view")
+ self.add_mode(view)
+ self.set_view_mode(view)
+
+class PoolView(CuminView):
+ def __init__(self, app, name):
+ super(PoolView, self).__init__(app, name)
+
+ status = PoolStatus(app, "status")
+ self.add_child(status)
+
+ self.__tabs = TabbedModeSet(app, "tabs")
+ self.add_child(self.__tabs)
+
+class PoolStatus(CuminStatus):
+ pass
+
Added: mgmt/trunk/cumin/python/cumin/pool.strings
===================================================================
--- mgmt/trunk/cumin/python/cumin/pool.strings (rev 0)
+++ mgmt/trunk/cumin/python/cumin/pool.strings 2008-08-12 21:37:06 UTC (rev 2287)
@@ -0,0 +1,8 @@
+[PoolSet.sql]
+select
+ p.id,
+ p.name
+from pool as p
+
+[PoolSet.count_sql]
+select count(*) from pool
17 years, 8 months
rhmessaging commits: r2286 - in mgmt/trunk/mint: sql and 1 other directory.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2008-08-12 17:35:11 -0400 (Tue, 12 Aug 2008)
New Revision: 2286
Modified:
mgmt/trunk/mint/python/mint/__init__.py
mgmt/trunk/mint/python/mint/schema.py
mgmt/trunk/mint/sql/schema.sql
Log:
Add basic pool navigation. Add a pool data object, and update the
schema.
Modified: mgmt/trunk/mint/python/mint/__init__.py
===================================================================
--- mgmt/trunk/mint/python/mint/__init__.py 2008-08-12 20:25:47 UTC (rev 2285)
+++ mgmt/trunk/mint/python/mint/__init__.py 2008-08-12 21:35:11 UTC (rev 2286)
@@ -110,6 +110,12 @@
value = StringCol(length=1000, default=None)
type = StringCol(length=1, default="s")
+class Pool(SQLObject):
+ class sqlmeta:
+ lazyUpdate = True
+
+ name = StringCol(length=1000, default=None)
+
class ConsoleUser(SQLObject):
class sqlmeta:
lazyUpdate = True
Modified: mgmt/trunk/mint/python/mint/schema.py
===================================================================
--- mgmt/trunk/mint/python/mint/schema.py 2008-08-12 20:25:47 UTC (rev 2285)
+++ mgmt/trunk/mint/python/mint/schema.py 2008-08-12 21:35:11 UTC (rev 2286)
@@ -32,7 +32,6 @@
classInfos = dict() # brokerId => classInfo
-System.sqlmeta.addJoin(SQLMultipleJoin('SystemStats', joinMethodName='stats'))
class Broker(SQLObject):
@@ -100,9 +99,6 @@
conn.callMethod(self.idOriginal, classInfo, "connect",
callback, args=actualArgs)
-System.sqlmeta.addJoin(SQLMultipleJoin('Broker', joinMethodName='brokers'))
-
-
class BrokerStats(SQLObject):
class sqlmeta:
lazyUpdate = True
@@ -113,7 +109,6 @@
classInfos = dict() # brokerId => classInfo
-Broker.sqlmeta.addJoin(SQLMultipleJoin('BrokerStats', joinMethodName='stats'))
class Agent(SQLObject):
@@ -127,7 +122,7 @@
managedBroker = StringCol(length=1000, default=None)
statsCurr = ForeignKey('AgentStats', cascade='null', default=None)
statsPrev = ForeignKey('AgentStats', cascade='null', default=None)
- sessionId = BLOBCol(default=None)
+ sessionName = StringCol(length=1000, default=None)
label = StringCol(length=1000, default=None)
registeredTo = BigIntCol(default=None)
systemId = BLOBCol(default=None)
@@ -144,7 +139,6 @@
classInfos = dict() # brokerId => classInfo
-Agent.sqlmeta.addJoin(SQLMultipleJoin('AgentStats', joinMethodName='stats'))
class Vhost(SQLObject):
@@ -163,9 +157,6 @@
classInfos = dict() # brokerId => classInfo
-Broker.sqlmeta.addJoin(SQLMultipleJoin('Vhost', joinMethodName='vhosts'))
-
-
class VhostStats(SQLObject):
class sqlmeta:
lazyUpdate = True
@@ -176,7 +167,6 @@
classInfos = dict() # brokerId => classInfo
-Vhost.sqlmeta.addJoin(SQLMultipleJoin('VhostStats', joinMethodName='stats'))
class Queue(SQLObject):
@@ -207,9 +197,6 @@
conn.callMethod(self.idOriginal, classInfo, "purge",
callback, args=actualArgs)
-Vhost.sqlmeta.addJoin(SQLMultipleJoin('Queue', joinMethodName='queues'))
-
-
class QueueStats(SQLObject):
class sqlmeta:
lazyUpdate = True
@@ -255,7 +242,6 @@
classInfos = dict() # brokerId => classInfo
-Queue.sqlmeta.addJoin(SQLMultipleJoin('QueueStats', joinMethodName='stats'))
class Exchange(SQLObject):
@@ -276,9 +262,6 @@
classInfos = dict() # brokerId => classInfo
-Vhost.sqlmeta.addJoin(SQLMultipleJoin('Exchange', joinMethodName='exchanges'))
-
-
class ExchangeStats(SQLObject):
class sqlmeta:
lazyUpdate = True
@@ -301,7 +284,6 @@
classInfos = dict() # brokerId => classInfo
-Exchange.sqlmeta.addJoin(SQLMultipleJoin('ExchangeStats', joinMethodName='stats'))
class Binding(SQLObject):
@@ -322,11 +304,6 @@
classInfos = dict() # brokerId => classInfo
-Exchange.sqlmeta.addJoin(SQLMultipleJoin('Binding', joinMethodName='bindings'))
-
-Queue.sqlmeta.addJoin(SQLMultipleJoin('Binding', joinMethodName='bindings'))
-
-
class BindingStats(SQLObject):
class sqlmeta:
lazyUpdate = True
@@ -338,7 +315,6 @@
classInfos = dict() # brokerId => classInfo
-Binding.sqlmeta.addJoin(SQLMultipleJoin('BindingStats', joinMethodName='stats'))
class ClientConnection(SQLObject):
@@ -365,9 +341,6 @@
conn.callMethod(self.idOriginal, classInfo, "close",
callback, args=actualArgs)
-Vhost.sqlmeta.addJoin(SQLMultipleJoin('ClientConnection', joinMethodName='clientConnections'))
-
-
class ClientConnectionStats(SQLObject):
class sqlmeta:
lazyUpdate = True
@@ -384,7 +357,6 @@
classInfos = dict() # brokerId => classInfo
-ClientConnection.sqlmeta.addJoin(SQLMultipleJoin('ClientConnectionStats', joinMethodName='stats'))
class Link(SQLObject):
@@ -429,9 +401,6 @@
conn.callMethod(self.idOriginal, classInfo, "bridge",
callback, args=actualArgs)
-Vhost.sqlmeta.addJoin(SQLMultipleJoin('Link', joinMethodName='links'))
-
-
class LinkStats(SQLObject):
class sqlmeta:
lazyUpdate = True
@@ -444,7 +413,6 @@
classInfos = dict() # brokerId => classInfo
-Link.sqlmeta.addJoin(SQLMultipleJoin('LinkStats', joinMethodName='stats'))
class Bridge(SQLObject):
@@ -478,9 +446,6 @@
conn.callMethod(self.idOriginal, classInfo, "close",
callback, args=actualArgs)
-Link.sqlmeta.addJoin(SQLMultipleJoin('Bridge', joinMethodName='bridges'))
-
-
class BridgeStats(SQLObject):
class sqlmeta:
lazyUpdate = True
@@ -491,7 +456,6 @@
classInfos = dict() # brokerId => classInfo
-Bridge.sqlmeta.addJoin(SQLMultipleJoin('BridgeStats', joinMethodName='stats'))
class Session(SQLObject):
@@ -541,11 +505,6 @@
conn.callMethod(self.idOriginal, classInfo, "close",
callback, args=actualArgs)
-Vhost.sqlmeta.addJoin(SQLMultipleJoin('Session', joinMethodName='sessions'))
-
-ClientConnection.sqlmeta.addJoin(SQLMultipleJoin('Session', joinMethodName='sessions'))
-
-
class SessionStats(SQLObject):
class sqlmeta:
lazyUpdate = True
@@ -559,36 +518,8 @@
classInfos = dict() # brokerId => classInfo
-Session.sqlmeta.addJoin(SQLMultipleJoin('SessionStats', joinMethodName='stats'))
-classToSchemaNameMap = dict()
-schemaNameToClassMap = dict()
-classToSchemaNameMap['System'] = 'System'
-schemaNameToClassMap['System'] = System
-classToSchemaNameMap['Broker'] = 'Broker'
-schemaNameToClassMap['Broker'] = Broker
-classToSchemaNameMap['Agent'] = 'Agent'
-schemaNameToClassMap['Agent'] = Agent
-classToSchemaNameMap['Vhost'] = 'Vhost'
-schemaNameToClassMap['Vhost'] = Vhost
-classToSchemaNameMap['Queue'] = 'Queue'
-schemaNameToClassMap['Queue'] = Queue
-classToSchemaNameMap['Exchange'] = 'Exchange'
-schemaNameToClassMap['Exchange'] = Exchange
-classToSchemaNameMap['Binding'] = 'Binding'
-schemaNameToClassMap['Binding'] = Binding
-classToSchemaNameMap['ClientConnection'] = 'ClientConnection'
-schemaNameToClassMap['ClientConnection'] = ClientConnection
-classToSchemaNameMap['Link'] = 'Link'
-schemaNameToClassMap['Link'] = Link
-classToSchemaNameMap['Bridge'] = 'Bridge'
-schemaNameToClassMap['Bridge'] = Bridge
-classToSchemaNameMap['Session'] = 'Session'
-schemaNameToClassMap['Session'] = Session
-
-
-
class Store(SQLObject):
class sqlmeta:
lazyUpdate = True
@@ -608,9 +539,6 @@
classInfos = dict() # brokerId => classInfo
-Broker.sqlmeta.addJoin(SQLMultipleJoin('Store', joinMethodName='stores'))
-
-
class StoreStats(SQLObject):
class sqlmeta:
lazyUpdate = True
@@ -621,7 +549,6 @@
classInfos = dict() # brokerId => classInfo
-Store.sqlmeta.addJoin(SQLMultipleJoin('StoreStats', joinMethodName='stats'))
class Journal(SQLObject):
@@ -655,9 +582,6 @@
conn.callMethod(self.idOriginal, classInfo, "expand",
callback, args=actualArgs)
-Queue.sqlmeta.addJoin(SQLMultipleJoin('Journal', joinMethodName='journals'))
-
-
class JournalStats(SQLObject):
class sqlmeta:
lazyUpdate = True
@@ -695,9 +619,109 @@
classInfos = dict() # brokerId => classInfo
-Journal.sqlmeta.addJoin(SQLMultipleJoin('JournalStats', joinMethodName='stats'))
+
+classToSchemaNameMap = dict()
+schemaNameToClassMap = dict()
+classToSchemaNameMap['System'] = 'System'
+schemaNameToClassMap['System'] = System
+
+System.sqlmeta.addJoin(SQLMultipleJoin('SystemStats', joinMethodName='stats'))
+
+classToSchemaNameMap['Broker'] = 'Broker'
+schemaNameToClassMap['Broker'] = Broker
+
+System.sqlmeta.addJoin(SQLMultipleJoin('Broker', joinMethodName='brokers'))
+
+
+Broker.sqlmeta.addJoin(SQLMultipleJoin('BrokerStats', joinMethodName='stats'))
+
+classToSchemaNameMap['Agent'] = 'Agent'
+schemaNameToClassMap['Agent'] = Agent
+
+Agent.sqlmeta.addJoin(SQLMultipleJoin('AgentStats', joinMethodName='stats'))
+
+classToSchemaNameMap['Vhost'] = 'Vhost'
+schemaNameToClassMap['Vhost'] = Vhost
+
+Broker.sqlmeta.addJoin(SQLMultipleJoin('Vhost', joinMethodName='vhosts'))
+
+
+Vhost.sqlmeta.addJoin(SQLMultipleJoin('VhostStats', joinMethodName='stats'))
+
+classToSchemaNameMap['Queue'] = 'Queue'
+schemaNameToClassMap['Queue'] = Queue
+
+Vhost.sqlmeta.addJoin(SQLMultipleJoin('Queue', joinMethodName='queues'))
+
+
+Queue.sqlmeta.addJoin(SQLMultipleJoin('QueueStats', joinMethodName='stats'))
+
+classToSchemaNameMap['Exchange'] = 'Exchange'
+schemaNameToClassMap['Exchange'] = Exchange
+
+Vhost.sqlmeta.addJoin(SQLMultipleJoin('Exchange', joinMethodName='exchanges'))
+
+
+Exchange.sqlmeta.addJoin(SQLMultipleJoin('ExchangeStats', joinMethodName='stats'))
+
+classToSchemaNameMap['Binding'] = 'Binding'
+schemaNameToClassMap['Binding'] = Binding
+
+Exchange.sqlmeta.addJoin(SQLMultipleJoin('Binding', joinMethodName='bindings'))
+
+Queue.sqlmeta.addJoin(SQLMultipleJoin('Binding', joinMethodName='bindings'))
+
+
+Binding.sqlmeta.addJoin(SQLMultipleJoin('BindingStats', joinMethodName='stats'))
+
+classToSchemaNameMap['ClientConnection'] = 'ClientConnection'
+schemaNameToClassMap['ClientConnection'] = ClientConnection
+
+Vhost.sqlmeta.addJoin(SQLMultipleJoin('ClientConnection', joinMethodName='clientConnections'))
+
+
+ClientConnection.sqlmeta.addJoin(SQLMultipleJoin('ClientConnectionStats', joinMethodName='stats'))
+
+classToSchemaNameMap['Link'] = 'Link'
+schemaNameToClassMap['Link'] = Link
+
+Vhost.sqlmeta.addJoin(SQLMultipleJoin('Link', joinMethodName='links'))
+
+
+Link.sqlmeta.addJoin(SQLMultipleJoin('LinkStats', joinMethodName='stats'))
+
+classToSchemaNameMap['Bridge'] = 'Bridge'
+schemaNameToClassMap['Bridge'] = Bridge
+
+Link.sqlmeta.addJoin(SQLMultipleJoin('Bridge', joinMethodName='bridges'))
+
+
+Bridge.sqlmeta.addJoin(SQLMultipleJoin('BridgeStats', joinMethodName='stats'))
+
+classToSchemaNameMap['Session'] = 'Session'
+schemaNameToClassMap['Session'] = Session
+
+Vhost.sqlmeta.addJoin(SQLMultipleJoin('Session', joinMethodName='sessions'))
+
+ClientConnection.sqlmeta.addJoin(SQLMultipleJoin('Session', joinMethodName='sessions'))
+
+
+Session.sqlmeta.addJoin(SQLMultipleJoin('SessionStats', joinMethodName='stats'))
+
classToSchemaNameMap['Store'] = 'Store'
schemaNameToClassMap['Store'] = Store
+
+Broker.sqlmeta.addJoin(SQLMultipleJoin('Store', joinMethodName='stores'))
+
+
+Store.sqlmeta.addJoin(SQLMultipleJoin('StoreStats', joinMethodName='stats'))
+
classToSchemaNameMap['Journal'] = 'Journal'
schemaNameToClassMap['Journal'] = Journal
+
+Queue.sqlmeta.addJoin(SQLMultipleJoin('Journal', joinMethodName='journals'))
+
+
+Journal.sqlmeta.addJoin(SQLMultipleJoin('JournalStats', joinMethodName='stats'))
+
Modified: mgmt/trunk/mint/sql/schema.sql
===================================================================
--- mgmt/trunk/mint/sql/schema.sql 2008-08-12 20:25:47 UTC (rev 2285)
+++ mgmt/trunk/mint/sql/schema.sql 2008-08-12 21:35:11 UTC (rev 2286)
@@ -51,6 +51,11 @@
version VARCHAR(1000) NOT NULL
);
+CREATE TABLE pool (
+ id SERIAL PRIMARY KEY,
+ name VARCHAR(1000)
+);
+
CREATE TABLE agent (
id SERIAL PRIMARY KEY,
id_original BIGINT,
@@ -60,7 +65,7 @@
managed_broker VARCHAR(1000),
stats_curr_id INT,
stats_prev_id INT,
- session_id BYTEA,
+ session_name VARCHAR(1000),
label VARCHAR(1000),
registered_to BIGINT,
system_id BYTEA
17 years, 8 months
rhmessaging commits: r2285 - mgmt/trunk/mint/python/mint.
by rhmessaging-commits@lists.jboss.org
Author: nunofsantos
Date: 2008-08-12 16:25:47 -0400 (Tue, 12 Aug 2008)
New Revision: 2285
Modified:
mgmt/trunk/mint/python/mint/schemaparser.py
Log:
handle reserved words based on a defined mapping, not on a case-by-case basis
Modified: mgmt/trunk/mint/python/mint/schemaparser.py
===================================================================
--- mgmt/trunk/mint/python/mint/schemaparser.py 2008-08-12 18:54:32 UTC (rev 2284)
+++ mgmt/trunk/mint/python/mint/schemaparser.py 2008-08-12 20:25:47 UTC (rev 2285)
@@ -27,7 +27,18 @@
self.dataTypesMap["bool"] = "BoolCol"
self.dataTypesMap["sstr"] = self.dataTypesMap["lstr"] = "StringCol"
self.dataTypesMap["map"] = "StringCol"
-
+ # mapping for identifiers in the XML schema that are reserved words in either SQL or Python
+ self.reservedWords = {"in": "inRsv", "In": "InRsv",
+ "connection": "clientConnection", "Connection": "ClientConnection",
+ "other": "otherRsv"}
+
+ def renameReservedWord(self, name):
+ if (name in self.reservedWords.keys()):
+ print "Notice: %s is a reserved word, automatically translating to %s" % (name, self.reservedWords[name])
+ return self.reservedWords[name]
+ else:
+ return name
+
def attrNameFromDbColumn(self, name, removeSuffix=""):
return self.style.dbColumnToPythonAttr(name.replace(removeSuffix, ""))
@@ -50,8 +61,7 @@
def generateForeignKeyAttrib(self, name, reference):
params = "'%s', cascade='null'" % (reference)
- if (name == "connection"):
- name = "clientConnection"
+ name = self.renameReservedWord(name)
self.generateAttrib(name, "ForeignKey", params)
def generateHiLoAttrib(self, name, type):
@@ -78,43 +88,39 @@
if (schemaName == "JournalStats"):
print schemaName
for elem in elements:
- # special handling due to name conflict with SqlObject.connection
- if (elem["@name"] == "connection"):
- elem["@name"] = "clientConnection"
+ elemName = self.renameReservedWord(elem["@name"])
if (elem["@type"] == "objId"):
- if (elem["@name"].endswith("Ref")):
+ if (elemName.endswith("Ref")):
reference = elem["@references"]
# handle cases where the referenced class is in a different namespace (ie, contains a ".")
namespaceIndex = reference.find(".")
if (namespaceIndex > 0):
reference = reference[namespaceIndex + 1:]
reference = self.style.dbTableToPythonClass(reference)
- if (reference == "Connection"):
- reference = "ClientConnection"
+ reference = self.renameReservedWord(reference)
attrib = reference[0].lower() + reference[1:]
self.generateForeignKeyAttrib(attrib, reference)
self.generateMultipleJoin(reference, self.currentClass)
else:
# if reference doesn't have a "Ref" prefix, handle as a large uint
- self.generateAttrib(self.attrNameFromDbColumn(elem["@name"]), self.dataTypesMap["uint64"])
+ self.generateAttrib(self.attrNameFromDbColumn(elemName), self.dataTypesMap["uint64"])
elif (elem["@type"].startswith("hilo")):
- self.generateHiLoAttrib(self.attrNameFromDbColumn(elem["@name"]), self.dataTypesMap[elem["@type"]])
+ self.generateHiLoAttrib(self.attrNameFromDbColumn(elemName), self.dataTypesMap[elem["@type"]])
elif (elem["@type"].startswith("mma")):
- self.generateMinMaxAvgAttrib(self.attrNameFromDbColumn(elem["@name"]), self.dataTypesMap[elem["@type"]])
+ self.generateMinMaxAvgAttrib(self.attrNameFromDbColumn(elemName), self.dataTypesMap[elem["@type"]])
else:
args = ""
if (elem["@type"] == "sstr"):
args += "length=1000"
elif (elem["@type"] == "lstr" or elem["@type"] == "ftable"):
args += "length=4000"
- self.generateAttrib(self.attrNameFromDbColumn(elem["@name"]), self.dataTypesMap[elem["@type"]], args)
+ self.generateAttrib(self.attrNameFromDbColumn(elemName), self.dataTypesMap[elem["@type"]], args)
self.pythonOutput += "\n"
self.pythonOutput += " classInfos = dict() # brokerId => classInfo\n"
def startClass(self, schemaName, stats=False):
- if (schemaName == "Connection"):
- schemaName = "ClientConnection"
+ schemaName = self.renameReservedWord(schemaName)
if (stats):
origPythonName = self.style.dbTableToPythonClass(schemaName)
pythonName = self.style.dbTableToPythonClass(schemaName + "_stats")
17 years, 8 months
rhmessaging commits: r2284 - mgmt/trunk/cumin/python/cumin.
by rhmessaging-commits@lists.jboss.org
Author: eallen
Date: 2008-08-12 14:54:32 -0400 (Tue, 12 Aug 2008)
New Revision: 2284
Modified:
mgmt/trunk/cumin/python/cumin/brokerlink.py
Log:
Fix number of arguments to BrokerLinkAdd.host.clear()
Modified: mgmt/trunk/cumin/python/cumin/brokerlink.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/brokerlink.py 2008-08-12 18:12:20 UTC (rev 2283)
+++ mgmt/trunk/cumin/python/cumin/brokerlink.py 2008-08-12 18:54:32 UTC (rev 2284)
@@ -315,7 +315,7 @@
self.errors["port"]
def process_cancel(self, session):
- self.host.clear(session)
+ self.host.clear()
branch = session.branch()
self.frame.show_view(branch)
self.page.set_redirect_url(session, branch.marshal())
17 years, 8 months
rhmessaging commits: r2283 - mgmt/trunk/cumin/python/cumin.
by rhmessaging-commits@lists.jboss.org
Author: eallen
Date: 2008-08-12 14:12:20 -0400 (Tue, 12 Aug 2008)
New Revision: 2283
Modified:
mgmt/trunk/cumin/python/cumin/broker.py
mgmt/trunk/cumin/python/cumin/brokerlink.py
mgmt/trunk/cumin/python/cumin/brokerlink.strings
mgmt/trunk/cumin/python/cumin/model.py
Log:
Added Broker Link actions:
- Add
- Close multiple
- Close one
Modified: mgmt/trunk/cumin/python/cumin/broker.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/broker.py 2008-08-12 17:33:40 UTC (rev 2282)
+++ mgmt/trunk/cumin/python/cumin/broker.py 2008-08-12 18:12:20 UTC (rev 2283)
@@ -170,12 +170,18 @@
self.__peer = PeerFrame(app, "peer")
self.add_mode(self.__peer)
+
+ self.__broker_add = BrokerLinkAdd(app, "brokeradd")
+ self.add_mode(self.__broker_add)
self.__conn = ConnectionFrame(app, "conn")
self.add_mode(self.__conn)
self.__conns_close = ConnectionSetClose(app, "connsclose")
self.add_mode(self.__conns_close)
+
+ self.__broker_links_close = BrokerSetClose(app, "brokersclose")
+ self.add_mode(self.__broker_links_close)
def show_queue(self, session, queue):
self.__queue.set_object(session, queue)
@@ -221,6 +227,14 @@
self.page.set_current_frame(session, self.__peer)
return self.show_mode(session, self.__peer)
+ def show_broker_link_add(self, session):
+ self.page.set_current_frame(session, self.__broker_add)
+ return self.show_mode(session, self.__broker_add)
+
+ def show_broker_links_close(self, session):
+ self.page.set_current_frame(session, self.__broker_links_close)
+ return self.show_mode(session, self.__broker_links_close)
+
def show_connection(self, session, conn):
self.__conn.set_object(session, conn)
self.page.set_current_frame(session, self.__conn)
Modified: mgmt/trunk/cumin/python/cumin/brokerlink.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/brokerlink.py 2008-08-12 17:33:40 UTC (rev 2282)
+++ mgmt/trunk/cumin/python/cumin/brokerlink.py 2008-08-12 18:12:20 UTC (rev 2283)
@@ -12,10 +12,13 @@
strings = StringCatalog(__file__)
-class PeerSet(CuminTable):
+class PeerSet(CuminTable, Form):
def __init__(self, app, name):
super(PeerSet, self).__init__(app, name)
+ self.ids = CheckboxIdColumn(app, "id", self)
+ self.add_column(self.ids)
+
col = self.AddressColumn(app, "addr")
self.add_column(col)
@@ -33,7 +36,15 @@
#col = self.ToPeerColumn(app, "to_peer")
#self.add_column(col)
+
+ self.__close = self.Close(app, "close", self)
+ self.add_child(self.__close)
+ def render_add_broker_link_url(self, session, vhost):
+ branch = session.branch()
+ self.frame.show_broker_link_add(branch)
+ return branch.marshal()
+
def get_args(self, session):
reg = self.frame.get_object(session)
return (reg.getDefaultVhost(),)
@@ -89,6 +100,19 @@
def render_value(self, session, value):
return fmt_rate(value)
+ class Close(FormButton):
+ def render_content(self, session):
+ return "Close"
+
+ def process_submit(self, session):
+ ids = self.parent.ids.get(session)
+ self.parent.ids.clear(session)
+
+ branch = session.branch()
+ frame = self.frame.show_broker_links_close(branch)
+ frame.ids.set(branch, ids)
+ self.page.set_redirect_url(session, branch.marshal())
+
class PeerFrame(CuminFrame):
def __init__(self, app, name):
super(PeerFrame, self).__init__(app, name)
@@ -99,10 +123,40 @@
view = PeerView(app, "view")
self.add_mode(view)
self.set_view_mode(view)
-
+
+ remove = LinkRemove(app, "remove")
+ self.add_mode(remove)
+ self.set_remove_mode(remove)
+
def render_title(self, session, peer):
return super(PeerFrame, self).render_title(session, peer)
+class LinkRemove(CuminConfirmForm):
+ def get_args(self, session):
+ return self.frame.get_args(session)
+
+ def process_cancel(self, session, link):
+ branch = session.branch()
+ self.frame.show_view(branch)
+ self.page.set_redirect_url(session, branch.marshal())
+
+ def process_submit(self, session, link):
+ action = self.app.model.link.close
+ action.invoke(link)
+
+ branch = session.branch()
+ self.frame.frame.show_view(branch)
+ self.page.set_redirect_url(session, branch.marshal())
+
+ def render_title(self, session, link):
+ return "Close Broker Link '%s:%d'" % (link.host, link.port)
+
+ def render_submit_content(self, session, link):
+ return "Yes, Close Broker Link"
+
+ def render_cancel_content(self, session, link):
+ return "No, Cancel"
+
class PeerStatus(CuminStatus):
def render_messages_received(self, session, peer):
return self.app.model.exchange.msgReceives.rate_html(peer)
@@ -201,3 +255,144 @@
def render_title(self, session, exchange):
return "Messages Received, Routed, and Dropped"
+class BrokerLinkAdd(CuminFieldForm):
+ def __init__(self, app, name):
+ super(BrokerLinkAdd, self).__init__(app, name)
+
+ self.host = DictParameter(app, "host")
+ self.add_parameter(self.host)
+ self.add_form_parameter(self.host)
+
+ self.durable = DurabilityField(app, "durable", self)
+ self.add_field(self.durable)
+
+ self.errors = dict()
+
+ def render_title(self, session, *args):
+ reg = self.frame.get_object(session)
+ return "Add Broker Link to '%s'" % reg.name
+
+ def render_broker_name_path(self, session, *args):
+ return "_".join((self.host.path, "name"))
+
+ def render_broker_port_path(self, session, *args):
+ return "_".join((self.host.path, "port"))
+
+ def render_broker_name_value(self, session, *args):
+ host = self.host.get(session)
+ if "name" in host:
+ return host["name"]
+
+ def render_broker_port_value(self, session, *args):
+ host = self.host.get(session)
+ if "port" in host:
+ return host["port"]
+
+ def render_broker_username_path(self, session, *args):
+ return "_".join((self.host.path, "username"))
+
+ def render_broker_password_path(self, session, *args):
+ return "_".join((self.host.path, "password"))
+
+ def render_broker_username_value(self, session, *args):
+ host = self.host.get(session)
+ if "username" in host:
+ return host["username"]
+
+ def render_broker_password_value(self, session, *args):
+ host = self.host.get(session)
+ if "password" in host:
+ return host["password"]
+
+ def render_broker_name_error(self, session, *args):
+ if "name" in self.errors:
+ return "<ul class=\"errors\" style=\"float:left;\"><li>%s</li></ul>" % \
+ self.errors["name"]
+
+ def render_broker_port_error(self, session, *args):
+ if "port" in self.errors:
+ return "<ul class=\"errors\" style=\"float:left;\"><li>%s</li></ul>" % \
+ self.errors["port"]
+
+ def process_cancel(self, session):
+ self.host.clear(session)
+ branch = session.branch()
+ self.frame.show_view(branch)
+ self.page.set_redirect_url(session, branch.marshal())
+
+ def validate(self, session):
+ self.errors = dict()
+
+ host = self.host.get(session)
+ if not "name" in host:
+ self.errors["name"] = "Host name is required"
+ if "port" in host:
+ port = host["port"]
+ try:
+ # ensure a number and remove surrounding spaces
+ port = str(int(port))
+ except:
+ self.errors["port"] = "Port must be a number"
+
+ return not len(self.errors)
+
+ def process_submit(self, session):
+
+ super_error = super(BrokerLinkAdd, self).validate(session)
+
+ if self.validate(session) and not super_error:
+ host = self.host.get(session)
+ username = "anonymous"
+ password = ""
+ port = 5672
+ addr = host["name"]
+
+ if "port" in host:
+ port = int(host["port"])
+
+ durable = self.durable.get(session)
+ if "username" in host:
+ username = host["username"]
+ if "password" in host:
+ password = host["password"]
+
+ link = Link()
+ link.host = addr
+ link.port = port
+ link.useSsl = False
+ link.durable = (durable == "durable")
+ reg = self.frame.get_object(session)
+
+ args = {"reg": reg,
+ "username": username,
+ "password": password}
+
+ action = self.app.model.broker.add_link
+ action.invoke(link, args)
+
+ # navigate back to main queue frame
+ self.process_cancel(session)
+
+class BrokerSetClose(CuminBulkActionForm):
+ def process_return(self, session):
+ branch = session.branch()
+ self.frame.show_view(branch)
+ self.page.set_current_frame(branch, self.frame)
+ self.page.set_redirect_url(session, branch.marshal())
+
+ def process_item(self, session, id):
+ link = Link.get(id)
+ action = self.app.model.link.close
+ action.invoke(link)
+
+ def render_title(self, session):
+ return "Close Broker Link"
+
+ def render_form_heading(self, session, *args):
+ return "Close Link to:"
+
+ def render_item_content(self, session, id):
+ link = Link.get(id)
+ return "Broker: %s:%d" % (link.host, link.port)
+
+
\ No newline at end of file
Modified: mgmt/trunk/cumin/python/cumin/brokerlink.strings
===================================================================
--- mgmt/trunk/cumin/python/cumin/brokerlink.strings 2008-08-12 17:33:40 UTC (rev 2282)
+++ mgmt/trunk/cumin/python/cumin/brokerlink.strings 2008-08-12 18:12:20 UTC (rev 2283)
@@ -20,6 +20,33 @@
join vhost as v on v.id = l.vhost_id
{sql_where}
+[PeerSet.html]
+<form id="{id}" method="post" action="?">
+
+ <ul class="actions">
+ <li><a class="nav" href="{add_broker_link_url}">Add Broker Link</a></li>
+ </ul>
+
+ <div class="sactions">
+ <h2>Act on Selected Broker Links:</h2>
+ {close}
+ </div>
+
+ <table class="mobjects">
+ <thead>
+ <tr>
+ <th class="setnav" colspan="{column_count}">
+ <div class="rfloat">{page}</div>
+ {count}
+ </th>
+ </tr>
+ <tr>{headers}</tr>
+ </thead>
+ <tbody>{items}</tbody>
+ </table>
+ <div>{hidden_inputs}</div>
+</form>
+
[PeerRouteSet.html]
<div class="rfloat">{page}</div>
<ul class="radiotabs"> </ul>
@@ -42,3 +69,59 @@
<td>{item_exchange}</td>
<td>{item_routing_key}</td>
</tr>
+
+[BrokerLinkAdd.css]
+div.field div.input_prompt {
+ width: 5em;
+ padding-right: 1em;
+ float:left;
+}
+
+div.multiple div.inputs {
+ margin-bottom: 0.25em;
+}
+
+div.multiple div.field {
+ margin-bottom: 1em;
+}
+
+div.field div.input_prompt:after {
+ content: ":";
+}
+
+div.field div.inputs input {
+ float:left;
+}
+div.field div.inputs:after {
+ content: ".";
+ display: block;
+ height: 0;
+ clear: left;
+ visibility: hidden;
+}
+
+[BrokerLinkAdd.html]
+<form id="{id}" class="mform" method="post" action="?">
+ <div class="head">{title}</div>
+ <div class="body multiple">
+ <div class="field">
+ <div class="title">Source Broker</div>
+ <div class="inputs"><div class="input_prompt">Address</div><input type="text" name="{broker_name_path}" value="{broker_name_value}" tabindex="100" size="32"/>{broker_name_error}</div>
+ <div class="inputs"><div class="input_prompt">Port</div><input type="text" name="{broker_port_path}" value="{broker_port_value}" tabindex="100" size="5"/>{broker_port_error}</div>
+ <div class="inputs"><div class="input_prompt">Username</div><input type="text" name="{broker_username_path}" value="{broker_username_value}" tabindex="100" size="32"/></div>
+ <div class="inputs"><div class="input_prompt">Password</div><input type="text" name="{broker_password_path}" value="{broker_password_value}" tabindex="100" size="32"/></div>
+ </div>
+ {fields}
+ </div>
+ {form_error}
+ <div class="foot">
+ {help}
+ {submit}
+ {cancel}
+ </div>
+ <div>{hidden_inputs}</div>
+</form>
+<script type="text/javascript">
+ wooly.doc().elembyid("{id}").node.elements[0].focus();
+</script>
+
Modified: mgmt/trunk/cumin/python/cumin/model.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/model.py 2008-08-12 17:33:40 UTC (rev 2282)
+++ mgmt/trunk/cumin/python/cumin/model.py 2008-08-12 18:12:20 UTC (rev 2283)
@@ -12,7 +12,6 @@
from parameters import *
-
log = getLogger("cumin.model")
class CuminModel(object):
@@ -558,6 +557,9 @@
action = self.AddExchange(self, "add_exchange")
action.summary = True
+ action = self.AddLink(self, "add_link")
+ action.summary = True
+
action = self.AddQueue(self, "add_queue")
action.summary = True
@@ -590,6 +592,31 @@
# raised and we won't get here
completion("OK")
+ class AddLink(CuminAction):
+ def get_title(self, session):
+ return "Add Broker Link"
+
+ def get_verb(self, session):
+ return "Add"
+
+ def show(self, session, exchange):
+ frame = self.cumin_class.show_object(session, exchange)
+ return frame.show_broker_link_add(session)
+
+ def do_invoke(self, link, args, completion):
+ reg = args["reg"]
+ username = args["username"]
+ password = args["password"]
+ if username == "anonymous":
+ authMechanism = "ANONYMOUS"
+ else:
+ authMechanism = "PLAIN"
+
+ broker = reg._get_broker()
+ broker.connect(self.cumin_model.data, completion,
+ link.host, link.port, link.useSsl, link.durable,
+ authMechanism, username, password)
+
class AddQueue(CuminAction):
def get_title(self, session):
return "Add Queue"
@@ -1178,12 +1205,31 @@
stat.unit = "byte"
stat.category = "io"
+ action = self.Close(self, "close")
+ action.summary = True
+
def get_title(self, session):
return "Broker Link"
def get_object_name(self, link):
return "%s:%d" % (link.host, link.port)
+ def show_object(self, session, link):
+ frame = self.cumin_model.show_main(session)
+ return frame.show_broker(session, link.vhost.broker.registration)
+
+ class Close(CuminAction):
+ def show(self, session, link):
+ frame = self.cumin_class.show_object(session, link)
+ frame = frame.show_peer(session, link)
+ return frame.show_remove(session)
+
+ def get_title(self, session):
+ return "Close"
+
+ def do_invoke(self, link, args, completion):
+ link.close(self.cumin_model.data, completion)
+
class CuminStore(RemoteClass):
def __init__(self, model):
super(CuminStore, self).__init__(model, "store", Store, StoreStats)
17 years, 8 months
rhmessaging commits: r2282 - mgmt/trunk/mint/python/mint.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2008-08-12 13:33:40 -0400 (Tue, 12 Aug 2008)
New Revision: 2282
Modified:
mgmt/trunk/mint/python/mint/__init__.py
mgmt/trunk/mint/python/mint/update.py
Log:
Fix close handling in mint
Modified: mgmt/trunk/mint/python/mint/__init__.py
===================================================================
--- mgmt/trunk/mint/python/mint/__init__.py 2008-08-12 16:44:48 UTC (rev 2281)
+++ mgmt/trunk/mint/python/mint/__init__.py 2008-08-12 17:33:40 UTC (rev 2282)
@@ -311,7 +311,7 @@
self.updateThread.enqueue(up)
def closeCallback(self, conn, data):
- up = update.CloseCallback(conn, data)
+ up = update.CloseUpdate(conn, data)
self.updateThread.enqueue(up)
def controlCallback(self, conn, type, data):
Modified: mgmt/trunk/mint/python/mint/update.py
===================================================================
--- mgmt/trunk/mint/python/mint/update.py 2008-08-12 16:44:48 UTC (rev 2281)
+++ mgmt/trunk/mint/python/mint/update.py 2008-08-12 17:33:40 UTC (rev 2282)
@@ -273,10 +273,10 @@
model.lock()
try:
- del model.connections[conn.id]
+ del model.connections[self.conn.id]
if model.connCloseListener:
- model.connCloseListener(conn, data)
+ model.connCloseListener(self.conn, self.data)
finally:
model.unlock()
17 years, 8 months
rhmessaging commits: r2281 - store/branches/mrg-1.0/cpp/lib/jrnl.
by rhmessaging-commits@lists.jboss.org
Author: kpvdr
Date: 2008-08-12 12:44:48 -0400 (Tue, 12 Aug 2008)
New Revision: 2281
Modified:
store/branches/mrg-1.0/cpp/lib/jrnl/wmgr.cpp
Log:
Removed some forgotten but commented out debug statements
Modified: store/branches/mrg-1.0/cpp/lib/jrnl/wmgr.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/jrnl/wmgr.cpp 2008-08-12 15:31:06 UTC (rev 2280)
+++ store/branches/mrg-1.0/cpp/lib/jrnl/wmgr.cpp 2008-08-12 16:44:48 UTC (rev 2281)
@@ -39,8 +39,6 @@
#include "jrnl/jerrno.hpp"
#include <sstream>
-//#include <iostream> // debug
-
namespace rhm
{
namespace journal
@@ -577,7 +575,6 @@
dtokp->set_dblocks_written(0); // Reset dblks_written from previous op
_commit_busy = true;
}
-//std::cout << " * commit, queue=" << _jc->id() << " xid=" << dtokp->xid() << std::endl << std::flush;
bool done = false;
while (!done)
{
@@ -608,12 +605,9 @@
for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++)
{
if (itr->_enq_flag) // txn enqueue
-//{ std::cout << " * commit enq - add enq rid=0x" << std::hex << itr->_rid << std::dec << std::endl << std::flush;
_emap.insert_fid(itr->_rid, itr->_fid);
-//}
else // txn dequeue
{
-//std::cout << " * commit deq - remove enq rid=0x" << std::hex << itr->_drid << std::dec << std::endl << std::flush;
u_int16_t fid = _emap.get_remove_fid(itr->_drid, true);
_wrfc.decr_enqcnt(fid);
}
17 years, 8 months
rhmessaging commits: r2280 - store/branches/java/broker-queue-refactor/java/bdbstore/lib.
by rhmessaging-commits@lists.jboss.org
Author: marnie(a)apache.org
Date: 2008-08-12 11:31:06 -0400 (Tue, 12 Aug 2008)
New Revision: 2280
Added:
store/branches/java/broker-queue-refactor/java/bdbstore/lib/backport-util-concurrent-2.2.jar
Log:
Added: store/branches/java/broker-queue-refactor/java/bdbstore/lib/backport-util-concurrent-2.2.jar
===================================================================
(Binary files differ)
Property changes on: store/branches/java/broker-queue-refactor/java/bdbstore/lib/backport-util-concurrent-2.2.jar
___________________________________________________________________
Name: svn:mime-type
+ application/octet-stream
17 years, 8 months
rhmessaging commits: r2279 - in store/branches/mrg-1.0/cpp: lib/jrnl and 3 other directories.
by rhmessaging-commits@lists.jboss.org
Author: kpvdr
Date: 2008-08-12 11:27:18 -0400 (Tue, 12 Aug 2008)
New Revision: 2279
Modified:
store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp
store/branches/mrg-1.0/cpp/lib/BdbMessageStore.h
store/branches/mrg-1.0/cpp/lib/JournalImpl.cpp
store/branches/mrg-1.0/cpp/lib/TxnCtxt.h
store/branches/mrg-1.0/cpp/lib/jrnl/enq_map.cpp
store/branches/mrg-1.0/cpp/lib/jrnl/enq_map.hpp
store/branches/mrg-1.0/cpp/lib/jrnl/jcntl.hpp
store/branches/mrg-1.0/cpp/lib/jrnl/jerrno.cpp
store/branches/mrg-1.0/cpp/lib/jrnl/jerrno.hpp
store/branches/mrg-1.0/cpp/lib/jrnl/wmgr.cpp
store/branches/mrg-1.0/cpp/tests/jrnl/_st_basic.cpp
store/branches/mrg-1.0/cpp/tests/jrnl/_st_basic_txn.cpp
store/branches/mrg-1.0/cpp/tests/jrnl/jtt/jfile_chk.py
store/branches/mrg-1.0/cpp/tests/persistence.py
Log:
Fix for BZ458053: "txtest failures when broker killed during transfer phase". Modified message recovery to correctly predict the outcome of to-be-rolled-forward/back transactions. Access to jcntl::_emap was required for this, so some accessors were added to class jcntl. Includes a fix to the Python file check program jfile_chk.py, which incorrectly detected owi on last-to-first file transition and message content overflowed.
Modified: store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp 2008-08-11 22:07:32 UTC (rev 2278)
+++ store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp 2008-08-12 15:27:18 UTC (rev 2279)
@@ -52,10 +52,12 @@
BdbMessageStore::TplRecoverStruct::TplRecoverStruct(const u_int64_t _rid,
const bool _deq_flag,
- const bool _commit_flag) :
+ const bool _commit_flag,
+ const bool _tpc_flag) :
rid(_rid),
deq_flag(_deq_flag),
- commit_flag(_commit_flag)
+ commit_flag(_commit_flag),
+ tpc_flag(_tpc_flag)
{}
BdbMessageStore::BdbMessageStore(const char* envpath) :
@@ -347,7 +349,7 @@
{
qpid::sys::Mutex::ScopedLock sl(jrnlCreateLock);
jQueue = new JournalImpl(queue.getName(), getJrnlDir(queue),
- string("JournalData"), defJournalGetEventsTimeout,
+ std::string("JournalData"), defJournalGetEventsTimeout,
defJournalFlushTimeout);
}
@@ -521,36 +523,61 @@
//recover transactions:
for (txn_list::iterator i = prepared.begin(); i != prepared.end(); i++) {
- TPCTxnCtxt* tpcc = new TPCTxnCtxt(i->xid, &messageIdSequence);
- std::auto_ptr<TPCTransactionContext> txn(tpcc);
+ std::string xid = i->xid;
- tpcc->prepare(tplStorePtr.get());
-
// Restore data token state in TxnCtxt
- TplRecoverMapCitr citr = tplRecoverMap.find(i->xid);
+ TplRecoverMapCitr citr = tplRecoverMap.find(xid);
if (citr == tplRecoverMap.end()) THROW_STORE_EXCEPTION("XID not found in tplRecoverMap");
- tpcc->recoverDtok(citr->second.rid, i->xid);
// If a record is found that is dequeued but not committed/aborted from tplStore, then a complete() call
// was interrupted part way through committing/aborting the impacted queues. Complete this process.
- bool incomplTxnFlag = citr->second.deq_flag;
+ bool incomplTplTxnFlag = citr->second.deq_flag;
- RecoverableTransaction::shared_ptr dtx;
- if (!incomplTxnFlag) dtx = registry.recoverTransaction(i->xid, txn);
- if (i->enqueues.get()) {
- for (LockedMappings::iterator j = i->enqueues->begin(); j != i->enqueues->end(); j++) {
- tpcc->addXidRecord(queues[j->first]->getExternalQueueStore());
- if (!incomplTxnFlag) dtx->enqueue(queues[j->first], messages[j->second]);
+ if (citr->second.tpc_flag) {
+ // Dtx (2PC) transaction
+ TPCTxnCtxt* tpcc = new TPCTxnCtxt(xid, &messageIdSequence);
+ std::auto_ptr<TPCTransactionContext> txn(tpcc);
+ tpcc->recoverDtok(citr->second.rid, xid);
+ tpcc->prepare(tplStorePtr.get());
+
+ RecoverableTransaction::shared_ptr dtx;
+ if (!incomplTplTxnFlag) dtx = registry.recoverTransaction(xid, txn);
+ if (i->enqueues.get()) {
+ for (LockedMappings::iterator j = i->enqueues->begin(); j != i->enqueues->end(); j++) {
+ tpcc->addXidRecord(queues[j->first]->getExternalQueueStore());
+ if (!incomplTplTxnFlag) dtx->enqueue(queues[j->first], messages[j->second]);
+ }
}
- }
- if (i->dequeues.get()) {
- for (LockedMappings::iterator j = i->dequeues->begin(); j != i->dequeues->end(); j++) {
- tpcc->addXidRecord(queues[j->first]->getExternalQueueStore());
- if (!incomplTxnFlag) dtx->dequeue(queues[j->first], messages[j->second]);
+ if (i->dequeues.get()) {
+ for (LockedMappings::iterator j = i->dequeues->begin(); j != i->dequeues->end(); j++) {
+ tpcc->addXidRecord(queues[j->first]->getExternalQueueStore());
+ if (!incomplTplTxnFlag) dtx->dequeue(queues[j->first], messages[j->second]);
+ }
}
+
+ if (incomplTplTxnFlag) {
+ tpcc->complete(citr->second.commit_flag);
+ }
+ } else {
+ // Local (1PC) transaction
+ boost::shared_ptr<TxnCtxt> opcc(new TxnCtxt(xid, &messageIdSequence));
+ opcc->recoverDtok(citr->second.rid, xid);
+ opcc->prepare(tplStorePtr.get());
+
+ if (i->enqueues.get()) {
+ for (LockedMappings::iterator j = i->enqueues->begin(); j != i->enqueues->end(); j++) {
+ opcc->addXidRecord(queues[j->first]->getExternalQueueStore());
+ }
+ }
+ if (i->dequeues.get()) {
+ for (LockedMappings::iterator j = i->dequeues->begin(); j != i->dequeues->end(); j++) {
+ opcc->addXidRecord(queues[j->first]->getExternalQueueStore());
+ }
+ }
+ if (incomplTplTxnFlag) {
+ opcc->complete(citr->second.commit_flag);
+ }
}
-
- if (incomplTxnFlag) tpcc->complete(citr->second.commit_flag);
}
registry.recoveryComplete();
}
@@ -580,7 +607,7 @@
JournalImpl* jQueue = 0;
{
qpid::sys::Mutex::ScopedLock sl(jrnlCreateLock);
- jQueue = new JournalImpl(queueName, getJrnlDir(queueName), string("JournalData"), defJournalGetEventsTimeout, defJournalFlushTimeout);
+ jQueue = new JournalImpl(queueName, getJrnlDir(queueName), std::string("JournalData"), defJournalGetEventsTimeout, defJournalFlushTimeout);
}
queue->setExternalQueueStore(dynamic_cast<ExternalQueueStore*>(jQueue));
@@ -649,8 +676,8 @@
THROW_STORE_EXCEPTION("Not enough data for binding");
}
uint64_t queueId = buffer.getLongLong();
- string queueName;
- string routingkey;
+ std::string queueName;
+ std::string routingkey;
FieldTable args;
buffer.getShortString(queueName);
buffer.getShortString(routingkey);
@@ -695,6 +722,7 @@
txn_list& prepared,
message_index& messages)
{
+//std::cout << "***** recoverMessages(): queue=" << queue->getName() << std::endl;
size_t preambleLength = sizeof(u_int32_t)/*header size*/;
JournalImpl* jc = static_cast<JournalImpl*>(queue->getExternalQueueStore());
@@ -748,15 +776,36 @@
}
PreparedTransaction::list::iterator i = PreparedTransaction::getLockedPreparedTransaction(prepared, queue->getPersistenceId(), dtokp.rid());
- if (i == prepared.end()) { // not locked
+ if (i == prepared.end()) { // not in prepared list
queue->recover(msg);
} else {
- TplRecoverMapCitr citr = tplRecoverMap.find(i->xid);
+ u_int64_t rid = dtokp.rid();
+ std::string xid(i->xid);
+ TplRecoverMapCitr citr = tplRecoverMap.find(xid);
if (citr == tplRecoverMap.end()) THROW_STORE_EXCEPTION("XID not found in tplRecoverMap");
if (citr->second.deq_flag) { // deq present in prepared list, this xid is part of incomplete txn commit/abort
- if (citr->second.commit_flag) queue->recover(msg); // treat as non-tx, roll forward (else aborted, throw away)
+ if (jc->is_enqueued(rid, true)) {
+ // Enqueue is non-tx, dequeue tx
+ assert(jc->is_locked(rid)); // This record MUST be locked by a txn dequeue
+ if (!citr->second.commit_flag) {
+ queue->recover(msg); // recover message in abort case only
+ }
+ } else {
+ // Enqueue and/or dequeue tx
+ journal::txn_map& tmap = jc->get_txn_map();
+ journal::txn_data_list txnList = tmap.get_tdata_list(xid);
+ bool enq = false;
+ bool deq = false;
+ for (journal::tdl_itr j = txnList.begin(); j<txnList.end(); j++) {
+ if (j->_enq_flag && j->_rid == rid) enq = true;
+ else if (!j->_enq_flag && j->_drid == rid) deq = true;
+ }
+ if (enq && !deq && citr->second.commit_flag) {
+ queue->recover(msg); // recover txn message in commit case only
+ }
+ }
} else {
- messages[dtokp.rid()] = msg;
+ messages[rid] = msg;
}
}
@@ -818,7 +867,7 @@
txn.commit();
} catch (const DbException& e) {
txn.abort();
- THROW_STORE_EXCEPTION("Unexpected BDB error in BdbMessageStore::getExternMessage(): " + string(e.what()));
+ THROW_STORE_EXCEPTION("Unexpected BDB error in BdbMessageStore::getExternMessage(): " + std::string(e.what()));
} catch (...) {
txn.abort();
throw;
@@ -857,54 +906,81 @@
return count;
}
-void BdbMessageStore::recoverTplStore()
+void BdbMessageStore::recoverTplStore(TplRecoverMap& tplMap)
{
if (journal::jdir::exists(tplStorePtr->jrnl_dir() + tplStorePtr->base_filename() + ".jinf")) {
u_int64_t thisHighestRid;
tplStorePtr->recover(tplNumJrnlFiles, tplJrnlFsizeSblks, tplWCachePgSizeSblks, tplWCacheNumPages, 0, thisHighestRid, 0);
+ journal::txn_map& tmap = tplStorePtr->get_txn_map();
if (thisHighestRid > highestRid)
highestRid = thisHighestRid;
- tplStorePtr->recover_complete(); // start journal.
- }
-}
-void BdbMessageStore::getTplRecoverMap(TplRecoverMap& tplMap)
-{
- if (tplStorePtr.get()) {
- if (!tplStorePtr->is_ready())
- recoverTplStore();
+ DataTokenImpl dtokp;
+ void* dbuff = NULL; size_t dbuffSize = 0;
+ void* xidbuff = NULL; size_t xidbuffSize = 0;
+ bool transientFlag = false;
+ bool externalFlag = false;
+ bool done = false;
+ try {
+ unsigned aio_sleep_cnt = 0;
+ while (!done) {
+ dtokp.reset();
+ dtokp.set_wstate(DataTokenImpl::ENQ);
+ switch (tplStorePtr->read_data_record(&dbuff, dbuffSize, &xidbuff, xidbuffSize, transientFlag, externalFlag, &dtokp)) {
+ case rhm::journal::RHM_IORES_SUCCESS: {
+ // Every TPL record contains both data and an XID
+ assert(dbuffSize>0);
+ assert(xidbuffSize>0);
+ std::string xid(static_cast<const char*>(xidbuff), xidbuffSize);
+ bool is2PC = *(static_cast<char*>(dbuff)) != 0;
- // TODO: The journal will return a const txn_map and the txn_map will support
- // const operations at some point. Using non-const txn_map this way is ugly...
- journal::txn_map& tmap = tplStorePtr->get_txn_map();
- std::vector<std::string> xidList;
- tmap.xid_list(xidList);
- for (std::vector<std::string>::const_iterator i = xidList.begin(); i<xidList.end(); i++) {
- journal::txn_data_list txnList = tmap.get_tdata_list(*i);
- unsigned enqCnt = 0;
- unsigned deqCnt = 0;
- u_int64_t rid = 0;
- bool commitFlag = false;
- for (journal::tdl_itr j = txnList.begin(); j<txnList.end(); j++) {
- if (j->_enq_flag) {
- rid = j->_rid;
- enqCnt++;
- } else {
- commitFlag = j->_commit_flag;
- deqCnt++;
- }
+ // Check transaction details; add to recover map
+ journal::txn_data_list txnList = tmap.get_tdata_list(xid);
+ unsigned enqCnt = 0;
+ unsigned deqCnt = 0;
+ u_int64_t rid = 0;
+ bool commitFlag = false;
+ for (journal::tdl_itr j = txnList.begin(); j<txnList.end(); j++) {
+ if (j->_enq_flag) {
+ rid = j->_rid;
+ enqCnt++;
+ } else {
+ commitFlag = j->_commit_flag;
+ deqCnt++;
+ }
+ }
+ assert(enqCnt == 1);
+ assert(deqCnt <= 1);
+ tplMap.insert(TplRecoverMapPair(xid, TplRecoverStruct(rid, deqCnt == 1, commitFlag, is2PC)));
+
+ ::free(xidbuff);
+ aio_sleep_cnt = 0;
+ break;
+ }
+ case rhm::journal::RHM_IORES_PAGE_AIOWAIT:
+ if (++aio_sleep_cnt > MAX_AIO_SLEEPS)
+ THROW_STORE_EXCEPTION("Timeout waiting for AIO in BdbMessageStore::recoverTplStore()");
+ ::usleep(AIO_SLEEP_TIME);
+ break;
+ case rhm::journal::RHM_IORES_EMPTY:
+ done = true;
+ break; // done with all messages. (add call in jrnl to test that _emap is empty.)
+ default:
+ assert("Store Error: Unexpected msg state");
+ } // switch
}
- assert(enqCnt == 1);
- assert(deqCnt <= 1);
- tplMap.insert(TplRecoverMapPair(*i, TplRecoverStruct(rid, deqCnt == 1, commitFlag)));
+ } catch (const journal::jexception& e) {
+ THROW_STORE_EXCEPTION(std::string("TPL recoverTplStore() failed: ") + e.what());
}
+
+ tplStorePtr->recover_complete(); // start journal.
}
}
void BdbMessageStore::recoverLockedMappings(txn_list& txns)
{
if (!tplStorePtr->is_ready())
- getTplRecoverMap(tplRecoverMap);
+ recoverTplStore(tplRecoverMap);
// Abort unprepared xids and populate the locked maps
for (TplRecoverMapCitr i = tplRecoverMap.begin(); i != tplRecoverMap.end(); i++) {
@@ -916,12 +992,13 @@
}
}
-void BdbMessageStore::collectPreparedXids(std::set<string>& xids)
+void BdbMessageStore::collectPreparedXids(std::set<std::string>& xids)
{
if (!tplStorePtr->is_ready())
- getTplRecoverMap(tplRecoverMap);
+ recoverTplStore(tplRecoverMap);
for (TplRecoverMapCitr i = tplRecoverMap.begin(); i != tplRecoverMap.end(); i++) {
- if (!i->second.deq_flag) // Discard all txns that are to be rolled forward/back
+ // Discard all txns that are to be rolled forward/back and 1PC transactions
+ if (!i->second.deq_flag && i->second.tpc_flag)
xids.insert(i->first);
}
}
@@ -994,7 +1071,7 @@
try {
int status = db.get(txn, &key, &peek, 0);
if (status != DB_BUFFER_SMALL) {
- THROW_STORE_EXCEPTION("Unexpected status code when determining record length: " + string(DbEnv::strerror(status)));
+ THROW_STORE_EXCEPTION("Unexpected status code when determining record length: " + std::string(DbEnv::strerror(status)));
}
} catch (const DbMemoryException& expected) {
//api doc indicates may throw exception instead of status = DB_BUFFER_SMALL;
@@ -1230,7 +1307,7 @@
ddtokp->set_rid(messageIdSequence.next());
ddtokp->set_dequeue_rid(msg->getPersistenceId());
ddtokp->set_wstate(DataTokenImpl::ENQ);
- string tid;
+ std::string tid;
if (ctxt) {
TxnCtxt* txn = check(ctxt);
tid = txn->getXid();
@@ -1339,7 +1416,8 @@
DataTokenImpl* dtokp = ctxt->getDtok();
dtokp->set_external_rid(true);
dtokp->set_rid(messageIdSequence.next());
- tplStorePtr->enqueue_txn_data_record(0, 0, 0, dtokp, ctxt->getXid(), false);
+ char tpcFlag = static_cast<char>(ctxt->isTPC());
+ tplStorePtr->enqueue_txn_data_record(&tpcFlag, sizeof(char), sizeof(char), dtokp, ctxt->getXid(), false);
ctxt->prepare(tplStorePtr.get());
// make sure all the data is written to disk before returning
ctxt->sync();
@@ -1409,7 +1487,7 @@
} else if (status == DB_NOTFOUND) {
return false;
} else {
- THROW_STORE_EXCEPTION("Deletion failed: " + string(DbEnv::strerror(status)));
+ THROW_STORE_EXCEPTION("Deletion failed: " + std::string(DbEnv::strerror(status)));
}
}
@@ -1489,33 +1567,33 @@
}
}
-string BdbMessageStore::getJrnlBaseDir()
+std::string BdbMessageStore::getJrnlBaseDir()
{
std::stringstream dir;
dir << storeDir << "/rhm/jrnl/" ;
return dir.str();
}
-string BdbMessageStore::getBdbBaseDir()
+std::string BdbMessageStore::getBdbBaseDir()
{
std::stringstream dir;
dir << storeDir << "/rhm/dat/" ;
return dir.str();
}
-string BdbMessageStore::getTplBaseDir()
+std::string BdbMessageStore::getTplBaseDir()
{
std::stringstream dir;
dir << storeDir << "/rhm/tpl/" ;
return dir.str();
}
-string BdbMessageStore::getJrnlDir(const qpid::broker::PersistableQueue& queue) //for exmaple /var/rhm/ + queueDir/
+std::string BdbMessageStore::getJrnlDir(const qpid::broker::PersistableQueue& queue) //for exmaple /var/rhm/ + queueDir/
{
return getJrnlDir(queue.getName().c_str());
}
-string BdbMessageStore::getJrnlDir(const char* queueName) //for exmaple /var/rhm/ + queueDir/
+std::string BdbMessageStore::getJrnlDir(const char* queueName) //for exmaple /var/rhm/ + queueDir/
{
std::stringstream dir;
dir << getJrnlBaseDir() << std::hex << std::setfill('0') << std::setw(4);
Modified: store/branches/mrg-1.0/cpp/lib/BdbMessageStore.h
===================================================================
--- store/branches/mrg-1.0/cpp/lib/BdbMessageStore.h 2008-08-11 22:07:32 UTC (rev 2278)
+++ store/branches/mrg-1.0/cpp/lib/BdbMessageStore.h 2008-08-12 15:27:18 UTC (rev 2279)
@@ -63,10 +63,11 @@
// Structs for Transaction Recover List (TPL) recover state
struct TplRecoverStruct {
- u_int64_t rid;
+ u_int64_t rid; // rid of TPL record
bool deq_flag;
bool commit_flag;
- TplRecoverStruct(const u_int64_t _rid, const bool _deq_flag, const bool _commit_flag);
+ bool tpc_flag;
+ TplRecoverStruct(const u_int64_t _rid, const bool _deq_flag, const bool _commit_flag, const bool _tpc_flag);
};
typedef TplRecoverStruct TplRecover;
typedef std::pair<std::string, TplRecover> TplRecoverMapPair;
@@ -157,8 +158,7 @@
queue_index& index,
txn_list& locked,
message_index& prepared);
- void recoverTplStore();
- void getTplRecoverMap(TplRecoverMap& tplMap);
+ void recoverTplStore(TplRecoverMap& tplMap);
void recoverLockedMappings(txn_list& txns);
TxnCtxt* check(qpid::broker::TransactionContext* ctxt);
void store(const qpid::broker::PersistableQueue* queue,
Modified: store/branches/mrg-1.0/cpp/lib/JournalImpl.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/JournalImpl.cpp 2008-08-11 22:07:32 UTC (rev 2278)
+++ store/branches/mrg-1.0/cpp/lib/JournalImpl.cpp 2008-08-12 15:27:18 UTC (rev 2279)
@@ -199,7 +199,7 @@
}
}
std::ostringstream oss2;
- oss2 << "Recover phase I complete; highest rid found = 0x" << std::hex << highest_rid;
+ oss2 << "Recover phase 1 complete; highest rid found = 0x" << std::hex << highest_rid;
oss2 << std::dec << "; emap.size=" << _emap.size() << "; tmap.size=" << _tmap.size();
oss2 << "; journal now read-only.";
log(LOG_DEBUG, oss2.str());
@@ -219,7 +219,7 @@
JournalImpl::recover_complete()
{
jcntl::recover_complete();
- log(LOG_DEBUG, "Recover phase II complete; journal now writable.");
+ log(LOG_DEBUG, "Recover phase 2 complete; journal now writable.");
}
#define MAX_AIO_SLEEPS 1000 // 10 sec
Modified: store/branches/mrg-1.0/cpp/lib/TxnCtxt.h
===================================================================
--- store/branches/mrg-1.0/cpp/lib/TxnCtxt.h 2008-08-11 22:07:32 UTC (rev 2278)
+++ store/branches/mrg-1.0/cpp/lib/TxnCtxt.h 2008-08-12 15:27:18 UTC (rev 2279)
@@ -86,7 +86,7 @@
try {
if (commit) {
jc->txn_commit(dtokp.get(), getXid());
- if (isTPC()) sync();
+ sync();
} else {
jc->txn_abort(dtokp.get(), getXid());
}
@@ -106,6 +106,8 @@
}
}
+ TxnCtxt(std::string _tid, IdSequence* _loggedtx) : loggedtx(_loggedtx), dtokp(new DataTokenImpl), preparedXidStorePtr(0), tid(_tid), txn(0) {}
+
/**
* Call to make sure all the data for this txn is written to safe store
*
Modified: store/branches/mrg-1.0/cpp/lib/jrnl/enq_map.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/jrnl/enq_map.cpp 2008-08-11 22:07:32 UTC (rev 2278)
+++ store/branches/mrg-1.0/cpp/lib/jrnl/enq_map.cpp 2008-08-12 15:27:18 UTC (rev 2279)
@@ -130,16 +130,13 @@
}
bool
-enq_map::is_enqueued(const u_int64_t rid)
+enq_map::is_enqueued(const u_int64_t rid, bool ignore_lock)
{
- emap_itr itr;
- {
- slock s(&_mutex);
- itr = _map.find(rid);
- }
+ slock s(&_mutex);
+ emap_itr itr = _map.find(rid);
if (itr == _map.end()) // not found in map
return false;
- if (itr->second.second) // locked
+ if (!ignore_lock && itr->second.second) // locked
return false;
return true;
}
Modified: store/branches/mrg-1.0/cpp/lib/jrnl/enq_map.hpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/jrnl/enq_map.hpp 2008-08-11 22:07:32 UTC (rev 2278)
+++ store/branches/mrg-1.0/cpp/lib/jrnl/enq_map.hpp 2008-08-12 15:27:18 UTC (rev 2279)
@@ -95,7 +95,7 @@
void insert_fid(const u_int64_t rid, const u_int16_t fid, const bool locked);
u_int16_t get_fid(const u_int64_t rid);
u_int16_t get_remove_fid(const u_int64_t rid, const bool txn_flag = false);
- bool is_enqueued(const u_int64_t rid);
+ bool is_enqueued(const u_int64_t rid, bool ignore_lock = false);
void lock(const u_int64_t rid);
void unlock(const u_int64_t rid);
bool is_locked(const u_int64_t rid);
Modified: store/branches/mrg-1.0/cpp/lib/jrnl/jcntl.hpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/jrnl/jcntl.hpp 2008-08-11 22:07:32 UTC (rev 2278)
+++ store/branches/mrg-1.0/cpp/lib/jrnl/jcntl.hpp 2008-08-12 15:27:18 UTC (rev 2279)
@@ -557,8 +557,15 @@
* false if the rid is transactionally enqueued and is not committed, or if it is
* locked (i.e. transactionally dequeued, but the dequeue has not been committed).
*/
- inline bool is_enqueued(const u_int64_t rid) { return _emap.is_enqueued(rid); }
-
+ inline bool is_enqueued(const u_int64_t rid, bool ignore_lock = false)
+ { return _emap.is_enqueued(rid, ignore_lock); }
+ inline bool is_locked(const u_int64_t rid)
+ { if (_emap.is_enqueued(rid, true)) return _emap.is_locked(rid); return false; }
+ inline void enq_rid_list(std::vector<u_int64_t>& rids) { _emap.rid_list(rids); }
+ inline void enq_xid_list(std::vector<std::string>& xids) { _tmap.xid_list(xids); }
+ // TODO Make this a const, but txn_map must support const first.
+ inline txn_map& get_txn_map() { return _tmap; }
+
/**
* \brief Check if the journal is stopped.
*
@@ -607,8 +614,6 @@
inline u_int32_t jfsize_sblks() const { return _jfsize_sblks; }
inline u_int32_t get_open_txn_cnt() const { return _tmap.size(); }
- // TODO Make this a const, but txn_map must support const first.
- inline txn_map& get_txn_map() { return _tmap; }
// Logging
virtual void log(log_level level, const std::string& log_stmt) const;
Modified: store/branches/mrg-1.0/cpp/lib/jrnl/jerrno.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/jrnl/jerrno.cpp 2008-08-11 22:07:32 UTC (rev 2278)
+++ store/branches/mrg-1.0/cpp/lib/jrnl/jerrno.cpp 2008-08-12 15:27:18 UTC (rev 2279)
@@ -94,6 +94,7 @@
const u_int32_t jerrno::JERR_WMGR_BADDTOKSTATE = 0x0802;
const u_int32_t jerrno::JERR_WMGR_ENQDISCONT = 0x0803;
const u_int32_t jerrno::JERR_WMGR_DEQDISCONT = 0x0804;
+const u_int32_t jerrno::JERR_WMGR_DEQRIDNOTENQ = 0x0805;
// class rmgr
const u_int32_t jerrno::JERR_RMGR_UNKNOWNMAGIC = 0x0900;
@@ -187,7 +188,9 @@
"Enqueued new dtok when previous enqueue returned partly completed (state ENQ_PART).";
_err_map[JERR_WMGR_DEQDISCONT] = "JERR_WMGR_DEQDISCONT: "
"Dequeued new dtok when previous dequeue returned partly completed (state DEQ_PART).";
+ _err_map[JERR_WMGR_DEQRIDNOTENQ] = "JERR_WMGR_DEQRIDNOTENQ: Dequeue rid is not enqueued.";
+
// class rmgr
_err_map[JERR_RMGR_UNKNOWNMAGIC] = "JERR_RMGR_UNKNOWNMAGIC: Found record with unknown magic.";
_err_map[JERR_RMGR_RIDMISMATCH] = "JERR_RMGR_RIDMISMATCH: "
Modified: store/branches/mrg-1.0/cpp/lib/jrnl/jerrno.hpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/jrnl/jerrno.hpp 2008-08-11 22:07:32 UTC (rev 2278)
+++ store/branches/mrg-1.0/cpp/lib/jrnl/jerrno.hpp 2008-08-12 15:27:18 UTC (rev 2279)
@@ -112,6 +112,7 @@
static const u_int32_t JERR_WMGR_BADDTOKSTATE; ///< Data token in illegal state.
static const u_int32_t JERR_WMGR_ENQDISCONT; ///< Enq. new dtok when previous part compl.
static const u_int32_t JERR_WMGR_DEQDISCONT; ///< Deq. new dtok when previous part compl.
+ static const u_int32_t JERR_WMGR_DEQRIDNOTENQ; ///< Deq. rid not enqueued
// class rmgr
static const u_int32_t JERR_RMGR_UNKNOWNMAGIC; ///< Found record with unknown magic
Modified: store/branches/mrg-1.0/cpp/lib/jrnl/wmgr.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/jrnl/wmgr.cpp 2008-08-11 22:07:32 UTC (rev 2278)
+++ store/branches/mrg-1.0/cpp/lib/jrnl/wmgr.cpp 2008-08-12 15:27:18 UTC (rev 2279)
@@ -39,6 +39,8 @@
#include "jrnl/jerrno.hpp"
#include <sstream>
+//#include <iostream> // debug
+
namespace rhm
{
namespace journal
@@ -575,6 +577,7 @@
dtokp->set_dblocks_written(0); // Reset dblks_written from previous op
_commit_busy = true;
}
+//std::cout << " * commit, queue=" << _jc->id() << " xid=" << dtokp->xid() << std::endl << std::flush;
bool done = false;
while (!done)
{
@@ -605,9 +608,12 @@
for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++)
{
if (itr->_enq_flag) // txn enqueue
+//{ std::cout << " * commit enq - add enq rid=0x" << std::hex << itr->_rid << std::dec << std::endl << std::flush;
_emap.insert_fid(itr->_rid, itr->_fid);
+//}
else // txn dequeue
{
+//std::cout << " * commit deq - remove enq rid=0x" << std::hex << itr->_drid << std::dec << std::endl << std::flush;
u_int16_t fid = _emap.get_remove_fid(itr->_drid, true);
_wrfc.decr_enqcnt(fid);
}
@@ -834,6 +840,7 @@
#endif
break;
case data_tok::COMMIT_SUBM:
+//std::cout << " * commit-ret, queue=" << _jc->id() << " xid=" << dtokp->xid() << std::endl << std::flush;
dtokl.push_back(dtokp);
tot_data_toks++;
dtokp->set_wstate(data_tok::COMMITTED);
@@ -1018,13 +1025,34 @@
wmgr::dequeue_check(const std::string& xid, const u_int64_t drid)
{
// First check emap
- try { _emap.get_fid(drid); }
+ bool found = false;
+ try
+ {
+ _emap.get_fid(drid);
+ found = true;
+ }
catch(const jexception& e)
{
if (e.err_code() != jerrno::JERR_MAP_NOTFOUND)
throw;
- _tmap.get_data(xid, drid); // not in emap, try tmap
+ if (xid.size())
+ try
+ {
+ _tmap.get_data(xid, drid); // not in emap, try tmap
+ found = true;
+ }
+ catch (const jexception& e)
+ {
+ if (e.err_code() != jerrno::JERR_MAP_NOTFOUND)
+ throw;
+ }
}
+ if (!found)
+ {
+ std::ostringstream oss;
+ oss << "jrnl=" << _jc->id() << " drid=0x" << std::hex << drid;
+ throw jexception(jerrno::JERR_WMGR_DEQRIDNOTENQ, oss.str(), "wmgr", "dequeue_check");
+ }
}
void
Modified: store/branches/mrg-1.0/cpp/tests/jrnl/_st_basic.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/tests/jrnl/_st_basic.cpp 2008-08-11 22:07:32 UTC (rev 2278)
+++ store/branches/mrg-1.0/cpp/tests/jrnl/_st_basic.cpp 2008-08-12 15:27:18 UTC (rev 2279)
@@ -408,7 +408,7 @@
u_int64_t rid = enq_msg(jc, 0, create_msg(msg, 0, MSG_SIZE), false);
deq_msg(jc, rid);
try{ deq_msg(jc, rid); BOOST_ERROR("Did not throw exception on second dequeue."); }
- catch (const jexception& e){ BOOST_CHECK_EQUAL(e.err_code(), jerrno::JERR_MAP_NOTFOUND); }
+ catch (const jexception& e){ BOOST_CHECK_EQUAL(e.err_code(), jerrno::JERR_WMGR_DEQRIDNOTENQ); }
rid = enq_msg(jc, 1, create_msg(msg, 1, MSG_SIZE), false);
deq_msg(jc, rid);
}
Modified: store/branches/mrg-1.0/cpp/tests/jrnl/_st_basic_txn.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/tests/jrnl/_st_basic_txn.cpp 2008-08-11 22:07:32 UTC (rev 2278)
+++ store/branches/mrg-1.0/cpp/tests/jrnl/_st_basic_txn.cpp 2008-08-12 15:27:18 UTC (rev 2279)
@@ -87,9 +87,9 @@
try
{
deq_msg(jc, m);
- BOOST_ERROR("Expected dequeue to fail with exception JERR_MAP_NOTFOUND.");
+ BOOST_ERROR("Expected dequeue to fail with exception JERR_WMGR_DEQRIDNOTENQ.");
}
- catch (const jexception& e) { if (e.err_code() != jerrno::JERR_MAP_NOTFOUND) throw; }
+ catch (const jexception& e) { if (e.err_code() != jerrno::JERR_WMGR_DEQRIDNOTENQ) throw; }
}
}
catch(const exception& e) { BOOST_FAIL(e.what()); }
@@ -138,9 +138,9 @@
try
{
deq_msg(jc, 3*m);
- BOOST_ERROR("Expected dequeue to fail with exception JERR_MAP_NOTFOUND.");
+ BOOST_ERROR("Expected dequeue to fail with exception JERR_WMGR_DEQRIDNOTENQ.");
}
- catch (const jexception& e) { if (e.err_code() != jerrno::JERR_MAP_NOTFOUND) throw; }
+ catch (const jexception& e) { if (e.err_code() != jerrno::JERR_WMGR_DEQRIDNOTENQ) throw; }
}
}
catch(const exception& e) { BOOST_FAIL(e.what()); }
Modified: store/branches/mrg-1.0/cpp/tests/jrnl/jtt/jfile_chk.py
===================================================================
--- store/branches/mrg-1.0/cpp/tests/jrnl/jtt/jfile_chk.py 2008-08-11 22:07:32 UTC (rev 2278)
+++ store/branches/mrg-1.0/cpp/tests/jrnl/jtt/jfile_chk.py 2008-08-12 15:27:18 UTC (rev 2279)
@@ -445,6 +445,7 @@
self.first_rec = False
self.last_file = False
self.last_rid = -1
+ self.fhdr_owi_at_msg_start = None
self.proc_args(argv)
self.proc_csv()
@@ -474,6 +475,7 @@
stop = True;
else:
self.rec_cnt += 1
+ self.fhdr_owi_at_msg_start = self.fhdr.owi()
if self.first_rec:
if self.fhdr.fro != hdr.foffs:
raise Exception('File header first record offset mismatch: fro=0x%08x; rec_offs=0x%08x' % (self.fhdr.fro, hdr.foffs))
@@ -656,7 +658,7 @@
return self.file_num
def check_owi(self, hdr):
- return self.fhdr.owi() == hdr.owi()
+ return self.fhdr_owi_at_msg_start == hdr.owi()
def check_rid(self, hdr):
if self.last_rid != -1 and hdr.rid <= self.last_rid:
Modified: store/branches/mrg-1.0/cpp/tests/persistence.py
===================================================================
--- store/branches/mrg-1.0/cpp/tests/persistence.py 2008-08-11 22:07:32 UTC (rev 2278)
+++ store/branches/mrg-1.0/cpp/tests/persistence.py 2008-08-12 15:27:18 UTC (rev 2279)
@@ -231,7 +231,7 @@
if txc.global_id not in ids:
self.fail("Recovered xids not as expected. missing: %s" % (txc))
if txd.global_id not in ids:
- self.fail("Recovered xids not as expected. missing: %s" % (txc))
+ self.fail("Recovered xids not as expected. missing: %s" % (txd))
self.assertEqual(2, len(xids))
17 years, 8 months
rhmessaging commits: r2278 - mgmt/trunk/mint/python/mint.
by rhmessaging-commits@lists.jboss.org
Author: nunofsantos
Date: 2008-08-11 18:07:32 -0400 (Mon, 11 Aug 2008)
New Revision: 2278
Modified:
mgmt/trunk/mint/python/mint/schemaparser.py
Log:
Handle double and float types in the schema.
Modified: mgmt/trunk/mint/python/mint/schemaparser.py
===================================================================
--- mgmt/trunk/mint/python/mint/schemaparser.py 2008-08-11 21:59:12 UTC (rev 2277)
+++ mgmt/trunk/mint/python/mint/schemaparser.py 2008-08-11 22:07:32 UTC (rev 2278)
@@ -22,6 +22,7 @@
self.dataTypesMap["uint16"] = self.dataTypesMap["hilo16"] = self.dataTypesMap["count16"] = self.dataTypesMap["mma16"] = "SmallIntCol"
self.dataTypesMap["uint32"] = self.dataTypesMap["hilo32"] = self.dataTypesMap["count32"] = self.dataTypesMap["mma32"] = self.dataTypesMap["atomic32"] = "IntCol"
self.dataTypesMap["uint64"] = self.dataTypesMap["hilo64"] = self.dataTypesMap["count64"] = self.dataTypesMap["mma64"] = self.dataTypesMap["mmaTime"] = "BigIntCol"
+ self.dataTypesMap["float"] = self.dataTypesMap["double"] = "FloatCol"
self.dataTypesMap["absTime"] = self.dataTypesMap["deltaTime"] = "BigIntCol"
self.dataTypesMap["bool"] = "BoolCol"
self.dataTypesMap["sstr"] = self.dataTypesMap["lstr"] = "StringCol"
17 years, 8 months