[rhmessaging-commits] rhmessaging commits: r3726 - in store/trunk/cpp: tests/jrnl and 2 other directories.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Wed Nov 25 15:45:30 EST 2009


Author: kpvdr
Date: 2009-11-25 15:45:30 -0500 (Wed, 25 Nov 2009)
New Revision: 3726

Added:
   store/trunk/cpp/tools/
   store/trunk/cpp/tools/jrnl.py
   store/trunk/cpp/tools/resize
   store/trunk/cpp/tools/store_chk
Modified:
   store/trunk/cpp/tests/jrnl/jtt/test_mgr.cpp
   store/trunk/cpp/tests/jrnl/run-journal-tests
Log:
Added python journal tools for inspection and resizing journal files

Modified: store/trunk/cpp/tests/jrnl/jtt/test_mgr.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/jtt/test_mgr.cpp	2009-11-25 17:57:03 UTC (rev 3725)
+++ store/trunk/cpp/tests/jrnl/jtt/test_mgr.cpp	2009-11-25 20:45:30 UTC (rev 3726)
@@ -113,14 +113,15 @@
                         throw std::runtime_error(oss.str());
                     }
                     std::ostringstream oss;
-                    oss << ja << " -q -d " << jpp->jdir() << " -b " << jpp->base_filename();
+                    oss << ja << " -b " << jpp->base_filename();
                     // TODO: When jfile_check.py can handle previously recovered journals for
                     // specific tests, then remove this exclusion.
                     if (!_args.recover_mode)
                     {
-                        oss << " -c" << _args.test_case_csv_file_name;
-                        oss << " -t" << (*tci)->test_case_num();
+                        oss << " -c " << _args.test_case_csv_file_name;
+                        oss << " -t " << (*tci)->test_case_num();
                     }
+                    oss << " -q " << jpp->jdir();
                     bool res = system(oss.str().c_str()) != 0;
                     (*tci)->set_fmt_chk_res(res, jpp->jid());
                     if (res) _err_flag = true;

Modified: store/trunk/cpp/tests/jrnl/run-journal-tests
===================================================================
--- store/trunk/cpp/tests/jrnl/run-journal-tests	2009-11-25 17:57:03 UTC (rev 3725)
+++ store/trunk/cpp/tests/jrnl/run-journal-tests	2009-11-25 20:45:30 UTC (rev 3726)
@@ -27,19 +27,19 @@
 # Run jtt using default test set
 echo
 echo "===== Mode 1: New journal instance, no recover ====="
-jtt/jtt --analyzer jtt/jfile_chk.py --jrnl-dir ${TMP_DATA_DIR} --csv jtt/jtt.csv --format-chk --num-jrnls ${num_jrnls} || fail=1
+jtt/jtt --analyzer ../../tools/store_chk ${TMP_DATA_DIR} --csv jtt/jtt.csv --format-chk --num-jrnls ${num_jrnls} || fail=1
 rm -rf ${TMP_DATA_DIR}/test_0*
 echo
 echo "===== Mode 2: Re-use journal instance, no recover ====="
-jtt/jtt --analyzer jtt/jfile_chk.py --jrnl-dir ${TMP_DATA_DIR} --csv jtt/jtt.csv --reuse-instance --format-chk --num-jrnls ${num_jrnls} || fail=1
+jtt/jtt --analyzer ../../tools/store_chk --jrnl-dir ${TMP_DATA_DIR} --csv jtt/jtt.csv --reuse-instance --format-chk --num-jrnls ${num_jrnls} || fail=1
 rm -rf ${TMP_DATA_DIR}/test_0*
 echo
 echo "===== Mode 3: New journal instance, recover previous test journal ====="
-jtt/jtt --analyzer jtt/jfile_chk.py --jrnl-dir ${TMP_DATA_DIR} --csv jtt/jtt.csv --recover-mode --format-chk --num-jrnls ${num_jrnls} || fail=1
+jtt/jtt --analyzer ../../tools/store_chk --jrnl-dir ${TMP_DATA_DIR} --csv jtt/jtt.csv --recover-mode --format-chk --num-jrnls ${num_jrnls} || fail=1
 rm -rf ${TMP_DATA_DIR}/test_0*
 echo
 echo "===== Mode 4: Re-use journal instance, recover previous test journal ====="
-jtt/jtt --analyzer jtt/jfile_chk.py --jrnl-dir ${TMP_DATA_DIR} --csv jtt/jtt.csv --reuse-instance --recover-mode --format-chk --num-jrnls ${num_jrnls} || fail=1
+jtt/jtt --analyzer ../../tools/store_chk --jrnl-dir ${TMP_DATA_DIR} --csv jtt/jtt.csv --reuse-instance --recover-mode --format-chk --num-jrnls ${num_jrnls} || fail=1
 rm -rf ${TMP_DATA_DIR}/test_0*
 echo
 

