rhmessaging commits: r3934 - mgmt/newdata/cumin/python/cumin.
by rhmessaging-commits@lists.jboss.org
Author: eallen
Date: 2010-04-26 16:11:31 -0400 (Mon, 26 Apr 2010)
New Revision: 3934
Modified:
mgmt/newdata/cumin/python/cumin/objectselector.py
Log:
Added ObjectTable class
Modified: mgmt/newdata/cumin/python/cumin/objectselector.py
===================================================================
--- mgmt/newdata/cumin/python/cumin/objectselector.py 2010-04-26 20:07:04 UTC (rev 3933)
+++ mgmt/newdata/cumin/python/cumin/objectselector.py 2010-04-26 20:11:31 UTC (rev 3934)
@@ -10,7 +10,7 @@
strings = StringCatalog(__file__)
-class ObjectSelector(DataTable, Form):
+class ObjectTable(DataTable):
def __init__(self, app, name, cls, adapter=None):
assert isinstance(cls, RosemaryClass), cls
@@ -19,50 +19,22 @@
assert isinstance(adapter, ObjectSqlAdapter), adapter
- super(ObjectSelector, self).__init__(app, name, adapter)
+ super(ObjectTable, self).__init__(app, name, adapter)
self.cls = cls
self.update_enabled = True
- item = IntegerParameter(app, "item")
-
- self.ids = ListParameter(app, "id", item)
- self.add_parameter(self.ids)
-
- self.checkbox_column = ObjectCheckboxColumn \
- (app, "id", cls._id, self.ids)
- self.add_column(self.checkbox_column)
-
- self.switches = ObjectSelectorSwitches(app, "switches")
- self.add_child(self.switches)
-
- self.filters = ObjectSelectorFilters(app, "filters")
- self.add_child(self.filters)
-
- self.buttons = ObjectSelectorButtons(app, "buttons")
- self.add_child(self.buttons)
-
# (RosemaryAttribute this, RosemaryAttribute that, Attribute object)
self.filter_specs = list()
- self.tasks = list()
-
def init(self):
- super(ObjectSelector, self).init()
+ super(ObjectTable, self).init()
assert self.cls, self
assert self.adapter, self
assert self.adapter.id_field, self
- for task in self.tasks:
- task.init()
-
- for task in self.tasks:
- button = SelectionTaskButton(self.app, task)
- self.buttons.add_child(button)
- button.init()
-
def add_attribute_column(self, attr):
assert isinstance(attr, RosemaryAttribute), attr
@@ -101,6 +73,41 @@
def render_title(self, session):
return "%ss" % self.cls._title
+class ObjectSelector(ObjectTable, Form):
+ def __init__(self, app, name, cls, adapter=None):
+ super(ObjectSelector, self).__init__(app, name, cls, adapter)
+
+ item = IntegerParameter(app, "item")
+
+ self.ids = ListParameter(app, "selection", item)
+ self.add_parameter(self.ids)
+
+ self.checkbox_column = ObjectCheckboxColumn \
+ (app, "id", cls._id, self.ids)
+ self.add_column(self.checkbox_column)
+
+ self.switches = ObjectSelectorSwitches(app, "switches")
+ self.add_child(self.switches)
+
+ self.filters = ObjectSelectorFilters(app, "filters")
+ self.add_child(self.filters)
+
+ self.buttons = ObjectSelectorButtons(app, "buttons")
+ self.add_child(self.buttons)
+
+ self.tasks = list()
+
+ def init(self):
+ super(ObjectSelector, self).init()
+
+ for task in self.tasks:
+ task.init()
+
+ for task in self.tasks:
+ button = SelectionTaskButton(self.app, task)
+ self.buttons.add_child(button)
+ button.init()
+
class ObjectAttributeColumn(DataTableColumn):
def __init__(self, app, name, attr):
super(ObjectAttributeColumn, self).__init__(app, name, None)
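The net effect of r3934: the old all-in-one ObjectSelector is split into a plain ObjectTable base class (rendering only) and an ObjectSelector subclass that layers the selection parameter, checkbox column, switches, filters, buttons and tasks on top. Below is a minimal self-contained sketch of the same base-class extraction pattern, using toy stand-ins rather than cumin's real DataTable/Form machinery:

# Toy stand-ins; cumin's real DataTable/Form widget classes are not reproduced here.
class Widget(object):
    def __init__(self, app, name):
        self.app, self.name, self.children = app, name, []

    def add_child(self, child):
        self.children.append(child)

class ObjectTable(Widget):
    """Plain read-only table: knows its class and columns, nothing about selection."""
    def __init__(self, app, name, cls):
        super(ObjectTable, self).__init__(app, name)
        self.cls = cls

class ObjectSelector(ObjectTable):
    """Adds selection state and task buttons on top of the plain table."""
    def __init__(self, app, name, cls):
        super(ObjectSelector, self).__init__(app, name, cls)
        self.ids = []    # stands in for the "selection" ListParameter
        self.tasks = []  # SelectionTask instances; buttons are built in init()

table = ObjectTable("app", "queues", cls=object)        # display only
selector = ObjectSelector("app", "queues", cls=object)  # display plus selection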
rhmessaging commits: r3933 - mgmt/newdata/cumin/python/cumin.
by rhmessaging-commits@lists.jboss.org
Author: eallen
Date: 2010-04-26 16:07:04 -0400 (Mon, 26 Apr 2010)
New Revision: 3933
Modified:
mgmt/newdata/cumin/python/cumin/objecttask.py
Log:
Add title to SelectionTaskForms
Modified: mgmt/newdata/cumin/python/cumin/objecttask.py
===================================================================
--- mgmt/newdata/cumin/python/cumin/objecttask.py 2010-04-26 20:05:26 UTC (rev 3932)
+++ mgmt/newdata/cumin/python/cumin/objecttask.py 2010-04-26 20:07:04 UTC (rev 3933)
@@ -319,6 +319,9 @@
self.task.invoke(session, selection)
self.task.exit_with_redirect(session)
+ def render_title(self, session):
+ return self.task.get_title(session)
+
class ObjectTaskLink(Link):
def __init__(self, app, name, task):
assert isinstance(task, ObjectTask), task
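r3933 in context: the form class this hunk lands in (presumably SelectionTaskForm, per the log message) now titles itself by delegating to its task. A toy illustration of that delegation, with hypothetical stand-in classes and title text rather than cumin's real ones:

# Hypothetical stand-ins illustrating the title delegation added in r3933.
class Task(object):
    def get_title(self, session):
        return "Remove broker"          # hypothetical task title

class SelectionTaskForm(object):
    def __init__(self, task):
        self.task = task

    def render_title(self, session):
        return self.task.get_title(session)  # the form's title follows its task

print SelectionTaskForm(Task()).render_title(session=None)  # prints "Remove broker"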
rhmessaging commits: r3932 - mgmt/newdata/cumin/python/cumin.
by rhmessaging-commits@lists.jboss.org
Author: eallen
Date: 2010-04-26 16:05:26 -0400 (Mon, 26 Apr 2010)
New Revision: 3932
Modified:
mgmt/newdata/cumin/python/cumin/sqladapter.py
Log:
whitespace removal
Modified: mgmt/newdata/cumin/python/cumin/sqladapter.py
===================================================================
--- mgmt/newdata/cumin/python/cumin/sqladapter.py 2010-04-26 19:57:05 UTC (rev 3931)
+++ mgmt/newdata/cumin/python/cumin/sqladapter.py 2010-04-26 20:05:26 UTC (rev 3932)
@@ -9,7 +9,7 @@
self.app = app
self.table = table
-
+
self.query = SqlQuery(self.table)
self.columns = list()
@@ -25,7 +25,7 @@
try:
self.query.execute(cursor, ("count(1)",), values)
-
+
return cursor.fetchone()[0]
finally:
cursor.close()
rhmessaging commits: r3931 - store/trunk/cpp/tools.
by rhmessaging-commits@lists.jboss.org
Author: kpvdr
Date: 2010-04-26 15:57:05 -0400 (Mon, 26 Apr 2010)
New Revision: 3931
Added:
store/trunk/cpp/tools/janal.py
store/trunk/cpp/tools/jerr.py
Modified:
store/trunk/cpp/tools/Makefile.am
store/trunk/cpp/tools/jrnl.py
store/trunk/cpp/tools/resize
store/trunk/cpp/tools/store_chk
Log:
Refactor and tidy-up of resize and store-chk python tools
Modified: store/trunk/cpp/tools/Makefile.am
===================================================================
--- store/trunk/cpp/tools/Makefile.am 2010-04-26 18:31:43 UTC (rev 3930)
+++ store/trunk/cpp/tools/Makefile.am 2010-04-26 19:57:05 UTC (rev 3931)
@@ -23,7 +23,9 @@
qpidexec_SCRIPTS = jrnl.py resize store_chk
EXTRA_DIST = \
+ jerr.py \
jrnl.py \
+ janal.py \
resize \
store_chk
\ No newline at end of file
Added: store/trunk/cpp/tools/janal.py
===================================================================
--- store/trunk/cpp/tools/janal.py (rev 0)
+++ store/trunk/cpp/tools/janal.py 2010-04-26 19:57:05 UTC (rev 3931)
@@ -0,0 +1,595 @@
+"""
+Copyright (c) 2007, 2008 Red Hat, Inc.
+
+This file is part of the Qpid async store library msgstore.so.
+
+This library is free software; you can redistribute it and/or
+modify it under the terms of the GNU Lesser General Public
+License as published by the Free Software Foundation; either
+version 2.1 of the License, or (at your option) any later version.
+
+This library is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+Lesser General Public License for more details.
+
+You should have received a copy of the GNU Lesser General Public
+License along with this library; if not, write to the Free Software
+Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
+USA
+
+The GNU Lesser General Public License is available in the file COPYING.
+"""
+
+import jerr, jrnl
+import os.path, sys
+
+
+#== class EnqMap ==============================================================
+
+class EnqMap(object):
+ """Class for maintaining a map of enqueued records, indexing the rid against hdr, fid and transaction lock"""
+
+ def __init__(self):
+ """Constructor"""
+ self.__map = {}
+
+ def __str__(self):
+ """Print the contents of the map"""
+ return self.report(True, True)
+
+ def add(self, fid, hdr, lock = False):
+ """Add a new record into the map"""
+ if hdr.rid in self.__map.keys():
+ raise jerr.DuplicateRidError(hdr.rid)
+ self.__map[hdr.rid] = (fid, hdr, lock)
+
+ def contains(self, rid):
+ """Return True if the map contains the given rid"""
+ return rid in self.__map.keys()
+
+ def delete(self, rid):
+ """Delete the rid and its associated data from the map"""
+ if rid in self.__map.keys():
+ if self.get_lock(rid):
+ raise jerr.DeleteLockedRecordError(rid)
+ del self.__map[rid]
+ else:
+ raise jerr.JWarning("ERROR: Deleting non-existent rid from EnqMap: rid=0x%x" % rid)
+
+ def get(self, rid):
+ """Return a tuple (fid, hdr, lock) for the given rid"""
+ if self.contains(rid):
+ return self.__map[rid]
+ return None
+
+ def get_fid(self, rid):
+ """Return the fid for the given rid"""
+ if self.contains(rid):
+ return self.__map[rid][0]
+ return None
+
+ def get_hdr(self, rid):
+ """Return the header record for the given rid"""
+ if self.contains(rid):
+ return self.__map[rid][1]
+ return None
+
+ def get_lock(self, rid):
+ """Return the transaction lock value for the given rid"""
+ if self.contains(rid):
+ return self.__map[rid][2]
+ return None
+
+ def get_rec_list(self):
+ """Return a list of tuples (fid, hdr, lock) for all entries in the map"""
+ return self.__map.values()
+
+ def lock(self, rid):
+ """Set the transaction lock for a given rid to True"""
+ if rid in self.__map.keys():
+ tup = self.__map[rid]
+ if not tup[2]:
+ self.__map[rid] = (tup[0], tup[1], True)
+ else:
+ raise jerr.AlreadyLockedError(rid)
+ else:
+ raise jerr.JWarning("ERROR: Locking non-existent rid in EnqMap: rid=0x%x" % rid)
+
+ def report(self, show_stats, show_records):
+ """Return a string containing a text report for all records in the map"""
+ if len(self.__map) == 0:
+ return "No enqueued records found."
+ rstr = "%d enqueued records found" % len(self.__map)
+ if show_records:
+ rstr += ":"
+ rid_list = self.__map.keys()
+ rid_list.sort()
+ for rid in rid_list:
+ rec = self.__map[rid]
+ if rec[2]:
+ lock_str = " [LOCKED]"
+ else:
+ lock_str = ""
+ rstr += "\n lfid=%d %s %s" % (rec[0], rec[1], lock_str)
+ else:
+ rstr += "."
+ return rstr
+
+ def rids(self):
+ """Return a list of rids in the map"""
+ return self.__map.keys()
+
+ def size(self):
+ """Return the number of entries in the map"""
+ return len(self.__map)
+
+ def unlock(self, rid):
+ """Set the transaction lock for a given rid to False"""
+ if rid in self.__map.keys():
+ tup = self.__map[rid]
+ if tup[2]:
+ self.__map[rid] = (tup[0], tup[1], False)
+ else:
+ raise jerr.NotLockedError(rid)
+ else:
+ raise jerr.NonExistentRecordError("unlock", rid)
+
+
+#== class TxnMap ==============================================================
+
+class TxnMap(object):
+ """Transaction map, which maps xids to a list of outstanding actions"""
+
+ def __init__(self, emap):
+ """Constructor, requires an existing EnqMap instance"""
+ self.__emap = emap
+ self.__map = {}
+
+ def __str__(self):
+ """Print the contents of the map"""
+ return self.report(True, True)
+
+ def add(self, fid, hdr):
+ """Add a new transactional record into the map"""
+ if isinstance(hdr, jrnl.DeqRec):
+ self.__emap.lock(hdr.deq_rid)
+ if hdr.xid in self.__map.keys():
+ self.__map[hdr.xid].append((fid, hdr)) # append to existing list
+ else:
+ self.__map[hdr.xid] = [(fid, hdr)] # create new list
+
+ def contains(self, xid):
+ """Return True if the xid exists in the map; False otherwise"""
+ return xid in self.__map.keys()
+
+ def delete(self, hdr):
+ """Remove a transaction record from the map using either a commit or abort header"""
+ if hdr.magic[-1] == "c":
+ return self._commit(hdr.xid)
+ if hdr.magic[-1] == "a":
+ self._abort(hdr.xid)
+ else:
+ raise jerr.InvalidRecordTypeError("delete from TxnMap", hdr.magic, hdr.rid)
+
+ def get(self, xid):
+ """Return a list of operations for the given xid"""
+ if self.contains(xid):
+ return self.__map[xid]
+
+ def report(self, show_stats, show_records):
+ """Return a string containing a text report for all records in the map"""
+ if len(self.__map) == 0:
+ return "No outstanding transactions found."
+ rstr = "%d outstanding transactions found" % len(self.__map)
+ if show_records:
+ rstr += ":"
+ for xid, tup in self.__map.iteritems():
+ rstr += "\n xid=%s:" % jrnl.Utils.format_xid(xid)
+ for i in tup:
+ rstr += "\n %s" % str(i[1])
+ else:
+ rstr += "."
+ return rstr
+
+ def size(self):
+ """Return the number of xids in the map"""
+ return len(self.__map)
+
+ def xids(self):
+ """Return a list of xids in the map"""
+ return self.__map.keys()
+
+ def _abort(self, xid):
+ """Perform an abort operation for the given xid record"""
+ for fid, hdr in self.__map[xid]:
+ if isinstance(hdr, jrnl.DeqRec):
+ self.__emap.unlock(hdr.rid)
+ del self.__map[xid]
+
+ def _commit(self, xid):
+ """Perform a commit operation for the given xid record"""
+ mismatch_list = []
+ for fid, hdr in self.__map[xid]:
+ if isinstance(hdr, jrnl.EnqRec):
+ self.__emap.add(fid, hdr) # Transfer enq to emap
+ else:
+ if self.__emap.contains(hdr.deq_rid):
+ self.__emap.unlock(hdr.deq_rid)
+ self.__emap.delete(hdr.deq_rid)
+ else:
+ mismatch_list.append("0x%x" % hdr.deq_rid)
+ del self.__map[xid]
+ return mismatch_list
+
+#== class JrnlAnalyzer ========================================================
+
+class JrnlAnalyzer(object):
+ """
+ This class analyzes a set of journal files and determines which is the last to be written
+ (the newest file), and hence which should be the first to be read for recovery (the oldest
+ file).
+
+ The analysis is performed on construction; the contents of the JrnlInfo object passed provide
+ the recovery details.
+ """
+
+ def __init__(self, jinf):
+ """Constructor"""
+ self.__oldest = None
+ self.__jinf = jinf
+ self.__flist = self._analyze()
+
+ def __str__(self):
+ """String representation of this JrnlAnalyzer instance, will print out results of analysis."""
+ ostr = "Journal files analyzed in directory %s (* = earliest full):\n" % self.__jinf.get_current_dir()
+ if self.is_empty():
+ ostr += " <All journal files are empty>\n"
+ else:
+ for tup in self.__flist:
+ tmp = " "
+ if tup[0] == self.__oldest[0]:
+ tmp = "*"
+ ostr += " %s %s: owi=%-5s rid=0x%x, fro=0x%x ts=%s\n" % (tmp, os.path.basename(tup[1]), tup[2],
+ tup[3], tup[4], tup[5])
+ for i in range(self.__flist[-1][0] + 1, self.__jinf.get_num_jrnl_files()):
+ ostr += " %s.%04d.jdat: <empty>\n" % (self.__jinf.get_jrnl_base_name(), i)
+ return ostr
+
+ # Analysis
+
+ def get_oldest_file(self):
+ """Return a tuple (ordnum, jfn, owi, rid, fro, timestamp) for the oldest data file found in the journal"""
+ return self.__oldest
+
+ def get_oldest_file_index(self):
+ """Return the ordinal number of the oldest data file found in the journal"""
+ if self.is_empty():
+ return None
+ return self.__oldest[0]
+
+ def is_empty(self):
+ """Return true if the analysis found that the journal file has never been written to"""
+ return len(self.__flist) == 0
+
+ def _analyze(self):
+ """Perform the journal file analysis by reading and comparing the file headers of each journal data file"""
+ owi_found = False
+ flist = []
+ for i in range(0, self.__jinf.get_num_jrnl_files()):
+ jfn = os.path.join(self.__jinf.get_current_dir(), "%s.%04x.jdat" % (self.__jinf.get_jrnl_base_name(), i))
+ fhandle = open(jfn)
+ fhdr = jrnl.Utils.load(fhandle, jrnl.Hdr)
+ if fhdr.empty():
+ break
+ this_tup = (i, jfn, fhdr.owi(), fhdr.rid, fhdr.fro, fhdr.timestamp_str())
+ flist.append(this_tup)
+ if i == 0:
+ init_owi = fhdr.owi()
+ self.__oldest = this_tup
+ elif fhdr.owi() != init_owi and not owi_found:
+ self.__oldest = this_tup
+ owi_found = True
+ return flist
+
+
+#== class JrnlReader ====================================================
+
+class JrnlReader(object):
+ """
+ This class contains an Enqueue Map (emap), a transaction map (tmap) and a transaction
+ object list (txn_obj_list) which are populated by reading the journals from the oldest
+ to the newest and analyzing each record. The JrnlInfo and JrnlAnalyzer
+ objects supplied on construction provide the information used for the recovery.
+
+ The journal itself is read and analyzed by calling run().
+ """
+
+ def __init__(self, jinfo, jra, qflag = False, rflag = False, vflag = False):
+ """Constructor, which reads all """
+ self._jinfo = jinfo
+ self._jra = jra
+ self._qflag = qflag
+ self._rflag = rflag
+ self._vflag = vflag
+
+ # test callback functions for CSV tests
+ self._csv_store_chk = None
+ self._csv_start_cb = None
+ self._csv_enq_cb = None
+ self._csv_deq_cb = None
+ self._csv_txn_cb = None
+ self._csv_end_cb = None
+
+ self._emap = EnqMap()
+ self._tmap = TxnMap(self._emap)
+ self._txn_obj_list = {}
+
+ self._file = None
+ self._file_hdr = None
+ self._file_num = None
+ self._first_rec_flag = None
+ self._fro = None
+ self._last_file_flag = None
+ self._start_file_num = None
+ self._file_hdr_owi = None
+ self._warning = []
+
+ self._abort_cnt = 0
+ self._commit_cnt = 0
+ self._msg_cnt = 0
+ self._rec_cnt = 0
+ self._txn_msg_cnt = 0
+
+ def __str__(self):
+ """Print out all the undequeued records"""
+ return self.report(True, self._rflag)
+
+ def emap(self):
+ """Get the enqueue map"""
+ return self._emap
+
+ def get_abort_cnt(self):
+ """Get the cumulative number of transactional aborts found"""
+ return self._abort_cnt
+
+ def get_commit_cnt(self):
+ """Get the cumulative number of transactional commits found"""
+ return self._commit_cnt
+
+ def get_msg_cnt(self):
+ """Get the cumulative number of messages found"""
+ return self._msg_cnt
+
+ def get_rec_cnt(self):
+ """Get the cumulative number of journal records (including fillers) found"""
+ return self._rec_cnt
+
+ def is_last_file(self):
+ """Return True if the last file is being read"""
+ return self._last_file_flag
+
+ def report(self, show_stats = True, show_records = False):
+ """Return a string containing a report on the file analysis"""
+ rstr = self._emap.report(show_stats, show_records) + "\n" + self._tmap.report(show_stats, show_records)
+ #TODO - print size analysis here - ie how full, sparse, est. space remaining before enq threshold
+ return rstr
+
+ def run(self):
+ """Perform the read of the journal"""
+ if self._csv_start_cb != None and self._csv_start_cb(self._csv_store_chk):
+ return
+ if self._jra.is_empty():
+ return
+ stop = self._advance_jrnl_file(*self._jra.get_oldest_file())
+ while not stop and not self._get_next_record():
+ pass
+ if self._csv_end_cb != None and self._csv_end_cb(self._csv_store_chk):
+ return
+ if not self._qflag:
+ print
+
+ def set_callbacks(self, csv_store_chk, csv_start_cb = None, csv_enq_cb = None, csv_deq_cb = None, csv_txn_cb = None,
+ csv_end_cb = None):
+ """Set callbacks for checks to be made at various points while reading the journal"""
+ self._csv_store_chk = csv_store_chk
+ self._csv_start_cb = csv_start_cb
+ self._csv_enq_cb = csv_enq_cb
+ self._csv_deq_cb = csv_deq_cb
+ self._csv_txn_cb = csv_txn_cb
+ self._csv_end_cb = csv_end_cb
+
+ def tmap(self):
+ """Return the transaction map"""
+ return self._tmap
+
+ def get_txn_msg_cnt(self):
+ """Get the cumulative transactional message count"""
+ return self._txn_msg_cnt
+
+ def txn_obj_list(self):
+ """Get a cululative list of transaction objects (commits and aborts)"""
+ return self._txn_obj_list
+
+ def _advance_jrnl_file(self, *oldest_file_info):
+ """Rotate to using the next journal file. Return False if the operation was sucessful, True if there are no
+ more files to read."""
+ fro_seek_flag = False
+ if len(oldest_file_info) > 0:
+ self._start_file_num = self._file_num = oldest_file_info[0]
+ self._fro = oldest_file_info[4]
+ fro_seek_flag = True # jump to fro to start reading
+ if not self._qflag and not self._rflag:
+ if self._vflag:
+ print "Recovering journals..."
+ else:
+ print "Recovering journals",
+ if self._file != None and self._is_file_full():
+ self._file.close()
+ self._file_num = self._incr_file_num()
+ if self._file_num == self._start_file_num:
+ return True
+ if self._start_file_num == 0:
+ self._last_file_flag = self._file_num == self._jinfo.get_num_jrnl_files() - 1
+ else:
+ self._last_file_flag = self._file_num == self._start_file_num - 1
+ if self._file_num < 0 or self._file_num >= self._jinfo.get_num_jrnl_files():
+ raise jerr.BadFileNumberError(self._file_num)
+ jfn = os.path.join(self._jinfo.get_current_dir(), "%s.%04x.jdat" %
+ (self._jinfo.get_jrnl_base_name(), self._file_num))
+ self._file = open(jfn)
+ self._file_hdr = jrnl.Utils.load(self._file, jrnl.Hdr)
+ if fro_seek_flag and self._file.tell() != self._fro:
+ self._file.seek(self._fro)
+ self._first_rec_flag = True
+ if not self._qflag:
+ if self._rflag:
+ print jfn, ": ", self._file_hdr
+ elif self._vflag:
+ print "* Reading %s" % jfn
+ else:
+ print ".",
+ sys.stdout.flush()
+ return False
+
+ def _check_owi(self, hdr):
+ """Return True if the header's owi indicator matches that of the file header record; False otherwise. This can
+ indicate wheher the last record in a file has been read and now older records which have not yet been
+ overwritten are now being read."""
+ return self._file_hdr_owi == hdr.owi()
+
+ def _is_file_full(self):
+ """Return True if the current file is full (no more write space); false otherwise"""
+ return self._file.tell() >= self._jinfo.get_jrnl_file_size_bytes()
+
+ def _get_next_record(self):
+ """Get the next record in the file for analysis"""
+ if self._is_file_full():
+ if self._advance_jrnl_file():
+ return True
+ try:
+ hdr = jrnl.Utils.load(self._file, jrnl.Hdr)
+ except:
+ return True
+ if hdr.empty():
+ return True
+ if hdr.check():
+ return True
+ self._rec_cnt += 1
+ self._file_hdr_owi = self._file_hdr.owi()
+ if self._first_rec_flag:
+ if self._file_hdr.fro != hdr.foffs:
+ raise jerr.FirstRecordOffsetMismatch(self._file_hdr.fro, hdr.foffs)
+ else:
+ if self._rflag:
+ print " * fro ok: 0x%x" % self._file_hdr.fro
+ self._first_rec_flag = False
+ stop = False
+ if isinstance(hdr, jrnl.EnqRec):
+ stop = self._handle_enq_rec(hdr)
+ elif isinstance(hdr, jrnl.DeqRec):
+ stop = self._handle_deq_rec(hdr)
+ elif isinstance(hdr, jrnl.TxnRec):
+ stop = self._handle_txn_rec(hdr)
+ wstr = ""
+ for warn in self._warning:
+ wstr += " (%s)" % warn
+ if self._rflag:
+ print " > %s %s" % (hdr, wstr)
+ self._warning = []
+ return stop
+
+ def _handle_deq_rec(self, hdr):
+ """Process a dequeue ("RHMd") record"""
+ if self._load_rec(hdr):
+ return True
+
+ # Check OWI flag
+ if not self._check_owi(hdr):
+ self._warning.append("WARNING: OWI mismatch - could be overwrite boundary.")
+ return True
+ # Test hook
+ if self._csv_deq_cb != None and self._csv_deq_cb(self._csv_store_chk, hdr):
+ return True
+
+ try:
+ if hdr.xid == None:
+ self._emap.delete(hdr.deq_rid)
+ else:
+ self._tmap.add(self._file_hdr.fid, hdr)
+ except jerr.JWarning, warn:
+ self._warning.append(str(warn))
+ return False
+
+ def _handle_enq_rec(self, hdr):
+ """Process a dequeue ("RHMe") record"""
+ if self._load_rec(hdr):
+ return True
+
+ # Check extern flag
+ if hdr.extern and hdr.data != None:
+ raise jerr.ExternFlagDataError(hdr)
+ # Check OWI flag
+ if not self._check_owi(hdr):
+ self._warning.append("WARNING: OWI mismatch - could be overwrite boundary.")
+ return True
+ # Test hook
+ if self._csv_enq_cb != None and self._csv_enq_cb(self._csv_store_chk, hdr):
+ return True
+
+ if hdr.xid == None:
+ self._emap.add(self._file_hdr.fid, hdr)
+ else:
+ self._txn_msg_cnt += 1
+ self._tmap.add(self._file_hdr.fid, hdr)
+ self._msg_cnt += 1
+ return False
+
+ def _handle_txn_rec(self, hdr):
+ """Process a transaction ("RHMa or RHMc") record"""
+ if self._load_rec(hdr):
+ return True
+
+ # Check OWI flag
+ if not self._check_owi(hdr):
+ self._warning.append("WARNING: OWI mismatch - could be overwrite boundary.")
+ return True
+ # Test hook
+ if self._csv_txn_cb != None and self._csv_txn_cb(self._csv_store_chk, hdr):
+ return True
+
+ if hdr.magic[-1] == "a":
+ self._abort_cnt += 1
+ else:
+ self._commit_cnt += 1
+
+ if self._tmap.contains(hdr.xid):
+ mismatched_rids = self._tmap.delete(hdr)
+ if mismatched_rids != None and len(mismatched_rids) > 0:
+ self._warning.append("WARNING: transactional dequeues not found in enqueue map; rids=%s" %
+ mismatched_rids)
+ else:
+ self._warning.append("WARNING: %s not found in transaction map" % jrnl.Utils.format_xid(hdr.xid))
+ if hdr.magic[-1] == "c": # commits only
+ self._txn_obj_list[hdr.xid] = hdr
+ return False
+
+ def _incr_file_num(self):
+ """Increment the number of files read with wraparound (ie after file n-1, go to 0)"""
+ self._file_num += 1
+ if self._file_num >= self._jinfo.get_num_jrnl_files():
+ self._file_num = 0
+ return self._file_num
+
+ def _load_rec(self, hdr):
+ """Load a single record for the given header. There may be arbitrarily large xids and data components."""
+ while not hdr.complete():
+ if self._advance_jrnl_file():
+ return True
+ hdr.load(self._file)
+ return False
+
+# =============================================================================
+
+if __name__ == "__main__":
+ print "This is a library, and cannot be executed."
Added: store/trunk/cpp/tools/jerr.py
===================================================================
--- store/trunk/cpp/tools/jerr.py (rev 0)
+++ store/trunk/cpp/tools/jerr.py 2010-04-26 19:57:05 UTC (rev 3931)
@@ -0,0 +1,217 @@
+"""
+Copyright (c) 2007, 2008 Red Hat, Inc.
+
+This file is part of the Qpid async store library msgstore.so.
+
+This library is free software; you can redistribute it and/or
+modify it under the terms of the GNU Lesser General Public
+License as published by the Free Software Foundation; either
+version 2.1 of the License, or (at your option) any later version.
+
+This library is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+Lesser General Public License for more details.
+
+You should have received a copy of the GNU Lesser General Public
+License along with this library; if not, write to the Free Software
+Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
+USA
+
+The GNU Lesser General Public License is available in the file COPYING.
+"""
+
+# == Warnings =================================================================
+
+class JWarning(Exception):
+ """Class to convey a warning"""
+ def __init__(self, err):
+ """Constructor"""
+ Exception.__init__(self, err)
+
+# == Errors ===================================================================
+
+class AllJrnlFilesEmptyCsvError(Exception):
+ """All journal files are empty (never been written)"""
+ def __init__(self, tnum, exp_num_msgs):
+ """Constructor"""
+ Exception.__init__(self, "[CSV %d] All journal files are empty, but test expects %d msg(s)." %
+ (tnum, exp_num_msgs))
+
+class AlreadyLockedError(Exception):
+ """Error class for trying to lock a record that is already locked"""
+ def __init__(self, rid):
+ """Constructor"""
+ Exception.__init__(self, "Locking record which is already locked in EnqMap: rid=0x%x" % rid)
+
+class BadFileNumberError(Exception):
+ """Error class for incorrect or unexpected file number"""
+ def __init__(self, file_num):
+ """Constructor"""
+ Exception.__init__(self, "Bad file number %d" % file_num)
+
+class DataSizeError(Exception):
+ """Error class for data size mismatch"""
+ def __init__(self, exp_size, act_size, data_str):
+ """Constructor"""
+ Exception.__init__(self, "Inconsistent data size: expected:%d; actual:%d; data=\"%s\"" %
+ (exp_size, act_size, data_str))
+
+class DeleteLockedRecordError(Exception):
+ """Error class for deleting a locked record from the enqueue map"""
+ def __init__(self, rid):
+ """Constructor"""
+ Exception.__init__(self, "Deleting locked record from EnqMap: rid=0x%s" % rid)
+
+class DuplicateRidError(Exception):
+ """Error class for placing duplicate rid into enqueue map"""
+ def __init__(self, rid):
+ """Constructor"""
+ Exception.__init__(self, "Adding duplicate record to EnqMap: rid=0x%x" % rid)
+
+class EndianMismatchError(Exception):
+ """Error class mismatched record header endian flag"""
+ def __init__(self, exp_endianness):
+ """Constructor"""
+ Exception.__init__(self, "Endian mismatch: expected %s, but current record is %s" %
+ self.endian_str(exp_endianness))
+ #@staticmethod
+ def endian_str(endianness):
+ """Return a string tuple for the endianness error message"""
+ if endianness:
+ return "big", "little"
+ return "little", "big"
+ endian_str = staticmethod(endian_str)
+
+class ExternFlagDataError(Exception):
+ """Error class for the extern flag being set and the internal size > 0"""
+ def __init__(self, hdr):
+ """Constructor"""
+ Exception.__init__(self, "Message data found (msg size > 0) on record with external flag set: hdr=%s" % hdr)
+
+class ExternFlagCsvError(Exception):
+ """External flag mismatch between record and CSV test file"""
+ def __init__(self, tnum, exp_extern_flag):
+ """Constructor"""
+ Exception.__init__(self, "[CSV %d] External flag mismatch: expected %s" % (tnum, exp_extern_flag))
+
+class ExternFlagWithDataCsvError(Exception):
+ """External flag set and Message data found"""
+ def __init__(self, tnum):
+ """Constructor"""
+ Exception.__init__(self, "[CSV %d] Message data found on record with external flag set" % tnum)
+
+class FillExceedsFileSizeError(Exception):
+ """Internal error from a fill operation which will exceed the specified file size"""
+ def __init__(self, cur_size, file_size):
+ """Constructor"""
+ Exception.__init__(self, "Filling to size %d > max file size %d" % (cur_size, file_size))
+
+class FillSizeError(Exception):
+ """Internal error from a fill operation that did not match the calculated end point in the file"""
+ def __init__(self, cur_posn, exp_posn):
+ """Constructor"""
+ Exception.__init__(self, "Filled to size %d > expected file posn %d" % (cur_posn, exp_posn))
+
+class FirstRecordOffsetMismatch(Exception):
+ """Error class for file header fro mismatch with actual record"""
+ def __init__(self, fro, actual_offs):
+ """Constructor"""
+ Exception.__init__(self, "File header first record offset mismatch: fro=0x%x; actual offs=0x%x" %
+ (fro, actual_offs))
+
+class InvalidHeaderVersionError(Exception):
+ """Error class for invalid record header version"""
+ def __init__(self, exp_ver, act_ver):
+ """Constructor"""
+ Exception.__init__(self, "Invalid header version: expected:%d, actual:%d." % (exp_ver, act_ver))
+
+class InvalidRecordTypeError(Exception):
+ """Error class for any operation using an invalid record type"""
+ def __init__(self, operation, magic, rid):
+ """Constructor"""
+ Exception.__init__(self, "Invalid record type for operation: operation=%s record magic=%s, rid=0x%x" %
+ (operation, magic, rid))
+
+class InvalidRecordTailError(Exception):
+ """Error class for invalid record tail"""
+ def __init__(self, magic_err, rid_err, rec):
+ """Constructor"""
+ Exception.__init__(self, " > %s *INVALID TAIL RECORD (%s)*" % (rec, self.tail_err_str(magic_err, rid_err)))
+ #@staticmethod
+ def tail_err_str(magic_err, rid_err):
+ """Return a string indicating the tail record error(s)"""
+ estr = ""
+ if magic_err:
+ estr = "magic bad"
+ if rid_err:
+ estr += ", "
+ if rid_err:
+ estr += "rid mismatch"
+ return estr
+ tail_err_str = staticmethod(tail_err_str)
+
+class NonExistentRecordError(Exception):
+ """Error class for any operation on an non-existent record"""
+ def __init__(self, operation, rid):
+ """Constructor"""
+ Exception.__init__(self, "Operation on non-existent record: operation=%s; rid=0x%x" % (operation, rid))
+
+class NotLockedError(Exception):
+ """Error class for unlocking a record which is not locked in the first place"""
+ def __init__(self, rid):
+ """Constructor"""
+ Exception.__init__(self, "Unlocking record which is not locked in EnqMap: rid=0x%x" % rid)
+
+class JournalSpaceExceededError(Exception):
+ """Error class for when journal space of resized journal is too small to contain the transferred records"""
+ def __init__(self):
+ """Constructor"""
+ Exception.__init__(self, "Ran out of journal space while writing records")
+
+class MessageLengthCsvError(Exception):
+ """Message length mismatch between record and CSV test file"""
+ def __init__(self, tnum, exp_msg_len, actual_msg_len):
+ """Constructor"""
+ Exception.__init__(self, "[CSV %d] Message length mismatch: expected %d; found %d" %
+ (tnum, exp_msg_len, actual_msg_len))
+
+class NumMsgsCsvError(Exception):
+ """Number of messages found mismatched with CSV file"""
+ def __init__(self, tnum, exp_num_msgs, actual_num_msgs):
+ """Constructor"""
+ Exception.__init__(self, "[CSV %s] Incorrect number of messages: expected %d, found %d" %
+ (tnum, exp_num_msgs, actual_num_msgs))
+
+class TransactionCsvError(Exception):
+ """Transaction mismatch between record and CSV file"""
+ def __init__(self, tnum, exp_transactional):
+ """Constructor"""
+ Exception.__init__(self, "[CSV %d] Transaction mismatch: expected %s" % (tnum, exp_transactional))
+
+class UnexpectedEndOfFileError(Exception):
+ """Error class for unexpected end-of-file during reading"""
+ def __init__(self, exp_size, curr_offs):
+ """Constructor"""
+ Exception.__init__(self, "Unexpected end-of-file: expected file size:%d; current offset:%d" %
+ (exp_size, curr_offs))
+
+class XidLengthCsvError(Exception):
+ """Message Xid length mismatch between record and CSV file"""
+ def __init__(self, tnum, exp_xid_len, actual_msg_len):
+ """Constructor"""
+ Exception.__init__(self, "[CSV %d] Message XID mismatch: expected %d; found %d" %
+ (tnum, exp_xid_len, actual_msg_len))
+
+class XidSizeError(Exception):
+ """Error class for Xid size mismatch"""
+ def __init__(self, exp_size, act_size, xid_str):
+ """Constructor"""
+ Exception.__init__(self, "Inconsistent xid size: expected:%d; actual:%d; xid=\"%s\"" %
+ (exp_size, act_size, xid_str))
+
+# =============================================================================
+
+if __name__ == "__main__":
+ print "This is a library, and cannot be executed."
+
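The convention throughout these tools: jerr.JWarning marks recoverable conditions that JrnlReader catches and accumulates (see _handle_deq_rec in janal.py above), while the other jerr classes are hard errors left to propagate. A small sketch of that calling convention, using a hypothetical helper name:

# Sketch of the warning-vs-error convention; try_dequeue is a hypothetical helper.
import jerr

def try_dequeue(emap, rid, warnings):
    """Dequeue rid, recording a warning instead of failing on unknown rids."""
    try:
        emap.delete(rid)              # raises JWarning if rid is not in the map
    except jerr.JWarning, warn:       # recoverable: note it and carry on
        warnings.append(str(warn))
    # jerr.DeleteLockedRecordError and the other hard errors propagate unchanged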
Modified: store/trunk/cpp/tools/jrnl.py
===================================================================
--- store/trunk/cpp/tools/jrnl.py 2010-04-26 18:31:43 UTC (rev 3930)
+++ store/trunk/cpp/tools/jrnl.py 2010-04-26 19:57:05 UTC (rev 3931)
@@ -1,37 +1,43 @@
-# Copyright (c) 2007, 2008 Red Hat, Inc.
-#
-# This file is part of the Qpid async store library msgstore.so.
-#
-# This library is free software; you can redistribute it and/or
-# modify it under the terms of the GNU Lesser General Public
-# License as published by the Free Software Foundation; either
-# version 2.1 of the License, or (at your option) any later version.
-#
-# This library is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-# Lesser General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public
-# License along with this library; if not, write to the Free Software
-# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
-# USA
-#
-# The GNU Lesser General Public License is available in the file COPYING.
+"""
+Copyright (c) 2007, 2008 Red Hat, Inc.
+This file is part of the Qpid async store library msgstore.so.
+
+This library is free software; you can redistribute it and/or
+modify it under the terms of the GNU Lesser General Public
+License as published by the Free Software Foundation; either
+version 2.1 of the License, or (at your option) any later version.
+
+This library is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+Lesser General Public License for more details.
+
+You should have received a copy of the GNU Lesser General Public
+License along with this library; if not, write to the Free Software
+Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
+USA
+
+The GNU Lesser General Public License is available in the file COPYING.
+"""
+
+import jerr
import os.path, sys, xml.parsers.expat
from struct import pack, unpack, calcsize
from time import gmtime, strftime
# TODO: Get rid of these! Use jinf instance instead
-dblkSize = 128
-sblkSize = 4 * dblkSize
+DBLK_SIZE = 128
+SBLK_SIZE = 4 * DBLK_SIZE
-#== protected and private ======================================================================
+# TODO - this is messy - find a better way to handle this
+# This is a global, but is set directly by the calling program
+JRNL_FILE_SIZE = None
-_extern_mask = 0x20
+#== class Utils ======================================================================
class Utils(object):
+ """Class containing utility functions for dealing with the journal"""
__printchars = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ!\"#$%&'()*+,-./:;<=>?@[\\]^_`{\|}~ "
@@ -40,164 +46,162 @@
# staticmethod() declaration.
#@staticmethod
- def formatData(dsize, data):
+ def format_data(dsize, data):
+ """Format binary data for printing"""
if data == None:
return ""
- if Utils.__isPrintable(data):
- datastr = Utils.__splitStr(data)
+ if Utils._is_printable(data):
+ datastr = Utils._split_str(data)
else:
- datastr = Utils.__hexSplitStr(data)
+ datastr = Utils._hex_split_str(data)
if dsize != len(data):
- raise Exception("Inconsistent data size: dsize=%d, data(%d)=\"%s\"" % (dsize, len(data), datastr))
+ raise jerr.DataSizeError(dsize, len(data), datastr)
return "data(%d)=\"%s\" " % (dsize, datastr)
- formatData = staticmethod(formatData)
+ format_data = staticmethod(format_data)
#@staticmethod
- def formatXid(xid, xidsize = None):
+ def format_xid(xid, xidsize=None):
+ """Format binary XID for printing"""
if xid == None and xidsize != None:
- if xidsize > 0: raise Exception("Inconsistent XID size: xidsize=%d, xid=None" % xidsize)
+ if xidsize > 0:
+ raise jerr.XidSizeError(xidsize, 0, None)
return ""
- if Utils.__isPrintable(xid):
- xidstr = Utils.__splitStr(xid)
+ if Utils._is_printable(xid):
+ xidstr = Utils._split_str(xid)
else:
- xidstr = Utils.__hexSplitStr(xid)
+ xidstr = Utils._hex_split_str(xid)
if xidsize == None:
xidsize = len(xid)
elif xidsize != len(xid):
- raise Exception("Inconsistent XID size: xidsize=%d, xid(%d)=\"%s\"" % (xidsize, len(xid), xidstr))
+ raise jerr.XidSizeError(xidsize, len(xid), xidstr)
return "xid(%d)=\"%s\" " % (xidsize, xidstr)
- formatXid = staticmethod(formatXid)
+ format_xid = staticmethod(format_xid)
#@staticmethod
- def invStr(s):
- si = ""
- for i in range(0,len(s)):
- si += chr(~ord(s[i]) & 0xff)
- return si
- invStr = staticmethod(invStr)
+ def inv_str(string):
+ """Perform a binary 1's compliment (invert all bits) on a binary string"""
+ istr = ""
+ for index in range(0, len(string)):
+ istr += chr(~ord(string[index]) & 0xff)
+ return istr
+ inv_str = staticmethod(inv_str)
#@staticmethod
- def load(f, klass):
- args = Utils.__loadArgs(f, klass)
+ def load(fhandle, klass):
+ """Load a record of class klass from a file"""
+ args = Utils._load_args(fhandle, klass)
subclass = klass.discriminate(args)
result = subclass(*args) # create instance of record
if subclass != klass:
- result.init(f, *Utils.__loadArgs(f, subclass))
- result.skip(f)
- return result;
+ result.init(fhandle, *Utils._load_args(fhandle, subclass))
+ result.skip(fhandle)
+ return result
load = staticmethod(load)
#@staticmethod
- def loadFileData(f, size, data):
+ def load_file_data(fhandle, size, data):
+ """Load the data portion of a message from file"""
if size == 0:
return (data, True)
if data == None:
loaded = 0
else:
loaded = len(data)
- foverflow = f.tell() + size - loaded > jfsize
+ foverflow = fhandle.tell() + size - loaded > JRNL_FILE_SIZE
if foverflow:
- rsize = jfsize - f.tell()
+ rsize = JRNL_FILE_SIZE - fhandle.tell()
else:
rsize = size - loaded
- bin = f.read(rsize)
+ fbin = fhandle.read(rsize)
if data == None:
- data = unpack("%ds" % (rsize), bin)[0]
+ data = unpack("%ds" % (rsize), fbin)[0]
else:
- data = data + unpack("%ds" % (rsize), bin)[0]
+ data = data + unpack("%ds" % (rsize), fbin)[0]
return (data, not foverflow)
- loadFileData = staticmethod(loadFileData)
+ load_file_data = staticmethod(load_file_data)
#@staticmethod
- def remBytesInBlk(f, blkSize):
- foffs = f.tell()
- return Utils.sizeInBytesToBlk(foffs, blkSize) - foffs;
- remBytesInBlk = staticmethod(remBytesInBlk)
+ def rem_bytes_in_blk(fhandle, blk_size):
+ """Return the remaining bytes in a block"""
+ foffs = fhandle.tell()
+ return Utils.size_in_bytes_to_blk(foffs, blk_size) - foffs
+ rem_bytes_in_blk = staticmethod(rem_bytes_in_blk)
#@staticmethod
- def sizeInBlks(size, blkSize):
- return int((size + blkSize - 1) / blkSize)
- sizeInBlks = staticmethod(sizeInBlks)
+ def size_in_blks(size, blk_size):
+ """Return the size in terms of data blocks"""
+ return int((size + blk_size - 1) / blk_size)
+ size_in_blks = staticmethod(size_in_blks)
#@staticmethod
- def sizeInBytesToBlk(size, blkSize):
- return Utils.sizeInBlks(size, blkSize) * blkSize
- sizeInBytesToBlk = staticmethod(sizeInBytesToBlk)
+ def size_in_bytes_to_blk(size, blk_size):
+ """Return the bytes remaining until the next block boundary"""
+ return Utils.size_in_blks(size, blk_size) * blk_size
+ size_in_bytes_to_blk = staticmethod(size_in_bytes_to_blk)
#@staticmethod
- def __hexSplitStr(s, splitSize = 50):
- if len(s) <= splitSize:
- return Utils.__hexStr(s, 0, len(s))
-# if len(s) > splitSize + 25:
-# return Utils.__hexStr(s, 0, 10) + " ... " + Utils.__hexStr(s, 55, 65) + " ... " + Utils.__hexStr(s, len(s)-10, len(s))
- return Utils.__hexStr(s, 0, 10) + " ... " + Utils.__hexStr(s, len(s)-10, len(s))
- __hexSplitStr = staticmethod(__hexSplitStr)
+ def _hex_split_str(in_str, split_size = 50):
+ """Split a hex string into two parts separated by an ellipsis"""
+ if len(in_str) <= split_size:
+ return Utils._hex_str(in_str, 0, len(in_str))
+# if len(in_str) > split_size + 25:
+# return Utils._hex_str(in_str, 0, 10) + " ... " + Utils._hex_str(in_str, 55, 65) + " ... " + \
+# Utils._hex_str(in_str, len(in_str)-10, len(in_str))
+ return Utils._hex_str(in_str, 0, 10) + " ... " + Utils._hex_str(in_str, len(in_str)-10, len(in_str))
+ _hex_split_str = staticmethod(_hex_split_str)
#@staticmethod
- def __hexStr(s, b, e):
- o = ""
- for i in range(b, e):
- if Utils.__isPrintable(s[i]):
- o += s[i]
+ def _hex_str(in_str, begin, end):
+ """Return a binary string as a hex string"""
+ hstr = ""
+ for index in range(begin, end):
+ if Utils._is_printable(in_str[index]):
+ hstr += in_str[index]
else:
- o += "\\%02x" % ord(s[i])
- return o
- __hexStr = staticmethod(__hexStr)
+ hstr += "\\%02x" % ord(in_str[index])
+ return hstr
+ _hex_str = staticmethod(_hex_str)
#@staticmethod
- def __isPrintable(s):
- return s.strip(Utils.__printchars) == ""
- __isPrintable = staticmethod(__isPrintable)
+ def _is_printable(in_str):
+ """Return True if in_str in printable; False otherwise."""
+ return in_str.strip(Utils.__printchars) == ""
+ _is_printable = staticmethod(_is_printable)
#@staticmethod
- def __loadArgs(f, klass):
- size = calcsize(klass.format)
- foffs = f.tell(),
- bin = f.read(size)
- if len(bin) != size: raise Exception("End of file")
- return foffs + unpack(klass.format, bin)
- __loadArgs = staticmethod(__loadArgs)
+ def _load_args(fhandle, klass):
+ """Load the arguments from class klass"""
+ size = calcsize(klass.FORMAT)
+ foffs = fhandle.tell(),
+ fbin = fhandle.read(size)
+ if len(fbin) != size:
+ raise jerr.UnexpectedEndOfFileError(size, len(fbin))
+ return foffs + unpack(klass.FORMAT, fbin)
+ _load_args = staticmethod(_load_args)
#@staticmethod
- def __splitStr(s, splitSize = 50):
- if len(s) < splitSize:
- return s
- return s[:25] + " ... " + s[-25:]
- __splitStr = staticmethod(__splitStr)
+ def _split_str(in_str, split_size = 50):
+ """Split a string into two parts separated by an ellipsis if it is longer than split_size"""
+ if len(in_str) < split_size:
+ return in_str
+ return in_str[:25] + " ... " + in_str[-25:]
+ _split_str = staticmethod(_split_str)
-#== class Warning =============================================================
-
-class Warning(Exception):
- def __init__(self, err):
- Exception.__init__(self, err)
-
-
-#== class Sizeable ============================================================
-
-class Sizeable(object):
-
- def size(self):
- classes = [self.__class__]
- size = 0
- while classes:
- cls = classes.pop()
- if hasattr(cls, "format"):
- size += calcsize(cls.format)
- classes.extend(cls.__bases__)
- return size
-
-
#== class Hdr =================================================================
-class Hdr(Sizeable):
+class Hdr:
+ """Class representing the journal header records"""
- format = "=4sBBHQ"
- hdrVer = 1
- owi_mask = 0x01
- big_endian_flag = sys.byteorder == "big"
+ FORMAT = "=4sBBHQ"
+ HDR_VER = 1
+ OWI_MASK = 0x01
+ BIG_ENDIAN = sys.byteorder == "big"
+ REC_BOUNDARY = DBLK_SIZE
def __init__(self, foffs, magic, ver, endn, flags, rid):
+ """Constructor"""
+# Sizeable.__init__(self)
self.foffs = foffs
self.magic = magic
self.ver = ver
@@ -206,85 +210,99 @@
self.rid = long(rid)
def __str__(self):
+ """Return string representation of this header"""
if self.empty():
return "0x%08x: <empty>" % (self.foffs)
if self.magic[-1] == "x":
return "0x%08x: [\"%s\"]" % (self.foffs, self.magic)
if self.magic[-1] in ["a", "c", "d", "e", "f", "x"]:
- return "0x%08x: [\"%s\" v=%d e=%d f=0x%04x rid=0x%x]" % (self.foffs, self.magic, self.ver, self.endn, self.flags, self.rid)
+ return "0x%08x: [\"%s\" v=%d e=%d f=0x%04x rid=0x%x]" % (self.foffs, self.magic, self.ver, self.endn,
+ self.flags, self.rid)
return "0x%08x: <error, unknown magic \"%s\" (possible overwrite boundary?)>" % (self.foffs, self.magic)
#@staticmethod
def discriminate(args):
+ """Use the last char in the header magic to determine the header type"""
return _CLASSES.get(args[1][-1], Hdr)
discriminate = staticmethod(discriminate)
def empty(self):
+ """Return True if this record is empty (ie has a magic of 0x0000"""
return self.magic == "\x00"*4
def encode(self):
- return pack(Hdr.format, self.magic, self.ver, self.endn, self.flags, self.rid)
+ """Encode the header into a binary string"""
+ return pack(Hdr.FORMAT, self.magic, self.ver, self.endn, self.flags, self.rid)
def owi(self):
- return self.flags & self.owi_mask != 0
+ """Return the OWI (overwrite indicator) for this header"""
+ return self.flags & self.OWI_MASK != 0
- def skip(self, f):
- f.read(Utils.remBytesInBlk(f, dblkSize))
+ def skip(self, fhandle):
+ """Read and discard the remainder of this record"""
+ fhandle.read(Utils.rem_bytes_in_blk(fhandle, self.REC_BOUNDARY))
def check(self):
+ """Check that this record is valid"""
if self.empty() or self.magic[:3] != "RHM" or self.magic[3] not in ["a", "c", "d", "e", "f", "x"]:
return True
if self.magic[-1] != "x":
- if self.ver != self.hdrVer:
- raise Exception("%s: Invalid header version: found %d, expected %d." % (self, self.ver, self.hdrVer))
- if bool(self.endn) != self.big_endian_flag:
- if self.big_endian_flag: e = "big"
- else: e = "little"
- raise Exception("Endian mismatch: this platform is %s and does not match record encoding (0x%04x)" % (e, self.endn))
+ if self.ver != self.HDR_VER:
+ raise jerr.InvalidHeaderVersionError(self.HDR_VER, self.ver)
+ if bool(self.endn) != self.BIG_ENDIAN:
+ raise jerr.EndianMismatchError(self.BIG_ENDIAN)
return False
#== class FileHdr =============================================================
class FileHdr(Hdr):
+ """Class for file headers, found at the beginning of journal files"""
- format = "=2H4x3Q"
+ FORMAT = "=2H4x3Q"
+ REC_BOUNDARY = SBLK_SIZE
def __str__(self):
- return "%s fid=%d lid=%d fro=0x%08x t=%s" % (Hdr.__str__(self), self.fid, self.lid, self.fro, self.timestamp_str())
+ """Return a string representation of the this FileHdr instance"""
+ return "%s fid=%d lid=%d fro=0x%08x t=%s" % (Hdr.__str__(self), self.fid, self.lid, self.fro,
+ self.timestamp_str())
def encode(self):
- return Hdr.encode(self) + pack(FileHdr.format, self.fid, self.lid, self.fro, self.time_sec, self.time_ns)
+ """Encode this class into a binary string"""
+ return Hdr.encode(self) + pack(FileHdr.FORMAT, self.fid, self.lid, self.fro, self.time_sec, self.time_ns)
- def init(self, f, foffs, fid, lid, fro, time_sec, time_ns):
+ def init(self, fhandle, foffs, fid, lid, fro, time_sec, time_ns):
+ """Initialize this instance to known values"""
self.fid = fid
self.lid = lid
self.fro = fro
self.time_sec = time_sec
self.time_ns = time_ns
- def skip(self, f):
- f.read(Utils.remBytesInBlk(f, sblkSize))
-
def timestamp(self):
+ """Get the timestamp of this record as a tuple (secs, nsecs)"""
return (self.time_sec, self.time_ns)
def timestamp_str(self):
- ts = gmtime(self.time_sec)
+ """Get the timestamp of this record in string format"""
+ time = gmtime(self.time_sec)
fstr = "%%a %%b %%d %%H:%%M:%%S.%09d %%Y" % (self.time_ns)
- return strftime(fstr, ts)
+ return strftime(fstr, time)
#== class DeqRec ==============================================================
class DeqRec(Hdr):
+ """Class for a dequeue record"""
- format = "=QQ"
+ FORMAT = "=QQ"
def __str__(self):
- return "%s %sdrid=0x%x" % (Hdr.__str__(self), Utils.formatXid(self.xid, self.xidsize), self.deq_rid)
+ """Return a string representation of the this DeqRec instance"""
+ return "%s %sdrid=0x%x" % (Hdr.__str__(self), Utils.format_xid(self.xid, self.xidsize), self.deq_rid)
- def init(self, f, foffs, deq_rid, xidsize):
+ def init(self, fhandle, foffs, deq_rid, xidsize):
+ """Initialize this instance to known values"""
self.deq_rid = deq_rid
self.xidsize = xidsize
self.xid = None
@@ -293,48 +311,56 @@
self.tail_complete = False
self.tail_bin = None
self.tail_offs = 0
- self.load(f)
+ self.load(fhandle)
def encode(self):
- d = Hdr.encode(self) + pack(DeqRec.format, self.deq_rid, self.xidsize)
+ """Encode this class into a binary string"""
+ buf = Hdr.encode(self) + pack(DeqRec.FORMAT, self.deq_rid, self.xidsize)
if self.xidsize > 0:
fmt = "%ds" % (self.xidsize)
- d += pack(fmt, self.xid)
- d += self.deq_tail.encode()
- return d
+ buf += pack(fmt, self.xid)
+ buf += self.deq_tail.encode()
+ return buf
- def load(self, f):
+ def load(self, fhandle):
+ """Load the remainder of this record (after the header has been loaded"""
if self.xidsize == 0:
self.xid_complete = True
self.tail_complete = True
else:
if not self.xid_complete:
- (self.xid, self.xid_complete) = Utils.loadFileData(f, self.xidsize, self.xid)
+ (self.xid, self.xid_complete) = Utils.load_file_data(fhandle, self.xidsize, self.xid)
if self.xid_complete and not self.tail_complete:
- ret = Utils.loadFileData(f, calcsize(RecTail.format), self.tail_bin)
+ ret = Utils.load_file_data(fhandle, calcsize(RecTail.FORMAT), self.tail_bin)
self.tail_bin = ret[0]
if ret[1]:
- self.deq_tail = RecTail(self.tail_offs, *unpack(RecTail.format, self.tail_bin))
- if self.deq_tail.magic_inv != Utils.invStr(self.magic) or self.deq_tail.rid != self.rid:
- raise Exception(" > %s *INVALID TAIL RECORD*" % self)
- self.deq_tail.skip(f)
+ self.deq_tail = RecTail(self.tail_offs, *unpack(RecTail.FORMAT, self.tail_bin))
+ magic_err = self.deq_tail.magic_inv != Utils.inv_str(self.magic)
+ rid_err = self.deq_tail.rid != self.rid
+ if magic_err or rid_err:
+ raise jerr.InvalidRecordTailError(magic_err, rid_err, self)
+ self.skip(fhandle)
self.tail_complete = ret[1]
return self.complete()
def complete(self):
+ """Returns True if the entire record is loaded, False otherwise"""
return self.xid_complete and self.tail_complete
#== class TxnRec ==============================================================
class TxnRec(Hdr):
+ """Class for a transaction commit/abort record"""
- format = "=Q"
+ FORMAT = "=Q"
def __str__(self):
- return "%s %s" % (Hdr.__str__(self), Utils.formatXid(self.xid, self.xidsize))
+ """Return a string representation of the this TxnRec instance"""
+ return "%s %s" % (Hdr.__str__(self), Utils.format_xid(self.xid, self.xidsize))
- def init(self, f, foffs, xidsize):
+ def init(self, fhandle, foffs, xidsize):
+ """Initialize this instance to known values"""
self.xidsize = xidsize
self.xid = None
self.tx_tail = None
@@ -342,57 +368,68 @@
self.tail_complete = False
self.tail_bin = None
self.tail_offs = 0
- self.load(f)
+ self.load(fhandle)
def encode(self):
- return Hdr.encode(self) + pack(TxnRec.format, self.xidsize) + pack("%ds" % self.xidsize, self.xid) + self.tx_tail.encode()
+ """Encode this class into a binary string"""
+ return Hdr.encode(self) + pack(TxnRec.FORMAT, self.xidsize) + pack("%ds" % self.xidsize, self.xid) + \
+ self.tx_tail.encode()
- def load(self, f):
+ def load(self, fhandle):
+ """Load the remainder of this record (after the header has been loaded"""
if not self.xid_complete:
- ret = Utils.loadFileData(f, self.xidsize, self.xid)
+ ret = Utils.load_file_data(fhandle, self.xidsize, self.xid)
self.xid = ret[0]
self.xid_complete = ret[1]
if self.xid_complete and not self.tail_complete:
- ret = Utils.loadFileData(f, calcsize(RecTail.format), self.tail_bin)
+ ret = Utils.load_file_data(fhandle, calcsize(RecTail.FORMAT), self.tail_bin)
self.tail_bin = ret[0]
if ret[1]:
- self.tx_tail = RecTail(self.tail_offs, *unpack(RecTail.format, self.tail_bin))
- if self.tx_tail.magic_inv != Utils.invStr(self.magic) or self.tx_tail.rid != self.rid:
- raise Exception(" > %s *INVALID TAIL RECORD*" % self)
- self.tx_tail.skip(f)
+ self.tx_tail = RecTail(self.tail_offs, *unpack(RecTail.FORMAT, self.tail_bin))
+ magic_err = self.tx_tail.magic_inv != Utils.inv_str(self.magic)
+ rid_err = self.tx_tail.rid != self.rid
+ if magic_err or rid_err:
+ raise jerr.InvalidRecordTailError(magic_err, rid_err, self)
+ self.skip(fhandle)
self.tail_complete = ret[1]
return self.complete()
def complete(self):
+ """Returns True if the entire record is loaded, False otherwise"""
return self.xid_complete and self.tail_complete
#== class EnqRec ==============================================================
class EnqRec(Hdr):
+ """Class for a enqueue record"""
- format = "=QQ"
- transient_mask = 0x10
- extern_mask = 0x20
+ FORMAT = "=QQ"
+ TRANSIENT_MASK = 0x10
+ EXTERN_MASK = 0x20
def __str__(self):
- return "%s %s%s %s %s" % (Hdr.__str__(self), Utils.formatXid(self.xid, self.xidsize), Utils.formatData(self.dsize, self.data), self.enq_tail, self.print_flags())
+ """Return a string representation of the this EnqRec instance"""
+ return "%s %s%s %s %s" % (Hdr.__str__(self), Utils.format_xid(self.xid, self.xidsize),
+ Utils.format_data(self.dsize, self.data), self.enq_tail, self.print_flags())
def encode(self):
- d = Hdr.encode(self) + pack(EnqRec.format, self.xidsize, self.dsize)
+ """Encode this class into a binary string"""
+ buf = Hdr.encode(self) + pack(EnqRec.FORMAT, self.xidsize, self.dsize)
if self.xidsize > 0:
- d += pack("%ds" % self.xidsize, self.xid)
+ buf += pack("%ds" % self.xidsize, self.xid)
if self.dsize > 0:
- d += pack("%ds" % self.dsize, self.data)
+ buf += pack("%ds" % self.dsize, self.data)
if self.xidsize > 0 or self.dsize > 0:
- d += self.enq_tail.encode()
- return d
+ buf += self.enq_tail.encode()
+ return buf
- def init(self, f, foffs, xidsize, dsize):
+ def init(self, fhandle, foffs, xidsize, dsize):
+ """Initialize this instance to known values"""
self.xidsize = xidsize
self.dsize = dsize
- self.transient = self.flags & self.transient_mask > 0
- self.extern = self.flags & self.extern_mask > 0
+ self.transient = self.flags & self.TRANSIENT_MASK > 0
+ self.extern = self.flags & self.EXTERN_MASK > 0
self.xid = None
self.data = None
self.enq_tail = None
@@ -401,228 +438,77 @@
self.tail_complete = False
self.tail_bin = None
self.tail_offs = 0
- self.load(f)
+ self.load(fhandle)
- def load(self, f):
+ def load(self, fhandle):
+ """Load the remainder of this record (after the header has been loaded"""
if not self.xid_complete:
- ret = Utils.loadFileData(f, self.xidsize, self.xid)
+ ret = Utils.load_file_data(fhandle, self.xidsize, self.xid)
self.xid = ret[0]
self.xid_complete = ret[1]
if self.xid_complete and not self.data_complete:
if self.extern:
self.data_complete = True
else:
- ret = Utils.loadFileData(f, self.dsize, self.data)
+ ret = Utils.load_file_data(fhandle, self.dsize, self.data)
self.data = ret[0]
self.data_complete = ret[1]
if self.data_complete and not self.tail_complete:
- ret = Utils.loadFileData(f, calcsize(RecTail.format), self.tail_bin)
+ ret = Utils.load_file_data(fhandle, calcsize(RecTail.FORMAT), self.tail_bin)
self.tail_bin = ret[0]
if ret[1]:
- self.enq_tail = RecTail(self.tail_offs, *unpack(RecTail.format, self.tail_bin))
- if self.enq_tail.magic_inv != Utils.invStr(self.magic) or self.enq_tail.rid != self.rid:
- raise Exception(" > %s *INVALID TAIL RECORD*" % self)
- self.enq_tail.skip(f)
+ self.enq_tail = RecTail(self.tail_offs, *unpack(RecTail.FORMAT, self.tail_bin))
+ magic_err = self.enq_tail.magic_inv != Utils.inv_str(self.magic)
+ rid_err = self.enq_tail.rid != self.rid
+ if magic_err or rid_err:
+ raise jerr.InvalidRecordTailError(magic_err, rid_err, self)
+ self.skip(fhandle)
self.tail_complete = ret[1]
return self.complete()
def complete(self):
+ """Returns True if the entire record is loaded, False otherwise"""
return self.xid_complete and self.data_complete and self.tail_complete
def print_flags(self):
- s = ""
+ """Utility function to decode the flags field in the header and print a string representation"""
+ fstr = ""
if self.transient:
- s = "*TRANSIENT"
+ fstr = "*TRANSIENT"
if self.extern:
- if len(s) > 0:
- s += ",EXTERNAL"
+ if len(fstr) > 0:
+ fstr += ",EXTERNAL"
else:
- s = "*EXTERNAL"
- if len(s) > 0:
- s += "*"
- return s
+ fstr = "*EXTERNAL"
+ if len(fstr) > 0:
+ fstr += "*"
+ return fstr
#== class RecTail =============================================================
-class RecTail(Sizeable):
+class RecTail:
+ """Class for a record tail - for all records where either an XID or data separate the header from the end of the
+ record"""
- format = "=4sQ"
+ FORMAT = "=4sQ"
def __init__(self, foffs, magic_inv, rid):
+ """Initialize this instance to known values"""
self.foffs = foffs
self.magic_inv = magic_inv
self.rid = long(rid)
def __str__(self):
- magic = Utils.invStr(self.magic_inv)
+ """Return a string representation of the this RecTail instance"""
+ magic = Utils.inv_str(self.magic_inv)
return "[\"%s\" rid=0x%x]" % (magic, self.rid)
def encode(self):
- return pack(RecTail.format, self.magic_inv, self.rid)
+ """Encode this class into a binary string"""
+ return pack(RecTail.FORMAT, self.magic_inv, self.rid)
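
The tail layout is fully pinned down by FORMAT: an inverted four-byte magic followed by an unsigned 64-bit rid. A standalone round-trip sketch (the inverted magic and rid values below are made up for illustration):

    from struct import pack, unpack, calcsize

    FORMAT = "=4sQ"                                  # inverted magic (4s) + rid (Q)
    tail = pack(FORMAT, "\xa9\xb6\xc8\xac", 0x1234)  # made-up inverted magic and rid
    magic_inv, rid = unpack(FORMAT, tail)
    print calcsize(FORMAT), "0x%x" % rid             # prints: 12 0x1234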
- def skip(self, f):
- f.read(Utils.remBytesInBlk(f, dblkSize))
-
-#== class EnqMap ==============================================================
-
-class EnqMap(object):
-
- def __init__(self):
- self.__map = {}
-
- def __str__(self):
- return self.report(True, True)
-
- def add(self, fid, hdr, lock = False):
- if hdr.rid in self.__map.keys(): raise Exception("ERROR: Duplicate rid to EnqMap: rid=0x%x" % hdr.rid)
- self.__map[hdr.rid] = (fid, hdr, lock)
-
- def contains(self, rid):
- return rid in self.__map.keys()
-
- def delete(self, rid):
- if rid in self.__map.keys():
- if self.getLock(rid):
- raise Exception("ERROR: Deleting locked record from EnqMap: rid=0x%s" % rid)
- del self.__map[rid]
- else:
- raise Warning("ERROR: Deleting non-existent rid from EnqMap: rid=0x%x" % rid)
-
- def get(self, rid):
- if self.contains(rid): return self.__map[rid]
- return None
-
- def getFid(self, rid):
- if self.contains(rid): return self.__map[rid][0]
- return None
-
- def getHdr(self, rid):
- if self.contains(rid): return self.__map[rid][1]
- return None
-
- def getLock(self, rid):
- if self.contains(rid): return self.__map[rid][2]
- return None
-
- def getRecList(self):
- return self.__map.values()
-
- def lock(self, rid):
- if rid in self.__map.keys():
- tup = self.__map[rid]
- self.__map[rid] = (tup[0], tup[1], True)
- else:
- raise Warning("ERROR: Locking non-existent rid in EnqMap: rid=0x%x" % rid)
-
- def report(self, showStats, showRecords):
- if len(self.__map) == 0: return "No enqueued records found."
- s = "%d enqueued records found" % len(self.__map)
- if showRecords:
- s += ":"
- ridList = self.__map.keys()
- ridList.sort()
- for rid in ridList:
-# for f,r in self.__map.iteritems():
- r = self.__map[rid]
- if r[2]:
- lockStr = " [LOCKED]"
- else:
- lockStr = ""
- s += "\n lfid=%d %s %s" % (r[0], r[1], lockStr)
- else:
- s += "."
- return s
-
- def rids(self):
- return self.__map.keys()
-
- def size(self):
- return len(self.__map)
-
- def unlock(self, rid):
- if rid in self.__map.keys():
- tup = self.__map[rid]
- if tup[2]:
- self.__map[rid] = (tup[0], tup[1], False)
- else:
- raise Exception("ERROR: Unlocking rid which is not locked in EnqMap: rid=0x%x" % rid)
- else:
- raise Exception("ERROR: Unlocking non-existent rid in EnqMap: rid=0x%x" % rid)
-
-
-#== class TxnMap ==============================================================
-
-class TxnMap(object):
-
- def __init__(self, emap):
- self.__emap = emap
- self.__map = {}
-
- def __str__(self):
- return self.report(True, True)
-
- def add(self, fid, hdr):
- if isinstance(hdr, DeqRec):
- self.__emap.lock(hdr.deq_rid)
- if hdr.xid in self.__map.keys():
- self.__map[hdr.xid].append((fid, hdr)) # append to existing list
- else:
- self.__map[hdr.xid] = [(fid, hdr)] # create new list
-
- def contains(self, xid):
- return xid in self.__map.keys()
-
- def delete(self, hdr):
- if hdr.magic[-1] == "c": return self.__commit(hdr.xid)
- if hdr.magic[-1] == "a": self.__abort(hdr.xid)
- else: raise Exception("ERROR: cannot delete from TxnMap using hdr type %s" % hdr.magic)
-
- def get(self, xid):
- if self.contains(xid): return self.__map[xid]
-
- def report(self, showStats, showRecords):
- if len(self.__map) == 0: return "No outstanding transactions found."
- s = "%d outstanding transactions found" % len(self.__map)
- if showRecords:
- s += ":"
- for x,t in self.__map.iteritems():
- s += "\n xid=%s:" % Utils.formatXid(x)
- for i in t:
- s += "\n %s" % str(i[1])
- else:
- s += "."
- return s
-
- def size(self):
- return len(self.__map)
-
- def xids(self):
- return self.__map.keys()
-
- def __abort(self, xid):
- for fid, hdr in self.__map[xid]:
- if isinstance(hdr, DeqRec):
- self.__emap.unlock(hdr.rid)
- del self.__map[xid]
-
- def __commit(self, xid):
- mismatchList = []
- for fid, hdr in self.__map[xid]:
- if isinstance(hdr, EnqRec):
- self.__emap.add(fid, hdr) # Transfer enq to emap
- else:
- if self.__emap.contains(hdr.deq_rid):
- self.__emap.unlock(hdr.deq_rid)
- self.__emap.delete(hdr.deq_rid)
- else:
- mismatchList.append("0x%x" % hdr.deq_rid)
- del self.__map[xid]
- return mismatchList
-
-
-
#== class JrnlInfo ============================================================
class JrnlInfo(object):
@@ -639,17 +525,17 @@
.jinf file must reflect these changes, as this file is the source of information for journal
recovery.
- NOTE: Size vs File size: There are methods which return the journal size and file size of the
+ NOTE: Data size vs File size: There are methods which return the data size and file size of the
journal files.
+-------------+--------------------/ /----------+
| File header | File data |
+-------------+--------------------/ /----------+
| | |
- | |<------------- Size ------------>|
- |<------------------ FileSize ----------------->|
+ | |<---------- Data size ---------->|
+ |<------------------ File Size ---------------->|
- Size: The size of the data content of the journal, ie that part which stores the data records.
+ Data size: The size of the data content of the journal, ie that part which stores the data records.
File size: The actual disk size of the journal including data and the file header which precedes the
data.
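
To make the distinction concrete, a quick calculation with assumed geometry (the block sizes and softblock count below are illustrative only, not taken from any real .jinf file):

    JRNL_DBLK_SIZE = 128                 # bytes per datablock (assumed)
    JRNL_SBLK_SIZE = 4                   # datablocks per softblock (assumed)
    data_size_sblks = 3072               # per-file data capacity in softblocks (assumed)

    sblk_bytes = JRNL_SBLK_SIZE * JRNL_DBLK_SIZE
    data_size_bytes = data_size_sblks * sblk_bytes           # the "Data size" above
    file_size_bytes = (data_size_sblks + 1) * sblk_bytes     # + 1 sblk for the file header
    print data_size_bytes, file_size_bytes                   # prints: 1572864 1573376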
@@ -658,176 +544,221 @@
"""
def __init__(self, jdir, bfn = "JournalData"):
+ """Constructor"""
self.__jdir = jdir
self.__bfn = bfn
- self.__jinfDict = {}
- self.__read_jinf()
+ self.__jinf_dict = {}
+ self._read_jinf()
def __str__(self):
- s = "Journal info file %s:\n" % os.path.join(self.__jdir, "%s.jinf" % self.__bfn)
- for k,v in self.__jinfDict.iteritems():
- s += " %s = %s\n" % (k, v)
- return s
+ """Create a string containing all of the journal info contained in the jinf file"""
+ ostr = "Journal info file %s:\n" % os.path.join(self.__jdir, "%s.jinf" % self.__bfn)
+ for key, val in self.__jinf_dict.iteritems():
+ ostr += " %s = %s\n" % (key, val)
+ return ostr
- def normalize(self, jdir = None, baseFilename = None):
+ def normalize(self, jdir = None, bfn = None):
+ """Normalize the directory (ie reset the directory path to match the actual current location) for this
+ jinf file"""
if jdir == None:
- self.__jinfDict["directory"] = self.__jdir
+ self.__jinf_dict["directory"] = self.__jdir
else:
self.__jdir = jdir
- self.__jinfDict["directory"] = jdir
- if baseFilename != None:
- self.__bfn = baseFilename
- self.__jinfDict["base_filename"] = baseFilename
+ self.__jinf_dict["directory"] = jdir
+ if bfn != None:
+ self.__bfn = bfn
+ self.__jinf_dict["base_filename"] = bfn
- def resize(self, njFiles = None, jfSize = None):
- if njFiles != None:
- self.__jinfDict["number_jrnl_files"] = njFiles
- if jfSize != None:
- self.__jinfDict["jrnl_file_size_sblks"] = jfSize * dblkSize
+ def resize(self, num_jrnl_files = None, jrnl_file_size = None):
+ """Reset the journal size information to allow for resizing the journal"""
+ if num_jrnl_files != None:
+ self.__jinf_dict["number_jrnl_files"] = num_jrnl_files
+ if jrnl_file_size != None:
+ self.__jinf_dict["jrnl_file_size_sblks"] = jrnl_file_size * self.get_jrnl_dblk_size_bytes()
- def write(self, jdir = None, baseFilename = None):
- self.normalize(jdir, baseFilename)
- if not os.path.exists(self.getJrnlDir()): os.makedirs(self.getJrnlDir())
- wdir = os.path.join(self.getJrnlDir(), "%s.jinf" % self.getJrnlBaseName())
- f = open(os.path.join(self.getJrnlDir(), "%s.jinf" % self.getJrnlBaseName()), "w")
- f.write("<?xml version=\"1.0\" ?>\n")
- f.write("<jrnl>\n")
- f.write(" <journal_version value=\"%d\" />\n" % self.getJrnlVersion())
- f.write(" <journal_id>\n")
- f.write(" <id_string value=\"%s\" />\n" % self.getJrnlId())
- f.write(" <directory value=\"%s\" />\n" % self.getJrnlDir())
- f.write(" <base_filename value=\"%s\" />\n" % self.getJrnlBaseName())
- f.write(" </journal_id>\n")
- f.write(" <creation_time>\n")
- f.write(" <seconds value=\"%d\" />\n" % self.getCreationTime()[0])
- f.write(" <nanoseconds value=\"%d\" />\n" % self.getCreationTime()[1])
- f.write(" <string value=\"%s\" />\n" % self.getCreationTimeStr())
- f.write(" </creation_time>\n")
- f.write(" <journal_file_geometry>\n")
- f.write(" <number_jrnl_files value=\"%d\" />\n" % self.getNumJrnlFiles())
- f.write(" <auto_expand value=\"%s\" />\n" % str.lower(str(self.getAutoExpand())))
- f.write(" <jrnl_file_size_sblks value=\"%d\" />\n" % self.getJrnlSizeSblks())
- f.write(" <JRNL_SBLK_SIZE value=\"%d\" />\n" % self.getJrnlSblkSize())
- f.write(" <JRNL_DBLK_SIZE value=\"%d\" />\n" % self.getJrnlDblkSize())
- f.write(" </journal_file_geometry>\n")
- f.write(" <cache_geometry>\n")
- f.write(" <wcache_pgsize_sblks value=\"%d\" />\n" % self.getWriteBufferPageSizeSblks())
- f.write(" <wcache_num_pages value=\"%d\" />\n" % self.getNumWriteBufferPages())
- f.write(" <JRNL_RMGR_PAGE_SIZE value=\"%d\" />\n" % self.getReadBufferPageSizeSblks())
- f.write(" <JRNL_RMGR_PAGES value=\"%d\" />\n" % self.getNumReadBufferPages())
- f.write(" </cache_geometry>\n")
- f.write("</jrnl>\n")
- f.close()
+ def write(self, jdir = None, bfn = None):
+ """Write the .jinf file"""
+ self.normalize(jdir, bfn)
+ if not os.path.exists(self.get_jrnl_dir()):
+ os.makedirs(self.get_jrnl_dir())
+ fhandle = open(os.path.join(self.get_jrnl_dir(), "%s.jinf" % self.get_jrnl_base_name()), "w")
+ fhandle.write("<?xml version=\"1.0\" ?>\n")
+ fhandle.write("<jrnl>\n")
+ fhandle.write(" <journal_version value=\"%d\" />\n" % self.get_jrnl_version())
+ fhandle.write(" <journal_id>\n")
+ fhandle.write(" <id_string value=\"%s\" />\n" % self.get_jrnl_id())
+ fhandle.write(" <directory value=\"%s\" />\n" % self.get_jrnl_dir())
+ fhandle.write(" <base_filename value=\"%s\" />\n" % self.get_jrnl_base_name())
+ fhandle.write(" </journal_id>\n")
+ fhandle.write(" <creation_time>\n")
+ fhandle.write(" <seconds value=\"%d\" />\n" % self.get_creation_time()[0])
+ fhandle.write(" <nanoseconds value=\"%d\" />\n" % self.get_creation_time()[1])
+ fhandle.write(" <string value=\"%s\" />\n" % self.get_creation_time_str())
+ fhandle.write(" </creation_time>\n")
+ fhandle.write(" <journal_file_geometry>\n")
+ fhandle.write(" <number_jrnl_files value=\"%d\" />\n" % self.get_num_jrnl_files())
+ fhandle.write(" <auto_expand value=\"%s\" />\n" % str.lower(str(self.get_auto_expand())))
+ fhandle.write(" <jrnl_file_size_sblks value=\"%d\" />\n" % self.get_jrnl_data_size_sblks())
+ fhandle.write(" <JRNL_SBLK_SIZE value=\"%d\" />\n" % self.get_jrnl_sblk_size_dblks())
+ fhandle.write(" <JRNL_DBLK_SIZE value=\"%d\" />\n" % self.get_jrnl_dblk_size_bytes())
+ fhandle.write(" </journal_file_geometry>\n")
+ fhandle.write(" <cache_geometry>\n")
+ fhandle.write(" <wcache_pgsize_sblks value=\"%d\" />\n" % self.get_wr_buf_pg_size_sblks())
+ fhandle.write(" <wcache_num_pages value=\"%d\" />\n" % self.get_num_wr_buf_pgs())
+ fhandle.write(" <JRNL_RMGR_PAGE_SIZE value=\"%d\" />\n" % self.get_rd_buf_pg_size_sblks())
+ fhandle.write(" <JRNL_RMGR_PAGES value=\"%d\" />\n" % self.get_num_rd_buf_pgs())
+ fhandle.write(" </cache_geometry>\n")
+ fhandle.write("</jrnl>\n")
+ fhandle.close()
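
Taken together, normalize/resize/write support the resize flow further down in this revision. A minimal sketch of that flow, assuming a readable .jinf already exists under the (hypothetical) source directory:

    info = JrnlInfo("/tmp/jrnl_old", "JournalData")  # hypothetical directory and base name
    info.resize(num_jrnl_files = 16)                 # adjust geometry in memory only
    info.write("/tmp/jrnl_new", "JournalData")       # normalize paths and emit the new .jinf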
# Journal ID
- def getJrnlVersion(self):
- return self.__jinfDict["journal_version"]
+ def get_jrnl_version(self):
+ """Get the journal version"""
+ return self.__jinf_dict["journal_version"]
- def getJrnlId(self):
- return self.__jinfDict["id_string"]
+ def get_jrnl_id(self):
+ """Get the journal id"""
+ return self.__jinf_dict["id_string"]
- def getCurrentJnrlDir(self):
+ def get_current_dir(self):
+ """Get the current directory of the store (as opposed to that value saved in the .jinf file)"""
return self.__jdir
- def getJrnlDir(self):
- return self.__jinfDict["directory"]
+ def get_jrnl_dir(self):
+ """Get the journal directory stored in the .jinf file"""
+ return self.__jinf_dict["directory"]
- def getJrnlBaseName(self):
- return self.__jinfDict["base_filename"]
+ def get_jrnl_base_name(self):
+ """Get the base filename - that string used to name the journal files <basefilename>-nnnn.jdat and
+ <basefilename>.jinf"""
+ return self.__jinf_dict["base_filename"]
# Journal creation time
- def getCreationTime(self):
- return (self.__jinfDict["seconds"], self.__jinfDict["nanoseconds"])
+ def get_creation_time(self):
+ """Get journal creation time as a tuple (secs, nsecs)"""
+ return (self.__jinf_dict["seconds"], self.__jinf_dict["nanoseconds"])
- def getCreationTimeStr(self):
- return self.__jinfDict["string"]
+ def get_creation_time_str(self):
+ """Get journal creation time as a string"""
+ return self.__jinf_dict["string"]
- # Files and geometry
+ # --- Files and geometry ---
- def getNumJrnlFiles(self):
- return self.__jinfDict["number_jrnl_files"]
+ def get_num_jrnl_files(self):
+ """Get number of data files in the journal"""
+ return self.__jinf_dict["number_jrnl_files"]
- def getAutoExpand(self):
- return self.__jinfDict["auto_expand"]
+ def get_auto_expand(self):
+ """Return True if auto-expand is enabled; False otherwise"""
+ return self.__jinf_dict["auto_expand"]
- def getJrnlSblkSize(self):
- return self.__jinfDict["JRNL_SBLK_SIZE"]
+ def get_jrnl_sblk_size_dblks(self):
+ """Get the journal softblock size in dblks"""
+ return self.__jinf_dict["JRNL_SBLK_SIZE"]
+
+ def get_jrnl_sblk_size_bytes(self):
+ """Get the journal softblock size in bytes"""
+ return self.get_jrnl_sblk_size_dblks() * self.get_jrnl_dblk_size_bytes()
+
+ def get_jrnl_dblk_size_bytes(self):
+ """Get the journal datablock size in bytes"""
+ return self.__jinf_dict["JRNL_DBLK_SIZE"]
- def getJrnlDblkSize(self):
- return self.__jinfDict["JRNL_DBLK_SIZE"]
+ def get_jrnl_data_size_sblks(self):
+ """Get the data capacity (excluding the file headers) for one journal file in softblocks"""
+ return self.__jinf_dict["jrnl_file_size_sblks"]
- def getJrnlSizeSblks(self):
- return self.__jinfDict["jrnl_file_size_sblks"]
+ def get_jrnl_data_size_dblks(self):
+ """Get the data capacity (excluding the file headers) for one journal file in datablocks"""
+ return self.get_jrnl_data_size_sblks() * self.get_jrnl_sblk_size_dblks()
- def getJrnlSizeDblks(self):
- return self.getJrnlSizeSblks() * self.getJrnlSblkSize()
+ def get_jrnl_data_size_bytes(self):
+ """Get the data capacity (excluding the file headers) for one journal file in bytes"""
+ return self.get_jrnl_data_size_dblks() * self.get_jrnl_dblk_size_bytes()
- def getJrnlSizeBytes(self):
- return self.getJrnlSizeDblks() * self.getJrnlDblkSize()
+ def get_jrnl_file_size_sblks(self):
+ """Get the size of one journal file on disk (including the file headers) in softblocks"""
+ return self.get_jrnl_data_size_sblks() + 1
- def getJrnlFileSizeSblks(self):
- return self.getJrnlSizeSblks() + 1
+ def get_jrnl_file_size_dblks(self):
+ """Get the size of one journal file on disk (including the file headers) in datablocks"""
+ return self.get_jrnl_file_size_sblks() * self.get_jrnl_sblk_size_dblks()
- def getJrnlFileSizeDblks(self):
- return self.getJrnlFileSizeSblks() * self.getJrnlSblkSize()
+ def get_jrnl_file_size_bytes(self):
+ """Get the size of one journal file on disk (including the file headers) in bytes"""
+ return self.get_jrnl_file_size_dblks() * self.get_jrnl_dblk_size_bytes()
- def getJrnlFileSizeBytes(self):
- return self.getJrnlFileSizeDblks() * self.getJrnlDblkSize()
+ def get_tot_jrnl_data_size_sblks(self):
+ """Get the size of the entire journal's data capacity (excluding the file headers) for all files together in
+ softblocks"""
+ return self.get_num_jrnl_files() * self.get_jrnl_data_size_sblks()
- def getTotalJrnlFileCapacitySblks(self):
- return self.getNumJrnlFiles() * self.getJrnlSizeBytes()
+ def get_tot_jrnl_data_size_dblks(self):
+ """Get the size of the entire jouranl's data capacity (excluding the file headers) for all files together in
+ datablocks"""
+ return self.get_num_jrnl_files() * self.get_jrnl_data_size_dblks()
- def getTotalJrnlFileCapacityDblks(self):
- return self.getNumJrnlFiles() * self.getJrnlSizeDblks()
+ def get_tot_jrnl_data_size_bytes(self):
+ """Get the size of the entire jouranl's data capacity (excluding the file headers) for all files together in
+ bytes"""
+ return self.get_num_jrnl_files() * self.get_jrnl_data_size_bytes()
- def getTotalJrnlFileCapacityBytes(self):
- return self.getNumJrnlFiles() * self.getJrnlSizeBytes()
-
# Read and write buffers
- def getWriteBufferPageSizeSblks(self):
- return self.__jinfDict["wcache_pgsize_sblks"]
+ def get_wr_buf_pg_size_sblks(self):
+ """Get the size of the write buffer pages in softblocks"""
+ return self.__jinf_dict["wcache_pgsize_sblks"]
- def getWriteBufferPageSizeDblks(self):
- return self.getWriteBufferPageSizeSblks() * self.getJrnlSblkSize()
+ def get_wr_buf_pg_size_dblks(self):
+ """Get the size of the write buffer pages in datablocks"""
+ return self.get_wr_buf_pg_size_sblks() * self.get_jrnl_sblk_size_dblks()
- def getWriteBufferPageSizeBytes(self):
- return self.getWriteBufferPageSizeDblks() * self.getJrnlDblkSize()
+ def get_wr_buf_pg_size_bytes(self):
+ """Get the size of the write buffer pages in bytes"""
+ return self.get_wr_buf_pg_size_dblks() * self.get_jrnl_dblk_size_bytes()
- def getNumWriteBufferPages(self):
- return self.__jinfDict["wcache_num_pages"]
+ def get_num_wr_buf_pgs(self):
+ """Get the number of write buffer pages"""
+ return self.__jinf_dict["wcache_num_pages"]
- def getReadBufferPageSizeSblks(self):
- return self.__jinfDict["JRNL_RMGR_PAGE_SIZE"]
+ def get_rd_buf_pg_size_sblks(self):
+ """Get the size of the read buffer pages in softblocks"""
+ return self.__jinf_dict["JRNL_RMGR_PAGE_SIZE"]
- def getReadBufferPageSizeDblks(self):
- return self.getReadBufferPageSizeSblks * self.getJrnlSblkSize()
+ def get_rd_buf_pg_size_dblks(self):
+ """Get the size of the read buffer pages in datablocks"""
+ return self.get_rd_buf_pg_size_sblks() * self.get_jrnl_sblk_size_dblks()
- def getReadBufferPageSizeBytes(self):
- return self.getReadBufferPageSizeDblks * self.getJrnlDblkSize()
+ def get_rd_buf_pg_size_bytes(self):
+ """Get the size of the read buffer pages in bytes"""
+ return self.get_rd_buf_pg_size_dblks() * self.get_jrnl_dblk_size_bytes()
- def getNumReadBufferPages(self):
- return self.__jinfDict["JRNL_RMGR_PAGES"]
+ def get_num_rd_buf_pgs(self):
+ """Get the number of read buffer pages"""
+ return self.__jinf_dict["JRNL_RMGR_PAGES"]
- def __read_jinf(self):
- f = open(os.path.join(self.__jdir, "%s.jinf" % self.__bfn), "r")
- p = xml.parsers.expat.ParserCreate()
- p.StartElementHandler = self.__handleXmlStartElement
- p.CharacterDataHandler = self.__handleXmlCharData
- p.EndElementHandler = self.__handleXmlEndElement
- p.ParseFile(f)
- f.close()
+ def _read_jinf(self):
+ """Read and initialize this instance from an existing jinf file located at the directory named in the
+ constructor - called by the constructor"""
+ fhandle = open(os.path.join(self.__jdir, "%s.jinf" % self.__bfn), "r")
+ parser = xml.parsers.expat.ParserCreate()
+ parser.StartElementHandler = self._handle_xml_start_elt
+ parser.CharacterDataHandler = self._handle_xml_char_data
+ parser.EndElementHandler = self._handle_xml_end_elt
+ parser.ParseFile(fhandle)
+ fhandle.close()
- def __handleXmlStartElement(self, name, attrs):
+ def _handle_xml_start_elt(self, name, attrs):
+ """Callback for handling XML start elements. Used by the XML parser."""
# bool values
if name == "auto_expand":
- self.__jinfDict[name] = attrs["value"] == "true"
+ self.__jinf_dict[name] = attrs["value"] == "true"
# long values
elif name == "seconds" or \
name == "nanoseconds":
- self.__jinfDict[name] = long(attrs["value"])
+ self.__jinf_dict[name] = long(attrs["value"])
# int values
elif name == "journal_version" or \
name == "number_jrnl_files" or \
@@ -838,310 +769,21 @@
name == "wcache_num_pages" or \
name == "JRNL_RMGR_PAGE_SIZE" or \
name == "JRNL_RMGR_PAGES":
- self.__jinfDict[name] = int(attrs["value"])
+ self.__jinf_dict[name] = int(attrs["value"])
# strings
elif "value" in attrs:
- self.__jinfDict[name] = attrs["value"]
+ self.__jinf_dict[name] = attrs["value"]
- def __handleXmlCharData(self, data): pass
+ def _handle_xml_char_data(self, data):
+ """Callback for handling character data (ie within <elt>...</elt>). The jinf file does not use this in its
+ data. Used by the XML parser."""
+ pass
- def __handleXmlEndElement(self, name): pass
-
-#== class JrnlAnalyzer ========================================================
-
-class JrnlAnalyzer(object):
- """
- This class analyzes a set of journal files and determines which is the last to be written
- (the newest file), and hence which should be the first to be read for recovery (the oldest
- file).
-
- The analysis is performed on construction; the contents of the JrnlInfo object passed provide
- the recovery details.
- """
-
- def __init__(self, jinf):
- self.__oldest = None
- self.__jinf = jinf
- self.__flist = self._analyze()
-
- def __str__(self):
- s = "Journal files analyzed in directory %s (* = earliest full):\n" % self.__jinf.getCurrentJnrlDir()
- if self.isEmpty():
- s += " <All journal files are empty>\n"
- else:
- for tup in self.__flist:
- o = " "
- if tup[0] == self.__oldest[0]: o = "*"
- s += " %s %s: owi=%-5s rid=0x%x, fro=0x%x ts=%s\n" % (o, os.path.basename(tup[1]), tup[2], tup[3], tup[4], tup[5])
- for i in range(self.__flist[-1][0] + 1, self.__jinf.getNumJrnlFiles()):
- s += " %s.%04d.jdat: <empty>\n" % (self.__jinf.getJrnlBaseName(), i)
- return s
+ def _handle_xml_end_elt(self, name):
+ """Callback for handling XML end elements. Used by XML parser."""
+ pass
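
The three handlers follow the stock expat pattern: everything of interest arrives as element attributes, so only the start handler does real work. A self-contained miniature (element name and value invented):

    import xml.parsers.expat

    jinf = {}
    def start_elt(name, attrs):
        if "value" in attrs:
            jinf[name] = attrs["value"]

    parser = xml.parsers.expat.ParserCreate()
    parser.StartElementHandler = start_elt
    parser.Parse('<jrnl><journal_version value="2" /></jrnl>', True)
    print jinf                           # prints: {u'journal_version': u'2'}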
- # Analysis
-
- def getOldestFile(self):
- return self.__oldest
- def getOldestFileIndex(self):
- if self.isEmpty(): return None
- return self.__oldest[0]
-
- def isEmpty(self):
- return len(self.__flist) == 0
-
- def _analyze(self):
- fname = ""
- fnum = -1
- rid = -1
- fro = -1
- tss = ""
- owi_found = False
- flist = []
- for i in range(0, self.__jinf.getNumJrnlFiles()):
- jfn = os.path.join(self.__jinf.getCurrentJnrlDir(), "%s.%04x.jdat" % (self.__jinf.getJrnlBaseName(), i))
- f = open(jfn)
- fhdr = Utils.load(f, Hdr)
- if fhdr.empty(): break
- this_tup = (i, jfn, fhdr.owi(), fhdr.rid, fhdr.fro, fhdr.timestamp_str())
- flist.append(this_tup)
- if i == 0:
- init_owi = fhdr.owi()
- self.__oldest = this_tup
- elif fhdr.owi() != init_owi and not owi_found:
- self.__oldest = this_tup
- owi_found = True
- return flist
-
-
-#== class JrnlReader ====================================================
-
-class JrnlReader(object):
- """
- This class contains an Enqueue Map (emap), a transaction map (tmap) and a transaction
- object list (txnObjList) which are populated by reading the journals from the oldest
- to the newest and analyzing each record. The JrnlInfo and JrnlAnalyzer
- objects supplied on construction provide the information used for the recovery.
-
- The analysis is performed on construction.
- """
-
- def __init__(self, ji, jra, qflag = False, rflag = False, vflag = False):
- self._ji = ji
- self._jra = jra
- self._qflag = qflag
- self._rflag = rflag
- self._vflag = vflag
-
- # test callback functions for CSV tests
- self._csvStoreChk = None
- self._csvStartCb = None
- self._csvEnqCb = None
- self._csvDeqCb = None
- self._csvTxnCb = None
- self._csvEndCb = None
-
- self._emap = EnqMap()
- self._tmap = TxnMap(self._emap)
- self._txnObjList = {}
-
- self._file = None
- self._fileHdr = None
- self._fileNum = None
- self._firstRecFlag = None
- self._fro = None
- self._lastFileFlag = None
- self._startFileNum = None
- self._warning = []
-
- self._abortCnt = 0
- self._commitCnt = 0
- self._msgCnt = 0
- self._recCnt = 0
- self._txnMsgCnt = 0
-
- def __str__(self):
- return self.report(True, self._rflag)
-
- def abortCnt(self): return self._abortCnt
-
- def commitCnt(self): return self._commitCnt
-
- def emap(self): return self._emap
-
- def msgCnt(self): return self._msgCnt
-
- def recCnt(self): return self._recCnt
-
- def report(self, showStats = True, showRecords = False):
- s = self._emap.report(showStats, showRecords) + "\n" + self._tmap.report(showStats, showRecords)
- #TODO - print size analysis here
- return s
-
- def run(self):
- if self._csvStartCb != None and self._csvStartCb(self._csvStoreChk): return
- if self._jra.isEmpty(): return
- stop = self._advanceJrnlFile(*self._jra.getOldestFile())
- while not stop and not self._getNextRecord(): pass
- if self._csvEndCb != None and self._csvEndCb(self._csvStoreChk): return
- if not self._qflag: print
-
- def setCallbacks(self, csvStoreChk, csvStartCb = None, csvEnqCb = None, csvDeqCb = None, csvTxnCb = None, csvEndCb = None):
- self._csvStoreChk = csvStoreChk
- self._csvStartCb = csvStartCb
- self._csvEnqCb = csvEnqCb
- self._csvDeqCb = csvDeqCb
- self._csvTxnCb = csvTxnCb
- self._csvEndCb = csvEndCb
-
- def tmap(self): return self._tmap
-
- def txnMsgCnt(self): return self._txnMsgCnt
-
- def txnObjList(self): return self._txnObjList
-
- def _advanceJrnlFile(self, *oldestFileInfo):
- froSeekFlag = False
- if len(oldestFileInfo) > 0:
- self._startFileNum = self._fileNum = oldestFileInfo[0]
- self._fro = oldestFileInfo[4]
- froSeekFlag = True # jump to fro to start reading
- if not self._qflag and not self._rflag:
- if self._vflag: print "Recovering journals..."
- else: print "Recovering journals",
- if self._file != None and self._fileFull():
- self._file.close()
- self._fileNum = self._incrFileNum()
- if self._fileNum == self._startFileNum:
- return True
- if self._startFileNum == 0:
- self._lastFileFlag = self._fileNum == self._ji.getNumJrnlFiles() - 1
- else:
- self._lastFileFlag = self._fileNum == self._startFileNum - 1
- if self._fileNum < 0 or self._fileNum >= self._ji.getNumJrnlFiles():
- raise Exception("Bad file number %d" % self._fileNum)
- jfn = os.path.join(self._ji.getCurrentJnrlDir(), "%s.%04x.jdat" % (self._ji.getJrnlBaseName(), self._fileNum))
- self._file = open(jfn)
- self._fileHdr = Utils.load(self._file, Hdr)
- if froSeekFlag and self._file.tell() != self._fro:
- self._file.seek(self._fro)
- self._firstRecFlag = True
- if not self._qflag:
- if self._rflag: print jfn, ": ", self._fileHdr
- elif self._vflag: print "* Reading %s" % jfn
- else:
- print ".",
- sys.stdout.flush()
- return False
-
- def _checkOwi(self, hdr):
- return self._fileHdrOwi == hdr.owi()
-
- def _fileFull(self):
- return self._file.tell() >= self._ji.getJrnlFileSizeBytes()
-
- def _getNextRecord(self, *oldestFileInfo):
- if self._fileFull():
- if self._advanceJrnlFile(): return True
- try: hdr = Utils.load(self._file, Hdr)
- except: return True
- if hdr.empty(): return True
- if hdr.check(): return True
- self._recCnt += 1
- self._fileHdrOwi = self._fileHdr.owi()
- if self._firstRecFlag:
- if self._fileHdr.fro != hdr.foffs:
- raise Exception("File header first record offset mismatch: fro=0x%x; rec_offs=0x%x" % (self._fileHdr.fro, hdr.foffs))
- else:
- if self._rflag: print " * fro ok: 0x%x" % self._fileHdr.fro
- self._firstRecFlag = False
- stop = False
- if isinstance(hdr, EnqRec):
- stop = self._handleEnqRec(hdr)
- elif isinstance(hdr, DeqRec):
- stop = self._handleDeqRec(hdr)
- elif isinstance(hdr, TxnRec):
- stop = self._handleTxnRec(hdr)
- wstr = ""
- for w in self._warning:
- wstr += " (%s)" % w
- if self._rflag: print " > %s %s" % (hdr, wstr)
- self._warning = []
- return stop
-
- def _handleDeqRec(self, hdr):
- if self._loadRec(hdr): return True
-
- # Check OWI flag
- if not self._checkOwi(hdr):
- self._warning.append("WARNING: OWI mismatch - could be overwrite boundary.")
- return True
- # Test hook
- if self._csvDeqCb != None and self._csvDeqCb(self._csvStoreChk, hdr): return True
-
- try:
- if hdr.xid == None:
- self._emap.delete(hdr.deq_rid)
- else:
- self._tmap.add(self._fileHdr.fid, hdr)
- except Warning, w: self._warning.append(str(w))
- return False
-
- def _handleEnqRec(self, hdr):
- if self._loadRec(hdr): return True
-
- # Check extern flag
- if hdr.extern and hdr.data != None: raise Exception("Message data found on external record: hdr=%s" % hdr)
- # Check OWI flag
- if not self._checkOwi(hdr):
- self._warning.append("WARNING: OWI mismatch - could be overwrite boundary.")
- return True
- # Test hook
- if self._csvEnqCb != None and self._csvEnqCb(self._csvStoreChk, hdr): return True
-
- if hdr.xid == None:
- self._emap.add(self._fileHdr.fid, hdr)
- else:
- self._txnMsgCnt += 1
- self._tmap.add(self._fileHdr.fid, hdr)
- self._msgCnt += 1
- return False
-
- def _handleTxnRec(self, hdr):
- if self._loadRec(hdr): return True
-
- # Check OWI flag
- if not self._checkOwi(hdr):
- self._warning.append("WARNING: OWI mismatch - could be overwrite boundary.")
- return True
- # Test hook
- if self._csvTxnCb != None and self._csvTxnCb(self._csvStoreChk, hdr): return True
-
- if hdr.magic[-1] == "a": self._abortCnt += 1
- else: self._commitCnt += 1
-
- if self._tmap.contains(hdr.xid):
- mismatchedRids = self._tmap.delete(hdr)
- if mismatchedRids != None and len(mismatchedRids) > 0:
- self._warning.append("WARNING: transactional dequeues not found in enqueue map; rids=%s" % mismatchedRids)
- else:
- self._warning.append("WARNING: %s not found in transaction map" % Utils.formatXid(hdr.xid))
- if hdr.magic[-1] == "c": # commits only
- self._txnObjList[hdr.xid] = hdr
- return False
-
- def _incrFileNum(self):
- self._fileNum += 1
- if self._fileNum >= self._ji.getNumJrnlFiles():
- self._fileNum = 0;
- return self._fileNum
-
- def _loadRec(self, hdr):
- while not hdr.complete():
- if self._advanceJrnlFile(): return True
- hdr.load(self._file)
- return False
-
-
#==============================================================================
_CLASSES = {
Modified: store/trunk/cpp/tools/resize
===================================================================
--- store/trunk/cpp/tools/resize 2010-04-26 18:31:43 UTC (rev 3930)
+++ store/trunk/cpp/tools/resize 2010-04-26 19:57:05 UTC (rev 3931)
@@ -1,29 +1,33 @@
#!/usr/bin/env python
+"""
+Copyright (c) 2007, 2008 Red Hat, Inc.
-# Copyright (c) 2007, 2008 Red Hat, Inc.
-#
-# This file is part of the Qpid async store library msgstore.so.
-#
-# This library is free software; you can redistribute it and/or
-# modify it under the terms of the GNU Lesser General Public
-# License as published by the Free Software Foundation; either
-# version 2.1 of the License, or (at your option) any later version.
-#
-# This library is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-# Lesser General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public
-# License along with this library; if not, write to the Free Software
-# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
-# USA
-#
-# The GNU Lesser General Public License is available in the file COPYING.
+This file is part of the Qpid async store library msgstore.so.
-import jrnl
+This library is free software; you can redistribute it and/or
+modify it under the terms of the GNU Lesser General Public
+License as published by the Free Software Foundation; either
+version 2.1 of the License, or (at your option) any later version.
+
+This library is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+Lesser General Public License for more details.
+
+You should have received a copy of the GNU Lesser General Public
+License along with this library; if not, write to the Free Software
+Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
+USA
+
+The GNU Lesser General Public License is available in the file COPYING.
+"""
+
+import jerr, jrnl, janal
import glob, optparse, os, sys, time
+
+#== class Resize ==============================================================
+
class Resize(object):
"""
Creates a new store journal and copies records from old journal to new. The new journal may be of
@@ -51,71 +55,83 @@
NUM_JFILES_MIN = 4
NUM_JFILES_MAX = 64
- def __init__(self, args):
+ def __init__(self):
+ """Constructor"""
self._opts = None
self._jdir = None
- self._fName = None
- self._fNum = None
+ self._fname = None
+ self._fnum = None
self._file = None
- self._fileRecWrCnt = None
- self._fillerWrCnt = None
- self._lastRecFid = None
- self._lastRecOffs = None
- self._recWrCnt = None
+ self._file_rec_wr_cnt = None
+ self._filler_wr_cnt = None
+ self._last_rec_fid = None
+ self._last_rec_offs = None
+ self._rec_wr_cnt = None
- self._jrnlInfo = None
- self._jrnlAnal = None
- self._jrnlRdr = None
+ self._jrnl_info = None
+ self._jrnl_analysis = None
+ self._jrnl_reader = None
- self._processArgs(args)
- self._jrnlInfo = jrnl.JrnlInfo(self._jdir, self._opts.bfn)
+ self._process_args()
+ self._jrnl_info = jrnl.JrnlInfo(self._jdir, self._opts.bfn)
# FIXME: This is a hack... find an elegant way of getting the file size to jrec!
- jrnl.jfsize = self._jrnlInfo.getJrnlFileSizeBytes()
- self._jrnlAnal = jrnl.JrnlAnalyzer(self._jrnlInfo)
- self._jrnlRdr = jrnl.JrnlReader(self._jrnlInfo, self._jrnlAnal, self._opts.qflag, self._opts.rflag, self._opts.vflag)
+ jrnl.JRNL_FILE_SIZE = self._jrnl_info.get_jrnl_file_size_bytes()
+ self._jrnl_analysis = janal.JrnlAnalyzer(self._jrnl_info)
+ self._jrnl_reader = janal.JrnlReader(self._jrnl_info, self._jrnl_analysis, self._opts.qflag, self._opts.rflag,
+ self._opts.vflag)
def run(self):
- if not self._opts.qflag: print self._jrnlAnal
- self._jrnlRdr.run()
- if self._opts.vflag: print self._jrnlInfo
- if not self._opts.qflag: print self._jrnlRdr.report(self._opts.vflag, self._opts.rflag)
- self._handleOldFiles()
- self._createNewFiles()
- if not self._opts.qflag: print "Transferred %d records to new journal." % self._recWrCnt
- self._chkFree()
+ """Perform the action of resizing the journal"""
+ if not self._opts.qflag:
+ print self._jrnl_analysis
+ self._jrnl_reader.run()
+ if self._opts.vflag:
+ print self._jrnl_info
+ if not self._opts.qflag:
+ print self._jrnl_reader.report(self._opts.vflag, self._opts.rflag)
+ self._handle_old_files()
+ self._create_new_files()
+ if not self._opts.qflag:
+ print "Transferred %d records to new journal." % self._rec_wr_cnt
+ self._chk_free()
- def _chkFree(self):
- if self._lastRecFid == None or self._lastRecOffs == None: return
- wrCapacityBytes = self._lastRecFid * self._jrnlInfo.getJrnlSizeBytes() + self._lastRecOffs
- totCapacityBytes = self._jrnlInfo.getTotalJrnlFileCapacityBytes()
- percentFull = 100.0 * wrCapacityBytes / totCapacityBytes
- if percentFull > 80.0:
- raise jrnl.Warning("WARNING: Journal %s is %2.1f%% full and will likely not allow enqueuing of new records until some existing records are dequeued." % (self._jrnlInfo.getJrnlId(), percentFull))
+ def _chk_free(self):
+ """Check if sufficient space is available in resized journal to be able to enqueue. Raise a warning if not."""
+ if self._last_rec_fid == None or self._last_rec_offs == None:
+ return
+ wr_capacity_bytes = self._last_rec_fid * self._jrnl_info.get_jrnl_data_size_bytes() + self._last_rec_offs
+ tot_capacity_bytes = self._jrnl_info.get_tot_jrnl_data_size_bytes()
+ percent_full = 100.0 * wr_capacity_bytes / tot_capacity_bytes
+ if percent_full > 80.0:
+ raise jerr.JWarning("WARNING: Journal %s is %2.1f%% full and will likely not allow enqueuing of new records"
+ " until some existing records are dequeued." %
+ (self._jrnl_info.get_jrnl_id(), percent_full))
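
The 80% threshold is a straight byte-level fill ratio over the resized journal. A worked example with assumed values (none of these come from a real journal):

    last_rec_fid, last_rec_offs = 7, 1048576   # file index and offset of last record (assumed)
    data_size_bytes = 1572864                  # per-file data capacity in bytes (assumed)
    tot_capacity_bytes = 8 * data_size_bytes   # 8 journal files (assumed)
    wr_capacity_bytes = last_rec_fid * data_size_bytes + last_rec_offs
    print "%2.1f%% full" % (100.0 * wr_capacity_bytes / tot_capacity_bytes)   # 95.8% full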
- def _createNewFiles(self):
+ def _create_new_files(self):
+ """Create new journal files"""
     # Assemble records to be transferred
- masterRecordList = {}
- txnRecordList = self._jrnlRdr.txnObjList()
- if self._opts.vflag and self._jrnlRdr.emap().size() > 0:
- print "* Assembling %d records from emap" % self._jrnlRdr.emap().size()
- for t in self._jrnlRdr.emap().getRecList():
- hdr = t[1]
- hdr.flags &= ~jrnl.Hdr.owi_mask # Turn off owi
- masterRecordList[long(hdr.rid)] = hdr
- if hdr.xidsize > 0 and hdr.xid in txnRecordList.keys():
- txnHdr = txnRecordList[hdr.xid]
- del(txnRecordList[hdr.xid])
- txnHdr.flags &= ~jrnl.Hdr.owi_mask # Turn off owi
- masterRecordList[long(txnHdr.rid)] = txnHdr
- if self._opts.vflag and self._jrnlRdr.tmap().size() > 0:
- print "* Assembling %d records from tmap" % self._jrnlRdr.tmap().size()
- for x in self._jrnlRdr.tmap().xids():
- for t in self._jrnlRdr.tmap().get(x):
- hdr = t[1]
- hdr.flags &= ~jrnl.Hdr.owi_mask # Turn off owi
- masterRecordList[hdr.rid] = hdr
- ridList = masterRecordList.keys()
- ridList.sort()
+ master_record_list = {}
+ txn_record_list = self._jrnl_reader.txn_obj_list()
+ if self._opts.vflag and self._jrnl_reader.emap().size() > 0:
+ print "* Assembling %d records from emap" % self._jrnl_reader.emap().size()
+ for tup in self._jrnl_reader.emap().get_rec_list():
+ hdr = tup[1]
+ hdr.flags &= ~jrnl.Hdr.OWI_MASK # Turn off owi
+ master_record_list[long(hdr.rid)] = hdr
+ if hdr.xidsize > 0 and hdr.xid in txn_record_list.keys():
+ txn_hdr = txn_record_list[hdr.xid]
+ del(txn_record_list[hdr.xid])
+ txn_hdr.flags &= ~jrnl.Hdr.OWI_MASK # Turn off owi
+ master_record_list[long(txn_hdr.rid)] = txn_hdr
+ if self._opts.vflag and self._jrnl_reader.tmap().size() > 0:
+ print "* Assembling %d records from tmap" % self._jrnl_reader.tmap().size()
+ for xid in self._jrnl_reader.tmap().xids():
+ for tup in self._jrnl_reader.tmap().get(xid):
+ hdr = tup[1]
+ hdr.flags &= ~jrnl.Hdr.OWI_MASK # Turn off owi
+ master_record_list[hdr.rid] = hdr
+ rid_list = master_record_list.keys()
+ rid_list.sort()
# get base filename
bfn = self._opts.bfn
@@ -123,118 +139,142 @@
bfn = self._opts.nbfn
# write jinf file
- self._jrnlInfo.resize(self._opts.njf, self._opts.jfs)
- self._jrnlInfo.write(self._jdir, bfn)
+ self._jrnl_info.resize(self._opts.njf, self._opts.jfs)
+ self._jrnl_info.write(self._jdir, bfn)
# write records
- if self._opts.vflag: print "* Transferring records to new journal files"
- fro = jrnl.sblkSize
- while len(ridList) > 0:
- hdr = masterRecordList[ridList.pop(0)]
+ if self._opts.vflag:
+ print "* Transferring records to new journal files"
+ fro = self._jrnl_info.get_jrnl_sblk_size_bytes()
+ while len(rid_list) > 0:
+ hdr = master_record_list[rid_list.pop(0)]
rec = hdr.encode()
pos = 0
while pos < len(rec):
- if self._file == None or self._file.tell() >= self._jrnlInfo.getJrnlFileSizeBytes():
- if self._file == None: rid = hdr.rid
- elif len(ridList) == 0: rid = None
- else: rid = ridList[0]
- if not self._rotateFile(rid, fro):
- raise Exception("ERROR: Ran out of journal space while writing records.")
- if len(rec) - pos <= self._jrnlInfo.getJrnlFileSizeBytes() - self._file.tell():
+ if self._file == None or self._file.tell() >= self._jrnl_info.get_jrnl_file_size_bytes():
+ if self._file == None:
+ rid = hdr.rid
+ elif len(rid_list) == 0:
+ rid = None
+ else:
+ rid = rid_list[0]
+ if not self._rotate_file(rid, fro):
+ raise jerr.JournalSpaceExceededError()
+ if len(rec) - pos <= self._jrnl_info.get_jrnl_file_size_bytes() - self._file.tell():
self._file.write(rec[pos:])
- self._fillFile(jrnl.Utils.sizeInBytesToBlk(self._file.tell(), jrnl.dblkSize))
+ self._fill_file(jrnl.Utils.size_in_bytes_to_blk(self._file.tell(),
+ self._jrnl_info.get_jrnl_dblk_size_bytes()))
pos = len(rec)
- fro = jrnl.sblkSize
+ fro = self._jrnl_info.get_jrnl_sblk_size_bytes()
else:
- l = self._jrnlInfo.getJrnlFileSizeBytes() - self._file.tell()
- self._file.write(rec[pos:pos+l])
- pos += l
+ flen = self._jrnl_info.get_jrnl_file_size_bytes() - self._file.tell()
+ self._file.write(rec[pos:pos + flen])
+ pos += flen
rem = len(rec) - pos
- if rem <= self._jrnlInfo.getJrnlSizeBytes():
- fro = (jrnl.Utils.sizeInBytesToBlk(jrnl.sblkSize + rem, jrnl.dblkSize))
+ if rem <= self._jrnl_info.get_jrnl_data_size_bytes():
+ fro = (jrnl.Utils.size_in_bytes_to_blk(self._jrnl_info.get_jrnl_sblk_size_bytes() + rem,
+ self._jrnl_info.get_jrnl_dblk_size_bytes()))
else:
fro = 0
- self._recWrCnt += 1
- self._fileRecWrCnt += 1
- self._fillFile(addFillerRecs = True)
- while self._rotateFile(): pass
+ self._rec_wr_cnt += 1
+ self._file_rec_wr_cnt += 1
+ self._fill_file(add_filler_recs = True)
+ while self._rotate_file():
+ pass
- def _fillFile(self, toPosn = None, addFillerRecs = False):
- if self._file == None: return
- if addFillerRecs:
- nfr = int(jrnl.Utils.remBytesInBlk(self._file, jrnl.sblkSize) / jrnl.dblkSize)
+ def _fill_file(self, to_posn = None, add_filler_recs = False):
+ """Fill a file to a known offset"""
+ if self._file == None:
+ return
+ if add_filler_recs:
+ nfr = int(jrnl.Utils.rem_bytes_in_blk(self._file, self._jrnl_info.get_jrnl_sblk_size_bytes()) /
+ self._jrnl_info.get_jrnl_dblk_size_bytes())
if nfr > 0:
- self._fillerWrCnt = nfr
+ self._filler_wr_cnt = nfr
for i in range(0, nfr):
self._file.write("RHMx")
- self._fillFile(jrnl.Utils.sizeInBytesToBlk(self._file.tell(), jrnl.dblkSize))
- self._lastRecFid = self._fNum
- self._lastRecOffs = self._file.tell()
- if toPosn == None: toPosn = self._jrnlInfo.getJrnlFileSizeBytes()
- elif toPosn > self._jrnlInfo.getJrnlFileSizeBytes(): raise Exception("Filling to size > max file size")
- diff = toPosn - self._file.tell()
+ self._fill_file(jrnl.Utils.size_in_bytes_to_blk(self._file.tell(),
+ self._jrnl_info.get_jrnl_dblk_size_bytes()))
+ self._last_rec_fid = self._fnum
+ self._last_rec_offs = self._file.tell()
+ if to_posn == None:
+ to_posn = self._jrnl_info.get_jrnl_file_size_bytes()
+ elif to_posn > self._jrnl_info.get_jrnl_file_size_bytes():
+ raise jerr.FillExceedsFileSizeError(to_posn, self._jrnl_info.get_jrnl_file_size_bytes())
+ diff = to_posn - self._file.tell()
self._file.write(str("\0" * diff))
#DEBUG
- if self._file.tell() != toPosn: raise Exception("OOPS - File size problem")
+ if self._file.tell() != to_posn:
+ raise jerr.FillSizeError(self._file.tell(), to_posn)
- def _rotateFile(self, rid = None, fro = None):
+ def _rotate_file(self, rid = None, fro = None):
+ """Switch to the next logical file"""
if self._file != None:
self._file.close()
if self._opts.vflag:
- if self._fileRecWrCnt == 0:
+ if self._file_rec_wr_cnt == 0:
print " (empty)"
- elif self._fillerWrCnt == None:
- print " (%d records)" % self._fileRecWrCnt
+ elif self._filler_wr_cnt == None:
+ print " (%d records)" % self._file_rec_wr_cnt
else:
- print " (%d records + %d filler(s))" % (self._fileRecWrCnt, self._fillerWrCnt)
- if self._fNum == None:
- self._fNum = 0
- self._recWrCnt = 0
- elif self._fNum == self._jrnlInfo.getNumJrnlFiles() - 1: return False
- else: self._fNum += 1
- self._fileRecWrCnt = 0
- self._fName = os.path.join(self._jrnlInfo.getJrnlDir(), "%s.%04x.jdat" % (self._jrnlInfo.getJrnlBaseName(), self._fNum))
- if self._opts.vflag: print "* Opening file %s" % self._fName,
- self._file = open(self._fName, "w")
+ print " (%d records + %d filler(s))" % (self._file_rec_wr_cnt, self._filler_wr_cnt)
+ if self._fnum == None:
+ self._fnum = 0
+ self._rec_wr_cnt = 0
+ elif self._fnum == self._jrnl_info.get_num_jrnl_files() - 1:
+ return False
+ else:
+ self._fnum += 1
+ self._file_rec_wr_cnt = 0
+ self._fname = os.path.join(self._jrnl_info.get_jrnl_dir(), "%s.%04x.jdat" %
+ (self._jrnl_info.get_jrnl_base_name(), self._fnum))
+ if self._opts.vflag:
+ print "* Opening file %s" % self._fname,
+ self._file = open(self._fname, "w")
if rid == None or fro == None:
- self._fillFile()
+ self._fill_file()
else:
- t = time.time()
- fhdr = jrnl.FileHdr(0, "RHMf", jrnl.Hdr.hdrVer, int(jrnl.Hdr.big_endian_flag), 0, rid)
- fhdr.init(self._file, 0, self._fNum, self._fNum, fro, int(t), 1000000000*(t - int(t)))
+ now = time.time()
+ fhdr = jrnl.FileHdr(0, "RHMf", jrnl.Hdr.HDR_VER, int(jrnl.Hdr.BIG_ENDIAN), 0, rid)
+ fhdr.init(self._file, 0, self._fnum, self._fnum, fro, int(now), 1000000000*(now - int(now)))
self._file.write(fhdr.encode())
- self._fillFile(jrnl.sblkSize)
+ self._fill_file(self._jrnl_info.get_jrnl_sblk_size_bytes())
return True
- def _handleOldFiles(self):
- targetDir = self._jdir
+ def _handle_old_files(self):
+ """Push old journal down into a backup directory"""
+ target_dir = self._jdir
if not self._opts.npd:
- targetDir = os.path.join(self._jdir, self.BAK_DIR)
- if os.path.exists(targetDir):
- if self._opts.vflag: print "* Pushdown directory %s exists, deleting content" % targetDir
- for f in glob.glob(os.path.join(targetDir, "*")):
- os.unlink(f)
+ target_dir = os.path.join(self._jdir, self.BAK_DIR)
+ if os.path.exists(target_dir):
+ if self._opts.vflag:
+ print "* Pushdown directory %s exists, deleting content" % target_dir
+ for fname in glob.glob(os.path.join(target_dir, "*")):
+ os.unlink(fname)
else:
- if self._opts.vflag: print "* Creating new pushdown directory %s" % targetDir
- os.mkdir(targetDir)
+ if self._opts.vflag:
+ print "* Creating new pushdown directory %s" % target_dir
+ os.mkdir(target_dir)
if not self._opts.npd or self._opts.obfn != None:
- if self._opts.obfn != None and self._opts.vflag: print "* Renaming old journal files using base name %s" % self._opts.obfn
+ if self._opts.obfn != None and self._opts.vflag:
+ print "* Renaming old journal files using base name %s" % self._opts.obfn
# .jdat files
- for fn in glob.glob(os.path.join(self._jdir, "%s.*.jdat" % self._opts.bfn)):
- tbfn = os.path.basename(fn)
+ for fname in glob.glob(os.path.join(self._jdir, "%s.*.jdat" % self._opts.bfn)):
+ tbfn = os.path.basename(fname)
if self._opts.obfn != None:
- i1 = tbfn.rfind(".")
- if i1 >= 0:
- i2 = tbfn.rfind(".", 0, i1)
- if i2 >= 0:
- tbfn = "%s%s" % (self._opts.obfn, tbfn[i2:])
- os.rename(fn, os.path.join(targetDir, tbfn))
+ per1 = tbfn.rfind(".")
+ if per1 >= 0:
+ per2 = tbfn.rfind(".", 0, per1)
+ if per2 >= 0:
+ tbfn = "%s%s" % (self._opts.obfn, tbfn[per2:])
+ os.rename(fname, os.path.join(target_dir, tbfn))
# .jinf file
- self._jrnlInfo.write(targetDir, self._opts.obfn)
+ self._jrnl_info.write(target_dir, self._opts.obfn)
os.unlink(os.path.join(self._jdir, "%s.jinf" % self._opts.bfn))
- def _printOptions(self):
+ def _print_options(self):
+ """Print program options"""
if self._opts.vflag:
print "Journal dir: %s" % self._jdir
print "Options: Base filename: %s" % self._opts.bfn
@@ -247,60 +287,68 @@
print " Verbose flag: %s" % True
print
- def _processArgs(self, argv):
- op = optparse.OptionParser(usage="%prog [options] DIR", version="%prog 1.0")
- op.add_option("-b", "--base-filename",
+ def _process_args(self):
+ """Process the command-line arguments"""
+ opt = optparse.OptionParser(usage="%prog [options] DIR", version="%prog 1.0")
+ opt.add_option("-b", "--base-filename",
action="store", dest="bfn", default="JournalData",
help="Base filename for old journal files")
- op.add_option("-B", "--new-base-filename",
+ opt.add_option("-B", "--new-base-filename",
action="store", dest="nbfn",
help="Base filename for new journal files")
- op.add_option("-n", "--no-pushdown",
+ opt.add_option("-n", "--no-pushdown",
action="store_true", dest="npd",
help="Suppress pushdown of old files into \"bak\" dir; old files will remain in existing dir")
- op.add_option("-N", "--num-jfiles",
+ opt.add_option("-N", "--num-jfiles",
action="store", type="int", dest="njf", default=8,
help="Number of files for new journal (%d-%d)" % (self.NUM_JFILES_MIN, self.NUM_JFILES_MAX))
- op.add_option("-o", "--old-base-filename",
+ opt.add_option("-o", "--old-base-filename",
action="store", dest="obfn",
help="Base filename for old journal files")
- op.add_option("-q", "--quiet",
+ opt.add_option("-q", "--quiet",
action="store_true", dest="qflag",
help="Quiet (suppress all non-error output)")
- op.add_option("-r", "--records",
+ opt.add_option("-r", "--records",
action="store_true", dest="rflag",
help="Print remaining records and transactions")
- op.add_option("-s", "--jfile-size-pgs",
+ opt.add_option("-s", "--jfile-size-pgs",
action="store", type="int", dest="jfs", default=24,
- help="Size of each new journal file in 64kiB blocks (%d-%d)" % (self.JFILE_SIZE_PGS_MIN, self.JFILE_SIZE_PGS_MAX))
- op.add_option("-v", "--verbose",
+ help="Size of each new journal file in 64kiB blocks (%d-%d)" %
+ (self.JFILE_SIZE_PGS_MIN, self.JFILE_SIZE_PGS_MAX))
+ opt.add_option("-v", "--verbose",
action="store_true", dest="vflag",
help="Verbose output")
- (self._opts, args) = op.parse_args()
+ (self._opts, args) = opt.parse_args()
if len(args) == 0:
- op.error("No journal directory argument")
+ opt.error("No journal directory argument")
elif len(args) > 1:
- op.error("Too many positional arguments: %s" % args)
+ opt.error("Too many positional arguments: %s" % args)
if self._opts.qflag and self._opts.rflag:
- op.error("Quiet (-q/--quiet) and record (-r/--records) options are mutually exclusive")
+ opt.error("Quiet (-q/--quiet) and record (-r/--records) options are mutually exclusive")
if self._opts.qflag and self._opts.vflag:
- op.error("Quiet (-q/--quiet) and verbose (-v/--verbose) options are mutually exclusive")
+ opt.error("Quiet (-q/--quiet) and verbose (-v/--verbose) options are mutually exclusive")
if self._opts.njf != None and (self._opts.njf < self.NUM_JFILES_MIN or self._opts.njf > self.NUM_JFILES_MAX):
- op.error("Number of files (%d) is out of range (%d-%d)" % (self._opts.njf, self.NUM_JFILES_MIN, self.NUM_JFILES_MAX))
- if self._opts.jfs != None and (self._opts.jfs < self.JFILE_SIZE_PGS_MIN or self._opts.jfs > self.JFILE_SIZE_PGS_MAX):
- op.error("File size (%d) is out of range (%d-%d)" % (self._opts.jfs, self.JFILE_SIZE_PGS_MIN, self.JFILE_SIZE_PGS_MAX))
+ opt.error("Number of files (%d) is out of range (%d-%d)" %
+ (self._opts.njf, self.NUM_JFILES_MIN, self.NUM_JFILES_MAX))
+ if self._opts.jfs != None and (self._opts.jfs < self.JFILE_SIZE_PGS_MIN or
+ self._opts.jfs > self.JFILE_SIZE_PGS_MAX):
+ opt.error("File size (%d) is out of range (%d-%d)" %
+ (self._opts.jfs, self.JFILE_SIZE_PGS_MIN, self.JFILE_SIZE_PGS_MAX))
if self._opts.npd != None and (self._opts.nbfn == None and self._opts.obfn == None):
- op.error("If (-n/--no-pushdown) is used, then at least one of (-B/--new-base-filename) and (-o/--old-base-filename) must be used.")
+ opt.error("If (-n/--no-pushdown) is used, then at least one of (-B/--new-base-filename) and"
+ " (-o/--old-base-filename) must be used.")
self._jdir = args[0]
if not os.path.exists(self._jdir):
- op.error("Journal path \"%s\" does not exist" % self._jdir)
- self._printOptions()
+ opt.error("Journal path \"%s\" does not exist" % self._jdir)
+ self._print_options()
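
Putting the options together, an invocation of the kind this parser accepts would look like the following (directory and sizes hypothetical):

    resize -N 16 -s 24 -B NewJournalData /var/lib/myjournal

Note the constraints enforced above: -q excludes both -r and -v, and -n requires at least one of -B or -o.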
#==============================================================================
# main program
#==============================================================================
if __name__ == "__main__":
- r = Resize(sys.argv);
- try: r.run()
- except Exception, e: sys.exit(e)
+ R = Resize()
+ try:
+ R.run()
+ except Exception, e:
+ sys.exit(e)
Modified: store/trunk/cpp/tools/store_chk
===================================================================
--- store/trunk/cpp/tools/store_chk 2010-04-26 18:31:43 UTC (rev 3930)
+++ store/trunk/cpp/tools/store_chk 2010-04-26 19:57:05 UTC (rev 3931)
@@ -1,139 +1,150 @@
#!/usr/bin/env python
+"""
+Copyright (c) 2007, 2008 Red Hat, Inc.
-# Copyright (c) 2007, 2008 Red Hat, Inc.
-#
-# This file is part of the Qpid async store library msgstore.so.
-#
-# This library is free software; you can redistribute it and/or
-# modify it under the terms of the GNU Lesser General Public
-# License as published by the Free Software Foundation; either
-# version 2.1 of the License, or (at your option) any later version.
-#
-# This library is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-# Lesser General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public
-# License along with this library; if not, write to the Free Software
-# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
-# USA
-#
-# The GNU Lesser General Public License is available in the file COPYING.
+This file is part of the Qpid async store library msgstore.so.
-import jrnl
-import optparse, os, sys, xml.parsers.expat
+This library is free software; you can redistribute it and/or
+modify it under the terms of the GNU Lesser General Public
+License as published by the Free Software Foundation; either
+version 2.1 of the License, or (at your option) any later version.
-#== class Main ================================================================
+This library is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+Lesser General Public License for more details.
-class Main(object):
+You should have received a copy of the GNU Lesser General Public
+License along with this library; if not, write to the Free Software
+Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
+USA
+
+The GNU Lesser General Public License is available in the file COPYING.
+"""
+
+import jerr, jrnl, janal
+import optparse, os, sys
+
+
+#== class StoreChk ============================================================
+
+class StoreChk(object):
"""
This class:
1. Reads a journal jinf file, and from its info:
2. Analyzes the journal data files to determine which is the last to be written, then
3. Reads and analyzes all the records in the journal files.
- The only public method is run() which kicks off the analysis.
+ The only public method is run() which kicks off the analysis.
"""
- def __init__(self, args):
+ def __init__(self):
+ """Constructor"""
# params
- self._opts = None
+ self.opts = None
+
self._jdir = None
# recovery analysis objects
- self._jrnlInfo = None
- self._jrnlRdr = None
+# self._jrnl_info = None
+# self.jrnl_rdr = None
- self._processArgs(args)
- self._jrnlInfo = jrnl.JrnlInfo(self._jdir, self._opts.bfn)
+ self._process_args()
+ self._jrnl_info = jrnl.JrnlInfo(self._jdir, self.opts.bfn)
# FIXME: This is a hack... find an elegant way of getting the file size to jrec!
- jrnl.jfsize = self._jrnlInfo.getJrnlFileSizeBytes()
- self._jrnlAnal = jrnl.JrnlAnalyzer(self._jrnlInfo)
- self._jrnlRdr = jrnl.JrnlReader(self._jrnlInfo, self._jrnlAnal, self._opts.qflag, self._opts.rflag, self._opts.vflag)
+ jrnl.JRNL_FILE_SIZE = self._jrnl_info.get_jrnl_file_size_bytes()
+ self.jrnl_anal = janal.JrnlAnalyzer(self._jrnl_info)
+ self.jrnl_rdr = janal.JrnlReader(self._jrnl_info, self.jrnl_anal, self.opts.qflag, self.opts.rflag,
+ self.opts.vflag)
def run(self):
- if not self._opts.qflag:
- print self._jrnlInfo
- print self._jrnlAnal
- self._jrnlRdr.run()
+ """Run the store check"""
+ if not self.opts.qflag:
+ print self._jrnl_info
+ print self.jrnl_anal
+ self.jrnl_rdr.run()
self._report()
def _report(self):
- if not self._opts.qflag:
+ """Print the results of the store check"""
+ if not self.opts.qflag:
print
print " === REPORT ===="
print
- print "Records: %8d non-transactional" % (self._jrnlRdr.msgCnt() - self._jrnlRdr.txnMsgCnt())
- print " %8d transactional" % self._jrnlRdr.txnMsgCnt()
- print " %8d total" % self._jrnlRdr.msgCnt()
+ print "Records: %8d non-transactional" % \
+ (self.jrnl_rdr.get_msg_cnt() - self.jrnl_rdr.get_txn_msg_cnt())
+ print " %8d transactional" % self.jrnl_rdr.get_txn_msg_cnt()
+ print " %8d total" % self.jrnl_rdr.get_msg_cnt()
print
- print "Transactions: %8d aborts" % self._jrnlRdr.abortCnt()
- print " %8d commits" % self._jrnlRdr.commitCnt()
- print " %8d total" % (self._jrnlRdr.abortCnt() + self._jrnlRdr.commitCnt())
+ print "Transactions: %8d aborts" % self.jrnl_rdr.get_abort_cnt()
+ print " %8d commits" % self.jrnl_rdr.get_commit_cnt()
+ print " %8d total" % (self.jrnl_rdr.get_abort_cnt() + self.jrnl_rdr.get_commit_cnt())
print
- if self._jrnlRdr.emap().size() > 0:
+ if self.jrnl_rdr.emap().size() > 0:
print "Remaining enqueued records (sorted by rid): "
- ridList = self._jrnlRdr.emap().rids()
- ridList.sort()
- for rid in ridList:
- tup = self._jrnlRdr.emap().get(rid)
+ rid_list = self.jrnl_rdr.emap().rids()
+ rid_list.sort()
+ for rid in rid_list:
+ tup = self.jrnl_rdr.emap().get(rid)
locked = ""
if tup[2]:
locked += " (locked)"
print " fid=%d %s%s" % (tup[0], tup[1], locked)
- print "WARNING: Enqueue-Dequeue mismatch, %d enqueued records remain." % self._jrnlRdr.emap().size()
+ print "WARNING: Enqueue-Dequeue mismatch, %d enqueued records remain." % self.jrnl_rdr.emap().size()
else:
print "No remaining enqueued records found (emap empty)."
print
- if self._jrnlRdr.tmap().size() > 0:
- txnRecCnt = 0
+ if self.jrnl_rdr.tmap().size() > 0:
+ txn_rec_cnt = 0
print "Incomplete transactions: "
- for xid in self._jrnlRdr.tmap().xids():
- jrnl.Utils.formatXid(xid)
- recs = self._jrnlRdr.tmap().get(xid)
+ for xid in self.jrnl_rdr.tmap().xids():
+ jrnl.Utils.format_xid(xid)
+ recs = self.jrnl_rdr.tmap().get(xid)
for tup in recs:
print " fid=%d %s" % (tup[0], tup[1])
- print " Total: %d records for %s" % (len(recs), jrnl.Utils.formatXid(xid))
+ print " Total: %d records for %s" % (len(recs), jrnl.Utils.format_xid(xid))
print
- txnRecCnt += len(recs)
- print "WARNING: Incomplete transactions found, %d xids remain containing a total of %d records." % (self._jrnlRdr.tmap().size(), txnRecCnt)
+ txn_rec_cnt += len(recs)
+ print "WARNING: Incomplete transactions found, %d xids remain containing a total of %d records." % \
+ (self.jrnl_rdr.tmap().size(), txn_rec_cnt)
else:
print "No incomplete transactions found (tmap empty)."
print
- print "%d enqueues, %d journal records processed." % (self._jrnlRdr.msgCnt(), self._jrnlRdr.recCnt())
+ print "%d enqueues, %d journal records processed." % \
+ (self.jrnl_rdr.get_msg_cnt(), self.jrnl_rdr.get_rec_cnt())
- def _processArgs(self, argv):
- op = optparse.OptionParser(usage="%prog [options] DIR", version="%prog 1.0")
- op.add_option("-b", "--base-filename",
+ def _process_args(self):
+ """Process the command-line arguments"""
+ opt = optparse.OptionParser(usage="%prog [options] DIR", version="%prog 1.0")
+ opt.add_option("-b", "--base-filename",
action="store", dest="bfn", default="JournalData",
help="Base filename for old journal files")
- op.add_option("-q", "--quiet",
+ opt.add_option("-q", "--quiet",
action="store_true", dest="qflag",
help="Quiet (suppress all non-error output)")
- op.add_option("-r", "--records",
+ opt.add_option("-r", "--records",
action="store_true", dest="rflag",
help="Print all records and transactions (including consumed/closed)")
- op.add_option("-v", "--verbose",
+ opt.add_option("-v", "--verbose",
action="store_true", dest="vflag",
help="Verbose output")
- (self._opts, args) = op.parse_args()
+ (self.opts, args) = opt.parse_args()
if len(args) == 0:
- op.error("No journal directory argument")
+ opt.error("No journal directory argument")
elif len(args) > 1:
- op.error("Too many positional arguments: %s" % args)
- if self._opts.qflag and self._opts.rflag:
- op.error("Quiet (-q/--quiet) and record (-r/--records) options are mutually exclusive")
- if self._opts.qflag and self._opts.vflag:
- op.error("Quiet (-q/--quiet) and verbose (-v/--verbose) options are mutually exclusive")
+ opt.error("Too many positional arguments: %s" % args)
+ if self.opts.qflag and self.opts.rflag:
+ opt.error("Quiet (-q/--quiet) and record (-r/--records) options are mutually exclusive")
+ if self.opts.qflag and self.opts.vflag:
+ opt.error("Quiet (-q/--quiet) and verbose (-v/--verbose) options are mutually exclusive")
self._jdir = args[0]
if not os.path.exists(self._jdir):
- op.error("Journal path \"%s\" does not exist" % self._jdir)
+ opt.error("Journal path \"%s\" does not exist" % self._jdir)
-#== class CsvMain =============================================================
+#== class CsvStoreChk =========================================================
-class CsvMain(Main):
+class CsvStoreChk(StoreChk):
"""
This class, in addition to analyzing a journal, can compare the journal footprint (i.e. enqueued/dequeued/transaction
record counts) to expected values from a CSV file. This can be used for additional automated testing, and is
@@ -152,150 +163,174 @@
EXTERN_COL = 13
COMMENT_COL = 20
- def __init__(self, args):
+ def __init__(self):
+ """Constructor"""
+ StoreChk.__init__(self)
+
# csv params
- self._numMsgs = None
- self._msgLen = None
- self._autoDeq = None
- self._xidLen = None
- self._transient = None
- self._extern = None
+ self.num_msgs = None
+ self.msg_len = None
+ self.auto_deq = None
+ self.xid_len = None
+ self.transient = None
+ self.extern = None
self._warning = []
- Main.__init__(self, args)
- self._jrnlRdr.setCallbacks(self, CsvMain._csvPreRunChk, CsvMain._csvEnqChk, CsvMain._csvDeqChk, CsvMain._csvTxnChk, CsvMain._csvPostRunChk)
- self._getCsvTest()
+ self.jrnl_rdr.set_callbacks(self, CsvStoreChk._csv_pre_run_chk, CsvStoreChk._csv_enq_chk,
+ CsvStoreChk._csv_deq_chk, CsvStoreChk._csv_txn_chk, CsvStoreChk._csv_post_run_chk)
+ self._get_csv_test()
- def _getCsvTest(self):
- if self._opts.csvfn != None and self._opts.tnum != None:
- tparams = self._readCsvFile(self._opts.csvfn, self._opts.tnum)
+ def _get_csv_test(self):
+ """Get a test from the CSV reader"""
+ if self.opts.csvfn != None and self.opts.tnum != None:
+ tparams = self._read_csv_file(self.opts.csvfn, self.opts.tnum)
if tparams == None:
- print "ERROR: Test %d not found in CSV file \"%s\"" % (self._opts.tnum, self._opts.csvfn)
+ print "ERROR: Test %d not found in CSV file \"%s\"" % (self.opts.tnum, self.opts.csvfn)
sys.exit(1)
- self._numMsgs = tparams["num_msgs"]
+ self.num_msgs = tparams["num_msgs"]
if tparams["min_size"] == tparams["max_size"]:
- self._msgLen = tparams["max_size"]
+ self.msg_len = tparams["max_size"]
else:
- self._msgLen = 0
- self._autoDeq = tparams["auto_deq"]
+ self.msg_len = 0
+ self.auto_deq = tparams["auto_deq"]
if tparams["xid_min_size"] == tparams["xid_max_size"]:
- self._xidLen = tparams["xid_max_size"]
+ self.xid_len = tparams["xid_max_size"]
else:
- self._xidLen = 0
- self._transient = tparams["transient"]
- self._extern = tparams["extern"]
+ self.xid_len = 0
+ self.transient = tparams["transient"]
+ self.extern = tparams["extern"]
- def _readCsvFile(self, filename, tnum):
+ def _read_csv_file(self, filename, tnum):
+ """Read the CSV test parameter file"""
try:
- f=open(filename, "r")
+ csvf = open(filename, "r")
except IOError:
print "ERROR: Unable to open CSV file \"%s\"" % filename
sys.exit(1)
- for l in f:
- sl = l.strip().split(",")
- if len(sl[0]) > 0 and sl[0][0] != "\"":
+ for line in csvf:
+ str_list = line.strip().split(",")
+ if len(str_list[0]) > 0 and str_list[0][0] != "\"":
try:
- if (int(sl[self.TEST_NUM_COL]) == tnum):
- return { "num_msgs":int(sl[self.NUM_MSGS_COL]),
- "min_size":int(sl[self.MIN_MSG_SIZE_COL]),
- "max_size":int(sl[self.MAX_MSG_SIZE_COL]),
- "auto_deq":not (sl[self.AUTO_DEQ_COL] == "FALSE" or sl[self.AUTO_DEQ_COL] == "0"),
- "xid_min_size":int(sl[self.MIN_XID_SIZE_COL]),
- "xid_max_size":int(sl[self.MAX_XID_SIZE_COL]),
- "transient":not (sl[self.TRANSIENT_COL] == "FALSE" or sl[self.TRANSIENT_COL] == "0"),
- "extern":not (sl[self.EXTERN_COL] == "FALSE" or sl[self.EXTERN_COL] == "0"),
- "comment":sl[self.COMMENT_COL] }
+ if (int(str_list[self.TEST_NUM_COL]) == tnum):
+ return { "num_msgs": int(str_list[self.NUM_MSGS_COL]),
+ "min_size": int(str_list[self.MIN_MSG_SIZE_COL]),
+ "max_size": int(str_list[self.MAX_MSG_SIZE_COL]),
+ "auto_deq": not (str_list[self.AUTO_DEQ_COL] == "FALSE" or
+ str_list[self.AUTO_DEQ_COL] == "0"),
+ "xid_min_size": int(str_list[self.MIN_XID_SIZE_COL]),
+ "xid_max_size": int(str_list[self.MAX_XID_SIZE_COL]),
+ "transient": not (str_list[self.TRANSIENT_COL] == "FALSE" or
+ str_list[self.TRANSIENT_COL] == "0"),
+ "extern": not (str_list[self.EXTERN_COL] == "FALSE" or
+ str_list[self.EXTERN_COL] == "0"),
+ "comment": str_list[self.COMMENT_COL] }
except Exception:
pass
return None
- def _processArgs(self, argv):
- op = optparse.OptionParser(usage="%prog [options] DIR", version="%prog 1.0")
- op.add_option("-b", "--base-filename",
+ def _process_args(self):
+ """Process command-line arguments"""
+ opt = optparse.OptionParser(usage="%prog [options] DIR", version="%prog 1.0")
+ opt.add_option("-b", "--base-filename",
action="store", dest="bfn", default="JournalData",
help="Base filename for old journal files")
- op.add_option("-c", "--csv-filename",
+ opt.add_option("-c", "--csv-filename",
action="store", dest="csvfn",
help="CSV filename containing test parameters")
- op.add_option("-q", "--quiet",
+ opt.add_option("-q", "--quiet",
action="store_true", dest="qflag",
help="Quiet (suppress all non-error output)")
- op.add_option("-r", "--records",
+ opt.add_option("-r", "--records",
action="store_true", dest="rflag",
help="Print all records and transactions (including consumed/closed)")
- op.add_option("-t", "--test-num",
+ opt.add_option("-t", "--test-num",
action="store", type="int", dest="tnum",
help="Test number from CSV file - only valid if CSV file named")
- op.add_option("-v", "--verbose",
+ opt.add_option("-v", "--verbose",
action="store_true", dest="vflag",
help="Verbose output")
- (self._opts, args) = op.parse_args()
+ (self.opts, args) = opt.parse_args()
if len(args) == 0:
- op.error("No journal directory argument")
+ opt.error("No journal directory argument")
elif len(args) > 1:
- op.error("Too many positional arguments: %s" % args)
- if self._opts.qflag and self._opts.rflag:
- op.error("Quiet (-q/--quiet) and record (-r/--records) options are mutually exclusive")
- if self._opts.qflag and self._opts.vflag:
- op.error("Quiet (-q/--quiet) and verbose (-v/--verbose) options are mutually exclusive")
+ opt.error("Too many positional arguments: %s" % args)
+ if self.opts.qflag and self.opts.rflag:
+ opt.error("Quiet (-q/--quiet) and record (-r/--records) options are mutually exclusive")
+ if self.opts.qflag and self.opts.vflag:
+ opt.error("Quiet (-q/--quiet) and verbose (-v/--verbose) options are mutually exclusive")
self._jdir = args[0]
if not os.path.exists(self._jdir):
- op.error("Journal path \"%s\" does not exist" % self._jdir)
+ opt.error("Journal path \"%s\" does not exist" % self._jdir)
# Callbacks for checking against CSV test parameters. Return False if ok, True to raise error.
#@staticmethod
- def _csvPreRunChk(csvStoreChk):
- if csvStoreChk._numMsgs == None: return
- if csvStoreChk._jrnlAnal.isEmpty() and csvStoreChk._numMsgs > 0:
- raise Exception("[CSV %d] All journal files are empty, but test expects %d msg(s)." % (csvStoreChk._opts.tnum, csvStoreChk._numMsgs))
+ def _csv_pre_run_chk(csv_store_chk):
+ """Check performed before a test runs"""
+ if csv_store_chk.num_msgs == None:
+ return
+ if csv_store_chk.jrnl_anal.is_empty() and csv_store_chk.num_msgs > 0:
+ raise jerr.AllJrnlFilesEmptyCsvError(csv_store_chk.opts.tnum, csv_store_chk.num_msgs)
return False
- _csvPreRunChk = staticmethod(_csvPreRunChk)
+ _csv_pre_run_chk = staticmethod(_csv_pre_run_chk)
#@staticmethod
- def _csvEnqChk(csvStoreChk, hdr):
- #if csvStoreChk._numMsgs == None: return
+ def _csv_enq_chk(csv_store_chk, hdr):
+ """Check performed before each enqueue operation"""
+ #if csv_store_chk.num_msgs == None: return
#
- if csvStoreChk._extern != None:
- if csvStoreChk._extern != hdr.extern:
- raise Exception("[CSV %d] External flag mismatch: found extern=%s; expected %s" % (csvStoreChk._opts.tnum, hdr.extern, csvStoreChk._extern))
+ if csv_store_chk.extern != None:
+ if csv_store_chk.extern != hdr.extern:
+ raise jerr.ExternFlagCsvError(csv_store_chk.opts.tnum, csv_store_chk.extern)
if hdr.extern and hdr.data != None:
- raise Exception("[CSV %d] Message data found on record with external flag set" % csvStoreChk._opts.tnum)
- if csvStoreChk._msgLen != None and csvStoreChk._msgLen > 0 and hdr.data != None and len(hdr.data) != csvStoreChk._msgLen:
- raise Exception("[CSV %d] Message length mismatch: found %d; expected %d" % (csvStoreChk._opts.tnum, len(hdr.data), csvStoreChk._msgLen))
- if csvStoreChk._xidLen != None and csvStoreChk._xidLen > 0 and len(hdr.xid) != csvStoreChk._xidLen:
- raise Exception("[CSV %d] Message XID mismatch: found %d; expected %d" % (csvStoreChk._opts.tnum, len(hdr.xid), csvStoreChk._xidLen))
- if csvStoreChk._transient != None and hdr.transient != csvStoreChk._transient:
- raise Exception("[CSV %d] Transience mismatch: found trans=%s; expected %s" % (csvStoreChk._opts.tnum, hdr.transient, csvStoreChk._transient))
+ raise jerr.ExternFlagWithDataCsvError(csv_store_chk.opts.tnum)
+ if csv_store_chk.msg_len != None and csv_store_chk.msg_len > 0 and hdr.data != None and \
+ len(hdr.data) != csv_store_chk.msg_len:
+ raise jerr.MessageLengthCsvError(csv_store_chk.opts.tnum, csv_store_chk.msg_len, len(hdr.data))
+ if csv_store_chk.xid_len != None and csv_store_chk.xid_len > 0 and len(hdr.xid) != csv_store_chk.xid_len:
+ raise jerr.XidLengthCsvError(csv_store_chk.opts.tnum, csv_store_chk.xid_len, len(hdr.xid))
+ if csv_store_chk.transient != None and hdr.transient != csv_store_chk.transient:
+ raise jerr.TransactionCsvError(csv_store_chk.opts.tnum, csv_store_chk.transient)
return False
- _csvEnqChk = staticmethod(_csvEnqChk)
+ _csv_enq_chk = staticmethod(_csv_enq_chk)
#@staticmethod
- def _csvDeqChk(csvStoreChk, hdr):
- if csvStoreChk._autoDeq != None and not csvStoreChk._autoDeq:
- self._warning.append("[CSV %d] WARNING: Dequeue record rid=%d found in non-dequeue test - ignoring." % (csvStoreChk._opts.tnum, hdr.rid))
+ def _csv_deq_chk(csv_store_chk, hdr):
+ """Check performed before each dequeue operation"""
+ if csv_store_chk.auto_deq != None and not csv_store_chk.auto_deq:
+ raise jerr.JWarning("[CSV %d] WARNING: Dequeue record rid=%d found in non-dequeue test - ignoring." %
+ (csv_store_chk.opts.tnum, hdr.rid))
+ #self._warning.append("[CSV %d] WARNING: Dequeue record rid=%d found in non-dequeue test - ignoring." %
+ # (csv_store_chk.opts.tnum, hdr.rid))
return False
- _csvDeqChk = staticmethod(_csvDeqChk)
+ _csv_deq_chk = staticmethod(_csv_deq_chk)
#@staticmethod
- def _csvTxnChk(csvStoreChk, hdr):
+ def _csv_txn_chk(csv_store_chk, hdr):
+ """Check performed before each transaction commit/abort"""
return False
- _csvTxnChk = staticmethod(_csvTxnChk)
+ _csv_txn_chk = staticmethod(_csv_txn_chk)
#@staticmethod
- def _csvPostRunChk(csvStoreChk):
- # Exclude this check if lastFileFlag is set - the count may be less than the number of msgs sent because of journal overwriting
- if csvStoreChk._numMsgs != None and not csvStoreChk._jrnlRdr._lastFileFlag and csvStoreChk._numMsgs != csvStoreChk._jrnlRdr.msgCnt():
- raise Exception("[CSV %s] Incorrect number of messages: Expected %d, found %d" % (csvStoreChk._opts.tnum, csvStoreChk._numMsgs, csvStoreChk._jrnlRdr.msgCnt()))
+ def _csv_post_run_chk(csv_store_chk):
+ """Check performed after the completion of the test"""
+ # Exclude this check if lastFileFlag is set - the count may be less than the number of msgs sent because
+ # of journal overwriting
+ if csv_store_chk.num_msgs != None and not csv_store_chk.jrnl_rdr.is_last_file() and \
+ csv_store_chk.num_msgs != csv_store_chk.jrnl_rdr.get_msg_cnt():
+ raise jerr.NumMsgsCsvError(csv_store_chk.opts.tnum, csv_store_chk.num_msgs,
+ csv_store_chk.jrnl_rdr.get_msg_cnt())
return False
- _csvPostRunChk = staticmethod(_csvPostRunChk)
+ _csv_post_run_chk = staticmethod(_csv_post_run_chk)
#==============================================================================
# main program
#==============================================================================
if __name__ == "__main__":
- m = CsvMain(sys.argv)
- try: m.run()
- except Exception, e: sys.exit(e)
+ M = CsvStoreChk()
+ try:
+ M.run()
+ except Exception, e:
+ sys.exit(e)
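
The #@staticmethod comments above mark the pre-Python-2.4 idiom this module relies on: the decorator is left commented out for old interpreters and the callback is rebound by hand after its definition, so JrnlReader can presumably invoke the checks uniformly with the checker instance passed back in. A minimal sketch of the idiom (Checker is a hypothetical stand-in, not the real CsvStoreChk):

    # Pre-decorator staticmethod rebinding, as used by the _csv_*_chk callbacks.
    class Checker(object):
        #@staticmethod
        def _pre_run_chk(checker):
            # Callbacks receive the checker instance explicitly.
            return False
        _pre_run_chk = staticmethod(_pre_run_chk)

    chk = Checker()
    print(Checker._pre_run_chk(chk))   # False
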
14 years, 8 months
rhmessaging commits: r3930 - store/trunk/cpp/tests/cluster.
by rhmessaging-commits@lists.jboss.org
Author: kpvdr
Date: 2010-04-26 14:31:43 -0400 (Mon, 26 Apr 2010)
New Revision: 3930
Modified:
store/trunk/cpp/tests/cluster/cluster_tests_env.sh
Log:
Added missing env variable QPID_PYTHON_TEST to the test environment for ptolemy
Modified: store/trunk/cpp/tests/cluster/cluster_tests_env.sh
===================================================================
--- store/trunk/cpp/tests/cluster/cluster_tests_env.sh 2010-04-26 12:29:16 UTC (rev 3929)
+++ store/trunk/cpp/tests/cluster/cluster_tests_env.sh 2010-04-26 18:31:43 UTC (rev 3930)
@@ -167,6 +167,7 @@
export QPID_CLUSTER_EXEC="${QPID_BIN_DIR}/qpid-cluster"
export RECEIVER_EXEC="${QPID_LIBEXEC_DIR}/qpid/tests/receiver"
export SENDER_EXEC="${QPID_LIBEXEC_DIR}/qpid/tests/sender"
+ export QPID_PYTHON_TEST=${QPID_BIN_DIR}/qpid-python-test
# Data
CLUSTER_TESTS_FAIL="${QPID_LIBEXEC_DIR}/qpid/tests/cluster_tests.fail"
@@ -290,7 +291,7 @@
# Check expected environment vars are set
func_checkpaths PYTHON_DIR PYTHONPATH TMP_DATA_DIR
func_checklibs CLUSTER_LIB TEST_STORE_LIB STORE_LIB STORE_LIB
-func_checkexecs CPP_CLUSTER_EXEC PYTHON_CLUSTER_EXEC QPIDD_EXEC QPID_CONFIG_EXEC QPID_ROUTE_EXEC RECEIVER_EXEC SENDER_EXEC
+func_checkexecs CPP_CLUSTER_EXEC PYTHON_CLUSTER_EXEC QPIDD_EXEC QPID_CONFIG_EXEC QPID_ROUTE_EXEC RECEIVER_EXEC SENDER_EXEC QPID_PYTHON_TEST
FAILING_PYTHON_TESTS="${abs_srcdir}/../failing_python_tests.txt"
if test -z $1; then
14 years, 8 months
rhmessaging commits: r3929 - mgmt/newdata/rosemary/xml.
by rhmessaging-commits@lists.jboss.org
Author: eallen
Date: 2010-04-26 08:29:16 -0400 (Mon, 26 Apr 2010)
New Revision: 3929
Modified:
mgmt/newdata/rosemary/xml/rosemary.xml
Log:
Added some SysImage entries
Modified: mgmt/newdata/rosemary/xml/rosemary.xml
===================================================================
--- mgmt/newdata/rosemary/xml/rosemary.xml 2010-04-26 12:27:52 UTC (rev 3928)
+++ mgmt/newdata/rosemary/xml/rosemary.xml 2010-04-26 12:29:16 UTC (rev 3929)
@@ -153,5 +153,13 @@
</package>
<package name="com.redhat.sesame">
+ <class name="Sysimage">
+ <property name="nodeName">
+ <title>Host</title>
+ </property>
+ <statistic name="loadAverage1Min">
+ <title>Load Average 1 Minute</title>
+ </statistic>
+ </class>
</package>
</model>
14 years, 8 months
rhmessaging commits: r3928 - mgmt/newdata/wooly/python/wooly.
by rhmessaging-commits@lists.jboss.org
Author: eallen
Date: 2010-04-26 08:27:52 -0400 (Mon, 26 Apr 2010)
New Revision: 3928
Modified:
mgmt/newdata/wooly/python/wooly/forms.py
Log:
Don't show the "Show more options" button on forms if there are no fields added to the extra_fields child.
Modified: mgmt/newdata/wooly/python/wooly/forms.py
===================================================================
--- mgmt/newdata/wooly/python/wooly/forms.py 2010-04-23 21:01:54 UTC (rev 3927)
+++ mgmt/newdata/wooly/python/wooly/forms.py 2010-04-26 12:27:52 UTC (rev 3928)
@@ -273,9 +273,9 @@
return "disabled=\"disabled\""
class FormInputItem(object):
- def __init__(self, value):
+ def __init__(self, value, title=None):
self.value = value
- self.title = None
+ self.title = title
self.description = None
self.disabled = False
@@ -389,6 +389,10 @@
def __init__(self, app, name):
super(ShowableFieldSet, self).__init__(app, name)
+ def render(self, session):
+ return len(self.fields) and \
+ super(ShowableFieldSet, self).render(session) or ""
+
class ScalarField(FormField):
def __init__(self, app, name, input):
super(ScalarField, self).__init__(app, name)
@@ -527,7 +531,7 @@
def do_get_items(self, session):
raise Exception("Not implemented")
-
+
class Inputs(CheckboxItemSet):
def do_get_items(self, session):
return self.parent.do_get_items(session)
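
The new ShowableFieldSet.render relies on the and/or conditional idiom, which predates the "X if C else Y" expression added in Python 2.5. A self-contained sketch of how it short-circuits (FieldSet and render_all are illustrative names, not the wooly API):

    # Returns "" when there are no fields, the rendered markup otherwise.
    # Note the idiom misfires if the "true" branch can itself be falsy.
    class FieldSet(object):
        def __init__(self, fields):
            self.fields = fields

        def render_all(self):
            return "<fieldset>%i field(s)</fieldset>" % len(self.fields)

        def render(self):
            return len(self.fields) and self.render_all() or ""

    print(FieldSet([]).render())      # prints an empty line
    print(FieldSet(["a"]).render())   # <fieldset>1 field(s)</fieldset>
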
14 years, 8 months
rhmessaging commits: r3927 - store/trunk/cpp/lib.
by rhmessaging-commits@lists.jboss.org
Author: tedross
Date: 2010-04-23 17:01:54 -0400 (Fri, 23 Apr 2010)
New Revision: 3927
Modified:
store/trunk/cpp/lib/JournalImpl.cpp
store/trunk/cpp/lib/MessageStoreImpl.cpp
store/trunk/cpp/lib/StorePlugin.cpp
Log:
Use cleaned-up version of ManagementAgent::addObject for durable objects.
Moved initManagement into the early initialization.
Modified: store/trunk/cpp/lib/JournalImpl.cpp
===================================================================
--- store/trunk/cpp/lib/JournalImpl.cpp 2010-04-23 19:34:08 UTC (rev 3926)
+++ store/trunk/cpp/lib/JournalImpl.cpp 2010-04-23 21:01:54 UTC (rev 3927)
@@ -94,7 +94,7 @@
_mgmtObject->set_writePageSize(0);
_mgmtObject->set_writePages(0);
- _agent->addObject(_mgmtObject);
+ _agent->addObject(_mgmtObject, 0, true);
}
log(LOG_NOTICE, "Created");
Modified: store/trunk/cpp/lib/MessageStoreImpl.cpp
===================================================================
--- store/trunk/cpp/lib/MessageStoreImpl.cpp 2010-04-23 19:34:08 UTC (rev 3926)
+++ store/trunk/cpp/lib/MessageStoreImpl.cpp 2010-04-23 21:01:54 UTC (rev 3927)
@@ -233,7 +233,7 @@
mgmtObject->set_tplDataFileSize(tplJrnlFsizeSblks * JRNL_SBLK_SIZE * JRNL_DBLK_SIZE);
mgmtObject->set_tplCurrentFileCount(tplNumJrnlFiles);
- agent->addObject(mgmtObject, 0x1000000000000050LL);
+ agent->addObject(mgmtObject, 0, true);
}
}
}
Modified: store/trunk/cpp/lib/StorePlugin.cpp
===================================================================
--- store/trunk/cpp/lib/StorePlugin.cpp 2010-04-23 19:34:08 UTC (rev 3926)
+++ store/trunk/cpp/lib/StorePlugin.cpp 2010-04-23 21:01:54 UTC (rev 3927)
@@ -57,13 +57,12 @@
static_cast<mrg::msgstore::MessageStoreImpl*>(sp)->init(&options);
broker->setStore (store);
target.addFinalizer(boost::bind(&StorePlugin::finalize, this));
+ static_cast<mrg::msgstore::MessageStoreImpl*>(sp)->initManagement(broker);
}
- void initialize(Plugin::Target& target)
+ void initialize(Plugin::Target&)
{
- Broker* broker = dynamic_cast<Broker*>(&target);
- MessageStore* sp = store.get();
- static_cast<mrg::msgstore::MessageStoreImpl*>(sp)->initManagement(broker);
+ // This function intentionally left blank
}
void finalize()
14 years, 8 months
rhmessaging commits: r3926 - store/trunk/cpp/lib/gen/qmf/com/redhat/rhm/store.
by rhmessaging-commits@lists.jboss.org
Author: tedross
Date: 2010-04-23 15:34:08 -0400 (Fri, 23 Apr 2010)
New Revision: 3926
Modified:
store/trunk/cpp/lib/gen/qmf/com/redhat/rhm/store/Journal.cpp
store/trunk/cpp/lib/gen/qmf/com/redhat/rhm/store/Journal.h
store/trunk/cpp/lib/gen/qmf/com/redhat/rhm/store/Store.cpp
store/trunk/cpp/lib/gen/qmf/com/redhat/rhm/store/Store.h
Log:
Updated generated qmf files.
Modified: store/trunk/cpp/lib/gen/qmf/com/redhat/rhm/store/Journal.cpp
===================================================================
--- store/trunk/cpp/lib/gen/qmf/com/redhat/rhm/store/Journal.cpp 2010-04-23 18:36:37 UTC (rev 3925)
+++ store/trunk/cpp/lib/gen/qmf/com/redhat/rhm/store/Journal.cpp 2010-04-23 19:34:08 UTC (rev 3926)
@@ -30,6 +30,7 @@
#include "ArgsJournalExpand.h"
#include <iostream>
+#include <sstream>
using namespace qmf::com::redhat::rhm::store;
using qpid::management::ManagementAgent;
Modified: store/trunk/cpp/lib/gen/qmf/com/redhat/rhm/store/Journal.h
===================================================================
--- store/trunk/cpp/lib/gen/qmf/com/redhat/rhm/store/Journal.h 2010-04-23 18:36:37 UTC (rev 3925)
+++ store/trunk/cpp/lib/gen/qmf/com/redhat/rhm/store/Journal.h 2010-04-23 19:34:08 UTC (rev 3926)
@@ -58,7 +58,7 @@
uint32_t readPageSize;
uint32_t readPages;
uint16_t initialFileCount;
- uint8_t autoExpand;
+ bool autoExpand;
uint16_t currentFileCount;
uint16_t maxFileCount;
uint32_t dataFileSize;
@@ -246,12 +246,12 @@
::qpid::management::Mutex::ScopedLock mutex(accessLock);
return initialFileCount;
}
- inline void set_autoExpand (uint8_t val) {
+ inline void set_autoExpand (bool val) {
::qpid::management::Mutex::ScopedLock mutex(accessLock);
autoExpand = val;
configChanged = true;
}
- inline uint8_t get_autoExpand() {
+ inline bool get_autoExpand() {
::qpid::management::Mutex::ScopedLock mutex(accessLock);
return autoExpand;
}
Modified: store/trunk/cpp/lib/gen/qmf/com/redhat/rhm/store/Store.cpp
===================================================================
--- store/trunk/cpp/lib/gen/qmf/com/redhat/rhm/store/Store.cpp 2010-04-23 18:36:37 UTC (rev 3925)
+++ store/trunk/cpp/lib/gen/qmf/com/redhat/rhm/store/Store.cpp 2010-04-23 19:34:08 UTC (rev 3926)
@@ -29,6 +29,7 @@
#include "Store.h"
#include <iostream>
+#include <sstream>
using namespace qmf::com::redhat::rhm::store;
using qpid::management::ManagementAgent;
Modified: store/trunk/cpp/lib/gen/qmf/com/redhat/rhm/store/Store.h
===================================================================
--- store/trunk/cpp/lib/gen/qmf/com/redhat/rhm/store/Store.h 2010-04-23 18:36:37 UTC (rev 3925)
+++ store/trunk/cpp/lib/gen/qmf/com/redhat/rhm/store/Store.h 2010-04-23 19:34:08 UTC (rev 3926)
@@ -53,7 +53,7 @@
std::string location;
uint16_t defaultInitialFileCount;
uint32_t defaultDataFileSize;
- uint8_t tplIsInitialized;
+ bool tplIsInitialized;
std::string tplDirectory;
uint32_t tplWritePageSize;
uint32_t tplWritePages;
@@ -169,12 +169,12 @@
::qpid::management::Mutex::ScopedLock mutex(accessLock);
return defaultDataFileSize;
}
- inline void set_tplIsInitialized (uint8_t val) {
+ inline void set_tplIsInitialized (bool val) {
::qpid::management::Mutex::ScopedLock mutex(accessLock);
tplIsInitialized = val;
configChanged = true;
}
- inline uint8_t get_tplIsInitialized() {
+ inline bool get_tplIsInitialized() {
::qpid::management::Mutex::ScopedLock mutex(accessLock);
return tplIsInitialized;
}
14 years, 8 months
rhmessaging commits: r3925 - in mgmt/newdata: mint and 2 other directories.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2010-04-23 14:36:37 -0400 (Fri, 23 Apr 2010)
New Revision: 3925
Removed:
mgmt/newdata/mint/python/mint/update.py
Modified:
mgmt/newdata/cumin/python/cumin/objectframe.py
mgmt/newdata/mint/Makefile
mgmt/newdata/mint/python/mint/database.py
mgmt/newdata/mint/python/mint/expire.py
mgmt/newdata/mint/python/mint/main.py
mgmt/newdata/mint/python/mint/model.py
mgmt/newdata/mint/python/mint/newupdate.py
mgmt/newdata/mint/python/mint/tools.py
mgmt/newdata/mint/python/mint/util.py
mgmt/newdata/mint/python/mint/vacuum.py
mgmt/newdata/rosemary/python/rosemary/model.py
Log:
* Remove old update framework
* Make --log-level also affect --debug mode in mint tools
* Adapt to v1.1 qmf console changes; a problem with oid reference
values remains
* Partial adaptation of mint-admin functions away from sqlobject
* Fix mint-bench reporting
* Drop ObjectId and AgentId adapter classes
* Remove unused schema-related makefile rules from mint
* Tweak context-path separator styling
Modified: mgmt/newdata/cumin/python/cumin/objectframe.py
===================================================================
--- mgmt/newdata/cumin/python/cumin/objectframe.py 2010-04-22 15:03:17 UTC (rev 3924)
+++ mgmt/newdata/cumin/python/cumin/objectframe.py 2010-04-23 18:36:37 UTC (rev 3925)
@@ -190,7 +190,7 @@
links.append(self.link.render(session, frame))
- return " > ".join(reversed(links))
+ return " › ".join(reversed(links))
class ObjectViewContextLink(Link):
def __init__(self, app, name):
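
The separator tweak replaces the ASCII ">" with the typographic "›" (U+203A). A toy rendering with hypothetical link text; the reversed() call suggests the frames are collected innermost-first:

    # -*- coding: utf-8 -*-
    # Context-path join as in render above; link text is illustrative.
    links = ["Binding", "Queue", "Broker"]
    print(" › ".join(reversed(links)))   # Broker › Queue › Binding
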
Modified: mgmt/newdata/mint/Makefile
===================================================================
--- mgmt/newdata/mint/Makefile 2010-04-22 15:03:17 UTC (rev 3924)
+++ mgmt/newdata/mint/Makefile 2010-04-23 18:36:37 UTC (rev 3925)
@@ -1,4 +1,4 @@
-.PHONY: build install clean schema schema-clean
+.PHONY: build install clean
include ../etc/Makefile.common
@@ -26,14 +26,4 @@
install -d ${etc}
install -pm 0644 etc/mint-vacuumdb.cron ${etc}
-schema: schema-clean
- $(MAKE) schema -C xml
- $(MAKE) schema -C python/mint
- $(MAKE) schema -C sql
-
-schema-clean:
- $(MAKE) clean -C xml
- $(MAKE) clean -C python/mint
- $(MAKE) clean -C sql
-
clean: clean-python-files
Modified: mgmt/newdata/mint/python/mint/database.py
===================================================================
--- mgmt/newdata/mint/python/mint/database.py 2010-04-22 15:03:17 UTC (rev 3924)
+++ mgmt/newdata/mint/python/mint/database.py 2010-04-23 18:36:37 UTC (rev 3925)
@@ -1,10 +1,8 @@
-import logging
-import os.path
-
from psycopg2 import ProgrammingError
from sqlobject import connectionForURI, sqlhub
from model import MintInfo, Role
+from util import *
log = logging.getLogger("mint.database")
Modified: mgmt/newdata/mint/python/mint/expire.py
===================================================================
--- mgmt/newdata/mint/python/mint/expire.py 2010-04-22 15:03:17 UTC (rev 3924)
+++ mgmt/newdata/mint/python/mint/expire.py 2010-04-23 18:36:37 UTC (rev 3925)
@@ -1,7 +1,6 @@
from newupdate import *
from schema import *
from sql import *
-from update import *
from util import *
import mint
@@ -43,12 +42,9 @@
if self.stop_requested:
break
- up = ExpireUpdate()
+ up = ExpireUpdate(self.app.model)
self.app.update_thread.enqueue(up)
- up = NewExpireUpdate(self.app.model)
- self.app.new_update_thread.enqueue(up)
-
sleep(frequency)
def __convertTimeUnits(self, t):
@@ -68,28 +64,6 @@
class ExpireUpdate(Update):
def do_process(self, conn, stats):
- attrs = self.thread.app.expire_thread.attrs
-
- cursor = conn.cursor()
- total = 0
-
- for op in self.thread.app.expire_thread.ops:
- log.debug("Running expire op %s", op)
-
- count = op.execute(cursor, attrs)
-
- conn.commit()
-
- log.debug("%i records expired", count)
-
- total += count
-
- log.debug("%i total records expired", total)
-
- stats.expired += 1
-
-class NewExpireUpdate(NewUpdate):
- def do_process(self, conn, stats):
seconds = self.model.app.expire_threshold
log.info("Expiring samples older than %i seconds", seconds)
Modified: mgmt/newdata/mint/python/mint/main.py
===================================================================
--- mgmt/newdata/mint/python/mint/main.py 2010-04-22 15:03:17 UTC (rev 3924)
+++ mgmt/newdata/mint/python/mint/main.py 2010-04-23 18:36:37 UTC (rev 3925)
@@ -1,8 +1,7 @@
from database import MintDatabase
from expire import ExpireThread
from model import MintModel
-from newupdate import NewUpdateThread
-from update import UpdateThread
+from newupdate import UpdateThread
from vacuum import VacuumThread
from util import *
@@ -19,7 +18,6 @@
self.update_enabled = True
self.update_thread = UpdateThread(self)
- self.new_update_thread = NewUpdateThread(self)
self.expire_enabled = True
self.expire_frequency = self.config.expire_frequency
@@ -44,7 +42,6 @@
log.info("Expiration is %s", state(self.expire_enabled))
self.update_thread.init()
- self.new_update_thread.init()
self.expire_thread.init()
self.vacuum_thread.init()
@@ -52,8 +49,7 @@
self.model.start()
if self.update_enabled:
- # XXX self.update_thread.start()
- self.new_update_thread.start()
+ self.update_thread.start()
if self.expire_enabled:
self.expire_thread.start()
@@ -66,7 +62,6 @@
if self.update_enabled:
self.update_thread.stop()
- self.new_update_thread.stop()
if self.expire_enabled:
self.expire_thread.stop()
Modified: mgmt/newdata/mint/python/mint/model.py
===================================================================
--- mgmt/newdata/mint/python/mint/model.py 2010-04-22 15:03:17 UTC (rev 3924)
+++ mgmt/newdata/mint/python/mint/model.py 2010-04-23 18:36:37 UTC (rev 3925)
@@ -5,7 +5,6 @@
from newupdate import *
from schema import *
from schemalocal import *
-from update import *
from util import *
import mint.schema
@@ -23,7 +22,7 @@
mint.schema.model = self
self.rosemary = RosemaryModel()
- self.rosemary.sql_logging_enabled = True
+ self.rosemary.sql_logging_enabled = False
self.qmf_session = None
self.qmf_brokers = list()
@@ -54,8 +53,9 @@
# have left behind in the DB; it's basically an unconstrained
# agent disconnect update, for any agent
- up = AgentDisconnectUpdate(None)
- self.app.update_thread.enqueue(up)
+ # XXX
+ #up = AgentDisconnectUpdate(None)
+ #self.app.update_thread.enqueue(up)
uris = [x.strip() for x in self.app.config.qmf.split(",")]
@@ -63,25 +63,34 @@
self.add_broker(uri)
def do_stop(self):
- for qbroker in self.qmf_brokers:
- self.qmf_session.delBroker(qbroker)
+ for qmf_broker in self.qmf_brokers:
+ self.qmf_session.delBroker(qmf_broker)
def add_broker(self, url):
log.info("Adding qmf broker at %s", url)
self.lock.acquire()
try:
- qbroker = self.qmf_session.addBroker(url)
- self.qmf_brokers.append(qbroker)
+ qmf_broker = self.qmf_session.addBroker(url)
+ self.qmf_brokers.append(qmf_broker)
finally:
self.lock.release()
+ def get_agent(self, qmf_agent):
+ id = qmf_agent.getAgentBank()
+
+ self.lock.acquire()
+ try:
+ return self.agents[id]
+ finally:
+ self.lock.release()
+
class MintAgent(object):
def __init__(self, model, qmf_agent):
self.model = model
self.qmf_agent = qmf_agent
- self.id = str(QmfAgentId.fromAgent(self.qmf_agent))
+ self.id = qmf_agent.getAgentBank()
self.last_heartbeat = None
@@ -105,8 +114,12 @@
assert isinstance(obj, RosemaryObject)
class_key = ClassKey(obj._qmf_class_key)
- oid = QmfObjectId.fromString(obj._qmf_object_id).toObjectId()
+ oid_args = {"_agent_name": obj._qmf_agent_id,
+ "_object_name": obj._qmf_object_id}
+
+ oid = ObjectId(oid_args)
+
self.model.lock.acquire()
try:
broker = self.qmf_agent.getBroker()
@@ -137,63 +150,41 @@
def __init__(self, model):
self.model = model
- self.deferred_object_prop_calls = defaultdict(list)
- self.deferred_object_stat_calls = defaultdict(list)
+ def brokerConnected(self, qmf_broker):
+ log.info("Broker at %s:%i is connected",
+ qmf_broker.host, qmf_broker.port)
- def brokerConnected(self, qbroker):
- log.info("Broker at %s:%i is connected", qbroker.host, qbroker.port)
+ def brokerInfo(self, qmf_broker):
+ log.info("Broker info from %s", qmf_broker)
- def brokerInfo(self, qbroker):
- # Now we have a brokerId to use to generate fully qualified agent
- # IDs
+ def brokerDisconnected(self, qmf_broker):
+ log.info("Broker at %s:%i is disconnected",
+ qmf_broker.host, qmf_broker.port)
- for qagent in qbroker.getAgents():
- MintAgent(self.model, qagent)
+ def newAgent(self, qmf_agent):
+ log.info("Creating %s", qmf_agent)
- def brokerDisconnected(self, qbroker):
- log.info("Broker at %s:%i is disconnected", qbroker.host, qbroker.port)
+ MintAgent(self.model, qmf_agent)
- def newAgent(self, qagent):
- log.info("Creating %s", qagent)
+ def delAgent(self, qmf_agent):
+ log.info("Deleting %s", qmf_agent)
- # Some agents come down without a brokerId, meaning we can't
- # generate a fully qualified agent ID for them. Those we
- # handle in brokerInfo.
+ try:
+ agent = self.model.get_agent(qmf_agent)
+ except KeyError:
+ return
- if qagent.getBroker().brokerId:
- agent = MintAgent(self.model, qagent)
-
- # XXX This business is to handle an agent-vs-agent-data ordering
- # problem
-
- objectPropCalls = self.deferred_object_prop_calls[agent.id]
-
- for broker, object in objectPropCalls:
- self.objectProps(broker, object)
-
- objectStatCalls = self.deferred_object_stat_calls[agent.id]
-
- for broker, object in objectStatCalls:
- self.objectStats(broker, object)
-
- def delAgent(self, qagent):
- log.info("Deleting %s", qagent)
-
- id = str(QmfAgentId.fromAgent(qagent))
-
- agent = self.model.agents[id]
agent.delete()
- up = AgentDisconnectUpdate(agent)
- self.model.app.update_thread.enqueue(up)
+ if self.model.app.update_thread.isAlive():
+ up = AgentDelete(self.model, agent)
+ self.model.app.update_thread.enqueue(up)
- def heartbeat(self, qagent, timestamp):
+ def heartbeat(self, qmf_agent, timestamp):
timestamp = timestamp / 1000000000
- id = str(QmfAgentId.fromAgent(qagent))
-
try:
- agent = self.model.agents[id]
+ agent = self.model.get_agent(qmf_agent)
except KeyError:
return
@@ -208,58 +199,26 @@
# XXX I want to store class keys using this, but I can't,
# because I don't get any agent info; instead
- def objectProps(self, broker, object):
- self.model.lock.acquire()
- try:
- pass
- finally:
- self.model.lock.release()
+ def objectProps(self, broker, obj):
+ agent = self.model.get_agent(obj._agent)
- self.model.lock.acquire()
- try:
- id = str(QmfAgentId.fromObject(object))
-
- try:
- agent = self.model.agents[id]
- except KeyError:
- self.deferred_object_prop_calls[id].append((broker, object))
- return
- finally:
- self.model.lock.release()
-
if self.model.app.update_thread.isAlive():
- up = PropertyUpdate(agent, object)
- self.model.app.update_thread.enqueue(up)
-
- if self.model.app.new_update_thread.isAlive():
- if object.getTimestamps()[2]:
- up = NewObjectDelete(self.model, agent, object)
+ if obj.getTimestamps()[2]:
+ up = ObjectDelete(self.model, agent, obj)
else:
- up = NewObjectUpdate(self.model, agent, object)
+ up = ObjectUpdate(self.model, agent, obj)
- self.model.app.new_update_thread.enqueue(up)
+ self.model.app.update_thread.enqueue(up)
- def objectStats(self, broker, object):
- self.model.lock.acquire()
- try:
- id = str(QmfAgentId.fromObject(object))
+ def objectStats(self, broker, obj):
+ print "objectStats!", broker, obj
- try:
- agent = self.model.agents[id]
- except KeyError:
- self.deferred_object_stat_calls[id].append((broker, object))
- return
- finally:
- self.model.lock.release()
+ agent = self.model.get_agent(obj._agent)
if self.model.app.update_thread.isAlive():
- up = StatisticUpdate(agent, object)
+ up = ObjectAddSample(self.model, agent, obj)
self.model.app.update_thread.enqueue(up)
- if self.model.app.new_update_thread.isAlive():
- up = NewSampleUpdate(self.model, agent, object)
- self.model.app.new_update_thread.enqueue(up)
-
def event(self, broker, event):
""" Invoked when an event is raised. """
pass
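
The new MintModel.get_agent centralizes the agent-bank lookup under the model lock, since console callbacks arrive on QMF threads. A compressed, runnable sketch of the pattern; StubAgent and the dict registry are stand-ins for the real console types:

    from threading import Lock

    class StubAgent(object):
        # Hypothetical stand-in for a qmf console agent.
        def __init__(self, bank):
            self._bank = bank
        def getAgentBank(self):
            return self._bank

    class Model(object):
        def __init__(self):
            self.lock = Lock()
            self.agents = {}            # agent bank -> MintAgent

        def get_agent(self, qmf_agent):
            id = qmf_agent.getAgentBank()
            self.lock.acquire()         # lookups may race with add/delete
            try:
                return self.agents[id]  # KeyError propagates for unknown agents
            finally:
                self.lock.release()

    model = Model()
    agent = StubAgent("5")
    model.agents[agent.getAgentBank()] = "mint-agent-record"
    print(model.get_agent(agent))       # mint-agent-record
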
Modified: mgmt/newdata/mint/python/mint/newupdate.py
===================================================================
--- mgmt/newdata/mint/python/mint/newupdate.py 2010-04-22 15:03:17 UTC (rev 3924)
+++ mgmt/newdata/mint/python/mint/newupdate.py 2010-04-23 18:36:37 UTC (rev 3925)
@@ -6,9 +6,9 @@
log = logging.getLogger("mint.newupdate")
-class NewUpdateThread(MintDaemonThread):
+class UpdateThread(MintDaemonThread):
def __init__(self, app):
- super(NewUpdateThread, self).__init__(app)
+ super(UpdateThread, self).__init__(app)
self.conn = None
self.stats = None
@@ -19,7 +19,7 @@
def init(self):
self.conn = self.app.database.get_connection()
- self.stats = NewUpdateStats()
+ self.stats = UpdateStats()
def enqueue(self, update):
update.thread = self
@@ -50,7 +50,7 @@
update.process(self.conn, self.stats)
-class NewUpdateStats(object):
+class UpdateStats(object):
def __init__(self):
self.enqueued = 0
self.dequeued = 0
@@ -63,7 +63,7 @@
self.samples_expired = 0
self.samples_dropped = 0
-class NewUpdate(object):
+class Update(object):
def __init__(self, model):
self.model = model
@@ -83,7 +83,7 @@
conn.rollback()
- if self.model.app.new_update_thread.halt_on_error:
+ if self.model.app.update_thread.halt_on_error:
raise
def do_process(self, conn, stats):
@@ -92,58 +92,20 @@
def __repr__(self):
return self.__class__.__name__
-class NewObjectUpdate(NewUpdate):
- def __init__(self, model, agent, object):
- super(NewObjectUpdate, self).__init__(model)
+class ObjectUpdate(Update):
+ def __init__(self, model, agent, obj):
+ super(ObjectUpdate, self).__init__(model)
self.agent = agent
- self.object = object
- self.object_id = str(QmfObjectId.fromObject(object))
- self.session_id = None
+ self.object = obj
- sequence = object.getObjectId().getSequence()
-
- if sequence != 0:
- self.session_id = str(sequence)
-
- update_time, create_time, delete_time = self.object.getTimestamps()
-
- self.update_time = datetime.fromtimestamp(update_time / 1000000000)
- self.create_time = datetime.fromtimestamp(create_time / 1000000000)
- self.delete_time = datetime.fromtimestamp(delete_time / 1000000000)
-
def do_process(self, conn, stats):
cls = self.get_class()
- obj = self.get_object(cls, self.object_id)
+ obj = self.get_object(cls, self.object.getName())
columns = list()
- update_time, create_time, delete_time = self.object.getTimestamps()
-
- if obj._sync_time:
- # This object is already in the database
-
- obj._qmf_update_time = self.update_time
- columns.append(cls.sql_table._qmf_update_time)
-
- # XXX session_id may have changed too?
- else:
- obj._qmf_agent_id = self.agent.id
- obj._qmf_object_id = self.object_id
- obj._qmf_session_id = self.session_id
- obj._qmf_class_key = str(self.object.getClassKey())
- obj._qmf_update_time = self.update_time
- obj._qmf_create_time = self.create_time
-
- columns.append(cls.sql_table._id)
- columns.append(cls.sql_table._qmf_agent_id)
- columns.append(cls.sql_table._qmf_object_id)
- columns.append(cls.sql_table._qmf_session_id)
- columns.append(cls.sql_table._qmf_class_key)
- columns.append(cls.sql_table._qmf_update_time)
- columns.append(cls.sql_table._qmf_create_time)
-
- self.process_references(obj, columns)
+ self.process_qmf_attributes(obj, columns)
self.process_properties(obj, columns)
cursor = conn.cursor()
@@ -166,7 +128,7 @@
raise PackageUnknown(name)
name = class_key.getClassName()
- name = name[0].upper() + name[1:]
+ name = name[0].upper() + name[1:] # /me shakes fist
try:
cls = pkg._classes_by_name[name]
@@ -198,63 +160,100 @@
return obj
- def process_references(self, obj, columns):
- for prop, value in self.object.getProperties():
- if prop.type != 10:
- continue
+ def process_qmf_attributes(self, obj, columns):
+ table = obj._class.sql_table
- try:
- ref = obj._class._references_by_name[prop.name]
- except KeyError:
- log.debug("Reference %s is unknown", prop.name)
+ update_time, create_time, delete_time = self.object.getTimestamps()
- continue
+ update_time = datetime.fromtimestamp(update_time / 1000000000)
+ create_time = datetime.fromtimestamp(create_time / 1000000000)
- if not ref.sql_column:
- log.warn("Reference %s has no column; skipping it", ref.name)
+ if delete_time:
+ delete_time = datetime.fromtimestamp(delete_time / 1000000000)
- continue
+ if obj._sync_time:
+ # This object is already in the database
- col = ref.sql_column
+ obj._qmf_update_time = update_time
+ columns.append(table._qmf_update_time)
- if value:
- that_id = str(QmfObjectId(value.first, value.second))
- that = self.get_object(ref.that_cls, that_id)
+ # XXX session_id may have changed too?
+ else:
+ obj._qmf_agent_id = self.agent.id
+ obj._qmf_object_id = self.object.getName()
+ obj._qmf_session_id = str(self.object.getObjectId().getSequence())
+ obj._qmf_class_key = str(self.object.getClassKey())
+ obj._qmf_update_time = update_time
+ obj._qmf_create_time = create_time
- if not that._sync_time:
- continue
+ columns.append(table._id)
+ columns.append(table._qmf_agent_id)
+ columns.append(table._qmf_object_id)
+ columns.append(table._qmf_session_id)
+ columns.append(table._qmf_class_key)
+ columns.append(table._qmf_update_time)
+ columns.append(table._qmf_create_time)
- value = that._id
+ def process_properties(self, obj, columns):
+ cls = obj._class
- columns.append(col)
-
- setattr(obj, col.name, value)
-
- def process_properties(self, obj, columns):
for prop, value in self.object.getProperties():
- if prop.type == 10:
- continue
-
try:
- col = obj._class._properties_by_name[prop.name].sql_column
- except KeyError:
- log.debug("Property %s is unknown", prop)
-
+ if prop.type == 10:
+ col, nvalue = self.process_reference(cls, prop, value)
+ else:
+ col, nvalue = self.process_value(cls, prop, value)
+ except MappingException, e:
+ log.debug(e)
continue
- if value is not None:
- value = self.transform_value(prop, value)
-
# XXX This optimization will be obsolete when QMF does it
# instead
- if value == getattr(obj, col.name):
+ if nvalue == getattr(obj, col.name):
continue
+ setattr(obj, col.name, nvalue)
columns.append(col)
- setattr(obj, col.name, value)
+ def process_reference(self, cls, prop, value):
+ try:
+ ref = cls._references_by_name[prop.name]
+ except KeyError:
+ raise MappingException("Reference %s is unknown" % prop.name)
+ if not ref.sql_column:
+ raise MappingException("Reference %s has no column" % ref.name)
+
+ col = ref.sql_column
+
+ if value:
+ try:
+ that_id = str(value.objectName)
+ except:
+ raise MappingException("XXX ref isn't an oid")
+
+ that = self.get_object(ref.that_cls, that_id)
+
+ if not that._sync_time:
+ msg = "Referenced object %s hasn't appeared yet"
+ raise MappingException(msg % that)
+
+ value = that._id
+
+ return col, value
+
+ def process_value(self, cls, prop, value):
+ try:
+ col = cls._properties_by_name[prop.name].sql_column
+ except KeyError:
+ raise MappingException("Property %s is unknown" % prop)
+
+ if value is not None:
+ value = self.transform_value(prop, value)
+
+ return col, value
+
def transform_value(self, attr, value):
if attr.type == 8: # absTime
if value == 0:
@@ -274,13 +273,14 @@
def __repr__(self):
name = self.__class__.__name__
cls = self.object.getClassKey().getClassName()
+ id = self.object.getName()
- return "%s(%s,%s,%s)" % (name, self.agent.id, cls, self.object_id)
+ return "%s(%s,%s,%s)" % (name, self.agent.id, cls, id)
-class NewObjectDelete(NewObjectUpdate):
+class ObjectDelete(ObjectUpdate):
def do_process(self, conn, stats):
cls = self.get_class()
- obj = self.get_object(cls, self.object_id)
+ obj = self.get_object(cls, self.object.getName())
cursor = conn.cursor()
@@ -290,16 +290,16 @@
cursor.close()
try:
- del self.agent.objects_by_id[self.object_id]
+ del self.agent.objects_by_id[self.object.getName()]
except KeyError:
pass
stats.deleted += 1
-class NewSampleUpdate(NewObjectUpdate):
+class ObjectAddSample(ObjectUpdate):
def do_process(self, conn, stats):
cls = self.get_class()
- obj = self.get_object(cls, self.object_id)
+ obj = self.get_object(cls, self.object.getName())
if not cls._statistics:
stats.samples_dropped += 1; return
@@ -311,13 +311,17 @@
if obj._qmf_update_time > datetime.now() - timedelta(seconds=60):
stats.samples_dropped += 1; return
+ update_time, create_time, delete_time = self.object.getTimestamps()
+
+ update_time = datetime.fromtimestamp(update_time / 1000000000)
+
update_columns = list()
update_columns.append(cls.sql_table._qmf_update_time)
insert_columns = list()
insert_columns.append(cls.sql_samples_table._qmf_update_time)
- obj._qmf_update_time = self.update_time
+ obj._qmf_update_time = update_time
self.process_samples(obj, update_columns, insert_columns)
@@ -345,6 +349,8 @@
if value is not None:
value = self.transform_value(stat, value)
+ # Don't write unchanged values
+ #
# XXX This optimization will be obsolete when QMF does it
# instead
@@ -355,6 +361,28 @@
setattr(obj, col.name, value)
+class AgentDelete(Update):
+ def __init__(self, model, agent):
+ super(AgentDelete, self).__init__(model)
+
+ self.agent = agent
+
+ def do_process(self, conn, stats):
+ print "Ahoy!"
+
+ cursor = conn.cursor()
+
+ id = self.agent.id
+
+ try:
+ for pkg in self.model.rosemary._packages:
+ for cls in pkg._classes:
+ for obj in cls.get_selection(cursor, _qmf_agent_id=id):
+ obj.delete()
+ print "Bam!", obj
+ finally:
+ cursor.close()
+
class UpdateException(Exception):
def __init__(self, name):
self.name = name
@@ -370,3 +398,6 @@
class ObjectUnknown(UpdateException):
pass
+
+class MappingException(Exception):
+ pass
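
Update.process (above) wraps each do_process call in a commit-or-rollback envelope and re-raises only when the thread's halt_on_error flag is set, so one failed update cannot poison the ones behind it in the queue. A stripped-down sketch; FakeConn mimics only the psycopg2 commit/rollback surface:

    class FakeConn(object):
        def commit(self):
            print("commit")
        def rollback(self):
            print("rollback")

    class Update(object):
        halt_on_error = False               # mirrors the thread-level flag

        def process(self, conn):
            try:
                self.do_process(conn)
                conn.commit()
            except Exception:
                conn.rollback()             # discard the failed transaction
                if self.halt_on_error:
                    raise

        def do_process(self, conn):
            raise Exception("Not implemented")

    class NoopUpdate(Update):
        def do_process(self, conn):
            pass

    NoopUpdate().process(FakeConn())        # commit
    Update().process(FakeConn())            # rollback, error swallowed
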
Modified: mgmt/newdata/mint/python/mint/tools.py
===================================================================
--- mgmt/newdata/mint/python/mint/tools.py 2010-04-22 15:03:17 UTC (rev 3924)
+++ mgmt/newdata/mint/python/mint/tools.py 2010-04-23 18:36:37 UTC (rev 3925)
@@ -72,9 +72,12 @@
if self.config.debug:
self.config.prt()
- enable_logging("rosemary", "debug", sys.stderr)
- enable_logging("mint", "debug", sys.stderr)
+ level = getattr(self.config, "log_level", "debug")
+
+ enable_logging("rosemary", level, sys.stderr)
+ enable_logging("mint", level, sys.stderr)
+
self.do_run(opts, args)
def do_run(self, opts, args):
@@ -87,7 +90,7 @@
class DatabaseSubcommand(Command):
def run(self, opts, args):
- conn = self.parent.database.get_connection()
+ conn = self.parent.app.database.get_connection()
cursor = conn.cursor()
try:
@@ -153,6 +156,8 @@
command.description = "Change password of USER"
command.arguments = ("USER",)
+ self.app = None
+
def run(self):
try:
opts, remaining = self.parse_options(sys.argv[1:])
@@ -171,16 +176,13 @@
self.config.prt()
enable_logging("mint", "debug", sys.stderr)
- app = Mint(self.config)
- app.update_enabled = False
- app.expire_enabled = False
+ self.app = Mint(self.config)
+ self.app.update_enabled = False
+ self.app.expire_enabled = False
- #self.cumin = app.model.rosemary.com_redhat_cumin
+ self.app.check()
+ self.app.init()
- self.database = MintDatabase(app)
- self.database.check()
- self.database.init()
-
try:
scommand = remaining[0]
except IndexError:
@@ -214,13 +216,13 @@
class LoadSchema(Command):
def run(self, opts, args):
- self.parent.database.load_schema()
+ self.parent.app.database.load_schema()
print "The schema is loaded"
class DropSchema(Command):
def run(self, opts, args):
if "force" in opts:
- self.parent.database.drop_schema()
+ self.parent.app.database.drop_schema()
print "The schema is dropped"
else:
raise CommandException \
@@ -229,8 +231,8 @@
class ReloadSchema(Command):
def run(self, opts, args):
if "force" in opts:
- self.parent.database.drop_schema()
- self.parent.database.load_schema()
+ self.parent.app.database.drop_schema()
+ self.parent.app.database.load_schema()
print "The schema is reloaded"
else:
raise CommandException \
@@ -238,7 +240,7 @@
class CheckSchema(Command):
def run(self, opts, args):
- self.parent.database.check_schema()
+ self.parent.app.database.check_schema()
class AddUser(DatabaseSubcommand):
def do_run(self, cursor, opts, args):
@@ -247,13 +249,6 @@
except IndexError:
raise CommandException(self, "NAME is required")
- #objs = self.parent.cumin.User.get_selection(cursor, name=name)
- #if objs:
-
- if Subject.selectBy(name=name).count():
- print "Error: a user called '%s' already exists" % name
- sys.exit(1)
-
try:
password = args[2]
except IndexError:
@@ -261,62 +256,61 @@
crypted = crypt_password(password)
- try:
- subject = Subject(name=name, password=crypted)
+ pkg = self.parent.app.model.rosemary.com_redhat_cumin
- for role in Role.selectBy(name="user"):
- subject.addRole(role)
+ for role in pkg.Role.get_selection(cursor, name="user"):
+ break
- assert role
+ assert role, self
- subject.syncUpdate()
+ user = pkg.User.create_object(cursor)
+ user.name = name
+ user.password = crypted
- # user = self.parent.cumin.User.create_object(cursor)
- # user.name = name
- # user.password = crypted
- # user.save(cursor)
-
- # roles = self.parent.cumin.Role.get_selection \
- # (cursor, name="user")
-
- # for role in roles:
- # mapping = self.parent.cumin.UserRoleMapping.create_object \
- # (cursor)
- # mapping.user = user
- # mapping.role = role
- # mapping.save(cursor)
-
- # break
-
- assert role, self
+ try:
+ user.save(cursor)
except IntegrityError:
print "Error: a user called '%s' already exists" % name
sys.exit(1)
+ mapping = pkg.UserRoleMapping.create_object(cursor)
+ mapping._role_id = role._id
+ mapping._user_id = user._id
+ mapping.save(cursor)
+
+ cursor.connection.commit()
+
+ assert role, self
+
print "User '%s' is added" % name
- class RemoveUser(Command):
- def run(self, opts, args):
- if "force" in opts:
- if len(args) != 2:
- print "Error: no user name given"
- sys.exit(1)
+ class RemoveUser(DatabaseSubcommand):
+ def do_run(self, cursor, opts, args):
+ if "force" not in opts:
+ msg = "Command remove-user requires --force"
+ raise CommandException(self, msg)
+ try:
name = args[1]
- subjects = Subject.selectBy(name=name)
+ except IndexError:
+ raise CommandException(self, "NAME is required")
- if subjects.count():
- for subject in subjects:
- subject.destroySelf()
- break
- else:
- raise CommandException(self, "User '%s' is unknown" % name)
+ name = args[1]
- print "User '%s' is removed" % name
- else:
- raise CommandException \
- (self, "Command remove-user requires --force yes")
+ cls = self.parent.app.model.rosemary.com_redhat_cumin.User
+ for user in cls.get_selection(cursor, name=name):
+ break
+
+ if not user:
+ raise CommandException(self, "User '%s' is unknown" % name)
+
+ user.delete(cursor)
+
+ cursor.connection.commit()
+
+ print "User '%s' is removed" % name
+
class ListUsers(Command):
def run(self, opts, args):
subjects = Subject.select(orderBy='name')
@@ -457,8 +451,34 @@
for arg in args[1:]:
app.model.add_broker(arg)
+ sleep(2)
+
+ cls = app.model.rosemary.org_apache_qpid_broker.Broker
+
+ conn = app.database.get_connection()
+ cursor = conn.cursor()
+
+ for obj in cls.get_selection(cursor):
+ try:
+ agent = app.model.agents[obj._qmf_agent_id]
+ except KeyError:
+ continue
+
+ break
+
+ print "TTT", obj.port, obj, agent
+
+ def completion(status_code, output_args):
+ print "YYY", status_code, output_args
+
+ agent.call_method(completion, obj, "echo", 1, "ggoo!")
+
while True:
sleep(2)
+ except Exception, e:
+ print_exc()
+
+ print e
finally:
app.stop()
@@ -492,6 +512,17 @@
print "Warning: Failed connecting to broker at '%s'" % arg
try:
+ enq = 0
+ deq = 0
+
+ upd = 0
+ dlt = 0
+ drp = 0
+
+ supd = 0
+ sexp = 0
+ sdrp = 0
+
enq_last = 0
deq_last = 0
@@ -513,7 +544,7 @@
sleep(1)
- stats = app.new_update_thread.stats
+ stats = app.update_thread.stats
enq = stats.enqueued
deq = stats.dequeued
@@ -522,9 +553,9 @@
dlt = stats.deleted
drp = stats.dropped
- supd = stats.stats_updated
- sexp = stats.stats_expired
- sdrp = stats.stats_dropped
+ supd = stats.samples_updated
+ sexp = stats.samples_expired
+ sdrp = stats.samples_dropped
print row % (enq - enq_last,
deq - deq_last,
@@ -561,5 +592,6 @@
finally:
#from threading import enumerate
#for item in enumerate():
+ # print item
app.stop()
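
The mint-bench changes read the stats from the single update thread and use the renamed samples_* counters; each pass prints the per-interval delta between the current cumulative counts and the previous sample. The reporting core, reduced to two counters with fabricated numbers:

    # Sample cumulative counters once per tick and print per-interval deltas.
    class Stats(object):
        def __init__(self):
            self.enqueued = 0
            self.dequeued = 0

    stats = Stats()
    enq_last = deq_last = 0

    for tick in range(3):
        stats.enqueued += 10 * (tick + 1)   # pretend work happened
        stats.dequeued += 8 * (tick + 1)

        enq, deq = stats.enqueued, stats.dequeued
        print("%8i %8i" % (enq - enq_last, deq - deq_last))
        enq_last, deq_last = enq, deq
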
Deleted: mgmt/newdata/mint/python/mint/update.py
===================================================================
--- mgmt/newdata/mint/python/mint/update.py 2010-04-22 15:03:17 UTC (rev 3924)
+++ mgmt/newdata/mint/python/mint/update.py 2010-04-23 18:36:37 UTC (rev 3925)
@@ -1,367 +0,0 @@
-import pickle
-import mint
-
-from collections import deque
-from qpid.datatypes import UUID
-
-from schema import *
-from sql import *
-from util import *
-
-log = logging.getLogger("mint.update")
-
-class UpdateThread(MintDaemonThread):
- """
- Only the update thread writes to the database
- """
-
- def __init__(self, app):
- super(UpdateThread, self).__init__(app)
-
- self.conn = None
- self.stats = None
-
- self.updates = UpdateQueue(slotCount=2)
-
- def init(self):
- self.conn = self.app.database.get_connection()
- self.stats = UpdateStats()
-
- def enqueue(self, update):
- update.thread = self
-
- self.updates.put(update)
-
- self.stats.enqueued += 1
-
- # This is an attempt to yield from the enqueueing thread (this
- # method's caller) to the update thread
-
- if self.updates.qsize() > 1000:
- sleep(0.1)
-
- def run(self):
- while True:
- if self.stop_requested:
- break
-
- try:
- update = self.updates.get(True, 1)
- except Empty:
- continue
-
- self.stats.dequeued += 1
-
- update.process()
-
-class UpdateStats(object):
- def __init__(self):
- self.enqueued = 0
- self.dequeued = 0
- self.stat_updated = 0
- self.prop_updated = 0
- self.expired = 0
- self.dropped = 0
- self.deferred = 0
-
-class ReferenceException(Exception):
- def __init__(self, sought):
- self.sought = sought
-
- def __str__(self):
- return repr(self.sought)
-
-class Update(object):
- def __init__(self):
- self.thread = None
- self.priority = 0
-
- def process(self):
- log.debug("Processing %s", self)
-
- assert self.thread
-
- conn = self.thread.conn
- stats = self.thread.stats
-
- try:
- self.do_process(conn, stats)
-
- for notice in conn.notices:
- log.debug("Database: %s", notice)
-
- conn.commit()
- except:
- log.exception("Update failed")
-
- conn.rollback()
-
- def do_process(self, conn, stats):
- raise Exception("Not implemented")
-
- def __repr__(self):
- return "%s(%i)" % (self.__class__.__name__, self.priority)
-
-class ObjectUpdate(Update):
- def __init__(self, agent, object):
- super(ObjectUpdate, self).__init__()
-
- self.agent = agent
- self.object = object
-
- self.object_id = str(QmfObjectId.fromObject(object))
-
- def getClass(self):
- # XXX this is unfortunate
- name = self.object.getClassKey().getClassName()
- name = mint.schema.schemaReservedWordsMap.get(name, name)
- name = name[0].upper() + name[1:]
-
- try:
- return schemaNameToClassMap[name]
- except KeyError:
- raise ReferenceException(name)
-
- def process_attributes(self, attrs, cls):
- results = dict()
-
- for key, value in attrs:
- name = key.__repr__()
- name = mint.schema.schemaReservedWordsMap.get(name, name)
-
- if key.type == 10:
- # XXX don't want oid around much
- self.process_reference(name, value, results)
- continue
-
- if not hasattr(cls, name):
- # Discard attrs that we don't have in our schema
- log.debug("%s has no field '%s'" % (cls, name))
- continue
-
- if key.type == 8:
- self.process_timestamp(name, value, results)
- continue
-
- if key.type == 14:
- # Convert UUIDs into their string representation, to be
- # handled by sqlobject
- results[name] = str(value)
- continue
-
- if key.type == 15:
- #if value:
- results[name] = pickle.dumps(value)
- continue
-
- results[name] = value
-
- return results
-
- # XXX this needs to be a much more straightforward procedure
- def process_reference(self, name, oid, results):
- if name.endswith("Ref"):
- name = name[:-3]
-
- className = name[0].upper() + name[1:]
-
- try:
- otherClass = getattr(mint, className)
- except AttributeError:
- return
-
- foreignKey = name + "_id"
-
- object_id = str(QmfObjectId(oid.first, oid.second))
- id = self.agent.database_ids.get(object_id)
-
- if id is None:
- # XXX don't want oid around much
- raise ReferenceException(oid)
-
- results[foreignKey] = id
-
- def process_timestamp(self, name, tstamp, results):
- if tstamp:
- t = datetime.fromtimestamp(tstamp / 1000000000)
- results[name] = t
-
- def __repr__(self):
- cls = self.object.getClassKey().getClassName()
-
- return "%s(%s,%s,%s,%i)" % (self.__class__.__name__,
- self.agent.id,
- cls,
- self.object_id,
- self.priority)
-
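(process_attributes above dispatches on raw QMF property typecodes: 8 is a nanosecond timestamp, 10 an object reference (handled separately, since a missing referent raises ReferenceException), 14 a UUID, and 15 a map that gets pickled into a blob. A standalone version of the scalar conversions, with the codes taken from this file and the helper name illustrative:

import pickle
from datetime import datetime

def convert_qmf_value(type_code, value):
    # Typecodes as branched on above: 8 timestamp, 14 UUID, 15 map
    if type_code == 8:
        # QMF timestamps are nanoseconds since the epoch
        return datetime.fromtimestamp(value / 1000000000) if value else None
    if type_code == 14:
        return str(value)              # store UUIDs as strings
    if type_code == 15:
        return pickle.dumps(value)     # maps are pickled into a blob
    return value                       # plain scalars pass through
)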
-class PropertyUpdate(ObjectUpdate):
- def do_process(self, conn, stats):
- try:
- cls = self.getClass()
- except ReferenceException, e:
- log.info("Referenced class %r not found", e.sought)
-
- stats.dropped += 1
-
- return
-
- try:
- attrs = self.process_attributes(self.object.getProperties(), cls)
- except ReferenceException, e:
- log.info("Referenced object %r not found", e.sought)
-
- self.agent.deferred_updates[self.object_id].append(self)
-
- stats.deferred += 1
-
- return
-
- update, create, delete = self.object.getTimestamps()
-
- self.process_timestamp("qmfUpdateTime", update, attrs)
- self.process_timestamp("qmfCreateTime", create, attrs)
-
- if delete != 0:
- self.process_timestamp("qmfDeleteTime", delete, attrs)
-
- log.debug("%s(%s,%s) marked deleted",
- cls.__name__, self.agent.id, self.object_id)
-
- attrs["qmfAgentId"] = self.agent.id
- attrs["qmfClassKey"] = str(self.object.getClassKey())
- attrs["qmfObjectId"] = str(self.object_id)
- attrs["qmfPersistent"] = self.object.getObjectId().isDurable()
-
- cursor = conn.cursor()
-
- # Cases:
- #
- # 1. Object is utterly new to mint
- # 2. Object is in mint's db, but id is not yet cached
- # 3. Object is in mint's db, and id is cached
-
- id = self.agent.database_ids.get(self.object_id)
-
- if id is None:
- # Case 1 or 2
-
- op = SqlGetId(cls)
- op.execute(cursor, attrs)
-
- rec = cursor.fetchone()
-
- if rec:
- id = rec[0]
-
- if id is None:
- # Case 1
-
- op = SqlInsert(cls, attrs)
- op.execute(cursor, attrs)
-
- id = cursor.fetchone()[0]
-
- log.debug("%s(%i) created", cls.__name__, id)
- else:
- # Case 2
-
- attrs["id"] = id
-
- op = SqlUpdate(cls, attrs)
- op.execute(cursor, attrs)
-
- assert cursor.rowcount == 1
-
- self.agent.database_ids.set(self.object_id, id)
- else:
- # Case 3
-
- attrs["id"] = id
-
- op = SqlUpdate(cls, attrs)
- op.execute(cursor, attrs)
-
- #assert cursor.rowcount == 1
-
- try:
- updates = self.agent.deferred_updates.pop(self.object_id)
-
- if updates:
- log.info("Reenqueueing %i deferred updates", len(updates))
-
- for update in updates:
- self.thread.enqueue(update)
- except KeyError:
- pass
-
- self.agent.database_ids.commit()
-
- stats.prop_updated += 1
-
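(The write path above distinguishes the three cases its comment names: an object utterly new to mint, an object already in the database whose id is not yet cached, and a fully cached object. Once a row lands, any updates that were deferred on a dangling reference to it are re-enqueued. A compact sketch of the same upsert against a hypothetical objects table, not mint's real SQL:

def upsert(cursor, id_cache, object_id, attrs):
    db_id = id_cache.get(object_id)

    if db_id is None:
        # Case 1 or 2: id not cached; ask the database
        cursor.execute("select id from objects where qmf_object_id = %s",
                       (object_id,))
        rec = cursor.fetchone()

        if rec is None:
            # Case 1: brand-new row
            cursor.execute("insert into objects (qmf_object_id, name) "
                           "values (%s, %s) returning id",
                           (object_id, attrs["name"]))
            db_id = cursor.fetchone()[0]
        else:
            # Case 2: row exists, id just wasn't cached yet
            db_id = rec[0]
            cursor.execute("update objects set name = %s where id = %s",
                           (attrs["name"], db_id))

        id_cache[object_id] = db_id
    else:
        # Case 3: cached id, plain update
        cursor.execute("update objects set name = %s where id = %s",
                       (attrs["name"], db_id))

    return db_id
)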
-class StatisticUpdate(ObjectUpdate):
- def do_process(self, conn, stats):
- try:
- cls = self.getClass()
- except ReferenceException, e:
- log.info("Referenced class %r not found", e.sought)
- return
-
- statsCls = getattr(mint, "%sStats" % cls.__name__)
-
- id = self.agent.database_ids.get(self.object_id)
-
- if id is None:
- stats.dropped += 1
- return
-
- timestamps = self.object.getTimestamps()
-
- tnow = datetime.now()
- t = datetime.fromtimestamp(timestamps[0] / 1000000000)
-
- if t < tnow - timedelta(seconds=30):
- seconds = (tnow - t).seconds
- log.debug("Update is %i seconds old; skipping it", seconds)
-
- stats.dropped += 1
-
- return
-
- try:
- attrs = self.process_attributes \
- (self.object.getStatistics(), statsCls)
- except ReferenceException:
- stats.dropped += 1
-
- return
-
- # XXX do we still want this
- attrs["qmfUpdateTime"] = t > tnow and tnow or t
- attrs["%s_id" % cls.sqlmeta.table] = id
-
- cursor = conn.cursor()
-
- op = SqlInsert(statsCls, attrs)
- op.execute(cursor, attrs)
-
- log.debug("%s(%s) created", statsCls.__name__, id)
-
- stats.stat_updated += 1
-
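(StatisticUpdate drops samples whose QMF update timestamp is more than 30 seconds old rather than writing stale rows. The guard in isolation:

from datetime import datetime, timedelta

def is_stale(update_ns, window_seconds=30):
    # update_ns is a QMF timestamp in nanoseconds since the epoch
    t = datetime.fromtimestamp(update_ns / 1000000000)
    return t < datetime.now() - timedelta(seconds=window_seconds)
)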
-class AgentDisconnectUpdate(Update):
- def __init__(self, agent):
- super(AgentDisconnectUpdate, self).__init__()
-
- self.agent = agent
-
- def do_process(self, conn, stats):
- cursor = conn.cursor()
-
- args = dict()
-
- if self.agent:
- args["qmf_agent_id"] = self.agent.id
-
- op = SqlAgentDisconnect(self.agent)
- op.execute(cursor, args)
Modified: mgmt/newdata/mint/python/mint/util.py
===================================================================
--- mgmt/newdata/mint/python/mint/util.py 2010-04-22 15:03:17 UTC (rev 3924)
+++ mgmt/newdata/mint/python/mint/util.py 2010-04-23 18:36:37 UTC (rev 3925)
@@ -69,12 +69,16 @@
return cls(brokerId, brokerBank, agentBank)
def fromAgent(cls, agent):
+ return agent.getAgentBank()
+
broker = agent.getBroker()
brokerId = broker.getBrokerId()
brokerBank = broker.getBrokerBank()
agentBank = agent.getAgentBank()
+ print "XXX", brokerId, brokerBank, agentBank
+
return cls(brokerId, brokerBank, agentBank)
def fromString(cls, string):
@@ -90,16 +94,20 @@
fromString = classmethod(fromString)
def __str__(self):
- return "%s.%i.%i" % (self.brokerId, self.brokerBank, self.agentBank)
+ return self.agentBank
class QmfObjectId(object):
- def __init__(self, first, second):
- self.first = first
- self.second = second
+ def __init__(self, id):
+ self.id = id
def fromObject(cls, object):
oid = object.getObjectId()
+ print "XXX", oid
+
+ for k, v in oid.__dict__.items():
+ print " ", k, v
+
return cls(oid.first, oid.second)
def fromString(cls, string):
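(The util.py changes collapse the two-part (first, second) object id into a single opaque value and reduce agent ids to the bare agent bank; the early return in fromAgent leaves the old broker-id logic unreachable, and the print "XXX" lines read as in-progress debugging. Note that fromObject still passes two values into the new one-argument constructor, which would raise a TypeError if exercised as shown. Schematically, and only as illustration:

class OldQmfObjectId(object):          # two 64-bit halves
    def __init__(self, first, second):
        self.first = first
        self.second = second

class NewQmfObjectId(object):          # one opaque identifier
    def __init__(self, id):
        self.id = id
)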
Modified: mgmt/newdata/mint/python/mint/vacuum.py
===================================================================
--- mgmt/newdata/mint/python/mint/vacuum.py 2010-04-22 15:03:17 UTC (rev 3924)
+++ mgmt/newdata/mint/python/mint/vacuum.py 2010-04-23 18:36:37 UTC (rev 3925)
@@ -1,5 +1,4 @@
from newupdate import *
-from update import *
from util import *
log = logging.getLogger("mint.vacuum")
@@ -10,30 +9,13 @@
if self.stop_requested:
break
- up = VacuumUpdate()
+ up = VacuumUpdate(self.app.model)
self.app.update_thread.enqueue(up)
- up = NewVacuumUpdate(self.app.model)
- self.app.new_update_thread.enqueue(up)
-
sleep(60 * 60 * 10)
class VacuumUpdate(Update):
def do_process(self, conn, stats):
- log.info("Vacuuming")
-
- conn.commit()
-
- level = conn.isolation_level
- conn.set_isolation_level(0)
-
- cursor = conn.cursor()
- cursor.execute("vacuum")
-
- conn.set_isolation_level(level)
-
-class NewVacuumUpdate(NewUpdate):
- def do_process(self, conn, stats):
 log.info("Vacuuming tables")
level = conn.isolation_level
@@ -57,4 +39,3 @@
log.debug("Database: %s", notice.replace("\n", " "))
finally:
cursor.close()
-
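(The surviving vacuum path flips the connection into autocommit before issuing VACUUM, because PostgreSQL refuses to run VACUUM inside a transaction block, then restores the previous isolation level. A minimal sketch of that dance, assuming conn is an existing psycopg2 connection:

def vacuum_all(conn):
    # VACUUM cannot run inside a transaction block, so switch the
    # psycopg2 connection to autocommit (isolation level 0) first
    level = conn.isolation_level
    conn.set_isolation_level(0)

    try:
        cursor = conn.cursor()
        try:
            cursor.execute("vacuum")
        finally:
            cursor.close()
    finally:
        conn.set_isolation_level(level)
)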
Modified: mgmt/newdata/rosemary/python/rosemary/model.py
===================================================================
--- mgmt/newdata/rosemary/python/rosemary/model.py 2010-04-22 15:03:17 UTC (rev 3924)
+++ mgmt/newdata/rosemary/python/rosemary/model.py 2010-04-23 18:36:37 UTC (rev 3925)
@@ -410,6 +410,9 @@
self.sql_delete.execute(cursor, (), obj.__dict__)
+ def delete_selection(self, cursor, **kwargs):
+ pass # XXX
+
def __repr__(self):
args = (self.__class__.__name__, self._package._name, self._name)
return "%s(%s,%s)" % args
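(delete_selection is added here only as a stub, pass # XXX. One plausible shape, clearly hypothetical since the real table and id column live in rosemary's schema:

def delete_selection(cursor, table, ids):
    # psycopg2 adapts a Python list to a SQL array, so "= any(%s)"
    # is the usual way to express an IN-list with one parameter
    cursor.execute("delete from %s where _id = any(%%s)" % table,
                   (list(ids),))
)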