Author: tedross
Date: 2011-05-10 11:35:17 -0400 (Tue, 10 May 2011)
New Revision: 4458
Added:
store/trunk/cpp/tools/qpidstore/
store/trunk/cpp/tools/qpidstore/__init__.py
store/trunk/cpp/tools/qpidstore/janal.py
store/trunk/cpp/tools/qpidstore/jerr.py
store/trunk/cpp/tools/qpidstore/jrnl.py
Removed:
store/trunk/cpp/tools/__init__.py
store/trunk/cpp/tools/janal.py
store/trunk/cpp/tools/jerr.py
store/trunk/cpp/tools/jrnl.py
Modified:
store/trunk/cpp/tools/Makefile.am
Log:
Fix the module structure of the python tools so it can be properly invoked from the
tests.
Modified: store/trunk/cpp/tools/Makefile.am
===================================================================
--- store/trunk/cpp/tools/Makefile.am 2011-05-04 12:00:49 UTC (rev 4457)
+++ store/trunk/cpp/tools/Makefile.am 2011-05-10 15:35:17 UTC (rev 4458)
@@ -23,4 +23,8 @@
qpidexec_SCRIPTS = resize store_chk
pkgpyexec_qpiddir = $(pyexecdir)/qpidstore
-pkgpyexec_qpid_PYTHON = __init__.py jerr.py jrnl.py janal.py
+pkgpyexec_qpid_PYTHON = \
+ qpidstore/__init__.py \
+ qpidstore/jerr.py \
+ qpidstore/jrnl.py \
+ qpidstore/janal.py
Deleted: store/trunk/cpp/tools/__init__.py
===================================================================
--- store/trunk/cpp/tools/__init__.py 2011-05-04 12:00:49 UTC (rev 4457)
+++ store/trunk/cpp/tools/__init__.py 2011-05-10 15:35:17 UTC (rev 4458)
@@ -1,23 +0,0 @@
-"""
-Copyright (c) 2007, 2008 Red Hat, Inc.
-
-This file is part of the Qpid async store library msgstore.so.
-
-This library is free software; you can redistribute it and/or
-modify it under the terms of the GNU Lesser General Public
-License as published by the Free Software Foundation; either
-version 2.1 of the License, or (at your option) any later version.
-
-This library is distributed in the hope that it will be useful,
-but WITHOUT ANY WARRANTY; without even the implied warranty of
-MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-Lesser General Public License for more details.
-
-You should have received a copy of the GNU Lesser General Public
-License along with this library; if not, write to the Free Software
-Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
-USA
-
-The GNU Lesser General Public License is available in the file COPYING.
-"""
-
Deleted: store/trunk/cpp/tools/janal.py
===================================================================
--- store/trunk/cpp/tools/janal.py 2011-05-04 12:00:49 UTC (rev 4457)
+++ store/trunk/cpp/tools/janal.py 2011-05-10 15:35:17 UTC (rev 4458)
@@ -1,612 +0,0 @@
-"""
-Copyright (c) 2007, 2008 Red Hat, Inc.
-
-This file is part of the Qpid async store library msgstore.so.
-
-This library is free software; you can redistribute it and/or
-modify it under the terms of the GNU Lesser General Public
-License as published by the Free Software Foundation; either
-version 2.1 of the License, or (at your option) any later version.
-
-This library is distributed in the hope that it will be useful,
-but WITHOUT ANY WARRANTY; without even the implied warranty of
-MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-Lesser General Public License for more details.
-
-You should have received a copy of the GNU Lesser General Public
-License along with this library; if not, write to the Free Software
-Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
-USA
-
-The GNU Lesser General Public License is available in the file COPYING.
-"""
-
-import jerr, jrnl
-import os.path, sys
-
-
-#== class EnqMap ==============================================================
-
-class EnqMap(object):
- """Class for maintaining a map of enqueued records, indexing the rid
against hdr, fid and transaction lock"""
-
- def __init__(self):
- """Constructor"""
- self.__map = {}
-
- def __str__(self):
- """Print the contents of the map"""
- return self.report(True, True)
-
- def add(self, fid, hdr, lock = False):
- """Add a new record into the map"""
- if hdr.rid in self.__map:
- raise jerr.DuplicateRidError(hdr.rid)
- self.__map[hdr.rid] = [fid, hdr, lock]
-
- def contains(self, rid):
- """Return True if the map contains the given
rid"""
- return rid in self.__map
-
- def delete(self, rid):
- """Delete the rid and its associated data from the
map"""
- if rid in self.__map:
- if self.get_lock(rid):
- raise jerr.DeleteLockedRecordError(rid)
- del self.__map[rid]
- else:
- raise jerr.JWarning("ERROR: Deleting non-existent rid from EnqMap:
rid=0x%x" % rid)
-
- def get(self, rid):
- """Return a list [fid, hdr, lock] for the given
rid"""
- if self.contains(rid):
- return self.__map[rid]
- return None
-
- def get_fid(self, rid):
- """Return the fid for the given rid"""
- if self.contains(rid):
- return self.__map[rid][0]
- return None
-
- def get_hdr(self, rid):
- """Return the header record for the given rid"""
- if self.contains(rid):
- return self.__map[rid][1]
- return None
-
- def get_lock(self, rid):
- """Return the transaction lock value for the given
rid"""
- if self.contains(rid):
- return self.__map[rid][2]
- return None
-
- def get_rec_list(self):
- """Return a list of tuples (fid, hdr, lock) for all entries in the
map"""
- return self.__map.values()
-
- def lock(self, rid):
- """Set the transaction lock for a given rid to
True"""
- if rid in self.__map:
- if not self.__map[rid][2]: # locked
- self.__map[rid][2] = True
- else:
- raise jerr.AlreadyLockedError(rid)
- else:
- raise jerr.JWarning("ERROR: Locking non-existent rid in EnqMap:
rid=0x%x" % rid)
-
- def report(self, show_stats, show_records):
- """Return a string containing a text report for all records in the
map"""
- if len(self.__map) == 0:
- return "No enqueued records found."
- rstr = "%d enqueued records found" % len(self.__map)
- if show_records:
- rstr += ":"
- rid_list = self.__map.keys()
- rid_list.sort()
- for rid in rid_list:
- if self.__map[rid][2]:
- lock_str = " [LOCKED]"
- else:
- lock_str = ""
- rstr += "\n lfid=%d %s %s" % (rec[0], rec[1], lock_str)
- else:
- rstr += "."
- return rstr
-
- def rids(self):
- """Return a list of rids in the map"""
- return self.__map.keys()
-
- def size(self):
- """Return the number of entries in the map"""
- return len(self.__map)
-
- def unlock(self, rid):
- """Set the transaction lock for a given rid to
False"""
- if rid in self.__map:
- if self.__map[rid][2]:
- self.__map[rid][2] = False
- else:
- raise jerr.NotLockedError(rid)
- else:
- raise jerr.NonExistentRecordError("unlock", rid)
-
-
-#== class TxnMap ==============================================================
-
-class TxnMap(object):
- """Transaction map, which maps xids to a list of outstanding
actions"""
-
- def __init__(self, emap):
- """Constructor, requires an existing EnqMap
instance"""
- self.__emap = emap
- self.__map = {}
-
- def __str__(self):
- """Print the contents of the map"""
- return self.report(True, True)
-
- def add(self, fid, hdr):
- """Add a new transactional record into the map"""
- if isinstance(hdr, jrnl.DeqRec):
- try:
- self.__emap.lock(hdr.deq_rid)
- except jerr.JWarning:
- # Not in emap, look for rid in tmap
- l = self.find_rid(hdr.deq_rid, hdr.xid)
- if l != None:
- if l[2]:
- raise jerr.AlreadyLockedError(hdr.deq_rid)
- l[2] = True
- if hdr.xid in self.__map:
- self.__map[hdr.xid].append([fid, hdr, False]) # append to existing list
- else:
- self.__map[hdr.xid] = [[fid, hdr, False]] # create new list
-
- def contains(self, xid):
- """Return True if the xid exists in the map; False
otherwise"""
- return xid in self.__map
-
- def delete(self, hdr):
- """Remove a transaction record from the map using either a commit
or abort header"""
- if hdr.magic[-1] == "c":
- return self._commit(hdr.xid)
- if hdr.magic[-1] == "a":
- self._abort(hdr.xid)
- else:
- raise jerr.InvalidRecordTypeError("delete from TxnMap", hdr.magic,
hdr.rid)
-
- def find_rid(self, rid, xid_hint = None):
- """ Search for and return map list with supplied rid. If xid_hint
is supplied, try that xid first"""
- if xid_hint != None and self.contains(xid_hint):
- for l in self.__map[xid_hint]:
- if l[1].rid == rid:
- return l
- for xid in self.__map.iterkeys():
- if xid_hint == None or xid != xid_hint:
- for l in self.__map[xid]:
- if l[1].rid == rid:
- return l
-
- def get(self, xid):
- """Return a list of operations for the given
xid"""
- if self.contains(xid):
- return self.__map[xid]
-
- def report(self, show_stats, show_records):
- """Return a string containing a text report for all records in the
map"""
- if len(self.__map) == 0:
- return "No outstanding transactions found."
- rstr = "%d outstanding transactions found" % len(self.__map)
- if show_records:
- rstr += ":"
- for xid, tup in self.__map.iteritems():
- rstr += "\n xid=%s:" % jrnl.Utils.format_xid(xid)
- for i in tup:
- rstr += "\n %s" % str(i[1])
- else:
- rstr += "."
- return rstr
-
- def size(self):
- """Return the number of xids in the map"""
- return len(self.__map)
-
- def xids(self):
- """Return a list of xids in the map"""
- return self.__map.keys()
-
- def _abort(self, xid):
- """Perform an abort operation for the given xid
record"""
- for fid, hdr in self.__map[xid]:
- if isinstance(hdr, jrnl.DeqRec):
- self.__emap.unlock(hdr.rid)
- del self.__map[xid]
-
- def _commit(self, xid):
- """Perform a commit operation for the given xid
record"""
- mismatch_list = []
- for fid, hdr, lock in self.__map[xid]:
- if isinstance(hdr, jrnl.EnqRec):
- self.__emap.add(fid, hdr, lock) # Transfer enq to emap
- else:
- if self.__emap.contains(hdr.deq_rid):
- self.__emap.unlock(hdr.deq_rid)
- self.__emap.delete(hdr.deq_rid)
- else:
- mismatch_list.append("0x%x" % hdr.deq_rid)
- del self.__map[xid]
- return mismatch_list
-
-#== class JrnlAnalyzer ========================================================
-
-class JrnlAnalyzer(object):
- """
- This class analyzes a set of journal files and determines which is the last to be
written
- (the newest file), and hence which should be the first to be read for recovery (the
oldest
- file).
-
- The analysis is performed on construction; the contents of the JrnlInfo object passed
provide
- the recovery details.
- """
-
- def __init__(self, jinf):
- """Constructor"""
- self.__oldest = None
- self.__jinf = jinf
- self.__flist = self._analyze()
-
- def __str__(self):
- """String representation of this JrnlAnalyzer instance, will print
out results of analysis."""
- ostr = "Journal files analyzed in directory %s (* = earliest full):\n"
% self.__jinf.get_current_dir()
- if self.is_empty():
- ostr += " <All journal files are empty>\n"
- else:
- for tup in self.__flist:
- tmp = " "
- if tup[0] == self.__oldest[0]:
- tmp = "*"
- ostr += " %s %s: owi=%-5s rid=0x%x, fro=0x%x ts=%s\n" % (tmp,
os.path.basename(tup[1]), tup[2],
- tup[3], tup[4],
tup[5])
- for i in range(self.__flist[-1][0] + 1, self.__jinf.get_num_jrnl_files()):
- ostr += " %s.%04x.jdat: <empty>\n" %
(self.__jinf.get_jrnl_base_name(), i)
- return ostr
-
- # Analysis
-
- def get_oldest_file(self):
- """Return a tuple (ordnum, jfn, owi, rid, fro, timestamp) for the
oldest data file found in the journal"""
- return self.__oldest
-
- def get_oldest_file_index(self):
- """Return the ordinal number of the oldest data file found in the
journal"""
- if self.is_empty():
- return None
- return self.__oldest[0]
-
- def is_empty(self):
- """Return true if the analysis found that the journal file has
never been written to"""
- return len(self.__flist) == 0
-
- def _analyze(self):
- """Perform the journal file analysis by reading and comparing the
file headers of each journal data file"""
- owi_found = False
- flist = []
- for i in range(0, self.__jinf.get_num_jrnl_files()):
- jfn = os.path.join(self.__jinf.get_current_dir(), "%s.%04x.jdat" %
(self.__jinf.get_jrnl_base_name(), i))
- fhandle = open(jfn)
- fhdr = jrnl.Utils.load(fhandle, jrnl.Hdr)
- if fhdr.empty():
- break
- this_tup = (i, jfn, fhdr.owi(), fhdr.rid, fhdr.fro, fhdr.timestamp_str())
- flist.append(this_tup)
- if i == 0:
- init_owi = fhdr.owi()
- self.__oldest = this_tup
- elif fhdr.owi() != init_owi and not owi_found:
- self.__oldest = this_tup
- owi_found = True
- return flist
-
-
-#== class JrnlReader ====================================================
-
-class JrnlReader(object):
- """
- This class contains an Enqueue Map (emap), a transaction map (tmap) and a
transaction
- object list (txn_obj_list) which are populated by reading the journals from the
oldest
- to the newest and analyzing each record. The JrnlInfo and JrnlAnalyzer
- objects supplied on construction provide the information used for the recovery.
-
- The analysis is performed on construction.
- """
-
- def __init__(self, jinfo, jra, qflag = False, rflag = False, vflag = False):
- """Constructor, which reads all """
- self._jinfo = jinfo
- self._jra = jra
- self._qflag = qflag
- self._rflag = rflag
- self._vflag = vflag
-
- # test callback functions for CSV tests
- self._csv_store_chk = None
- self._csv_start_cb = None
- self._csv_enq_cb = None
- self._csv_deq_cb = None
- self._csv_txn_cb = None
- self._csv_end_cb = None
-
- self._emap = EnqMap()
- self._tmap = TxnMap(self._emap)
- self._txn_obj_list = {}
-
- self._file = None
- self._file_hdr = None
- self._file_num = None
- self._first_rec_flag = None
- self._fro = None
- self._last_file_flag = None
- self._start_file_num = None
- self._file_hdr_owi = None
- self._warning = []
-
- self._abort_cnt = 0
- self._commit_cnt = 0
- self._msg_cnt = 0
- self._rec_cnt = 0
- self._txn_msg_cnt = 0
-
- def __str__(self):
- """Print out all the undequeued records"""
- return self.report(True, self._rflag)
-
- def emap(self):
- """Get the enqueue map"""
- return self._emap
-
- def get_abort_cnt(self):
- """Get the cumulative number of transactional aborts
found"""
- return self._abort_cnt
-
- def get_commit_cnt(self):
- """Get the cumulative number of transactional commits
found"""
- return self._commit_cnt
-
- def get_msg_cnt(self):
- """Get the cumulative number of messages found"""
- return self._msg_cnt
-
- def get_rec_cnt(self):
- """Get the cumulative number of journal records (including
fillers) found"""
- return self._rec_cnt
-
- def is_last_file(self):
- """Return True if the last file is being read"""
- return self._last_file_flag
-
- def report(self, show_stats = True, show_records = False):
- """Return a string containing a report on the file
analysis"""
- rstr = self._emap.report(show_stats, show_records) + "\n" +
self._tmap.report(show_stats, show_records)
- #TODO - print size analysis here - ie how full, sparse, est. space remaining
before enq threshold
- return rstr
-
- def run(self):
- """Perform the read of the journal"""
- if self._csv_start_cb != None and self._csv_start_cb(self._csv_store_chk):
- return
- if self._jra.is_empty():
- return
- stop = self._advance_jrnl_file(*self._jra.get_oldest_file())
- while not stop and not self._get_next_record():
- pass
- if self._csv_end_cb != None and self._csv_end_cb(self._csv_store_chk):
- return
- if not self._qflag:
- print
-
- def set_callbacks(self, csv_store_chk, csv_start_cb = None, csv_enq_cb = None,
csv_deq_cb = None, csv_txn_cb = None,
- csv_end_cb = None):
- """Set callbacks for checks to be made at various points while
reading the journal"""
- self._csv_store_chk = csv_store_chk
- self._csv_start_cb = csv_start_cb
- self._csv_enq_cb = csv_enq_cb
- self._csv_deq_cb = csv_deq_cb
- self._csv_txn_cb = csv_txn_cb
- self._csv_end_cb = csv_end_cb
-
- def tmap(self):
- """Return the transaction map"""
- return self._tmap
-
- def get_txn_msg_cnt(self):
- """Get the cumulative transactional message
count"""
- return self._txn_msg_cnt
-
- def txn_obj_list(self):
- """Get a cumulative list of transaction objects (commits and
aborts)"""
- return self._txn_obj_list
-
- def _advance_jrnl_file(self, *oldest_file_info):
- """Rotate to using the next journal file. Return False if the
operation was successful, True if there are no
- more files to read."""
- fro_seek_flag = False
- if len(oldest_file_info) > 0:
- self._start_file_num = self._file_num = oldest_file_info[0]
- self._fro = oldest_file_info[4]
- fro_seek_flag = True # jump to fro to start reading
- if not self._qflag and not self._rflag:
- if self._vflag:
- print "Recovering journals..."
- else:
- print "Recovering journals",
- if self._file != None and self._is_file_full():
- self._file.close()
- self._file_num = self._incr_file_num()
- if self._file_num == self._start_file_num:
- return True
- if self._start_file_num == 0:
- self._last_file_flag = self._file_num == self._jinfo.get_num_jrnl_files()
- 1
- else:
- self._last_file_flag = self._file_num == self._start_file_num - 1
- if self._file_num < 0 or self._file_num >=
self._jinfo.get_num_jrnl_files():
- raise jerr.BadFileNumberError(self._file_num)
- jfn = os.path.join(self._jinfo.get_current_dir(), "%s.%04x.jdat" %
- (self._jinfo.get_jrnl_base_name(), self._file_num))
- self._file = open(jfn)
- self._file_hdr = jrnl.Utils.load(self._file, jrnl.Hdr)
- if fro_seek_flag and self._file.tell() != self._fro:
- self._file.seek(self._fro)
- self._first_rec_flag = True
- if not self._qflag:
- if self._rflag:
- print jfn, ": ", self._file_hdr
- elif self._vflag:
- print "* Reading %s" % jfn
- else:
- print ".",
- sys.stdout.flush()
- return False
-
- def _check_owi(self, hdr):
- """Return True if the header's owi indicator matches that of
the file header record; False otherwise. This can
- indicate whether the last record in a file has been read and now older records
which have not yet been
- overwritten are now being read."""
- return self._file_hdr_owi == hdr.owi()
-
- def _is_file_full(self):
- """Return True if the current file is full (no more write space);
false otherwise"""
- return self._file.tell() >= self._jinfo.get_jrnl_file_size_bytes()
-
- def _get_next_record(self):
- """Get the next record in the file for analysis"""
- if self._is_file_full():
- if self._advance_jrnl_file():
- return True
- try:
- hdr = jrnl.Utils.load(self._file, jrnl.Hdr)
- except:
- return True
- if hdr.empty():
- return True
- if hdr.check():
- return True
- self._rec_cnt += 1
- self._file_hdr_owi = self._file_hdr.owi()
- if self._first_rec_flag:
- if self._file_hdr.fro != hdr.foffs:
- raise jerr.FirstRecordOffsetMismatch(self._file_hdr.fro, hdr.foffs)
- else:
- if self._rflag:
- print " * fro ok: 0x%x" % self._file_hdr.fro
- self._first_rec_flag = False
- stop = False
- if isinstance(hdr, jrnl.EnqRec):
- stop = self._handle_enq_rec(hdr)
- elif isinstance(hdr, jrnl.DeqRec):
- stop = self._handle_deq_rec(hdr)
- elif isinstance(hdr, jrnl.TxnRec):
- stop = self._handle_txn_rec(hdr)
- wstr = ""
- for warn in self._warning:
- wstr += " (%s)" % warn
- if self._rflag:
- print " > %s %s" % (hdr, wstr)
- self._warning = []
- return stop
-
- def _handle_deq_rec(self, hdr):
- """Process a dequeue ("RHMd") record"""
- if self._load_rec(hdr):
- return True
-
- # Check OWI flag
- if not self._check_owi(hdr):
- self._warning.append("WARNING: OWI mismatch - could be overwrite
boundary.")
- return True
- # Test hook
- if self._csv_deq_cb != None and self._csv_deq_cb(self._csv_store_chk, hdr):
- return True
-
- try:
- if hdr.xid == None:
- self._emap.delete(hdr.deq_rid)
- else:
- self._tmap.add(self._file_hdr.fid, hdr)
- except jerr.JWarning, warn:
- self._warning.append(str(warn))
- return False
-
- def _handle_enq_rec(self, hdr):
- """Process a dequeue ("RHMe") record"""
- if self._load_rec(hdr):
- return True
-
- # Check extern flag
- if hdr.extern and hdr.data != None:
- raise jerr.ExternFlagDataError(hdr)
- # Check OWI flag
- if not self._check_owi(hdr):
- self._warning.append("WARNING: OWI mismatch - could be overwrite
boundary.")
- return True
- # Test hook
- if self._csv_enq_cb != None and self._csv_enq_cb(self._csv_store_chk, hdr):
- return True
-
- if hdr.xid == None:
- self._emap.add(self._file_hdr.fid, hdr)
- else:
- self._txn_msg_cnt += 1
- self._tmap.add(self._file_hdr.fid, hdr)
- self._msg_cnt += 1
- return False
-
- def _handle_txn_rec(self, hdr):
- """Process a transaction ("RHMa or RHMc")
record"""
- if self._load_rec(hdr):
- return True
-
- # Check OWI flag
- if not self._check_owi(hdr):
- self._warning.append("WARNING: OWI mismatch - could be overwrite
boundary.")
- return True
- # Test hook
- if self._csv_txn_cb != None and self._csv_txn_cb(self._csv_store_chk, hdr):
- return True
-
- if hdr.magic[-1] == "a":
- self._abort_cnt += 1
- else:
- self._commit_cnt += 1
-
- if self._tmap.contains(hdr.xid):
- mismatched_rids = self._tmap.delete(hdr)
- if mismatched_rids != None and len(mismatched_rids) > 0:
- self._warning.append("WARNING: transactional dequeues not found in
enqueue map; rids=%s" %
- mismatched_rids)
- else:
- self._warning.append("WARNING: %s not found in transaction map" %
jrnl.Utils.format_xid(hdr.xid))
- if hdr.magic[-1] == "c": # commits only
- self._txn_obj_list[hdr.xid] = hdr
- return False
-
- def _incr_file_num(self):
- """Increment the number of files read with wraparound (ie after
file n-1, go to 0)"""
- self._file_num += 1
- if self._file_num >= self._jinfo.get_num_jrnl_files():
- self._file_num = 0
- return self._file_num
-
- def _load_rec(self, hdr):
- """Load a single record for the given header. There may be
arbitrarily large xids and data components."""
- while not hdr.complete():
- if self._advance_jrnl_file():
- return True
- hdr.load(self._file)
- return False
-
-# =============================================================================
-
-if __name__ == "__main__":
- print "This is a library, and cannot be executed."
Deleted: store/trunk/cpp/tools/jerr.py
===================================================================
--- store/trunk/cpp/tools/jerr.py 2011-05-04 12:00:49 UTC (rev 4457)
+++ store/trunk/cpp/tools/jerr.py 2011-05-10 15:35:17 UTC (rev 4458)
@@ -1,223 +0,0 @@
-"""
-Copyright (c) 2007, 2008 Red Hat, Inc.
-
-This file is part of the Qpid async store library msgstore.so.
-
-This library is free software; you can redistribute it and/or
-modify it under the terms of the GNU Lesser General Public
-License as published by the Free Software Foundation; either
-version 2.1 of the License, or (at your option) any later version.
-
-This library is distributed in the hope that it will be useful,
-but WITHOUT ANY WARRANTY; without even the implied warranty of
-MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-Lesser General Public License for more details.
-
-You should have received a copy of the GNU Lesser General Public
-License along with this library; if not, write to the Free Software
-Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
-USA
-
-The GNU Lesser General Public License is available in the file COPYING.
-"""
-
-# == Warnings =================================================================
-
-class JWarning(Exception):
- """Class to convey a warning"""
- def __init__(self, err):
- """Constructor"""
- Exception.__init__(self, err)
-
-# == Errors ===================================================================
-
-class AllJrnlFilesEmptyCsvError(Exception):
- """All journal files are empty (never been written)"""
- def __init__(self, tnum, exp_num_msgs):
- """Constructor"""
- Exception.__init__(self, "[CSV %d] All journal files are empty, but test
expects %d msg(s)." %
- (tnum, exp_num_msgs))
-
-class AlreadyLockedError(Exception):
- """Error class for trying to lock a record that is already
locked"""
- def __init__(self, rid):
- """Constructor"""
- Exception.__init__(self, "Locking record which is already locked in EnqMap:
rid=0x%x" % rid)
-
-class BadFileNumberError(Exception):
- """Error class for incorrect or unexpected file
number"""
- def __init__(self, file_num):
- """Constructor"""
- Exception.__init__(self, "Bad file number %d" % file_num)
-
-class DataSizeError(Exception):
- """Error class for data size mismatch"""
- def __init__(self, exp_size, act_size, data_str):
- """Constructor"""
- Exception.__init__(self, "Inconsistent data size: expected:%d; actual:%d;
data=\"%s\"" %
- (exp_size, act_size, data_str))
-
-class DeleteLockedRecordError(Exception):
- """Error class for deleting a locked record from the enqueue
map"""
- def __init__(self, rid):
- """Constructor"""
- Exception.__init__(self, "Deleting locked record from EnqMap: rid=0x%s"
% rid)
-
-class DequeueNonExistentEnqueueError(Exception):
- """Error class for attempting to dequeue a non-existent enqueue record
(rid)"""
- def __init__(self, deq_rid):
- """Constructor"""
- Exception.__init__(self, "Dequeuing non-existent enqueue record:
rid=0x%s" % deq_rid)
-
-class DuplicateRidError(Exception):
- """Error class for placing duplicate rid into enqueue
map"""
- def __init__(self, rid):
- """Constructor"""
- Exception.__init__(self, "Adding duplicate record to EnqMap: rid=0x%x"
% rid)
-
-class EndianMismatchError(Exception):
- """Error class mismatched record header endian flag"""
- def __init__(self, exp_endianness):
- """Constructor"""
- Exception.__init__(self, "Endian mismatch: expected %s, but current record
is %s" %
- self.endian_str(exp_endianness))
- #@staticmethod
- def endian_str(endianness):
- """Return a string tuple for the endianness error
message"""
- if endianness:
- return "big", "little"
- return "little", "big"
- endian_str = staticmethod(endian_str)
-
-class ExternFlagDataError(Exception):
- """Error class for the extern flag being set and the internal size
> 0"""
- def __init__(self, hdr):
- """Constructor"""
- Exception.__init__(self, "Message data found (msg size > 0) on record
with external flag set: hdr=%s" % hdr)
-
-class ExternFlagCsvError(Exception):
- """External flag mismatch between record and CSV test
file"""
- def __init__(self, tnum, exp_extern_flag):
- """Constructor"""
- Exception.__init__(self, "[CSV %d] External flag mismatch: expected %s"
% (tnum, exp_extern_flag))
-
-class ExternFlagWithDataCsvError(Exception):
- """External flag set and Message data found"""
- def __init__(self, tnum):
- """Constructor"""
- Exception.__init__(self, "[CSV %d] Message data found on record with
external flag set" % tnum)
-
-class FillExceedsFileSizeError(Exception):
- """Internal error from a fill operation which will exceed the
specified file size"""
- def __init__(self, cur_size, file_size):
- """Constructor"""
- Exception.__init__(self, "Filling to size %d > max file size %d" %
(cur_size, file_size))
-
-class FillSizeError(Exception):
- """Internal error from a fill operation that did not match the
calculated end point in the file"""
- def __init__(self, cur_posn, exp_posn):
- """Constructor"""
- Exception.__init__(self, "Filled to size %d > expected file posn %d"
% (cur_posn, exp_posn))
-
-class FirstRecordOffsetMismatch(Exception):
- """Error class for file header fro mismatch with actual
record"""
- def __init__(self, fro, actual_offs):
- """Constructor"""
- Exception.__init__(self, "File header first record offset mismatch:
fro=0x%x; actual offs=0x%x" %
- (fro, actual_offs))
-
-class InvalidHeaderVersionError(Exception):
- """Error class for invalid record header version"""
- def __init__(self, exp_ver, act_ver):
- """Constructor"""
- Exception.__init__(self, "Invalid header version: expected:%d,
actual:%d." % (exp_ver, act_ver))
-
-class InvalidRecordTypeError(Exception):
- """Error class for any operation using an invalid record
type"""
- def __init__(self, operation, magic, rid):
- """Constructor"""
- Exception.__init__(self, "Invalid record type for operation: operation=%s
record magic=%s, rid=0x%x" %
- (operation, magic, rid))
-
-class InvalidRecordTailError(Exception):
- """Error class for invalid record tail"""
- def __init__(self, magic_err, rid_err, rec):
- """Constructor"""
- Exception.__init__(self, " > %s *INVALID TAIL RECORD (%s)*" % (rec,
self.tail_err_str(magic_err, rid_err)))
- #@staticmethod
- def tail_err_str(magic_err, rid_err):
- """Return a string indicating the tail record
error(s)"""
- estr = ""
- if magic_err:
- estr = "magic bad"
- if rid_err:
- estr += ", "
- if rid_err:
- estr += "rid mismatch"
- return estr
- tail_err_str = staticmethod(tail_err_str)
-
-class NonExistentRecordError(Exception):
- """Error class for any operation on an non-existent
record"""
- def __init__(self, operation, rid):
- """Constructor"""
- Exception.__init__(self, "Operation on non-existent record: operation=%s;
rid=0x%x" % (operation, rid))
-
-class NotLockedError(Exception):
- """Error class for unlocking a record which is not locked in the first
place"""
- def __init__(self, rid):
- """Constructor"""
- Exception.__init__(self, "Unlocking record which is not locked in EnqMap:
rid=0x%x" % rid)
-
-class JournalSpaceExceededError(Exception):
- """Error class for when journal space of resized journal is too small
to contain the transferred records"""
- def __init__(self):
- """Constructor"""
- Exception.__init__(self, "Ran out of journal space while writing
records")
-
-class MessageLengthCsvError(Exception):
- """Message length mismatch between record and CSV test
file"""
- def __init__(self, tnum, exp_msg_len, actual_msg_len):
- """Constructor"""
- Exception.__init__(self, "[CSV %d] Message length mismatch: expected %d;
found %d" %
- (tnum, exp_msg_len, actual_msg_len))
-
-class NumMsgsCsvError(Exception):
- """Number of messages found mismatched with CSV
file"""
- def __init__(self, tnum, exp_num_msgs, actual_num_msgs):
- """Constructor"""
- Exception.__init__(self, "[CSV %s] Incorrect number of messages: expected
%d, found %d" %
- (tnum, exp_num_msgs, actual_num_msgs))
-
-class TransactionCsvError(Exception):
- """Transaction mismatch between record and CSV file"""
- def __init__(self, tnum, exp_transactional):
- """Constructor"""
- Exception.__init__(self, "[CSV %d] Transaction mismatch: expected %s" %
(tnum, exp_transactional))
-
-class UnexpectedEndOfFileError(Exception):
- """Error class for unexpected end-of-file during
reading"""
- def __init__(self, exp_size, curr_offs):
- """Constructor"""
- Exception.__init__(self, "Unexpected end-of-file: expected file size:%d;
current offset:%d" %
- (exp_size, curr_offs))
-
-class XidLengthCsvError(Exception):
- """Message Xid length mismatch between record and CSV
file"""
- def __init__(self, tnum, exp_xid_len, actual_msg_len):
- """Constructor"""
- Exception.__init__(self, "[CSV %d] Message XID mismatch: expected %d; found
%d" %
- (tnum, exp_xid_len, actual_msg_len))
-
-class XidSizeError(Exception):
- """Error class for Xid size mismatch"""
- def __init__(self, exp_size, act_size, xid_str):
- """Constructor"""
- Exception.__init__(self, "Inconsistent xid size: expected:%d; actual:%d;
xid=\"%s\"" %
- (exp_size, act_size, xid_str))
-
-# =============================================================================
-
-if __name__ == "__main__":
- print "This is a library, and cannot be executed."
-
Deleted: store/trunk/cpp/tools/jrnl.py
===================================================================
--- store/trunk/cpp/tools/jrnl.py 2011-05-04 12:00:49 UTC (rev 4457)
+++ store/trunk/cpp/tools/jrnl.py 2011-05-10 15:35:17 UTC (rev 4458)
@@ -1,798 +0,0 @@
-"""
-Copyright (c) 2007, 2008 Red Hat, Inc.
-
-This file is part of the Qpid async store library msgstore.so.
-
-This library is free software; you can redistribute it and/or
-modify it under the terms of the GNU Lesser General Public
-License as published by the Free Software Foundation; either
-version 2.1 of the License, or (at your option) any later version.
-
-This library is distributed in the hope that it will be useful,
-but WITHOUT ANY WARRANTY; without even the implied warranty of
-MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-Lesser General Public License for more details.
-
-You should have received a copy of the GNU Lesser General Public
-License along with this library; if not, write to the Free Software
-Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
-USA
-
-The GNU Lesser General Public License is available in the file COPYING.
-"""
-
-import jerr
-import os.path, sys, xml.parsers.expat
-from struct import pack, unpack, calcsize
-from time import gmtime, strftime
-
-# TODO: Get rid of these! Use jinf instance instead
-DBLK_SIZE = 128
-SBLK_SIZE = 4 * DBLK_SIZE
-
-# TODO - this is messy - find a better way to handle this
-# This is a global, but is set directly by the calling program
-JRNL_FILE_SIZE = None
-
-#== class Utils ======================================================================
-
-class Utils(object):
- """Class containing utility functions for dealing with the
journal"""
-
- __printchars =
"0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ!\"#$%&'()*+,-./:;<=>?@[\\]^_`{\|}~
"
-
- # The @staticmethod declarations are not supported in RHEL4 (python 2.3.x)
- # When RHEL4 support ends, restore these declarations and remove the older
- # staticmethod() declaration.
-
- #@staticmethod
- def format_data(dsize, data):
- """Format binary data for printing"""
- if data == None:
- return ""
- if Utils._is_printable(data):
- datastr = Utils._split_str(data)
- else:
- datastr = Utils._hex_split_str(data)
- if dsize != len(data):
- raise jerr.DataSizeError(dsize, len(data), datastr)
- return "data(%d)=\"%s\" " % (dsize, datastr)
- format_data = staticmethod(format_data)
-
- #@staticmethod
- def format_xid(xid, xidsize=None):
- """Format binary XID for printing"""
- if xid == None and xidsize != None:
- if xidsize > 0:
- raise jerr.XidSizeError(xidsize, 0, None)
- return ""
- if Utils._is_printable(xid):
- xidstr = Utils._split_str(xid)
- else:
- xidstr = Utils._hex_split_str(xid)
- if xidsize == None:
- xidsize = len(xid)
- elif xidsize != len(xid):
- raise jerr.XidSizeError(xidsize, len(xid), xidstr)
- return "xid(%d)=\"%s\" " % (xidsize, xidstr)
- format_xid = staticmethod(format_xid)
-
- #@staticmethod
- def inv_str(string):
- """Perform a binary 1's compliment (invert all bits) on a
binary string"""
- istr = ""
- for index in range(0, len(string)):
- istr += chr(~ord(string[index]) & 0xff)
- return istr
- inv_str = staticmethod(inv_str)
-
- #@staticmethod
- def load(fhandle, klass):
- """Load a record of class klass from a file"""
- args = Utils._load_args(fhandle, klass)
- subclass = klass.discriminate(args)
- result = subclass(*args) # create instance of record
- if subclass != klass:
- result.init(fhandle, *Utils._load_args(fhandle, subclass))
- result.skip(fhandle)
- return result
- load = staticmethod(load)
-
- #@staticmethod
- def load_file_data(fhandle, size, data):
- """Load the data portion of a message from file"""
- if size == 0:
- return (data, True)
- if data == None:
- loaded = 0
- else:
- loaded = len(data)
- foverflow = fhandle.tell() + size - loaded > JRNL_FILE_SIZE
- if foverflow:
- rsize = JRNL_FILE_SIZE - fhandle.tell()
- else:
- rsize = size - loaded
- fbin = fhandle.read(rsize)
- if data == None:
- data = unpack("%ds" % (rsize), fbin)[0]
- else:
- data = data + unpack("%ds" % (rsize), fbin)[0]
- return (data, not foverflow)
- load_file_data = staticmethod(load_file_data)
-
- #@staticmethod
- def rem_bytes_in_blk(fhandle, blk_size):
- """Return the remaining bytes in a block"""
- foffs = fhandle.tell()
- return Utils.size_in_bytes_to_blk(foffs, blk_size) - foffs
- rem_bytes_in_blk = staticmethod(rem_bytes_in_blk)
-
- #@staticmethod
- def size_in_blks(size, blk_size):
- """Return the size in terms of data blocks"""
- return int((size + blk_size - 1) / blk_size)
- size_in_blks = staticmethod(size_in_blks)
-
- #@staticmethod
- def size_in_bytes_to_blk(size, blk_size):
- """Return the bytes remaining until the next block
boundary"""
- return Utils.size_in_blks(size, blk_size) * blk_size
- size_in_bytes_to_blk = staticmethod(size_in_bytes_to_blk)
-
- #@staticmethod
- def _hex_split_str(in_str, split_size = 50):
- """Split a hex string into two parts separated by an
ellipsis"""
- if len(in_str) <= split_size:
- return Utils._hex_str(in_str, 0, len(in_str))
-# if len(in_str) > split_size + 25:
-# return Utils._hex_str(in_str, 0, 10) + " ... " +
Utils._hex_str(in_str, 55, 65) + " ... " + \
-# Utils._hex_str(in_str, len(in_str)-10, len(in_str))
- return Utils._hex_str(in_str, 0, 10) + " ... " + Utils._hex_str(in_str,
len(in_str)-10, len(in_str))
- _hex_split_str = staticmethod(_hex_split_str)
-
- #@staticmethod
- def _hex_str(in_str, begin, end):
- """Return a binary string as a hex string"""
- hstr = ""
- for index in range(begin, end):
- if Utils._is_printable(in_str[index]):
- hstr += in_str[index]
- else:
- hstr += "\\%02x" % ord(in_str[index])
- return hstr
- _hex_str = staticmethod(_hex_str)
-
- #@staticmethod
- def _is_printable(in_str):
- """Return True if in_str in printable; False
otherwise."""
- return in_str.strip(Utils.__printchars) == ""
- _is_printable = staticmethod(_is_printable)
-
- #@staticmethod
- def _load_args(fhandle, klass):
- """Load the arguments from class klass"""
- size = calcsize(klass.FORMAT)
- foffs = fhandle.tell(),
- fbin = fhandle.read(size)
- if len(fbin) != size:
- raise jerr.UnexpectedEndOfFileError(size, len(fbin))
- return foffs + unpack(klass.FORMAT, fbin)
- _load_args = staticmethod(_load_args)
-
- #@staticmethod
- def _split_str(in_str, split_size = 50):
- """Split a string into two parts separated by an ellipsis if it is
longer than split_size"""
- if len(in_str) < split_size:
- return in_str
- return in_str[:25] + " ... " + in_str[-25:]
- _split_str = staticmethod(_split_str)
-
-
-#== class Hdr =================================================================
-
-class Hdr:
- """Class representing the journal header records"""
-
- FORMAT = "=4sBBHQ"
- HDR_VER = 1
- OWI_MASK = 0x01
- BIG_ENDIAN = sys.byteorder == "big"
- REC_BOUNDARY = DBLK_SIZE
-
- def __init__(self, foffs, magic, ver, endn, flags, rid):
- """Constructor"""
-# Sizeable.__init__(self)
- self.foffs = foffs
- self.magic = magic
- self.ver = ver
- self.endn = endn
- self.flags = flags
- self.rid = long(rid)
-
- def __str__(self):
- """Return string representation of this header"""
- if self.empty():
- return "0x%08x: <empty>" % (self.foffs)
- if self.magic[-1] == "x":
- return "0x%08x: [\"%s\"]" % (self.foffs, self.magic)
- if self.magic[-1] in ["a", "c", "d", "e",
"f", "x"]:
- return "0x%08x: [\"%s\" v=%d e=%d f=0x%04x rid=0x%x]" %
(self.foffs, self.magic, self.ver, self.endn,
- self.flags,
self.rid)
- return "0x%08x: <error, unknown magic \"%s\" (possible
overwrite boundary?)>" % (self.foffs, self.magic)
-
- #@staticmethod
- def discriminate(args):
- """Use the last char in the header magic to determine the header
type"""
- return _CLASSES.get(args[1][-1], Hdr)
- discriminate = staticmethod(discriminate)
-
- def empty(self):
- """Return True if this record is empty (ie has a magic of
0x0000"""
- return self.magic == "\x00"*4
-
- def encode(self):
- """Encode the header into a binary string"""
- return pack(Hdr.FORMAT, self.magic, self.ver, self.endn, self.flags, self.rid)
-
- def owi(self):
- """Return the OWI (overwrite indicator) for this
header"""
- return self.flags & self.OWI_MASK != 0
-
- def skip(self, fhandle):
- """Read and discard the remainder of this
record"""
- fhandle.read(Utils.rem_bytes_in_blk(fhandle, self.REC_BOUNDARY))
-
- def check(self):
- """Check that this record is valid"""
- if self.empty() or self.magic[:3] != "RHM" or self.magic[3] not in
["a", "c", "d", "e", "f",
"x"]:
- return True
- if self.magic[-1] != "x":
- if self.ver != self.HDR_VER:
- raise jerr.InvalidHeaderVersionError(self.HDR_VER, self.ver)
- if bool(self.endn) != self.BIG_ENDIAN:
- raise jerr.EndianMismatchError(self.BIG_ENDIAN)
- return False
-
-
-#== class FileHdr =============================================================
-
-class FileHdr(Hdr):
- """Class for file headers, found at the beginning of journal
files"""
-
- FORMAT = "=2H4x3Q"
- REC_BOUNDARY = SBLK_SIZE
-
- def __str__(self):
- """Return a string representation of the this FileHdr
instance"""
- return "%s fid=%d lid=%d fro=0x%08x t=%s" % (Hdr.__str__(self),
self.fid, self.lid, self.fro,
- self.timestamp_str())
-
- def encode(self):
- """Encode this class into a binary string"""
- return Hdr.encode(self) + pack(FileHdr.FORMAT, self.fid, self.lid, self.fro,
self.time_sec, self.time_ns)
-
- def init(self, fhandle, foffs, fid, lid, fro, time_sec, time_ns):
- """Initialize this instance to known values"""
- self.fid = fid
- self.lid = lid
- self.fro = fro
- self.time_sec = time_sec
- self.time_ns = time_ns
-
- def timestamp(self):
- """Get the timestamp of this record as a tuple (secs,
nsecs)"""
- return (self.time_sec, self.time_ns)
-
- def timestamp_str(self):
- """Get the timestamp of this record in string
format"""
- time = gmtime(self.time_sec)
- fstr = "%%a %%b %%d %%H:%%M:%%S.%09d %%Y" % (self.time_ns)
- return strftime(fstr, time)
-
-
-#== class DeqRec ==============================================================
-
-class DeqRec(Hdr):
- """Class for a dequeue record"""
-
- FORMAT = "=QQ"
-
- def __str__(self):
- """Return a string representation of the this DeqRec
instance"""
- return "%s %sdrid=0x%x" % (Hdr.__str__(self),
Utils.format_xid(self.xid, self.xidsize), self.deq_rid)
-
- def init(self, fhandle, foffs, deq_rid, xidsize):
- """Initialize this instance to known values"""
- self.deq_rid = deq_rid
- self.xidsize = xidsize
- self.xid = None
- self.deq_tail = None
- self.xid_complete = False
- self.tail_complete = False
- self.tail_bin = None
- self.tail_offs = 0
- self.load(fhandle)
-
- def encode(self):
- """Encode this class into a binary string"""
- buf = Hdr.encode(self) + pack(DeqRec.FORMAT, self.deq_rid, self.xidsize)
- if self.xidsize > 0:
- fmt = "%ds" % (self.xidsize)
- buf += pack(fmt, self.xid)
- buf += self.deq_tail.encode()
- return buf
-
- def load(self, fhandle):
- """Load the remainder of this record (after the header has been
loaded"""
- if self.xidsize == 0:
- self.xid_complete = True
- self.tail_complete = True
- else:
- if not self.xid_complete:
- (self.xid, self.xid_complete) = Utils.load_file_data(fhandle,
self.xidsize, self.xid)
- if self.xid_complete and not self.tail_complete:
- ret = Utils.load_file_data(fhandle, calcsize(RecTail.FORMAT),
self.tail_bin)
- self.tail_bin = ret[0]
- if ret[1]:
- self.deq_tail = RecTail(self.tail_offs, *unpack(RecTail.FORMAT,
self.tail_bin))
- magic_err = self.deq_tail.magic_inv != Utils.inv_str(self.magic)
- rid_err = self.deq_tail.rid != self.rid
- if magic_err or rid_err:
- raise jerr.InvalidRecordTailError(magic_err, rid_err, self)
- self.skip(fhandle)
- self.tail_complete = ret[1]
- return self.complete()
-
- def complete(self):
- """Returns True if the entire record is loaded, False
otherwise"""
- return self.xid_complete and self.tail_complete
-
-
-#== class TxnRec ==============================================================
-
-class TxnRec(Hdr):
- """Class for a transaction commit/abort record"""
-
- FORMAT = "=Q"
-
- def __str__(self):
- """Return a string representation of the this TxnRec
instance"""
- return "%s %s" % (Hdr.__str__(self), Utils.format_xid(self.xid,
self.xidsize))
-
- def init(self, fhandle, foffs, xidsize):
- """Initialize this instance to known values"""
- self.xidsize = xidsize
- self.xid = None
- self.tx_tail = None
- self.xid_complete = False
- self.tail_complete = False
- self.tail_bin = None
- self.tail_offs = 0
- self.load(fhandle)
-
- def encode(self):
- """Encode this class into a binary string"""
- return Hdr.encode(self) + pack(TxnRec.FORMAT, self.xidsize) +
pack("%ds" % self.xidsize, self.xid) + \
- self.tx_tail.encode()
-
- def load(self, fhandle):
- """Load the remainder of this record (after the header has been
loaded"""
- if not self.xid_complete:
- ret = Utils.load_file_data(fhandle, self.xidsize, self.xid)
- self.xid = ret[0]
- self.xid_complete = ret[1]
- if self.xid_complete and not self.tail_complete:
- ret = Utils.load_file_data(fhandle, calcsize(RecTail.FORMAT), self.tail_bin)
- self.tail_bin = ret[0]
- if ret[1]:
- self.tx_tail = RecTail(self.tail_offs, *unpack(RecTail.FORMAT,
self.tail_bin))
- magic_err = self.tx_tail.magic_inv != Utils.inv_str(self.magic)
- rid_err = self.tx_tail.rid != self.rid
- if magic_err or rid_err:
- raise jerr.InvalidRecordTailError(magic_err, rid_err, self)
- self.skip(fhandle)
- self.tail_complete = ret[1]
- return self.complete()
-
- def complete(self):
- """Returns True if the entire record is loaded, False
otherwise"""
- return self.xid_complete and self.tail_complete
-
-
-#== class EnqRec ==============================================================
-
-class EnqRec(Hdr):
- """Class for a enqueue record"""
-
- FORMAT = "=QQ"
- TRANSIENT_MASK = 0x10
- EXTERN_MASK = 0x20
-
- def __str__(self):
- """Return a string representation of the this EnqRec
instance"""
- return "%s %s%s %s %s" % (Hdr.__str__(self), Utils.format_xid(self.xid,
self.xidsize),
- Utils.format_data(self.dsize, self.data),
self.enq_tail, self.print_flags())
-
- def encode(self):
- """Encode this class into a binary string"""
- buf = Hdr.encode(self) + pack(EnqRec.FORMAT, self.xidsize, self.dsize)
- if self.xidsize > 0:
- buf += pack("%ds" % self.xidsize, self.xid)
- if self.dsize > 0:
- buf += pack("%ds" % self.dsize, self.data)
- if self.xidsize > 0 or self.dsize > 0:
- buf += self.enq_tail.encode()
- return buf
-
- def init(self, fhandle, foffs, xidsize, dsize):
- """Initialize this instance to known values"""
- self.xidsize = xidsize
- self.dsize = dsize
- self.transient = self.flags & self.TRANSIENT_MASK > 0
- self.extern = self.flags & self.EXTERN_MASK > 0
- self.xid = None
- self.data = None
- self.enq_tail = None
- self.xid_complete = False
- self.data_complete = False
- self.tail_complete = False
- self.tail_bin = None
- self.tail_offs = 0
- self.load(fhandle)
-
- def load(self, fhandle):
- """Load the remainder of this record (after the header has been
loaded"""
- if not self.xid_complete:
- ret = Utils.load_file_data(fhandle, self.xidsize, self.xid)
- self.xid = ret[0]
- self.xid_complete = ret[1]
- if self.xid_complete and not self.data_complete:
- if self.extern:
- self.data_complete = True
- else:
- ret = Utils.load_file_data(fhandle, self.dsize, self.data)
- self.data = ret[0]
- self.data_complete = ret[1]
- if self.data_complete and not self.tail_complete:
- ret = Utils.load_file_data(fhandle, calcsize(RecTail.FORMAT), self.tail_bin)
- self.tail_bin = ret[0]
- if ret[1]:
- self.enq_tail = RecTail(self.tail_offs, *unpack(RecTail.FORMAT,
self.tail_bin))
- magic_err = self.enq_tail.magic_inv != Utils.inv_str(self.magic)
- rid_err = self.enq_tail.rid != self.rid
- if magic_err or rid_err:
- raise jerr.InvalidRecordTailError(magic_err, rid_err, self)
- self.skip(fhandle)
- self.tail_complete = ret[1]
- return self.complete()
-
- def complete(self):
- """Returns True if the entire record is loaded, False
otherwise"""
- return self.xid_complete and self.data_complete and self.tail_complete
-
- def print_flags(self):
- """Utility function to decode the flags field in the header and
print a string representation"""
- fstr = ""
- if self.transient:
- fstr = "*TRANSIENT"
- if self.extern:
- if len(fstr) > 0:
- fstr += ",EXTERNAL"
- else:
- fstr = "*EXTERNAL"
- if len(fstr) > 0:
- fstr += "*"
- return fstr
-
-
-#== class RecTail =============================================================
-
-class RecTail:
- """Class for a record tail - for all records where either an XID or
data separate the header from the end of the
- record"""
-
- FORMAT = "=4sQ"
-
- def __init__(self, foffs, magic_inv, rid):
- """Initialize this instance to known values"""
- self.foffs = foffs
- self.magic_inv = magic_inv
- self.rid = long(rid)
-
- def __str__(self):
- """Return a string representation of the this RecTail
instance"""
- magic = Utils.inv_str(self.magic_inv)
- return "[\"%s\" rid=0x%x]" % (magic, self.rid)
-
- def encode(self):
- """Encode this class into a binary string"""
- return pack(RecTail.FORMAT, self.magic_inv, self.rid)
-
-
-#== class JrnlInfo ============================================================
-
-class JrnlInfo(object):
- """
- This object reads and writes journal information files (<basename>.jinf).
Methods are provided
- to read a file, query its properties and reset just those properties necessary for
normalizing
- and resizing a journal.
-
- Normalizing: resetting the directory and/or base filename to different values. This
is necessary
- if a set of journal files is copied from one location to another before being
restored, as the
- value of the path in the file no longer matches the actual path.
-
- Resizing: If the journal geometry parameters (size and number of journal files)
changes, then the
- .jinf file must reflect these changes, as this file is the source of information for
journal
- recovery.
-
- NOTE: Data size vs File size: There are methods which return the data size and file
size of the
- journal files.
-
- +-------------+--------------------/ /----------+
- | File header | File data |
- +-------------+--------------------/ /----------+
- | | |
- | |<---------- Data size ---------->|
- |<------------------ File Size ---------------->|
-
- Data size: The size of the data content of the journal, ie that part which stores the
data records.
-
- File size: The actual disk size of the journal including data and the file header
which precedes the
- data.
-
- The file header is fixed to 1 sblk, so file size = jrnl size + sblk size.
- """
-
- def __init__(self, jdir, bfn = "JournalData"):
- """Constructor"""
- self.__jdir = jdir
- self.__bfn = bfn
- self.__jinf_dict = {}
- self._read_jinf()
-
- def __str__(self):
- """Create a string containing all of the journal info contained in
the jinf file"""
- ostr = "Journal info file %s:\n" % os.path.join(self.__jdir,
"%s.jinf" % self.__bfn)
- for key, val in self.__jinf_dict.iteritems():
- ostr += " %s = %s\n" % (key, val)
- return ostr
-
- def normalize(self, jdir = None, bfn = None):
- """Normalize the directory (ie reset the directory path to match
the actual current location) for this
- jinf file"""
- if jdir == None:
- self.__jinf_dict["directory"] = self.__jdir
- else:
- self.__jdir = jdir
- self.__jinf_dict["directory"] = jdir
- if bfn != None:
- self.__bfn = bfn
- self.__jinf_dict["base_filename"] = bfn
-
- def resize(self, num_jrnl_files = None, jrnl_file_size = None):
- """Reset the journal size information to allow for resizing the
journal"""
- if num_jrnl_files != None:
- self.__jinf_dict["number_jrnl_files"] = num_jrnl_files
- if jrnl_file_size != None:
- self.__jinf_dict["jrnl_file_size_sblks"] = jrnl_file_size *
self.get_jrnl_dblk_size_bytes()
-
- def write(self, jdir = None, bfn = None):
- """Write the .jinf file"""
- self.normalize(jdir, bfn)
- if not os.path.exists(self.get_jrnl_dir()):
- os.makedirs(self.get_jrnl_dir())
- fhandle = open(os.path.join(self.get_jrnl_dir(), "%s.jinf" %
self.get_jrnl_base_name()), "w")
- fhandle.write("<?xml version=\"1.0\" ?>\n")
- fhandle.write("<jrnl>\n")
- fhandle.write(" <journal_version value=\"%d\" />\n" %
self.get_jrnl_version())
- fhandle.write(" <journal_id>\n")
- fhandle.write(" <id_string value=\"%s\" />\n" %
self.get_jrnl_id())
- fhandle.write(" <directory value=\"%s\" />\n" %
self.get_jrnl_dir())
- fhandle.write(" <base_filename value=\"%s\" />\n" %
self.get_jrnl_base_name())
- fhandle.write(" </journal_id>\n")
- fhandle.write(" <creation_time>\n")
- fhandle.write(" <seconds value=\"%d\" />\n" %
self.get_creation_time()[0])
- fhandle.write(" <nanoseconds value=\"%d\" />\n" %
self.get_creation_time()[1])
- fhandle.write(" <string value=\"%s\" />\n" %
self.get_creation_time_str())
- fhandle.write(" </creation_time>\n")
- fhandle.write(" <journal_file_geometry>\n")
- fhandle.write(" <number_jrnl_files value=\"%d\"
/>\n" % self.get_num_jrnl_files())
- fhandle.write(" <auto_expand value=\"%s\" />\n" %
str.lower(str(self.get_auto_expand())))
- fhandle.write(" <jrnl_file_size_sblks value=\"%d\"
/>\n" % self.get_jrnl_data_size_sblks())
- fhandle.write(" <JRNL_SBLK_SIZE value=\"%d\" />\n" %
self.get_jrnl_sblk_size_dblks())
- fhandle.write(" <JRNL_DBLK_SIZE value=\"%d\" />\n" %
self.get_jrnl_dblk_size_bytes())
- fhandle.write(" </journal_file_geometry>\n")
- fhandle.write(" <cache_geometry>\n")
- fhandle.write(" <wcache_pgsize_sblks value=\"%d\"
/>\n" % self.get_wr_buf_pg_size_sblks())
- fhandle.write(" <wcache_num_pages value=\"%d\" />\n"
% self.get_num_wr_buf_pgs())
- fhandle.write(" <JRNL_RMGR_PAGE_SIZE value=\"%d\"
/>\n" % self.get_rd_buf_pg_size_sblks())
- fhandle.write(" <JRNL_RMGR_PAGES value=\"%d\" />\n"
% self.get_num_rd_buf_pgs())
- fhandle.write(" </cache_geometry>\n")
- fhandle.write("</jrnl>\n")
- fhandle.close()
-
- # Journal ID
-
- def get_jrnl_version(self):
- """Get the journal version"""
- return self.__jinf_dict["journal_version"]
-
- def get_jrnl_id(self):
- """Get the journal id"""
- return self.__jinf_dict["id_string"]
-
- def get_current_dir(self):
- """Get the current directory of the store (as opposed to that
value saved in the .jinf file)"""
- return self.__jdir
-
- def get_jrnl_dir(self):
- """Get the journal directory stored in the .jinf
file"""
- return self.__jinf_dict["directory"]
-
- def get_jrnl_base_name(self):
- """Get the base filename - that string used to name the journal
files <basefilename>-nnnn.jdat and
- <basefilename>.jinf"""
- return self.__jinf_dict["base_filename"]
-
- # Journal creation time
-
- def get_creation_time(self):
- """Get journal creation time as a tuple (secs,
nsecs)"""
- return (self.__jinf_dict["seconds"],
self.__jinf_dict["nanoseconds"])
-
- def get_creation_time_str(self):
- """Get journal creation time as a string"""
- return self.__jinf_dict["string"]
-
- # --- Files and geometry ---
-
- def get_num_jrnl_files(self):
- """Get number of data files in the journal"""
- return self.__jinf_dict["number_jrnl_files"]
-
- def get_auto_expand(self):
- """Return True if auto-expand is enabled; False
otherwise"""
- return self.__jinf_dict["auto_expand"]
-
- def get_jrnl_sblk_size_dblks(self):
- """Get the journal softblock size in dblks"""
- return self.__jinf_dict["JRNL_SBLK_SIZE"]
-
- def get_jrnl_sblk_size_bytes(self):
- """Get the journal softblock size in bytes"""
- return self.get_jrnl_sblk_size_dblks() * self.get_jrnl_dblk_size_bytes()
-
- def get_jrnl_dblk_size_bytes(self):
- """Get the journal datablock size in bytes"""
- return self.__jinf_dict["JRNL_DBLK_SIZE"]
-
- def get_jrnl_data_size_sblks(self):
- """Get the data capacity (excluding the file headers) for one
journal file in softblocks"""
- return self.__jinf_dict["jrnl_file_size_sblks"]
-
- def get_jrnl_data_size_dblks(self):
- """Get the data capacity (excluding the file headers) for one
journal file in datablocks"""
- return self.get_jrnl_data_size_sblks() * self.get_jrnl_sblk_size_dblks()
-
- def get_jrnl_data_size_bytes(self):
- """Get the data capacity (excluding the file headers) for one
journal file in bytes"""
- return self.get_jrnl_data_size_dblks() * self.get_jrnl_dblk_size_bytes()
-
- def get_jrnl_file_size_sblks(self):
- """Get the size of one journal file on disk (including the file
headers) in softblocks"""
- return self.get_jrnl_data_size_sblks() + 1
-
- def get_jrnl_file_size_dblks(self):
- """Get the size of one journal file on disk (including the file
headers) in datablocks"""
- return self.get_jrnl_file_size_sblks() * self.get_jrnl_sblk_size_dblks()
-
- def get_jrnl_file_size_bytes(self):
- """Get the size of one journal file on disk (including the file
headers) in bytes"""
- return self.get_jrnl_file_size_dblks() * self.get_jrnl_dblk_size_bytes()
-
- def get_tot_jrnl_data_size_sblks(self):
- """Get the size of the entire jouranl's data capacity
(excluding the file headers) for all files together in
- softblocks"""
- return self.get_num_jrnl_files() * self.get_jrnl_data_size_bytes()
-
- def get_tot_jrnl_data_size_dblks(self):
- """Get the size of the entire jouranl's data capacity
(excluding the file headers) for all files together in
- datablocks"""
- return self.get_num_jrnl_files() * self.get_jrnl_data_size_dblks()
-
- def get_tot_jrnl_data_size_bytes(self):
- """Get the size of the entire jouranl's data capacity
(excluding the file headers) for all files together in
- bytes"""
- return self.get_num_jrnl_files() * self.get_jrnl_data_size_bytes()
-
- # Read and write buffers
-
- def get_wr_buf_pg_size_sblks(self):
- """Get the size of the write buffer pages in
softblocks"""
- return self.__jinf_dict["wcache_pgsize_sblks"]
-
- def get_wr_buf_pg_size_dblks(self):
- """Get the size of the write buffer pages in
datablocks"""
- return self.get_wr_buf_pg_size_sblks() * self.get_jrnl_sblk_size_dblks()
-
- def get_wr_buf_pg_size_bytes(self):
- """Get the size of the write buffer pages in
bytes"""
- return self.get_wr_buf_pg_size_dblks() * self.get_jrnl_dblk_size_bytes()
-
- def get_num_wr_buf_pgs(self):
- """Get the number of write buffer pages"""
- return self.__jinf_dict["wcache_num_pages"]
-
- def get_rd_buf_pg_size_sblks(self):
- """Get the size of the read buffer pages in
softblocks"""
- return self.__jinf_dict["JRNL_RMGR_PAGE_SIZE"]
-
- def get_rd_buf_pg_size_dblks(self):
- """Get the size of the read buffer pages in
datablocks"""
- return self.get_rd_buf_pg_size_sblks * self.get_jrnl_sblk_size_dblks()
-
- def get_rd_buf_pg_size_bytes(self):
- """Get the size of the read buffer pages in
bytes"""
- return self.get_rd_buf_pg_size_dblks * self.get_jrnl_dblk_size_bytes()
-
- def get_num_rd_buf_pgs(self):
- """Get the number of read buffer pages"""
- return self.__jinf_dict["JRNL_RMGR_PAGES"]
-
- def _read_jinf(self):
- """Read and initialize this instance from an existing jinf file
located at the directory named in the
- constructor - called by the constructor"""
- fhandle = open(os.path.join(self.__jdir, "%s.jinf" % self.__bfn),
"r")
- parser = xml.parsers.expat.ParserCreate()
- parser.StartElementHandler = self._handle_xml_start_elt
- parser.CharacterDataHandler = self._handle_xml_char_data
- parser.EndElementHandler = self._handle_xml_end_elt
- parser.ParseFile(fhandle)
- fhandle.close()
-
- def _handle_xml_start_elt(self, name, attrs):
- """Callback for handling XML start elements. Used by the XML
parser."""
- # bool values
- if name == "auto_expand":
- self.__jinf_dict[name] = attrs["value"] == "true"
- # long values
- elif name == "seconds" or \
- name == "nanoseconds":
- self.__jinf_dict[name] = long(attrs["value"])
- # int values
- elif name == "journal_version" or \
- name == "number_jrnl_files" or \
- name == "jrnl_file_size_sblks" or \
- name == "JRNL_SBLK_SIZE" or \
- name == "JRNL_DBLK_SIZE" or \
- name == "wcache_pgsize_sblks" or \
- name == "wcache_num_pages" or \
- name == "JRNL_RMGR_PAGE_SIZE" or \
- name == "JRNL_RMGR_PAGES":
- self.__jinf_dict[name] = int(attrs["value"])
- # strings
- elif "value" in attrs:
- self.__jinf_dict[name] = attrs["value"]
-
- def _handle_xml_char_data(self, data):
- """Callback for handling character data (ie within
<elt>...</elt>). The jinf file does not use this in its
- data. Used by the XML parser."""
- pass
-
- def _handle_xml_end_elt(self, name):
- """Callback for handling XML end elements. Used by XML
parser."""
- pass
-
-
-#==============================================================================
-
-_CLASSES = {
- "a": TxnRec,
- "c": TxnRec,
- "d": DeqRec,
- "e": EnqRec,
- "f": FileHdr
-}
-
-if __name__ == "__main__":
- print "This is a library, and cannot be executed."
Copied: store/trunk/cpp/tools/qpidstore/__init__.py (from rev 4457,
store/trunk/cpp/tools/__init__.py)
===================================================================
--- store/trunk/cpp/tools/qpidstore/__init__.py (rev 0)
+++ store/trunk/cpp/tools/qpidstore/__init__.py 2011-05-10 15:35:17 UTC (rev 4458)
@@ -0,0 +1,23 @@
+"""
+Copyright (c) 2007, 2008 Red Hat, Inc.
+
+This file is part of the Qpid async store library msgstore.so.
+
+This library is free software; you can redistribute it and/or
+modify it under the terms of the GNU Lesser General Public
+License as published by the Free Software Foundation; either
+version 2.1 of the License, or (at your option) any later version.
+
+This library is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+Lesser General Public License for more details.
+
+You should have received a copy of the GNU Lesser General Public
+License along with this library; if not, write to the Free Software
+Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
+USA
+
+The GNU Lesser General Public License is available in the file COPYING.
+"""
+
Copied: store/trunk/cpp/tools/qpidstore/janal.py (from rev 4457,
store/trunk/cpp/tools/janal.py)
===================================================================
--- store/trunk/cpp/tools/qpidstore/janal.py (rev 0)
+++ store/trunk/cpp/tools/qpidstore/janal.py 2011-05-10 15:35:17 UTC (rev 4458)
@@ -0,0 +1,612 @@
+"""
+Copyright (c) 2007, 2008 Red Hat, Inc.
+
+This file is part of the Qpid async store library msgstore.so.
+
+This library is free software; you can redistribute it and/or
+modify it under the terms of the GNU Lesser General Public
+License as published by the Free Software Foundation; either
+version 2.1 of the License, or (at your option) any later version.
+
+This library is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+Lesser General Public License for more details.
+
+You should have received a copy of the GNU Lesser General Public
+License along with this library; if not, write to the Free Software
+Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
+USA
+
+The GNU Lesser General Public License is available in the file COPYING.
+"""
+
+import jerr, jrnl
+import os.path, sys
+
+
+#== class EnqMap ==============================================================
+
+class EnqMap(object):
+ """Class for maintaining a map of enqueued records, indexing the rid
against hdr, fid and transaction lock"""
+
+ def __init__(self):
+ """Constructor"""
+ self.__map = {}
+
+ def __str__(self):
+ """Print the contents of the map"""
+ return self.report(True, True)
+
+ def add(self, fid, hdr, lock = False):
+ """Add a new record into the map"""
+ if hdr.rid in self.__map:
+ raise jerr.DuplicateRidError(hdr.rid)
+ self.__map[hdr.rid] = [fid, hdr, lock]
+
+ def contains(self, rid):
+ """Return True if the map contains the given
rid"""
+ return rid in self.__map
+
+ def delete(self, rid):
+ """Delete the rid and its associated data from the
map"""
+ if rid in self.__map:
+ if self.get_lock(rid):
+ raise jerr.DeleteLockedRecordError(rid)
+ del self.__map[rid]
+ else:
+ raise jerr.JWarning("ERROR: Deleting non-existent rid from EnqMap:
rid=0x%x" % rid)
+
+ def get(self, rid):
+ """Return a list [fid, hdr, lock] for the given
rid"""
+ if self.contains(rid):
+ return self.__map[rid]
+ return None
+
+ def get_fid(self, rid):
+ """Return the fid for the given rid"""
+ if self.contains(rid):
+ return self.__map[rid][0]
+ return None
+
+ def get_hdr(self, rid):
+ """Return the header record for the given rid"""
+ if self.contains(rid):
+ return self.__map[rid][1]
+ return None
+
+ def get_lock(self, rid):
+ """Return the transaction lock value for the given
rid"""
+ if self.contains(rid):
+ return self.__map[rid][2]
+ return None
+
+ def get_rec_list(self):
+ """Return a list of tuples (fid, hdr, lock) for all entries in the
map"""
+ return self.__map.values()
+
+ def lock(self, rid):
+ """Set the transaction lock for a given rid to
True"""
+ if rid in self.__map:
+ if not self.__map[rid][2]: # locked
+ self.__map[rid][2] = True
+ else:
+ raise jerr.AlreadyLockedError(rid)
+ else:
+ raise jerr.JWarning("ERROR: Locking non-existent rid in EnqMap:
rid=0x%x" % rid)
+
+ def report(self, show_stats, show_records):
+ """Return a string containing a text report for all records in the
map"""
+ if len(self.__map) == 0:
+ return "No enqueued records found."
+ rstr = "%d enqueued records found" % len(self.__map)
+ if show_records:
+ rstr += ":"
+ rid_list = self.__map.keys()
+ rid_list.sort()
+ for rid in rid_list:
+ if self.__map[rid][2]:
+ lock_str = " [LOCKED]"
+ else:
+ lock_str = ""
+ rstr += "\n lfid=%d %s %s" % (rec[0], rec[1], lock_str)
+ else:
+ rstr += "."
+ return rstr
+
+ def rids(self):
+ """Return a list of rids in the map"""
+ return self.__map.keys()
+
+ def size(self):
+ """Return the number of entries in the map"""
+ return len(self.__map)
+
+ def unlock(self, rid):
+ """Set the transaction lock for a given rid to
False"""
+ if rid in self.__map:
+ if self.__map[rid][2]:
+ self.__map[rid][2] = False
+ else:
+ raise jerr.NotLockedError(rid)
+ else:
+ raise jerr.NonExistentRecordError("unlock", rid)
+
+
+#== class TxnMap ==============================================================
+
+class TxnMap(object):
+ """Transaction map, which maps xids to a list of outstanding
actions"""
+
+ def __init__(self, emap):
+ """Constructor, requires an existing EnqMap
instance"""
+ self.__emap = emap
+ self.__map = {}
+
+ def __str__(self):
+ """Print the contents of the map"""
+ return self.report(True, True)
+
+ def add(self, fid, hdr):
+ """Add a new transactional record into the map"""
+ if isinstance(hdr, jrnl.DeqRec):
+ try:
+ self.__emap.lock(hdr.deq_rid)
+ except jerr.JWarning:
+ # Not in emap, look for rid in tmap
+ l = self.find_rid(hdr.deq_rid, hdr.xid)
+ if l != None:
+ if l[2]:
+ raise jerr.AlreadyLockedError(hdr.deq_rid)
+ l[2] = True
+ if hdr.xid in self.__map:
+ self.__map[hdr.xid].append([fid, hdr, False]) # append to existing list
+ else:
+ self.__map[hdr.xid] = [[fid, hdr, False]] # create new list
+
+ def contains(self, xid):
+ """Return True if the xid exists in the map; False
otherwise"""
+ return xid in self.__map
+
+ def delete(self, hdr):
+ """Remove a transaction record from the map using either a commit
or abort header"""
+ if hdr.magic[-1] == "c":
+ return self._commit(hdr.xid)
+ if hdr.magic[-1] == "a":
+ self._abort(hdr.xid)
+ else:
+ raise jerr.InvalidRecordTypeError("delete from TxnMap", hdr.magic,
hdr.rid)
+
+ def find_rid(self, rid, xid_hint = None):
+ """ Search for and return map list with supplied rid. If xid_hint
is supplied, try that xid first"""
+ if xid_hint != None and self.contains(xid_hint):
+ for l in self.__map[xid_hint]:
+ if l[1].rid == rid:
+ return l
+ for xid in self.__map.iterkeys():
+ if xid_hint == None or xid != xid_hint:
+ for l in self.__map[xid]:
+ if l[1].rid == rid:
+ return l
+
+ def get(self, xid):
+ """Return a list of operations for the given
xid"""
+ if self.contains(xid):
+ return self.__map[xid]
+
+ def report(self, show_stats, show_records):
+ """Return a string containing a text report for all records in the
map"""
+ if len(self.__map) == 0:
+ return "No outstanding transactions found."
+ rstr = "%d outstanding transactions found" % len(self.__map)
+ if show_records:
+ rstr += ":"
+ for xid, tup in self.__map.iteritems():
+ rstr += "\n xid=%s:" % jrnl.Utils.format_xid(xid)
+ for i in tup:
+ rstr += "\n %s" % str(i[1])
+ else:
+ rstr += "."
+ return rstr
+
+ def size(self):
+ """Return the number of xids in the map"""
+ return len(self.__map)
+
+ def xids(self):
+ """Return a list of xids in the map"""
+ return self.__map.keys()
+
+ def _abort(self, xid):
+ """Perform an abort operation for the given xid
record"""
+ for fid, hdr in self.__map[xid]:
+ if isinstance(hdr, jrnl.DeqRec):
+ self.__emap.unlock(hdr.rid)
+ del self.__map[xid]
+
+ def _commit(self, xid):
+ """Perform a commit operation for the given xid
record"""
+ mismatch_list = []
+ for fid, hdr, lock in self.__map[xid]:
+ if isinstance(hdr, jrnl.EnqRec):
+ self.__emap.add(fid, hdr, lock) # Transfer enq to emap
+ else:
+ if self.__emap.contains(hdr.deq_rid):
+ self.__emap.unlock(hdr.deq_rid)
+ self.__emap.delete(hdr.deq_rid)
+ else:
+ mismatch_list.append("0x%x" % hdr.deq_rid)
+ del self.__map[xid]
+ return mismatch_list
+
+#== class JrnlAnalyzer ========================================================
+
+class JrnlAnalyzer(object):
+ """
+ This class analyzes a set of journal files and determines which is the last to be
written
+ (the newest file), and hence which should be the first to be read for recovery (the
oldest
+ file).
+
+ The analysis is performed on construction; the contents of the JrnlInfo object passed
provide
+ the recovery details.
+ """
+
+ def __init__(self, jinf):
+ """Constructor"""
+ self.__oldest = None
+ self.__jinf = jinf
+ self.__flist = self._analyze()
+
+ def __str__(self):
+ """String representation of this JrnlAnalyzer instance, will print
out results of analysis."""
+ ostr = "Journal files analyzed in directory %s (* = earliest full):\n"
% self.__jinf.get_current_dir()
+ if self.is_empty():
+ ostr += " <All journal files are empty>\n"
+ else:
+ for tup in self.__flist:
+ tmp = " "
+ if tup[0] == self.__oldest[0]:
+ tmp = "*"
+ ostr += " %s %s: owi=%-5s rid=0x%x, fro=0x%x ts=%s\n" % (tmp,
os.path.basename(tup[1]), tup[2],
+ tup[3], tup[4],
tup[5])
+ for i in range(self.__flist[-1][0] + 1, self.__jinf.get_num_jrnl_files()):
+ ostr += " %s.%04x.jdat: <empty>\n" %
(self.__jinf.get_jrnl_base_name(), i)
+ return ostr
+
+ # Analysis
+
+ def get_oldest_file(self):
+ """Return a tuple (ordnum, jfn, owi, rid, fro, timestamp) for the
oldest data file found in the journal"""
+ return self.__oldest
+
+ def get_oldest_file_index(self):
+ """Return the ordinal number of the oldest data file found in the
journal"""
+ if self.is_empty():
+ return None
+ return self.__oldest[0]
+
+ def is_empty(self):
+ """Return true if the analysis found that the journal file has
never been written to"""
+ return len(self.__flist) == 0
+
+ def _analyze(self):
+ """Perform the journal file analysis by reading and comparing the
file headers of each journal data file"""
+ owi_found = False
+ flist = []
+ for i in range(0, self.__jinf.get_num_jrnl_files()):
+ jfn = os.path.join(self.__jinf.get_current_dir(), "%s.%04x.jdat" %
(self.__jinf.get_jrnl_base_name(), i))
+ fhandle = open(jfn)
+ fhdr = jrnl.Utils.load(fhandle, jrnl.Hdr)
+ if fhdr.empty():
+ break
+ this_tup = (i, jfn, fhdr.owi(), fhdr.rid, fhdr.fro, fhdr.timestamp_str())
+ flist.append(this_tup)
+ if i == 0:
+ init_owi = fhdr.owi()
+ self.__oldest = this_tup
+ elif fhdr.owi() != init_owi and not owi_found:
+ self.__oldest = this_tup
+ owi_found = True
+ return flist
+
+
+#== class JrnlReader ====================================================
+
+class JrnlReader(object):
+ """
+ This class contains an Enqueue Map (emap), a transaction map (tmap) and a
transaction
+ object list (txn_obj_list) which are populated by reading the journals from the
oldest
+ to the newest and analyzing each record. The JrnlInfo and JrnlAnalyzer
+ objects supplied on construction provide the information used for the recovery.
+
+ The analysis is performed on construction.
+ """
+
+ def __init__(self, jinfo, jra, qflag = False, rflag = False, vflag = False):
+ """Constructor, which reads all """
+ self._jinfo = jinfo
+ self._jra = jra
+ self._qflag = qflag
+ self._rflag = rflag
+ self._vflag = vflag
+
+ # test callback functions for CSV tests
+ self._csv_store_chk = None
+ self._csv_start_cb = None
+ self._csv_enq_cb = None
+ self._csv_deq_cb = None
+ self._csv_txn_cb = None
+ self._csv_end_cb = None
+
+ self._emap = EnqMap()
+ self._tmap = TxnMap(self._emap)
+ self._txn_obj_list = {}
+
+ self._file = None
+ self._file_hdr = None
+ self._file_num = None
+ self._first_rec_flag = None
+ self._fro = None
+ self._last_file_flag = None
+ self._start_file_num = None
+ self._file_hdr_owi = None
+ self._warning = []
+
+ self._abort_cnt = 0
+ self._commit_cnt = 0
+ self._msg_cnt = 0
+ self._rec_cnt = 0
+ self._txn_msg_cnt = 0
+
+ def __str__(self):
+ """Print out all the undequeued records"""
+ return self.report(True, self._rflag)
+
+ def emap(self):
+ """Get the enqueue map"""
+ return self._emap
+
+ def get_abort_cnt(self):
+ """Get the cumulative number of transactional aborts
found"""
+ return self._abort_cnt
+
+ def get_commit_cnt(self):
+ """Get the cumulative number of transactional commits
found"""
+ return self._commit_cnt
+
+ def get_msg_cnt(self):
+ """Get the cumulative number of messages found"""
+ return self._msg_cnt
+
+ def get_rec_cnt(self):
+ """Get the cumulative number of journal records (including
fillers) found"""
+ return self._rec_cnt
+
+ def is_last_file(self):
+ """Return True if the last file is being read"""
+ return self._last_file_flag
+
+ def report(self, show_stats = True, show_records = False):
+ """Return a string containing a report on the file
analysis"""
+ rstr = self._emap.report(show_stats, show_records) + "\n" +
self._tmap.report(show_stats, show_records)
+ #TODO - print size analysis here - ie how full, sparse, est. space remaining
before enq threshold
+ return rstr
+
+ def run(self):
+ """Perform the read of the journal"""
+ if self._csv_start_cb != None and self._csv_start_cb(self._csv_store_chk):
+ return
+ if self._jra.is_empty():
+ return
+ stop = self._advance_jrnl_file(*self._jra.get_oldest_file())
+ while not stop and not self._get_next_record():
+ pass
+ if self._csv_end_cb != None and self._csv_end_cb(self._csv_store_chk):
+ return
+ if not self._qflag:
+ print
+
+ def set_callbacks(self, csv_store_chk, csv_start_cb = None, csv_enq_cb = None,
csv_deq_cb = None, csv_txn_cb = None,
+ csv_end_cb = None):
+ """Set callbacks for checks to be made at various points while
reading the journal"""
+ self._csv_store_chk = csv_store_chk
+ self._csv_start_cb = csv_start_cb
+ self._csv_enq_cb = csv_enq_cb
+ self._csv_deq_cb = csv_deq_cb
+ self._csv_txn_cb = csv_txn_cb
+ self._csv_end_cb = csv_end_cb
+
+ def tmap(self):
+ """Return the transaction map"""
+ return self._tmap
+
+ def get_txn_msg_cnt(self):
+ """Get the cumulative transactional message
count"""
+ return self._txn_msg_cnt
+
+ def txn_obj_list(self):
+ """Get a cumulative list of transaction objects (commits and
aborts)"""
+ return self._txn_obj_list
+
+ def _advance_jrnl_file(self, *oldest_file_info):
+ """Rotate to using the next journal file. Return False if the
operation was successful, True if there are no
+ more files to read."""
+ fro_seek_flag = False
+ if len(oldest_file_info) > 0:
+ self._start_file_num = self._file_num = oldest_file_info[0]
+ self._fro = oldest_file_info[4]
+ fro_seek_flag = True # jump to fro to start reading
+ if not self._qflag and not self._rflag:
+ if self._vflag:
+ print "Recovering journals..."
+ else:
+ print "Recovering journals",
+ if self._file != None and self._is_file_full():
+ self._file.close()
+ self._file_num = self._incr_file_num()
+ if self._file_num == self._start_file_num:
+ return True
+ if self._start_file_num == 0:
+ self._last_file_flag = self._file_num == self._jinfo.get_num_jrnl_files()
- 1
+ else:
+ self._last_file_flag = self._file_num == self._start_file_num - 1
+ if self._file_num < 0 or self._file_num >=
self._jinfo.get_num_jrnl_files():
+ raise jerr.BadFileNumberError(self._file_num)
+ jfn = os.path.join(self._jinfo.get_current_dir(), "%s.%04x.jdat" %
+ (self._jinfo.get_jrnl_base_name(), self._file_num))
+ self._file = open(jfn)
+ self._file_hdr = jrnl.Utils.load(self._file, jrnl.Hdr)
+ if fro_seek_flag and self._file.tell() != self._fro:
+ self._file.seek(self._fro)
+ self._first_rec_flag = True
+ if not self._qflag:
+ if self._rflag:
+ print jfn, ": ", self._file_hdr
+ elif self._vflag:
+ print "* Reading %s" % jfn
+ else:
+ print ".",
+ sys.stdout.flush()
+ return False
+
+ def _check_owi(self, hdr):
+ """Return True if the header's owi indicator matches that of
the file header record; False otherwise. This can
+ indicate whether the last record in a file has been read and now older records
which have not yet been
+ overwritten are now being read."""
+ return self._file_hdr_owi == hdr.owi()
+
+ def _is_file_full(self):
+ """Return True if the current file is full (no more write space);
false otherwise"""
+ return self._file.tell() >= self._jinfo.get_jrnl_file_size_bytes()
+
+ def _get_next_record(self):
+ """Get the next record in the file for analysis"""
+ if self._is_file_full():
+ if self._advance_jrnl_file():
+ return True
+ try:
+ hdr = jrnl.Utils.load(self._file, jrnl.Hdr)
+ except:
+ return True
+ if hdr.empty():
+ return True
+ if hdr.check():
+ return True
+ self._rec_cnt += 1
+ self._file_hdr_owi = self._file_hdr.owi()
+ if self._first_rec_flag:
+ if self._file_hdr.fro != hdr.foffs:
+ raise jerr.FirstRecordOffsetMismatch(self._file_hdr.fro, hdr.foffs)
+ else:
+ if self._rflag:
+ print " * fro ok: 0x%x" % self._file_hdr.fro
+ self._first_rec_flag = False
+ stop = False
+ if isinstance(hdr, jrnl.EnqRec):
+ stop = self._handle_enq_rec(hdr)
+ elif isinstance(hdr, jrnl.DeqRec):
+ stop = self._handle_deq_rec(hdr)
+ elif isinstance(hdr, jrnl.TxnRec):
+ stop = self._handle_txn_rec(hdr)
+ wstr = ""
+ for warn in self._warning:
+ wstr += " (%s)" % warn
+ if self._rflag:
+ print " > %s %s" % (hdr, wstr)
+ self._warning = []
+ return stop
+
+ def _handle_deq_rec(self, hdr):
+ """Process a dequeue ("RHMd") record"""
+ if self._load_rec(hdr):
+ return True
+
+ # Check OWI flag
+ if not self._check_owi(hdr):
+ self._warning.append("WARNING: OWI mismatch - could be overwrite
boundary.")
+ return True
+ # Test hook
+ if self._csv_deq_cb != None and self._csv_deq_cb(self._csv_store_chk, hdr):
+ return True
+
+ try:
+ if hdr.xid == None:
+ self._emap.delete(hdr.deq_rid)
+ else:
+ self._tmap.add(self._file_hdr.fid, hdr)
+ except jerr.JWarning, warn:
+ self._warning.append(str(warn))
+ return False
+
+ def _handle_enq_rec(self, hdr):
+ """Process a dequeue ("RHMe") record"""
+ if self._load_rec(hdr):
+ return True
+
+ # Check extern flag
+ if hdr.extern and hdr.data != None:
+ raise jerr.ExternFlagDataError(hdr)
+ # Check OWI flag
+ if not self._check_owi(hdr):
+ self._warning.append("WARNING: OWI mismatch - could be overwrite
boundary.")
+ return True
+ # Test hook
+ if self._csv_enq_cb != None and self._csv_enq_cb(self._csv_store_chk, hdr):
+ return True
+
+ if hdr.xid == None:
+ self._emap.add(self._file_hdr.fid, hdr)
+ else:
+ self._txn_msg_cnt += 1
+ self._tmap.add(self._file_hdr.fid, hdr)
+ self._msg_cnt += 1
+ return False
+
+ def _handle_txn_rec(self, hdr):
+ """Process a transaction ("RHMa or RHMc")
record"""
+ if self._load_rec(hdr):
+ return True
+
+ # Check OWI flag
+ if not self._check_owi(hdr):
+ self._warning.append("WARNING: OWI mismatch - could be overwrite
boundary.")
+ return True
+ # Test hook
+ if self._csv_txn_cb != None and self._csv_txn_cb(self._csv_store_chk, hdr):
+ return True
+
+ if hdr.magic[-1] == "a":
+ self._abort_cnt += 1
+ else:
+ self._commit_cnt += 1
+
+ if self._tmap.contains(hdr.xid):
+ mismatched_rids = self._tmap.delete(hdr)
+ if mismatched_rids != None and len(mismatched_rids) > 0:
+ self._warning.append("WARNING: transactional dequeues not found in
enqueue map; rids=%s" %
+ mismatched_rids)
+ else:
+ self._warning.append("WARNING: %s not found in transaction map" %
jrnl.Utils.format_xid(hdr.xid))
+ if hdr.magic[-1] == "c": # commits only
+ self._txn_obj_list[hdr.xid] = hdr
+ return False
+
+ def _incr_file_num(self):
+ """Increment the number of files read with wraparound (ie after
file n-1, go to 0)"""
+ self._file_num += 1
+ if self._file_num >= self._jinfo.get_num_jrnl_files():
+ self._file_num = 0
+ return self._file_num
+
+ def _load_rec(self, hdr):
+ """Load a single record for the given header. There may be
arbitrarily large xids and data components."""
+ while not hdr.complete():
+ if self._advance_jrnl_file():
+ return True
+ hdr.load(self._file)
+ return False
+
+# =============================================================================
+
+if __name__ == "__main__":
+ print "This is a library, and cannot be executed."
Copied: store/trunk/cpp/tools/qpidstore/jerr.py (from rev 4457,
store/trunk/cpp/tools/jerr.py)
===================================================================
--- store/trunk/cpp/tools/qpidstore/jerr.py (rev 0)
+++ store/trunk/cpp/tools/qpidstore/jerr.py 2011-05-10 15:35:17 UTC (rev 4458)
@@ -0,0 +1,223 @@
+"""
+Copyright (c) 2007, 2008 Red Hat, Inc.
+
+This file is part of the Qpid async store library msgstore.so.
+
+This library is free software; you can redistribute it and/or
+modify it under the terms of the GNU Lesser General Public
+License as published by the Free Software Foundation; either
+version 2.1 of the License, or (at your option) any later version.
+
+This library is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+Lesser General Public License for more details.
+
+You should have received a copy of the GNU Lesser General Public
+License along with this library; if not, write to the Free Software
+Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
+USA
+
+The GNU Lesser General Public License is available in the file COPYING.
+"""
+
+# == Warnings =================================================================
+
+class JWarning(Exception):
+ """Class to convey a warning"""
+ def __init__(self, err):
+ """Constructor"""
+ Exception.__init__(self, err)
+
+# == Errors ===================================================================
+
+class AllJrnlFilesEmptyCsvError(Exception):
+ """All journal files are empty (never been written)"""
+ def __init__(self, tnum, exp_num_msgs):
+ """Constructor"""
+ Exception.__init__(self, "[CSV %d] All journal files are empty, but test
expects %d msg(s)." %
+ (tnum, exp_num_msgs))
+
+class AlreadyLockedError(Exception):
+ """Error class for trying to lock a record that is already
locked"""
+ def __init__(self, rid):
+ """Constructor"""
+ Exception.__init__(self, "Locking record which is already locked in EnqMap:
rid=0x%x" % rid)
+
+class BadFileNumberError(Exception):
+ """Error class for incorrect or unexpected file
number"""
+ def __init__(self, file_num):
+ """Constructor"""
+ Exception.__init__(self, "Bad file number %d" % file_num)
+
+class DataSizeError(Exception):
+ """Error class for data size mismatch"""
+ def __init__(self, exp_size, act_size, data_str):
+ """Constructor"""
+ Exception.__init__(self, "Inconsistent data size: expected:%d; actual:%d;
data=\"%s\"" %
+ (exp_size, act_size, data_str))
+
+class DeleteLockedRecordError(Exception):
+ """Error class for deleting a locked record from the enqueue
map"""
+ def __init__(self, rid):
+ """Constructor"""
+ Exception.__init__(self, "Deleting locked record from EnqMap: rid=0x%s"
% rid)
+
+class DequeueNonExistentEnqueueError(Exception):
+ """Error class for attempting to dequeue a non-existent enqueue record
(rid)"""
+ def __init__(self, deq_rid):
+ """Constructor"""
+ Exception.__init__(self, "Dequeuing non-existent enqueue record:
rid=0x%s" % deq_rid)
+
+class DuplicateRidError(Exception):
+ """Error class for placing duplicate rid into enqueue
map"""
+ def __init__(self, rid):
+ """Constructor"""
+ Exception.__init__(self, "Adding duplicate record to EnqMap: rid=0x%x"
% rid)
+
+class EndianMismatchError(Exception):
+ """Error class mismatched record header endian flag"""
+ def __init__(self, exp_endianness):
+ """Constructor"""
+ Exception.__init__(self, "Endian mismatch: expected %s, but current record
is %s" %
+ self.endian_str(exp_endianness))
+ #@staticmethod
+ def endian_str(endianness):
+ """Return a string tuple for the endianness error
message"""
+ if endianness:
+ return "big", "little"
+ return "little", "big"
+ endian_str = staticmethod(endian_str)
+
+class ExternFlagDataError(Exception):
+ """Error class for the extern flag being set and the internal size
> 0"""
+ def __init__(self, hdr):
+ """Constructor"""
+ Exception.__init__(self, "Message data found (msg size > 0) on record
with external flag set: hdr=%s" % hdr)
+
+class ExternFlagCsvError(Exception):
+ """External flag mismatch between record and CSV test
file"""
+ def __init__(self, tnum, exp_extern_flag):
+ """Constructor"""
+ Exception.__init__(self, "[CSV %d] External flag mismatch: expected %s"
% (tnum, exp_extern_flag))
+
+class ExternFlagWithDataCsvError(Exception):
+ """External flag set and Message data found"""
+ def __init__(self, tnum):
+ """Constructor"""
+ Exception.__init__(self, "[CSV %d] Message data found on record with
external flag set" % tnum)
+
+class FillExceedsFileSizeError(Exception):
+ """Internal error from a fill operation which will exceed the
specified file size"""
+ def __init__(self, cur_size, file_size):
+ """Constructor"""
+ Exception.__init__(self, "Filling to size %d > max file size %d" %
(cur_size, file_size))
+
+class FillSizeError(Exception):
+ """Internal error from a fill operation that did not match the
calculated end point in the file"""
+ def __init__(self, cur_posn, exp_posn):
+ """Constructor"""
+ Exception.__init__(self, "Filled to size %d > expected file posn %d"
% (cur_posn, exp_posn))
+
+class FirstRecordOffsetMismatch(Exception):
+ """Error class for file header fro mismatch with actual
record"""
+ def __init__(self, fro, actual_offs):
+ """Constructor"""
+ Exception.__init__(self, "File header first record offset mismatch:
fro=0x%x; actual offs=0x%x" %
+ (fro, actual_offs))
+
+class InvalidHeaderVersionError(Exception):
+ """Error class for invalid record header version"""
+ def __init__(self, exp_ver, act_ver):
+ """Constructor"""
+ Exception.__init__(self, "Invalid header version: expected:%d,
actual:%d." % (exp_ver, act_ver))
+
+class InvalidRecordTypeError(Exception):
+ """Error class for any operation using an invalid record
type"""
+ def __init__(self, operation, magic, rid):
+ """Constructor"""
+ Exception.__init__(self, "Invalid record type for operation: operation=%s
record magic=%s, rid=0x%x" %
+ (operation, magic, rid))
+
+class InvalidRecordTailError(Exception):
+ """Error class for invalid record tail"""
+ def __init__(self, magic_err, rid_err, rec):
+ """Constructor"""
+ Exception.__init__(self, " > %s *INVALID TAIL RECORD (%s)*" % (rec,
self.tail_err_str(magic_err, rid_err)))
+ #@staticmethod
+ def tail_err_str(magic_err, rid_err):
+ """Return a string indicating the tail record
error(s)"""
+ estr = ""
+ if magic_err:
+ estr = "magic bad"
+ if rid_err:
+ estr += ", "
+ if rid_err:
+ estr += "rid mismatch"
+ return estr
+ tail_err_str = staticmethod(tail_err_str)
+
+class NonExistentRecordError(Exception):
+ """Error class for any operation on an non-existent
record"""
+ def __init__(self, operation, rid):
+ """Constructor"""
+ Exception.__init__(self, "Operation on non-existent record: operation=%s;
rid=0x%x" % (operation, rid))
+
+class NotLockedError(Exception):
+ """Error class for unlocking a record which is not locked in the first
place"""
+ def __init__(self, rid):
+ """Constructor"""
+ Exception.__init__(self, "Unlocking record which is not locked in EnqMap:
rid=0x%x" % rid)
+
+class JournalSpaceExceededError(Exception):
+ """Error class for when journal space of resized journal is too small
to contain the transferred records"""
+ def __init__(self):
+ """Constructor"""
+ Exception.__init__(self, "Ran out of journal space while writing
records")
+
+class MessageLengthCsvError(Exception):
+ """Message length mismatch between record and CSV test
file"""
+ def __init__(self, tnum, exp_msg_len, actual_msg_len):
+ """Constructor"""
+ Exception.__init__(self, "[CSV %d] Message length mismatch: expected %d;
found %d" %
+ (tnum, exp_msg_len, actual_msg_len))
+
+class NumMsgsCsvError(Exception):
+ """Number of messages found mismatched with CSV
file"""
+ def __init__(self, tnum, exp_num_msgs, actual_num_msgs):
+ """Constructor"""
+ Exception.__init__(self, "[CSV %s] Incorrect number of messages: expected
%d, found %d" %
+ (tnum, exp_num_msgs, actual_num_msgs))
+
+class TransactionCsvError(Exception):
+ """Transaction mismatch between record and CSV file"""
+ def __init__(self, tnum, exp_transactional):
+ """Constructor"""
+ Exception.__init__(self, "[CSV %d] Transaction mismatch: expected %s" %
(tnum, exp_transactional))
+
+class UnexpectedEndOfFileError(Exception):
+ """Error class for unexpected end-of-file during
reading"""
+ def __init__(self, exp_size, curr_offs):
+ """Constructor"""
+ Exception.__init__(self, "Unexpected end-of-file: expected file size:%d;
current offset:%d" %
+ (exp_size, curr_offs))
+
+class XidLengthCsvError(Exception):
+ """Message Xid length mismatch between record and CSV
file"""
+ def __init__(self, tnum, exp_xid_len, actual_msg_len):
+ """Constructor"""
+ Exception.__init__(self, "[CSV %d] Message XID mismatch: expected %d; found
%d" %
+ (tnum, exp_xid_len, actual_msg_len))
+
+class XidSizeError(Exception):
+ """Error class for Xid size mismatch"""
+ def __init__(self, exp_size, act_size, xid_str):
+ """Constructor"""
+ Exception.__init__(self, "Inconsistent xid size: expected:%d; actual:%d;
xid=\"%s\"" %
+ (exp_size, act_size, xid_str))
+
+# =============================================================================
+
+if __name__ == "__main__":
+ print "This is a library, and cannot be executed."
+
Copied: store/trunk/cpp/tools/qpidstore/jrnl.py (from rev 4457,
store/trunk/cpp/tools/jrnl.py)
===================================================================
--- store/trunk/cpp/tools/qpidstore/jrnl.py (rev 0)
+++ store/trunk/cpp/tools/qpidstore/jrnl.py 2011-05-10 15:35:17 UTC (rev 4458)
@@ -0,0 +1,798 @@
+"""
+Copyright (c) 2007, 2008 Red Hat, Inc.
+
+This file is part of the Qpid async store library msgstore.so.
+
+This library is free software; you can redistribute it and/or
+modify it under the terms of the GNU Lesser General Public
+License as published by the Free Software Foundation; either
+version 2.1 of the License, or (at your option) any later version.
+
+This library is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+Lesser General Public License for more details.
+
+You should have received a copy of the GNU Lesser General Public
+License along with this library; if not, write to the Free Software
+Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
+USA
+
+The GNU Lesser General Public License is available in the file COPYING.
+"""
+
+import jerr
+import os.path, sys, xml.parsers.expat
+from struct import pack, unpack, calcsize
+from time import gmtime, strftime
+
+# TODO: Get rid of these! Use jinf instance instead
+DBLK_SIZE = 128
+SBLK_SIZE = 4 * DBLK_SIZE
+
+# TODO - this is messy - find a better way to handle this
+# This is a global, but is set directly by the calling program
+JRNL_FILE_SIZE = None
+
+#== class Utils ======================================================================
+
+class Utils(object):
+ """Class containing utility functions for dealing with the
journal"""
+
+ __printchars =
"0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ!\"#$%&'()*+,-./:;<=>?@[\\]^_`{\|}~
"
+
+ # The @staticmethod declarations are not supported in RHEL4 (python 2.3.x)
+ # When RHEL4 support ends, restore these declarations and remove the older
+ # staticmethod() declaration.
+
+ #@staticmethod
+ def format_data(dsize, data):
+ """Format binary data for printing"""
+ if data == None:
+ return ""
+ if Utils._is_printable(data):
+ datastr = Utils._split_str(data)
+ else:
+ datastr = Utils._hex_split_str(data)
+ if dsize != len(data):
+ raise jerr.DataSizeError(dsize, len(data), datastr)
+ return "data(%d)=\"%s\" " % (dsize, datastr)
+ format_data = staticmethod(format_data)
+
+ #@staticmethod
+ def format_xid(xid, xidsize=None):
+ """Format binary XID for printing"""
+ if xid == None and xidsize != None:
+ if xidsize > 0:
+ raise jerr.XidSizeError(xidsize, 0, None)
+ return ""
+ if Utils._is_printable(xid):
+ xidstr = Utils._split_str(xid)
+ else:
+ xidstr = Utils._hex_split_str(xid)
+ if xidsize == None:
+ xidsize = len(xid)
+ elif xidsize != len(xid):
+ raise jerr.XidSizeError(xidsize, len(xid), xidstr)
+ return "xid(%d)=\"%s\" " % (xidsize, xidstr)
+ format_xid = staticmethod(format_xid)
+
+ #@staticmethod
+ def inv_str(string):
+ """Perform a binary 1's compliment (invert all bits) on a
binary string"""
+ istr = ""
+ for index in range(0, len(string)):
+ istr += chr(~ord(string[index]) & 0xff)
+ return istr
+ inv_str = staticmethod(inv_str)
+
+ #@staticmethod
+ def load(fhandle, klass):
+ """Load a record of class klass from a file"""
+ args = Utils._load_args(fhandle, klass)
+ subclass = klass.discriminate(args)
+ result = subclass(*args) # create instance of record
+ if subclass != klass:
+ result.init(fhandle, *Utils._load_args(fhandle, subclass))
+ result.skip(fhandle)
+ return result
+ load = staticmethod(load)
+
+ #@staticmethod
+ def load_file_data(fhandle, size, data):
+ """Load the data portion of a message from file"""
+ if size == 0:
+ return (data, True)
+ if data == None:
+ loaded = 0
+ else:
+ loaded = len(data)
+ foverflow = fhandle.tell() + size - loaded > JRNL_FILE_SIZE
+ if foverflow:
+ rsize = JRNL_FILE_SIZE - fhandle.tell()
+ else:
+ rsize = size - loaded
+ fbin = fhandle.read(rsize)
+ if data == None:
+ data = unpack("%ds" % (rsize), fbin)[0]
+ else:
+ data = data + unpack("%ds" % (rsize), fbin)[0]
+ return (data, not foverflow)
+ load_file_data = staticmethod(load_file_data)
+
+ #@staticmethod
+ def rem_bytes_in_blk(fhandle, blk_size):
+ """Return the remaining bytes in a block"""
+ foffs = fhandle.tell()
+ return Utils.size_in_bytes_to_blk(foffs, blk_size) - foffs
+ rem_bytes_in_blk = staticmethod(rem_bytes_in_blk)
+
+ #@staticmethod
+ def size_in_blks(size, blk_size):
+ """Return the size in terms of data blocks"""
+ return int((size + blk_size - 1) / blk_size)
+ size_in_blks = staticmethod(size_in_blks)
+
+ #@staticmethod
+ def size_in_bytes_to_blk(size, blk_size):
+ """Return the bytes remaining until the next block
boundary"""
+ return Utils.size_in_blks(size, blk_size) * blk_size
+ size_in_bytes_to_blk = staticmethod(size_in_bytes_to_blk)
+
+ #@staticmethod
+ def _hex_split_str(in_str, split_size = 50):
+ """Split a hex string into two parts separated by an
ellipsis"""
+ if len(in_str) <= split_size:
+ return Utils._hex_str(in_str, 0, len(in_str))
+# if len(in_str) > split_size + 25:
+# return Utils._hex_str(in_str, 0, 10) + " ... " +
Utils._hex_str(in_str, 55, 65) + " ... " + \
+# Utils._hex_str(in_str, len(in_str)-10, len(in_str))
+ return Utils._hex_str(in_str, 0, 10) + " ... " + Utils._hex_str(in_str,
len(in_str)-10, len(in_str))
+ _hex_split_str = staticmethod(_hex_split_str)
+
+ #@staticmethod
+ def _hex_str(in_str, begin, end):
+ """Return a binary string as a hex string"""
+ hstr = ""
+ for index in range(begin, end):
+ if Utils._is_printable(in_str[index]):
+ hstr += in_str[index]
+ else:
+ hstr += "\\%02x" % ord(in_str[index])
+ return hstr
+ _hex_str = staticmethod(_hex_str)
+
+ #@staticmethod
+ def _is_printable(in_str):
+ """Return True if in_str in printable; False
otherwise."""
+ return in_str.strip(Utils.__printchars) == ""
+ _is_printable = staticmethod(_is_printable)
+
+ #@staticmethod
+ def _load_args(fhandle, klass):
+ """Load the arguments from class klass"""
+ size = calcsize(klass.FORMAT)
+ foffs = fhandle.tell(),
+ fbin = fhandle.read(size)
+ if len(fbin) != size:
+ raise jerr.UnexpectedEndOfFileError(size, len(fbin))
+ return foffs + unpack(klass.FORMAT, fbin)
+ _load_args = staticmethod(_load_args)
+
+ #@staticmethod
+ def _split_str(in_str, split_size = 50):
+ """Split a string into two parts separated by an ellipsis if it is
longer than split_size"""
+ if len(in_str) < split_size:
+ return in_str
+ return in_str[:25] + " ... " + in_str[-25:]
+ _split_str = staticmethod(_split_str)
+
+
+#== class Hdr =================================================================
+
+class Hdr:
+ """Class representing the journal header records"""
+
+ FORMAT = "=4sBBHQ"
+ HDR_VER = 1
+ OWI_MASK = 0x01
+ BIG_ENDIAN = sys.byteorder == "big"
+ REC_BOUNDARY = DBLK_SIZE
+
+ def __init__(self, foffs, magic, ver, endn, flags, rid):
+ """Constructor"""
+# Sizeable.__init__(self)
+ self.foffs = foffs
+ self.magic = magic
+ self.ver = ver
+ self.endn = endn
+ self.flags = flags
+ self.rid = long(rid)
+
+ def __str__(self):
+ """Return string representation of this header"""
+ if self.empty():
+ return "0x%08x: <empty>" % (self.foffs)
+ if self.magic[-1] == "x":
+ return "0x%08x: [\"%s\"]" % (self.foffs, self.magic)
+ if self.magic[-1] in ["a", "c", "d", "e",
"f", "x"]:
+ return "0x%08x: [\"%s\" v=%d e=%d f=0x%04x rid=0x%x]" %
(self.foffs, self.magic, self.ver, self.endn,
+ self.flags,
self.rid)
+ return "0x%08x: <error, unknown magic \"%s\" (possible
overwrite boundary?)>" % (self.foffs, self.magic)
+
+ #@staticmethod
+ def discriminate(args):
+ """Use the last char in the header magic to determine the header
type"""
+ return _CLASSES.get(args[1][-1], Hdr)
+ discriminate = staticmethod(discriminate)
+
+ def empty(self):
+ """Return True if this record is empty (ie has a magic of
0x0000"""
+ return self.magic == "\x00"*4
+
+ def encode(self):
+ """Encode the header into a binary string"""
+ return pack(Hdr.FORMAT, self.magic, self.ver, self.endn, self.flags, self.rid)
+
+ def owi(self):
+ """Return the OWI (overwrite indicator) for this
header"""
+ return self.flags & self.OWI_MASK != 0
+
+ def skip(self, fhandle):
+ """Read and discard the remainder of this
record"""
+ fhandle.read(Utils.rem_bytes_in_blk(fhandle, self.REC_BOUNDARY))
+
+ def check(self):
+ """Check that this record is valid"""
+ if self.empty() or self.magic[:3] != "RHM" or self.magic[3] not in
["a", "c", "d", "e", "f",
"x"]:
+ return True
+ if self.magic[-1] != "x":
+ if self.ver != self.HDR_VER:
+ raise jerr.InvalidHeaderVersionError(self.HDR_VER, self.ver)
+ if bool(self.endn) != self.BIG_ENDIAN:
+ raise jerr.EndianMismatchError(self.BIG_ENDIAN)
+ return False
+
+
+#== class FileHdr =============================================================
+
+class FileHdr(Hdr):
+ """Class for file headers, found at the beginning of journal
files"""
+
+ FORMAT = "=2H4x3Q"
+ REC_BOUNDARY = SBLK_SIZE
+
+ def __str__(self):
+ """Return a string representation of the this FileHdr
instance"""
+ return "%s fid=%d lid=%d fro=0x%08x t=%s" % (Hdr.__str__(self),
self.fid, self.lid, self.fro,
+ self.timestamp_str())
+
+ def encode(self):
+ """Encode this class into a binary string"""
+ return Hdr.encode(self) + pack(FileHdr.FORMAT, self.fid, self.lid, self.fro,
self.time_sec, self.time_ns)
+
+ def init(self, fhandle, foffs, fid, lid, fro, time_sec, time_ns):
+ """Initialize this instance to known values"""
+ self.fid = fid
+ self.lid = lid
+ self.fro = fro
+ self.time_sec = time_sec
+ self.time_ns = time_ns
+
+ def timestamp(self):
+ """Get the timestamp of this record as a tuple (secs,
nsecs)"""
+ return (self.time_sec, self.time_ns)
+
+ def timestamp_str(self):
+ """Get the timestamp of this record in string
format"""
+ time = gmtime(self.time_sec)
+ fstr = "%%a %%b %%d %%H:%%M:%%S.%09d %%Y" % (self.time_ns)
+ return strftime(fstr, time)
+
+
+#== class DeqRec ==============================================================
+
+class DeqRec(Hdr):
+ """Class for a dequeue record"""
+
+ FORMAT = "=QQ"
+
+ def __str__(self):
+ """Return a string representation of the this DeqRec
instance"""
+ return "%s %sdrid=0x%x" % (Hdr.__str__(self),
Utils.format_xid(self.xid, self.xidsize), self.deq_rid)
+
+ def init(self, fhandle, foffs, deq_rid, xidsize):
+ """Initialize this instance to known values"""
+ self.deq_rid = deq_rid
+ self.xidsize = xidsize
+ self.xid = None
+ self.deq_tail = None
+ self.xid_complete = False
+ self.tail_complete = False
+ self.tail_bin = None
+ self.tail_offs = 0
+ self.load(fhandle)
+
+ def encode(self):
+ """Encode this class into a binary string"""
+ buf = Hdr.encode(self) + pack(DeqRec.FORMAT, self.deq_rid, self.xidsize)
+ if self.xidsize > 0:
+ fmt = "%ds" % (self.xidsize)
+ buf += pack(fmt, self.xid)
+ buf += self.deq_tail.encode()
+ return buf
+
+ def load(self, fhandle):
+ """Load the remainder of this record (after the header has been
loaded"""
+ if self.xidsize == 0:
+ self.xid_complete = True
+ self.tail_complete = True
+ else:
+ if not self.xid_complete:
+ (self.xid, self.xid_complete) = Utils.load_file_data(fhandle,
self.xidsize, self.xid)
+ if self.xid_complete and not self.tail_complete:
+ ret = Utils.load_file_data(fhandle, calcsize(RecTail.FORMAT),
self.tail_bin)
+ self.tail_bin = ret[0]
+ if ret[1]:
+ self.deq_tail = RecTail(self.tail_offs, *unpack(RecTail.FORMAT,
self.tail_bin))
+ magic_err = self.deq_tail.magic_inv != Utils.inv_str(self.magic)
+ rid_err = self.deq_tail.rid != self.rid
+ if magic_err or rid_err:
+ raise jerr.InvalidRecordTailError(magic_err, rid_err, self)
+ self.skip(fhandle)
+ self.tail_complete = ret[1]
+ return self.complete()
+
+ def complete(self):
+ """Returns True if the entire record is loaded, False
otherwise"""
+ return self.xid_complete and self.tail_complete
+
+
+#== class TxnRec ==============================================================
+
+class TxnRec(Hdr):
+ """Class for a transaction commit/abort record"""
+
+ FORMAT = "=Q"
+
+ def __str__(self):
+ """Return a string representation of the this TxnRec
instance"""
+ return "%s %s" % (Hdr.__str__(self), Utils.format_xid(self.xid,
self.xidsize))
+
+ def init(self, fhandle, foffs, xidsize):
+ """Initialize this instance to known values"""
+ self.xidsize = xidsize
+ self.xid = None
+ self.tx_tail = None
+ self.xid_complete = False
+ self.tail_complete = False
+ self.tail_bin = None
+ self.tail_offs = 0
+ self.load(fhandle)
+
+ def encode(self):
+ """Encode this class into a binary string"""
+ return Hdr.encode(self) + pack(TxnRec.FORMAT, self.xidsize) +
pack("%ds" % self.xidsize, self.xid) + \
+ self.tx_tail.encode()
+
+ def load(self, fhandle):
+ """Load the remainder of this record (after the header has been
loaded"""
+ if not self.xid_complete:
+ ret = Utils.load_file_data(fhandle, self.xidsize, self.xid)
+ self.xid = ret[0]
+ self.xid_complete = ret[1]
+ if self.xid_complete and not self.tail_complete:
+ ret = Utils.load_file_data(fhandle, calcsize(RecTail.FORMAT), self.tail_bin)
+ self.tail_bin = ret[0]
+ if ret[1]:
+ self.tx_tail = RecTail(self.tail_offs, *unpack(RecTail.FORMAT,
self.tail_bin))
+ magic_err = self.tx_tail.magic_inv != Utils.inv_str(self.magic)
+ rid_err = self.tx_tail.rid != self.rid
+ if magic_err or rid_err:
+ raise jerr.InvalidRecordTailError(magic_err, rid_err, self)
+ self.skip(fhandle)
+ self.tail_complete = ret[1]
+ return self.complete()
+
+ def complete(self):
+ """Returns True if the entire record is loaded, False
otherwise"""
+ return self.xid_complete and self.tail_complete
+
+
+#== class EnqRec ==============================================================
+
+class EnqRec(Hdr):
+ """Class for a enqueue record"""
+
+ FORMAT = "=QQ"
+ TRANSIENT_MASK = 0x10
+ EXTERN_MASK = 0x20
+
+ def __str__(self):
+ """Return a string representation of the this EnqRec
instance"""
+ return "%s %s%s %s %s" % (Hdr.__str__(self), Utils.format_xid(self.xid,
self.xidsize),
+ Utils.format_data(self.dsize, self.data),
self.enq_tail, self.print_flags())
+
+ def encode(self):
+ """Encode this class into a binary string"""
+ buf = Hdr.encode(self) + pack(EnqRec.FORMAT, self.xidsize, self.dsize)
+ if self.xidsize > 0:
+ buf += pack("%ds" % self.xidsize, self.xid)
+ if self.dsize > 0:
+ buf += pack("%ds" % self.dsize, self.data)
+ if self.xidsize > 0 or self.dsize > 0:
+ buf += self.enq_tail.encode()
+ return buf
+
+ def init(self, fhandle, foffs, xidsize, dsize):
+ """Initialize this instance to known values"""
+ self.xidsize = xidsize
+ self.dsize = dsize
+ self.transient = self.flags & self.TRANSIENT_MASK > 0
+ self.extern = self.flags & self.EXTERN_MASK > 0
+ self.xid = None
+ self.data = None
+ self.enq_tail = None
+ self.xid_complete = False
+ self.data_complete = False
+ self.tail_complete = False
+ self.tail_bin = None
+ self.tail_offs = 0
+ self.load(fhandle)
+
+ def load(self, fhandle):
+ """Load the remainder of this record (after the header has been
loaded"""
+ if not self.xid_complete:
+ ret = Utils.load_file_data(fhandle, self.xidsize, self.xid)
+ self.xid = ret[0]
+ self.xid_complete = ret[1]
+ if self.xid_complete and not self.data_complete:
+ if self.extern:
+ self.data_complete = True
+ else:
+ ret = Utils.load_file_data(fhandle, self.dsize, self.data)
+ self.data = ret[0]
+ self.data_complete = ret[1]
+ if self.data_complete and not self.tail_complete:
+ ret = Utils.load_file_data(fhandle, calcsize(RecTail.FORMAT), self.tail_bin)
+ self.tail_bin = ret[0]
+ if ret[1]:
+ self.enq_tail = RecTail(self.tail_offs, *unpack(RecTail.FORMAT,
self.tail_bin))
+ magic_err = self.enq_tail.magic_inv != Utils.inv_str(self.magic)
+ rid_err = self.enq_tail.rid != self.rid
+ if magic_err or rid_err:
+ raise jerr.InvalidRecordTailError(magic_err, rid_err, self)
+ self.skip(fhandle)
+ self.tail_complete = ret[1]
+ return self.complete()
+
+ def complete(self):
+ """Returns True if the entire record is loaded, False
otherwise"""
+ return self.xid_complete and self.data_complete and self.tail_complete
+
+ def print_flags(self):
+ """Utility function to decode the flags field in the header and
print a string representation"""
+ fstr = ""
+ if self.transient:
+ fstr = "*TRANSIENT"
+ if self.extern:
+ if len(fstr) > 0:
+ fstr += ",EXTERNAL"
+ else:
+ fstr = "*EXTERNAL"
+ if len(fstr) > 0:
+ fstr += "*"
+ return fstr
+
+
+#== class RecTail =============================================================
+
+class RecTail:
+ """Class for a record tail - for all records where either an XID or
data separate the header from the end of the
+ record"""
+
+ FORMAT = "=4sQ"
+
+ def __init__(self, foffs, magic_inv, rid):
+ """Initialize this instance to known values"""
+ self.foffs = foffs
+ self.magic_inv = magic_inv
+ self.rid = long(rid)
+
+ def __str__(self):
+ """Return a string representation of the this RecTail
instance"""
+ magic = Utils.inv_str(self.magic_inv)
+ return "[\"%s\" rid=0x%x]" % (magic, self.rid)
+
+ def encode(self):
+ """Encode this class into a binary string"""
+ return pack(RecTail.FORMAT, self.magic_inv, self.rid)
+
+
+#== class JrnlInfo ============================================================
+
+class JrnlInfo(object):
+ """
+ This object reads and writes journal information files (<basename>.jinf).
Methods are provided
+ to read a file, query its properties and reset just those properties necessary for
normalizing
+ and resizing a journal.
+
+ Normalizing: resetting the directory and/or base filename to different values. This
is necessary
+ if a set of journal files is copied from one location to another before being
restored, as the
+ value of the path in the file no longer matches the actual path.
+
+ Resizing: If the journal geometry parameters (size and number of journal files)
changes, then the
+ .jinf file must reflect these changes, as this file is the source of information for
journal
+ recovery.
+
+ NOTE: Data size vs File size: There are methods which return the data size and file
size of the
+ journal files.
+
+ +-------------+--------------------/ /----------+
+ | File header | File data |
+ +-------------+--------------------/ /----------+
+ | | |
+ | |<---------- Data size ---------->|
+ |<------------------ File Size ---------------->|
+
+ Data size: The size of the data content of the journal, ie that part which stores the
data records.
+
+ File size: The actual disk size of the journal including data and the file header
which precedes the
+ data.
+
+ The file header is fixed to 1 sblk, so file size = jrnl size + sblk size.
+ """
+
+ def __init__(self, jdir, bfn = "JournalData"):
+ """Constructor"""
+ self.__jdir = jdir
+ self.__bfn = bfn
+ self.__jinf_dict = {}
+ self._read_jinf()
+
+ def __str__(self):
+ """Create a string containing all of the journal info contained in
the jinf file"""
+ ostr = "Journal info file %s:\n" % os.path.join(self.__jdir,
"%s.jinf" % self.__bfn)
+ for key, val in self.__jinf_dict.iteritems():
+ ostr += " %s = %s\n" % (key, val)
+ return ostr
+
+ def normalize(self, jdir = None, bfn = None):
+ """Normalize the directory (ie reset the directory path to match
the actual current location) for this
+ jinf file"""
+ if jdir == None:
+ self.__jinf_dict["directory"] = self.__jdir
+ else:
+ self.__jdir = jdir
+ self.__jinf_dict["directory"] = jdir
+ if bfn != None:
+ self.__bfn = bfn
+ self.__jinf_dict["base_filename"] = bfn
+
+ def resize(self, num_jrnl_files = None, jrnl_file_size = None):
+ """Reset the journal size information to allow for resizing the
journal"""
+ if num_jrnl_files != None:
+ self.__jinf_dict["number_jrnl_files"] = num_jrnl_files
+ if jrnl_file_size != None:
+ self.__jinf_dict["jrnl_file_size_sblks"] = jrnl_file_size *
self.get_jrnl_dblk_size_bytes()
+
+ def write(self, jdir = None, bfn = None):
+ """Write the .jinf file"""
+ self.normalize(jdir, bfn)
+ if not os.path.exists(self.get_jrnl_dir()):
+ os.makedirs(self.get_jrnl_dir())
+ fhandle = open(os.path.join(self.get_jrnl_dir(), "%s.jinf" %
self.get_jrnl_base_name()), "w")
+ fhandle.write("<?xml version=\"1.0\" ?>\n")
+ fhandle.write("<jrnl>\n")
+ fhandle.write(" <journal_version value=\"%d\" />\n" %
self.get_jrnl_version())
+ fhandle.write(" <journal_id>\n")
+ fhandle.write(" <id_string value=\"%s\" />\n" %
self.get_jrnl_id())
+ fhandle.write(" <directory value=\"%s\" />\n" %
self.get_jrnl_dir())
+ fhandle.write(" <base_filename value=\"%s\" />\n" %
self.get_jrnl_base_name())
+ fhandle.write(" </journal_id>\n")
+ fhandle.write(" <creation_time>\n")
+ fhandle.write(" <seconds value=\"%d\" />\n" %
self.get_creation_time()[0])
+ fhandle.write(" <nanoseconds value=\"%d\" />\n" %
self.get_creation_time()[1])
+ fhandle.write(" <string value=\"%s\" />\n" %
self.get_creation_time_str())
+ fhandle.write(" </creation_time>\n")
+ fhandle.write(" <journal_file_geometry>\n")
+ fhandle.write(" <number_jrnl_files value=\"%d\"
/>\n" % self.get_num_jrnl_files())
+ fhandle.write(" <auto_expand value=\"%s\" />\n" %
str.lower(str(self.get_auto_expand())))
+ fhandle.write(" <jrnl_file_size_sblks value=\"%d\"
/>\n" % self.get_jrnl_data_size_sblks())
+ fhandle.write(" <JRNL_SBLK_SIZE value=\"%d\" />\n" %
self.get_jrnl_sblk_size_dblks())
+ fhandle.write(" <JRNL_DBLK_SIZE value=\"%d\" />\n" %
self.get_jrnl_dblk_size_bytes())
+ fhandle.write(" </journal_file_geometry>\n")
+ fhandle.write(" <cache_geometry>\n")
+ fhandle.write(" <wcache_pgsize_sblks value=\"%d\"
/>\n" % self.get_wr_buf_pg_size_sblks())
+ fhandle.write(" <wcache_num_pages value=\"%d\" />\n"
% self.get_num_wr_buf_pgs())
+ fhandle.write(" <JRNL_RMGR_PAGE_SIZE value=\"%d\"
/>\n" % self.get_rd_buf_pg_size_sblks())
+ fhandle.write(" <JRNL_RMGR_PAGES value=\"%d\" />\n"
% self.get_num_rd_buf_pgs())
+ fhandle.write(" </cache_geometry>\n")
+ fhandle.write("</jrnl>\n")
+ fhandle.close()
+
+ # Journal ID
+
+ def get_jrnl_version(self):
+ """Get the journal version"""
+ return self.__jinf_dict["journal_version"]
+
+ def get_jrnl_id(self):
+ """Get the journal id"""
+ return self.__jinf_dict["id_string"]
+
+ def get_current_dir(self):
+ """Get the current directory of the store (as opposed to that
value saved in the .jinf file)"""
+ return self.__jdir
+
+ def get_jrnl_dir(self):
+ """Get the journal directory stored in the .jinf
file"""
+ return self.__jinf_dict["directory"]
+
+ def get_jrnl_base_name(self):
+ """Get the base filename - that string used to name the journal
files <basefilename>-nnnn.jdat and
+ <basefilename>.jinf"""
+ return self.__jinf_dict["base_filename"]
+
+ # Journal creation time
+
+ def get_creation_time(self):
+ """Get journal creation time as a tuple (secs,
nsecs)"""
+ return (self.__jinf_dict["seconds"],
self.__jinf_dict["nanoseconds"])
+
+ def get_creation_time_str(self):
+ """Get journal creation time as a string"""
+ return self.__jinf_dict["string"]
+
+ # --- Files and geometry ---
+
+ def get_num_jrnl_files(self):
+ """Get number of data files in the journal"""
+ return self.__jinf_dict["number_jrnl_files"]
+
+ def get_auto_expand(self):
+ """Return True if auto-expand is enabled; False
otherwise"""
+ return self.__jinf_dict["auto_expand"]
+
+ def get_jrnl_sblk_size_dblks(self):
+ """Get the journal softblock size in dblks"""
+ return self.__jinf_dict["JRNL_SBLK_SIZE"]
+
+ def get_jrnl_sblk_size_bytes(self):
+ """Get the journal softblock size in bytes"""
+ return self.get_jrnl_sblk_size_dblks() * self.get_jrnl_dblk_size_bytes()
+
+ def get_jrnl_dblk_size_bytes(self):
+ """Get the journal datablock size in bytes"""
+ return self.__jinf_dict["JRNL_DBLK_SIZE"]
+
+ def get_jrnl_data_size_sblks(self):
+ """Get the data capacity (excluding the file headers) for one
journal file in softblocks"""
+ return self.__jinf_dict["jrnl_file_size_sblks"]
+
+ def get_jrnl_data_size_dblks(self):
+ """Get the data capacity (excluding the file headers) for one
journal file in datablocks"""
+ return self.get_jrnl_data_size_sblks() * self.get_jrnl_sblk_size_dblks()
+
+ def get_jrnl_data_size_bytes(self):
+ """Get the data capacity (excluding the file headers) for one
journal file in bytes"""
+ return self.get_jrnl_data_size_dblks() * self.get_jrnl_dblk_size_bytes()
+
+ def get_jrnl_file_size_sblks(self):
+ """Get the size of one journal file on disk (including the file
headers) in softblocks"""
+ return self.get_jrnl_data_size_sblks() + 1
+
+ def get_jrnl_file_size_dblks(self):
+ """Get the size of one journal file on disk (including the file
headers) in datablocks"""
+ return self.get_jrnl_file_size_sblks() * self.get_jrnl_sblk_size_dblks()
+
+ def get_jrnl_file_size_bytes(self):
+ """Get the size of one journal file on disk (including the file
headers) in bytes"""
+ return self.get_jrnl_file_size_dblks() * self.get_jrnl_dblk_size_bytes()
+
+ def get_tot_jrnl_data_size_sblks(self):
+ """Get the size of the entire jouranl's data capacity
(excluding the file headers) for all files together in
+ softblocks"""
+ return self.get_num_jrnl_files() * self.get_jrnl_data_size_bytes()
+
+ def get_tot_jrnl_data_size_dblks(self):
+ """Get the size of the entire jouranl's data capacity
(excluding the file headers) for all files together in
+ datablocks"""
+ return self.get_num_jrnl_files() * self.get_jrnl_data_size_dblks()
+
+ def get_tot_jrnl_data_size_bytes(self):
+ """Get the size of the entire jouranl's data capacity
(excluding the file headers) for all files together in
+ bytes"""
+ return self.get_num_jrnl_files() * self.get_jrnl_data_size_bytes()
+
+ # Read and write buffers
+
+ def get_wr_buf_pg_size_sblks(self):
+ """Get the size of the write buffer pages in
softblocks"""
+ return self.__jinf_dict["wcache_pgsize_sblks"]
+
+ def get_wr_buf_pg_size_dblks(self):
+ """Get the size of the write buffer pages in
datablocks"""
+ return self.get_wr_buf_pg_size_sblks() * self.get_jrnl_sblk_size_dblks()
+
+ def get_wr_buf_pg_size_bytes(self):
+ """Get the size of the write buffer pages in
bytes"""
+ return self.get_wr_buf_pg_size_dblks() * self.get_jrnl_dblk_size_bytes()
+
+ def get_num_wr_buf_pgs(self):
+ """Get the number of write buffer pages"""
+ return self.__jinf_dict["wcache_num_pages"]
+
+ def get_rd_buf_pg_size_sblks(self):
+ """Get the size of the read buffer pages in
softblocks"""
+ return self.__jinf_dict["JRNL_RMGR_PAGE_SIZE"]
+
+ def get_rd_buf_pg_size_dblks(self):
+ """Get the size of the read buffer pages in
datablocks"""
+ return self.get_rd_buf_pg_size_sblks * self.get_jrnl_sblk_size_dblks()
+
+ def get_rd_buf_pg_size_bytes(self):
+ """Get the size of the read buffer pages in
bytes"""
+ return self.get_rd_buf_pg_size_dblks * self.get_jrnl_dblk_size_bytes()
+
+ def get_num_rd_buf_pgs(self):
+ """Get the number of read buffer pages"""
+ return self.__jinf_dict["JRNL_RMGR_PAGES"]
+
+ def _read_jinf(self):
+ """Read and initialize this instance from an existing jinf file
located at the directory named in the
+ constructor - called by the constructor"""
+ fhandle = open(os.path.join(self.__jdir, "%s.jinf" % self.__bfn),
"r")
+ parser = xml.parsers.expat.ParserCreate()
+ parser.StartElementHandler = self._handle_xml_start_elt
+ parser.CharacterDataHandler = self._handle_xml_char_data
+ parser.EndElementHandler = self._handle_xml_end_elt
+ parser.ParseFile(fhandle)
+ fhandle.close()
+
+ def _handle_xml_start_elt(self, name, attrs):
+ """Callback for handling XML start elements. Used by the XML
parser."""
+ # bool values
+ if name == "auto_expand":
+ self.__jinf_dict[name] = attrs["value"] == "true"
+ # long values
+ elif name == "seconds" or \
+ name == "nanoseconds":
+ self.__jinf_dict[name] = long(attrs["value"])
+ # int values
+ elif name == "journal_version" or \
+ name == "number_jrnl_files" or \
+ name == "jrnl_file_size_sblks" or \
+ name == "JRNL_SBLK_SIZE" or \
+ name == "JRNL_DBLK_SIZE" or \
+ name == "wcache_pgsize_sblks" or \
+ name == "wcache_num_pages" or \
+ name == "JRNL_RMGR_PAGE_SIZE" or \
+ name == "JRNL_RMGR_PAGES":
+ self.__jinf_dict[name] = int(attrs["value"])
+ # strings
+ elif "value" in attrs:
+ self.__jinf_dict[name] = attrs["value"]
+
+ def _handle_xml_char_data(self, data):
+ """Callback for handling character data (ie within
<elt>...</elt>). The jinf file does not use this in its
+ data. Used by the XML parser."""
+ pass
+
+ def _handle_xml_end_elt(self, name):
+ """Callback for handling XML end elements. Used by XML
parser."""
+ pass
+
+
+#==============================================================================
+
+_CLASSES = {
+ "a": TxnRec,
+ "c": TxnRec,
+ "d": DeqRec,
+ "e": EnqRec,
+ "f": FileHdr
+}
+
+if __name__ == "__main__":
+ print "This is a library, and cannot be executed."