Added: store/trunk/cpp/tools/jrnl.py
===================================================================
--- store/trunk/cpp/tools/jrnl.py	                        (rev 0)
+++ store/trunk/cpp/tools/jrnl.py	2009-11-25 20:45:30 UTC (rev 3726)
@@ -0,0 +1,1139 @@
+# Copyright (c) 2007, 2008 Red Hat, Inc.
+#
+# This file is part of the Qpid async store library msgstore.so.
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301
+# USA
+#
+# The GNU Lesser General Public License is available in the file COPYING.
+
+import os.path, sys, xml.parsers.expat
+from struct import pack, unpack, calcsize
+from time import gmtime, strftime
+
+# TODO: Get rid of these! Use jinf instance instead
+dblkSize = 128
+sblkSize = 4 * dblkSize
+
+#== protected and private ======================================================================
+
+_extern_mask    = 0x20
+
+class Utils(object):
+
+    __printchars = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ!\"#$%&'()*+,-./:;<=>?@[\\]^_`{\|}~ "
+
+    @staticmethod
+    def formatData(dsize, data):
+        if data == None:
+            return ""
+        if Utils.__isPrintable(data):
+            datastr = Utils.__splitStr(data)
+        else:
+            datastr = Utils.__hexSplitStr(data)
+        if dsize != len(data):
+            raise Exception("Inconsistent data size: dsize=%d, data(%d)=\"%s\"" % (dsize, len(data), datastr))
+        return "data(%d)=\"%s\" " % (dsize, datastr)
+
+    @staticmethod
+    def formatXid(xid, xidsize = None, uuidFormat = False):
+        if xid == None and xidsize != None:
+            if xidsize > 0: raise Exception("Inconsistent XID size: xidsize=%d, xid=None" % xidsize)
+            return ""
+        if Utils.__isPrintable(xid):
+            xidstr = Utils.__splitStr(xid)
+        else:
+            xidstr = Utils.__hexSplitStr(xid)
+        if xidsize == None:
+            xidsize = len(xid)
+        elif xidsize != len(xid):
+            raise Exception("Inconsistent XID size: xidsize=%d, xid(%d)=\"%s\"" % (xidsize, len(xid), xidstr))
+        return "xid(%d)=\"%s\" " % (xidsize, xidstr)
+    
+    @staticmethod
+    def invStr(s):
+        si = ""
+        for i in range(0,len(s)):
+            si += chr(~ord(s[i]) & 0xff)
+        return si
+
+    @staticmethod
+    def load(f, klass):
+        args = Utils.__loadArgs(f, klass)
+        subclass = klass.discriminate(args)
+        result = subclass(*args) # create instance of record
+        if subclass != klass:
+            result.init(f, *Utils.__loadArgs(f, subclass))
+        result.skip(f)
+        return result;
+    
+    @staticmethod
+    def loadFileData(f, size, data):
+        if size == 0:
+            return (data, True)
+        if data == None:
+            loaded = 0
+        else:
+            loaded = len(data)
+        foverflow = f.tell() + size - loaded > jfsize
+        if foverflow:
+            rsize = jfsize - f.tell()
+        else:
+            rsize = size - loaded
+        bin = f.read(rsize)
+        if data == None:
+            data = unpack("%ds" % (rsize), bin)[0]
+        else:
+            data = data + unpack("%ds" % (rsize), bin)[0]
+        return (data, not foverflow)
+
+    @staticmethod
+    def remBytesInBlk(f, blkSize):
+        foffs = f.tell()
+        return Utils.sizeInBytesToBlk(foffs, blkSize) - foffs;
+
+    @staticmethod
+    def sizeInBlks(size, blkSize):
+        return int((size + blkSize - 1) / blkSize)
+
+    @staticmethod
+    def sizeInBytesToBlk(size, blkSize):
+        return Utils.sizeInBlks(size, blkSize) * blkSize
+
+    @staticmethod
+    def __hexSplitStr(s, splitSize = 50):
+        if len(s) <= splitSize:
+            return Utils.__hexStr(s, 0, len(s))
+#        if len(s) > splitSize + 25:
+#            return Utils.__hexStr(s, 0, 10) + " ... " + Utils.__hexStr(s, 55, 65) + " ... " + Utils.__hexStr(s, len(s)-10, len(s))
+        return Utils.__hexStr(s, 0, 10) + " ... " + Utils.__hexStr(s, len(s)-10, len(s))
+
+    @staticmethod
+    def __hexStr(s, b, e):
+        o = ""
+        for i in range(b, e):
+            if Utils.__isPrintable(s[i]):
+                o += s[i]
+            else:
+                o += "\\%02x" % ord(s[i])
+        return o
+
+    @staticmethod
+    def __isPrintable(s):
+        return s.strip(Utils.__printchars) == ""
+
+    @staticmethod
+    def __loadArgs(f, klass):
+        size = calcsize(klass.format)
+        foffs = f.tell(),
+        bin = f.read(size)
+        if len(bin) != size: raise Exception("End of file")
+        return foffs + unpack(klass.format, bin)
+
+    @staticmethod
+    def __splitStr(s, splitSize = 50):
+        if len(s) < splitSize:
+            return s
+        return s[:25] + " ... " + s[-25:]
+
+
+#== class Warning =============================================================
+
+class Warning(Exception):
+    def __init__(self, err):
+        Exception.__init__(self, err)
+
+
+#== class Sizeable ============================================================
+
+class Sizeable(object):
+
+    def size(self):
+        classes = [self.__class__]
+        size = 0
+        while classes:
+            cls = classes.pop()
+            if hasattr(cls, "format"):
+                size += calcsize(cls.format)
+            classes.extend(cls.__bases__)
+        return size
+
+
+#== class Hdr =================================================================
+
+class Hdr(Sizeable):
+ 
+    format = "=4sBBHQ"
+    hdrVer = 1
+    owi_mask = 0x01
+    big_endian_flag = sys.byteorder == "big"
+
+    def __init__(self, foffs, magic, ver, endn, flags, rid):
+        self.foffs = foffs
+        self.magic = magic
+        self.ver = ver
+        self.endn = endn
+        self.flags = flags
+        self.rid = long(rid)
+        
+    def __str__(self):
+        if self.empty():
+            return "0x%08x: <empty>" % (self.foffs)
+        if self.magic[-1] == "x":
+            return "0x%08x: [\"%s\"]" % (self.foffs, self.magic)
+        if self.magic[-1] in ["a", "c", "d", "e", "f", "x"]:
+            return "0x%08x: [\"%s\" v=%d e=%d f=0x%04x rid=0x%x]" % (self.foffs, self.magic, self.ver, self.endn, self.flags, self.rid)
+        return "0x%08x: <error, unknown magic \"%s\" (possible overwrite boundary?)>" %  (self.foffs, self.magic)
+    
+    @staticmethod
+    def discriminate(args):
+        return _CLASSES.get(args[1][-1], Hdr)
+    #discriminate = staticmethod(discriminate)
+
+    def empty(self):
+        return self.magic == "\x00"*4
+    
+    def encode(self):
+        return pack(Hdr.format, self.magic, self.ver, self.endn, self.flags, self.rid)
+
+    def owi(self):
+        return self.flags & self.owi_mask != 0
+
+    def skip(self, f):
+        f.read(Utils.remBytesInBlk(f, dblkSize))
+
+    def check(self):
+        if self.empty() or self.magic[:3] != "RHM" or self.magic[3] not in ["a", "c", "d", "e", "f", "x"]:
+            return True
+        if self.magic[-1] != "x":
+            if self.ver != self.hdrVer:
+                raise Exception("%s: Invalid header version: found %d, expected %d." % (self, self.ver, self.hdrVer))
+            if bool(self.endn) != self.big_endian_flag:
+                if self.big_endian_flag: e = "big"
+                else:                    e = "little"
+                raise Exception("Endian mismatch: this platform is %s and does not match record encoding (0x%04x)" % (e, self.endn))
+        return False
+        
+
+#== class FileHdr =============================================================
+
+class FileHdr(Hdr):
+
+    format = "=2H4x3Q"
+        
+    def __str__(self):
+        return "%s fid=%d lid=%d fro=0x%08x t=%s" % (Hdr.__str__(self), self.fid, self.lid, self.fro, self.timestamp_str())
+    
+    def encode(self):
+        return Hdr.encode(self) + pack(FileHdr.format, self.fid, self.lid, self.fro, self.time_sec, self.time_ns)
+
+    def init(self, f, foffs, fid, lid, fro, time_sec, time_ns):
+        self.fid = fid
+        self.lid = lid
+        self.fro = fro
+        self.time_sec = time_sec
+        self.time_ns = time_ns
+
+    def skip(self, f):
+        f.read(Utils.remBytesInBlk(f, sblkSize))
+
+    def timestamp(self):
+        return (self.time_sec, self.time_ns)
+
+    def timestamp_str(self):
+        ts = gmtime(self.time_sec)
+        fstr = "%%a %%b %%d %%H:%%M:%%S.%09d %%Y" % (self.time_ns)
+        return strftime(fstr, ts)
+
+
+#== class DeqRec ==============================================================
+
+class DeqRec(Hdr):
+
+    format = "=QQ"
+
+    def __str__(self):
+        return "%s %sdrid=0x%x" % (Hdr.__str__(self), Utils.formatXid(self.xid, self.xidsize), self.deq_rid)
+
+    def init(self, f, foffs, deq_rid, xidsize):
+        self.deq_rid = deq_rid
+        self.xidsize = xidsize
+        self.xid = None
+        self.deq_tail = None
+        self.xid_complete = False
+        self.tail_complete = False
+        self.tail_bin = None
+        self.tail_offs = 0
+        self.load(f)
+    
+    def encode(self):
+        d = Hdr.encode(self) + pack(DeqRec.format, self.deq_rid, self.xidsize)
+        if self.xidsize > 0:
+            fmt = "%ds" % (self.xidsize)
+            d += pack(fmt, self.xid)
+            d += self.deq_tail.encode()
+        return d
+
+    def load(self, f):
+        if self.xidsize == 0:
+            self.xid_complete = True
+            self.tail_complete = True
+        else:
+            if not self.xid_complete:
+                (self.xid, self.xid_complete) = Utils.loadFileData(f, self.xidsize, self.xid)
+            if self.xid_complete and not self.tail_complete:
+                ret = Utils.loadFileData(f, calcsize(RecTail.format), self.tail_bin)
+                self.tail_bin = ret[0]
+                if ret[1]:
+                    self.deq_tail = RecTail(self.tail_offs, *unpack(RecTail.format, self.tail_bin))
+                    if self.deq_tail.magic_inv != Utils.invStr(self.magic) or self.deq_tail.rid != self.rid:
+                        raise Exception(" > %s *INVALID TAIL RECORD*" % self)
+                    self.deq_tail.skip(f)
+                self.tail_complete = ret[1]
+        return self.complete()
+
+    def complete(self):
+        return self.xid_complete and self.tail_complete
+
+
+#== class TxnRec ==============================================================
+
+class TxnRec(Hdr):
+
+    format = "=Q"
+
+    def __str__(self):
+        return "%s %s" % (Hdr.__str__(self), Utils.formatXid(self.xid, self.xidsize))
+
+    def init(self, f, foffs, xidsize):
+        self.xidsize = xidsize
+        self.xid = None
+        self.tx_tail = None
+        self.xid_complete = False
+        self.tail_complete = False
+        self.tail_bin = None
+        self.tail_offs = 0
+        self.load(f)
+    
+    def encode(self):
+         return Hdr.encode(self) + pack(TxnRec.format, self.xidsize) + pack("%ds" % self.xidsize, self.xid) + self.tx_tail.encode()
+
+    def load(self, f):
+        if not self.xid_complete:
+            ret = Utils.loadFileData(f, self.xidsize, self.xid)
+            self.xid = ret[0]
+            self.xid_complete = ret[1]
+        if self.xid_complete and not self.tail_complete:
+            ret = Utils.loadFileData(f, calcsize(RecTail.format), self.tail_bin)
+            self.tail_bin = ret[0]
+            if ret[1]:
+                self.tx_tail = RecTail(self.tail_offs, *unpack(RecTail.format, self.tail_bin))
+                if self.tx_tail.magic_inv != Utils.invStr(self.magic) or self.tx_tail.rid != self.rid:
+                    raise Exception(" > %s *INVALID TAIL RECORD*" % self)
+                self.tx_tail.skip(f)
+            self.tail_complete = ret[1]
+        return self.complete()
+
+    def complete(self):
+        return self.xid_complete and self.tail_complete
+
+
+#== class EnqRec ==============================================================
+
+class EnqRec(Hdr):
+
+    format = "=QQ"
+    transient_mask = 0x10
+    extern_mask = 0x20
+
+    def __str__(self):
+        return "%s %s%s %s %s" % (Hdr.__str__(self), Utils.formatXid(self.xid, self.xidsize), Utils.formatData(self.dsize, self.data), self.enq_tail, self.print_flags())
+    
+    def encode(self):
+        d = Hdr.encode(self) + pack(EnqRec.format, self.xidsize, self.dsize)
+        if self.xidsize > 0:
+            d += pack("%ds" % self.xidsize, self.xid)
+        if self.dsize > 0:
+            d += pack("%ds" % self.dsize, self.data)
+        if self.xidsize > 0 or self.dsize > 0:
+            d += self.enq_tail.encode()
+        return d
+
+    def init(self, f, foffs, xidsize, dsize):
+        self.xidsize = xidsize
+        self.dsize = dsize
+        self.transient = self.flags & self.transient_mask > 0
+        self.extern = self.flags & self.extern_mask > 0
+        self.xid = None
+        self.data = None
+        self.enq_tail = None
+        self.xid_complete = False
+        self.data_complete = False
+        self.tail_complete = False
+        self.tail_bin = None
+        self.tail_offs = 0
+        self.load(f)
+
+    def load(self, f):
+        if not self.xid_complete:
+            ret = Utils.loadFileData(f, self.xidsize, self.xid)
+            self.xid = ret[0]
+            self.xid_complete = ret[1]
+        if self.xid_complete and not self.data_complete:
+            if self.extern:
+                self.data_complete = True
+            else:
+                ret = Utils.loadFileData(f, self.dsize, self.data)
+                self.data = ret[0]
+                self.data_complete = ret[1]
+        if self.data_complete and not self.tail_complete:
+            ret = Utils.loadFileData(f, calcsize(RecTail.format), self.tail_bin)
+            self.tail_bin = ret[0]
+            if ret[1]:
+                self.enq_tail = RecTail(self.tail_offs, *unpack(RecTail.format, self.tail_bin))
+                if self.enq_tail.magic_inv != Utils.invStr(self.magic) or self.enq_tail.rid != self.rid:
+                    raise Exception(" > %s *INVALID TAIL RECORD*" % self)
+                self.enq_tail.skip(f)
+            self.tail_complete = ret[1]
+        return self.complete()
+
+    def complete(self):
+        return self.xid_complete and self.data_complete and self.tail_complete
+
+    def print_flags(self):
+        s = ""
+        if self.transient:
+            s = "*TRANSIENT"
+        if self.extern:
+            if len(s) > 0:
+                s += ",EXTERNAL"
+            else:
+                s = "*EXTERNAL"
+        if len(s) > 0:
+            s += "*"
+        return s
+
+
+#== class RecTail =============================================================
+
+class RecTail(Sizeable):
+
+    format = "=4sQ"
+
+    def __init__(self, foffs, magic_inv, rid):
+        self.foffs = foffs
+        self.magic_inv = magic_inv
+        self.rid = long(rid)
+
+    def __str__(self):
+        magic = Utils.invStr(self.magic_inv)
+        return "[\"%s\" rid=0x%x]" % (magic, self.rid)
+    
+    def encode(self):
+        return pack(RecTail.format, self.magic_inv, self.rid)
+
+    def skip(self, f):
+        f.read(Utils.remBytesInBlk(f, dblkSize))
+
+
+#== class EnqMap ==============================================================
+
+class EnqMap(object):
+    
+    def __init__(self):
+        self.__map = {}
+    
+    def __str__(self):
+        return self.report(True, True)
+        
+    def add(self, fid, hdr):
+        if hdr.rid in self.__map.keys(): raise Exception("ERROR: Duplicate rid to EnqMap: rid=0x%x" % hdr.rid)
+        self.__map[hdr.rid] = (fid, hdr, False)
+    
+    def contains(self, rid):
+        return rid in self.__map.keys()
+    
+    def delete(self, rid):
+        if rid in self.__map.keys():
+            if self.getLock(rid):
+                raise Exception("ERROR: Deleting locked record from EnqMap: rid=0x%s" % rid)
+            del self.__map[rid]
+        else:
+            raise Warning("ERROR: Deleting non-existent rid from EnqMap: rid=0x%x" % rid)
+    
+    def get(self, rid):
+        if self.contains(rid): return self.__map[rid]
+        return None
+    
+    def getFid(self, rid):
+        if self.contains(rid): return self.__map[rid][0]
+        return None
+    
+    def getHdr(self, rid):
+        if self.contains(rid): return self.__map[rid][1]
+        return None
+    
+    def getLock(self, rid):
+        if self.contains(rid): return self.__map[rid][2]
+        return None
+    
+    def getRecList(self):
+        return self.__map.values()
+    
+    def lock(self, rid):
+        if rid in self.__map.keys():
+            tup = self.__map[rid]
+            self.__map[rid] = (tup[0], tup[1], True)
+        else:
+            raise Warning("ERROR: Locking non-existent rid in EnqMap: rid=0x%x" % rid)
+        
+    def report(self, showStats, showRecords):
+        if len(self.__map) == 0: return "No enqueued records found."
+        s = "%d enqueued records found" % len(self.__map)
+        if showRecords:
+            s += ":"
+            ridList = self.__map.keys()
+            ridList.sort()
+            for rid in ridList:
+#            for f,r in self.__map.iteritems():
+                r = self.__map[rid]
+                if r[2]:
+                    lockStr = " [LOCKED]"
+                else:
+                    lockStr = ""
+                s += "\n  lfid=%d %s %s" % (r[0], r[1], lockStr)
+        else:
+            s += "."
+        return s
+    
+    def rids(self):
+        return self.__map.keys()
+    
+    def size(self):
+        return len(self.__map)
+    
+    def unlock(self, rid):
+        if rid in self.__map.keys():
+            tup = self.__map[rid]
+            if tup[2]:
+                self.__map[rid] = (tup[0], tup[1], False)
+            else:
+                raise Exception("ERROR: Unlocking rid which is not locked in EnqMap: rid=0x%x" % rid)
+        else:
+            raise Exception("ERROR: Unlocking non-existent rid in EnqMap: rid=0x%x" % rid)
+
+
+#== class TxnMap ==============================================================
+
+class TxnMap(object):
+    
+    def __init__(self, emap):
+        self.__emap = emap
+        self.__map = {}
+    
+    def __str__(self):
+        return self.report(True, True)
+    
+    def add(self, fid, hdr):
+        if isinstance(hdr, DeqRec):
+            self.__emap.lock(hdr.deq_rid)
+        if hdr.xid in self.__map.keys():
+            self.__map[hdr.xid].append((fid, hdr)) # append to existing list
+        else:
+            self.__map[hdr.xid] = [(fid, hdr)] # create new list
+    
+    def contains(self, xid):
+        return xid in self.__map.keys()
+    
+    def delete(self, hdr):
+        if hdr.magic[-1] == "c": return self.__commit(hdr.xid)
+        if hdr.magic[-1] == "a": self.__abort(hdr.xid)
+        else: raise Exception("ERROR: cannot delete from TxnMap using hdr type %s" % hdr.magic)
+        
+    def get(self, xid):
+        if self.contains(xid): return self.__map[xid]
+        
+    def report(self, showStats, showRecords):
+        if len(self.__map) == 0: return "No outstanding transactions found."
+        s = "%d outstanding transactions found" % len(self.__map)
+        if showRecords:
+            s += ":"
+            for x,t in self.__map.iteritems():
+                s += "\n  xid=%s:" % Utils.formatXid(x)
+                for i in t:
+                    s += "\n   %s" % str(i[1])
+        else:
+            s += "."
+        return s
+    
+    def size(self):
+        return len(self.__map)
+    
+    def xids(self):
+        return self.__map.keys()
+    
+    def __abort(self, xid):
+        for fid, hdr in self.__map[xid]:
+            if isinstance(hdr, DeqRec):
+                self.__emap.unlock(hdr.rid)
+        del self.__map[xid]
+    
+    def __commit(self, xid):
+        mismatchList = []
+        for fid, hdr in self.__map[xid]:
+            if isinstance(hdr, EnqRec): 
+                self.__emap.add(fid, hdr) # Transfer enq to emap
+            else:
+                if self.__emap.contains(hdr.deq_rid):
+                    self.__emap.unlock(hdr.deq_rid)
+                    self.__emap.delete(hdr.deq_rid)
+                else:
+                    mismatchList.append("0x%x" % hdr.deq_rid)
+        del self.__map[xid]
+        return mismatchList
+                
+
+
+#== class JrnlInfo ============================================================
+
+class JrnlInfo(object):
+    """
+    This object reads and writes journal information files (<basename>.jinf). Methods are provided
+    to read a file, query its properties and reset just those properties necessary for normalizing
+    and resizing a journal.
+    
+    Normalizing: resetting the directory and/or base filename to different values. This is necessary
+    if a set of journal files is copied from one location to another before being restored, as the
+    value of the path in the file no longer matches the actual path.
+    
+    Resizing: If the journal geometry parameters (size and number of journal files) changes, then the
+    .jinf file must reflect these changes, as this file is the source of information for journal
+    recovery.
+    
+    NOTE: Size vs File size: There are methods which return the journal size and file size of the
+    journal files.
+    
+    +-------------+--------------------/ /----------+
+    | File header |           File data             |
+    +-------------+--------------------/ /----------+
+    |             |                                 |
+    |             |<------------- Size ------------>|
+    |<------------------ FileSize ----------------->|
+    
+    Size: The size of the data content of the journal, ie that part which stores the data records.
+    
+    File size: The actual disk size of the journal including data and the file header which precedes the
+    data.
+    
+    The file header is fixed to 1 sblk, so  file size = jrnl size + sblk size.
+    """
+    
+    def __init__(self, jdir, bfn = "JournalData"):
+        self.__jdir = jdir
+        self.__bfn = bfn
+        self.__jinfDict = {}
+        self.__read_jinf()
+    
+    def __str__(self):
+        s = "Journal info file %s:\n" % os.path.join(self.__jdir, "%s.jinf" % self.__bfn)
+        for k,v in self.__jinfDict.iteritems():
+            s += "  %s = %s\n" % (k, v)
+        return s
+    
+    def normalize(self, jdir = None, baseFilename = None):
+        if jdir == None:
+            self.__jinfDict["directory"] = self.__jdir
+        else:
+            self.__jdir = jdir
+            self.__jinfDict["directory"] = jdir
+        if baseFilename != None:
+            self.__bfn = baseFilename
+            self.__jinfDict["base_filename"] = baseFilename
+    
+    def resize(self, njFiles = None, jfSize = None):
+        if njFiles != None:
+            self.__jinfDict["number_jrnl_files"] = njFiles
+        if jfSize != None:
+            self.__jinfDict["jrnl_file_size_sblks"] = jfSize * dblkSize
+
+    def write(self, jdir = None, baseFilename = None):
+        self.normalize(jdir, baseFilename)
+        if not os.path.exists(self.getJrnlDir()): os.makedirs(self.getJrnlDir())
+        wdir = os.path.join(self.getJrnlDir(), "%s.jinf" % self.getJrnlBaseName())
+        f = open(os.path.join(self.getJrnlDir(), "%s.jinf" % self.getJrnlBaseName()), "w")
+        f.write("<?xml version=\"1.0\" ?>\n")
+        f.write("<jrnl>\n")
+        f.write("  <journal_version value=\"%d\" />\n" % self.getJrnlVersion())
+        f.write("  <journal_id>\n")
+        f.write("    <id_string value=\"%s\" />\n" % self.getJrnlId())
+        f.write("    <directory value=\"%s\" />\n" % self.getJrnlDir())
+        f.write("    <base_filename value=\"%s\" />\n" % self.getJrnlBaseName())
+        f.write("  </journal_id>\n")
+        f.write("  <creation_time>\n")
+        f.write("    <seconds value=\"%d\" />\n" % self.getCreationTime()[0])
+        f.write("    <nanoseconds value=\"%d\" />\n" % self.getCreationTime()[1])
+        f.write("    <string value=\"%s\" />\n" % self.getCreationTimeStr())
+        f.write("  </creation_time>\n")
+        f.write("  <journal_file_geometry>\n")
+        f.write("    <number_jrnl_files value=\"%d\" />\n" % self.getNumJrnlFiles())
+        f.write("    <auto_expand value=\"%s\" />\n" % str.lower(str(self.getAutoExpand())))
+        f.write("    <jrnl_file_size_sblks value=\"%d\" />\n" % self.getJrnlSizeSblks())
+        f.write("    <JRNL_SBLK_SIZE value=\"%d\" />\n" % self.getJrnlSblkSize())
+        f.write("    <JRNL_DBLK_SIZE value=\"%d\" />\n" % self.getJrnlDblkSize())
+        f.write("  </journal_file_geometry>\n")
+        f.write("  <cache_geometry>\n")
+        f.write("    <wcache_pgsize_sblks value=\"%d\" />\n" % self.getWriteBufferPageSizeSblks())
+        f.write("    <wcache_num_pages value=\"%d\" />\n" % self.getNumWriteBufferPages())
+        f.write("    <JRNL_RMGR_PAGE_SIZE value=\"%d\" />\n" % self.getReadBufferPageSizeSblks())
+        f.write("    <JRNL_RMGR_PAGES value=\"%d\" />\n" % self.getNumReadBufferPages())
+        f.write("  </cache_geometry>\n")
+        f.write("</jrnl>\n")
+        f.close()
+    
+    # Journal ID
+    
+    def getJrnlVersion(self):
+        return self.__jinfDict["journal_version"]
+    
+    def getJrnlId(self):
+        return self.__jinfDict["id_string"]
+    
+    def getCurrentJnrlDir(self):
+        return self.__jdir
+    
+    def getJrnlDir(self):
+        return self.__jinfDict["directory"]
+    
+    def getJrnlBaseName(self):
+        return self.__jinfDict["base_filename"]
+    
+    # Journal creation time
+    
+    def getCreationTime(self):
+        return (self.__jinfDict["seconds"], self.__jinfDict["nanoseconds"])
+    
+    def getCreationTimeStr(self):
+        return self.__jinfDict["string"]
+    
+    # Files and geometry
+    
+    def getNumJrnlFiles(self):
+        return self.__jinfDict["number_jrnl_files"]
+    
+    def getAutoExpand(self):
+        return self.__jinfDict["auto_expand"]
+    
+    def getJrnlSblkSize(self):
+        return self.__jinfDict["JRNL_SBLK_SIZE"]
+    
+    def getJrnlDblkSize(self):
+        return self.__jinfDict["JRNL_DBLK_SIZE"]
+    
+    def getJrnlSizeSblks(self):
+        return self.__jinfDict["jrnl_file_size_sblks"]
+    
+    def getJrnlSizeDblks(self):
+        return self.getJrnlSizeSblks() * self.getJrnlSblkSize()
+    
+    def getJrnlSizeBytes(self):
+        return self.getJrnlSizeDblks() * self.getJrnlDblkSize()
+    
+    def getJrnlFileSizeSblks(self):
+        return self.getJrnlSizeSblks() + 1
+    
+    def getJrnlFileSizeDblks(self):
+        return self.getJrnlFileSizeSblks() * self.getJrnlSblkSize()
+    
+    def getJrnlFileSizeBytes(self):
+        return self.getJrnlFileSizeDblks() * self.getJrnlDblkSize()
+    
+    def getTotalJrnlFileCapacitySblks(self):
+        return self.getNumJrnlFiles() * self.getJrnlSizeBytes()
+    
+    def getTotalJrnlFileCapacityDblks(self):
+        return self.getNumJrnlFiles() * self.getJrnlSizeDblks()
+    
+    def getTotalJrnlFileCapacityBytes(self):
+        return self.getNumJrnlFiles() * self.getJrnlSizeBytes()
+    
+    # Read and write buffers
+    
+    def getWriteBufferPageSizeSblks(self):
+        return self.__jinfDict["wcache_pgsize_sblks"]
+    
+    def getWriteBufferPageSizeDblks(self):
+        return self.getWriteBufferPageSizeSblks() * self.getJrnlSblkSize()
+    
+    def getWriteBufferPageSizeBytes(self):
+        return self.getWriteBufferPageSizeDblks() * self.getJrnlDblkSize()
+    
+    def getNumWriteBufferPages(self):
+        return self.__jinfDict["wcache_num_pages"]
+    
+    def getReadBufferPageSizeSblks(self):
+        return self.__jinfDict["JRNL_RMGR_PAGE_SIZE"]
+    
+    def getReadBufferPageSizeDblks(self):
+        return self.getReadBufferPageSizeSblks * self.getJrnlSblkSize()
+    
+    def getReadBufferPageSizeBytes(self):
+        return self.getReadBufferPageSizeDblks * self.getJrnlDblkSize()
+    
+    def getNumReadBufferPages(self):
+        return self.__jinfDict["JRNL_RMGR_PAGES"]
+    
+    def __read_jinf(self):
+        f = open(os.path.join(self.__jdir, "%s.jinf" % self.__bfn), "r")
+        p = xml.parsers.expat.ParserCreate()
+        p.StartElementHandler = self.__handleXmlStartElement
+        p.CharacterDataHandler = self.__handleXmlCharData
+        p.EndElementHandler = self.__handleXmlEndElement
+        p.ParseFile(f)
+        f.close()
+
+    def __handleXmlStartElement(self, name, attrs):
+        # bool values
+        if name == "auto_expand":
+            self.__jinfDict[name] = attrs["value"] == "true"
+        # long values
+        elif name == "seconds" or \
+             name == "nanoseconds":
+            self.__jinfDict[name] = long(attrs["value"])
+        # int values
+        elif name == "journal_version" or \
+             name == "number_jrnl_files" or \
+             name == "jrnl_file_size_sblks" or \
+             name == "JRNL_SBLK_SIZE" or \
+             name == "JRNL_DBLK_SIZE" or \
+             name == "wcache_pgsize_sblks" or \
+             name == "wcache_num_pages" or \
+             name == "JRNL_RMGR_PAGE_SIZE" or \
+             name == "JRNL_RMGR_PAGES":
+            self.__jinfDict[name] = int(attrs["value"])
+        # strings
+        elif "value" in attrs:
+            self.__jinfDict[name] = attrs["value"]
+
+    def __handleXmlCharData(self, data): pass
+
+    def __handleXmlEndElement(self, name): pass
+
+#== class JrnlAnalyzer ========================================================
+
+class JrnlAnalyzer(object):
+    """
+    This class analyzes a set of journal files and determines which is the last to be written
+    (the newest file),  and hence which should be the first to be read for recovery (the oldest
+    file).
+    
+    The analysis is performed on construction; the contents of the JrnlInfo object passed provide
+    the recovery details.
+    """
+
+    def __init__(self, jinf):
+        self.__oldest = None
+        self.__jinf = jinf
+        self.__flist = self._analyze()
+                
+    def __str__(self):
+        s = "Journal files analyzed in directory %s (* = earliest full):\n" % self.__jinf.getCurrentJnrlDir()
+        if self.isEmpty():
+            s += "  <All journal files are empty>\n"
+        else:
+            for tup in self.__flist:
+                o = " "
+                if tup[0] == self.__oldest[0]: o = "*"
+                s += "  %s %s: owi=%-5s rid=0x%x, fro=0x%x ts=%s\n" % (o, os.path.basename(tup[1]), tup[2], tup[3], tup[4], tup[5])
+            for i in range(self.__flist[-1][0] + 1, self.__jinf.getNumJrnlFiles()):
+                s += "    %s.%04d.jdat: <empty>\n" % (self.__jinf.getJrnlBaseName(), i) 
+        return s
+        
+    # Analysis
+    
+    def getOldestFile(self):
+        return self.__oldest
+
+    def getOldestFileIndex(self):
+        if self.isEmpty(): return None
+        return self.__oldest[0]
+
+    def isEmpty(self):
+        return len(self.__flist) == 0
+    
+    def _analyze(self):
+        fname = ""
+        fnum = -1
+        rid = -1
+        fro = -1
+        tss = ""
+        owi_found = False
+        flist = []
+        for i in range(0, self.__jinf.getNumJrnlFiles()):
+            jfn = os.path.join(self.__jinf.getCurrentJnrlDir(), "%s.%04x.jdat" % (self.__jinf.getJrnlBaseName(), i))
+            f = open(jfn)
+            fhdr = Utils.load(f, Hdr)
+            if fhdr.empty(): break
+            this_tup = (i, jfn, fhdr.owi(), fhdr.rid, fhdr.fro, fhdr.timestamp_str())
+            flist.append(this_tup)
+            if i == 0:
+                init_owi = fhdr.owi()
+                self.__oldest = this_tup
+            elif fhdr.owi() != init_owi and not owi_found:
+                self.__oldest = this_tup
+                owi_found = True
+        return flist
+        
+
+#== class JrnlReader ====================================================
+
+class JrnlReader(object):
+    """
+    This class contains an Enqueue Map (emap), a transaction map (tmap) and a transaction
+    object list (txnObjList) which are populated by reading the journals from the oldest
+    to the newest and analyzing each record. The JrnlInfo and JrnlAnalyzer
+    objects supplied on construction provide the information used for the recovery.
+    
+    The analysis is performed on construction.
+    """
+    
+    def __init__(self, ji, jra, qflag = False, rflag = False, vflag = False):
+        self._ji = ji
+        self._jra = jra
+        self._qflag = qflag
+        self._rflag = rflag
+        self._vflag = vflag
+        
+        # test callback functions for CSV tests
+        self._csvStoreChk = None
+        self._csvStartCb = None
+        self._csvEnqCb = None
+        self._csvDeqCb = None
+        self._csvTxnCb = None
+        self._csvEndCb = None
+        
+        self._emap = EnqMap()
+        self._tmap = TxnMap(self._emap)
+        self._txnObjList = {}
+        
+        self._file = None
+        self._fileHdr = None
+        self._fileNum = None
+        self._firstRecFlag = None
+        self._fro = None
+        self._lastFileFlag = None
+        self._startFileNum = None
+        self._warning = []
+        
+        self._abortCnt = 0
+        self._commitCnt = 0
+        self._msgCnt = 0
+        self._recCnt = 0
+        self._txnMsgCnt = 0
+    
+    def __str__(self):
+        return self.report(True, self._rflag)
+
+    def abortCnt(self): return self._abortCnt
+
+    def commitCnt(self): return self._commitCnt
+    
+    def emap(self): return self._emap
+    
+    def msgCnt(self): return self._msgCnt
+    
+    def recCnt(self): return self._recCnt
+    
+    def report(self, showStats = True, showRecords = False):
+        s = self._emap.report(showStats, showRecords) + "\n" + self._tmap.report(showStats, showRecords)
+        #TODO - print size analysis here
+        return s
+    
+    def run(self):
+        if self._csvStartCb != None and self._csvStartCb(self._csvStoreChk): return
+        if self._jra.isEmpty(): return
+        stop = self._advanceJrnlFile(*self._jra.getOldestFile())
+        while not stop and not self._getNextRecord(): pass
+        if self._csvEndCb != None and self._csvEndCb(self._csvStoreChk): return
+        if not self._qflag: print
+    
+    def setCallbacks(self, csvStoreChk, csvStartCb = None, csvEnqCb = None, csvDeqCb = None, csvTxnCb = None, csvEndCb = None):
+        self._csvStoreChk = csvStoreChk
+        self._csvStartCb = csvStartCb
+        self._csvEnqCb = csvEnqCb
+        self._csvDeqCb = csvDeqCb
+        self._csvTxnCb = csvTxnCb
+        self._csvEndCb = csvEndCb
+    
+    def tmap(self): return self._tmap
+    
+    def txnMsgCnt(self): return self._txnMsgCnt
+    
+    def txnObjList(self): return self._txnObjList
+    
+    def _advanceJrnlFile(self, *oldestFileInfo):
+        froSeekFlag = False
+        if len(oldestFileInfo) > 0:
+            self._startFileNum = self._fileNum = oldestFileInfo[0]
+            self._fro = oldestFileInfo[4]
+            froSeekFlag = True # jump to fro to start reading
+            if not self._qflag and not self._rflag:
+                if self._vflag: print "Recovering journals..."
+                else: print "Recovering journals",
+        if self._file != None and self._fileFull():
+            self._file.close()
+            self._fileNum = self._incrFileNum()
+            if self._fileNum == self._startFileNum:
+                return True
+            if self._startFileNum == 0:
+                self._lastFileFlag = self._fileNum == self._ji.getNumJrnlFiles() - 1
+            else:
+                self._lastFileFlag = self._fileNum == self._startFileNum - 1
+        if self._fileNum < 0 or self._fileNum >= self._ji.getNumJrnlFiles():
+            raise Exception("Bad file number %d" % self._fileNum)
+        jfn = os.path.join(self._ji.getCurrentJnrlDir(), "%s.%04x.jdat" % (self._ji.getJrnlBaseName(), self._fileNum))
+        self._file = open(jfn)
+        self._fileHdr = Utils.load(self._file, Hdr)
+        if froSeekFlag and self._file.tell() != self._fro:
+            self._file.seek(self._fro)
+        self._firstRecFlag = True
+        if not self._qflag:
+            if self._rflag: print jfn, ": ", self._fileHdr
+            elif self._vflag: print "* Reading %s" % jfn
+            else:
+                print ".",
+                sys.stdout.flush()
+        return False
+
+    def _checkOwi(self, hdr):
+        return self._fileHdrOwi == hdr.owi()
+    
+    def _fileFull(self):
+        return self._file.tell() >= self._ji.getJrnlFileSizeBytes()
+    
+    def _getNextRecord(self, *oldestFileInfo):
+        if self._fileFull():
+            if self._advanceJrnlFile(): return True
+        try: hdr = Utils.load(self._file, Hdr)
+        except: return True
+        if hdr.empty(): return True
+        if hdr.check(): return True
+        self._recCnt += 1
+        self._fileHdrOwi = self._fileHdr.owi()
+        if self._firstRecFlag:
+            if self._fileHdr.fro != hdr.foffs:
+                raise Exception("File header first record offset mismatch: fro=0x%x; rec_offs=0x%x" % (self._fileHdr.fro, hdr.foffs))
+            else:
+                if self._rflag: print " * fro ok: 0x%x" % self._fileHdr.fro
+                self._firstRecFlag = False
+        stop = False
+        if   isinstance(hdr, EnqRec):
+            stop = self._handleEnqRec(hdr)
+        elif isinstance(hdr, DeqRec):
+            stop = self._handleDeqRec(hdr)
+        elif isinstance(hdr, TxnRec):
+            stop = self._handleTxnRec(hdr)
+        wstr = ""
+        for w in self._warning:
+            wstr += " (%s)" % w
+        if self._rflag: print " > %s  %s" % (hdr, wstr)
+        self._warning = []
+        return stop
+     
+    def _handleDeqRec(self, hdr):
+        if self._loadRec(hdr): return True
+        
+        # Check OWI flag
+        if not self._checkOwi(hdr):
+            self._warning.append("WARNING: OWI mismatch - could be overwrite boundary.")
+            return True
+        # Test hook
+        if self._csvDeqCb != None and self._csvDeqCb(self._csvStoreChk, hdr): return True
+        
+        try:
+            if hdr.xid == None:
+                self._emap.delete(hdr.deq_rid)
+            else:
+                self._tmap.add(self._fileHdr.fid, hdr)
+        except Warning, w: self._warning.append(str(w))
+        return False
+    
+    def _handleEnqRec(self, hdr):
+        if self._loadRec(hdr): return True
+        
+        # Check extern flag
+        if hdr.extern and hdr.data != None: raise Exception("Message data found on external record: hdr=%s" % hdr)
+        # Check OWI flag
+        if not self._checkOwi(hdr):
+            self._warning.append("WARNING: OWI mismatch - could be overwrite boundary.")
+            return True
+        # Test hook
+        if self._csvEnqCb != None and self._csvEnqCb(self._csvStoreChk, hdr): return True
+        
+        if hdr.xid == None:
+            self._emap.add(self._fileHdr.fid, hdr)
+        else:
+            self._txnMsgCnt += 1
+            self._tmap.add(self._fileHdr.fid, hdr)
+        self._msgCnt += 1
+        return False
+    
+    def _handleTxnRec(self, hdr):
+        if self._loadRec(hdr): return True
+        
+        # Check OWI flag
+        if not self._checkOwi(hdr):
+            self._warning.append("WARNING: OWI mismatch - could be overwrite boundary.")
+            return True
+        # Test hook
+        if self._csvTxnCb != None and self._csvTxnCb(self._csvStoreChk, hdr): return True
+               
+        if hdr.magic[-1] == "a": self._abortCnt += 1
+        else: self._commitCnt += 1
+        
+        if self._tmap.contains(hdr.xid):
+            mismatchedRids = self._tmap.delete(hdr)
+            if mismatchedRids != None and len(mismatchedRids) > 0:
+                self._warning.append("WARNING: transactional dequeues not found in enqueue map; rids=%s" % mismatchedRids)
+        else:
+            self._warning.append("WARNING: %s not found in transaction map" % Utils.formatXid(hdr.xid))
+        if hdr.magic[-1] == "c": # commits only
+            self._txnObjList[hdr.xid] = hdr
+        return False
+
+    def _incrFileNum(self):
+        self._fileNum += 1
+        if self._fileNum >= self._ji.getNumJrnlFiles():
+            self._fileNum = 0;
+        return self._fileNum
+    
+    def _loadRec(self, hdr):
+        while not hdr.complete():
+            if self._advanceJrnlFile(): return True
+            hdr.load(self._file)
+        return False
+        
+
+#==============================================================================
+
+_CLASSES = {
+    "a": TxnRec,
+    "c": TxnRec,
+    "d": DeqRec,
+    "e": EnqRec,
+    "f": FileHdr
+}
+
+if __name__ == "__main__":
+    print "This is a library, and cannot be executed."

