Author: kpvdr
Date: 2009-11-25 15:45:30 -0500 (Wed, 25 Nov 2009)
New Revision: 3726
Added:
store/trunk/cpp/tools/
store/trunk/cpp/tools/jrnl.py
store/trunk/cpp/tools/resize
store/trunk/cpp/tools/store_chk
Modified:
store/trunk/cpp/tests/jrnl/jtt/test_mgr.cpp
store/trunk/cpp/tests/jrnl/run-journal-tests
Log:
Added python journal tools for inspection and resizing journal files
Modified: store/trunk/cpp/tests/jrnl/jtt/test_mgr.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/jtt/test_mgr.cpp 2009-11-25 17:57:03 UTC (rev 3725)
+++ store/trunk/cpp/tests/jrnl/jtt/test_mgr.cpp 2009-11-25 20:45:30 UTC (rev 3726)
@@ -113,14 +113,15 @@
throw std::runtime_error(oss.str());
}
std::ostringstream oss;
- oss << ja << " -q -d " << jpp->jdir() << " -b " << jpp->base_filename();
+ oss << ja << " -b " << jpp->base_filename();
// TODO: When jfile_check.py can handle previously recovered journals for
// specific tests, then remove this exclusion.
if (!_args.recover_mode)
{
- oss << " -c" << _args.test_case_csv_file_name;
- oss << " -t" << (*tci)->test_case_num();
+ oss << " -c " << _args.test_case_csv_file_name;
+ oss << " -t " << (*tci)->test_case_num();
}
}
+ oss << " -q " << jpp->jdir();
bool res = system(oss.str().c_str()) != 0;
(*tci)->set_fmt_chk_res(res, jpp->jid());
if (res) _err_flag = true;
Modified: store/trunk/cpp/tests/jrnl/run-journal-tests
===================================================================
--- store/trunk/cpp/tests/jrnl/run-journal-tests 2009-11-25 17:57:03 UTC (rev 3725)
+++ store/trunk/cpp/tests/jrnl/run-journal-tests 2009-11-25 20:45:30 UTC (rev 3726)
@@ -27,19 +27,19 @@
# Run jtt using default test set
echo
echo "===== Mode 1: New journal instance, no recover ====="
-jtt/jtt --analyzer jtt/jfile_chk.py --jrnl-dir ${TMP_DATA_DIR} --csv jtt/jtt.csv --format-chk --num-jrnls ${num_jrnls} || fail=1
+jtt/jtt --analyzer ../../tools/store_chk --jrnl-dir ${TMP_DATA_DIR} --csv jtt/jtt.csv --format-chk --num-jrnls ${num_jrnls} || fail=1
rm -rf ${TMP_DATA_DIR}/test_0*
echo
echo "===== Mode 2: Re-use journal instance, no recover ====="
-jtt/jtt --analyzer jtt/jfile_chk.py --jrnl-dir ${TMP_DATA_DIR} --csv jtt/jtt.csv --reuse-instance --format-chk --num-jrnls ${num_jrnls} || fail=1
+jtt/jtt --analyzer ../../tools/store_chk --jrnl-dir ${TMP_DATA_DIR} --csv jtt/jtt.csv --reuse-instance --format-chk --num-jrnls ${num_jrnls} || fail=1
rm -rf ${TMP_DATA_DIR}/test_0*
echo
echo "===== Mode 3: New journal instance, recover previous test journal ====="
-jtt/jtt --analyzer jtt/jfile_chk.py --jrnl-dir ${TMP_DATA_DIR} --csv jtt/jtt.csv --recover-mode --format-chk --num-jrnls ${num_jrnls} || fail=1
+jtt/jtt --analyzer ../../tools/store_chk --jrnl-dir ${TMP_DATA_DIR} --csv jtt/jtt.csv --recover-mode --format-chk --num-jrnls ${num_jrnls} || fail=1
rm -rf ${TMP_DATA_DIR}/test_0*
echo
echo "===== Mode 4: Re-use journal instance, recover previous test journal
====="
-jtt/jtt --analyzer jtt/jfile_chk.py --jrnl-dir ${TMP_DATA_DIR} --csv jtt/jtt.csv
--reuse-instance --recover-mode --format-chk --num-jrnls ${num_jrnls} || fail=1
+jtt/jtt --analyzer ../../tools/store_chk --jrnl-dir ${TMP_DATA_DIR} --csv jtt/jtt.csv
--reuse-instance --recover-mode --format-chk --num-jrnls ${num_jrnls} || fail=1
rm -rf ${TMP_DATA_DIR}/test_0*
echo
Added: store/trunk/cpp/tools/jrnl.py
===================================================================
--- store/trunk/cpp/tools/jrnl.py (rev 0)
+++ store/trunk/cpp/tools/jrnl.py 2009-11-25 20:45:30 UTC (rev 3726)
@@ -0,0 +1,1139 @@
+# Copyright (c) 2007, 2008 Red Hat, Inc.
+#
+# This file is part of the Qpid async store library msgstore.so.
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
+# USA
+#
+# The GNU Lesser General Public License is available in the file COPYING.
+
+import os.path, sys, xml.parsers.expat
+from struct import pack, unpack, calcsize
+from time import gmtime, strftime
+
+# TODO: Get rid of these! Use jinf instance instead
+dblkSize = 128
+sblkSize = 4 * dblkSize
+
+#== protected and private ======================================================================
+
+_extern_mask = 0x20
+
+class Utils(object):
+
+ __printchars = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ!\"#$%&'()*+,-./:;<=>?@[\\]^_`{\|}~ "
+
+ @staticmethod
+ def formatData(dsize, data):
+ if data == None:
+ return ""
+ if Utils.__isPrintable(data):
+ datastr = Utils.__splitStr(data)
+ else:
+ datastr = Utils.__hexSplitStr(data)
+ if dsize != len(data):
+ raise Exception("Inconsistent data size: dsize=%d,
data(%d)=\"%s\"" % (dsize, len(data), datastr))
+ return "data(%d)=\"%s\" " % (dsize, datastr)
+
+ @staticmethod
+ def formatXid(xid, xidsize = None, uuidFormat = False):
+ if xid == None and xidsize != None:
+ if xidsize > 0: raise Exception("Inconsistent XID size: xidsize=%d, xid=None" % xidsize)
+ return ""
+ if Utils.__isPrintable(xid):
+ xidstr = Utils.__splitStr(xid)
+ else:
+ xidstr = Utils.__hexSplitStr(xid)
+ if xidsize == None:
+ xidsize = len(xid)
+ elif xidsize != len(xid):
+ raise Exception("Inconsistent XID size: xidsize=%d,
xid(%d)=\"%s\"" % (xidsize, len(xid), xidstr))
+ return "xid(%d)=\"%s\" " % (xidsize, xidstr)
+
+ @staticmethod
+ def invStr(s):
+ si = ""
+ for i in range(0,len(s)):
+ si += chr(~ord(s[i]) & 0xff)
+ return si
+
+ @staticmethod
+ def load(f, klass):
+ args = Utils.__loadArgs(f, klass)
+ subclass = klass.discriminate(args)
+ result = subclass(*args) # create instance of record
+ if subclass != klass:
+ result.init(f, *Utils.__loadArgs(f, subclass))
+ result.skip(f)
+ return result;
+
+ @staticmethod
+ def loadFileData(f, size, data):
+ if size == 0:
+ return (data, True)
+ if data == None:
+ loaded = 0
+ else:
+ loaded = len(data)
+ foverflow = f.tell() + size - loaded > jfsize
+ if foverflow:
+ rsize = jfsize - f.tell()
+ else:
+ rsize = size - loaded
+ bin = f.read(rsize)
+ if data == None:
+ data = unpack("%ds" % (rsize), bin)[0]
+ else:
+ data = data + unpack("%ds" % (rsize), bin)[0]
+ return (data, not foverflow)
+
+ @staticmethod
+ def remBytesInBlk(f, blkSize):
+ foffs = f.tell()
+ return Utils.sizeInBytesToBlk(foffs, blkSize) - foffs;
+
+ @staticmethod
+ def sizeInBlks(size, blkSize):
+ return int((size + blkSize - 1) / blkSize)
+
+ @staticmethod
+ def sizeInBytesToBlk(size, blkSize):
+ return Utils.sizeInBlks(size, blkSize) * blkSize
+
+ @staticmethod
+ def __hexSplitStr(s, splitSize = 50):
+ if len(s) <= splitSize:
+ return Utils.__hexStr(s, 0, len(s))
+# if len(s) > splitSize + 25:
+# return Utils.__hexStr(s, 0, 10) + " ... " + Utils.__hexStr(s, 55, 65) + " ... " + Utils.__hexStr(s, len(s)-10, len(s))
+ return Utils.__hexStr(s, 0, 10) + " ... " + Utils.__hexStr(s, len(s)-10, len(s))
+
+ @staticmethod
+ def __hexStr(s, b, e):
+ o = ""
+ for i in range(b, e):
+ if Utils.__isPrintable(s[i]):
+ o += s[i]
+ else:
+ o += "\\%02x" % ord(s[i])
+ return o
+
+ @staticmethod
+ def __isPrintable(s):
+ return s.strip(Utils.__printchars) == ""
+
+ @staticmethod
+ def __loadArgs(f, klass):
+ size = calcsize(klass.format)
+ foffs = f.tell(),
+ bin = f.read(size)
+ if len(bin) != size: raise Exception("End of file")
+ return foffs + unpack(klass.format, bin)
+
+ @staticmethod
+ def __splitStr(s, splitSize = 50):
+ if len(s) < splitSize:
+ return s
+ return s[:25] + " ... " + s[-25:]
+
+
+#== class Warning =============================================================
+
+class Warning(Exception):
+ def __init__(self, err):
+ Exception.__init__(self, err)
+
+
+#== class Sizeable ============================================================
+
+class Sizeable(object):
+
+ def size(self):
+ classes = [self.__class__]
+ size = 0
+ while classes:
+ cls = classes.pop()
+ if hasattr(cls, "format"):
+ size += calcsize(cls.format)
+ classes.extend(cls.__bases__)
+ return size
+
+
+#== class Hdr =================================================================
+
+class Hdr(Sizeable):
+
+ format = "=4sBBHQ"
+ hdrVer = 1
+ owi_mask = 0x01
+ big_endian_flag = sys.byteorder == "big"
+
+ def __init__(self, foffs, magic, ver, endn, flags, rid):
+ self.foffs = foffs
+ self.magic = magic
+ self.ver = ver
+ self.endn = endn
+ self.flags = flags
+ self.rid = long(rid)
+
+ def __str__(self):
+ if self.empty():
+ return "0x%08x: <empty>" % (self.foffs)
+ if self.magic[-1] == "x":
+ return "0x%08x: [\"%s\"]" % (self.foffs, self.magic)
+ if self.magic[-1] in ["a", "c", "d", "e", "f", "x"]:
+ return "0x%08x: [\"%s\" v=%d e=%d f=0x%04x rid=0x%x]" % (self.foffs, self.magic, self.ver, self.endn, self.flags, self.rid)
+ return "0x%08x: <error, unknown magic \"%s\" (possible overwrite boundary?)>" % (self.foffs, self.magic)
+
+ @staticmethod
+ def discriminate(args):
+ return _CLASSES.get(args[1][-1], Hdr)
+ #discriminate = staticmethod(discriminate)
+
+ def empty(self):
+ return self.magic == "\x00"*4
+
+ def encode(self):
+ return pack(Hdr.format, self.magic, self.ver, self.endn, self.flags, self.rid)
+
+ def owi(self):
+ return self.flags & self.owi_mask != 0
+
+ def skip(self, f):
+ f.read(Utils.remBytesInBlk(f, dblkSize))
+
+ def check(self):
+ if self.empty() or self.magic[:3] != "RHM" or self.magic[3] not in ["a", "c", "d", "e", "f", "x"]:
+ return True
+ if self.magic[-1] != "x":
+ if self.ver != self.hdrVer:
+ raise Exception("%s: Invalid header version: found %d, expected %d." % (self, self.ver, self.hdrVer))
+ if bool(self.endn) != self.big_endian_flag:
+ if self.big_endian_flag: e = "big"
+ else: e = "little"
+ raise Exception("Endian mismatch: this platform is %s and does not match record encoding (0x%04x)" % (e, self.endn))
+ return False
+
+
+#== class FileHdr =============================================================
+
+class FileHdr(Hdr):
+
+ format = "=2H4x3Q"
+
+ def __str__(self):
+ return "%s fid=%d lid=%d fro=0x%08x t=%s" % (Hdr.__str__(self),
self.fid, self.lid, self.fro, self.timestamp_str())
+
+ def encode(self):
+ return Hdr.encode(self) + pack(FileHdr.format, self.fid, self.lid, self.fro,
self.time_sec, self.time_ns)
+
+ def init(self, f, foffs, fid, lid, fro, time_sec, time_ns):
+ self.fid = fid
+ self.lid = lid
+ self.fro = fro
+ self.time_sec = time_sec
+ self.time_ns = time_ns
+
+ def skip(self, f):
+ f.read(Utils.remBytesInBlk(f, sblkSize))
+
+ def timestamp(self):
+ return (self.time_sec, self.time_ns)
+
+ def timestamp_str(self):
+ ts = gmtime(self.time_sec)
+ fstr = "%%a %%b %%d %%H:%%M:%%S.%09d %%Y" % (self.time_ns)
+ return strftime(fstr, ts)
+
+
+#== class DeqRec ==============================================================
+
+class DeqRec(Hdr):
+
+ format = "=QQ"
+
+ def __str__(self):
+ return "%s %sdrid=0x%x" % (Hdr.__str__(self), Utils.formatXid(self.xid,
self.xidsize), self.deq_rid)
+
+ def init(self, f, foffs, deq_rid, xidsize):
+ self.deq_rid = deq_rid
+ self.xidsize = xidsize
+ self.xid = None
+ self.deq_tail = None
+ self.xid_complete = False
+ self.tail_complete = False
+ self.tail_bin = None
+ self.tail_offs = 0
+ self.load(f)
+
+ def encode(self):
+ d = Hdr.encode(self) + pack(DeqRec.format, self.deq_rid, self.xidsize)
+ if self.xidsize > 0:
+ fmt = "%ds" % (self.xidsize)
+ d += pack(fmt, self.xid)
+ d += self.deq_tail.encode()
+ return d
+
+ def load(self, f):
+ if self.xidsize == 0:
+ self.xid_complete = True
+ self.tail_complete = True
+ else:
+ if not self.xid_complete:
+ (self.xid, self.xid_complete) = Utils.loadFileData(f, self.xidsize, self.xid)
+ if self.xid_complete and not self.tail_complete:
+ ret = Utils.loadFileData(f, calcsize(RecTail.format), self.tail_bin)
+ self.tail_bin = ret[0]
+ if ret[1]:
+ self.deq_tail = RecTail(self.tail_offs, *unpack(RecTail.format, self.tail_bin))
+ if self.deq_tail.magic_inv != Utils.invStr(self.magic) or self.deq_tail.rid != self.rid:
+ raise Exception(" > %s *INVALID TAIL RECORD*" % self)
+ self.deq_tail.skip(f)
+ self.tail_complete = ret[1]
+ return self.complete()
+
+ def complete(self):
+ return self.xid_complete and self.tail_complete
+
+
+#== class TxnRec ==============================================================
+
+class TxnRec(Hdr):
+
+ format = "=Q"
+
+ def __str__(self):
+ return "%s %s" % (Hdr.__str__(self), Utils.formatXid(self.xid,
self.xidsize))
+
+ def init(self, f, foffs, xidsize):
+ self.xidsize = xidsize
+ self.xid = None
+ self.tx_tail = None
+ self.xid_complete = False
+ self.tail_complete = False
+ self.tail_bin = None
+ self.tail_offs = 0
+ self.load(f)
+
+ def encode(self):
+ return Hdr.encode(self) + pack(TxnRec.format, self.xidsize) + pack("%ds" % self.xidsize, self.xid) + self.tx_tail.encode()
+
+ def load(self, f):
+ if not self.xid_complete:
+ ret = Utils.loadFileData(f, self.xidsize, self.xid)
+ self.xid = ret[0]
+ self.xid_complete = ret[1]
+ if self.xid_complete and not self.tail_complete:
+ ret = Utils.loadFileData(f, calcsize(RecTail.format), self.tail_bin)
+ self.tail_bin = ret[0]
+ if ret[1]:
+ self.tx_tail = RecTail(self.tail_offs, *unpack(RecTail.format, self.tail_bin))
+ if self.tx_tail.magic_inv != Utils.invStr(self.magic) or self.tx_tail.rid != self.rid:
+ raise Exception(" > %s *INVALID TAIL RECORD*" % self)
+ self.tx_tail.skip(f)
+ self.tail_complete = ret[1]
+ return self.complete()
+
+ def complete(self):
+ return self.xid_complete and self.tail_complete
+
+
+#== class EnqRec ==============================================================
+
+class EnqRec(Hdr):
+
+ format = "=QQ"
+ transient_mask = 0x10
+ extern_mask = 0x20
+
+ def __str__(self):
+ return "%s %s%s %s %s" % (Hdr.__str__(self), Utils.formatXid(self.xid,
self.xidsize), Utils.formatData(self.dsize, self.data), self.enq_tail,
self.print_flags())
+
+ def encode(self):
+ d = Hdr.encode(self) + pack(EnqRec.format, self.xidsize, self.dsize)
+ if self.xidsize > 0:
+ d += pack("%ds" % self.xidsize, self.xid)
+ if self.dsize > 0:
+ d += pack("%ds" % self.dsize, self.data)
+ if self.xidsize > 0 or self.dsize > 0:
+ d += self.enq_tail.encode()
+ return d
+
+ def init(self, f, foffs, xidsize, dsize):
+ self.xidsize = xidsize
+ self.dsize = dsize
+ self.transient = self.flags & self.transient_mask > 0
+ self.extern = self.flags & self.extern_mask > 0
+ self.xid = None
+ self.data = None
+ self.enq_tail = None
+ self.xid_complete = False
+ self.data_complete = False
+ self.tail_complete = False
+ self.tail_bin = None
+ self.tail_offs = 0
+ self.load(f)
+
+ def load(self, f):
+ if not self.xid_complete:
+ ret = Utils.loadFileData(f, self.xidsize, self.xid)
+ self.xid = ret[0]
+ self.xid_complete = ret[1]
+ if self.xid_complete and not self.data_complete:
+ if self.extern:
+ self.data_complete = True
+ else:
+ ret = Utils.loadFileData(f, self.dsize, self.data)
+ self.data = ret[0]
+ self.data_complete = ret[1]
+ if self.data_complete and not self.tail_complete:
+ ret = Utils.loadFileData(f, calcsize(RecTail.format), self.tail_bin)
+ self.tail_bin = ret[0]
+ if ret[1]:
+ self.enq_tail = RecTail(self.tail_offs, *unpack(RecTail.format, self.tail_bin))
+ if self.enq_tail.magic_inv != Utils.invStr(self.magic) or self.enq_tail.rid != self.rid:
+ raise Exception(" > %s *INVALID TAIL RECORD*" % self)
+ self.enq_tail.skip(f)
+ self.tail_complete = ret[1]
+ return self.complete()
+
+ def complete(self):
+ return self.xid_complete and self.data_complete and self.tail_complete
+
+ def print_flags(self):
+ s = ""
+ if self.transient:
+ s = "*TRANSIENT"
+ if self.extern:
+ if len(s) > 0:
+ s += ",EXTERNAL"
+ else:
+ s = "*EXTERNAL"
+ if len(s) > 0:
+ s += "*"
+ return s
+
+
+#== class RecTail =============================================================
+
+class RecTail(Sizeable):
+
+ format = "=4sQ"
+
+ def __init__(self, foffs, magic_inv, rid):
+ self.foffs = foffs
+ self.magic_inv = magic_inv
+ self.rid = long(rid)
+
+ def __str__(self):
+ magic = Utils.invStr(self.magic_inv)
+ return "[\"%s\" rid=0x%x]" % (magic, self.rid)
+
+ def encode(self):
+ return pack(RecTail.format, self.magic_inv, self.rid)
+
+ def skip(self, f):
+ f.read(Utils.remBytesInBlk(f, dblkSize))
+
+
+#== class EnqMap ==============================================================
+
+class EnqMap(object):
+
+ def __init__(self):
+ self.__map = {}
+
+ def __str__(self):
+ return self.report(True, True)
+
+ def add(self, fid, hdr):
+ if hdr.rid in self.__map.keys(): raise Exception("ERROR: Duplicate rid to EnqMap: rid=0x%x" % hdr.rid)
+ self.__map[hdr.rid] = (fid, hdr, False)
+
+ def contains(self, rid):
+ return rid in self.__map.keys()
+
+ def delete(self, rid):
+ if rid in self.__map.keys():
+ if self.getLock(rid):
+ raise Exception("ERROR: Deleting locked record from EnqMap:
rid=0x%s" % rid)
+ del self.__map[rid]
+ else:
+ raise Warning("ERROR: Deleting non-existent rid from EnqMap:
rid=0x%x" % rid)
+
+ def get(self, rid):
+ if self.contains(rid): return self.__map[rid]
+ return None
+
+ def getFid(self, rid):
+ if self.contains(rid): return self.__map[rid][0]
+ return None
+
+ def getHdr(self, rid):
+ if self.contains(rid): return self.__map[rid][1]
+ return None
+
+ def getLock(self, rid):
+ if self.contains(rid): return self.__map[rid][2]
+ return None
+
+ def getRecList(self):
+ return self.__map.values()
+
+ def lock(self, rid):
+ if rid in self.__map.keys():
+ tup = self.__map[rid]
+ self.__map[rid] = (tup[0], tup[1], True)
+ else:
+ raise Warning("ERROR: Locking non-existent rid in EnqMap: rid=0x%x"
% rid)
+
+ def report(self, showStats, showRecords):
+ if len(self.__map) == 0: return "No enqueued records found."
+ s = "%d enqueued records found" % len(self.__map)
+ if showRecords:
+ s += ":"
+ ridList = self.__map.keys()
+ ridList.sort()
+ for rid in ridList:
+# for f,r in self.__map.iteritems():
+ r = self.__map[rid]
+ if r[2]:
+ lockStr = " [LOCKED]"
+ else:
+ lockStr = ""
+ s += "\n lfid=%d %s %s" % (r[0], r[1], lockStr)
+ else:
+ s += "."
+ return s
+
+ def rids(self):
+ return self.__map.keys()
+
+ def size(self):
+ return len(self.__map)
+
+ def unlock(self, rid):
+ if rid in self.__map.keys():
+ tup = self.__map[rid]
+ if tup[2]:
+ self.__map[rid] = (tup[0], tup[1], False)
+ else:
+ raise Exception("ERROR: Unlocking rid which is not locked in EnqMap:
rid=0x%x" % rid)
+ else:
+ raise Exception("ERROR: Unlocking non-existent rid in EnqMap:
rid=0x%x" % rid)
+
+
+#== class TxnMap ==============================================================
+
+class TxnMap(object):
+
+ def __init__(self, emap):
+ self.__emap = emap
+ self.__map = {}
+
+ def __str__(self):
+ return self.report(True, True)
+
+ def add(self, fid, hdr):
+ if isinstance(hdr, DeqRec):
+ self.__emap.lock(hdr.deq_rid)
+ if hdr.xid in self.__map.keys():
+ self.__map[hdr.xid].append((fid, hdr)) # append to existing list
+ else:
+ self.__map[hdr.xid] = [(fid, hdr)] # create new list
+
+ def contains(self, xid):
+ return xid in self.__map.keys()
+
+ def delete(self, hdr):
+ if hdr.magic[-1] == "c": return self.__commit(hdr.xid)
+ if hdr.magic[-1] == "a": self.__abort(hdr.xid)
+ else: raise Exception("ERROR: cannot delete from TxnMap using hdr type %s" % hdr.magic)
+
+ def get(self, xid):
+ if self.contains(xid): return self.__map[xid]
+
+ def report(self, showStats, showRecords):
+ if len(self.__map) == 0: return "No outstanding transactions found."
+ s = "%d outstanding transactions found" % len(self.__map)
+ if showRecords:
+ s += ":"
+ for x,t in self.__map.iteritems():
+ s += "\n xid=%s:" % Utils.formatXid(x)
+ for i in t:
+ s += "\n %s" % str(i[1])
+ else:
+ s += "."
+ return s
+
+ def size(self):
+ return len(self.__map)
+
+ def xids(self):
+ return self.__map.keys()
+
+ def __abort(self, xid):
+ for fid, hdr in self.__map[xid]:
+ if isinstance(hdr, DeqRec):
+ self.__emap.unlock(hdr.deq_rid) # unlock the enqueue record this dequeue had locked
+ del self.__map[xid]
+
+ def __commit(self, xid):
+ mismatchList = []
+ for fid, hdr in self.__map[xid]:
+ if isinstance(hdr, EnqRec):
+ self.__emap.add(fid, hdr) # Transfer enq to emap
+ else:
+ if self.__emap.contains(hdr.deq_rid):
+ self.__emap.unlock(hdr.deq_rid)
+ self.__emap.delete(hdr.deq_rid)
+ else:
+ mismatchList.append("0x%x" % hdr.deq_rid)
+ del self.__map[xid]
+ return mismatchList
+
+
+
+#== class JrnlInfo ============================================================
+
+class JrnlInfo(object):
+ """
+ This object reads and writes journal information files (<basename>.jinf).
Methods are provided
+ to read a file, query its properties and reset just those properties necessary for
normalizing
+ and resizing a journal.
+
+ Normalizing: resetting the directory and/or base filename to different values. This
is necessary
+ if a set of journal files is copied from one location to another before being
restored, as the
+ value of the path in the file no longer matches the actual path.
+
+ Resizing: If the journal geometry parameters (size and number of journal files)
changes, then the
+ .jinf file must reflect these changes, as this file is the source of information for
journal
+ recovery.
+
+ NOTE: Size vs File size: There are methods which return the journal size and file
size of the
+ journal files.
+
+ +-------------+--------------------/ /----------+
+ | File header | File data |
+ +-------------+--------------------/ /----------+
+ | | |
+ | |<------------- Size ------------>|
+ |<------------------ FileSize ----------------->|
+
+ Size: The size of the data content of the journal, ie that part which stores the data
records.
+
+ File size: The actual disk size of the journal including data and the file header
which precedes the
+ data.
+
+ The file header is fixed to 1 sblk, so file size = jrnl size + sblk size.
+ """
+
+ def __init__(self, jdir, bfn = "JournalData"):
+ self.__jdir = jdir
+ self.__bfn = bfn
+ self.__jinfDict = {}
+ self.__read_jinf()
+
+ def __str__(self):
+ s = "Journal info file %s:\n" % os.path.join(self.__jdir,
"%s.jinf" % self.__bfn)
+ for k,v in self.__jinfDict.iteritems():
+ s += " %s = %s\n" % (k, v)
+ return s
+
+ def normalize(self, jdir = None, baseFilename = None):
+ if jdir == None:
+ self.__jinfDict["directory"] = self.__jdir
+ else:
+ self.__jdir = jdir
+ self.__jinfDict["directory"] = jdir
+ if baseFilename != None:
+ self.__bfn = baseFilename
+ self.__jinfDict["base_filename"] = baseFilename
+
+ def resize(self, njFiles = None, jfSize = None):
+ if njFiles != None:
+ self.__jinfDict["number_jrnl_files"] = njFiles
+ if jfSize != None:
+ self.__jinfDict["jrnl_file_size_sblks"] = jfSize * dblkSize
+
+ def write(self, jdir = None, baseFilename = None):
+ self.normalize(jdir, baseFilename)
+ if not os.path.exists(self.getJrnlDir()): os.makedirs(self.getJrnlDir())
+ wdir = os.path.join(self.getJrnlDir(), "%s.jinf" % self.getJrnlBaseName())
+ f = open(os.path.join(self.getJrnlDir(), "%s.jinf" % self.getJrnlBaseName()), "w")
+ f.write("<?xml version=\"1.0\" ?>\n")
+ f.write("<jrnl>\n")
+ f.write(" <journal_version value=\"%d\" />\n" % self.getJrnlVersion())
+ f.write(" <journal_id>\n")
+ f.write(" <id_string value=\"%s\" />\n" % self.getJrnlId())
+ f.write(" <directory value=\"%s\" />\n" % self.getJrnlDir())
+ f.write(" <base_filename value=\"%s\" />\n" % self.getJrnlBaseName())
+ f.write(" </journal_id>\n")
+ f.write(" <creation_time>\n")
+ f.write(" <seconds value=\"%d\" />\n" % self.getCreationTime()[0])
+ f.write(" <nanoseconds value=\"%d\" />\n" % self.getCreationTime()[1])
+ f.write(" <string value=\"%s\" />\n" % self.getCreationTimeStr())
+ f.write(" </creation_time>\n")
+ f.write(" <journal_file_geometry>\n")
+ f.write(" <number_jrnl_files value=\"%d\" />\n" % self.getNumJrnlFiles())
+ f.write(" <auto_expand value=\"%s\" />\n" % str.lower(str(self.getAutoExpand())))
+ f.write(" <jrnl_file_size_sblks value=\"%d\" />\n" % self.getJrnlSizeSblks())
+ f.write(" <JRNL_SBLK_SIZE value=\"%d\" />\n" % self.getJrnlSblkSize())
+ f.write(" <JRNL_DBLK_SIZE value=\"%d\" />\n" % self.getJrnlDblkSize())
+ f.write(" </journal_file_geometry>\n")
+ f.write(" <cache_geometry>\n")
+ f.write(" <wcache_pgsize_sblks value=\"%d\" />\n" % self.getWriteBufferPageSizeSblks())
+ f.write(" <wcache_num_pages value=\"%d\" />\n" % self.getNumWriteBufferPages())
+ f.write(" <JRNL_RMGR_PAGE_SIZE value=\"%d\" />\n" % self.getReadBufferPageSizeSblks())
+ f.write(" <JRNL_RMGR_PAGES value=\"%d\" />\n" % self.getNumReadBufferPages())
+ f.write(" </cache_geometry>\n")
+ f.write("</jrnl>\n")
+ f.close()
+
+ # Journal ID
+
+ def getJrnlVersion(self):
+ return self.__jinfDict["journal_version"]
+
+ def getJrnlId(self):
+ return self.__jinfDict["id_string"]
+
+ def getCurrentJnrlDir(self):
+ return self.__jdir
+
+ def getJrnlDir(self):
+ return self.__jinfDict["directory"]
+
+ def getJrnlBaseName(self):
+ return self.__jinfDict["base_filename"]
+
+ # Journal creation time
+
+ def getCreationTime(self):
+ return (self.__jinfDict["seconds"],
self.__jinfDict["nanoseconds"])
+
+ def getCreationTimeStr(self):
+ return self.__jinfDict["string"]
+
+ # Files and geometry
+
+ def getNumJrnlFiles(self):
+ return self.__jinfDict["number_jrnl_files"]
+
+ def getAutoExpand(self):
+ return self.__jinfDict["auto_expand"]
+
+ def getJrnlSblkSize(self):
+ return self.__jinfDict["JRNL_SBLK_SIZE"]
+
+ def getJrnlDblkSize(self):
+ return self.__jinfDict["JRNL_DBLK_SIZE"]
+
+ def getJrnlSizeSblks(self):
+ return self.__jinfDict["jrnl_file_size_sblks"]
+
+ def getJrnlSizeDblks(self):
+ return self.getJrnlSizeSblks() * self.getJrnlSblkSize()
+
+ def getJrnlSizeBytes(self):
+ return self.getJrnlSizeDblks() * self.getJrnlDblkSize()
+
+ def getJrnlFileSizeSblks(self):
+ return self.getJrnlSizeSblks() + 1
+
+ def getJrnlFileSizeDblks(self):
+ return self.getJrnlFileSizeSblks() * self.getJrnlSblkSize()
+
+ def getJrnlFileSizeBytes(self):
+ return self.getJrnlFileSizeDblks() * self.getJrnlDblkSize()
+
+ def getTotalJrnlFileCapacitySblks(self):
+ return self.getNumJrnlFiles() * self.getJrnlSizeSblks()
+
+ def getTotalJrnlFileCapacityDblks(self):
+ return self.getNumJrnlFiles() * self.getJrnlSizeDblks()
+
+ def getTotalJrnlFileCapacityBytes(self):
+ return self.getNumJrnlFiles() * self.getJrnlSizeBytes()
+
+ # Read and write buffers
+
+ def getWriteBufferPageSizeSblks(self):
+ return self.__jinfDict["wcache_pgsize_sblks"]
+
+ def getWriteBufferPageSizeDblks(self):
+ return self.getWriteBufferPageSizeSblks() * self.getJrnlSblkSize()
+
+ def getWriteBufferPageSizeBytes(self):
+ return self.getWriteBufferPageSizeDblks() * self.getJrnlDblkSize()
+
+ def getNumWriteBufferPages(self):
+ return self.__jinfDict["wcache_num_pages"]
+
+ def getReadBufferPageSizeSblks(self):
+ return self.__jinfDict["JRNL_RMGR_PAGE_SIZE"]
+
+ def getReadBufferPageSizeDblks(self):
+ return self.getReadBufferPageSizeSblks() * self.getJrnlSblkSize()
+
+ def getReadBufferPageSizeBytes(self):
+ return self.getReadBufferPageSizeDblks() * self.getJrnlDblkSize()
+
+ def getNumReadBufferPages(self):
+ return self.__jinfDict["JRNL_RMGR_PAGES"]
+
+ def __read_jinf(self):
+ f = open(os.path.join(self.__jdir, "%s.jinf" % self.__bfn), "r")
+ p = xml.parsers.expat.ParserCreate()
+ p.StartElementHandler = self.__handleXmlStartElement
+ p.CharacterDataHandler = self.__handleXmlCharData
+ p.EndElementHandler = self.__handleXmlEndElement
+ p.ParseFile(f)
+ f.close()
+
+ def __handleXmlStartElement(self, name, attrs):
+ # bool values
+ if name == "auto_expand":
+ self.__jinfDict[name] = attrs["value"] == "true"
+ # long values
+ elif name == "seconds" or \
+ name == "nanoseconds":
+ self.__jinfDict[name] = long(attrs["value"])
+ # int values
+ elif name == "journal_version" or \
+ name == "number_jrnl_files" or \
+ name == "jrnl_file_size_sblks" or \
+ name == "JRNL_SBLK_SIZE" or \
+ name == "JRNL_DBLK_SIZE" or \
+ name == "wcache_pgsize_sblks" or \
+ name == "wcache_num_pages" or \
+ name == "JRNL_RMGR_PAGE_SIZE" or \
+ name == "JRNL_RMGR_PAGES":
+ self.__jinfDict[name] = int(attrs["value"])
+ # strings
+ elif "value" in attrs:
+ self.__jinfDict[name] = attrs["value"]
+
+ def __handleXmlCharData(self, data): pass
+
+ def __handleXmlEndElement(self, name): pass
+
+#== class JrnlAnalyzer ========================================================
+
+class JrnlAnalyzer(object):
+ """
+ This class analyzes a set of journal files and determines which is the last to be written
+ (the newest file), and hence which should be the first to be read for recovery (the oldest
+ file).
+
+ The analysis is performed on construction; the contents of the JrnlInfo object passed
+ provide the recovery details.
+ """
+
+ def __init__(self, jinf):
+ self.__oldest = None
+ self.__jinf = jinf
+ self.__flist = self._analyze()
+
+ def __str__(self):
+ s = "Journal files analyzed in directory %s (* = earliest full):\n" %
self.__jinf.getCurrentJnrlDir()
+ if self.isEmpty():
+ s += " <All journal files are empty>\n"
+ else:
+ for tup in self.__flist:
+ o = " "
+ if tup[0] == self.__oldest[0]: o = "*"
+ s += " %s %s: owi=%-5s rid=0x%x, fro=0x%x ts=%s\n" % (o,
os.path.basename(tup[1]), tup[2], tup[3], tup[4], tup[5])
+ for i in range(self.__flist[-1][0] + 1, self.__jinf.getNumJrnlFiles()):
+ s += " %s.%04d.jdat: <empty>\n" %
(self.__jinf.getJrnlBaseName(), i)
+ return s
+
+ # Analysis
+
+ def getOldestFile(self):
+ return self.__oldest
+
+ def getOldestFileIndex(self):
+ if self.isEmpty(): return None
+ return self.__oldest[0]
+
+ def isEmpty(self):
+ return len(self.__flist) == 0
+
+ def _analyze(self):
+ fname = ""
+ fnum = -1
+ rid = -1
+ fro = -1
+ tss = ""
+ owi_found = False
+ flist = []
+ for i in range(0, self.__jinf.getNumJrnlFiles()):
+ jfn = os.path.join(self.__jinf.getCurrentJnrlDir(), "%s.%04x.jdat" % (self.__jinf.getJrnlBaseName(), i))
+ f = open(jfn)
+ fhdr = Utils.load(f, Hdr)
+ if fhdr.empty(): break
+ this_tup = (i, jfn, fhdr.owi(), fhdr.rid, fhdr.fro, fhdr.timestamp_str())
+ flist.append(this_tup)
+ if i == 0:
+ init_owi = fhdr.owi()
+ self.__oldest = this_tup
+ elif fhdr.owi() != init_owi and not owi_found:
+ self.__oldest = this_tup
+ owi_found = True
+ return flist
+
+
+#== class JrnlReader ====================================================
+
+class JrnlReader(object):
+ """
+ This class contains an Enqueue Map (emap), a transaction map (tmap) and a transaction
+ object list (txnObjList) which are populated by reading the journals from the oldest
+ to the newest and analyzing each record. The JrnlInfo and JrnlAnalyzer
+ objects supplied on construction provide the information used for the recovery.
+
+ The analysis is performed on construction.
+ """
+
+ def __init__(self, ji, jra, qflag = False, rflag = False, vflag = False):
+ self._ji = ji
+ self._jra = jra
+ self._qflag = qflag
+ self._rflag = rflag
+ self._vflag = vflag
+
+ # test callback functions for CSV tests
+ self._csvStoreChk = None
+ self._csvStartCb = None
+ self._csvEnqCb = None
+ self._csvDeqCb = None
+ self._csvTxnCb = None
+ self._csvEndCb = None
+
+ self._emap = EnqMap()
+ self._tmap = TxnMap(self._emap)
+ self._txnObjList = {}
+
+ self._file = None
+ self._fileHdr = None
+ self._fileNum = None
+ self._firstRecFlag = None
+ self._fro = None
+ self._lastFileFlag = None
+ self._startFileNum = None
+ self._warning = []
+
+ self._abortCnt = 0
+ self._commitCnt = 0
+ self._msgCnt = 0
+ self._recCnt = 0
+ self._txnMsgCnt = 0
+
+ def __str__(self):
+ return self.report(True, self._rflag)
+
+ def abortCnt(self): return self._abortCnt
+
+ def commitCnt(self): return self._commitCnt
+
+ def emap(self): return self._emap
+
+ def msgCnt(self): return self._msgCnt
+
+ def recCnt(self): return self._recCnt
+
+ def report(self, showStats = True, showRecords = False):
+ s = self._emap.report(showStats, showRecords) + "\n" + self._tmap.report(showStats, showRecords)
+ #TODO - print size analysis here
+ return s
+
+ def run(self):
+ if self._csvStartCb != None and self._csvStartCb(self._csvStoreChk): return
+ if self._jra.isEmpty(): return
+ stop = self._advanceJrnlFile(*self._jra.getOldestFile())
+ while not stop and not self._getNextRecord(): pass
+ if self._csvEndCb != None and self._csvEndCb(self._csvStoreChk): return
+ if not self._qflag: print
+
+ def setCallbacks(self, csvStoreChk, csvStartCb = None, csvEnqCb = None, csvDeqCb = None, csvTxnCb = None, csvEndCb = None):
+ self._csvStoreChk = csvStoreChk
+ self._csvStartCb = csvStartCb
+ self._csvEnqCb = csvEnqCb
+ self._csvDeqCb = csvDeqCb
+ self._csvTxnCb = csvTxnCb
+ self._csvEndCb = csvEndCb
+
+ def tmap(self): return self._tmap
+
+ def txnMsgCnt(self): return self._txnMsgCnt
+
+ def txnObjList(self): return self._txnObjList
+
+ def _advanceJrnlFile(self, *oldestFileInfo):
+ froSeekFlag = False
+ if len(oldestFileInfo) > 0:
+ self._startFileNum = self._fileNum = oldestFileInfo[0]
+ self._fro = oldestFileInfo[4]
+ froSeekFlag = True # jump to fro to start reading
+ if not self._qflag and not self._rflag:
+ if self._vflag: print "Recovering journals..."
+ else: print "Recovering journals",
+ if self._file != None and self._fileFull():
+ self._file.close()
+ self._fileNum = self._incrFileNum()
+ if self._fileNum == self._startFileNum:
+ return True
+ if self._startFileNum == 0:
+ self._lastFileFlag = self._fileNum == self._ji.getNumJrnlFiles() - 1
+ else:
+ self._lastFileFlag = self._fileNum == self._startFileNum - 1
+ if self._fileNum < 0 or self._fileNum >= self._ji.getNumJrnlFiles():
+ raise Exception("Bad file number %d" % self._fileNum)
+ jfn = os.path.join(self._ji.getCurrentJnrlDir(), "%s.%04x.jdat" % (self._ji.getJrnlBaseName(), self._fileNum))
+ self._file = open(jfn)
+ self._fileHdr = Utils.load(self._file, Hdr)
+ if froSeekFlag and self._file.tell() != self._fro:
+ self._file.seek(self._fro)
+ self._firstRecFlag = True
+ if not self._qflag:
+ if self._rflag: print jfn, ": ", self._fileHdr
+ elif self._vflag: print "* Reading %s" % jfn
+ else:
+ print ".",
+ sys.stdout.flush()
+ return False
+
+ def _checkOwi(self, hdr):
+ return self._fileHdrOwi == hdr.owi()
+
+ def _fileFull(self):
+ return self._file.tell() >= self._ji.getJrnlFileSizeBytes()
+
+ def _getNextRecord(self, *oldestFileInfo):
+ if self._fileFull():
+ if self._advanceJrnlFile(): return True
+ try: hdr = Utils.load(self._file, Hdr)
+ except: return True
+ if hdr.empty(): return True
+ if hdr.check(): return True
+ self._recCnt += 1
+ self._fileHdrOwi = self._fileHdr.owi()
+ if self._firstRecFlag:
+ if self._fileHdr.fro != hdr.foffs:
+ raise Exception("File header first record offset mismatch: fro=0x%x;
rec_offs=0x%x" % (self._fileHdr.fro, hdr.foffs))
+ else:
+ if self._rflag: print " * fro ok: 0x%x" % self._fileHdr.fro
+ self._firstRecFlag = False
+ stop = False
+ if isinstance(hdr, EnqRec):
+ stop = self._handleEnqRec(hdr)
+ elif isinstance(hdr, DeqRec):
+ stop = self._handleDeqRec(hdr)
+ elif isinstance(hdr, TxnRec):
+ stop = self._handleTxnRec(hdr)
+ wstr = ""
+ for w in self._warning:
+ wstr += " (%s)" % w
+ if self._rflag: print " > %s %s" % (hdr, wstr)
+ self._warning = []
+ return stop
+
+ def _handleDeqRec(self, hdr):
+ if self._loadRec(hdr): return True
+
+ # Check OWI flag
+ if not self._checkOwi(hdr):
+ self._warning.append("WARNING: OWI mismatch - could be overwrite
boundary.")
+ return True
+ # Test hook
+ if self._csvDeqCb != None and self._csvDeqCb(self._csvStoreChk, hdr): return
True
+
+ try:
+ if hdr.xid == None:
+ self._emap.delete(hdr.deq_rid)
+ else:
+ self._tmap.add(self._fileHdr.fid, hdr)
+ except Warning, w: self._warning.append(str(w))
+ return False
+
+ def _handleEnqRec(self, hdr):
+ if self._loadRec(hdr): return True
+
+ # Check extern flag
+ if hdr.extern and hdr.data != None: raise Exception("Message data found on external record: hdr=%s" % hdr)
+ # Check OWI flag
+ if not self._checkOwi(hdr):
+ self._warning.append("WARNING: OWI mismatch - could be overwrite boundary.")
+ return True
+ # Test hook
+ if self._csvEnqCb != None and self._csvEnqCb(self._csvStoreChk, hdr): return True
+
+ if hdr.xid == None:
+ self._emap.add(self._fileHdr.fid, hdr)
+ else:
+ self._txnMsgCnt += 1
+ self._tmap.add(self._fileHdr.fid, hdr)
+ self._msgCnt += 1
+ return False
+
+ def _handleTxnRec(self, hdr):
+ if self._loadRec(hdr): return True
+
+ # Check OWI flag
+ if not self._checkOwi(hdr):
+ self._warning.append("WARNING: OWI mismatch - could be overwrite
boundary.")
+ return True
+ # Test hook
+ if self._csvTxnCb != None and self._csvTxnCb(self._csvStoreChk, hdr): return
True
+
+ if hdr.magic[-1] == "a": self._abortCnt += 1
+ else: self._commitCnt += 1
+
+ if self._tmap.contains(hdr.xid):
+ mismatchedRids = self._tmap.delete(hdr)
+ if mismatchedRids != None and len(mismatchedRids) > 0:
+ self._warning.append("WARNING: transactional dequeues not found in
enqueue map; rids=%s" % mismatchedRids)
+ else:
+ self._warning.append("WARNING: %s not found in transaction map" %
Utils.formatXid(hdr.xid))
+ if hdr.magic[-1] == "c": # commits only
+ self._txnObjList[hdr.xid] = hdr
+ return False
+
+ def _incrFileNum(self):
+ self._fileNum += 1
+ if self._fileNum >= self._ji.getNumJrnlFiles():
+ self._fileNum = 0;
+ return self._fileNum
+
+ def _loadRec(self, hdr):
+ while not hdr.complete():
+ if self._advanceJrnlFile(): return True
+ hdr.load(self._file)
+ return False
+
+
+#==============================================================================
+
+_CLASSES = {
+ "a": TxnRec,
+ "c": TxnRec,
+ "d": DeqRec,
+ "e": EnqRec,
+ "f": FileHdr
+}
+
+if __name__ == "__main__":
+ print "This is a library, and cannot be executed."
Added: store/trunk/cpp/tools/resize
===================================================================
--- store/trunk/cpp/tools/resize (rev 0)
+++ store/trunk/cpp/tools/resize 2009-11-25 20:45:30 UTC (rev 3726)
@@ -0,0 +1,306 @@
+#!/usr/bin/env python
+
+# Copyright (c) 2007, 2008 Red Hat, Inc.
+#
+# This file is part of the Qpid async store library msgstore.so.
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
+# USA
+#
+# The GNU Lesser General Public License is available in the file COPYING.
+
+import jrnl
+import glob, optparse, os, sys, time
+
+class Resize(object):
+ """
+ Creates a new store journal and copies records from old journal to new. The new
journal may be of
+ different size from the old one. The records are packed into the new journal (ie only
remaining
+ enqueued records and associated transactions - if any - are copied over without
spaces between them).
+
+ The default action is to push the old journal down into a 'bak' sub-directory
and then create a
+ new journal of the same size and pack it with the records from the old. However, it
is possible to
+ suppress the pushdown (using --no-pushdown), in which case either a new journal id
(using
+ --new-base-filename) or an old journal id (usnig --old-base-filename) must be
supplied. In the former
+ case,a new journal will be created using the new base file name alongside the old
one. In the latter
+ case, the old journal will be renamed to the supplied name, and the new one will take
the default.
+ Note that both can be specified together with the --no-pushdown option.
+
+ To resize the journal, use the optional --num-jfiles and/or --jfile-size parameters.
These
+ should be large enough to write all the records or an error will result. If the size
is large enough
+ to write all records, but too small to keep below the enqueue threshold, a warning
will be printed.
+ Note that as any valid size will be accepted, a journal can also be shrunk, as long
as it is sufficiently
+ big to accept the transferred records.
+ """
+
+ BAK_DIR = "bak"
+ JFILE_SIZE_PGS_MIN = 1
+ JFILE_SIZE_PGS_MAX = 32768
+ NUM_JFILES_MIN = 4
+ NUM_JFILES_MAX = 64
+
+ def __init__(self, args):
+ self._opts = None
+ self._jdir = None
+ self._fName = None
+ self._fNum = None
+ self._file = None
+ self._fileRecWrCnt = None
+ self._fillerWrCnt = None
+ self._lastRecFid = None
+ self._lastRecOffs = None
+ self._recWrCnt = None
+
+ self._jrnlInfo = None
+ self._jrnlAnal = None
+ self._jrnlRdr = None
+
+ self._processArgs(args)
+ self._jrnlInfo = jrnl.JrnlInfo(self._jdir, self._opts.bfn)
+ # FIXME: This is a hack... find an elegant way of getting the file size to jrec!
+ jrnl.jfsize = self._jrnlInfo.getJrnlFileSizeBytes()
+ self._jrnlAnal = jrnl.JrnlAnalyzer(self._jrnlInfo)
+ self._jrnlRdr = jrnl.JrnlReader(self._jrnlInfo, self._jrnlAnal, self._opts.qflag, self._opts.rflag, self._opts.vflag)
+
+ def run(self):
+ if not self._opts.qflag: print self._jrnlAnal
+ self._jrnlRdr.run()
+ if self._opts.vflag: print self._jrnlInfo
+ if not self._opts.qflag: print self._jrnlRdr.report(self._opts.vflag, self._opts.rflag)
+ self._handleOldFiles()
+ self._createNewFiles()
+ if not self._opts.qflag: print "Transferred %d records to new journal." % self._recWrCnt
+ self._chkFree()
+
+ def _chkFree(self):
+ if self._lastRecFid == None or self._lastRecOffs == None: return
+ wrCapacityBytes = self._lastRecFid * self._jrnlInfo.getJrnlSizeBytes() + self._lastRecOffs
+ totCapacityBytes = self._jrnlInfo.getTotalJrnlFileCapacityBytes()
+ percentFull = 100.0 * wrCapacityBytes / totCapacityBytes
+ if percentFull > 80.0:
+ raise jrnl.Warning("WARNING: Journal %s is %2.1f%% full and will likely not allow enqueuing of new records until some existing records are dequeued." % (self._jrnlInfo.getJrnlId(), percentFull))
+
+ def _createNewFiles(self):
+ # Assemble records to be transferred
+ masterRecordList = {}
+ txnRecordList = self._jrnlRdr.txnObjList()
+ if self._opts.vflag and self._jrnlRdr.emap().size() > 0:
+ print "* Assembling %d records from emap" %
self._jrnlRdr.emap().size()
+ for t in self._jrnlRdr.emap().getRecList():
+ hdr = t[1]
+ hdr.flags &= ~jrnl.Hdr.owi_mask # Turn off owi
+ masterRecordList[long(hdr.rid)] = hdr
+ if hdr.xidsize > 0 and hdr.xid in txnRecordList.keys():
+ txnHdr = txnRecordList[hdr.xid]
+ del(txnRecordList[hdr.xid])
+ txnHdr.flags &= ~jrnl.Hdr.owi_mask # Turn off owi
+ masterRecordList[long(txnHdr.rid)] = txnHdr
+ if self._opts.vflag and self._jrnlRdr.tmap().size() > 0:
+ print "* Assembling %d records from tmap" %
self._jrnlRdr.tmap().size()
+ for x in self._jrnlRdr.tmap().xids():
+ for t in self._jrnlRdr.tmap().get(x):
+ hdr = t[1]
+ hdr.flags &= ~jrnl.Hdr.owi_mask # Turn off owi
+ masterRecordList[hdr.rid] = hdr
+ ridList = masterRecordList.keys()
+ ridList.sort()
+
+ # get base filename
+ bfn = self._opts.bfn
+ if self._opts.nbfn != None:
+ bfn = self._opts.nbfn
+
+ # write jinf file
+ self._jrnlInfo.resize(self._opts.njf, self._opts.jfs)
+ self._jrnlInfo.write(self._jdir, bfn)
+
+ # write records
+ if self._opts.vflag: print "* Transferring records to new journal
files"
+ fro = jrnl.sblkSize
+ while len(ridList) > 0:
+ hdr = masterRecordList[ridList.pop(0)]
+ rec = hdr.encode()
+ pos = 0
+ while pos < len(rec):
+ if self._file == None or self._file.tell() >= self._jrnlInfo.getJrnlFileSizeBytes():
+ if self._file == None: rid = hdr.rid
+ elif len(ridList) == 0: rid = None
+ else: rid = ridList[0]
+ if not self._rotateFile(rid, fro):
+ raise Exception("ERROR: Ran out of journal space while writing records.")
+ if len(rec) - pos <= self._jrnlInfo.getJrnlFileSizeBytes() - self._file.tell():
+ self._file.write(rec[pos:])
+ self._fillFile(jrnl.Utils.sizeInBytesToBlk(self._file.tell(), jrnl.dblkSize))
+ pos = len(rec)
+ fro = jrnl.sblkSize
+ else:
+ l = self._jrnlInfo.getJrnlFileSizeBytes() - self._file.tell()
+ self._file.write(rec[pos:pos+l])
+ pos += l
+ rem = len(rec) - pos
+ if rem <= self._jrnlInfo.getJrnlSizeBytes():
+ fro = (jrnl.Utils.sizeInBytesToBlk(jrnl.sblkSize + rem, jrnl.dblkSize))
+ else:
+ fro = 0
+ self._recWrCnt += 1
+ self._fileRecWrCnt += 1
+ self._fillFile(addFillerRecs = True)
+ while self._rotateFile(): pass
+
+ def _fillFile(self, toPosn = None, addFillerRecs = False):
+ if self._file == None: return
+ if addFillerRecs:
+ nfr = int(jrnl.Utils.remBytesInBlk(self._file, jrnl.sblkSize) / jrnl.dblkSize)
+ if nfr > 0:
+ self._fillerWrCnt = nfr
+ for i in range(0, nfr):
+ self._file.write("RHMx")
+ self._fillFile(jrnl.Utils.sizeInBytesToBlk(self._file.tell(), jrnl.dblkSize))
+ self._lastRecFid = self._fNum
+ self._lastRecOffs = self._file.tell()
+ if toPosn == None: toPosn = self._jrnlInfo.getJrnlFileSizeBytes()
+ elif toPosn > self._jrnlInfo.getJrnlFileSizeBytes(): raise Exception("Filling to size > max file size")
+ diff = toPosn - self._file.tell()
+ self._file.write(str("\0" * diff))
+ #DEBUG
+ if self._file.tell() != toPosn: raise Exception("OOPS - File size problem")
+
+ def _rotateFile(self, rid = None, fro = None):
+ if self._file != None:
+ self._file.close()
+ if self._opts.vflag:
+ if self._fileRecWrCnt == 0:
+ print " (empty)"
+ elif self._fillerWrCnt == None:
+ print " (%d records)" % self._fileRecWrCnt
+ else:
+ print " (%d records + %d filler(s))" %
(self._fileRecWrCnt, self._fillerWrCnt)
+ if self._fNum == None:
+ self._fNum = 0
+ self._recWrCnt = 0
+ elif self._fNum == self._jrnlInfo.getNumJrnlFiles() - 1: return False
+ else: self._fNum += 1
+ self._fileRecWrCnt = 0
+ self._fName = os.path.join(self._jrnlInfo.getJrnlDir(), "%s.%04d.jdat" % (self._jrnlInfo.getJrnlBaseName(), self._fNum))
+ if self._opts.vflag: print "* Opening file %s" % self._fName,
+ self._file = open(self._fName, "w")
+ if rid == None or fro == None:
+ self._fillFile()
+ else:
+ t = time.time()
+ fhdr = jrnl.FileHdr(0, "RHMf", jrnl.Hdr.hdrVer, int(jrnl.Hdr.big_endian_flag), 0, rid)
+ fhdr.init(self._file, 0, self._fNum, self._fNum, fro, int(t), 1000000000*(t - int(t)))
+ self._file.write(fhdr.encode())
+ self._fillFile(jrnl.sblkSize)
+ return True
+
+ def _handleOldFiles(self):
+ targetDir = self._jdir
+ if not self._opts.npd:
+ targetDir = os.path.join(self._jdir, self.BAK_DIR)
+ if os.path.exists(targetDir):
+ if self._opts.vflag: print "* Pushdown directory %s exists, deleting
content" % targetDir
+ for f in glob.glob(os.path.join(targetDir, "*")):
+ os.unlink(f)
+ else:
+ if self._opts.vflag: print "* Creating new pushdown directory
%s" % targetDir
+ os.mkdir(targetDir)
+
+ if not self._opts.npd or self._opts.obfn != None:
+ if self._opts.obfn != None and self._opts.vflag: print "* Renaming old journal files using base name %s" % self._opts.obfn
+ # .jdat files
+ for fn in glob.glob(os.path.join(self._jdir, "%s.*.jdat" % self._opts.bfn)):
+ tbfn = os.path.basename(fn)
+ if self._opts.obfn != None:
+ i1 = tbfn.rfind(".")
+ if i1 >= 0:
+ i2 = tbfn.rfind(".", 0, i1)
+ if i2 >= 0:
+ tbfn = "%s%s" % (self._opts.obfn, tbfn[i2:])
+ os.rename(fn, os.path.join(targetDir, tbfn))
+ # .jinf file
+ self._jrnlInfo.write(targetDir, self._opts.obfn)
+ os.unlink(os.path.join(self._jdir, "%s.jinf" % self._opts.bfn))
+
+ def _printOptions(self):
+ if self._opts.vflag:
+ print "Journal dir: %s" % self._jdir
+ print "Options: Base filename: %s" % self._opts.bfn
+ print " New base filename: %s" % self._opts.nbfn
+ print " Old base filename: %s" % self._opts.obfn
+ print " Pushdown: %s" % self._opts.npd
+ print " No. journal files: %d" % self._opts.njf
+ print " Journal file size: %d 64kiB blocks" %
self._opts.jfs
+ print " Show records flag: %s" % self._opts.rflag
+ print " Verbose flag: %s" % True
+ print
+
+ def _processArgs(self, argv):
+ op = optparse.OptionParser(usage="%prog [options] DIR", version="%prog 1.0")
+ op.add_option("-b", "--base-filename",
+ action="store", dest="bfn", default="JournalData",
+ help="Base filename for old journal files")
+ op.add_option("-B", "--new-base-filename",
+ action="store", dest="nbfn",
+ help="Base filename for new journal files")
+ op.add_option("-n", "--no-pushdown",
+ action="store_true", dest="npd",
+ help="Suppress pushdown of old files into \"bak\" dir; old files will remain in existing dir")
+ op.add_option("-N", "--num-jfiles",
+ action="store", type="int", dest="njf", default=8,
+ help="Number of files for new journal (%d-%d)" % (self.NUM_JFILES_MIN, self.NUM_JFILES_MAX))
+ op.add_option("-o", "--old-base-filename",
+ action="store", dest="obfn",
+ help="Base filename for old journal files")
+ op.add_option("-q", "--quiet",
+ action="store_true", dest="qflag",
+ help="Quiet (suppress all non-error output)")
+ op.add_option("-r", "--records",
+ action="store_true", dest="rflag",
+ help="Print remaining records and transactions")
+ op.add_option("-s", "--jfile-size-pgs",
+ action="store", type="int",
dest="jfs", default=24,
+ help="Size of each new journal file in 64kiB blocks
(%d-%d)" % (self.JFILE_SIZE_PGS_MIN, self.JFILE_SIZE_PGS_MAX))
+ op.add_option("-v", "--verbose",
+ action="store_true", dest="vflag",
+ help="Verbose output")
+ (self._opts, args) = op.parse_args()
+ if len(args) == 0:
+ op.error("No journal directory argument")
+ elif len(args) > 1:
+ op.error("Too many positional arguments: %s" % args)
+ if self._opts.qflag and self._opts.rflag:
+ op.error("Quiet (-q/--quiet) and record (-r/--records) options are
mutually exclusive")
+ if self._opts.qflag and self._opts.vflag:
+ op.error("Quiet (-q/--quiet) and verbose (-v/--verbose) options are
mutually exclusive")
+ if self._opts.njf != None and (self._opts.njf < self.NUM_JFILES_MIN or
self._opts.njf > self.NUM_JFILES_MAX):
+ op.error("Number of files (%d) is out of range (%d-%d)" %
(self._opts.njf, self.NUM_JFILES_MIN, self.NUM_JFILES_MAX))
+ if self._opts.jfs != None and (self._opts.jfs < self.JFILE_SIZE_PGS_MIN or
self._opts.jfs > self.JFILE_SIZE_PGS_MAX):
+ op.error("File size (%d) is out of range (%d-%d)" %
(self._opts.jfs, self.JFILE_SIZE_PGS_MIN, self.JFILE_SIZE_PGS_MAX))
+ if self._opts.npd != None and (self._opts.nbfn == None and self._opts.obfn ==
None):
+ op.error("If (-n/--no-pushdown) is used, then at least one of
(-B/--new-base-filename) and (-o/--old-base-filename) must be used.")
+ self._jdir = args[0]
+ if not os.path.exists(self._jdir):
+ op.error("Journal path \"%s\" does not exist" %
self._jdir)
+ self._printOptions()
+
+#==============================================================================
+# main program
+#==============================================================================
+
+if __name__ == "__main__":
+ r = Resize(sys.argv);
+ try: r.run()
+ except Exception, e: sys.exit(e)
Property changes on: store/trunk/cpp/tools/resize
___________________________________________________________________
Name: svn:executable
+ *
Added: store/trunk/cpp/tools/store_chk
===================================================================
--- store/trunk/cpp/tools/store_chk (rev 0)
+++ store/trunk/cpp/tools/store_chk 2009-11-25 20:45:30 UTC (rev 3726)
@@ -0,0 +1,294 @@
+#!/usr/bin/env python
+
+# Copyright (c) 2007, 2008 Red Hat, Inc.
+#
+# This file is part of the Qpid async store library msgstore.so.
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
+# USA
+#
+# The GNU Lesser General Public License is available in the file COPYING.
+
+import jrnl
+import optparse, os, sys, xml.parsers.expat
+
+#== class Main ================================================================
+
+class Main(object):
+ """
+ This class:
+ 1. Reads a journal jinf file, and from its info:
+ 2. Analyzes the journal data files to determine which is the last to be written, then
+ 3. Reads and analyzes all the records in the journal files.
+ The only public method is run() which kicks off the analysis.
+ """
+
+ def __init__(self, args):
+ # params
+ self._opts = None
+ self._jdir = None
+
+ # recovery analysis objects
+ self._jrnlInfo = None
+ self._jrnlRdr = None
+
+ self._processArgs(args)
+ self._jrnlInfo = jrnl.JrnlInfo(self._jdir, self._opts.bfn)
+ # FIXME: This is a hack... find an elegant way of getting the file size to jrec!
+ jrnl.jfsize = self._jrnlInfo.getJrnlFileSizeBytes()
+ self._jrnlAnal = jrnl.JrnlAnalyzer(self._jrnlInfo)
+ self._jrnlRdr = jrnl.JrnlReader(self._jrnlInfo, self._jrnlAnal, self._opts.qflag, self._opts.rflag, self._opts.vflag)
+
+ def run(self):
+ if not self._opts.qflag:
+ print self._jrnlInfo
+ print self._jrnlAnal
+ self._jrnlRdr.run()
+ self._report()
+
+ def _report(self):
+ if not self._opts.qflag:
+ print
+ print " === REPORT ===="
+ print
+ print "Records: %8d non-transactional" %
(self._jrnlRdr.msgCnt() - self._jrnlRdr.txnMsgCnt())
+ print " %8d transactional" %
self._jrnlRdr.txnMsgCnt()
+ print " %8d total" % self._jrnlRdr.msgCnt()
+ print
+ print "Transactions: %8d aborts" % self._jrnlRdr.abortCnt()
+ print " %8d commits" % self._jrnlRdr.commitCnt()
+ print " %8d total" % (self._jrnlRdr.abortCnt() +
self._jrnlRdr.commitCnt())
+ print
+ if self._jrnlRdr.emap().size() > 0:
+ print "Remaining enqueued records (sorted by rid): "
+ for rid in sorted(self._jrnlRdr.emap().rids()):
+ tup = self._jrnlRdr.emap().get(rid)
+ locked = ""
+ if tup[2]:
+ locked += " (locked)"
+ print " fid=%d %s%s" % (tup[0], tup[1], locked)
+ print "WARNING: Enqueue-Dequeue mismatch, %d enqueued records
remain." % self._jrnlRdr.emap().size()
+ else:
+ print "No remaining enqueued records found (emap empty)."
+ print
+ if self._jrnlRdr.tmap().size() > 0:
+ txnRecCnt = 0
+ print "Incomplete transactions: "
+ for xid in self._jrnlRdr.tmap().xids():
+ jrnl.Utils.formatXid(xid)
+ recs = self._jrnlRdr.tmap().get(xid)
+ for tup in recs:
+ print " fid=%d %s" % (tup[0], tup[1])
+ print " Total: %d records for %s" % (len(recs),
jrnl.Utils.formatXid(xid))
+ print
+ txnRecCnt += len(recs)
+ print "WARNING: Incomplete transactions found, %d xids remain
containing a total of %d records." % (self._jrnlRdr.tmap().size(), txnRecCnt)
+ else:
+ print "No incomplete transactions found (tmap empty)."
+ print
+ print "%d enqueues, %d journal records processed." %
(self._jrnlRdr.msgCnt(), self._jrnlRdr.recCnt())
+
+
+ def _processArgs(self, argv):
+ op = optparse.OptionParser(usage="%prog [options] DIR", version="%prog 1.0")
+ op.add_option("-b", "--base-filename",
+ action="store", dest="bfn", default="JournalData",
+ help="Base filename for old journal files")
+ op.add_option("-q", "--quiet",
+ action="store_true", dest="qflag",
+ help="Quiet (suppress all non-error output)")
+ op.add_option("-r", "--records",
+ action="store_true", dest="rflag",
+ help="Print remaining records and transactions")
+ op.add_option("-v", "--verbose",
+ action="store_true", dest="vflag",
+ help="Verbose output")
+ (self._opts, args) = op.parse_args()
+ if len(args) == 0:
+ op.error("No journal directory argument")
+ elif len(args) > 1:
+ op.error("Too many positional arguments: %s" % args)
+ if self._opts.qflag and self._opts.rflag:
+ op.error("Quiet (-q/--quiet) and record (-r/--records) options are
mutually exclusive")
+ if self._opts.qflag and self._opts.vflag:
+ op.error("Quiet (-q/--quiet) and verbose (-v/--verbose) options are
mutually exclusive")
+ self._jdir = args[0]
+ if not os.path.exists(self._jdir):
+ op.error("Journal path \"%s\" does not exist" %
self._jdir)
+
+
+#== class CsvMain =============================================================
+
+class CsvMain(Main):
+ """
+ This class, in addition to analyzing a journal, can compare the journal footprint (ie
+ enqueued/dequeued/transaction record counts) to expected values from a CSV file. This can
+ be used for additional automated testing, and is currently in use in the long store tests
+ for journal encode testing.
+ """
+
+ # CSV file cols
+ TEST_NUM_COL = 0
+ NUM_MSGS_COL = 5
+ MIN_MSG_SIZE_COL = 7
+ MAX_MSG_SIZE_COL = 8
+ MIN_XID_SIZE_COL = 9
+ MAX_XID_SIZE_COL = 10
+ AUTO_DEQ_COL = 11
+ TRANSIENT_COL = 12
+ EXTERN_COL = 13
+ COMMENT_COL = 20
+
+ def __init__(self, args):
+ # csv params
+ self._numMsgs = None
+ self._msgLen = None
+ self._autoDeq = None
+ self._xidLen = None
+ self._transient = None
+ self._extern = None
+
+ self._warning = []
+
+ Main.__init__(self, args)
+ self._jrnlRdr.setCallbacks(self, CsvMain._csvPreRunChk, CsvMain._csvEnqChk, CsvMain._csvDeqChk, CsvMain._csvTxnChk, CsvMain._csvPostRunChk)
+ self._getCsvTest()
+
+ def _getCsvTest(self):
+ if self._opts.csvfn != None and self._opts.tnum != None:
+ tparams = self._readCsvFile(self._opts.csvfn, self._opts.tnum)
+ if tparams == None:
+ print "ERROR: Test %d not found in CSV file \"%s\"" %
(self._opts.tnum, self._opts.csvfn)
+ sys.exit(1)
+ self._numMsgs = tparams["num_msgs"]
+ if tparams["min_size"] == tparams["max_size"]:
+ self._msgLen = tparams["max_size"]
+ else:
+ self._msgLen = 0
+ self._autoDeq = tparams["auto_deq"]
+ if tparams["xid_min_size"] == tparams["xid_max_size"]:
+ self._xidLen = tparams["xid_max_size"]
+ else:
+ self._xidLen = 0
+ self._transient = tparams["transient"]
+ self._extern = tparams["extern"]
+
+ def _readCsvFile(self, filename, tnum):
+ try:
+ f=open(filename, "r")
+ except IOError:
+ print "ERROR: Unable to open CSV file \"%s\"" % filename
+ sys.exit(1)
+ for l in f:
+ sl = l.strip().split(",")
+ if len(sl[0]) > 0 and sl[0][0] != "\"":
+ try:
+ if (int(sl[self.TEST_NUM_COL]) == tnum):
+ return { "num_msgs":int(sl[self.NUM_MSGS_COL]),
+ "min_size":int(sl[self.MIN_MSG_SIZE_COL]),
+ "max_size":int(sl[self.MAX_MSG_SIZE_COL]),
+ "auto_deq":not (sl[self.AUTO_DEQ_COL] ==
"FALSE" or sl[self.AUTO_DEQ_COL] == "0"),
+
"xid_min_size":int(sl[self.MIN_XID_SIZE_COL]),
+
"xid_max_size":int(sl[self.MAX_XID_SIZE_COL]),
+ "transient":not (sl[self.TRANSIENT_COL] ==
"FALSE" or sl[self.TRANSIENT_COL] == "0"),
+ "extern":not (sl[self.EXTERN_COL] ==
"FALSE" or sl[self.EXTERN_COL] == "0"),
+ "comment":sl[self.COMMENT_COL] }
+ except Exception:
+ pass
+ return None
+
+ def _processArgs(self, argv):
+ op = optparse.OptionParser(usage="%prog [options] DIR", version="%prog 1.0")
+ op.add_option("-b", "--base-filename",
+ action="store", dest="bfn", default="JournalData",
+ help="Base filename for old journal files")
+ op.add_option("-c", "--csv-filename",
+ action="store", dest="csvfn",
+ help="CSV filename containing test parameters")
+ op.add_option("-q", "--quiet",
+ action="store_true", dest="qflag",
+ help="Quiet (suppress all non-error output)")
+ op.add_option("-r", "--records",
+ action="store_true", dest="rflag",
+ help="Print remaining records and transactions")
+ op.add_option("-t", "--test-num",
+ action="store", type="int",
dest="tnum",
+ help="Test number from CSV file - only valid if CSV file
named")
+ op.add_option("-v", "--verbose",
+ action="store_true", dest="vflag",
+ help="Verbose output")
+ (self._opts, args) = op.parse_args()
+ if len(args) == 0:
+ op.error("No journal directory argument")
+ elif len(args) > 1:
+ op.error("Too many positional arguments: %s" % args)
+ if self._opts.qflag and self._opts.rflag:
+ op.error("Quiet (-q/--quiet) and record (-r/--records) options are
mutually exclusive")
+ if self._opts.qflag and self._opts.vflag:
+ op.error("Quiet (-q/--quiet) and verbose (-v/--verbose) options are
mutually exclusive")
+ self._jdir = args[0]
+ if not os.path.exists(self._jdir):
+ op.error("Journal path \"%s\" does not exist" %
self._jdir)
+
+ # Callbacks for checking against CSV test parameters. Return False if ok, True to raise error.
+
+ @staticmethod
+ def _csvPreRunChk(csvStoreChk):
+ if csvStoreChk._numMsgs == None: return
+ if csvStoreChk._jrnlAnal.isEmpty() and csvStoreChk._numMsgs > 0:
+ raise Exception("[CSV %d] All journal files are empty, but test expects
%d msg(s)." % (csvStoreChk._opts.tnum, csvStoreChk._numMsgs))
+ return False
+
+ @staticmethod
+ def _csvEnqChk(csvStoreChk, hdr):
+ #if csvStoreChk._numMsgs == None: return
+ #
+ if csvStoreChk._extern != None:
+ if csvStoreChk._extern != hdr.extern:
+ raise Exception("[CSV %d] External flag mismatch: found extern=%s;
expected %s" % (csvStoreChk._opts.tnum, hdr.extern, csvStoreChk._extern))
+ if hdr.extern and hdr.data != None:
+ raise Exception("[CSV %d] Message data found on record with external
flag set" % csvStoreChk._opts.tnum)
+ if csvStoreChk._msgLen != None and csvStoreChk._msgLen > 0 and hdr.data !=
None and len(hdr.data) != csvStoreChk._msgLen:
+ raise Exception("[CSV %d] Message length mismatch: found %d; expected
%d" % (csvStoreChk._opts.tnum, len(hdr.data), csvStoreChk._msgLen))
+ if csvStoreChk._xidLen != None and csvStoreChk._xidLen > 0 and len(hdr.xid) !=
csvStoreChk._xidLen:
+ raise Exception("[CSV %d] Message XID mismatch: found %d; expected
%d" % (csvStoreChk._opts.tnum, len(hdr.xid), csvStoreChk._xidLen))
+ if csvStoreChk._transient != None and hdr.transient != csvStoreChk._transient:
+ raise Exception("[CSV %d] Transience mismatch: found trans=%s; expected
%s" % (csvStoreChk._opts.tnum, hdr.transient, csvStoreChk._transient))
+ return False
+
+ @staticmethod
+ def _csvDeqChk(csvStoreChk, hdr):
+ if csvStoreChk._autoDeq != None and not csvStoreChk._autoDeq:
+ csvStoreChk._warning.append("[CSV %d] WARNING: Dequeue record rid=%d found in non-dequeue test - ignoring." % (csvStoreChk._opts.tnum, hdr.rid))
+ return False
+
+ @staticmethod
+ def _csvTxnChk(csvStoreChk, hdr):
+ return False
+
+ @staticmethod
+ def _csvPostRunChk(csvStoreChk):
+ # Exclude this check if lastFileFlag is set - the count may be less than the number of msgs sent because of journal overwriting
+ if csvStoreChk._numMsgs != None and not csvStoreChk._jrnlRdr._lastFileFlag and csvStoreChk._numMsgs != csvStoreChk._jrnlRdr.msgCnt():
+ raise Exception("[CSV %s] Incorrect number of messages: Expected %d, found %d" % (csvStoreChk._opts.tnum, csvStoreChk._numMsgs, csvStoreChk._jrnlRdr.msgCnt()))
+ return False
+
+#==============================================================================
+# main program
+#==============================================================================
+
+if __name__ == "__main__":
+ m = CsvMain(sys.argv)
+ try: m.run()
+ except Exception, e: sys.exit(e)
Property changes on: store/trunk/cpp/tools/store_chk
___________________________________________________________________
Name: svn:executable
+ *