Added: store/trunk/cpp/tools/resize
===================================================================
--- store/trunk/cpp/tools/resize	                        (rev 0)
+++ store/trunk/cpp/tools/resize	2009-11-25 20:45:30 UTC (rev 3726)
@@ -0,0 +1,306 @@
+#!/usr/bin/env python
+
+# Copyright (c) 2007, 2008 Red Hat, Inc.
+#
+# This file is part of the Qpid async store library msgstore.so.
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301
+# USA
+#
+# The GNU Lesser General Public License is available in the file COPYING.
+
+import jrnl
+import glob, optparse, os, sys, time
+
+class Resize(object):
+    """
+    Creates a new store journal and copies records from old journal to new. The new journal may be of
+    different size from the old one. The records are packed into the new journal (ie only remaining
+    enqueued records and associated transactions - if any - are copied over without spaces between them).
+    
+    The default action is to push the old journal down into a 'bak' sub-directory and then create a
+    new journal of the same size and pack it with the records from the old. However, it is possible to
+    suppress the pushdown (using --no-pushdown), in which case either a new journal id (using
+    --new-base-filename) or an old journal id (usnig --old-base-filename) must be supplied. In the former
+    case,a new journal will be created using the new base file name alongside the old one. In the latter
+    case, the old journal will be renamed to the supplied name, and the new one will take the default.
+    Note that both can be specified together with the --no-pushdown option.
+    
+    To resize the journal, use the optional --num-jfiles and/or --jfile-size parameters. These
+    should be large enough to write all the records or an error will result. If the size is large enough
+    to write all records, but too small to keep below the enqueue threshold, a warning will be printed.
+    Note that as any valid size will be accepted, a journal can also be shrunk, as long as it is sufficiently
+    big to accept the transferred records.
+    """
+    
+    BAK_DIR = "bak"
+    JFILE_SIZE_PGS_MIN = 1
+    JFILE_SIZE_PGS_MAX = 32768
+    NUM_JFILES_MIN = 4
+    NUM_JFILES_MAX = 64
+    
+    def __init__(self, args):
+        self._opts = None
+        self._jdir = None
+        self._fName = None
+        self._fNum = None
+        self._file = None
+        self._fileRecWrCnt = None
+        self._fillerWrCnt = None
+        self._lastRecFid = None
+        self._lastRecOffs = None
+        self._recWrCnt = None
+        
+        self._jrnlInfo = None
+        self._jrnlAnal = None
+        self._jrnlRdr = None
+        
+        self._processArgs(args)
+        self._jrnlInfo = jrnl.JrnlInfo(self._jdir, self._opts.bfn)
+        # FIXME: This is a hack... find an elegant way of getting the file size to jrec!
+        jrnl.jfsize = self._jrnlInfo.getJrnlFileSizeBytes()
+        self._jrnlAnal = jrnl.JrnlAnalyzer(self._jrnlInfo)
+        self._jrnlRdr = jrnl.JrnlReader(self._jrnlInfo, self._jrnlAnal, self._opts.qflag, self._opts.rflag, self._opts.vflag)
+    
+    def run(self):
+        if not self._opts.qflag: print self._jrnlAnal
+        self._jrnlRdr.run()
+        if self._opts.vflag: print self._jrnlInfo
+        if not self._opts.qflag: print self._jrnlRdr.report(self._opts.vflag, self._opts.rflag)
+        self._handleOldFiles()
+        self._createNewFiles()
+        if not self._opts.qflag: print "Transferred %d records to new journal." % self._recWrCnt
+        self._chkFree()
+    
+    def _chkFree(self):
+        if self._lastRecFid == None or self._lastRecOffs == None: return
+        wrCapacityBytes = self._lastRecFid * self._jrnlInfo.getJrnlSizeBytes() + self._lastRecOffs
+        totCapacityBytes = self._jrnlInfo.getTotalJrnlFileCapacityBytes()
+        percentFull = 100.0 * wrCapacityBytes / totCapacityBytes
+        if percentFull > 80.0:
+            raise jrnl.Warning("WARNING: Journal %s is %2.1f%% full and will likely not allow enqueuing of new records until some existing records are dequeued." % (self._jrnlInfo.getJrnlId(), percentFull))
+    
+    def _createNewFiles(self):
+        # Assemble records to be transfered
+        masterRecordList = {}
+        txnRecordList = self._jrnlRdr.txnObjList()
+        if self._opts.vflag and self._jrnlRdr.emap().size() > 0:
+            print "* Assembling %d records from emap" % self._jrnlRdr.emap().size()
+        for t in self._jrnlRdr.emap().getRecList():
+            hdr = t[1]
+            hdr.flags &= ~jrnl.Hdr.owi_mask # Turn off owi
+            masterRecordList[long(hdr.rid)] = hdr
+            if hdr.xidsize > 0 and hdr.xid in txnRecordList.keys():
+                txnHdr = txnRecordList[hdr.xid]
+                del(txnRecordList[hdr.xid])
+                txnHdr.flags &= ~jrnl.Hdr.owi_mask # Turn off owi
+                masterRecordList[long(txnHdr.rid)] = txnHdr
+        if self._opts.vflag and self._jrnlRdr.tmap().size() > 0:
+            print "* Assembling %d records from tmap" % self._jrnlRdr.tmap().size()
+        for x in self._jrnlRdr.tmap().xids():
+            for t in self._jrnlRdr.tmap().get(x):
+                hdr = t[1]
+                hdr.flags &= ~jrnl.Hdr.owi_mask # Turn off owi
+                masterRecordList[hdr.rid] = hdr
+        ridList = masterRecordList.keys()
+        ridList.sort()
+            
+        # get base filename
+        bfn = self._opts.bfn
+        if self._opts.nbfn != None:
+            bfn = self._opts.nbfn
+        
+        # write jinf file
+        self._jrnlInfo.resize(self._opts.njf, self._opts.jfs)
+        self._jrnlInfo.write(self._jdir, bfn)
+        
+        # write records
+        if self._opts.vflag: print "* Transferring records to new journal files"
+        fro = jrnl.sblkSize
+        while len(ridList) > 0:
+            hdr = masterRecordList[ridList.pop(0)]
+            rec = hdr.encode()
+            pos = 0
+            while pos < len(rec):
+                if self._file == None or self._file.tell() >= self._jrnlInfo.getJrnlFileSizeBytes():
+                    if self._file == None: rid = hdr.rid
+                    elif len(ridList) == 0: rid = None
+                    else: rid = ridList[0]
+                    if not self._rotateFile(rid, fro):
+                        raise Exception("ERROR: Ran out of journal space while writing records.")
+                if len(rec) - pos <= self._jrnlInfo.getJrnlFileSizeBytes() - self._file.tell():
+                    self._file.write(rec[pos:])
+                    self._fillFile(jrnl.Utils.sizeInBytesToBlk(self._file.tell(), jrnl.dblkSize))
+                    pos = len(rec)
+                    fro = jrnl.sblkSize
+                else:
+                    l = self._jrnlInfo.getJrnlFileSizeBytes() - self._file.tell()
+                    self._file.write(rec[pos:pos+l])
+                    pos += l
+                    rem = len(rec) - pos
+                    if rem <= self._jrnlInfo.getJrnlSizeBytes():
+                        fro = (jrnl.Utils.sizeInBytesToBlk(jrnl.sblkSize + rem, jrnl.dblkSize))
+                    else:
+                        fro = 0
+            self._recWrCnt += 1
+            self._fileRecWrCnt += 1
+        self._fillFile(addFillerRecs = True)
+        while self._rotateFile(): pass
+        
+    def _fillFile(self, toPosn = None, addFillerRecs = False):
+        if self._file == None: return
+        if addFillerRecs:
+            nfr = int(jrnl.Utils.remBytesInBlk(self._file, jrnl.sblkSize) / jrnl.dblkSize)
+            if nfr > 0:
+                self._fillerWrCnt = nfr
+                for i in range(0, nfr):
+                    self._file.write("RHMx")
+                    self._fillFile(jrnl.Utils.sizeInBytesToBlk(self._file.tell(), jrnl.dblkSize))
+            self._lastRecFid = self._fNum
+            self._lastRecOffs = self._file.tell()
+        if toPosn == None: toPosn = self._jrnlInfo.getJrnlFileSizeBytes()
+        elif toPosn > self._jrnlInfo.getJrnlFileSizeBytes(): raise Exception("Filling to size > max file size")
+        diff = toPosn - self._file.tell()
+        self._file.write(str("\0" * diff))
+        #DEBUG
+        if self._file.tell() != toPosn: raise Exception("OOPS - File size problem")
+        
+    def _rotateFile(self, rid = None, fro = None):
+        if self._file != None:
+            self._file.close()
+            if self._opts.vflag:
+                if self._fileRecWrCnt == 0:
+                    print "  (empty)"
+                elif self._fillerWrCnt == None:
+                    print "  (%d records)" % self._fileRecWrCnt
+                else:
+                    print "  (%d records + %d filler(s))" % (self._fileRecWrCnt, self._fillerWrCnt)
+        if self._fNum == None:
+            self._fNum = 0
+            self._recWrCnt = 0
+        elif self._fNum == self._jrnlInfo.getNumJrnlFiles() - 1: return False
+        else: self._fNum += 1
+        self._fileRecWrCnt = 0
+        self._fName = os.path.join(self._jrnlInfo.getJrnlDir(), "%s.%04d.jdat" % (self._jrnlInfo.getJrnlBaseName(), self._fNum))
+        if self._opts.vflag: print "* Opening file %s" % self._fName,
+        self._file = open(self._fName, "w")
+        if rid == None or fro == None:
+            self._fillFile()
+        else:
+            t = time.time()
+            fhdr = jrnl.FileHdr(0, "RHMf", jrnl.Hdr.hdrVer, int(jrnl.Hdr.big_endian_flag), 0, rid)
+            fhdr.init(self._file, 0, self._fNum, self._fNum, fro, int(t), 1000000000*(t - int(t)))
+            self._file.write(fhdr.encode())
+            self._fillFile(jrnl.sblkSize)
+        return True
+    
+    def _handleOldFiles(self):
+        targetDir = self._jdir
+        if not self._opts.npd:
+            targetDir = os.path.join(self._jdir, self.BAK_DIR)
+            if os.path.exists(targetDir):
+                if self._opts.vflag: print "* Pushdown directory %s exists, deleting content" % targetDir
+                for f in glob.glob(os.path.join(targetDir, "*")):
+                    os.unlink(f)
+            else:
+                if self._opts.vflag: print "* Creating new pushdown directory %s" % targetDir
+                os.mkdir(targetDir)
+        
+        if not self._opts.npd or self._opts.obfn != None:
+            if self._opts.obfn != None and self._opts.vflag: print "* Renaming old journal files using base name %s" % self._opts.obfn
+            # .jdat files
+            for fn in glob.glob(os.path.join(self._jdir, "%s.*.jdat" % self._opts.bfn)):
+                tbfn = os.path.basename(fn)
+                if self._opts.obfn != None:
+                    i1 = tbfn.rfind(".")
+                    if i1 >= 0:
+                        i2 = tbfn.rfind(".", 0, i1)
+                        if i2 >= 0:
+                            tbfn = "%s%s" % (self._opts.obfn, tbfn[i2:])
+                os.rename(fn, os.path.join(targetDir, tbfn))
+            # .jinf file
+            self._jrnlInfo.write(targetDir, self._opts.obfn)
+            os.unlink(os.path.join(self._jdir, "%s.jinf" % self._opts.bfn))
+
+    def _printOptions(self):
+        if self._opts.vflag:
+            print "Journal dir: %s" % self._jdir
+            print "Options: Base filename: %s" % self._opts.bfn
+            print "         New base filename: %s" % self._opts.nbfn
+            print "         Old base filename: %s" % self._opts.obfn
+            print "         Pushdown: %s" % self._opts.npd
+            print "         No. journal files: %d" % self._opts.njf
+            print "         Journal file size: %d 64kiB blocks" % self._opts.jfs
+            print "         Show records flag: %s" % self._opts.rflag
+            print "         Verbose flag: %s" % True
+            print
+    
+    def _processArgs(self, argv):
+        op = optparse.OptionParser(usage="%prog [options] DIR", version="%prog 1.0")
+        op.add_option("-b", "--base-filename",
+                      action="store", dest="bfn", default="JournalData",
+                      help="Base filename for old journal files")
+        op.add_option("-B", "--new-base-filename",
+                      action="store", dest="nbfn",
+                      help="Base filename for new journal files")
+        op.add_option("-n", "--no-pushdown",
+                      action="store_true", dest="npd",
+                      help="Suppress pushdown of old files into \"bak\" dir; old files will remain in existing dir")
+        op.add_option("-N", "--num-jfiles",
+                      action="store", type="int", dest="njf", default=8,
+                      help="Number of files for new journal (%d-%d)" % (self.NUM_JFILES_MIN, self.NUM_JFILES_MAX))
+        op.add_option("-o", "--old-base-filename",
+                      action="store", dest="obfn",
+                      help="Base filename for old journal files")
+        op.add_option("-q", "--quiet",
+                      action="store_true", dest="qflag",
+                      help="Quiet (suppress all non-error output)")
+        op.add_option("-r", "--records",
+                      action="store_true", dest="rflag",
+                      help="Print remaining records and transactions")
+        op.add_option("-s", "--jfile-size-pgs",
+                      action="store", type="int", dest="jfs", default=24,
+                      help="Size of each new journal file in 64kiB blocks (%d-%d)" % (self.JFILE_SIZE_PGS_MIN, self.JFILE_SIZE_PGS_MAX))
+        op.add_option("-v", "--verbose",
+                      action="store_true", dest="vflag",
+                      help="Verbose output")
+        (self._opts, args) = op.parse_args()
+        if len(args) == 0:
+            op.error("No journal directory argument")
+        elif len(args) > 1:
+            op.error("Too many positional arguments: %s" % args)
+        if self._opts.qflag and self._opts.rflag:
+            op.error("Quiet (-q/--quiet) and record (-r/--records) options are mutually exclusive")
+        if self._opts.qflag and self._opts.vflag:
+            op.error("Quiet (-q/--quiet) and verbose (-v/--verbose) options are mutually exclusive")
+        if self._opts.njf != None and (self._opts.njf < self.NUM_JFILES_MIN or self._opts.njf > self.NUM_JFILES_MAX):
+            op.error("Number of files (%d) is out of range (%d-%d)" % (self._opts.njf, self.NUM_JFILES_MIN, self.NUM_JFILES_MAX))
+        if self._opts.jfs != None and (self._opts.jfs < self.JFILE_SIZE_PGS_MIN or self._opts.jfs > self.JFILE_SIZE_PGS_MAX):
+            op.error("File size (%d) is out of range (%d-%d)" % (self._opts.jfs, self.JFILE_SIZE_PGS_MIN, self.JFILE_SIZE_PGS_MAX))
+        if self._opts.npd != None and (self._opts.nbfn == None and self._opts.obfn == None):
+            op.error("If (-n/--no-pushdown) is used, then at least one of (-B/--new-base-filename) and (-o/--old-base-filename) must be used.")
+        self._jdir = args[0]
+        if not os.path.exists(self._jdir):
+            op.error("Journal path \"%s\" does not exist" % self._jdir)
+        self._printOptions()
+
+#==============================================================================
+# main program
+#==============================================================================
+
+if __name__ == "__main__":
+    r = Resize(sys.argv);
+    try: r.run()
+    except Exception, e: sys.exit(e)


Property changes on: store/trunk/cpp/tools/resize
___________________________________________________________________
Name: svn:executable
   + *

Added: store/trunk/cpp/tools/store_chk
===================================================================
--- store/trunk/cpp/tools/store_chk	                        (rev 0)
+++ store/trunk/cpp/tools/store_chk	2009-11-25 20:45:30 UTC (rev 3726)
@@ -0,0 +1,294 @@
+#!/usr/bin/env python
+
+# Copyright (c) 2007, 2008 Red Hat, Inc.
+#
+# This file is part of the Qpid async store library msgstore.so.
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301
+# USA
+#
+# The GNU Lesser General Public License is available in the file COPYING.
+
+import jrnl
+import optparse, os, sys, xml.parsers.expat
+
+#== class Main ================================================================
+
+class Main(object):
+    """
+    This class:
+     1. Reads a journal jinf file, and from its info:
+     2. Analyzes the journal data files to determine which is the last to be written, then
+     3. Reads and analyzes all the records in the journal files.
+     The only public method is run() which kicks off the analysis.
+    """
+    
+    def __init__(self, args):
+        # params
+        self._opts = None
+        self._jdir = None
+        
+        # recovery analysis objects
+        self._jrnlInfo = None
+        self._jrnlRdr = None
+        
+        self._processArgs(args)
+        self._jrnlInfo = jrnl.JrnlInfo(self._jdir, self._opts.bfn)
+        # FIXME: This is a hack... find an elegant way of getting the file size to jrec!
+        jrnl.jfsize = self._jrnlInfo.getJrnlFileSizeBytes()
+        self._jrnlAnal = jrnl.JrnlAnalyzer(self._jrnlInfo)
+        self._jrnlRdr = jrnl.JrnlReader(self._jrnlInfo, self._jrnlAnal, self._opts.qflag, self._opts.rflag, self._opts.vflag)
+
+    def run(self):
+        if not self._opts.qflag:
+            print self._jrnlInfo
+            print self._jrnlAnal
+        self._jrnlRdr.run()
+        self._report()
+        
+    def _report(self):
+        if not self._opts.qflag:
+            print
+            print " === REPORT ===="
+            print
+            print "Records:      %8d non-transactional" % (self._jrnlRdr.msgCnt() - self._jrnlRdr.txnMsgCnt())
+            print "              %8d transactional" % self._jrnlRdr.txnMsgCnt()
+            print "              %8d total" % self._jrnlRdr.msgCnt()
+            print
+            print "Transactions: %8d aborts" % self._jrnlRdr.abortCnt()
+            print "              %8d commits" % self._jrnlRdr.commitCnt()
+            print "              %8d total" % (self._jrnlRdr.abortCnt() + self._jrnlRdr.commitCnt())
+            print
+            if self._jrnlRdr.emap().size() > 0:
+                print "Remaining enqueued records (sorted by rid): "
+                for rid in sorted(self._jrnlRdr.emap().rids()):
+                    tup = self._jrnlRdr.emap().get(rid)
+                    locked = ""
+                    if tup[2]:
+                        locked += " (locked)"
+                    print "  fid=%d %s%s" % (tup[0], tup[1], locked)
+                print "WARNING: Enqueue-Dequeue mismatch, %d enqueued records remain." % self._jrnlRdr.emap().size()
+            else:
+                print "No remaining enqueued records found (emap empty)."
+            print
+            if self._jrnlRdr.tmap().size() > 0:
+                txnRecCnt = 0
+                print "Incomplete transactions: "
+                for xid in self._jrnlRdr.tmap().xids():
+                    jrnl.Utils.formatXid(xid)
+                    recs = self._jrnlRdr.tmap().get(xid)
+                    for tup in recs:
+                        print "  fid=%d %s" % (tup[0], tup[1])
+                    print " Total: %d records for %s" % (len(recs), jrnl.Utils.formatXid(xid))
+                    print
+                    txnRecCnt += len(recs)
+                print "WARNING: Incomplete transactions found, %d xids remain containing a total of %d records." % (self._jrnlRdr.tmap().size(), txnRecCnt)
+            else:
+                print "No incomplete transactions found (tmap empty)."
+            print
+            print "%d enqueues, %d journal records processed." % (self._jrnlRdr.msgCnt(), self._jrnlRdr.recCnt())
+    
+        
+    def _processArgs(self, argv):
+        op = optparse.OptionParser(usage="%prog [options] DIR", version="%prog 1.0")
+        op.add_option("-b", "--base-filename",
+                      action="store", dest="bfn", default="JournalData",
+                      help="Base filename for old journal files")
+        op.add_option("-q", "--quiet",
+                      action="store_true", dest="qflag",
+                      help="Quiet (suppress all non-error output)")
+        op.add_option("-r", "--records",
+                      action="store_true", dest="rflag",
+                      help="Print remaining records and transactions")
+        op.add_option("-v", "--verbose",
+                      action="store_true", dest="vflag",
+                      help="Verbose output")
+        (self._opts, args) = op.parse_args()
+        if len(args) == 0:
+            op.error("No journal directory argument")
+        elif len(args) > 1:
+            op.error("Too many positional arguments: %s" % args)
+        if self._opts.qflag and self._opts.rflag:
+            op.error("Quiet (-q/--quiet) and record (-r/--records) options are mutually exclusive")
+        if self._opts.qflag and self._opts.vflag:
+            op.error("Quiet (-q/--quiet) and verbose (-v/--verbose) options are mutually exclusive")
+        self._jdir = args[0]
+        if not os.path.exists(self._jdir):
+            op.error("Journal path \"%s\" does not exist" % self._jdir)
+
+
+#== class CsvMain =============================================================
+
+class CsvMain(Main):
+    """
+    This class, in addition to analyzing a journal, can compare the journal footprint (ie enqueued/dequeued/transaction
+    record counts) to expected values from a CSV file. This can be used for additional automated testing, and is
+    currently in use in the long store tests for journal encode testing.
+    """
+    
+    # CSV file cols
+    TEST_NUM_COL = 0
+    NUM_MSGS_COL = 5
+    MIN_MSG_SIZE_COL = 7
+    MAX_MSG_SIZE_COL = 8
+    MIN_XID_SIZE_COL = 9
+    MAX_XID_SIZE_COL = 10
+    AUTO_DEQ_COL = 11
+    TRANSIENT_COL = 12
+    EXTERN_COL = 13
+    COMMENT_COL = 20
+    
+    def __init__(self, args):
+        # csv params
+        self._numMsgs = None
+        self._msgLen = None
+        self._autoDeq = None
+        self._xidLen = None
+        self._transient = None
+        self._extern = None
+        
+        self._warning = []
+        
+        Main.__init__(self, args)
+        self._jrnlRdr.setCallbacks(self, CsvMain._csvPreRunChk, CsvMain._csvEnqChk, CsvMain._csvDeqChk, CsvMain._csvTxnChk, CsvMain._csvPostRunChk)
+        self._getCsvTest()
+
+    def _getCsvTest(self):
+        if self._opts.csvfn != None and self._opts.tnum != None:
+            tparams = self._readCsvFile(self._opts.csvfn, self._opts.tnum)
+            if tparams == None:
+                print "ERROR: Test %d not found in CSV file \"%s\"" % (self._opts.tnum, self._opts.csvfn)
+                sys.exit(1)
+            self._numMsgs = tparams["num_msgs"]
+            if tparams["min_size"] == tparams["max_size"]:
+                self._msgLen = tparams["max_size"]
+            else:
+                self._msgLen = 0
+            self._autoDeq = tparams["auto_deq"]
+            if tparams["xid_min_size"] == tparams["xid_max_size"]:
+                self._xidLen = tparams["xid_max_size"]
+            else:
+                self._xidLen = 0
+            self._transient = tparams["transient"]
+            self._extern = tparams["extern"]
+
+    def _readCsvFile(self, filename, tnum):
+        try:
+            f=open(filename, "r")
+        except IOError:
+            print "ERROR: Unable to open CSV file \"%s\"" % filename
+            sys.exit(1)
+        for l in f:
+            sl = l.strip().split(",")
+            if len(sl[0]) > 0 and sl[0][0] != "\"":
+                try:
+                    if (int(sl[self.TEST_NUM_COL]) == tnum):
+                        return { "num_msgs":int(sl[self.NUM_MSGS_COL]),
+                                 "min_size":int(sl[self.MIN_MSG_SIZE_COL]),
+                                 "max_size":int(sl[self.MAX_MSG_SIZE_COL]),
+                                 "auto_deq":not (sl[self.AUTO_DEQ_COL] == "FALSE" or sl[self.AUTO_DEQ_COL] == "0"),
+                                 "xid_min_size":int(sl[self.MIN_XID_SIZE_COL]),
+                                 "xid_max_size":int(sl[self.MAX_XID_SIZE_COL]),
+                                 "transient":not (sl[self.TRANSIENT_COL] == "FALSE" or sl[self.TRANSIENT_COL] == "0"),
+                                 "extern":not (sl[self.EXTERN_COL] == "FALSE" or sl[self.EXTERN_COL] == "0"),
+                                 "comment":sl[self.COMMENT_COL] }
+                except Exception:
+                    pass
+        return None
+        
+    def _processArgs(self, argv):
+        op = optparse.OptionParser(usage="%prog [options] DIR", version="%prog 1.0")
+        op.add_option("-b", "--base-filename",
+                      action="store", dest="bfn", default="JournalData",
+                      help="Base filename for old journal files")
+        op.add_option("-c", "--csv-filename",
+                      action="store", dest="csvfn",
+                      help="CSV filename containing test parameters")
+        op.add_option("-q", "--quiet",
+                      action="store_true", dest="qflag",
+                      help="Quiet (suppress all non-error output)")
+        op.add_option("-r", "--records",
+                      action="store_true", dest="rflag",
+                      help="Print remaining records and transactions")
+        op.add_option("-t", "--test-num",
+                      action="store", type="int", dest="tnum",
+                      help="Test number from CSV file - only valid if CSV file named")
+        op.add_option("-v", "--verbose",
+                      action="store_true", dest="vflag",
+                      help="Verbose output")
+        (self._opts, args) = op.parse_args()
+        if len(args) == 0:
+            op.error("No journal directory argument")
+        elif len(args) > 1:
+            op.error("Too many positional arguments: %s" % args)
+        if self._opts.qflag and self._opts.rflag:
+            op.error("Quiet (-q/--quiet) and record (-r/--records) options are mutually exclusive")
+        if self._opts.qflag and self._opts.vflag:
+            op.error("Quiet (-q/--quiet) and verbose (-v/--verbose) options are mutually exclusive")
+        self._jdir = args[0]
+        if not os.path.exists(self._jdir):
+            op.error("Journal path \"%s\" does not exist" % self._jdir)
+        
+    # Callbacks for checking against CSV test parameters. Return False if ok, True to raise error.
+    
+    @staticmethod
+    def _csvPreRunChk(csvStoreChk):
+        if csvStoreChk._numMsgs == None: return
+        if csvStoreChk._jrnlAnal.isEmpty() and csvStoreChk._numMsgs > 0:
+            raise Exception("[CSV %d] All journal files are empty, but test expects %d msg(s)." % (csvStoreChk._opts.tnum, csvStoreChk._numMsgs))
+        return False
+    
+    @staticmethod
+    def _csvEnqChk(csvStoreChk, hdr):
+        #if csvStoreChk._numMsgs == None: return
+        # 
+        if csvStoreChk._extern != None:
+            if csvStoreChk._extern != hdr.extern:
+                raise Exception("[CSV %d] External flag mismatch: found extern=%s; expected %s" % (csvStoreChk._opts.tnum, hdr.extern, csvStoreChk._extern))
+            if hdr.extern and hdr.data != None:
+                raise Exception("[CSV %d] Message data found on record with external flag set" % csvStoreChk._opts.tnum)
+        if csvStoreChk._msgLen != None and csvStoreChk._msgLen > 0 and hdr.data != None and len(hdr.data) != csvStoreChk._msgLen:
+            raise Exception("[CSV %d] Message length mismatch: found %d; expected %d" % (csvStoreChk._opts.tnum, len(hdr.data), csvStoreChk._msgLen))
+        if csvStoreChk._xidLen != None and csvStoreChk._xidLen > 0 and len(hdr.xid) != csvStoreChk._xidLen:
+            raise Exception("[CSV %d] Message XID mismatch: found %d; expected %d" % (csvStoreChk._opts.tnum, len(hdr.xid), csvStoreChk._xidLen))
+        if csvStoreChk._transient != None and hdr.transient != csvStoreChk._transient:
+            raise Exception("[CSV %d] Transience mismatch: found trans=%s; expected %s" % (csvStoreChk._opts.tnum, hdr.transient, csvStoreChk._transient))
+        return False
+    
+    @staticmethod
+    def _csvDeqChk(csvStoreChk, hdr):
+        if csvStoreChk._autoDeq != None and not csvStoreChk._autoDeq:
+            self._warning.append("[CSV %d] WARNING: Dequeue record rid=%d found in non-dequeue test - ignoring." % (csvStoreChk._opts.tnum, hdr.rid))
+        return False
+    
+    @staticmethod
+    def _csvTxnChk(csvStoreChk, hdr):
+        return False
+
+    @staticmethod
+    def _csvPostRunChk(csvStoreChk):
+        # Exclude this check if lastFileFlag is set - the count may be less than the number of msgs sent because of journal overwriting
+        if csvStoreChk._numMsgs != None and not csvStoreChk._jrnlRdr._lastFileFlag and csvStoreChk._numMsgs != csvStoreChk._jrnlRdr.msgCnt():
+            raise Exception("[CSV %s] Incorrect number of messages: Expected %d, found %d" % (csvStoreChk._opts.tnum, csvStoreChk._numMsgs, csvStoreChk._jrnlRdr.msgCnt()))
+        return False
+
+#==============================================================================
+# main program
+#==============================================================================
+
+if __name__ == "__main__":
+    m = CsvMain(sys.argv)
+    try: m.run()
+    except Exception, e: sys.exit(e)


Property changes on: store/trunk/cpp/tools/store_chk
___________________________________________________________________
Name: svn:executable
   + *



More information about the rhmessaging-commits mailing list