rhmessaging commits: r4463 - store/branches/java/0.5.x-dev/etc.
by rhmessaging-commits@lists.jboss.org
Author: rgemmell
Date: 2011-05-27 06:36:23 -0400 (Fri, 27 May 2011)
New Revision: 4463
Modified:
store/branches/java/0.5.x-dev/etc/config-systests-bdb-settings.xml
Log:
update bdb systest settings file with SSL configuration
Applied patch from Andrew Macbean
Modified: store/branches/java/0.5.x-dev/etc/config-systests-bdb-settings.xml
===================================================================
--- store/branches/java/0.5.x-dev/etc/config-systests-bdb-settings.xml 2011-05-26 13:56:21 UTC (rev 4462)
+++ store/branches/java/0.5.x-dev/etc/config-systests-bdb-settings.xml 2011-05-27 10:36:23 UTC (rev 4463)
@@ -20,6 +20,14 @@
-
-->
<broker>
+ <connector>
+ <ssl>
+ <port>15671</port>
+ <keystorePath>test-profiles/test_resources/ssl/keystore.jks</keystorePath>
+ <keystorePassword>password</keystorePassword>
+ </ssl>
+ </connector>
+
<virtualhosts>${QPID_HOME}/etc/virtualhosts-systests-bdb.xml</virtualhosts>
</broker>
13 years, 7 months
rhmessaging commits: r4462 - mgmt/trunk/sesame/cpp.
by rhmessaging-commits@lists.jboss.org
Author: nunofsantos
Date: 2011-05-26 09:56:21 -0400 (Thu, 26 May 2011)
New Revision: 4462
Modified:
mgmt/trunk/sesame/cpp/configure.ac
Log:
add detection of db4 4.8
Modified: mgmt/trunk/sesame/cpp/configure.ac
===================================================================
--- mgmt/trunk/sesame/cpp/configure.ac 2011-05-26 10:23:44 UTC (rev 4461)
+++ mgmt/trunk/sesame/cpp/configure.ac 2011-05-26 13:56:21 UTC (rev 4462)
@@ -136,10 +136,10 @@
AC_MSG_ERROR([db4-devel package missing. Please ensure both db4 and db4-devel are installed. (hint: "yum install db4-devel" should do it...)])
gl_saved_libs=$LIBS
-AC_SEARCH_LIBS([__db_open], [db_cxx-4.7 db_cxx-4.6 db_cxx-4.5 db_cxx-4.4 db_cxx-4.3 db_cxx-4.2],
+AC_SEARCH_LIBS([__db_open], [db_cxx-4.8 db_cxx-4.7 db_cxx-4.6 db_cxx-4.5 db_cxx-4.4 db_cxx-4.3 db_cxx-4.2],
[test "$ac_cv_search___db_open" = "none required" ||
LIB_BERKELEY_DB=$ac_cv_search___db_open],
- AC_MSG_ERROR([Couldn't find required library in range db_cxx-4.2 through db_cxx-4.6]))
+ AC_MSG_ERROR([Couldn't find required library in range db_cxx-4.2 through db_cxx-4.8]))
AC_SUBST([LIB_BERKELEY_DB])
LIBS=$gl_saved_libs
13 years, 7 months
rhmessaging commits: r4461 - store/branches/java/0.5.x-dev/src/tools/java/org/apache/qpid/server/store/berkeleydb.
by rhmessaging-commits@lists.jboss.org
Author: rgemmell
Date: 2011-05-26 06:23:44 -0400 (Thu, 26 May 2011)
New Revision: 4461
Modified:
store/branches/java/0.5.x-dev/src/tools/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgrade.java
Log:
Fix incorrect log message for delivery record and exchange record counters
Applied patch from Oleksandr Rudyy
Modified: store/branches/java/0.5.x-dev/src/tools/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgrade.java
===================================================================
--- store/branches/java/0.5.x-dev/src/tools/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgrade.java 2011-05-19 16:11:13 UTC (rev 4460)
+++ store/branches/java/0.5.x-dev/src/tools/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgrade.java 2011-05-26 10:23:44 UTC (rev 4461)
@@ -470,7 +470,7 @@
//Perform the visit
_oldMessageStore.visitQueues(queueVisitor);
- logCount(queueVisitor.getVisitedCount(), "queue");
+ logCount(queueVisitor.getVisitedCount(), "Queue");
_logger.info("Delivery Records");
//Migrate _deliveryDb;
@@ -491,7 +491,7 @@
};
_oldMessageStore.visitDelivery(deliveryDBVisitor);
- logCount(queueVisitor.getVisitedCount(), "Delivery Record");
+ logCount(deliveryDBVisitor.getVisitedCount(), "Delivery Record");
_logger.info("Exchanges");
//Migrate _exchangeDb;
@@ -516,7 +516,7 @@
};
_oldMessageStore.visitExchanges(echangeDBVisitor);
- logCount(queueVisitor.getVisitedCount(), "Exchange");
+ logCount(echangeDBVisitor.getVisitedCount(), "Exchange");
_logger.info("QueueBindings");
@@ -543,7 +543,7 @@
};
_oldMessageStore.visitBindings(queueBindings);
- logCount(queueBindings.getVisitedCount(), "queue binding");
+ logCount(queueBindings.getVisitedCount(), "Queue Binding");
}
/**
13 years, 7 months
rhmessaging commits: r4460 - in store/branches/java/0.5.x-dev: src/tools/java/org/apache/qpid/server/util and 1 other directory.
by rhmessaging-commits@lists.jboss.org
Author: rgemmell
Date: 2011-05-19 12:11:13 -0400 (Thu, 19 May 2011)
New Revision: 4460
Modified:
store/branches/java/0.5.x-dev/build.xml
store/branches/java/0.5.x-dev/src/tools/java/org/apache/qpid/server/util/NullApplicationRegistry.java
Log:
update to account for changes to main broker codebase
Applied patch from Oleksandr Rudyy <orudyy(a)gmail.com>
Modified: store/branches/java/0.5.x-dev/build.xml
===================================================================
--- store/branches/java/0.5.x-dev/build.xml 2011-05-13 13:49:52 UTC (rev 4459)
+++ store/branches/java/0.5.x-dev/build.xml 2011-05-19 16:11:13 UTC (rev 4460)
@@ -133,8 +133,8 @@
<sysproperty key="BDB_WORK" value="${qpid.work.dir}/bdbstore"/>
<sysproperty key="BDB_HOME" value="${project.root}"/>
<sysproperty key="test.excludes" value="false"/>
+ <sysproperty key="broker.config" value="${project.root}/etc/config.xml" />
-
<formatter type="plain"/>
<formatter type="xml"/>
Modified: store/branches/java/0.5.x-dev/src/tools/java/org/apache/qpid/server/util/NullApplicationRegistry.java
===================================================================
--- store/branches/java/0.5.x-dev/src/tools/java/org/apache/qpid/server/util/NullApplicationRegistry.java 2011-05-13 13:49:52 UTC (rev 4459)
+++ store/branches/java/0.5.x-dev/src/tools/java/org/apache/qpid/server/util/NullApplicationRegistry.java 2011-05-19 16:11:13 UTC (rev 4460)
@@ -68,7 +68,7 @@
_accessManager = new ACLManager(_configuration.getSecurityConfiguration(), _pluginManager, AllowAll.FACTORY);
- _authenticationManager = new PrincipalDatabaseAuthenticationManager(null, null);
+ _authenticationManager = new PrincipalDatabaseAuthenticationManager();
_managedObjectRegistry = new NoopManagedObjectRegistry();
_virtualHostRegistry = new VirtualHostRegistry(this);
13 years, 7 months
rhmessaging commits: r4459 - store/trunk/cpp/lib.
by rhmessaging-commits@lists.jboss.org
Author: kpvdr
Date: 2011-05-13 09:49:52 -0400 (Fri, 13 May 2011)
New Revision: 4459
Modified:
store/trunk/cpp/lib/MessageStoreImpl.cpp
Log:
Improved null persistence Id exception message
Modified: store/trunk/cpp/lib/MessageStoreImpl.cpp
===================================================================
--- store/trunk/cpp/lib/MessageStoreImpl.cpp 2011-05-10 15:35:17 UTC (rev 4458)
+++ store/trunk/cpp/lib/MessageStoreImpl.cpp 2011-05-13 13:49:52 UTC (rev 4459)
@@ -1362,12 +1362,12 @@
checkInit();
u_int64_t queueId (queue.getPersistenceId());
u_int64_t messageId (msg->getPersistenceId());
+ if (queueId == 0) {
+ THROW_STORE_EXCEPTION("Queue \"" + queue.getName() + "\" has null queue Id (has not been created)");
+ }
if (messageId == 0) {
- THROW_STORE_EXCEPTION("Error dequeuing message, persistence id not set");
+ THROW_STORE_EXCEPTION("Queue \"" + queue.getName() + "\": Dequeuing message with null persistence Id.");
}
- if (queueId == 0) {
- THROW_STORE_EXCEPTION("Queue not created: " + queue.getName());
- }
TxnCtxt implicit;
TxnCtxt* txn = 0;
13 years, 7 months
rhmessaging commits: r4458 - in store/trunk/cpp/tools: qpidstore and 1 other directory.
by rhmessaging-commits@lists.jboss.org
Author: tedross
Date: 2011-05-10 11:35:17 -0400 (Tue, 10 May 2011)
New Revision: 4458
Added:
store/trunk/cpp/tools/qpidstore/
store/trunk/cpp/tools/qpidstore/__init__.py
store/trunk/cpp/tools/qpidstore/janal.py
store/trunk/cpp/tools/qpidstore/jerr.py
store/trunk/cpp/tools/qpidstore/jrnl.py
Removed:
store/trunk/cpp/tools/__init__.py
store/trunk/cpp/tools/janal.py
store/trunk/cpp/tools/jerr.py
store/trunk/cpp/tools/jrnl.py
Modified:
store/trunk/cpp/tools/Makefile.am
Log:
Fix the module structure of the Python tools so they can be properly invoked from the tests.
Modified: store/trunk/cpp/tools/Makefile.am
===================================================================
--- store/trunk/cpp/tools/Makefile.am 2011-05-04 12:00:49 UTC (rev 4457)
+++ store/trunk/cpp/tools/Makefile.am 2011-05-10 15:35:17 UTC (rev 4458)
@@ -23,4 +23,8 @@
qpidexec_SCRIPTS = resize store_chk
pkgpyexec_qpiddir = $(pyexecdir)/qpidstore
-pkgpyexec_qpid_PYTHON = __init__.py jerr.py jrnl.py janal.py
+pkgpyexec_qpid_PYTHON = \
+ qpidstore/__init__.py \
+ qpidstore/jerr.py \
+ qpidstore/jrnl.py \
+ qpidstore/janal.py
Deleted: store/trunk/cpp/tools/__init__.py
===================================================================
--- store/trunk/cpp/tools/__init__.py 2011-05-04 12:00:49 UTC (rev 4457)
+++ store/trunk/cpp/tools/__init__.py 2011-05-10 15:35:17 UTC (rev 4458)
@@ -1,23 +0,0 @@
-"""
-Copyright (c) 2007, 2008 Red Hat, Inc.
-
-This file is part of the Qpid async store library msgstore.so.
-
-This library is free software; you can redistribute it and/or
-modify it under the terms of the GNU Lesser General Public
-License as published by the Free Software Foundation; either
-version 2.1 of the License, or (at your option) any later version.
-
-This library is distributed in the hope that it will be useful,
-but WITHOUT ANY WARRANTY; without even the implied warranty of
-MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-Lesser General Public License for more details.
-
-You should have received a copy of the GNU Lesser General Public
-License along with this library; if not, write to the Free Software
-Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
-USA
-
-The GNU Lesser General Public License is available in the file COPYING.
-"""
-
Deleted: store/trunk/cpp/tools/janal.py
===================================================================
--- store/trunk/cpp/tools/janal.py 2011-05-04 12:00:49 UTC (rev 4457)
+++ store/trunk/cpp/tools/janal.py 2011-05-10 15:35:17 UTC (rev 4458)
@@ -1,612 +0,0 @@
-"""
-Copyright (c) 2007, 2008 Red Hat, Inc.
-
-This file is part of the Qpid async store library msgstore.so.
-
-This library is free software; you can redistribute it and/or
-modify it under the terms of the GNU Lesser General Public
-License as published by the Free Software Foundation; either
-version 2.1 of the License, or (at your option) any later version.
-
-This library is distributed in the hope that it will be useful,
-but WITHOUT ANY WARRANTY; without even the implied warranty of
-MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-Lesser General Public License for more details.
-
-You should have received a copy of the GNU Lesser General Public
-License along with this library; if not, write to the Free Software
-Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
-USA
-
-The GNU Lesser General Public License is available in the file COPYING.
-"""
-
-import jerr, jrnl
-import os.path, sys
-
-
-#== class EnqMap ==============================================================
-
-class EnqMap(object):
- """Class for maintaining a map of enqueued records, indexing the rid against hdr, fid and transaction lock"""
-
- def __init__(self):
- """Constructor"""
- self.__map = {}
-
- def __str__(self):
- """Print the contents of the map"""
- return self.report(True, True)
-
- def add(self, fid, hdr, lock = False):
- """Add a new record into the map"""
- if hdr.rid in self.__map:
- raise jerr.DuplicateRidError(hdr.rid)
- self.__map[hdr.rid] = [fid, hdr, lock]
-
- def contains(self, rid):
- """Return True if the map contains the given rid"""
- return rid in self.__map
-
- def delete(self, rid):
- """Delete the rid and its associated data from the map"""
- if rid in self.__map:
- if self.get_lock(rid):
- raise jerr.DeleteLockedRecordError(rid)
- del self.__map[rid]
- else:
- raise jerr.JWarning("ERROR: Deleting non-existent rid from EnqMap: rid=0x%x" % rid)
-
- def get(self, rid):
- """Return a list [fid, hdr, lock] for the given rid"""
- if self.contains(rid):
- return self.__map[rid]
- return None
-
- def get_fid(self, rid):
- """Return the fid for the given rid"""
- if self.contains(rid):
- return self.__map[rid][0]
- return None
-
- def get_hdr(self, rid):
- """Return the header record for the given rid"""
- if self.contains(rid):
- return self.__map[rid][1]
- return None
-
- def get_lock(self, rid):
- """Return the transaction lock value for the given rid"""
- if self.contains(rid):
- return self.__map[rid][2]
- return None
-
- def get_rec_list(self):
- """Return a list of tuples (fid, hdr, lock) for all entries in the map"""
- return self.__map.values()
-
- def lock(self, rid):
- """Set the transaction lock for a given rid to True"""
- if rid in self.__map:
- if not self.__map[rid][2]: # locked
- self.__map[rid][2] = True
- else:
- raise jerr.AlreadyLockedError(rid)
- else:
- raise jerr.JWarning("ERROR: Locking non-existent rid in EnqMap: rid=0x%x" % rid)
-
- def report(self, show_stats, show_records):
- """Return a string containing a text report for all records in the map"""
- if len(self.__map) == 0:
- return "No enqueued records found."
- rstr = "%d enqueued records found" % len(self.__map)
- if show_records:
- rstr += ":"
- rid_list = self.__map.keys()
- rid_list.sort()
- for rid in rid_list:
- if self.__map[rid][2]:
- lock_str = " [LOCKED]"
- else:
- lock_str = ""
- rstr += "\n lfid=%d %s %s" % (rec[0], rec[1], lock_str)
- else:
- rstr += "."
- return rstr
-
- def rids(self):
- """Return a list of rids in the map"""
- return self.__map.keys()
-
- def size(self):
- """Return the number of entries in the map"""
- return len(self.__map)
-
- def unlock(self, rid):
- """Set the transaction lock for a given rid to False"""
- if rid in self.__map:
- if self.__map[rid][2]:
- self.__map[rid][2] = False
- else:
- raise jerr.NotLockedError(rid)
- else:
- raise jerr.NonExistentRecordError("unlock", rid)
-
-
-#== class TxnMap ==============================================================
-
-class TxnMap(object):
- """Transaction map, which maps xids to a list of outstanding actions"""
-
- def __init__(self, emap):
- """Constructor, requires an existing EnqMap instance"""
- self.__emap = emap
- self.__map = {}
-
- def __str__(self):
- """Print the contents of the map"""
- return self.report(True, True)
-
- def add(self, fid, hdr):
- """Add a new transactional record into the map"""
- if isinstance(hdr, jrnl.DeqRec):
- try:
- self.__emap.lock(hdr.deq_rid)
- except jerr.JWarning:
- # Not in emap, look for rid in tmap
- l = self.find_rid(hdr.deq_rid, hdr.xid)
- if l != None:
- if l[2]:
- raise jerr.AlreadyLockedError(hdr.deq_rid)
- l[2] = True
- if hdr.xid in self.__map:
- self.__map[hdr.xid].append([fid, hdr, False]) # append to existing list
- else:
- self.__map[hdr.xid] = [[fid, hdr, False]] # create new list
-
- def contains(self, xid):
- """Return True if the xid exists in the map; False otherwise"""
- return xid in self.__map
-
- def delete(self, hdr):
- """Remove a transaction record from the map using either a commit or abort header"""
- if hdr.magic[-1] == "c":
- return self._commit(hdr.xid)
- if hdr.magic[-1] == "a":
- self._abort(hdr.xid)
- else:
- raise jerr.InvalidRecordTypeError("delete from TxnMap", hdr.magic, hdr.rid)
-
- def find_rid(self, rid, xid_hint = None):
- """ Search for and return map list with supplied rid. If xid_hint is supplied, try that xid first"""
- if xid_hint != None and self.contains(xid_hint):
- for l in self.__map[xid_hint]:
- if l[1].rid == rid:
- return l
- for xid in self.__map.iterkeys():
- if xid_hint == None or xid != xid_hint:
- for l in self.__map[xid]:
- if l[1].rid == rid:
- return l
-
- def get(self, xid):
- """Return a list of operations for the given xid"""
- if self.contains(xid):
- return self.__map[xid]
-
- def report(self, show_stats, show_records):
- """Return a string containing a text report for all records in the map"""
- if len(self.__map) == 0:
- return "No outstanding transactions found."
- rstr = "%d outstanding transactions found" % len(self.__map)
- if show_records:
- rstr += ":"
- for xid, tup in self.__map.iteritems():
- rstr += "\n xid=%s:" % jrnl.Utils.format_xid(xid)
- for i in tup:
- rstr += "\n %s" % str(i[1])
- else:
- rstr += "."
- return rstr
-
- def size(self):
- """Return the number of xids in the map"""
- return len(self.__map)
-
- def xids(self):
- """Return a list of xids in the map"""
- return self.__map.keys()
-
- def _abort(self, xid):
- """Perform an abort operation for the given xid record"""
- for fid, hdr in self.__map[xid]:
- if isinstance(hdr, jrnl.DeqRec):
- self.__emap.unlock(hdr.rid)
- del self.__map[xid]
-
- def _commit(self, xid):
- """Perform a commit operation for the given xid record"""
- mismatch_list = []
- for fid, hdr, lock in self.__map[xid]:
- if isinstance(hdr, jrnl.EnqRec):
- self.__emap.add(fid, hdr, lock) # Transfer enq to emap
- else:
- if self.__emap.contains(hdr.deq_rid):
- self.__emap.unlock(hdr.deq_rid)
- self.__emap.delete(hdr.deq_rid)
- else:
- mismatch_list.append("0x%x" % hdr.deq_rid)
- del self.__map[xid]
- return mismatch_list
-
-#== class JrnlAnalyzer ========================================================
-
-class JrnlAnalyzer(object):
- """
- This class analyzes a set of journal files and determines which is the last to be written
- (the newest file), and hence which should be the first to be read for recovery (the oldest
- file).
-
- The analysis is performed on construction; the contents of the JrnlInfo object passed provide
- the recovery details.
- """
-
- def __init__(self, jinf):
- """Constructor"""
- self.__oldest = None
- self.__jinf = jinf
- self.__flist = self._analyze()
-
- def __str__(self):
- """String representation of this JrnlAnalyzer instance, will print out results of analysis."""
- ostr = "Journal files analyzed in directory %s (* = earliest full):\n" % self.__jinf.get_current_dir()
- if self.is_empty():
- ostr += " <All journal files are empty>\n"
- else:
- for tup in self.__flist:
- tmp = " "
- if tup[0] == self.__oldest[0]:
- tmp = "*"
- ostr += " %s %s: owi=%-5s rid=0x%x, fro=0x%x ts=%s\n" % (tmp, os.path.basename(tup[1]), tup[2],
- tup[3], tup[4], tup[5])
- for i in range(self.__flist[-1][0] + 1, self.__jinf.get_num_jrnl_files()):
- ostr += " %s.%04x.jdat: <empty>\n" % (self.__jinf.get_jrnl_base_name(), i)
- return ostr
-
- # Analysis
-
- def get_oldest_file(self):
- """Return a tuple (ordnum, jfn, owi, rid, fro, timestamp) for the oldest data file found in the journal"""
- return self.__oldest
-
- def get_oldest_file_index(self):
- """Return the ordinal number of the oldest data file found in the journal"""
- if self.is_empty():
- return None
- return self.__oldest[0]
-
- def is_empty(self):
- """Return true if the analysis found that the journal file has never been written to"""
- return len(self.__flist) == 0
-
- def _analyze(self):
- """Perform the journal file analysis by reading and comparing the file headers of each journal data file"""
- owi_found = False
- flist = []
- for i in range(0, self.__jinf.get_num_jrnl_files()):
- jfn = os.path.join(self.__jinf.get_current_dir(), "%s.%04x.jdat" % (self.__jinf.get_jrnl_base_name(), i))
- fhandle = open(jfn)
- fhdr = jrnl.Utils.load(fhandle, jrnl.Hdr)
- if fhdr.empty():
- break
- this_tup = (i, jfn, fhdr.owi(), fhdr.rid, fhdr.fro, fhdr.timestamp_str())
- flist.append(this_tup)
- if i == 0:
- init_owi = fhdr.owi()
- self.__oldest = this_tup
- elif fhdr.owi() != init_owi and not owi_found:
- self.__oldest = this_tup
- owi_found = True
- return flist
-
-
-#== class JrnlReader ====================================================
-
-class JrnlReader(object):
- """
- This class contains an Enqueue Map (emap), a transaction map (tmap) and a transaction
- object list (txn_obj_list) which are populated by reading the journals from the oldest
- to the newest and analyzing each record. The JrnlInfo and JrnlAnalyzer
- objects supplied on construction provide the information used for the recovery.
-
- The analysis is performed on construction.
- """
-
- def __init__(self, jinfo, jra, qflag = False, rflag = False, vflag = False):
- """Constructor, which reads all """
- self._jinfo = jinfo
- self._jra = jra
- self._qflag = qflag
- self._rflag = rflag
- self._vflag = vflag
-
- # test callback functions for CSV tests
- self._csv_store_chk = None
- self._csv_start_cb = None
- self._csv_enq_cb = None
- self._csv_deq_cb = None
- self._csv_txn_cb = None
- self._csv_end_cb = None
-
- self._emap = EnqMap()
- self._tmap = TxnMap(self._emap)
- self._txn_obj_list = {}
-
- self._file = None
- self._file_hdr = None
- self._file_num = None
- self._first_rec_flag = None
- self._fro = None
- self._last_file_flag = None
- self._start_file_num = None
- self._file_hdr_owi = None
- self._warning = []
-
- self._abort_cnt = 0
- self._commit_cnt = 0
- self._msg_cnt = 0
- self._rec_cnt = 0
- self._txn_msg_cnt = 0
-
- def __str__(self):
- """Print out all the undequeued records"""
- return self.report(True, self._rflag)
-
- def emap(self):
- """Get the enqueue map"""
- return self._emap
-
- def get_abort_cnt(self):
- """Get the cumulative number of transactional aborts found"""
- return self._abort_cnt
-
- def get_commit_cnt(self):
- """Get the cumulative number of transactional commits found"""
- return self._commit_cnt
-
- def get_msg_cnt(self):
- """Get the cumulative number of messages found"""
- return self._msg_cnt
-
- def get_rec_cnt(self):
- """Get the cumulative number of journal records (including fillers) found"""
- return self._rec_cnt
-
- def is_last_file(self):
- """Return True if the last file is being read"""
- return self._last_file_flag
-
- def report(self, show_stats = True, show_records = False):
- """Return a string containing a report on the file analysis"""
- rstr = self._emap.report(show_stats, show_records) + "\n" + self._tmap.report(show_stats, show_records)
- #TODO - print size analysis here - ie how full, sparse, est. space remaining before enq threshold
- return rstr
-
- def run(self):
- """Perform the read of the journal"""
- if self._csv_start_cb != None and self._csv_start_cb(self._csv_store_chk):
- return
- if self._jra.is_empty():
- return
- stop = self._advance_jrnl_file(*self._jra.get_oldest_file())
- while not stop and not self._get_next_record():
- pass
- if self._csv_end_cb != None and self._csv_end_cb(self._csv_store_chk):
- return
- if not self._qflag:
- print
-
- def set_callbacks(self, csv_store_chk, csv_start_cb = None, csv_enq_cb = None, csv_deq_cb = None, csv_txn_cb = None,
- csv_end_cb = None):
- """Set callbacks for checks to be made at various points while reading the journal"""
- self._csv_store_chk = csv_store_chk
- self._csv_start_cb = csv_start_cb
- self._csv_enq_cb = csv_enq_cb
- self._csv_deq_cb = csv_deq_cb
- self._csv_txn_cb = csv_txn_cb
- self._csv_end_cb = csv_end_cb
-
- def tmap(self):
- """Return the transaction map"""
- return self._tmap
-
- def get_txn_msg_cnt(self):
- """Get the cumulative transactional message count"""
- return self._txn_msg_cnt
-
- def txn_obj_list(self):
- """Get a cumulative list of transaction objects (commits and aborts)"""
- return self._txn_obj_list
-
- def _advance_jrnl_file(self, *oldest_file_info):
- """Rotate to using the next journal file. Return False if the operation was successful, True if there are no
- more files to read."""
- fro_seek_flag = False
- if len(oldest_file_info) > 0:
- self._start_file_num = self._file_num = oldest_file_info[0]
- self._fro = oldest_file_info[4]
- fro_seek_flag = True # jump to fro to start reading
- if not self._qflag and not self._rflag:
- if self._vflag:
- print "Recovering journals..."
- else:
- print "Recovering journals",
- if self._file != None and self._is_file_full():
- self._file.close()
- self._file_num = self._incr_file_num()
- if self._file_num == self._start_file_num:
- return True
- if self._start_file_num == 0:
- self._last_file_flag = self._file_num == self._jinfo.get_num_jrnl_files() - 1
- else:
- self._last_file_flag = self._file_num == self._start_file_num - 1
- if self._file_num < 0 or self._file_num >= self._jinfo.get_num_jrnl_files():
- raise jerr.BadFileNumberError(self._file_num)
- jfn = os.path.join(self._jinfo.get_current_dir(), "%s.%04x.jdat" %
- (self._jinfo.get_jrnl_base_name(), self._file_num))
- self._file = open(jfn)
- self._file_hdr = jrnl.Utils.load(self._file, jrnl.Hdr)
- if fro_seek_flag and self._file.tell() != self._fro:
- self._file.seek(self._fro)
- self._first_rec_flag = True
- if not self._qflag:
- if self._rflag:
- print jfn, ": ", self._file_hdr
- elif self._vflag:
- print "* Reading %s" % jfn
- else:
- print ".",
- sys.stdout.flush()
- return False
-
- def _check_owi(self, hdr):
- """Return True if the header's owi indicator matches that of the file header record; False otherwise. This can
- indicate whether the last record in a file has been read and now older records which have not yet been
- overwritten are now being read."""
- return self._file_hdr_owi == hdr.owi()
-
- def _is_file_full(self):
- """Return True if the current file is full (no more write space); false otherwise"""
- return self._file.tell() >= self._jinfo.get_jrnl_file_size_bytes()
-
- def _get_next_record(self):
- """Get the next record in the file for analysis"""
- if self._is_file_full():
- if self._advance_jrnl_file():
- return True
- try:
- hdr = jrnl.Utils.load(self._file, jrnl.Hdr)
- except:
- return True
- if hdr.empty():
- return True
- if hdr.check():
- return True
- self._rec_cnt += 1
- self._file_hdr_owi = self._file_hdr.owi()
- if self._first_rec_flag:
- if self._file_hdr.fro != hdr.foffs:
- raise jerr.FirstRecordOffsetMismatch(self._file_hdr.fro, hdr.foffs)
- else:
- if self._rflag:
- print " * fro ok: 0x%x" % self._file_hdr.fro
- self._first_rec_flag = False
- stop = False
- if isinstance(hdr, jrnl.EnqRec):
- stop = self._handle_enq_rec(hdr)
- elif isinstance(hdr, jrnl.DeqRec):
- stop = self._handle_deq_rec(hdr)
- elif isinstance(hdr, jrnl.TxnRec):
- stop = self._handle_txn_rec(hdr)
- wstr = ""
- for warn in self._warning:
- wstr += " (%s)" % warn
- if self._rflag:
- print " > %s %s" % (hdr, wstr)
- self._warning = []
- return stop
-
- def _handle_deq_rec(self, hdr):
- """Process a dequeue ("RHMd") record"""
- if self._load_rec(hdr):
- return True
-
- # Check OWI flag
- if not self._check_owi(hdr):
- self._warning.append("WARNING: OWI mismatch - could be overwrite boundary.")
- return True
- # Test hook
- if self._csv_deq_cb != None and self._csv_deq_cb(self._csv_store_chk, hdr):
- return True
-
- try:
- if hdr.xid == None:
- self._emap.delete(hdr.deq_rid)
- else:
- self._tmap.add(self._file_hdr.fid, hdr)
- except jerr.JWarning, warn:
- self._warning.append(str(warn))
- return False
-
- def _handle_enq_rec(self, hdr):
- """Process a dequeue ("RHMe") record"""
- if self._load_rec(hdr):
- return True
-
- # Check extern flag
- if hdr.extern and hdr.data != None:
- raise jerr.ExternFlagDataError(hdr)
- # Check OWI flag
- if not self._check_owi(hdr):
- self._warning.append("WARNING: OWI mismatch - could be overwrite boundary.")
- return True
- # Test hook
- if self._csv_enq_cb != None and self._csv_enq_cb(self._csv_store_chk, hdr):
- return True
-
- if hdr.xid == None:
- self._emap.add(self._file_hdr.fid, hdr)
- else:
- self._txn_msg_cnt += 1
- self._tmap.add(self._file_hdr.fid, hdr)
- self._msg_cnt += 1
- return False
-
- def _handle_txn_rec(self, hdr):
- """Process a transaction ("RHMa or RHMc") record"""
- if self._load_rec(hdr):
- return True
-
- # Check OWI flag
- if not self._check_owi(hdr):
- self._warning.append("WARNING: OWI mismatch - could be overwrite boundary.")
- return True
- # Test hook
- if self._csv_txn_cb != None and self._csv_txn_cb(self._csv_store_chk, hdr):
- return True
-
- if hdr.magic[-1] == "a":
- self._abort_cnt += 1
- else:
- self._commit_cnt += 1
-
- if self._tmap.contains(hdr.xid):
- mismatched_rids = self._tmap.delete(hdr)
- if mismatched_rids != None and len(mismatched_rids) > 0:
- self._warning.append("WARNING: transactional dequeues not found in enqueue map; rids=%s" %
- mismatched_rids)
- else:
- self._warning.append("WARNING: %s not found in transaction map" % jrnl.Utils.format_xid(hdr.xid))
- if hdr.magic[-1] == "c": # commits only
- self._txn_obj_list[hdr.xid] = hdr
- return False
-
- def _incr_file_num(self):
- """Increment the number of files read with wraparound (ie after file n-1, go to 0)"""
- self._file_num += 1
- if self._file_num >= self._jinfo.get_num_jrnl_files():
- self._file_num = 0
- return self._file_num
-
- def _load_rec(self, hdr):
- """Load a single record for the given header. There may be arbitrarily large xids and data components."""
- while not hdr.complete():
- if self._advance_jrnl_file():
- return True
- hdr.load(self._file)
- return False
-
-# =============================================================================
-
-if __name__ == "__main__":
- print "This is a library, and cannot be executed."
Deleted: store/trunk/cpp/tools/jerr.py
===================================================================
--- store/trunk/cpp/tools/jerr.py 2011-05-04 12:00:49 UTC (rev 4457)
+++ store/trunk/cpp/tools/jerr.py 2011-05-10 15:35:17 UTC (rev 4458)
@@ -1,223 +0,0 @@
-"""
-Copyright (c) 2007, 2008 Red Hat, Inc.
-
-This file is part of the Qpid async store library msgstore.so.
-
-This library is free software; you can redistribute it and/or
-modify it under the terms of the GNU Lesser General Public
-License as published by the Free Software Foundation; either
-version 2.1 of the License, or (at your option) any later version.
-
-This library is distributed in the hope that it will be useful,
-but WITHOUT ANY WARRANTY; without even the implied warranty of
-MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-Lesser General Public License for more details.
-
-You should have received a copy of the GNU Lesser General Public
-License along with this library; if not, write to the Free Software
-Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
-USA
-
-The GNU Lesser General Public License is available in the file COPYING.
-"""
-
-# == Warnings =================================================================
-
-class JWarning(Exception):
- """Class to convey a warning"""
- def __init__(self, err):
- """Constructor"""
- Exception.__init__(self, err)
-
-# == Errors ===================================================================
-
-class AllJrnlFilesEmptyCsvError(Exception):
- """All journal files are empty (never been written)"""
- def __init__(self, tnum, exp_num_msgs):
- """Constructor"""
- Exception.__init__(self, "[CSV %d] All journal files are empty, but test expects %d msg(s)." %
- (tnum, exp_num_msgs))
-
-class AlreadyLockedError(Exception):
- """Error class for trying to lock a record that is already locked"""
- def __init__(self, rid):
- """Constructor"""
- Exception.__init__(self, "Locking record which is already locked in EnqMap: rid=0x%x" % rid)
-
-class BadFileNumberError(Exception):
- """Error class for incorrect or unexpected file number"""
- def __init__(self, file_num):
- """Constructor"""
- Exception.__init__(self, "Bad file number %d" % file_num)
-
-class DataSizeError(Exception):
- """Error class for data size mismatch"""
- def __init__(self, exp_size, act_size, data_str):
- """Constructor"""
- Exception.__init__(self, "Inconsistent data size: expected:%d; actual:%d; data=\"%s\"" %
- (exp_size, act_size, data_str))
-
-class DeleteLockedRecordError(Exception):
- """Error class for deleting a locked record from the enqueue map"""
- def __init__(self, rid):
- """Constructor"""
- Exception.__init__(self, "Deleting locked record from EnqMap: rid=0x%s" % rid)
-
-class DequeueNonExistentEnqueueError(Exception):
- """Error class for attempting to dequeue a non-existent enqueue record (rid)"""
- def __init__(self, deq_rid):
- """Constructor"""
- Exception.__init__(self, "Dequeuing non-existent enqueue record: rid=0x%s" % deq_rid)
-
-class DuplicateRidError(Exception):
- """Error class for placing duplicate rid into enqueue map"""
- def __init__(self, rid):
- """Constructor"""
- Exception.__init__(self, "Adding duplicate record to EnqMap: rid=0x%x" % rid)
-
-class EndianMismatchError(Exception):
- """Error class mismatched record header endian flag"""
- def __init__(self, exp_endianness):
- """Constructor"""
- Exception.__init__(self, "Endian mismatch: expected %s, but current record is %s" %
- self.endian_str(exp_endianness))
- #@staticmethod
- def endian_str(endianness):
- """Return a string tuple for the endianness error message"""
- if endianness:
- return "big", "little"
- return "little", "big"
- endian_str = staticmethod(endian_str)
-
-class ExternFlagDataError(Exception):
- """Error class for the extern flag being set and the internal size > 0"""
- def __init__(self, hdr):
- """Constructor"""
- Exception.__init__(self, "Message data found (msg size > 0) on record with external flag set: hdr=%s" % hdr)
-
-class ExternFlagCsvError(Exception):
- """External flag mismatch between record and CSV test file"""
- def __init__(self, tnum, exp_extern_flag):
- """Constructor"""
- Exception.__init__(self, "[CSV %d] External flag mismatch: expected %s" % (tnum, exp_extern_flag))
-
-class ExternFlagWithDataCsvError(Exception):
- """External flag set and Message data found"""
- def __init__(self, tnum):
- """Constructor"""
- Exception.__init__(self, "[CSV %d] Message data found on record with external flag set" % tnum)
-
-class FillExceedsFileSizeError(Exception):
- """Internal error from a fill operation which will exceed the specified file size"""
- def __init__(self, cur_size, file_size):
- """Constructor"""
- Exception.__init__(self, "Filling to size %d > max file size %d" % (cur_size, file_size))
-
-class FillSizeError(Exception):
- """Internal error from a fill operation that did not match the calculated end point in the file"""
- def __init__(self, cur_posn, exp_posn):
- """Constructor"""
- Exception.__init__(self, "Filled to size %d > expected file posn %d" % (cur_posn, exp_posn))
-
-class FirstRecordOffsetMismatch(Exception):
- """Error class for file header fro mismatch with actual record"""
- def __init__(self, fro, actual_offs):
- """Constructor"""
- Exception.__init__(self, "File header first record offset mismatch: fro=0x%x; actual offs=0x%x" %
- (fro, actual_offs))
-
-class InvalidHeaderVersionError(Exception):
- """Error class for invalid record header version"""
- def __init__(self, exp_ver, act_ver):
- """Constructor"""
- Exception.__init__(self, "Invalid header version: expected:%d, actual:%d." % (exp_ver, act_ver))
-
-class InvalidRecordTypeError(Exception):
- """Error class for any operation using an invalid record type"""
- def __init__(self, operation, magic, rid):
- """Constructor"""
- Exception.__init__(self, "Invalid record type for operation: operation=%s record magic=%s, rid=0x%x" %
- (operation, magic, rid))
-
-class InvalidRecordTailError(Exception):
- """Error class for invalid record tail"""
- def __init__(self, magic_err, rid_err, rec):
- """Constructor"""
- Exception.__init__(self, " > %s *INVALID TAIL RECORD (%s)*" % (rec, self.tail_err_str(magic_err, rid_err)))
- #@staticmethod
- def tail_err_str(magic_err, rid_err):
- """Return a string indicating the tail record error(s)"""
- estr = ""
- if magic_err:
- estr = "magic bad"
- if rid_err:
- estr += ", "
- if rid_err:
- estr += "rid mismatch"
- return estr
- tail_err_str = staticmethod(tail_err_str)
-
-class NonExistentRecordError(Exception):
- """Error class for any operation on an non-existent record"""
- def __init__(self, operation, rid):
- """Constructor"""
- Exception.__init__(self, "Operation on non-existent record: operation=%s; rid=0x%x" % (operation, rid))
-
-class NotLockedError(Exception):
- """Error class for unlocking a record which is not locked in the first place"""
- def __init__(self, rid):
- """Constructor"""
- Exception.__init__(self, "Unlocking record which is not locked in EnqMap: rid=0x%x" % rid)
-
-class JournalSpaceExceededError(Exception):
- """Error class for when journal space of resized journal is too small to contain the transferred records"""
- def __init__(self):
- """Constructor"""
- Exception.__init__(self, "Ran out of journal space while writing records")
-
-class MessageLengthCsvError(Exception):
- """Message length mismatch between record and CSV test file"""
- def __init__(self, tnum, exp_msg_len, actual_msg_len):
- """Constructor"""
- Exception.__init__(self, "[CSV %d] Message length mismatch: expected %d; found %d" %
- (tnum, exp_msg_len, actual_msg_len))
-
-class NumMsgsCsvError(Exception):
- """Number of messages found mismatched with CSV file"""
- def __init__(self, tnum, exp_num_msgs, actual_num_msgs):
- """Constructor"""
- Exception.__init__(self, "[CSV %s] Incorrect number of messages: expected %d, found %d" %
- (tnum, exp_num_msgs, actual_num_msgs))
-
-class TransactionCsvError(Exception):
- """Transaction mismatch between record and CSV file"""
- def __init__(self, tnum, exp_transactional):
- """Constructor"""
- Exception.__init__(self, "[CSV %d] Transaction mismatch: expected %s" % (tnum, exp_transactional))
-
-class UnexpectedEndOfFileError(Exception):
- """Error class for unexpected end-of-file during reading"""
- def __init__(self, exp_size, curr_offs):
- """Constructor"""
- Exception.__init__(self, "Unexpected end-of-file: expected file size:%d; current offset:%d" %
- (exp_size, curr_offs))
-
-class XidLengthCsvError(Exception):
- """Message Xid length mismatch between record and CSV file"""
- def __init__(self, tnum, exp_xid_len, actual_msg_len):
- """Constructor"""
- Exception.__init__(self, "[CSV %d] Message XID mismatch: expected %d; found %d" %
- (tnum, exp_xid_len, actual_msg_len))
-
-class XidSizeError(Exception):
- """Error class for Xid size mismatch"""
- def __init__(self, exp_size, act_size, xid_str):
- """Constructor"""
- Exception.__init__(self, "Inconsistent xid size: expected:%d; actual:%d; xid=\"%s\"" %
- (exp_size, act_size, xid_str))
-
-# =============================================================================
-
-if __name__ == "__main__":
- print "This is a library, and cannot be executed."
-
Deleted: store/trunk/cpp/tools/jrnl.py
===================================================================
--- store/trunk/cpp/tools/jrnl.py 2011-05-04 12:00:49 UTC (rev 4457)
+++ store/trunk/cpp/tools/jrnl.py 2011-05-10 15:35:17 UTC (rev 4458)
@@ -1,798 +0,0 @@
-"""
-Copyright (c) 2007, 2008 Red Hat, Inc.
-
-This file is part of the Qpid async store library msgstore.so.
-
-This library is free software; you can redistribute it and/or
-modify it under the terms of the GNU Lesser General Public
-License as published by the Free Software Foundation; either
-version 2.1 of the License, or (at your option) any later version.
-
-This library is distributed in the hope that it will be useful,
-but WITHOUT ANY WARRANTY; without even the implied warranty of
-MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-Lesser General Public License for more details.
-
-You should have received a copy of the GNU Lesser General Public
-License along with this library; if not, write to the Free Software
-Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
-USA
-
-The GNU Lesser General Public License is available in the file COPYING.
-"""
-
-import jerr
-import os.path, sys, xml.parsers.expat
-from struct import pack, unpack, calcsize
-from time import gmtime, strftime
-
-# TODO: Get rid of these! Use jinf instance instead
-DBLK_SIZE = 128
-SBLK_SIZE = 4 * DBLK_SIZE
-
-# TODO - this is messy - find a better way to handle this
-# This is a global, but is set directly by the calling program
-JRNL_FILE_SIZE = None
-
-#== class Utils ======================================================================
-
-class Utils(object):
- """Class containing utility functions for dealing with the journal"""
-
- __printchars = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ!\"#$%&'()*+,-./:;<=>?@[\\]^_`{\|}~ "
-
- # The @staticmethod declarations are not supported in RHEL4 (python 2.3.x)
- # When RHEL4 support ends, restore these declarations and remove the older
- # staticmethod() declaration.
-
- #@staticmethod
- def format_data(dsize, data):
- """Format binary data for printing"""
- if data == None:
- return ""
- if Utils._is_printable(data):
- datastr = Utils._split_str(data)
- else:
- datastr = Utils._hex_split_str(data)
- if dsize != len(data):
- raise jerr.DataSizeError(dsize, len(data), datastr)
- return "data(%d)=\"%s\" " % (dsize, datastr)
- format_data = staticmethod(format_data)
-
- #@staticmethod
- def format_xid(xid, xidsize=None):
- """Format binary XID for printing"""
- if xid == None and xidsize != None:
- if xidsize > 0:
- raise jerr.XidSizeError(xidsize, 0, None)
- return ""
- if Utils._is_printable(xid):
- xidstr = Utils._split_str(xid)
- else:
- xidstr = Utils._hex_split_str(xid)
- if xidsize == None:
- xidsize = len(xid)
- elif xidsize != len(xid):
- raise jerr.XidSizeError(xidsize, len(xid), xidstr)
- return "xid(%d)=\"%s\" " % (xidsize, xidstr)
- format_xid = staticmethod(format_xid)
-
- #@staticmethod
- def inv_str(string):
- """Perform a binary 1's compliment (invert all bits) on a binary string"""
- istr = ""
- for index in range(0, len(string)):
- istr += chr(~ord(string[index]) & 0xff)
- return istr
- inv_str = staticmethod(inv_str)
-
- #@staticmethod
- def load(fhandle, klass):
- """Load a record of class klass from a file"""
- args = Utils._load_args(fhandle, klass)
- subclass = klass.discriminate(args)
- result = subclass(*args) # create instance of record
- if subclass != klass:
- result.init(fhandle, *Utils._load_args(fhandle, subclass))
- result.skip(fhandle)
- return result
- load = staticmethod(load)
-
- #@staticmethod
- def load_file_data(fhandle, size, data):
- """Load the data portion of a message from file"""
- if size == 0:
- return (data, True)
- if data == None:
- loaded = 0
- else:
- loaded = len(data)
- foverflow = fhandle.tell() + size - loaded > JRNL_FILE_SIZE
- if foverflow:
- rsize = JRNL_FILE_SIZE - fhandle.tell()
- else:
- rsize = size - loaded
- fbin = fhandle.read(rsize)
- if data == None:
- data = unpack("%ds" % (rsize), fbin)[0]
- else:
- data = data + unpack("%ds" % (rsize), fbin)[0]
- return (data, not foverflow)
- load_file_data = staticmethod(load_file_data)
-
- #@staticmethod
- def rem_bytes_in_blk(fhandle, blk_size):
- """Return the remaining bytes in a block"""
- foffs = fhandle.tell()
- return Utils.size_in_bytes_to_blk(foffs, blk_size) - foffs
- rem_bytes_in_blk = staticmethod(rem_bytes_in_blk)
-
- #@staticmethod
- def size_in_blks(size, blk_size):
- """Return the size in terms of data blocks"""
- return int((size + blk_size - 1) / blk_size)
- size_in_blks = staticmethod(size_in_blks)
-
- #@staticmethod
- def size_in_bytes_to_blk(size, blk_size):
- """Return the bytes remaining until the next block boundary"""
- return Utils.size_in_blks(size, blk_size) * blk_size
- size_in_bytes_to_blk = staticmethod(size_in_bytes_to_blk)
-
- #@staticmethod
- def _hex_split_str(in_str, split_size = 50):
- """Split a hex string into two parts separated by an ellipsis"""
- if len(in_str) <= split_size:
- return Utils._hex_str(in_str, 0, len(in_str))
-# if len(in_str) > split_size + 25:
-# return Utils._hex_str(in_str, 0, 10) + " ... " + Utils._hex_str(in_str, 55, 65) + " ... " + \
-# Utils._hex_str(in_str, len(in_str)-10, len(in_str))
- return Utils._hex_str(in_str, 0, 10) + " ... " + Utils._hex_str(in_str, len(in_str)-10, len(in_str))
- _hex_split_str = staticmethod(_hex_split_str)
-
- #@staticmethod
- def _hex_str(in_str, begin, end):
- """Return a binary string as a hex string"""
- hstr = ""
- for index in range(begin, end):
- if Utils._is_printable(in_str[index]):
- hstr += in_str[index]
- else:
- hstr += "\\%02x" % ord(in_str[index])
- return hstr
- _hex_str = staticmethod(_hex_str)
-
- #@staticmethod
- def _is_printable(in_str):
- """Return True if in_str in printable; False otherwise."""
- return in_str.strip(Utils.__printchars) == ""
- _is_printable = staticmethod(_is_printable)
-
- #@staticmethod
- def _load_args(fhandle, klass):
- """Load the arguments from class klass"""
- size = calcsize(klass.FORMAT)
- foffs = fhandle.tell(),
- fbin = fhandle.read(size)
- if len(fbin) != size:
- raise jerr.UnexpectedEndOfFileError(size, len(fbin))
- return foffs + unpack(klass.FORMAT, fbin)
- _load_args = staticmethod(_load_args)
-
- #@staticmethod
- def _split_str(in_str, split_size = 50):
- """Split a string into two parts separated by an ellipsis if it is longer than split_size"""
- if len(in_str) < split_size:
- return in_str
- return in_str[:25] + " ... " + in_str[-25:]
- _split_str = staticmethod(_split_str)
-
-
-#== class Hdr =================================================================
-
-class Hdr:
- """Class representing the journal header records"""
-
- FORMAT = "=4sBBHQ"
- HDR_VER = 1
- OWI_MASK = 0x01
- BIG_ENDIAN = sys.byteorder == "big"
- REC_BOUNDARY = DBLK_SIZE
-
- def __init__(self, foffs, magic, ver, endn, flags, rid):
- """Constructor"""
-# Sizeable.__init__(self)
- self.foffs = foffs
- self.magic = magic
- self.ver = ver
- self.endn = endn
- self.flags = flags
- self.rid = long(rid)
-
- def __str__(self):
- """Return string representation of this header"""
- if self.empty():
- return "0x%08x: <empty>" % (self.foffs)
- if self.magic[-1] == "x":
- return "0x%08x: [\"%s\"]" % (self.foffs, self.magic)
- if self.magic[-1] in ["a", "c", "d", "e", "f", "x"]:
- return "0x%08x: [\"%s\" v=%d e=%d f=0x%04x rid=0x%x]" % (self.foffs, self.magic, self.ver, self.endn,
- self.flags, self.rid)
- return "0x%08x: <error, unknown magic \"%s\" (possible overwrite boundary?)>" % (self.foffs, self.magic)
-
- #@staticmethod
- def discriminate(args):
- """Use the last char in the header magic to determine the header type"""
- return _CLASSES.get(args[1][-1], Hdr)
- discriminate = staticmethod(discriminate)
-
- def empty(self):
- """Return True if this record is empty (ie has a magic of 0x0000"""
- return self.magic == "\x00"*4
-
- def encode(self):
- """Encode the header into a binary string"""
- return pack(Hdr.FORMAT, self.magic, self.ver, self.endn, self.flags, self.rid)
-
- def owi(self):
- """Return the OWI (overwrite indicator) for this header"""
- return self.flags & self.OWI_MASK != 0
-
- def skip(self, fhandle):
- """Read and discard the remainder of this record"""
- fhandle.read(Utils.rem_bytes_in_blk(fhandle, self.REC_BOUNDARY))
-
- def check(self):
- """Check that this record is valid"""
- if self.empty() or self.magic[:3] != "RHM" or self.magic[3] not in ["a", "c", "d", "e", "f", "x"]:
- return True
- if self.magic[-1] != "x":
- if self.ver != self.HDR_VER:
- raise jerr.InvalidHeaderVersionError(self.HDR_VER, self.ver)
- if bool(self.endn) != self.BIG_ENDIAN:
- raise jerr.EndianMismatchError(self.BIG_ENDIAN)
- return False
-
-
-#== class FileHdr =============================================================
-
-class FileHdr(Hdr):
- """Class for file headers, found at the beginning of journal files"""
-
- FORMAT = "=2H4x3Q"
- REC_BOUNDARY = SBLK_SIZE
-
- def __str__(self):
- """Return a string representation of the this FileHdr instance"""
- return "%s fid=%d lid=%d fro=0x%08x t=%s" % (Hdr.__str__(self), self.fid, self.lid, self.fro,
- self.timestamp_str())
-
- def encode(self):
- """Encode this class into a binary string"""
- return Hdr.encode(self) + pack(FileHdr.FORMAT, self.fid, self.lid, self.fro, self.time_sec, self.time_ns)
-
- def init(self, fhandle, foffs, fid, lid, fro, time_sec, time_ns):
- """Initialize this instance to known values"""
- self.fid = fid
- self.lid = lid
- self.fro = fro
- self.time_sec = time_sec
- self.time_ns = time_ns
-
- def timestamp(self):
- """Get the timestamp of this record as a tuple (secs, nsecs)"""
- return (self.time_sec, self.time_ns)
-
- def timestamp_str(self):
- """Get the timestamp of this record in string format"""
- time = gmtime(self.time_sec)
- fstr = "%%a %%b %%d %%H:%%M:%%S.%09d %%Y" % (self.time_ns)
- return strftime(fstr, time)
-
-
-#== class DeqRec ==============================================================
-
-class DeqRec(Hdr):
- """Class for a dequeue record"""
-
- FORMAT = "=QQ"
-
- def __str__(self):
- """Return a string representation of the this DeqRec instance"""
- return "%s %sdrid=0x%x" % (Hdr.__str__(self), Utils.format_xid(self.xid, self.xidsize), self.deq_rid)
-
- def init(self, fhandle, foffs, deq_rid, xidsize):
- """Initialize this instance to known values"""
- self.deq_rid = deq_rid
- self.xidsize = xidsize
- self.xid = None
- self.deq_tail = None
- self.xid_complete = False
- self.tail_complete = False
- self.tail_bin = None
- self.tail_offs = 0
- self.load(fhandle)
-
- def encode(self):
- """Encode this class into a binary string"""
- buf = Hdr.encode(self) + pack(DeqRec.FORMAT, self.deq_rid, self.xidsize)
- if self.xidsize > 0:
- fmt = "%ds" % (self.xidsize)
- buf += pack(fmt, self.xid)
- buf += self.deq_tail.encode()
- return buf
-
- def load(self, fhandle):
- """Load the remainder of this record (after the header has been loaded"""
- if self.xidsize == 0:
- self.xid_complete = True
- self.tail_complete = True
- else:
- if not self.xid_complete:
- (self.xid, self.xid_complete) = Utils.load_file_data(fhandle, self.xidsize, self.xid)
- if self.xid_complete and not self.tail_complete:
- ret = Utils.load_file_data(fhandle, calcsize(RecTail.FORMAT), self.tail_bin)
- self.tail_bin = ret[0]
- if ret[1]:
- self.deq_tail = RecTail(self.tail_offs, *unpack(RecTail.FORMAT, self.tail_bin))
- magic_err = self.deq_tail.magic_inv != Utils.inv_str(self.magic)
- rid_err = self.deq_tail.rid != self.rid
- if magic_err or rid_err:
- raise jerr.InvalidRecordTailError(magic_err, rid_err, self)
- self.skip(fhandle)
- self.tail_complete = ret[1]
- return self.complete()
-
- def complete(self):
- """Returns True if the entire record is loaded, False otherwise"""
- return self.xid_complete and self.tail_complete
-
-
-#== class TxnRec ==============================================================
-
-class TxnRec(Hdr):
- """Class for a transaction commit/abort record"""
-
- FORMAT = "=Q"
-
- def __str__(self):
- """Return a string representation of the this TxnRec instance"""
- return "%s %s" % (Hdr.__str__(self), Utils.format_xid(self.xid, self.xidsize))
-
- def init(self, fhandle, foffs, xidsize):
- """Initialize this instance to known values"""
- self.xidsize = xidsize
- self.xid = None
- self.tx_tail = None
- self.xid_complete = False
- self.tail_complete = False
- self.tail_bin = None
- self.tail_offs = 0
- self.load(fhandle)
-
- def encode(self):
- """Encode this class into a binary string"""
- return Hdr.encode(self) + pack(TxnRec.FORMAT, self.xidsize) + pack("%ds" % self.xidsize, self.xid) + \
- self.tx_tail.encode()
-
- def load(self, fhandle):
- """Load the remainder of this record (after the header has been loaded"""
- if not self.xid_complete:
- ret = Utils.load_file_data(fhandle, self.xidsize, self.xid)
- self.xid = ret[0]
- self.xid_complete = ret[1]
- if self.xid_complete and not self.tail_complete:
- ret = Utils.load_file_data(fhandle, calcsize(RecTail.FORMAT), self.tail_bin)
- self.tail_bin = ret[0]
- if ret[1]:
- self.tx_tail = RecTail(self.tail_offs, *unpack(RecTail.FORMAT, self.tail_bin))
- magic_err = self.tx_tail.magic_inv != Utils.inv_str(self.magic)
- rid_err = self.tx_tail.rid != self.rid
- if magic_err or rid_err:
- raise jerr.InvalidRecordTailError(magic_err, rid_err, self)
- self.skip(fhandle)
- self.tail_complete = ret[1]
- return self.complete()
-
- def complete(self):
- """Returns True if the entire record is loaded, False otherwise"""
- return self.xid_complete and self.tail_complete
-
-
-#== class EnqRec ==============================================================
-
-class EnqRec(Hdr):
- """Class for a enqueue record"""
-
- FORMAT = "=QQ"
- TRANSIENT_MASK = 0x10
- EXTERN_MASK = 0x20
-
- def __str__(self):
- """Return a string representation of the this EnqRec instance"""
- return "%s %s%s %s %s" % (Hdr.__str__(self), Utils.format_xid(self.xid, self.xidsize),
- Utils.format_data(self.dsize, self.data), self.enq_tail, self.print_flags())
-
- def encode(self):
- """Encode this class into a binary string"""
- buf = Hdr.encode(self) + pack(EnqRec.FORMAT, self.xidsize, self.dsize)
- if self.xidsize > 0:
- buf += pack("%ds" % self.xidsize, self.xid)
- if self.dsize > 0:
- buf += pack("%ds" % self.dsize, self.data)
- if self.xidsize > 0 or self.dsize > 0:
- buf += self.enq_tail.encode()
- return buf
-
- def init(self, fhandle, foffs, xidsize, dsize):
- """Initialize this instance to known values"""
- self.xidsize = xidsize
- self.dsize = dsize
- self.transient = self.flags & self.TRANSIENT_MASK > 0
- self.extern = self.flags & self.EXTERN_MASK > 0
- self.xid = None
- self.data = None
- self.enq_tail = None
- self.xid_complete = False
- self.data_complete = False
- self.tail_complete = False
- self.tail_bin = None
- self.tail_offs = 0
- self.load(fhandle)
-
- def load(self, fhandle):
- """Load the remainder of this record (after the header has been loaded"""
- if not self.xid_complete:
- ret = Utils.load_file_data(fhandle, self.xidsize, self.xid)
- self.xid = ret[0]
- self.xid_complete = ret[1]
- if self.xid_complete and not self.data_complete:
- if self.extern:
- self.data_complete = True
- else:
- ret = Utils.load_file_data(fhandle, self.dsize, self.data)
- self.data = ret[0]
- self.data_complete = ret[1]
- if self.data_complete and not self.tail_complete:
- ret = Utils.load_file_data(fhandle, calcsize(RecTail.FORMAT), self.tail_bin)
- self.tail_bin = ret[0]
- if ret[1]:
- self.enq_tail = RecTail(self.tail_offs, *unpack(RecTail.FORMAT, self.tail_bin))
- magic_err = self.enq_tail.magic_inv != Utils.inv_str(self.magic)
- rid_err = self.enq_tail.rid != self.rid
- if magic_err or rid_err:
- raise jerr.InvalidRecordTailError(magic_err, rid_err, self)
- self.skip(fhandle)
- self.tail_complete = ret[1]
- return self.complete()
-
- def complete(self):
- """Returns True if the entire record is loaded, False otherwise"""
- return self.xid_complete and self.data_complete and self.tail_complete
-
- def print_flags(self):
- """Utility function to decode the flags field in the header and print a string representation"""
- fstr = ""
- if self.transient:
- fstr = "*TRANSIENT"
- if self.extern:
- if len(fstr) > 0:
- fstr += ",EXTERNAL"
- else:
- fstr = "*EXTERNAL"
- if len(fstr) > 0:
- fstr += "*"
- return fstr
-
-
-#== class RecTail =============================================================
-
-class RecTail:
- """Class for a record tail - for all records where either an XID or data separate the header from the end of the
- record"""
-
- FORMAT = "=4sQ"
-
- def __init__(self, foffs, magic_inv, rid):
- """Initialize this instance to known values"""
- self.foffs = foffs
- self.magic_inv = magic_inv
- self.rid = long(rid)
-
- def __str__(self):
- """Return a string representation of the this RecTail instance"""
- magic = Utils.inv_str(self.magic_inv)
- return "[\"%s\" rid=0x%x]" % (magic, self.rid)
-
- def encode(self):
- """Encode this class into a binary string"""
- return pack(RecTail.FORMAT, self.magic_inv, self.rid)
-
-
-#== class JrnlInfo ============================================================
-
-class JrnlInfo(object):
- """
- This object reads and writes journal information files (<basename>.jinf). Methods are provided
- to read a file, query its properties and reset just those properties necessary for normalizing
- and resizing a journal.
-
- Normalizing: resetting the directory and/or base filename to different values. This is necessary
- if a set of journal files is copied from one location to another before being restored, as the
- value of the path in the file no longer matches the actual path.
-
- Resizing: If the journal geometry parameters (size and number of journal files) changes, then the
- .jinf file must reflect these changes, as this file is the source of information for journal
- recovery.
-
- NOTE: Data size vs File size: There are methods which return the data size and file size of the
- journal files.
-
- +-------------+--------------------/ /----------+
- | File header | File data |
- +-------------+--------------------/ /----------+
- | | |
- | |<---------- Data size ---------->|
- |<------------------ File Size ---------------->|
-
- Data size: The size of the data content of the journal, ie that part which stores the data records.
-
- File size: The actual disk size of the journal including data and the file header which precedes the
- data.
-
- The file header is fixed to 1 sblk, so file size = jrnl size + sblk size.
- """
-
- def __init__(self, jdir, bfn = "JournalData"):
- """Constructor"""
- self.__jdir = jdir
- self.__bfn = bfn
- self.__jinf_dict = {}
- self._read_jinf()
-
- def __str__(self):
- """Create a string containing all of the journal info contained in the jinf file"""
- ostr = "Journal info file %s:\n" % os.path.join(self.__jdir, "%s.jinf" % self.__bfn)
- for key, val in self.__jinf_dict.iteritems():
- ostr += " %s = %s\n" % (key, val)
- return ostr
-
- def normalize(self, jdir = None, bfn = None):
- """Normalize the directory (ie reset the directory path to match the actual current location) for this
- jinf file"""
- if jdir == None:
- self.__jinf_dict["directory"] = self.__jdir
- else:
- self.__jdir = jdir
- self.__jinf_dict["directory"] = jdir
- if bfn != None:
- self.__bfn = bfn
- self.__jinf_dict["base_filename"] = bfn
-
- def resize(self, num_jrnl_files = None, jrnl_file_size = None):
- """Reset the journal size information to allow for resizing the journal"""
- if num_jrnl_files != None:
- self.__jinf_dict["number_jrnl_files"] = num_jrnl_files
- if jrnl_file_size != None:
- self.__jinf_dict["jrnl_file_size_sblks"] = jrnl_file_size * self.get_jrnl_dblk_size_bytes()
-
- def write(self, jdir = None, bfn = None):
- """Write the .jinf file"""
- self.normalize(jdir, bfn)
- if not os.path.exists(self.get_jrnl_dir()):
- os.makedirs(self.get_jrnl_dir())
- fhandle = open(os.path.join(self.get_jrnl_dir(), "%s.jinf" % self.get_jrnl_base_name()), "w")
- fhandle.write("<?xml version=\"1.0\" ?>\n")
- fhandle.write("<jrnl>\n")
- fhandle.write(" <journal_version value=\"%d\" />\n" % self.get_jrnl_version())
- fhandle.write(" <journal_id>\n")
- fhandle.write(" <id_string value=\"%s\" />\n" % self.get_jrnl_id())
- fhandle.write(" <directory value=\"%s\" />\n" % self.get_jrnl_dir())
- fhandle.write(" <base_filename value=\"%s\" />\n" % self.get_jrnl_base_name())
- fhandle.write(" </journal_id>\n")
- fhandle.write(" <creation_time>\n")
- fhandle.write(" <seconds value=\"%d\" />\n" % self.get_creation_time()[0])
- fhandle.write(" <nanoseconds value=\"%d\" />\n" % self.get_creation_time()[1])
- fhandle.write(" <string value=\"%s\" />\n" % self.get_creation_time_str())
- fhandle.write(" </creation_time>\n")
- fhandle.write(" <journal_file_geometry>\n")
- fhandle.write(" <number_jrnl_files value=\"%d\" />\n" % self.get_num_jrnl_files())
- fhandle.write(" <auto_expand value=\"%s\" />\n" % str.lower(str(self.get_auto_expand())))
- fhandle.write(" <jrnl_file_size_sblks value=\"%d\" />\n" % self.get_jrnl_data_size_sblks())
- fhandle.write(" <JRNL_SBLK_SIZE value=\"%d\" />\n" % self.get_jrnl_sblk_size_dblks())
- fhandle.write(" <JRNL_DBLK_SIZE value=\"%d\" />\n" % self.get_jrnl_dblk_size_bytes())
- fhandle.write(" </journal_file_geometry>\n")
- fhandle.write(" <cache_geometry>\n")
- fhandle.write(" <wcache_pgsize_sblks value=\"%d\" />\n" % self.get_wr_buf_pg_size_sblks())
- fhandle.write(" <wcache_num_pages value=\"%d\" />\n" % self.get_num_wr_buf_pgs())
- fhandle.write(" <JRNL_RMGR_PAGE_SIZE value=\"%d\" />\n" % self.get_rd_buf_pg_size_sblks())
- fhandle.write(" <JRNL_RMGR_PAGES value=\"%d\" />\n" % self.get_num_rd_buf_pgs())
- fhandle.write(" </cache_geometry>\n")
- fhandle.write("</jrnl>\n")
- fhandle.close()
-
- # Journal ID
-
- def get_jrnl_version(self):
- """Get the journal version"""
- return self.__jinf_dict["journal_version"]
-
- def get_jrnl_id(self):
- """Get the journal id"""
- return self.__jinf_dict["id_string"]
-
- def get_current_dir(self):
- """Get the current directory of the store (as opposed to that value saved in the .jinf file)"""
- return self.__jdir
-
- def get_jrnl_dir(self):
- """Get the journal directory stored in the .jinf file"""
- return self.__jinf_dict["directory"]
-
- def get_jrnl_base_name(self):
- """Get the base filename - that string used to name the journal files <basefilename>-nnnn.jdat and
- <basefilename>.jinf"""
- return self.__jinf_dict["base_filename"]
-
- # Journal creation time
-
- def get_creation_time(self):
- """Get journal creation time as a tuple (secs, nsecs)"""
- return (self.__jinf_dict["seconds"], self.__jinf_dict["nanoseconds"])
-
- def get_creation_time_str(self):
- """Get journal creation time as a string"""
- return self.__jinf_dict["string"]
-
- # --- Files and geometry ---
-
- def get_num_jrnl_files(self):
- """Get number of data files in the journal"""
- return self.__jinf_dict["number_jrnl_files"]
-
- def get_auto_expand(self):
- """Return True if auto-expand is enabled; False otherwise"""
- return self.__jinf_dict["auto_expand"]
-
- def get_jrnl_sblk_size_dblks(self):
- """Get the journal softblock size in dblks"""
- return self.__jinf_dict["JRNL_SBLK_SIZE"]
-
- def get_jrnl_sblk_size_bytes(self):
- """Get the journal softblock size in bytes"""
- return self.get_jrnl_sblk_size_dblks() * self.get_jrnl_dblk_size_bytes()
-
- def get_jrnl_dblk_size_bytes(self):
- """Get the journal datablock size in bytes"""
- return self.__jinf_dict["JRNL_DBLK_SIZE"]
-
- def get_jrnl_data_size_sblks(self):
- """Get the data capacity (excluding the file headers) for one journal file in softblocks"""
- return self.__jinf_dict["jrnl_file_size_sblks"]
-
- def get_jrnl_data_size_dblks(self):
- """Get the data capacity (excluding the file headers) for one journal file in datablocks"""
- return self.get_jrnl_data_size_sblks() * self.get_jrnl_sblk_size_dblks()
-
- def get_jrnl_data_size_bytes(self):
- """Get the data capacity (excluding the file headers) for one journal file in bytes"""
- return self.get_jrnl_data_size_dblks() * self.get_jrnl_dblk_size_bytes()
-
- def get_jrnl_file_size_sblks(self):
- """Get the size of one journal file on disk (including the file headers) in softblocks"""
- return self.get_jrnl_data_size_sblks() + 1
-
- def get_jrnl_file_size_dblks(self):
- """Get the size of one journal file on disk (including the file headers) in datablocks"""
- return self.get_jrnl_file_size_sblks() * self.get_jrnl_sblk_size_dblks()
-
- def get_jrnl_file_size_bytes(self):
- """Get the size of one journal file on disk (including the file headers) in bytes"""
- return self.get_jrnl_file_size_dblks() * self.get_jrnl_dblk_size_bytes()
-
- def get_tot_jrnl_data_size_sblks(self):
- """Get the size of the entire jouranl's data capacity (excluding the file headers) for all files together in
- softblocks"""
- return self.get_num_jrnl_files() * self.get_jrnl_data_size_bytes()
-
- def get_tot_jrnl_data_size_dblks(self):
- """Get the size of the entire jouranl's data capacity (excluding the file headers) for all files together in
- datablocks"""
- return self.get_num_jrnl_files() * self.get_jrnl_data_size_dblks()
-
- def get_tot_jrnl_data_size_bytes(self):
- """Get the size of the entire jouranl's data capacity (excluding the file headers) for all files together in
- bytes"""
- return self.get_num_jrnl_files() * self.get_jrnl_data_size_bytes()
-
- # Read and write buffers
-
- def get_wr_buf_pg_size_sblks(self):
- """Get the size of the write buffer pages in softblocks"""
- return self.__jinf_dict["wcache_pgsize_sblks"]
-
- def get_wr_buf_pg_size_dblks(self):
- """Get the size of the write buffer pages in datablocks"""
- return self.get_wr_buf_pg_size_sblks() * self.get_jrnl_sblk_size_dblks()
-
- def get_wr_buf_pg_size_bytes(self):
- """Get the size of the write buffer pages in bytes"""
- return self.get_wr_buf_pg_size_dblks() * self.get_jrnl_dblk_size_bytes()
-
- def get_num_wr_buf_pgs(self):
- """Get the number of write buffer pages"""
- return self.__jinf_dict["wcache_num_pages"]
-
- def get_rd_buf_pg_size_sblks(self):
- """Get the size of the read buffer pages in softblocks"""
- return self.__jinf_dict["JRNL_RMGR_PAGE_SIZE"]
-
- def get_rd_buf_pg_size_dblks(self):
- """Get the size of the read buffer pages in datablocks"""
- return self.get_rd_buf_pg_size_sblks * self.get_jrnl_sblk_size_dblks()
-
- def get_rd_buf_pg_size_bytes(self):
- """Get the size of the read buffer pages in bytes"""
- return self.get_rd_buf_pg_size_dblks * self.get_jrnl_dblk_size_bytes()
-
- def get_num_rd_buf_pgs(self):
- """Get the number of read buffer pages"""
- return self.__jinf_dict["JRNL_RMGR_PAGES"]
-
- def _read_jinf(self):
- """Read and initialize this instance from an existing jinf file located at the directory named in the
- constructor - called by the constructor"""
- fhandle = open(os.path.join(self.__jdir, "%s.jinf" % self.__bfn), "r")
- parser = xml.parsers.expat.ParserCreate()
- parser.StartElementHandler = self._handle_xml_start_elt
- parser.CharacterDataHandler = self._handle_xml_char_data
- parser.EndElementHandler = self._handle_xml_end_elt
- parser.ParseFile(fhandle)
- fhandle.close()
-
- def _handle_xml_start_elt(self, name, attrs):
- """Callback for handling XML start elements. Used by the XML parser."""
- # bool values
- if name == "auto_expand":
- self.__jinf_dict[name] = attrs["value"] == "true"
- # long values
- elif name == "seconds" or \
- name == "nanoseconds":
- self.__jinf_dict[name] = long(attrs["value"])
- # int values
- elif name == "journal_version" or \
- name == "number_jrnl_files" or \
- name == "jrnl_file_size_sblks" or \
- name == "JRNL_SBLK_SIZE" or \
- name == "JRNL_DBLK_SIZE" or \
- name == "wcache_pgsize_sblks" or \
- name == "wcache_num_pages" or \
- name == "JRNL_RMGR_PAGE_SIZE" or \
- name == "JRNL_RMGR_PAGES":
- self.__jinf_dict[name] = int(attrs["value"])
- # strings
- elif "value" in attrs:
- self.__jinf_dict[name] = attrs["value"]
-
- def _handle_xml_char_data(self, data):
- """Callback for handling character data (ie within <elt>...</elt>). The jinf file does not use this in its
- data. Used by the XML parser."""
- pass
-
- def _handle_xml_end_elt(self, name):
- """Callback for handling XML end elements. Used by XML parser."""
- pass
-
-
-#==============================================================================
-
-_CLASSES = {
- "a": TxnRec,
- "c": TxnRec,
- "d": DeqRec,
- "e": EnqRec,
- "f": FileHdr
-}
-
-if __name__ == "__main__":
- print "This is a library, and cannot be executed."
Copied: store/trunk/cpp/tools/qpidstore/__init__.py (from rev 4457, store/trunk/cpp/tools/__init__.py)
===================================================================
--- store/trunk/cpp/tools/qpidstore/__init__.py (rev 0)
+++ store/trunk/cpp/tools/qpidstore/__init__.py 2011-05-10 15:35:17 UTC (rev 4458)
@@ -0,0 +1,23 @@
+"""
+Copyright (c) 2007, 2008 Red Hat, Inc.
+
+This file is part of the Qpid async store library msgstore.so.
+
+This library is free software; you can redistribute it and/or
+modify it under the terms of the GNU Lesser General Public
+License as published by the Free Software Foundation; either
+version 2.1 of the License, or (at your option) any later version.
+
+This library is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+Lesser General Public License for more details.
+
+You should have received a copy of the GNU Lesser General Public
+License along with this library; if not, write to the Free Software
+Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
+USA
+
+The GNU Lesser General Public License is available in the file COPYING.
+"""
+
Copied: store/trunk/cpp/tools/qpidstore/janal.py (from rev 4457, store/trunk/cpp/tools/janal.py)
===================================================================
--- store/trunk/cpp/tools/qpidstore/janal.py (rev 0)
+++ store/trunk/cpp/tools/qpidstore/janal.py 2011-05-10 15:35:17 UTC (rev 4458)
@@ -0,0 +1,612 @@
+"""
+Copyright (c) 2007, 2008 Red Hat, Inc.
+
+This file is part of the Qpid async store library msgstore.so.
+
+This library is free software; you can redistribute it and/or
+modify it under the terms of the GNU Lesser General Public
+License as published by the Free Software Foundation; either
+version 2.1 of the License, or (at your option) any later version.
+
+This library is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+Lesser General Public License for more details.
+
+You should have received a copy of the GNU Lesser General Public
+License along with this library; if not, write to the Free Software
+Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
+USA
+
+The GNU Lesser General Public License is available in the file COPYING.
+"""
+
+import jerr, jrnl
+import os.path, sys
+
+
+#== class EnqMap ==============================================================
+
+class EnqMap(object):
+ """Class for maintaining a map of enqueued records, indexing the rid against hdr, fid and transaction lock"""
+
+ def __init__(self):
+ """Constructor"""
+ self.__map = {}
+
+ def __str__(self):
+ """Print the contents of the map"""
+ return self.report(True, True)
+
+ def add(self, fid, hdr, lock = False):
+ """Add a new record into the map"""
+ if hdr.rid in self.__map:
+ raise jerr.DuplicateRidError(hdr.rid)
+ self.__map[hdr.rid] = [fid, hdr, lock]
+
+ def contains(self, rid):
+ """Return True if the map contains the given rid"""
+ return rid in self.__map
+
+ def delete(self, rid):
+ """Delete the rid and its associated data from the map"""
+ if rid in self.__map:
+ if self.get_lock(rid):
+ raise jerr.DeleteLockedRecordError(rid)
+ del self.__map[rid]
+ else:
+ raise jerr.JWarning("ERROR: Deleting non-existent rid from EnqMap: rid=0x%x" % rid)
+
+ def get(self, rid):
+ """Return a list [fid, hdr, lock] for the given rid"""
+ if self.contains(rid):
+ return self.__map[rid]
+ return None
+
+ def get_fid(self, rid):
+ """Return the fid for the given rid"""
+ if self.contains(rid):
+ return self.__map[rid][0]
+ return None
+
+ def get_hdr(self, rid):
+ """Return the header record for the given rid"""
+ if self.contains(rid):
+ return self.__map[rid][1]
+ return None
+
+ def get_lock(self, rid):
+ """Return the transaction lock value for the given rid"""
+ if self.contains(rid):
+ return self.__map[rid][2]
+ return None
+
+ def get_rec_list(self):
+ """Return a list of tuples (fid, hdr, lock) for all entries in the map"""
+ return self.__map.values()
+
+ def lock(self, rid):
+ """Set the transaction lock for a given rid to True; raises AlreadyLockedError if set, JWarning if rid is absent"""
+ if rid in self.__map:
+ if not self.__map[rid][2]: # not yet locked
+ self.__map[rid][2] = True
+ else:
+ raise jerr.AlreadyLockedError(rid)
+ else:
+ raise jerr.JWarning("ERROR: Locking non-existent rid in EnqMap: rid=0x%x" % rid)
+
+ def report(self, show_stats, show_records):
+ """Return a text report of the map: a record count, plus one line per record in rid order if show_records is set"""
+ if len(self.__map) == 0:
+ return "No enqueued records found."
+ rstr = "%d enqueued records found" % len(self.__map)
+ if show_records:
+ rstr += ":"
+ rid_list = self.__map.keys()
+ rid_list.sort()
+ for rid in rid_list:
+ if self.__map[rid][2]:
+ lock_str = " [LOCKED]"
+ else:
+ lock_str = ""
+ rstr += "\n lfid=%d %s %s" % (self.__map[rid][0], self.__map[rid][1], lock_str)
+ else:
+ rstr += "."
+ return rstr
+
+ def rids(self):
+ """Return a list of rids in the map"""
+ return self.__map.keys()
+
+ def size(self):
+ """Return the number of entries in the map"""
+ return len(self.__map)
+
+ def unlock(self, rid):
+ """Set the transaction lock for a given rid to False"""
+ if rid in self.__map:
+ if self.__map[rid][2]:
+ self.__map[rid][2] = False
+ else:
+ raise jerr.NotLockedError(rid)
+ else:
+ raise jerr.NonExistentRecordError("unlock", rid)
+
+
+#== class TxnMap ==============================================================
+
+class TxnMap(object):
+ """Transaction map, which maps xids to a list of outstanding actions"""
+
+ def __init__(self, emap):
+ """Constructor, requires an existing EnqMap instance"""
+ self.__emap = emap
+ self.__map = {}
+
+ def __str__(self):
+ """Print the contents of the map"""
+ return self.report(True, True)
+
+ def add(self, fid, hdr):
+ """Add a new transactional record into the map"""
+ if isinstance(hdr, jrnl.DeqRec):
+ try:
+ self.__emap.lock(hdr.deq_rid)
+ except jerr.JWarning:
+ # Not in emap, look for rid in tmap
+ l = self.find_rid(hdr.deq_rid, hdr.xid)
+ if l != None:
+ if l[2]:
+ raise jerr.AlreadyLockedError(hdr.deq_rid)
+ l[2] = True
+ if hdr.xid in self.__map:
+ self.__map[hdr.xid].append([fid, hdr, False]) # append to existing list
+ else:
+ self.__map[hdr.xid] = [[fid, hdr, False]] # create new list
+
+ def contains(self, xid):
+ """Return True if the xid exists in the map; False otherwise"""
+ return xid in self.__map
+
+ def delete(self, hdr):
+ """Remove a transaction record from the map using either a commit or abort header"""
+ if hdr.magic[-1] == "c":
+ return self._commit(hdr.xid)
+ if hdr.magic[-1] == "a":
+ self._abort(hdr.xid)
+ else:
+ raise jerr.InvalidRecordTypeError("delete from TxnMap", hdr.magic, hdr.rid)
+
+ def find_rid(self, rid, xid_hint = None):
+ """ Search for and return map list with supplied rid. If xid_hint is supplied, try that xid first"""
+ if xid_hint != None and self.contains(xid_hint):
+ for l in self.__map[xid_hint]:
+ if l[1].rid == rid:
+ return l
+ for xid in self.__map.iterkeys():
+ if xid_hint == None or xid != xid_hint:
+ for l in self.__map[xid]:
+ if l[1].rid == rid:
+ return l
+
+ def get(self, xid):
+ """Return a list of operations for the given xid"""
+ if self.contains(xid):
+ return self.__map[xid]
+
+ def report(self, show_stats, show_records):
+ """Return a string containing a text report for all records in the map"""
+ if len(self.__map) == 0:
+ return "No outstanding transactions found."
+ rstr = "%d outstanding transactions found" % len(self.__map)
+ if show_records:
+ rstr += ":"
+ for xid, tup in self.__map.iteritems():
+ rstr += "\n xid=%s:" % jrnl.Utils.format_xid(xid)
+ for i in tup:
+ rstr += "\n %s" % str(i[1])
+ else:
+ rstr += "."
+ return rstr
+
+ def size(self):
+ """Return the number of xids in the map"""
+ return len(self.__map)
+
+ def xids(self):
+ """Return a list of xids in the map"""
+ return self.__map.keys()
+
+ def _abort(self, xid):
+ """Abort the given xid: unlock the enqueue (deq_rid) targeted by each dequeue still in the EnqMap, then drop the xid"""
+ for fid, hdr, lock in self.__map[xid]:
+ if isinstance(hdr, jrnl.DeqRec) and self.__emap.contains(hdr.deq_rid):
+ self.__emap.unlock(hdr.deq_rid)
+ del self.__map[xid]
+
+ def _commit(self, xid):
+ """Perform a commit operation for the given xid record"""
+ mismatch_list = []
+ for fid, hdr, lock in self.__map[xid]:
+ if isinstance(hdr, jrnl.EnqRec):
+ self.__emap.add(fid, hdr, lock) # Transfer enq to emap
+ else:
+ if self.__emap.contains(hdr.deq_rid):
+ self.__emap.unlock(hdr.deq_rid)
+ self.__emap.delete(hdr.deq_rid)
+ else:
+ mismatch_list.append("0x%x" % hdr.deq_rid)
+ del self.__map[xid]
+ return mismatch_list
+
+#== class JrnlAnalyzer ========================================================
+
+class JrnlAnalyzer(object):
+ """
+ This class analyzes a set of journal files and determines which is the last to be written
+ (the newest file), and hence which should be the first to be read for recovery (the oldest
+ file).
+
+ The analysis is performed on construction; the contents of the JrnlInfo object passed provide
+ the recovery details.
+ """
+
+ def __init__(self, jinf):
+ """Constructor"""
+ self.__oldest = None
+ self.__jinf = jinf
+ self.__flist = self._analyze()
+
+ def __str__(self):
+ """String representation of this JrnlAnalyzer instance, will print out results of analysis."""
+ ostr = "Journal files analyzed in directory %s (* = earliest full):\n" % self.__jinf.get_current_dir()
+ if self.is_empty():
+ ostr += " <All journal files are empty>\n"
+ else:
+ for tup in self.__flist:
+ tmp = " "
+ if tup[0] == self.__oldest[0]:
+ tmp = "*"
+ ostr += " %s %s: owi=%-5s rid=0x%x, fro=0x%x ts=%s\n" % (tmp, os.path.basename(tup[1]), tup[2],
+ tup[3], tup[4], tup[5])
+ for i in range(self.__flist[-1][0] + 1, self.__jinf.get_num_jrnl_files()):
+ ostr += " %s.%04x.jdat: <empty>\n" % (self.__jinf.get_jrnl_base_name(), i)
+ return ostr
+
+ # Analysis
+
+ def get_oldest_file(self):
+ """Return a tuple (ordnum, jfn, owi, rid, fro, timestamp) for the oldest data file found in the journal"""
+ return self.__oldest
+
+ def get_oldest_file_index(self):
+ """Return the ordinal number of the oldest data file found in the journal"""
+ if self.is_empty():
+ return None
+ return self.__oldest[0]
+
+ def is_empty(self):
+ """Return true if the analysis found that the journal file has never been written to"""
+ return len(self.__flist) == 0
+
+ def _analyze(self):
+ """Perform the journal file analysis by reading and comparing the file headers of each journal data file"""
+ owi_found = False
+ flist = []
+ for i in range(0, self.__jinf.get_num_jrnl_files()):
+ jfn = os.path.join(self.__jinf.get_current_dir(), "%s.%04x.jdat" % (self.__jinf.get_jrnl_base_name(), i))
+ fhandle = open(jfn)
+ fhdr = jrnl.Utils.load(fhandle, jrnl.Hdr)
+ if fhdr.empty():
+ break
+ this_tup = (i, jfn, fhdr.owi(), fhdr.rid, fhdr.fro, fhdr.timestamp_str())
+ flist.append(this_tup)
+ if i == 0:
+ init_owi = fhdr.owi()
+ self.__oldest = this_tup
+ elif fhdr.owi() != init_owi and not owi_found:
+ self.__oldest = this_tup
+ owi_found = True
+ return flist
+
+
+#== class JrnlReader ====================================================
+
+class JrnlReader(object):
+ """
+ This class contains an Enqueue Map (emap), a transaction map (tmap) and a transaction
+ object list (txn_obj_list) which are populated by reading the journals from the oldest
+ to the newest and analyzing each record. The JrnlInfo and JrnlAnalyzer
+ objects supplied on construction provide the information used for the recovery.
+
+ The analysis is performed by calling run(); construction only initializes the maps, flags and counters.
+ """
+
+ def __init__(self, jinfo, jra, qflag = False, rflag = False, vflag = False):
+ """Constructor; stores the supplied objects and flags and creates empty maps and zeroed counters (no files are read)"""
+ self._jinfo = jinfo
+ self._jra = jra
+ self._qflag = qflag
+ self._rflag = rflag
+ self._vflag = vflag
+
+ # test callback functions for CSV tests
+ self._csv_store_chk = None
+ self._csv_start_cb = None
+ self._csv_enq_cb = None
+ self._csv_deq_cb = None
+ self._csv_txn_cb = None
+ self._csv_end_cb = None
+
+ self._emap = EnqMap()
+ self._tmap = TxnMap(self._emap)
+ self._txn_obj_list = {}
+
+ self._file = None
+ self._file_hdr = None
+ self._file_num = None
+ self._first_rec_flag = None
+ self._fro = None
+ self._last_file_flag = None
+ self._start_file_num = None
+ self._file_hdr_owi = None
+ self._warning = []
+
+ self._abort_cnt = 0
+ self._commit_cnt = 0
+ self._msg_cnt = 0
+ self._rec_cnt = 0
+ self._txn_msg_cnt = 0
+
+ def __str__(self):
+ """Print out all the undequeued records"""
+ return self.report(True, self._rflag)
+
+ def emap(self):
+ """Get the enqueue map"""
+ return self._emap
+
+ def get_abort_cnt(self):
+ """Get the cumulative number of transactional aborts found"""
+ return self._abort_cnt
+
+ def get_commit_cnt(self):
+ """Get the cumulative number of transactional commits found"""
+ return self._commit_cnt
+
+ def get_msg_cnt(self):
+ """Get the cumulative number of messages found"""
+ return self._msg_cnt
+
+ def get_rec_cnt(self):
+ """Get the cumulative number of journal records (including fillers) found"""
+ return self._rec_cnt
+
+ def is_last_file(self):
+ """Return True if the last file is being read"""
+ return self._last_file_flag
+
+ def report(self, show_stats = True, show_records = False):
+ """Return a string containing a report on the file analysis"""
+ rstr = self._emap.report(show_stats, show_records) + "\n" + self._tmap.report(show_stats, show_records)
+ #TODO - print size analysis here - ie how full, sparse, est. space remaining before enq threshold
+ return rstr
+
+ def run(self):
+ """Perform the read of the journal"""
+ if self._csv_start_cb != None and self._csv_start_cb(self._csv_store_chk):
+ return
+ if self._jra.is_empty():
+ return
+ stop = self._advance_jrnl_file(*self._jra.get_oldest_file())
+ while not stop and not self._get_next_record():
+ pass
+ if self._csv_end_cb != None and self._csv_end_cb(self._csv_store_chk):
+ return
+ if not self._qflag:
+ print
+
+ def set_callbacks(self, csv_store_chk, csv_start_cb = None, csv_enq_cb = None, csv_deq_cb = None, csv_txn_cb = None,
+ csv_end_cb = None):
+ """Set callbacks for checks to be made at various points while reading the journal"""
+ self._csv_store_chk = csv_store_chk
+ self._csv_start_cb = csv_start_cb
+ self._csv_enq_cb = csv_enq_cb
+ self._csv_deq_cb = csv_deq_cb
+ self._csv_txn_cb = csv_txn_cb
+ self._csv_end_cb = csv_end_cb
+
+ def tmap(self):
+ """Return the transaction map"""
+ return self._tmap
+
+ def get_txn_msg_cnt(self):
+ """Get the cumulative transactional message count"""
+ return self._txn_msg_cnt
+
+ def txn_obj_list(self):
+ """Get a cumulative list of transaction objects (commits and aborts)"""
+ return self._txn_obj_list
+
+ def _advance_jrnl_file(self, *oldest_file_info):
+ """Rotate to using the next journal file. Return False if the operation was successful, True if there are no
+ more files to read."""
+ fro_seek_flag = False
+ if len(oldest_file_info) > 0:
+ self._start_file_num = self._file_num = oldest_file_info[0]
+ self._fro = oldest_file_info[4]
+ fro_seek_flag = True # jump to fro to start reading
+ if not self._qflag and not self._rflag:
+ if self._vflag:
+ print "Recovering journals..."
+ else:
+ print "Recovering journals",
+ if self._file != None and self._is_file_full():
+ self._file.close()
+ self._file_num = self._incr_file_num()
+ if self._file_num == self._start_file_num:
+ return True
+ if self._start_file_num == 0:
+ self._last_file_flag = self._file_num == self._jinfo.get_num_jrnl_files() - 1
+ else:
+ self._last_file_flag = self._file_num == self._start_file_num - 1
+ if self._file_num < 0 or self._file_num >= self._jinfo.get_num_jrnl_files():
+ raise jerr.BadFileNumberError(self._file_num)
+ jfn = os.path.join(self._jinfo.get_current_dir(), "%s.%04x.jdat" %
+ (self._jinfo.get_jrnl_base_name(), self._file_num))
+ self._file = open(jfn)
+ self._file_hdr = jrnl.Utils.load(self._file, jrnl.Hdr)
+ if fro_seek_flag and self._file.tell() != self._fro:
+ self._file.seek(self._fro)
+ self._first_rec_flag = True
+ if not self._qflag:
+ if self._rflag:
+ print jfn, ": ", self._file_hdr
+ elif self._vflag:
+ print "* Reading %s" % jfn
+ else:
+ print ".",
+ sys.stdout.flush()
+ return False
+
+ def _check_owi(self, hdr):
+ """Return True if the header's owi indicator matches that of the file header record; False otherwise. This can
+ indicate whether the last record in a file has been read and now older records which have not yet been
+ overwritten are now being read."""
+ return self._file_hdr_owi == hdr.owi()
+
+ def _is_file_full(self):
+ """Return True if the current file is full (no more write space); false otherwise"""
+ return self._file.tell() >= self._jinfo.get_jrnl_file_size_bytes()
+
+ def _get_next_record(self):
+ """Get the next record in the file for analysis"""
+ if self._is_file_full():
+ if self._advance_jrnl_file():
+ return True
+ try:
+ hdr = jrnl.Utils.load(self._file, jrnl.Hdr)
+ except:
+ return True
+ if hdr.empty():
+ return True
+ if hdr.check():
+ return True
+ self._rec_cnt += 1
+ self._file_hdr_owi = self._file_hdr.owi()
+ if self._first_rec_flag:
+ if self._file_hdr.fro != hdr.foffs:
+ raise jerr.FirstRecordOffsetMismatch(self._file_hdr.fro, hdr.foffs)
+ else:
+ if self._rflag:
+ print " * fro ok: 0x%x" % self._file_hdr.fro
+ self._first_rec_flag = False
+ stop = False
+ if isinstance(hdr, jrnl.EnqRec):
+ stop = self._handle_enq_rec(hdr)
+ elif isinstance(hdr, jrnl.DeqRec):
+ stop = self._handle_deq_rec(hdr)
+ elif isinstance(hdr, jrnl.TxnRec):
+ stop = self._handle_txn_rec(hdr)
+ wstr = ""
+ for warn in self._warning:
+ wstr += " (%s)" % warn
+ if self._rflag:
+ print " > %s %s" % (hdr, wstr)
+ self._warning = []
+ return stop
+
+ def _handle_deq_rec(self, hdr):
+ """Process a dequeue ("RHMd") record"""
+ if self._load_rec(hdr):
+ return True
+
+ # Check OWI flag
+ if not self._check_owi(hdr):
+ self._warning.append("WARNING: OWI mismatch - could be overwrite boundary.")
+ return True
+ # Test hook
+ if self._csv_deq_cb != None and self._csv_deq_cb(self._csv_store_chk, hdr):
+ return True
+
+ try:
+ if hdr.xid == None:
+ self._emap.delete(hdr.deq_rid)
+ else:
+ self._tmap.add(self._file_hdr.fid, hdr)
+ except jerr.JWarning, warn:
+ self._warning.append(str(warn))
+ return False
+
+ def _handle_enq_rec(self, hdr):
+ """Process an enqueue ("RHMe") record; returns True if reading should stop, False to continue"""
+ if self._load_rec(hdr):
+ return True
+
+ # Check extern flag
+ if hdr.extern and hdr.data != None:
+ raise jerr.ExternFlagDataError(hdr)
+ # Check OWI flag
+ if not self._check_owi(hdr):
+ self._warning.append("WARNING: OWI mismatch - could be overwrite boundary.")
+ return True
+ # Test hook
+ if self._csv_enq_cb != None and self._csv_enq_cb(self._csv_store_chk, hdr):
+ return True
+
+ if hdr.xid == None:
+ self._emap.add(self._file_hdr.fid, hdr)
+ else:
+ self._txn_msg_cnt += 1
+ self._tmap.add(self._file_hdr.fid, hdr)
+ self._msg_cnt += 1
+ return False
+
+ def _handle_txn_rec(self, hdr):
+ """Process a transaction ("RHMa or RHMc") record"""
+ if self._load_rec(hdr):
+ return True
+
+ # Check OWI flag
+ if not self._check_owi(hdr):
+ self._warning.append("WARNING: OWI mismatch - could be overwrite boundary.")
+ return True
+ # Test hook
+ if self._csv_txn_cb != None and self._csv_txn_cb(self._csv_store_chk, hdr):
+ return True
+
+ if hdr.magic[-1] == "a":
+ self._abort_cnt += 1
+ else:
+ self._commit_cnt += 1
+
+ if self._tmap.contains(hdr.xid):
+ mismatched_rids = self._tmap.delete(hdr)
+ if mismatched_rids != None and len(mismatched_rids) > 0:
+ self._warning.append("WARNING: transactional dequeues not found in enqueue map; rids=%s" %
+ mismatched_rids)
+ else:
+ self._warning.append("WARNING: %s not found in transaction map" % jrnl.Utils.format_xid(hdr.xid))
+ if hdr.magic[-1] == "c": # commits only
+ self._txn_obj_list[hdr.xid] = hdr
+ return False
+
+ def _incr_file_num(self):
+ """Increment the number of files read with wraparound (ie after file n-1, go to 0)"""
+ self._file_num += 1
+ if self._file_num >= self._jinfo.get_num_jrnl_files():
+ self._file_num = 0
+ return self._file_num
+
+ def _load_rec(self, hdr):
+ """Load a single record for the given header. There may be arbitrarily large xids and data components."""
+ while not hdr.complete():
+ if self._advance_jrnl_file():
+ return True
+ hdr.load(self._file)
+ return False
+
+# =============================================================================
+
+if __name__ == "__main__":
+ print "This is a library, and cannot be executed."
Copied: store/trunk/cpp/tools/qpidstore/jerr.py (from rev 4457, store/trunk/cpp/tools/jerr.py)
===================================================================
--- store/trunk/cpp/tools/qpidstore/jerr.py (rev 0)
+++ store/trunk/cpp/tools/qpidstore/jerr.py 2011-05-10 15:35:17 UTC (rev 4458)
@@ -0,0 +1,223 @@
+"""
+Copyright (c) 2007, 2008 Red Hat, Inc.
+
+This file is part of the Qpid async store library msgstore.so.
+
+This library is free software; you can redistribute it and/or
+modify it under the terms of the GNU Lesser General Public
+License as published by the Free Software Foundation; either
+version 2.1 of the License, or (at your option) any later version.
+
+This library is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+Lesser General Public License for more details.
+
+You should have received a copy of the GNU Lesser General Public
+License along with this library; if not, write to the Free Software
+Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
+USA
+
+The GNU Lesser General Public License is available in the file COPYING.
+"""
+
+# == Warnings =================================================================
+
+class JWarning(Exception):
+ """Class to convey a warning"""
+ def __init__(self, err):
+ """Constructor"""
+ Exception.__init__(self, err)
+
+# == Errors ===================================================================
+
+class AllJrnlFilesEmptyCsvError(Exception):
+ """All journal files are empty (never been written)"""
+ def __init__(self, tnum, exp_num_msgs):
+ """Constructor"""
+ Exception.__init__(self, "[CSV %d] All journal files are empty, but test expects %d msg(s)." %
+ (tnum, exp_num_msgs))
+
+class AlreadyLockedError(Exception):
+ """Error class for trying to lock a record that is already locked"""
+ def __init__(self, rid):
+ """Constructor"""
+ Exception.__init__(self, "Locking record which is already locked in EnqMap: rid=0x%x" % rid)
+
+class BadFileNumberError(Exception):
+ """Error class for incorrect or unexpected file number"""
+ def __init__(self, file_num):
+ """Constructor"""
+ Exception.__init__(self, "Bad file number %d" % file_num)
+
+class DataSizeError(Exception):
+ """Error class for data size mismatch"""
+ def __init__(self, exp_size, act_size, data_str):
+ """Constructor"""
+ Exception.__init__(self, "Inconsistent data size: expected:%d; actual:%d; data=\"%s\"" %
+ (exp_size, act_size, data_str))
+
+class DeleteLockedRecordError(Exception):
+ """Error class for deleting a locked record from the enqueue map"""
+ def __init__(self, rid):
+ """Constructor; rid is the integer record id, reported in hexadecimal"""
+ Exception.__init__(self, "Deleting locked record from EnqMap: rid=0x%x" % rid)
+
+class DequeueNonExistentEnqueueError(Exception):
+ """Error class for attempting to dequeue a non-existent enqueue record (rid)"""
+ def __init__(self, deq_rid):
+ """Constructor; deq_rid is the integer id of the missing enqueue record, reported in hexadecimal"""
+ Exception.__init__(self, "Dequeuing non-existent enqueue record: rid=0x%x" % deq_rid)
+
+class DuplicateRidError(Exception):
+ """Error class for placing duplicate rid into enqueue map"""
+ def __init__(self, rid):
+ """Constructor"""
+ Exception.__init__(self, "Adding duplicate record to EnqMap: rid=0x%x" % rid)
+
+class EndianMismatchError(Exception):
+ """Error class mismatched record header endian flag"""
+ def __init__(self, exp_endianness):
+ """Constructor"""
+ Exception.__init__(self, "Endian mismatch: expected %s, but current record is %s" %
+ self.endian_str(exp_endianness))
+ #@staticmethod
+ def endian_str(endianness):
+ """Return a string tuple for the endianness error message"""
+ if endianness:
+ return "big", "little"
+ return "little", "big"
+ endian_str = staticmethod(endian_str)
+
+class ExternFlagDataError(Exception):
+ """Error class for the extern flag being set and the internal size > 0"""
+ def __init__(self, hdr):
+ """Constructor"""
+ Exception.__init__(self, "Message data found (msg size > 0) on record with external flag set: hdr=%s" % hdr)
+
+class ExternFlagCsvError(Exception):
+ """External flag mismatch between record and CSV test file"""
+ def __init__(self, tnum, exp_extern_flag):
+ """Constructor"""
+ Exception.__init__(self, "[CSV %d] External flag mismatch: expected %s" % (tnum, exp_extern_flag))
+
+class ExternFlagWithDataCsvError(Exception):
+ """External flag set and Message data found"""
+ def __init__(self, tnum):
+ """Constructor"""
+ Exception.__init__(self, "[CSV %d] Message data found on record with external flag set" % tnum)
+
+class FillExceedsFileSizeError(Exception):
+ """Internal error from a fill operation which will exceed the specified file size"""
+ def __init__(self, cur_size, file_size):
+ """Constructor"""
+ Exception.__init__(self, "Filling to size %d > max file size %d" % (cur_size, file_size))
+
+class FillSizeError(Exception):
+ """Internal error from a fill operation that did not match the calculated end point in the file"""
+ def __init__(self, cur_posn, exp_posn):
+ """Constructor"""
+ Exception.__init__(self, "Filled to size %d > expected file posn %d" % (cur_posn, exp_posn))
+
+class FirstRecordOffsetMismatch(Exception):
+ """Error class for file header fro mismatch with actual record"""
+ def __init__(self, fro, actual_offs):
+ """Constructor"""
+ Exception.__init__(self, "File header first record offset mismatch: fro=0x%x; actual offs=0x%x" %
+ (fro, actual_offs))
+
+class InvalidHeaderVersionError(Exception):
+ """Error class for invalid record header version"""
+ def __init__(self, exp_ver, act_ver):
+ """Constructor"""
+ Exception.__init__(self, "Invalid header version: expected:%d, actual:%d." % (exp_ver, act_ver))
+
+class InvalidRecordTypeError(Exception):
+ """Error class for any operation using an invalid record type"""
+ def __init__(self, operation, magic, rid):
+ """Constructor"""
+ Exception.__init__(self, "Invalid record type for operation: operation=%s record magic=%s, rid=0x%x" %
+ (operation, magic, rid))
+
+class InvalidRecordTailError(Exception):
+ """Error class for invalid record tail"""
+ def __init__(self, magic_err, rid_err, rec):
+ """Constructor"""
+ Exception.__init__(self, " > %s *INVALID TAIL RECORD (%s)*" % (rec, self.tail_err_str(magic_err, rid_err)))
+ #@staticmethod
+ def tail_err_str(magic_err, rid_err):
+ """Return a string indicating the tail record error(s)"""
+ estr = ""
+ if magic_err:
+ estr = "magic bad"
+ if rid_err:
+ estr += ", "
+ if rid_err:
+ estr += "rid mismatch"
+ return estr
+ tail_err_str = staticmethod(tail_err_str)
+
+class NonExistentRecordError(Exception):
+ """Error class for any operation on an non-existent record"""
+ def __init__(self, operation, rid):
+ """Constructor"""
+ Exception.__init__(self, "Operation on non-existent record: operation=%s; rid=0x%x" % (operation, rid))
+
+class NotLockedError(Exception):
+ """Error class for unlocking a record which is not locked in the first place"""
+ def __init__(self, rid):
+ """Constructor"""
+ Exception.__init__(self, "Unlocking record which is not locked in EnqMap: rid=0x%x" % rid)
+
+class JournalSpaceExceededError(Exception):
+ """Error class for when journal space of resized journal is too small to contain the transferred records"""
+ def __init__(self):
+ """Constructor"""
+ Exception.__init__(self, "Ran out of journal space while writing records")
+
+class MessageLengthCsvError(Exception):
+ """Message length mismatch between record and CSV test file"""
+ def __init__(self, tnum, exp_msg_len, actual_msg_len):
+ """Constructor"""
+ Exception.__init__(self, "[CSV %d] Message length mismatch: expected %d; found %d" %
+ (tnum, exp_msg_len, actual_msg_len))
+
+class NumMsgsCsvError(Exception):
+ """Number of messages found mismatched with CSV file"""
+ def __init__(self, tnum, exp_num_msgs, actual_num_msgs):
+ """Constructor"""
+ Exception.__init__(self, "[CSV %s] Incorrect number of messages: expected %d, found %d" %
+ (tnum, exp_num_msgs, actual_num_msgs))
+
+class TransactionCsvError(Exception):
+ """Transaction mismatch between record and CSV file"""
+ def __init__(self, tnum, exp_transactional):
+ """Constructor"""
+ Exception.__init__(self, "[CSV %d] Transaction mismatch: expected %s" % (tnum, exp_transactional))
+
+class UnexpectedEndOfFileError(Exception):
+ """Error class for unexpected end-of-file during reading"""
+ def __init__(self, exp_size, curr_offs):
+ """Constructor"""
+ Exception.__init__(self, "Unexpected end-of-file: expected file size:%d; current offset:%d" %
+ (exp_size, curr_offs))
+
+class XidLengthCsvError(Exception):
+ """Message Xid length mismatch between record and CSV file"""
+ def __init__(self, tnum, exp_xid_len, actual_msg_len):
+ """Constructor"""
+ Exception.__init__(self, "[CSV %d] Message XID mismatch: expected %d; found %d" %
+ (tnum, exp_xid_len, actual_msg_len))
+
+class XidSizeError(Exception):
+ """Error class for Xid size mismatch"""
+ def __init__(self, exp_size, act_size, xid_str):
+ """Constructor"""
+ Exception.__init__(self, "Inconsistent xid size: expected:%d; actual:%d; xid=\"%s\"" %
+ (exp_size, act_size, xid_str))
+
+# =============================================================================
+
+if __name__ == "__main__":
+ print "This is a library, and cannot be executed."
+
Copied: store/trunk/cpp/tools/qpidstore/jrnl.py (from rev 4457, store/trunk/cpp/tools/jrnl.py)
===================================================================
--- store/trunk/cpp/tools/qpidstore/jrnl.py (rev 0)
+++ store/trunk/cpp/tools/qpidstore/jrnl.py 2011-05-10 15:35:17 UTC (rev 4458)
@@ -0,0 +1,798 @@
+"""
+Copyright (c) 2007, 2008 Red Hat, Inc.
+
+This file is part of the Qpid async store library msgstore.so.
+
+This library is free software; you can redistribute it and/or
+modify it under the terms of the GNU Lesser General Public
+License as published by the Free Software Foundation; either
+version 2.1 of the License, or (at your option) any later version.
+
+This library is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+Lesser General Public License for more details.
+
+You should have received a copy of the GNU Lesser General Public
+License along with this library; if not, write to the Free Software
+Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
+USA
+
+The GNU Lesser General Public License is available in the file COPYING.
+"""
+
+import jerr
+import os.path, sys, xml.parsers.expat
+from struct import pack, unpack, calcsize
+from time import gmtime, strftime
+
+# TODO: Get rid of these! Use jinf instance instead
+# Record boundaries used by the header classes below: Hdr.REC_BOUNDARY is DBLK_SIZE and
+# FileHdr.REC_BOUNDARY is SBLK_SIZE (four times DBLK_SIZE).
+DBLK_SIZE = 128
+SBLK_SIZE = 4 * DBLK_SIZE
+
+# TODO - this is messy - find a better way to handle this
+# This is a global, but is set directly by the calling program
+# Utils.load_file_data() compares file offsets against this value, so it must be set to a number
+# before any record data is loaded.
+JRNL_FILE_SIZE = None
+
+#== class Utils ======================================================================
+
+class Utils(object):
+    """Class containing utility functions for dealing with the journal"""
+
+    # Characters accepted by _is_printable(). The backslash is supplied once, by the "\\" escape.
+    __printchars = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ!\"#$%&'()*+,-./:;<=>?@[\\]^_`{|}~ "
+
+    # The @staticmethod declarations are not supported in RHEL4 (python 2.3.x)
+    # When RHEL4 support ends, restore these declarations and remove the older
+    # staticmethod() declaration.
+
+    #@staticmethod
+    def format_data(dsize, data):
+        """Format binary data for printing.
+
+        Returns "" if data is None, otherwise the text 'data(<dsize>)="<data>" ' in which long data is
+        shortened and unprintable data is hex-escaped. Raises jerr.DataSizeError if dsize != len(data)."""
+        if data is None:
+            return ""
+        if Utils._is_printable(data):
+            datastr = Utils._split_str(data)
+        else:
+            datastr = Utils._hex_split_str(data)
+        if dsize != len(data):
+            raise jerr.DataSizeError(dsize, len(data), datastr)
+        return "data(%d)=\"%s\" " % (dsize, datastr)
+    format_data = staticmethod(format_data)
+
+    #@staticmethod
+    def format_xid(xid, xidsize=None):
+        """Format binary XID for printing.
+
+        Returns "" if xid is None, otherwise the text 'xid(<size>)="<xid>" '. If xidsize is omitted, the
+        actual length of xid is used. Raises jerr.XidSizeError if a given xidsize does not match the XID."""
+        if xid is None:
+            # A missing XID is only an error if a non-zero size was declared for it. This also covers the
+            # case where no size was given at all, which previously failed on None.strip().
+            if xidsize is not None and xidsize > 0:
+                raise jerr.XidSizeError(xidsize, 0, None)
+            return ""
+        if Utils._is_printable(xid):
+            xidstr = Utils._split_str(xid)
+        else:
+            xidstr = Utils._hex_split_str(xid)
+        if xidsize is None:
+            xidsize = len(xid)
+        elif xidsize != len(xid):
+            raise jerr.XidSizeError(xidsize, len(xid), xidstr)
+        return "xid(%d)=\"%s\" " % (xidsize, xidstr)
+    format_xid = staticmethod(format_xid)
+
+    #@staticmethod
+    def inv_str(string):
+        """Perform a binary 1's complement (invert all bits) on a binary string"""
+        return "".join([chr(~ord(char) & 0xff) for char in string])
+    inv_str = staticmethod(inv_str)
+
+    #@staticmethod
+    def load(fhandle, klass):
+        """Load a record of class klass from a file.
+
+        The fields described by klass.FORMAT are read first and klass.discriminate() selects the class to
+        instantiate. If that is not klass itself, the fields of the selected class's own FORMAT are read and
+        passed to its init(). Finally skip() is called on the new instance, which is then returned."""
+        args = Utils._load_args(fhandle, klass)
+        subclass = klass.discriminate(args)
+        result = subclass(*args) # create instance of record
+        if subclass != klass:
+            result.init(fhandle, *Utils._load_args(fhandle, subclass))
+        result.skip(fhandle)
+        return result
+    load = staticmethod(load)
+
+    #@staticmethod
+    def load_file_data(fhandle, size, data):
+        """Load the data portion of a message from file.
+
+        size is the total number of bytes wanted and data holds whatever was read by earlier calls (or None).
+        Returns a tuple (data, complete). complete is False if the read was cut short at JRNL_FILE_SIZE, in
+        which case the caller must call again with the next file to obtain the remainder."""
+        if size == 0:
+            return (data, True)
+        if data is None:
+            loaded = 0
+        else:
+            loaded = len(data)
+        # Would reading the outstanding bytes run past the end of this journal file?
+        foverflow = fhandle.tell() + size - loaded > JRNL_FILE_SIZE
+        if foverflow:
+            rsize = JRNL_FILE_SIZE - fhandle.tell()
+        else:
+            rsize = size - loaded
+        fbin = fhandle.read(rsize)
+        if data is None:
+            data = unpack("%ds" % (rsize), fbin)[0]
+        else:
+            data = data + unpack("%ds" % (rsize), fbin)[0]
+        return (data, not foverflow)
+    load_file_data = staticmethod(load_file_data)
+
+    #@staticmethod
+    def rem_bytes_in_blk(fhandle, blk_size):
+        """Return the number of bytes between the current file offset and the next block boundary"""
+        foffs = fhandle.tell()
+        return Utils.size_in_bytes_to_blk(foffs, blk_size) - foffs
+    rem_bytes_in_blk = staticmethod(rem_bytes_in_blk)
+
+    #@staticmethod
+    def size_in_blks(size, blk_size):
+        """Return the number of blocks of blk_size needed to hold size bytes (rounded up)"""
+        # Floor division keeps the arithmetic in integers whatever the interpreter's "/" semantics are
+        return (size + blk_size - 1) // blk_size
+    size_in_blks = staticmethod(size_in_blks)
+
+    #@staticmethod
+    def size_in_bytes_to_blk(size, blk_size):
+        """Return size rounded up to the next multiple of blk_size, in bytes"""
+        return Utils.size_in_blks(size, blk_size) * blk_size
+    size_in_bytes_to_blk = staticmethod(size_in_bytes_to_blk)
+
+    #@staticmethod
+    def _hex_split_str(in_str, split_size = 50):
+        """Hex-escape a binary string; if it is longer than split_size, keep only the first and last 10
+        characters, separated by an ellipsis"""
+        if len(in_str) <= split_size:
+            return Utils._hex_str(in_str, 0, len(in_str))
+        return Utils._hex_str(in_str, 0, 10) + " ... " + Utils._hex_str(in_str, len(in_str)-10, len(in_str))
+    _hex_split_str = staticmethod(_hex_split_str)
+
+    #@staticmethod
+    def _hex_str(in_str, begin, end):
+        """Return in_str[begin:end] with every unprintable character replaced by a backslash and its
+        two-digit hex code"""
+        hstr = ""
+        for index in range(begin, end):
+            if Utils._is_printable(in_str[index]):
+                hstr += in_str[index]
+            else:
+                hstr += "\\%02x" % ord(in_str[index])
+        return hstr
+    _hex_str = staticmethod(_hex_str)
+
+    #@staticmethod
+    def _is_printable(in_str):
+        """Return True if in_str is printable; False otherwise."""
+        # strip() can only consume the whole string if every character is in the printable set
+        return in_str.strip(Utils.__printchars) == ""
+    _is_printable = staticmethod(_is_printable)
+
+    #@staticmethod
+    def _load_args(fhandle, klass):
+        """Read and unpack the fields described by klass.FORMAT.
+
+        Returns a tuple consisting of the file offset at which the read started followed by the unpacked
+        fields. Raises jerr.UnexpectedEndOfFileError if fewer bytes than required could be read."""
+        size = calcsize(klass.FORMAT)
+        foffs = (fhandle.tell(),) # 1-tuple, so that it can be prepended to the unpacked fields
+        fbin = fhandle.read(size)
+        if len(fbin) != size:
+            # NOTE(review): the exception's parameters are named (exp_size, curr_offs) but it is given the
+            # record size and the number of bytes read - confirm which values are intended
+            raise jerr.UnexpectedEndOfFileError(size, len(fbin))
+        return foffs + unpack(klass.FORMAT, fbin)
+    _load_args = staticmethod(_load_args)
+
+    #@staticmethod
+    def _split_str(in_str, split_size = 50):
+        """Split a string into two parts separated by an ellipsis if it is longer than split_size"""
+        # "<=" matches _hex_split_str(): a string of exactly split_size characters is returned unchanged
+        if len(in_str) <= split_size:
+            return in_str
+        half = max(split_size // 2, 1)
+        return in_str[:half] + " ... " + in_str[-half:]
+    _split_str = staticmethod(_split_str)
+
+
+#== class Hdr =================================================================
+
+class Hdr:
+    """Common header found at the start of every journal record; base class of the record classes"""
+
+    FORMAT = "=4sBBHQ"
+    HDR_VER = 1
+    OWI_MASK = 0x01
+    BIG_ENDIAN = sys.byteorder == "big"
+    REC_BOUNDARY = DBLK_SIZE
+
+    def __init__(self, foffs, magic, ver, endn, flags, rid):
+        """Store the file offset of the record and the unpacked header fields (rid is held as a long)"""
+        self.foffs, self.magic = foffs, magic
+        self.ver, self.endn, self.flags = ver, endn, flags
+        self.rid = long(rid)
+
+    def __str__(self):
+        """Return a one-line description of this header, chosen by the last character of the magic"""
+        if self.empty():
+            return "0x%08x: <empty>" % (self.foffs)
+        kind = self.magic[-1]
+        if kind == "x":
+            # This magic is printed without the version, endian, flags and rid fields
+            return "0x%08x: [\"%s\"]" % (self.foffs, self.magic)
+        if kind not in ["a", "c", "d", "e", "f", "x"]:
+            return "0x%08x: <error, unknown magic \"%s\" (possible overwrite boundary?)>" % (self.foffs, self.magic)
+        return "0x%08x: [\"%s\" v=%d e=%d f=0x%04x rid=0x%x]" % (self.foffs, self.magic, self.ver, self.endn,
+                                                                  self.flags, self.rid)
+
+    #@staticmethod
+    def discriminate(args):
+        """Select the record class from the last character of the magic (args[1]); defaults to Hdr"""
+        magic = args[1]
+        return _CLASSES.get(magic[-1], Hdr)
+    discriminate = staticmethod(discriminate)
+
+    def empty(self):
+        """Return True if this record is empty (ie its magic consists of four zero bytes)"""
+        return self.magic == "\x00"*4
+
+    def encode(self):
+        """Pack the header fields (all except the file offset) into a binary string"""
+        fields = (self.magic, self.ver, self.endn, self.flags, self.rid)
+        return pack(Hdr.FORMAT, *fields)
+
+    def owi(self):
+        """Return True if the OWI (overwrite indicator) bit is set in the flags of this header"""
+        return (self.flags & self.OWI_MASK) != 0
+
+    def skip(self, fhandle):
+        """Read and discard the bytes between the current file offset and the next record boundary"""
+        remaining = Utils.rem_bytes_in_blk(fhandle, self.REC_BOUNDARY)
+        fhandle.read(remaining)
+
+    def check(self):
+        """Validate this header.
+
+        Returns True if the record is empty or its magic is not recognized, False otherwise. For every
+        recognized magic except the "x" type, raises jerr.InvalidHeaderVersionError on a version mismatch
+        and jerr.EndianMismatchError if the record's endianness differs from this machine's."""
+        if self.empty() or self.magic[:3] != "RHM" or self.magic[3] not in ["a", "c", "d", "e", "f", "x"]:
+            return True
+        if self.magic[-1] == "x":
+            return False
+        if self.ver != self.HDR_VER:
+            raise jerr.InvalidHeaderVersionError(self.HDR_VER, self.ver)
+        if bool(self.endn) != self.BIG_ENDIAN:
+            raise jerr.EndianMismatchError(self.BIG_ENDIAN)
+        return False
+
+
+#== class FileHdr =============================================================
+
+class FileHdr(Hdr):
+    """File header record, found at the beginning of each journal file"""
+
+    FORMAT = "=2H4x3Q"
+    REC_BOUNDARY = SBLK_SIZE
+
+    def __str__(self):
+        """Return the common header text followed by the fid, lid, fro and timestamp of this file header"""
+        fields = (Hdr.__str__(self), self.fid, self.lid, self.fro, self.timestamp_str())
+        return "%s fid=%d lid=%d fro=0x%08x t=%s" % fields
+
+    def encode(self):
+        """Pack the common header followed by the file header fields into a binary string"""
+        common = Hdr.encode(self)
+        return common + pack(FileHdr.FORMAT, self.fid, self.lid, self.fro, self.time_sec, self.time_ns)
+
+    def init(self, fhandle, foffs, fid, lid, fro, time_sec, time_ns):
+        """Store the file header fields; fhandle and foffs are accepted but not used here"""
+        self.fid, self.lid, self.fro = fid, lid, fro
+        self.time_sec, self.time_ns = time_sec, time_ns
+
+    def timestamp(self):
+        """Return the timestamp of this record as the tuple (secs, nsecs)"""
+        return (self.time_sec, self.time_ns)
+
+    def timestamp_str(self):
+        """Return the timestamp of this record as a UTC string with the nanoseconds after the seconds"""
+        utc = gmtime(self.time_sec)
+        # The nanoseconds are substituted first; the doubled %% leave the strftime directives intact
+        return strftime("%%a %%b %%d %%H:%%M:%%S.%09d %%Y" % (self.time_ns), utc)
+
+
+#== class DeqRec ==============================================================
+
+class DeqRec(Hdr):
+    """Class for a dequeue record.
+
+    After the common header the record holds the fields of FORMAT (deq_rid and xidsize), then the XID (if
+    xidsize > 0) followed by a record tail."""
+
+    FORMAT = "=QQ"
+
+    def __str__(self):
+        """Return a string representation of this DeqRec instance"""
+        return "%s %sdrid=0x%x" % (Hdr.__str__(self), Utils.format_xid(self.xid, self.xidsize), self.deq_rid)
+
+    def init(self, fhandle, foffs, deq_rid, xidsize):
+        """Initialize this instance from the unpacked FORMAT fields, then start loading the XID and tail.
+        foffs is accepted but not used here."""
+        self.deq_rid = deq_rid
+        self.xidsize = xidsize
+        self.xid = None
+        self.deq_tail = None
+        # Loading may stop part-way; these flags and tail_bin record how far it has progressed
+        self.xid_complete = False
+        self.tail_complete = False
+        self.tail_bin = None
+        self.tail_offs = 0
+        self.load(fhandle)
+
+    def encode(self):
+        """Encode this class into a binary string; the XID and tail are only added if xidsize > 0"""
+        buf = Hdr.encode(self) + pack(DeqRec.FORMAT, self.deq_rid, self.xidsize)
+        if self.xidsize > 0:
+            fmt = "%ds" % (self.xidsize)
+            buf += pack(fmt, self.xid)
+            buf += self.deq_tail.encode()
+        return buf
+
+    def load(self, fhandle):
+        """Load the remainder of this record (after the header has been loaded).
+
+        May be called again with another file handle if an earlier call could not finish; the parts already
+        loaded are kept. Returns True once the record is complete. Raises jerr.InvalidRecordTailError if the
+        tail does not match the header."""
+        if self.xidsize == 0:
+            # No XID: nothing further is read for this record
+            self.xid_complete = True
+            self.tail_complete = True
+        else:
+            if not self.xid_complete:
+                (self.xid, self.xid_complete) = Utils.load_file_data(fhandle, self.xidsize, self.xid)
+            if self.xid_complete and not self.tail_complete:
+                ret = Utils.load_file_data(fhandle, calcsize(RecTail.FORMAT), self.tail_bin)
+                self.tail_bin = ret[0]
+                if ret[1]:
+                    self.deq_tail = RecTail(self.tail_offs, *unpack(RecTail.FORMAT, self.tail_bin))
+                    # The tail must carry the bit-inverted header magic and the same rid as the header
+                    magic_err = self.deq_tail.magic_inv != Utils.inv_str(self.magic)
+                    rid_err = self.deq_tail.rid != self.rid
+                    if magic_err or rid_err:
+                        raise jerr.InvalidRecordTailError(magic_err, rid_err, self)
+                    self.skip(fhandle)
+                self.tail_complete = ret[1]
+        return self.complete()
+
+    def complete(self):
+        """Returns True if the entire record is loaded, False otherwise"""
+        return self.xid_complete and self.tail_complete
+
+
+#== class TxnRec ==============================================================
+
+class TxnRec(Hdr):
+    """Class for a transaction commit/abort record.
+
+    After the common header the record holds the single field of FORMAT (xidsize), then the XID followed by
+    a record tail."""
+
+    FORMAT = "=Q"
+
+    def __str__(self):
+        """Return a string representation of this TxnRec instance"""
+        return "%s %s" % (Hdr.__str__(self), Utils.format_xid(self.xid, self.xidsize))
+
+    def init(self, fhandle, foffs, xidsize):
+        """Initialize this instance from the unpacked FORMAT field, then start loading the XID and tail.
+        foffs is accepted but not used here."""
+        self.xidsize = xidsize
+        self.xid = None
+        self.tx_tail = None
+        # Loading may stop part-way; these flags and tail_bin record how far it has progressed
+        self.xid_complete = False
+        self.tail_complete = False
+        self.tail_bin = None
+        self.tail_offs = 0
+        self.load(fhandle)
+
+    def encode(self):
+        """Encode this class into a binary string"""
+        # NOTE(review): unlike DeqRec.encode(), the XID and tail are packed unconditionally, so this assumes
+        # a fully loaded record with an XID - confirm that xidsize is never 0 here
+        return Hdr.encode(self) + pack(TxnRec.FORMAT, self.xidsize) + pack("%ds" % self.xidsize, self.xid) + \
+               self.tx_tail.encode()
+
+    def load(self, fhandle):
+        """Load the remainder of this record (after the header has been loaded).
+
+        May be called again with another file handle if an earlier call could not finish; the parts already
+        loaded are kept. Returns True once the record is complete. Raises jerr.InvalidRecordTailError if the
+        tail does not match the header."""
+        if not self.xid_complete:
+            ret = Utils.load_file_data(fhandle, self.xidsize, self.xid)
+            self.xid = ret[0]
+            self.xid_complete = ret[1]
+        if self.xid_complete and not self.tail_complete:
+            ret = Utils.load_file_data(fhandle, calcsize(RecTail.FORMAT), self.tail_bin)
+            self.tail_bin = ret[0]
+            if ret[1]:
+                self.tx_tail = RecTail(self.tail_offs, *unpack(RecTail.FORMAT, self.tail_bin))
+                # The tail must carry the bit-inverted header magic and the same rid as the header
+                magic_err = self.tx_tail.magic_inv != Utils.inv_str(self.magic)
+                rid_err = self.tx_tail.rid != self.rid
+                if magic_err or rid_err:
+                    raise jerr.InvalidRecordTailError(magic_err, rid_err, self)
+                self.skip(fhandle)
+            self.tail_complete = ret[1]
+        return self.complete()
+
+    def complete(self):
+        """Returns True if the entire record is loaded, False otherwise"""
+        return self.xid_complete and self.tail_complete
+
+
+#== class EnqRec ==============================================================
+
+class EnqRec(Hdr):
+    """Class for an enqueue record.
+
+    After the common header the record holds the fields of FORMAT (xidsize and dsize), then the XID, the
+    message data (not read when the EXTERN flag is set) and a record tail."""
+
+    FORMAT = "=QQ"
+    # Bits of the header flags field that are decoded by init()
+    TRANSIENT_MASK = 0x10
+    EXTERN_MASK = 0x20
+
+    def __str__(self):
+        """Return a string representation of this EnqRec instance"""
+        return "%s %s%s %s %s" % (Hdr.__str__(self), Utils.format_xid(self.xid, self.xidsize),
+                                  Utils.format_data(self.dsize, self.data), self.enq_tail, self.print_flags())
+
+    def encode(self):
+        """Encode this class into a binary string; the tail is only added if there is an XID or data"""
+        buf = Hdr.encode(self) + pack(EnqRec.FORMAT, self.xidsize, self.dsize)
+        if self.xidsize > 0:
+            buf += pack("%ds" % self.xidsize, self.xid)
+        if self.dsize > 0:
+            buf += pack("%ds" % self.dsize, self.data)
+        if self.xidsize > 0 or self.dsize > 0:
+            buf += self.enq_tail.encode()
+        return buf
+
+    def init(self, fhandle, foffs, xidsize, dsize):
+        """Initialize this instance from the unpacked FORMAT fields, decode the flags, then start loading
+        the XID, data and tail. foffs is accepted but not used here."""
+        self.xidsize = xidsize
+        self.dsize = dsize
+        self.transient = self.flags & self.TRANSIENT_MASK > 0
+        self.extern = self.flags & self.EXTERN_MASK > 0
+        self.xid = None
+        self.data = None
+        self.enq_tail = None
+        # Loading may stop part-way; these flags and tail_bin record how far it has progressed
+        self.xid_complete = False
+        self.data_complete = False
+        self.tail_complete = False
+        self.tail_bin = None
+        self.tail_offs = 0
+        self.load(fhandle)
+
+    def load(self, fhandle):
+        """Load the remainder of this record (after the header has been loaded).
+
+        May be called again with another file handle if an earlier call could not finish; the parts already
+        loaded are kept. Returns True once the record is complete. Raises jerr.InvalidRecordTailError if the
+        tail does not match the header."""
+        if not self.xid_complete:
+            ret = Utils.load_file_data(fhandle, self.xidsize, self.xid)
+            self.xid = ret[0]
+            self.xid_complete = ret[1]
+        if self.xid_complete and not self.data_complete:
+            if self.extern:
+                # The data of an external record is not read from the journal; self.data stays None
+                self.data_complete = True
+            else:
+                ret = Utils.load_file_data(fhandle, self.dsize, self.data)
+                self.data = ret[0]
+                self.data_complete = ret[1]
+        if self.data_complete and not self.tail_complete:
+            ret = Utils.load_file_data(fhandle, calcsize(RecTail.FORMAT), self.tail_bin)
+            self.tail_bin = ret[0]
+            if ret[1]:
+                self.enq_tail = RecTail(self.tail_offs, *unpack(RecTail.FORMAT, self.tail_bin))
+                # The tail must carry the bit-inverted header magic and the same rid as the header
+                magic_err = self.enq_tail.magic_inv != Utils.inv_str(self.magic)
+                rid_err = self.enq_tail.rid != self.rid
+                if magic_err or rid_err:
+                    raise jerr.InvalidRecordTailError(magic_err, rid_err, self)
+                self.skip(fhandle)
+            self.tail_complete = ret[1]
+        return self.complete()
+
+    def complete(self):
+        """Returns True if the entire record is loaded, False otherwise"""
+        return self.xid_complete and self.data_complete and self.tail_complete
+
+    def print_flags(self):
+        """Return the decoded flags as a string: "*TRANSIENT*", "*EXTERNAL*", "*TRANSIENT,EXTERNAL*", or ""
+        if neither flag is set. Despite its name, nothing is printed here."""
+        fstr = ""
+        if self.transient:
+            fstr = "*TRANSIENT"
+        if self.extern:
+            if len(fstr) > 0:
+                fstr += ",EXTERNAL"
+            else:
+                fstr = "*EXTERNAL"
+        if len(fstr) > 0:
+            fstr += "*"
+        return fstr
+
+
+#== class RecTail =============================================================
+
+class RecTail:
+    """Tail of a record - present on all records where either an XID or data separate the header from the
+    end of the record"""
+
+    FORMAT = "=4sQ"
+
+    def __init__(self, foffs, magic_inv, rid):
+        """Store the file offset, the inverted magic and the record id (held as a long)"""
+        self.foffs, self.magic_inv = foffs, magic_inv
+        self.rid = long(rid)
+
+    def __str__(self):
+        """Return the magic (with the inversion undone) and the rid of this tail as a string"""
+        return "[\"%s\" rid=0x%x]" % (Utils.inv_str(self.magic_inv), self.rid)
+
+    def encode(self):
+        """Pack the inverted magic and the rid into a binary string"""
+        fields = (self.magic_inv, self.rid)
+        return pack(RecTail.FORMAT, *fields)
+
+
+#== class JrnlInfo ============================================================
+
+class JrnlInfo(object):
+    """
+    This object reads and writes journal information files (<basename>.jinf). Methods are provided
+    to read a file, query its properties and reset just those properties necessary for normalizing
+    and resizing a journal.
+
+    Normalizing: resetting the directory and/or base filename to different values. This is necessary
+    if a set of journal files is copied from one location to another before being restored, as the
+    value of the path in the file no longer matches the actual path.
+
+    Resizing: If the journal geometry parameters (size and number of journal files) changes, then the
+    .jinf file must reflect these changes, as this file is the source of information for journal
+    recovery.
+
+    NOTE: Data size vs File size: There are methods which return the data size and file size of the
+    journal files.
+
+    +-------------+--------------------/ /----------+
+    | File header |            File data            |
+    +-------------+--------------------/ /----------+
+    |             |                                 |
+    |             |<---------- Data size ---------->|
+    |<------------------ File Size ---------------->|
+
+    Data size: The size of the data content of the journal, ie that part which stores the data records.
+
+    File size: The actual disk size of the journal including data and the file header which precedes the
+    data.
+
+    The file header is fixed to 1 sblk, so file size = jrnl size + sblk size.
+    """
+
+    def __init__(self, jdir, bfn = "JournalData"):
+        """Constructor - reads the file <bfn>.jinf located in directory jdir"""
+        self.__jdir = jdir
+        self.__bfn = bfn
+        self.__jinf_dict = {}
+        self._read_jinf()
+
+    def __str__(self):
+        """Create a string containing all of the journal info contained in the jinf file"""
+        ostr = "Journal info file %s:\n" % os.path.join(self.__jdir, "%s.jinf" % self.__bfn)
+        for key, val in self.__jinf_dict.iteritems():
+            ostr += " %s = %s\n" % (key, val)
+        return ostr
+
+    def normalize(self, jdir = None, bfn = None):
+        """Normalize the directory (ie reset the directory path to match the actual current location) for this
+        jinf file. If jdir is omitted, the directory given to the constructor is used. If bfn is given, the
+        base filename is changed as well."""
+        if jdir is None:
+            self.__jinf_dict["directory"] = self.__jdir
+        else:
+            self.__jdir = jdir
+            self.__jinf_dict["directory"] = jdir
+        if bfn is not None:
+            self.__bfn = bfn
+            self.__jinf_dict["base_filename"] = bfn
+
+    def resize(self, num_jrnl_files = None, jrnl_file_size = None):
+        """Reset the journal size information to allow for resizing the journal"""
+        if num_jrnl_files is not None:
+            self.__jinf_dict["number_jrnl_files"] = num_jrnl_files
+        if jrnl_file_size is not None:
+            # NOTE(review): a value stored as softblocks is derived here by multiplying with the datablock
+            # size in bytes - confirm the unit in which callers pass jrnl_file_size
+            self.__jinf_dict["jrnl_file_size_sblks"] = jrnl_file_size * self.get_jrnl_dblk_size_bytes()
+
+    def write(self, jdir = None, bfn = None):
+        """Write the .jinf file, after normalizing to jdir and bfn; the directory is created if needed"""
+        self.normalize(jdir, bfn)
+        if not os.path.exists(self.get_jrnl_dir()):
+            os.makedirs(self.get_jrnl_dir())
+        fhandle = open(os.path.join(self.get_jrnl_dir(), "%s.jinf" % self.get_jrnl_base_name()), "w")
+        # try/finally (rather than "with") so that the handle is closed on error on old Python versions too
+        try:
+            fhandle.write("<?xml version=\"1.0\" ?>\n")
+            fhandle.write("<jrnl>\n")
+            fhandle.write(" <journal_version value=\"%d\" />\n" % self.get_jrnl_version())
+            fhandle.write(" <journal_id>\n")
+            fhandle.write(" <id_string value=\"%s\" />\n" % self.get_jrnl_id())
+            fhandle.write(" <directory value=\"%s\" />\n" % self.get_jrnl_dir())
+            fhandle.write(" <base_filename value=\"%s\" />\n" % self.get_jrnl_base_name())
+            fhandle.write(" </journal_id>\n")
+            fhandle.write(" <creation_time>\n")
+            fhandle.write(" <seconds value=\"%d\" />\n" % self.get_creation_time()[0])
+            fhandle.write(" <nanoseconds value=\"%d\" />\n" % self.get_creation_time()[1])
+            fhandle.write(" <string value=\"%s\" />\n" % self.get_creation_time_str())
+            fhandle.write(" </creation_time>\n")
+            fhandle.write(" <journal_file_geometry>\n")
+            fhandle.write(" <number_jrnl_files value=\"%d\" />\n" % self.get_num_jrnl_files())
+            fhandle.write(" <auto_expand value=\"%s\" />\n" % str.lower(str(self.get_auto_expand())))
+            fhandle.write(" <jrnl_file_size_sblks value=\"%d\" />\n" % self.get_jrnl_data_size_sblks())
+            fhandle.write(" <JRNL_SBLK_SIZE value=\"%d\" />\n" % self.get_jrnl_sblk_size_dblks())
+            fhandle.write(" <JRNL_DBLK_SIZE value=\"%d\" />\n" % self.get_jrnl_dblk_size_bytes())
+            fhandle.write(" </journal_file_geometry>\n")
+            fhandle.write(" <cache_geometry>\n")
+            fhandle.write(" <wcache_pgsize_sblks value=\"%d\" />\n" % self.get_wr_buf_pg_size_sblks())
+            fhandle.write(" <wcache_num_pages value=\"%d\" />\n" % self.get_num_wr_buf_pgs())
+            fhandle.write(" <JRNL_RMGR_PAGE_SIZE value=\"%d\" />\n" % self.get_rd_buf_pg_size_sblks())
+            fhandle.write(" <JRNL_RMGR_PAGES value=\"%d\" />\n" % self.get_num_rd_buf_pgs())
+            fhandle.write(" </cache_geometry>\n")
+            fhandle.write("</jrnl>\n")
+        finally:
+            fhandle.close()
+
+    # Journal ID
+
+    def get_jrnl_version(self):
+        """Get the journal version"""
+        return self.__jinf_dict["journal_version"]
+
+    def get_jrnl_id(self):
+        """Get the journal id"""
+        return self.__jinf_dict["id_string"]
+
+    def get_current_dir(self):
+        """Get the current directory of the store (as opposed to that value saved in the .jinf file)"""
+        return self.__jdir
+
+    def get_jrnl_dir(self):
+        """Get the journal directory stored in the .jinf file"""
+        return self.__jinf_dict["directory"]
+
+    def get_jrnl_base_name(self):
+        """Get the base filename - that string used to name the journal files <basefilename>-nnnn.jdat and
+        <basefilename>.jinf"""
+        return self.__jinf_dict["base_filename"]
+
+    # Journal creation time
+
+    def get_creation_time(self):
+        """Get journal creation time as a tuple (secs, nsecs)"""
+        return (self.__jinf_dict["seconds"], self.__jinf_dict["nanoseconds"])
+
+    def get_creation_time_str(self):
+        """Get journal creation time as a string"""
+        return self.__jinf_dict["string"]
+
+    # --- Files and geometry ---
+
+    def get_num_jrnl_files(self):
+        """Get number of data files in the journal"""
+        return self.__jinf_dict["number_jrnl_files"]
+
+    def get_auto_expand(self):
+        """Return True if auto-expand is enabled; False otherwise"""
+        return self.__jinf_dict["auto_expand"]
+
+    def get_jrnl_sblk_size_dblks(self):
+        """Get the journal softblock size in dblks"""
+        return self.__jinf_dict["JRNL_SBLK_SIZE"]
+
+    def get_jrnl_sblk_size_bytes(self):
+        """Get the journal softblock size in bytes"""
+        return self.get_jrnl_sblk_size_dblks() * self.get_jrnl_dblk_size_bytes()
+
+    def get_jrnl_dblk_size_bytes(self):
+        """Get the journal datablock size in bytes"""
+        return self.__jinf_dict["JRNL_DBLK_SIZE"]
+
+    def get_jrnl_data_size_sblks(self):
+        """Get the data capacity (excluding the file headers) for one journal file in softblocks"""
+        return self.__jinf_dict["jrnl_file_size_sblks"]
+
+    def get_jrnl_data_size_dblks(self):
+        """Get the data capacity (excluding the file headers) for one journal file in datablocks"""
+        return self.get_jrnl_data_size_sblks() * self.get_jrnl_sblk_size_dblks()
+
+    def get_jrnl_data_size_bytes(self):
+        """Get the data capacity (excluding the file headers) for one journal file in bytes"""
+        return self.get_jrnl_data_size_dblks() * self.get_jrnl_dblk_size_bytes()
+
+    def get_jrnl_file_size_sblks(self):
+        """Get the size of one journal file on disk (including the file headers) in softblocks"""
+        # The file header occupies exactly one softblock
+        return self.get_jrnl_data_size_sblks() + 1
+
+    def get_jrnl_file_size_dblks(self):
+        """Get the size of one journal file on disk (including the file headers) in datablocks"""
+        return self.get_jrnl_file_size_sblks() * self.get_jrnl_sblk_size_dblks()
+
+    def get_jrnl_file_size_bytes(self):
+        """Get the size of one journal file on disk (including the file headers) in bytes"""
+        return self.get_jrnl_file_size_dblks() * self.get_jrnl_dblk_size_bytes()
+
+    def get_tot_jrnl_data_size_sblks(self):
+        """Get the size of the entire journal's data capacity (excluding the file headers) for all files
+        together in softblocks"""
+        # Uses the per-file size in softblocks (this previously multiplied by the size in bytes)
+        return self.get_num_jrnl_files() * self.get_jrnl_data_size_sblks()
+
+    def get_tot_jrnl_data_size_dblks(self):
+        """Get the size of the entire journal's data capacity (excluding the file headers) for all files
+        together in datablocks"""
+        return self.get_num_jrnl_files() * self.get_jrnl_data_size_dblks()
+
+    def get_tot_jrnl_data_size_bytes(self):
+        """Get the size of the entire journal's data capacity (excluding the file headers) for all files
+        together in bytes"""
+        return self.get_num_jrnl_files() * self.get_jrnl_data_size_bytes()
+
+    # Read and write buffers
+
+    def get_wr_buf_pg_size_sblks(self):
+        """Get the size of the write buffer pages in softblocks"""
+        return self.__jinf_dict["wcache_pgsize_sblks"]
+
+    def get_wr_buf_pg_size_dblks(self):
+        """Get the size of the write buffer pages in datablocks"""
+        return self.get_wr_buf_pg_size_sblks() * self.get_jrnl_sblk_size_dblks()
+
+    def get_wr_buf_pg_size_bytes(self):
+        """Get the size of the write buffer pages in bytes"""
+        return self.get_wr_buf_pg_size_dblks() * self.get_jrnl_dblk_size_bytes()
+
+    def get_num_wr_buf_pgs(self):
+        """Get the number of write buffer pages"""
+        return self.__jinf_dict["wcache_num_pages"]
+
+    def get_rd_buf_pg_size_sblks(self):
+        """Get the size of the read buffer pages in softblocks"""
+        return self.__jinf_dict["JRNL_RMGR_PAGE_SIZE"]
+
+    def get_rd_buf_pg_size_dblks(self):
+        """Get the size of the read buffer pages in datablocks"""
+        # The getter must be called; multiplying the bound method itself raises a TypeError
+        return self.get_rd_buf_pg_size_sblks() * self.get_jrnl_sblk_size_dblks()
+
+    def get_rd_buf_pg_size_bytes(self):
+        """Get the size of the read buffer pages in bytes"""
+        return self.get_rd_buf_pg_size_dblks() * self.get_jrnl_dblk_size_bytes()
+
+    def get_num_rd_buf_pgs(self):
+        """Get the number of read buffer pages"""
+        return self.__jinf_dict["JRNL_RMGR_PAGES"]
+
+    def _read_jinf(self):
+        """Read and initialize this instance from an existing jinf file located at the directory named in the
+        constructor - called by the constructor"""
+        fhandle = open(os.path.join(self.__jdir, "%s.jinf" % self.__bfn), "r")
+        # try/finally so that the handle is closed even if the file cannot be parsed
+        try:
+            parser = xml.parsers.expat.ParserCreate()
+            parser.StartElementHandler = self._handle_xml_start_elt
+            parser.CharacterDataHandler = self._handle_xml_char_data
+            parser.EndElementHandler = self._handle_xml_end_elt
+            parser.ParseFile(fhandle)
+        finally:
+            fhandle.close()
+
+    def _handle_xml_start_elt(self, name, attrs):
+        """Callback for handling XML start elements. Used by the XML parser. Stores the "value" attribute of
+        the element under the element's name, converted to the type appropriate for that element. Elements
+        without a "value" attribute that are not otherwise known are ignored."""
+        # bool values
+        if name == "auto_expand":
+            self.__jinf_dict[name] = attrs["value"] == "true"
+        # long values
+        elif name == "seconds" or \
+             name == "nanoseconds":
+            self.__jinf_dict[name] = long(attrs["value"])
+        # int values
+        elif name == "journal_version" or \
+             name == "number_jrnl_files" or \
+             name == "jrnl_file_size_sblks" or \
+             name == "JRNL_SBLK_SIZE" or \
+             name == "JRNL_DBLK_SIZE" or \
+             name == "wcache_pgsize_sblks" or \
+             name == "wcache_num_pages" or \
+             name == "JRNL_RMGR_PAGE_SIZE" or \
+             name == "JRNL_RMGR_PAGES":
+            self.__jinf_dict[name] = int(attrs["value"])
+        # strings
+        elif "value" in attrs:
+            self.__jinf_dict[name] = attrs["value"]
+
+    def _handle_xml_char_data(self, data):
+        """Callback for handling character data (ie within <elt>...</elt>). The jinf file does not use this in its
+        data. Used by the XML parser."""
+        pass
+
+    def _handle_xml_end_elt(self, name):
+        """Callback for handling XML end elements. Used by XML parser."""
+        pass
+
+
+#==============================================================================
+
+# Maps the last character of a record's magic to the class used to load that record; consulted by
+# Hdr.discriminate(), which falls back to Hdr for any other character. Both "a" and "c" map to TxnRec.
+_CLASSES = {
+    "a": TxnRec,
+    "c": TxnRec,
+    "d": DeqRec,
+    "e": EnqRec,
+    "f": FileHdr
+}
+
+# This module only supplies definitions for import; running it directly just prints a notice.
+if __name__ == "__main__":
+    print "This is a library, and cannot be executed."
13 years, 7 months
rhmessaging commits: r4457 - store/branches/java/0.5.x-dev/src/tools/java/org/apache/qpid/server/store/berkeleydb.
by rhmessaging-commits@lists.jboss.org
Author: rgemmell
Date: 2011-05-04 08:00:49 -0400 (Wed, 04 May 2011)
New Revision: 4457
Modified:
store/branches/java/0.5.x-dev/src/tools/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgrade.java
Log:
Update Javadoc to account for latest changes
Applied patch from Oleksandr Rudyy
Modified: store/branches/java/0.5.x-dev/src/tools/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgrade.java
===================================================================
--- store/branches/java/0.5.x-dev/src/tools/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgrade.java 2011-05-04 09:13:25 UTC (rev 4456)
+++ store/branches/java/0.5.x-dev/src/tools/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgrade.java 2011-05-04 12:00:49 UTC (rev 4457)
@@ -62,19 +62,15 @@
import com.sleepycat.bind.tuple.ByteBinding;
/**
- * This is a simple BerkeleyDB Store upgrade tool that will upgrade a V1 Store to a V2 Store.
+ * This is a simple BerkeleyDB Store upgrade tool that will upgrade V1/V2 Stores to a V4 Store.
*
- * NOTE: No checks are in place to validate that the input is V1.
+ * BDB Environment is checked to detect the existing store version.
*
- * Currently upgrade is fixed from v1 -> v2
- * Only the Queue and Binding databases are migrated all other databases are copied as DB entries.
+ * Only Message Content database entries are copied. Entries of other databases are migrated into v4.
*
- * Improvments:
+ * Improvements:
* - Add List BDBMessageStore.getDatabases(); This can the be iterated to guard against new DBs being added.
- * - Add a version value into the store so that a quick check can be performed to perform the upgrades.
- * - A version in the store would allow automated upgrade or later with more available versions interactive upgrade.
- * - Currently only the Queue and Binding DB are processed for upgrade all the other db data is copied between stores.
- * - Add process logging and disable all Store and Qpid logging.
+ *
*/
public class BDBStoreUpgrade
{
13 years, 7 months
rhmessaging commits: r4456 - in store/branches/java/0.5.x-dev: src/main/java/org/apache/qpid/server/store/berkeleydb and 5 other directories.
by rhmessaging-commits@lists.jboss.org
Author: rgemmell
Date: 2011-05-04 05:13:25 -0400 (Wed, 04 May 2011)
New Revision: 4456
Added:
store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/AMQShortStringTB_2.java
store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/AMQShortStringTB_4.java
store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/AMQShortStringTupleBindingFactory.java
store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueEntryKeyTupleBinding.java
store/branches/java/0.5.x-dev/src/test/java/org/apache/qpid/server/store/berkeleydb/ExchangeTBTest.java
store/branches/java/0.5.x-dev/src/test/java/org/apache/qpid/server/store/berkeleydb/MessageMetaDataTBTest.java
store/branches/java/0.5.x-dev/src/test/java/org/apache/qpid/server/store/berkeleydb/tuples/
store/branches/java/0.5.x-dev/src/test/java/org/apache/qpid/server/store/berkeleydb/tuples/AMQQueueTupleBindingTest.java
store/branches/java/0.5.x-dev/src/test/java/org/apache/qpid/server/store/berkeleydb/tuples/AMQShortStringTupleBindingTest.java
store/branches/java/0.5.x-dev/src/test/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingKeyTupleBindingTest.java
store/branches/java/0.5.x-dev/src/test/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueEntryKeyTupleBindingTest.java
Removed:
store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringEncoding.java
store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringTB.java
Modified:
store/branches/java/0.5.x-dev/etc/config.xml
store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/ExchangeTB.java
store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/MessageMetaDataTB.java
store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/QueueEntryKey.java
store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTupleBindingFactory.java
store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple_1.java
store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple_2.java
store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTupleBindingFactory.java
store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_1.java
store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_2.java
store/branches/java/0.5.x-dev/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreTest.java
store/branches/java/0.5.x-dev/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java
store/branches/java/0.5.x-dev/src/test/java/org/apache/qpid/server/store/berkeleydb/utils/JNDIHelper.java
store/branches/java/0.5.x-dev/src/tools/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgrade.java
Log:
A signed byte was previously used to store AMQShortString lengths, which resulted in lengths greater than 127 being converted into a negative value upon recovery and assumed to be null. Update the store implementation to use a short for the length to allow encoding the 255 legal values and null.
Applied patch from Oleksandr Rudyy
Modified: store/branches/java/0.5.x-dev/etc/config.xml
===================================================================
--- store/branches/java/0.5.x-dev/etc/config.xml 2011-05-03 18:49:45 UTC (rev 4455)
+++ store/branches/java/0.5.x-dev/etc/config.xml 2011-05-04 09:13:25 UTC (rev 4456)
@@ -45,7 +45,9 @@
<management>
<enabled>true</enabled>
<jmxport>8999</jmxport>
- <security-enabled>false</security-enabled>
+ <ssl>
+ <enabled>false</enabled>
+ </ssl>
</management>
<advanced>
<filterchain enableExecutorPool="true"/>
@@ -124,7 +126,6 @@
<auto_register>true</auto_register>
</queue>
- <virtualhosts>${conf}/virtualhosts.xml</virtualhosts>
</broker>
Deleted: store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringEncoding.java
===================================================================
--- store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringEncoding.java 2011-05-03 18:49:45 UTC (rev 4455)
+++ store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringEncoding.java 2011-05-04 09:13:25 UTC (rev 4456)
@@ -1,46 +0,0 @@
-package org.apache.qpid.server.store.berkeleydb;
-
-import org.apache.qpid.framing.AMQShortString;
-
-import com.sleepycat.bind.tuple.TupleInput;
-import com.sleepycat.bind.tuple.TupleOutput;
-
-/**
- * Created by IntelliJ IDEA.
- * User: U146758
- * Date: 19-Feb-2007
- * Time: 13:56:56
- * To change this template use File | Settings | File Templates.
- */
-public class AMQShortStringEncoding
-{
- public static AMQShortString readShortString(TupleInput tupleInput)
- {
- int length = (int) tupleInput.readByte();
- if (length < 0)
- {
- return null;
- }
- else
- {
- byte[] stringBytes = new byte[length];
- tupleInput.readFast(stringBytes);
- return new AMQShortString(stringBytes);
- }
-
- }
-
- public static void writeShortString(AMQShortString shortString, TupleOutput tupleOutput)
- {
-
- if (shortString == null)
- {
- tupleOutput.writeByte(-1);
- }
- else
- {
- tupleOutput.writeByte(shortString.length());
- tupleOutput.writeFast(shortString.getBytes(), 0, shortString.length());
- }
- }
-}
Deleted: store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringTB.java
===================================================================
--- store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringTB.java 2011-05-03 18:49:45 UTC (rev 4455)
+++ store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringTB.java 2011-05-04 09:13:25 UTC (rev 4456)
@@ -1,28 +0,0 @@
-package org.apache.qpid.server.store.berkeleydb;
-
-import com.sleepycat.bind.tuple.TupleBinding;
-import com.sleepycat.bind.tuple.TupleInput;
-import com.sleepycat.bind.tuple.TupleOutput;
-import org.apache.log4j.Logger;
-import org.apache.qpid.framing.AMQShortString;
-
-public class AMQShortStringTB extends TupleBinding
-{
- private static final Logger _log = Logger.getLogger(AMQShortStringTB.class);
-
-
- public AMQShortStringTB()
- {
- }
-
- public Object entryToObject(TupleInput tupleInput)
- {
- return AMQShortStringEncoding.readShortString(tupleInput);
- }
-
- public void objectToEntry(Object object, TupleOutput tupleOutput)
- {
- AMQShortStringEncoding.writeShortString((AMQShortString)object, tupleOutput);
- }
-
-}
Modified: store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
===================================================================
--- store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java 2011-05-03 18:49:45 UTC (rev 4455)
+++ store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java 2011-05-04 09:13:25 UTC (rev 4456)
@@ -53,7 +53,9 @@
import org.apache.qpid.server.store.AbstractMessageStore;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.store.berkeleydb.tuples.AMQShortStringTupleBindingFactory;
import org.apache.qpid.server.store.berkeleydb.tuples.BindingTupleBindingFactory;
+import org.apache.qpid.server.store.berkeleydb.tuples.QueueEntryKeyTupleBinding;
import org.apache.qpid.server.store.berkeleydb.tuples.QueueTuple;
import org.apache.qpid.server.store.berkeleydb.tuples.QueueTupleBindingFactory;
import org.apache.qpid.server.txn.NonTransactionalContext;
@@ -88,7 +90,7 @@
{
private static final Logger _log = Logger.getLogger(BDBMessageStore.class);
- private static final int DATABASE_FORMAT_VERSION = 2;
+ public static final int DATABASE_FORMAT_VERSION = 4;
private static final String DATABASE_FORMAT_VERSION_PROPERTY = "version";
public static final String ENVIRONMENT_PATH_PROPERTY = "environment-path";
@@ -138,6 +140,13 @@
private QueueTupleBindingFactory _queueTupleBindingFactory;
private BindingTupleBindingFactory _bindingTupleBindingFactory;
+ // factory for AMQShortStringTupleBinding
+ private AMQShortStringTupleBindingFactory _shortStringTupleBindingFactory;
+
+ // AMQShortString tuple binding object is stateless and can be reused across
+ // different methods
+ private TupleBinding<AMQShortString> _shortStringTupleBinding;
+
Map<AMQShortString, Integer> _queueRecoveries = new TreeMap<AMQShortString, Integer>();
/** The data version this store should run with */
@@ -233,6 +242,9 @@
setDatabaseNames(_version);
+ // create AMQShortString tuple binding for database version
+ _shortStringTupleBinding = AMQShortStringTupleBindingFactory.createTupleBinding(_version);
+
if (virtualHost != null)
{
setVirtualHost(virtualHost);
@@ -328,8 +340,9 @@
private void createTupleBindingFactories(int version)
{
- _queueTupleBindingFactory = new QueueTupleBindingFactory(version, _virtualHost);
- _bindingTupleBindingFactory = new BindingTupleBindingFactory(version, _virtualHost);
+ _queueTupleBindingFactory = new QueueTupleBindingFactory(version, _virtualHost, _shortStringTupleBinding);
+ _bindingTupleBindingFactory = new BindingTupleBindingFactory(version, _virtualHost, _shortStringTupleBinding);
+ _shortStringTupleBindingFactory = new AMQShortStringTupleBindingFactory(version, _virtualHost);
}
private synchronized void stateTransition(State requiredState, State newState) throws AMQException
@@ -616,10 +629,10 @@
if (_state != State.RECOVERING)
{
DatabaseEntry key = new DatabaseEntry();
- EntryBinding keyBinding = new AMQShortStringTB();
- keyBinding.objectToEntry(exchange.getName(), key);
+
+ _shortStringTupleBinding.objectToEntry(exchange.getName(), key);
DatabaseEntry value = new DatabaseEntry();
- TupleBinding exchangeBinding = new ExchangeTB(_virtualHost);
+ TupleBinding<Exchange> exchangeBinding = new ExchangeTB(_virtualHost, _shortStringTupleBinding);
exchangeBinding.objectToEntry(exchange, value);
try
{
@@ -643,8 +656,8 @@
public void removeExchange(Exchange exchange) throws AMQException
{
DatabaseEntry key = new DatabaseEntry();
- EntryBinding keyBinding = new AMQShortStringTB();
- keyBinding.objectToEntry(exchange.getName(), key);
+
+ _shortStringTupleBinding.objectToEntry(exchange.getName(), key);
try
{
OperationStatus status = _exchangeDb.delete(null, key);
@@ -747,7 +760,7 @@
cursor = _exchangeDb.openCursor(null, null);
DatabaseEntry key = new DatabaseEntry();
DatabaseEntry value = new DatabaseEntry();
- TupleBinding binding = new ExchangeTB(_virtualHost);
+ TupleBinding<Exchange> binding = new ExchangeTB(_virtualHost, _shortStringTupleBinding);
while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
{
Exchange exchange = (Exchange) binding.entryToObject(value);
@@ -863,9 +876,9 @@
DatabaseEntry key = new DatabaseEntry();
- EntryBinding keyBinding = new AMQShortStringTB();
- keyBinding.objectToEntry(queue.getName(), key);
+ _shortStringTupleBinding.objectToEntry(queue.getName(), key);
+
DatabaseEntry value = new DatabaseEntry();
TupleBinding queueBinding = _queueTupleBindingFactory.getInstance();
@@ -899,8 +912,8 @@
Long queueId = _queueNameToIdMap.remove(name);
DatabaseEntry key = new DatabaseEntry();
- EntryBinding keyBinding = new AMQShortStringTB();
- keyBinding.objectToEntry(name, key);
+
+ _shortStringTupleBinding.objectToEntry(name, key);
try
{
OperationStatus status = _queueDb.delete(null, key);
@@ -927,8 +940,8 @@
AMQQueue getQueue(AMQShortString name) throws AMQException
{
DatabaseEntry key = new DatabaseEntry();
- EntryBinding keyBinding = new AMQShortStringTB();
- keyBinding.objectToEntry(name, key);
+
+ _shortStringTupleBinding.objectToEntry(name, key);
DatabaseEntry value = new DatabaseEntry();
try
{
@@ -960,7 +973,7 @@
AMQShortString name = queue.getName();
Transaction tx = (Transaction) context.getPayload();
DatabaseEntry key = new DatabaseEntry();
- EntryBinding keyBinding = new QueueEntryKey.TupleBinding();
+ EntryBinding<QueueEntryKey> keyBinding = new QueueEntryKeyTupleBinding(_shortStringTupleBinding);
QueueEntryKey dd = new QueueEntryKey(name, messageId);
keyBinding.objectToEntry(dd, key);
DatabaseEntry value = new DatabaseEntry();
@@ -1020,7 +1033,7 @@
Transaction tx = (Transaction) context.getPayload();
DatabaseEntry key = new DatabaseEntry();
- EntryBinding keyBinding = new QueueEntryKey.TupleBinding();
+ EntryBinding<QueueEntryKey> keyBinding = new QueueEntryKeyTupleBinding(_shortStringTupleBinding);
QueueEntryKey dd = new QueueEntryKey(name, messageId);
keyBinding.objectToEntry(dd, key);
@@ -1208,7 +1221,7 @@
QueueEntryKey dd = new QueueEntryKey(queueName, 0);
- EntryBinding keyBinding = new QueueEntryKey.TupleBinding();
+ EntryBinding<QueueEntryKey> keyBinding = new QueueEntryKeyTupleBinding(_shortStringTupleBinding);
keyBinding.objectToEntry(dd, key);
DatabaseEntry value = new DatabaseEntry();
@@ -1368,7 +1381,7 @@
EntryBinding keyBinding = TupleBinding.getPrimitiveBinding(Long.class);
keyBinding.objectToEntry(messageId, key);
DatabaseEntry value = new DatabaseEntry();
- TupleBinding messageBinding = new MessageMetaDataTB();
+ TupleBinding<MessageMetaData> messageBinding = new MessageMetaDataTB(_shortStringTupleBinding);
messageBinding.objectToEntry(messageMetaData, value);
try
{
@@ -1406,7 +1419,7 @@
EntryBinding keyBinding = TupleBinding.getPrimitiveBinding(Long.class);
keyBinding.objectToEntry(messageId, key);
DatabaseEntry value = new DatabaseEntry();
- TupleBinding messageBinding = new MessageMetaDataTB();
+ TupleBinding<MessageMetaData> messageBinding = new MessageMetaDataTB(_shortStringTupleBinding);
try
{
@@ -1521,6 +1534,11 @@
return _bindingTupleBindingFactory;
}
+ public AMQShortStringTupleBindingFactory getShortStringTupleBindingFactory()
+ {
+ return _shortStringTupleBindingFactory;
+ }
+
//Package getters for the various databases used by the Store
Database getMetaDataDb()
@@ -1647,7 +1665,7 @@
Transaction tx = (Transaction) context.getPayload();
cursor = _deliveryDb.openCursor(tx, null);
DatabaseEntry key = new DatabaseEntry();
- EntryBinding keyBinding = new QueueEntryKey.TupleBinding();
+ EntryBinding<QueueEntryKey> keyBinding = new QueueEntryKeyTupleBinding(_shortStringTupleBinding);
DatabaseEntry value = new DatabaseEntry();
EntryBinding valueBinding = TupleBinding.getPrimitiveBinding(Long.class);
Modified: store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/ExchangeTB.java
===================================================================
--- store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/ExchangeTB.java 2011-05-03 18:49:45 UTC (rev 4455)
+++ store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/ExchangeTB.java 2011-05-04 09:13:25 UTC (rev 4456)
@@ -9,22 +9,25 @@
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.virtualhost.VirtualHost;
-public class ExchangeTB extends TupleBinding
+public class ExchangeTB extends TupleBinding<Exchange>
{
private static final Logger _log = Logger.getLogger(ExchangeTB.class);
private final VirtualHost _virtualHost;
- public ExchangeTB(VirtualHost virtualHost)
+ private final TupleBinding<AMQShortString> _shortStringTupleBinding;
+
+ public ExchangeTB(VirtualHost virtualHost, TupleBinding<AMQShortString> shortStringEncoder)
{
_virtualHost = virtualHost;
+ _shortStringTupleBinding = shortStringEncoder;
}
- public Object entryToObject(TupleInput tupleInput)
+ public Exchange entryToObject(TupleInput tupleInput)
{
- AMQShortString name = AMQShortStringEncoding.readShortString(tupleInput);
- AMQShortString typeName = AMQShortStringEncoding.readShortString(tupleInput);
+ AMQShortString name = _shortStringTupleBinding.entryToObject(tupleInput);
+ AMQShortString typeName = _shortStringTupleBinding.entryToObject(tupleInput);
boolean autoDelete = tupleInput.readBoolean();
@@ -56,13 +59,11 @@
}
}
- public void objectToEntry(Object object, TupleOutput tupleOutput)
+ public void objectToEntry(Exchange exchange, TupleOutput tupleOutput)
{
- Exchange exchange = (Exchange) object;
+ _shortStringTupleBinding.objectToEntry(exchange.getName(), tupleOutput);
+ _shortStringTupleBinding.objectToEntry(exchange.getType(), tupleOutput);
- AMQShortStringEncoding.writeShortString(exchange.getName(), tupleOutput);
- AMQShortStringEncoding.writeShortString(exchange.getType(), tupleOutput);
-
tupleOutput.writeBoolean(exchange.isAutoDelete());
}
Modified: store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/MessageMetaDataTB.java
===================================================================
--- store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/MessageMetaDataTB.java 2011-05-03 18:49:45 UTC (rev 4455)
+++ store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/MessageMetaDataTB.java 2011-05-04 09:13:25 UTC (rev 4456)
@@ -17,25 +17,36 @@
*/
package org.apache.qpid.server.store.berkeleydb;
-import com.sleepycat.bind.tuple.TupleBinding;
-import com.sleepycat.bind.tuple.TupleInput;
-import com.sleepycat.bind.tuple.TupleOutput;
import org.apache.log4j.Logger;
import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.*;
+import org.apache.qpid.framing.AMQFrameDecodingException;
+import org.apache.qpid.framing.AMQProtocolVersionException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.queue.MessageMetaData;
+import com.sleepycat.bind.tuple.TupleBinding;
+import com.sleepycat.bind.tuple.TupleInput;
+import com.sleepycat.bind.tuple.TupleOutput;
+
/**
* Handles the mapping to and from message meta data
*/
-public class MessageMetaDataTB extends TupleBinding
+public class MessageMetaDataTB extends TupleBinding<MessageMetaData>
{
private static final Logger _log = Logger.getLogger(MessageMetaDataTB.class);
- public Object entryToObject(TupleInput tupleInput)
+ private final TupleBinding<AMQShortString> _shortStringBinding;
+
+ public MessageMetaDataTB(TupleBinding<AMQShortString> shortStringBinding)
{
+ _shortStringBinding = shortStringBinding;
+ }
+
+ public MessageMetaData entryToObject(TupleInput tupleInput)
+ {
try
{
final MessagePublishInfo publishBody = readMessagePublishInfo(tupleInput);
@@ -51,9 +62,8 @@
}
}
- public void objectToEntry(Object object, TupleOutput tupleOutput)
+ public void objectToEntry(MessageMetaData message, TupleOutput tupleOutput)
{
- MessageMetaData message = (MessageMetaData) object;
try
{
writeMessagePublishInfo(message.getMessagePublishInfo(), tupleOutput);
@@ -71,8 +81,8 @@
private MessagePublishInfo readMessagePublishInfo(TupleInput tupleInput)
{
- final AMQShortString exchange = AMQShortStringEncoding.readShortString(tupleInput);
- final AMQShortString routingKey = AMQShortStringEncoding.readShortString(tupleInput);
+ final AMQShortString exchange = _shortStringBinding.entryToObject(tupleInput);
+ final AMQShortString routingKey = _shortStringBinding.entryToObject(tupleInput);
final boolean mandatory = tupleInput.readBoolean();
final boolean immediate = tupleInput.readBoolean();
@@ -122,8 +132,8 @@
{
- AMQShortStringEncoding.writeShortString(publishBody.getExchange(), tupleOutput);
- AMQShortStringEncoding.writeShortString(publishBody.getRoutingKey(), tupleOutput);
+ _shortStringBinding.objectToEntry(publishBody.getExchange(), tupleOutput);
+ _shortStringBinding.objectToEntry(publishBody.getRoutingKey(), tupleOutput);
tupleOutput.writeBoolean(publishBody.isMandatory());
tupleOutput.writeBoolean(publishBody.isImmediate());
Modified: store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/QueueEntryKey.java
===================================================================
--- store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/QueueEntryKey.java 2011-05-03 18:49:45 UTC (rev 4455)
+++ store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/QueueEntryKey.java 2011-05-04 09:13:25 UTC (rev 4456)
@@ -1,7 +1,5 @@
package org.apache.qpid.server.store.berkeleydb;
-import com.sleepycat.bind.tuple.TupleInput;
-import com.sleepycat.bind.tuple.TupleOutput;
import org.apache.qpid.framing.AMQShortString;
@@ -18,42 +16,6 @@
{
}
-
- public QueueEntryKey(byte[] payload)
- {
- final TupleInput ti = new TupleInput(payload);
-
- queueName = AMQShortStringEncoding.readShortString(ti);
-
- messageId = ti.readLong();
-
- }
-
- public static class TupleBinding extends com.sleepycat.bind.tuple.TupleBinding
- {
- public Object entryToObject(TupleInput tupleInput)
- {
- final QueueEntryKey mk = new QueueEntryKey();
-
-
- mk.queueName = AMQShortStringEncoding.readShortString(tupleInput);
- mk.messageId = tupleInput.readLong();
-
- return mk;
- }
-
- public void objectToEntry(Object object, TupleOutput tupleOutput)
- {
- final QueueEntryKey mk = (QueueEntryKey) object;
-
- AMQShortStringEncoding.writeShortString(mk.queueName,tupleOutput);
- tupleOutput.writeLong(mk.messageId);
-
- }
-
-
- }
-
public QueueEntryKey(AMQShortString queueName, long messageId)
{
this.queueName = queueName;
Added: store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/AMQShortStringTB_2.java
===================================================================
--- store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/AMQShortStringTB_2.java (rev 0)
+++ store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/AMQShortStringTB_2.java 2011-05-04 09:13:25 UTC (rev 4456)
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.store.berkeleydb.tuples;
+
+import com.sleepycat.bind.tuple.TupleBinding;
+import com.sleepycat.bind.tuple.TupleInput;
+import com.sleepycat.bind.tuple.TupleOutput;
+import org.apache.qpid.framing.AMQShortString;
+
+/**
+ * This class is responsible for reading/writing of {@link AMQShortString}
+ * from/into BDB tuple.
+ * <p>
+ * However, it contains a bug and can only read/write correctly
+ * {@link AMQShortString} objects having length less than 128.
+ * <p>
+ * For AMQShortString objects with length greater than 127 characters a read
+ * operation returns null due to writing a string length as a byte which causes
+ * converting values greater than 127 into negative signed byte value.
+ */
+public class AMQShortStringTB_2 extends TupleBinding<AMQShortString>
+{
+
+ /**
+ * Reads short string object from the given tuple input.
+ * <p>
+ * The length of the string is read from a first byte and followed by the
+ * string characters if there is any.
+ * <p>
+ * A null is returned for a string with negative length.
+ *
+ * @param tupleInput
+ * tuple input
+ * @return AMQShortString object or null if tuple contains a negative value
+ * in a first byte.
+ */
+ @Override
+ public AMQShortString entryToObject(TupleInput tupleInput)
+ {
+ int length = (int) tupleInput.readByte();
+ if (length < 0)
+ {
+ return null;
+ }
+ else
+ {
+ byte[] stringBytes = new byte[length];
+ tupleInput.readFast(stringBytes);
+ return new AMQShortString(stringBytes);
+ }
+ }
+
+ /**
+ * Writes given short string object into given tuple output.
+ * <p>
+ * The string is stored as a sequence of bytes where the first byte contains
+ * the string length, followed by the string characters.
+ * <p>
+ * Only strings with length less than 128 can be written correctly with this
+ * method.
+ * <p>
+ * Length values greater than 127 are converted into negative signed byte
+ * values
+ * <p>
+ * Null is written as -1.
+ *
+ * @param object
+ * string to write
+ * @param tupleOutput
+ * output tuple to write string into.
+ */
+ @Override
+ public void objectToEntry(AMQShortString object, TupleOutput tupleOutput)
+ {
+ if (object == null)
+ {
+ tupleOutput.writeByte(-1);
+ }
+ else
+ {
+ tupleOutput.writeByte(object.length());
+ tupleOutput.writeFast(object.getBytes(), 0, object.length());
+ }
+ }
+
+}
Added: store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/AMQShortStringTB_4.java
===================================================================
--- store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/AMQShortStringTB_4.java (rev 0)
+++ store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/AMQShortStringTB_4.java 2011-05-04 09:13:25 UTC (rev 4456)
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.store.berkeleydb.tuples;
+
+import org.apache.qpid.framing.AMQShortString;
+
+import com.sleepycat.bind.tuple.TupleBinding;
+import com.sleepycat.bind.tuple.TupleInput;
+import com.sleepycat.bind.tuple.TupleOutput;
+
+/**
+ * A correct implementation of AMQShortString Tuple binding to use for
+ * reading/writing of {@link AMQShortString} objects from/into tuple.
+ *
+ */
+public class AMQShortStringTB_4 extends TupleBinding<AMQShortString>
+{
+ /**
+ * Reads an <code>AMQShortString</code> from given <code>TupleInput</code>
+ * <p>
+ * The length of the string is read from the first 2 bytes, followed by the
+ * string characters.
+ * <p>
+ * String having negative length value is considered a null string.
+ *
+ * @param tupleInput
+ * tuple input
+ * @return AMQShortString
+ */
+ @Override
+ public AMQShortString entryToObject(TupleInput tupleInput)
+ {
+ short length = tupleInput.readShort();
+ if (length < 0)
+ {
+ return null;
+ }
+ else
+ {
+ byte[] stringBytes = new byte[length];
+ tupleInput.readFast(stringBytes);
+ return new AMQShortString(stringBytes);
+ }
+ }
+
+ /**
+ * Writes given short string object into given tuple output.
+ * <p>
+ * The string is written as sequence of bytes where first 2 bytes contain a
+ * string length and followed by string characters.
+ *
+ * @param shortString
+ * short string
+ * @param tupleOutput
+ * tuple output
+ */
+ @Override
+ public void objectToEntry(AMQShortString shortString, TupleOutput tupleOutput)
+ {
+ if (shortString == null)
+ {
+ tupleOutput.writeShort(-1);
+ }
+ else
+ {
+ tupleOutput.writeShort(shortString.length());
+ tupleOutput.writeFast(shortString.getBytes(), 0, shortString.length());
+ }
+ }
+
+}
Added: store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/AMQShortStringTupleBindingFactory.java
===================================================================
--- store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/AMQShortStringTupleBindingFactory.java (rev 0)
+++ store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/AMQShortStringTupleBindingFactory.java 2011-05-04 09:13:25 UTC (rev 4456)
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.store.berkeleydb.tuples;
+
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+import com.sleepycat.bind.tuple.TupleBinding;
+
+/**
+ * A factory class to construct {@link TupleBinding<AMQShortString>} instance to
+ * use for writing/reading {@link AMQShortString} objects into/from BDB tuple.
+ */
+public class AMQShortStringTupleBindingFactory extends TupleBindingFactory
+{
+
+ /**
+ * Constructs factory for given BDB store version and virtual host
+ *
+ * @param version
+ * BDB store version
+ * @param virtualhost
+ * virtual host
+ */
+ public AMQShortStringTupleBindingFactory(int version, VirtualHost virtualhost)
+ {
+ super(version, virtualhost);
+ }
+
+ /**
+ * Creates {@link TupleBinding<AMQShortString>} instance for given BDB
+ * version.
+ *
+ * @param version
+ * BDB version.
+ * @return TupleBinding<AMQShortString>
+ */
+ public static TupleBinding<AMQShortString> createTupleBinding(int version)
+ {
+ switch (version)
+ {
+ default:
+ case 4:
+ return new AMQShortStringTB_4();
+ case 1:
+ case 2:
+ // old BDB store short string binding
+ return new AMQShortStringTB_2();
+ }
+ }
+
+ @Override
+ public TupleBinding<AMQShortString> getInstance()
+ {
+ return AMQShortStringTupleBindingFactory.createTupleBinding(_version);
+ }
+}
Modified: store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTupleBindingFactory.java
===================================================================
--- store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTupleBindingFactory.java 2011-05-03 18:49:45 UTC (rev 4455)
+++ store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTupleBindingFactory.java 2011-05-04 09:13:25 UTC (rev 4456)
@@ -20,25 +20,47 @@
*/
package org.apache.qpid.server.store.berkeleydb.tuples;
-import com.sleepycat.bind.tuple.TupleBinding;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.store.berkeleydb.BindingKey;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import com.sleepycat.bind.tuple.TupleBinding;
+
public class BindingTupleBindingFactory extends TupleBindingFactory
{
- public BindingTupleBindingFactory(int version, VirtualHost virtualhost)
+ /**
+ * Holds tuple binding to serialize/de-serialize AMQShortString objects
+ */
+ protected TupleBinding<AMQShortString> _shortStringTupleBinding;
+
+ /**
+ * Creates a factory instance for given store version, virtual host and
+ * {@link AMQShortString} tuple binding
+ *
+ * @param version
+ * store version
+ * @param virtualhost
+ * virtual host
+ * @param shortStringTupleBinding
+ * AMQShortString tuple binding
+ */
+ public BindingTupleBindingFactory(int version, VirtualHost virtualhost,
+ TupleBinding<AMQShortString> shortStringTupleBinding)
{
super(version, virtualhost);
+ _shortStringTupleBinding = shortStringTupleBinding;
}
- public TupleBinding getInstance()
+ public TupleBinding<BindingKey> getInstance()
{
switch (_version)
{
default:
+ case 4:
case 2:
- return new BindingTuple_2(_virtualhost);
+ return new BindingTuple_2(_virtualhost, _shortStringTupleBinding);
case 1:
- return new BindingTuple_1(_virtualhost);
+ return new BindingTuple_1(_virtualhost, _shortStringTupleBinding);
}
}
}
Modified: store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple_1.java
===================================================================
--- store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple_1.java 2011-05-03 18:49:45 UTC (rev 4455)
+++ store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple_1.java 2011-05-04 09:13:25 UTC (rev 4456)
@@ -1,8 +1,6 @@
package org.apache.qpid.server.store.berkeleydb.tuples;
import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.store.berkeleydb.AMQShortStringEncoding;
-import org.apache.qpid.server.store.berkeleydb.FieldTableEncoding;
import org.apache.qpid.server.store.berkeleydb.BindingKey;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
@@ -12,46 +10,60 @@
import com.sleepycat.bind.tuple.TupleOutput;
import org.apache.log4j.Logger;
-public class BindingTuple_1 extends TupleBinding implements BindingTuple
+public class BindingTuple_1 extends TupleBinding<BindingKey> implements BindingTuple
{
protected static final Logger _log = Logger.getLogger(BindingTuple.class);
protected VirtualHost _virtualhost;
- public BindingTuple_1(VirtualHost virtualHost)
+ /**
+ * Holds tuple binding to serialize AMQShortString objects
+ */
+ protected TupleBinding<AMQShortString> _shortStringTupleBinding;
+
+ /**
+ * Constructs instance for given virtual host and AMQShortString {@link TupleBinding}.
+ *
+ * @param virtualHost virtual host
+ * @param shortStringTupleBinding AMQShortString tuple binding
+ */
+ public BindingTuple_1(VirtualHost virtualHost, TupleBinding<AMQShortString> shortStringTupleBinding)
{
if (virtualHost == null)
{
throw new NullPointerException("Virtualhost cannot be null");
}
+ if (shortStringTupleBinding == null)
+ {
+ throw new NullPointerException("AMQShortString tuple binding cannot be null");
+ }
_virtualhost = virtualHost;
+ _shortStringTupleBinding = shortStringTupleBinding;
}
- public Object entryToObject(TupleInput tupleInput)
+ public BindingKey entryToObject(TupleInput tupleInput)
{
- AMQShortString exchangeName = AMQShortStringEncoding.readShortString(tupleInput);
- AMQShortString queueName = AMQShortStringEncoding.readShortString(tupleInput);
- AMQShortString routingKey = AMQShortStringEncoding.readShortString(tupleInput);
+ AMQShortString exchangeName = _shortStringTupleBinding.entryToObject(tupleInput);
+ AMQShortString queueName = _shortStringTupleBinding.entryToObject(tupleInput);
+ AMQShortString routingKey = _shortStringTupleBinding.entryToObject(tupleInput);
return createNewBindingKey(exchangeName, queueName, routingKey);
}
- public void objectToEntry(Object object, TupleOutput tupleOutput)
+ public void objectToEntry(BindingKey binding, TupleOutput tupleOutput)
{
- BindingKey binding = (BindingKey) object;
-
- AMQShortStringEncoding.writeShortString(binding.getExchangeName(), tupleOutput);
- AMQShortStringEncoding.writeShortString(binding.getQueueName(), tupleOutput);
- AMQShortStringEncoding.writeShortString(binding.getRoutingKey(), tupleOutput);
+ _shortStringTupleBinding.objectToEntry(binding.getExchangeName(), tupleOutput);
+ _shortStringTupleBinding.objectToEntry(binding.getQueueName(), tupleOutput);
+ _shortStringTupleBinding.objectToEntry(binding.getRoutingKey(), tupleOutput);
}
- private Object createNewBindingKey(AMQShortString exchangeName, AMQShortString queueName, AMQShortString routingKey)
+ private BindingKey createNewBindingKey(AMQShortString exchangeName, AMQShortString queueName, AMQShortString routingKey)
{
return createNewBindingKey(exchangeName, queueName, routingKey, null);
}
// Addition for Version 2 of this table
- protected Object createNewBindingKey(AMQShortString exchangeName, AMQShortString queueName,
+ protected BindingKey createNewBindingKey(AMQShortString exchangeName, AMQShortString queueName,
AMQShortString routingKey, FieldTable arguments)
{
return new BindingKey(exchangeName, queueName, routingKey, arguments);
Modified: store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple_2.java
===================================================================
--- store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple_2.java 2011-05-03 18:49:45 UTC (rev 4455)
+++ store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple_2.java 2011-05-04 09:13:25 UTC (rev 4456)
@@ -2,27 +2,27 @@
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.server.store.berkeleydb.AMQShortStringEncoding;
import org.apache.qpid.server.store.berkeleydb.BindingKey;
import org.apache.qpid.server.store.berkeleydb.FieldTableEncoding;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import com.sleepycat.bind.tuple.TupleBinding;
import com.sleepycat.bind.tuple.TupleInput;
import com.sleepycat.bind.tuple.TupleOutput;
public class BindingTuple_2 extends BindingTuple_1 implements BindingTuple
{
- public BindingTuple_2(VirtualHost virtualHost)
+ public BindingTuple_2(VirtualHost virtualHost, TupleBinding<AMQShortString> shortStringTupleBinding)
{
- super(virtualHost);
+ super(virtualHost, shortStringTupleBinding);
}
- public Object entryToObject(TupleInput tupleInput)
+ public BindingKey entryToObject(TupleInput tupleInput)
{
- AMQShortString exchangeName = AMQShortStringEncoding.readShortString(tupleInput);
- AMQShortString queueName = AMQShortStringEncoding.readShortString(tupleInput);
- AMQShortString routingKey = AMQShortStringEncoding.readShortString(tupleInput);
+ AMQShortString exchangeName = _shortStringTupleBinding.entryToObject(tupleInput);
+ AMQShortString queueName = _shortStringTupleBinding.entryToObject(tupleInput);
+ AMQShortString routingKey = _shortStringTupleBinding.entryToObject(tupleInput);
FieldTable arguments;
@@ -40,13 +40,11 @@
return createNewBindingKey(exchangeName, queueName, routingKey, arguments);
}
- public void objectToEntry(Object object, TupleOutput tupleOutput)
+ public void objectToEntry(BindingKey binding, TupleOutput tupleOutput)
{
- BindingKey binding = (BindingKey) object;
-
- AMQShortStringEncoding.writeShortString(binding.getExchangeName(), tupleOutput);
- AMQShortStringEncoding.writeShortString(binding.getQueueName(), tupleOutput);
- AMQShortStringEncoding.writeShortString(binding.getRoutingKey(), tupleOutput);
+ _shortStringTupleBinding.objectToEntry(binding.getExchangeName(), tupleOutput);
+ _shortStringTupleBinding.objectToEntry(binding.getQueueName(), tupleOutput);
+ _shortStringTupleBinding.objectToEntry(binding.getRoutingKey(), tupleOutput);
// Addition for Version 2 of this table
FieldTableEncoding.writeFieldTable(binding.getArguments(), tupleOutput);
Added: store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueEntryKeyTupleBinding.java
===================================================================
--- store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueEntryKeyTupleBinding.java (rev 0)
+++ store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueEntryKeyTupleBinding.java 2011-05-04 09:13:25 UTC (rev 4456)
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.store.berkeleydb.tuples;
+
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.store.berkeleydb.QueueEntryKey;
+
+import com.sleepycat.bind.tuple.TupleBinding;
+import com.sleepycat.bind.tuple.TupleInput;
+import com.sleepycat.bind.tuple.TupleOutput;
+
+/**
+ * TupleBinding implementation to store/load {@link QueueEntryKey} in/from BDB store.
+ */
+public class QueueEntryKeyTupleBinding extends TupleBinding<QueueEntryKey>
+{
+ /**
+ * Holds tuple binding to serialize AMQShortString objects
+ */
+ private final TupleBinding<AMQShortString> _shortStringTupleBinding;
+
+ /**
+ * Constructs instance for given short string tuple binding
+ *
+ * @param shortStringTupleBinding
+ * short string tuple binding
+ */
+ public QueueEntryKeyTupleBinding(TupleBinding<AMQShortString> shortStringTupleBinding)
+ {
+ _shortStringTupleBinding = shortStringTupleBinding;
+ }
+
+ @Override
+ public QueueEntryKey entryToObject(TupleInput tupleInput)
+ {
+ final QueueEntryKey mk = new QueueEntryKey();
+ mk.queueName = _shortStringTupleBinding.entryToObject(tupleInput);
+ mk.messageId = tupleInput.readLong();
+ return mk;
+ }
+
+ @Override
+ public void objectToEntry(QueueEntryKey mk, TupleOutput tupleOutput)
+ {
+ _shortStringTupleBinding.objectToEntry(mk.queueName, tupleOutput);
+ tupleOutput.writeLong(mk.messageId);
+ }
+
+}
\ No newline at end of file
Modified: store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTupleBindingFactory.java
===================================================================
--- store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTupleBindingFactory.java 2011-05-03 18:49:45 UTC (rev 4455)
+++ store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTupleBindingFactory.java 2011-05-04 09:13:25 UTC (rev 4456)
@@ -20,25 +20,47 @@
*/
package org.apache.qpid.server.store.berkeleydb.tuples;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.virtualhost.VirtualHost;
+
import com.sleepycat.bind.tuple.TupleBinding;
public class QueueTupleBindingFactory extends TupleBindingFactory
{
- public QueueTupleBindingFactory(int version, VirtualHost virtualHost)
+ /**
+ * Holds tuple binding to serialize/de-serialize AMQShortString objects
+ */
+ protected TupleBinding<AMQShortString> _shortStringTupleBinding;
+
+ /**
+ * Creates a factory instance for given store version, virtual host and
+ * {@link AMQShortString} tuple binding
+ *
+ * @param version
+ * store version
+ * @param virtualHost
+ * virtual host
+ * @param shortStringTupleBinding
+ * AMQShortString tuple binding
+ */
+ public QueueTupleBindingFactory(int version, VirtualHost virtualHost,
+ TupleBinding<AMQShortString> shortStringTupleBinding)
{
- super(version,virtualHost);
+ super(version, virtualHost);
+ _shortStringTupleBinding = shortStringTupleBinding;
}
- public TupleBinding getInstance()
+ public TupleBinding<AMQQueue> getInstance()
{
switch (_version)
{
default:
+ case 4:
case 2:
- return new QueueTuple_2(_virtualhost);
+ return new QueueTuple_2(_virtualhost, _shortStringTupleBinding);
case 1:
- return new QueueTuple_1(_virtualhost);
+ return new QueueTuple_1(_virtualhost, _shortStringTupleBinding);
}
}
}
Modified: store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_1.java
===================================================================
--- store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_1.java 2011-05-03 18:49:45 UTC (rev 4455)
+++ store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_1.java 2011-05-04 09:13:25 UTC (rev 4456)
@@ -26,38 +26,44 @@
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.AMQQueueFactory;
-import org.apache.qpid.server.store.berkeleydb.AMQShortStringEncoding;
import org.apache.qpid.server.virtualhost.VirtualHost;
-public class QueueTuple_1 extends TupleBinding implements QueueTuple
+public class QueueTuple_1 extends TupleBinding<AMQQueue> implements QueueTuple
{
protected static final Logger _logger = Logger.getLogger(QueueTuple.class);
protected final VirtualHost _virtualHost;
- public QueueTuple_1(VirtualHost virtualHost)
+ /**
+ * Holds tuple binding to serialize AMQShortString objects
+ */
+ protected TupleBinding<AMQShortString> _shortStringTupleBinding;
+
+ public QueueTuple_1(VirtualHost virtualHost, TupleBinding<AMQShortString> shortStringTupleBinding)
{
if (virtualHost == null)
{
throw new NullPointerException("Virtualhost cannot be null");
+ }if (shortStringTupleBinding == null)
+ {
+ throw new NullPointerException("AMQShortString tuple binding cannot be null");
}
+ _shortStringTupleBinding = shortStringTupleBinding;
_virtualHost = virtualHost;
}
- public Object entryToObject(TupleInput tupleInput)
+ public AMQQueue entryToObject(TupleInput tupleInput)
{
- AMQShortString name = AMQShortStringEncoding.readShortString(tupleInput);
- AMQShortString owner = AMQShortStringEncoding.readShortString(tupleInput);
+ AMQShortString name = _shortStringTupleBinding.entryToObject(tupleInput);
+ AMQShortString owner = _shortStringTupleBinding.entryToObject(tupleInput);
return createNewQueue(name, owner);
}
- public void objectToEntry(Object object, TupleOutput tupleOutput)
+ public void objectToEntry(AMQQueue queue, TupleOutput tupleOutput)
{
- AMQQueue queue = (AMQQueue) object;
-
- AMQShortStringEncoding.writeShortString(queue.getName(), tupleOutput);
- AMQShortStringEncoding.writeShortString(queue.getOwner(), tupleOutput);
+ _shortStringTupleBinding.objectToEntry(queue.getName(), tupleOutput);
+ _shortStringTupleBinding.objectToEntry(queue.getOwner(), tupleOutput);
}
// Addition for Version 2 of this table
@@ -66,13 +72,13 @@
//no-op
}
- protected Object createNewQueue(AMQShortString name, AMQShortString owner)
+ protected AMQQueue createNewQueue(AMQShortString name, AMQShortString owner)
{
return createNewQueue(name, owner, null);
}
// Addition for Version 2 of this table
- protected Object createNewQueue(AMQShortString name, AMQShortString owner, FieldTable arguments)
+ protected AMQQueue createNewQueue(AMQShortString name, AMQShortString owner, FieldTable arguments)
{
try
{
Modified: store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_2.java
===================================================================
--- store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_2.java 2011-05-03 18:49:45 UTC (rev 4455)
+++ store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_2.java 2011-05-04 09:13:25 UTC (rev 4456)
@@ -20,10 +20,10 @@
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.store.berkeleydb.AMQShortStringEncoding;
import org.apache.qpid.server.store.berkeleydb.FieldTableEncoding;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import com.sleepycat.bind.tuple.TupleBinding;
import com.sleepycat.bind.tuple.TupleInput;
import com.sleepycat.bind.tuple.TupleOutput;
@@ -31,17 +31,17 @@
{
protected FieldTable _arguments;
- public QueueTuple_2(VirtualHost virtualHost)
+ public QueueTuple_2(VirtualHost virtualHost, TupleBinding<AMQShortString> shortStringTupleBinding)
{
- super(virtualHost);
+ super(virtualHost, shortStringTupleBinding);
}
- public Object entryToObject(TupleInput tupleInput)
+ public AMQQueue entryToObject(TupleInput tupleInput)
{
try
{
- AMQShortString name = AMQShortStringEncoding.readShortString(tupleInput);
- AMQShortString owner = AMQShortStringEncoding.readShortString(tupleInput);
+ AMQShortString name = _shortStringTupleBinding.entryToObject(tupleInput);
+ AMQShortString owner = _shortStringTupleBinding.entryToObject(tupleInput);
// Addition for Version 2 of this table
FieldTable arguments = FieldTableEncoding.readFieldTable(tupleInput);
@@ -55,12 +55,10 @@
}
- public void objectToEntry(Object object, TupleOutput tupleOutput)
+ public void objectToEntry(AMQQueue queue, TupleOutput tupleOutput)
{
- AMQQueue queue = (AMQQueue) object;
-
- AMQShortStringEncoding.writeShortString(queue.getName(), tupleOutput);
- AMQShortStringEncoding.writeShortString(queue.getOwner(), tupleOutput);
+ _shortStringTupleBinding.objectToEntry(queue.getName(), tupleOutput);
+ _shortStringTupleBinding.objectToEntry(queue.getOwner(), tupleOutput);
// Addition for Version 2 of this table
FieldTableEncoding.writeFieldTable(_arguments, tupleOutput);
}
Modified: store/branches/java/0.5.x-dev/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreTest.java
===================================================================
--- store/branches/java/0.5.x-dev/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreTest.java 2011-05-03 18:49:45 UTC (rev 4455)
+++ store/branches/java/0.5.x-dev/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreTest.java 2011-05-04 09:13:25 UTC (rev 4456)
@@ -19,18 +19,14 @@
import com.sleepycat.je.DatabaseException;
import junit.framework.Assert;
-import junit.framework.TestCase;
import junit.framework.TestSuite;
-import org.apache.log4j.Logger;
import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.framing.*;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.framing.abstraction.ContentChunk;
-import org.apache.qpid.server.RequiredDeliveryException;
import org.apache.qpid.server.configuration.VirtualHostConfiguration;
-import org.apache.qpid.server.configuration.ServerConfiguration;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.exchange.DefaultExchangeFactory;
import org.apache.qpid.server.exchange.DirectExchange;
@@ -42,18 +38,14 @@
import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.queue.AMQPriorityQueue;
import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.txn.NonTransactionalContext;
-import org.apache.qpid.server.txn.TransactionalContext;
import org.apache.qpid.server.util.NullApplicationRegistry;
import org.apache.commons.configuration.PropertiesConfiguration;
import java.io.File;
-import java.util.LinkedList;
import java.util.List;
public class BDBStoreTest extends BDBVMTestCase
{
- private static final Logger _log = Logger.getLogger(BDBStoreTest.class);
private BDBMessageStore _store;
private String STORE_LOCATION = System.getProperty("BDB_WORK") + "/bdbTestEnv";
@@ -61,7 +53,6 @@
private StoreContext _storeContext = new StoreContext();
private VirtualHost _virtualHost;
- private TransactionalContext _txnContext;
private static final AMQShortString QUEUE1 = new AMQShortString("queue1");
private static final AMQShortString ME = new AMQShortString("me");
private static final AMQShortString MYEXCHANGE = new AMQShortString("myexchange");
@@ -70,7 +61,6 @@
private static final AMQShortString HIM = new AMQShortString("him");
private static final AMQShortString EXCHANGE1 = new AMQShortString("exchange1");
- private static volatile int _loops;
File BDB_DIR = new File(STORE_LOCATION);
public void setUp() throws Exception
@@ -101,7 +91,6 @@
_store.setVirtualHost(_virtualHost);
- _txnContext = new NonTransactionalContext(_store, _storeContext, null, new LinkedList<RequiredDeliveryException>());
}
private void reload() throws Exception
@@ -338,6 +327,9 @@
public void testTranCommit() throws Exception
{
+ List<Long> previousIds = _store.getEnqueuedMessages(QUEUE1);
+ assertTrue("Last Test Messages are still present", previousIds.isEmpty());
+
MessagePublishInfo pubBody = createPublishBody();
BasicContentHeaderProperties props = createContentHeaderProperties();
String bodyText = "dsjfhdsjkfhdsjflhsdjfdshfjdlhfjhdljfdhsljkfsh";
@@ -363,8 +355,30 @@
val = enqueuedIds.get(1);
Assert.assertEquals("Second Message is incorrect", 21L, val.longValue());
+ cleanQueue(queue, enqueuedIds);
+
}
+ /**
+ * A helper method to clear the given queue of messages with the given IDs.
+ *
+ * @param queue
+ * queue to clean
+ * @param enqueuedIds
+ * messages to dequeue
+ * @throws AMQException
+ */
+ protected void cleanQueue(AMQQueue queue, List<Long> enqueuedIds) throws AMQException
+ {
+ // clean queue
+ _store.beginTran(_storeContext);
+ for (Long messageId : enqueuedIds)
+ {
+ _store.dequeueMessage(_storeContext, queue, messageId);
+ }
+ _store.commitTran(_storeContext);
+ }
+
public void testTranRollback1() throws Exception
{
List<Long> enqueuedIds = _store.getEnqueuedMessages(QUEUE1);
@@ -404,6 +418,8 @@
val = enqueuedIds.get(1);
assertEquals("Second Message is incorrect", 31L, val.longValue());
+ cleanQueue(queue, enqueuedIds);
+
}
public void testTranRollback2() throws Exception
@@ -440,6 +456,8 @@
Assert.assertEquals("First Message is incorrect", 31L, val.longValue());
val = enqueuedIds.get(1);
Assert.assertEquals("Second Message is incorrect", 32L, val.longValue());
+
+ cleanQueue(queue, enqueuedIds);
}
public void testRecovery() throws Exception
Modified: store/branches/java/0.5.x-dev/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java
===================================================================
--- store/branches/java/0.5.x-dev/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java 2011-05-03 18:49:45 UTC (rev 4455)
+++ store/branches/java/0.5.x-dev/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java 2011-05-04 09:13:25 UTC (rev 4456)
@@ -20,6 +20,9 @@
*/
package org.apache.qpid.server.store.berkeleydb;
+import org.apache.qpid.server.logging.NullRootMessageLogger;
+import org.apache.qpid.server.logging.actors.BrokerActor;
+import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.store.berkeleydb.utils.Publisher;
@@ -48,16 +51,15 @@
final String BDBHome = System.getProperty("BDB_HOME");
final File _configFile = new File(BDBHome, "etc/config.xml");
- private String VIRTUALHOST = "test";
-
private static final String VERSION_1 = "1";
- private static final String VERSION_2 = "2";
+ private static final String VERSION_4 = "4";
private String _topic = "MyDurableSubscriptionTestTopic";
String _fromDir = System.getProperty("QPID_WORK") + "/version1Store";
- String _toDir = System.getProperty("QPID_WORK") + "/version2Store";
- String _toDirTwice = System.getProperty("QPID_WORK") + "/version2StoreUpgradeTwice";
+ String _fromDir2 = System.getProperty("QPID_WORK") + "/version2Store";
+ String _toDir = System.getProperty("QPID_WORK") + "/version4Store";
+ String _toDirTwice = System.getProperty("QPID_WORK") + "/version4StoreUpgradeTwice";
public void setUp() throws IOException
{
@@ -84,44 +86,102 @@
FileUtils.delete(directory, true);
}
+ directory = new File(_fromDir2);
+ if (directory.exists() && directory.isDirectory())
+ {
+ FileUtils.delete(directory, true);
+ }
}
- public void testMultipleUpgrades() throws Exception
+ public void testMultipleUpgradesFromVersionOne() throws Exception
{
+ assertBrokerUpgrade(1, _fromDir);
+ }
+
+ public void testMultipleUpgradesFromVersionTwo() throws Exception
+ {
+ assertBrokerUpgrade(2, _fromDir2);
+ }
+
+ public void testUpgradeFromLatestVersion() throws Exception
+ {
+ startBroker(1, Integer.toString(BDBMessageStore.DATABASE_FORMAT_VERSION));
+ stopBroker(1);
+ try
+ {
+ new BDBStoreUpgrade(_toDir, _toDirTwice, null, false, true).upgradeFromVersion(BDBMessageStore.DATABASE_FORMAT_VERSION);
+ fail("Upgrade Succeeded");
+ }
+ catch (Exception e)
+ {
+ assertEquals("Store on disk already upgraded to latest version "
+ + BDBMessageStore.DATABASE_FORMAT_VERSION, e.getMessage());
+ }
+ }
+
+ /**
+ * Tests broker upgrade from given version with store located by given path
+ *
+ * @param version
+ * store version
+ * @param storePath
+ * store path
+ * @throws Exception
+ */
+ protected void assertBrokerUpgrade(int version, String storePath) throws Exception
+ {
String broker = "vm://:1";
- startBroker(1, VERSION_1);
+ startBroker(1, Integer.toString(version));
- //Ensure msg were transitioned to new broker
+ // Ensure msg were transitioned to new broker
sendAndCheckDurableSubscriber(broker, true, true, 5, null);
- //Reset the Selector Pattern
+ // Reset the Selector Pattern
new DurableSubscriber(broker, _topic, "odd=true").close();
stopBroker(1);
- upgradeBroker();
+ upgradeBroker(storePath);
try
{
- new BDBStoreUpgrade(_toDir, _toDirTwice, null, false, true).upgradeFromVersion(1);
+ new BDBStoreUpgrade(_toDir, _toDirTwice, null, false, true).upgradeFromVersion(version);
fail("Second Upgrade Succeeded");
}
catch (Exception e)
{
System.err.println("Showing stack trace. expecting Unable to load BDBStore error");
e.printStackTrace();
- assertTrue("Incorrect Exception Thrown:" + e.getMessage(),
- e.getMessage().contains("Unable to load BDBStore as version 1. Store on disk contains version 2 data"));
+ String expectedMessage = "Unable to load BDBStore as version " + version
+ + ". Store on disk contains version 4 data";
+ assertTrue("Incorrect Exception Thrown:" + e.getMessage(), e.getMessage().contains(expectedMessage));
}
}
- public void testDurababilitySelectors() throws Exception
+ public void testDurababilitySelectorsForUpgradeFromVersionOne() throws Exception
{
+ assertDurababilitySelectorsForUpgrade(1, _fromDir);
+ }
+
+ public void testDurababilitySelectorsForUpgradeFromVersionTwo() throws Exception
+ {
+ assertDurababilitySelectorsForUpgrade(2, _fromDir2);
+ }
+
+ /**
+ * Tests broker upgrade for the storage containing data for durable subscription.
+ *
+ * @param version store version
+ * @param storePath store location
+ * @throws Exception
+ */
+ protected void assertDurababilitySelectorsForUpgrade(int version, String storePath) throws Exception
+ {
String broker = "vm://:1";
- startBroker(1, VERSION_1);
+ startBroker(1, Integer.toString(version));
new DurableSubscriber(broker, _topic, null).close();
@@ -133,26 +193,16 @@
stopBroker(1);
- upgradeBroker();
+ upgradeBroker(storePath);
broker = "vm://:2";
- startBroker(2, VERSION_2);
+ startBroker(2, VERSION_4);
//Ensure msg were transitioned to new broker
- sendAndCheckDurableSubscriber(broker, false, false, 5, null);
+ sendAndCheckDurableSubscriber(broker, false, true, 5, null);
- //Reset the Selector Pattern
- new DurableSubscriber(broker, _topic, "odd=true").close();
-
stopBroker(2);
-
- startBroker(2, VERSION_2);
-
- ///* This test is currently broken due to QPID-1275
- //Ensure that the selector was preseved on restart and caused all msgs to be removed.
- sendAndCheckDurableSubscriber(broker, false, false, 0, null);
- stopBroker(2);
}
public void testDurabability() throws Exception
@@ -172,11 +222,31 @@
stopBroker(1);
}
- private void upgradeBroker() throws Exception
+ public void testStoreVersionDetection() throws Exception
{
- new BDBStoreUpgrade(_fromDir, _toDir, null, false, true).upgradeFromVersion(1);
+ int[] versions = {1, 2, 4};
+ for (int i = 0; i < versions.length; i++)
+ {
+ assertBDBStoreVersionDetection(versions[i]); // check each listed store version, not only version 1
+ }
}
+
+ protected void assertBDBStoreVersionDetection(int version) throws Exception
+ {
+ startBroker(1, Integer.toString(version));
+ stopBroker(1);
+
+ File storeFolder = new File(System.getProperty("QPID_WORK") + "/version" + version + "Store");
+ int detectedVersion = BDBStoreUpgrade.getStoreVersion(storeFolder);
+ assertEquals("failed to detect store version for store " + version, version, detectedVersion);
+ }
+
+ private void upgradeBroker(String fromDir) throws Exception
+ {
+ BDBStoreUpgrade.upgrade(new File(fromDir), _toDir, null, false, true);
+ }
+
private void stopBroker(int port)
{
TransportConnection.killVMBroker(port);
@@ -199,6 +269,7 @@
System.getProperty("QPID_WORK")+ "/version" + version + "Store");
testVirtualhost.setProperty("store.version", version);
+ CurrentActor.set(new BrokerActor(new NullRootMessageLogger()));
ApplicationRegistry.getInstance(port).getVirtualHostRegistry().
registerVirtualHost(new VirtualHost(new VirtualHostConfiguration("bdbtest",testVirtualhost)));
Added: store/branches/java/0.5.x-dev/src/test/java/org/apache/qpid/server/store/berkeleydb/ExchangeTBTest.java
===================================================================
--- store/branches/java/0.5.x-dev/src/test/java/org/apache/qpid/server/store/berkeleydb/ExchangeTBTest.java (rev 0)
+++ store/branches/java/0.5.x-dev/src/test/java/org/apache/qpid/server/store/berkeleydb/ExchangeTBTest.java 2011-05-04 09:13:25 UTC (rev 4456)
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import junit.framework.TestCase;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.configuration.VirtualHostConfiguration;
+import org.apache.qpid.server.exchange.DirectExchange;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.logging.NullRootMessageLogger;
+import org.apache.qpid.server.logging.actors.BrokerActor;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.store.berkeleydb.tuples.AMQShortStringTupleBindingFactory;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+import com.sleepycat.bind.tuple.TupleInput;
+import com.sleepycat.bind.tuple.TupleOutput;
+
+/**
+ * Tests {@link ExchangeTB}
+ */
+public class ExchangeTBTest extends TestCase
+{
+ // tested tuple binding
+ private ExchangeTB _exchangeTupleBinding;
+
+ // test exchange
+ private Exchange _exchange;
+
+ public void setUp() throws Exception
+ {
+ CurrentActor.set(new BrokerActor(new NullRootMessageLogger()));
+ Configuration config = new PropertiesConfiguration();
+ VirtualHostConfiguration hostConfiguration = new VirtualHostConfiguration("junit", config);
+ VirtualHost virtualHost = new VirtualHost(hostConfiguration);
+ ExchangeTB exchangeTupleBinding = new ExchangeTB(virtualHost,
+ AMQShortStringTupleBindingFactory.createTupleBinding(4));
+ _exchangeTupleBinding = exchangeTupleBinding;
+ _exchange = new DirectExchange();
+ _exchange.initialise(virtualHost, new AMQShortString("test echange"), true, 0, true);
+ }
+
+ /**
+ * Tests {@link Exchange} object serialization/de-serialization with {@link ExchangeTB}.
+ */
+ public void testObjectToEntryConversion()
+ {
+ // write into tuple output
+ TupleOutput tupleOutput = new TupleOutput();
+ _exchangeTupleBinding.objectToEntry(_exchange, tupleOutput);
+ byte[] data = tupleOutput.getBufferBytes();
+
+ // read from tuple input
+ TupleInput tupleInput = new TupleInput(data);
+ Exchange storedExchange = _exchangeTupleBinding.entryToObject(tupleInput);
+
+ assertNotNull(storedExchange);
+
+ assertEquals(_exchange.getName(), storedExchange.getName());
+ assertEquals(_exchange.getType(), storedExchange.getType());
+ assertEquals(_exchange.isAutoDelete(), storedExchange.isAutoDelete());
+ }
+
+}
Added: store/branches/java/0.5.x-dev/src/test/java/org/apache/qpid/server/store/berkeleydb/MessageMetaDataTBTest.java
===================================================================
--- store/branches/java/0.5.x-dev/src/test/java/org/apache/qpid/server/store/berkeleydb/MessageMetaDataTBTest.java (rev 0)
+++ store/branches/java/0.5.x-dev/src/test/java/org/apache/qpid/server/store/berkeleydb/MessageMetaDataTBTest.java 2011-05-04 09:13:25 UTC (rev 4456)
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import junit.framework.TestCase;
+
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.framing.amqp_8_0.BasicConsumeBodyImpl;
+import org.apache.qpid.server.queue.MessageMetaData;
+import org.apache.qpid.server.store.berkeleydb.tuples.AMQShortStringTupleBindingFactory;
+
+import com.sleepycat.bind.tuple.TupleInput;
+import com.sleepycat.bind.tuple.TupleOutput;
+
+/**
+ * Tests for {@link MessageMetaDataTB}
+ */
+public class MessageMetaDataTBTest extends TestCase
+{
+ // tested tuple binding
+ private MessageMetaDataTB _messageMetaDataTupleBinding;
+
+ // tested object
+ private MessageMetaData _messageMetaData;
+
+ public void setUp() throws Exception
+ {
+ _messageMetaDataTupleBinding = new MessageMetaDataTB(AMQShortStringTupleBindingFactory.createTupleBinding(4));
+
+ MessagePublishInfo publishBody = new MessagePublishInfo()
+ {
+ @Override
+ public void setExchange(AMQShortString exchange)
+ {
+ // ignore
+ }
+
+ @Override
+ public boolean isMandatory()
+ {
+ return true;
+ }
+
+ @Override
+ public boolean isImmediate()
+ {
+ return true;
+ }
+
+ @Override
+ public AMQShortString getRoutingKey()
+ {
+ return new AMQShortString("test routine key");
+ }
+
+ @Override
+ public AMQShortString getExchange()
+ {
+ return new AMQShortString("test exchange");
+ }
+ };
+
+ BasicContentHeaderProperties properties = new BasicContentHeaderProperties();
+ properties.setAppId("test app");
+ properties.setClusterId("test claster");
+ properties.setContentType("text");
+ properties.setCorrelationId("test correlation id");
+ properties.setDeliveryMode((byte)1);
+ properties.setEncoding("UTF-8");
+ properties.setExpiration(2);
+ properties.setMessageId("test message id");
+ properties.setPriority((byte)3);
+ properties.setReplyTo("test reply to");
+ properties.setType("test type");
+ properties.setUserId("test user id");
+ ContentHeaderBody contentHeaderBody = new ContentHeaderBody(properties, BasicConsumeBodyImpl.CLASS_ID );
+ _messageMetaData = new MessageMetaData(publishBody, contentHeaderBody, 0);
+ }
+
+ /**
+ * Tests {@link MessageMetaData} object serialization/de-serialization with {@link MessageMetaDataTB}
+ */
+ public void testObjectToEntryConversion()
+ {
+ // write into tuple output
+ TupleOutput tupleOutput = new TupleOutput();
+ _messageMetaDataTupleBinding.objectToEntry(_messageMetaData, tupleOutput);
+ byte[] data = tupleOutput.getBufferBytes();
+
+ // read from tuple input
+ TupleInput tupleInput = new TupleInput(data);
+ MessageMetaData storedMessageMetaData = _messageMetaDataTupleBinding.entryToObject(tupleInput);
+
+ assertNotNull(storedMessageMetaData);
+
+ MessagePublishInfo expectedInfo = _messageMetaData.getMessagePublishInfo();
+ MessagePublishInfo storedInfo = storedMessageMetaData.getMessagePublishInfo();
+
+ assertEquals(expectedInfo.getExchange(), storedInfo.getExchange());
+ assertEquals(expectedInfo.getRoutingKey(), storedInfo.getRoutingKey());
+ assertEquals(expectedInfo.isImmediate(), storedInfo.isImmediate());
+ assertEquals(expectedInfo.isMandatory(), storedInfo.isMandatory());
+
+ ContentHeaderBody expectedBody = _messageMetaData.getContentHeaderBody();
+ ContentHeaderBody storedBody = storedMessageMetaData.getContentHeaderBody();
+
+ assertEquals(expectedBody.classId, storedBody.classId);
+ assertEquals(expectedBody.weight, storedBody.weight);
+ assertEquals(expectedBody.bodySize, storedBody.bodySize);
+
+ BasicContentHeaderProperties expectedBodyProperties = (BasicContentHeaderProperties)expectedBody.getProperties();
+ BasicContentHeaderProperties storedBodyProperties = (BasicContentHeaderProperties)storedBody.getProperties();
+
+ assertEquals(expectedBodyProperties.getPropertyFlags(), storedBodyProperties.getPropertyFlags());
+ assertEquals(expectedBodyProperties.getAppIdAsString(), storedBodyProperties.getAppIdAsString());
+ assertEquals(expectedBodyProperties.getClusterIdAsString(), storedBodyProperties.getClusterIdAsString());
+ assertEquals(expectedBodyProperties.getContentTypeAsString(), storedBodyProperties.getContentTypeAsString());
+ assertEquals(expectedBodyProperties.getCorrelationIdAsString(), storedBodyProperties.getCorrelationIdAsString());
+ assertEquals(expectedBodyProperties.getEncodingAsString(), storedBodyProperties.getEncodingAsString());
+ assertEquals(expectedBodyProperties.getDeliveryMode(), storedBodyProperties.getDeliveryMode());
+ assertEquals(expectedBodyProperties.getExpiration(), storedBodyProperties.getExpiration());
+ assertEquals(expectedBodyProperties.getMessageIdAsString(), storedBodyProperties.getMessageIdAsString());
+ assertEquals(expectedBodyProperties.getPriority(), storedBodyProperties.getPriority());
+ assertEquals(expectedBodyProperties.getReplyToAsString(), storedBodyProperties.getReplyToAsString());
+ assertEquals(expectedBodyProperties.getTypeAsString(), storedBodyProperties.getTypeAsString());
+ assertEquals(expectedBodyProperties.getUserIdAsString(), storedBodyProperties.getUserIdAsString());
+ assertEquals(expectedBodyProperties.getTimestamp(), storedBodyProperties.getTimestamp());
+ }
+
+}
Added: store/branches/java/0.5.x-dev/src/test/java/org/apache/qpid/server/store/berkeleydb/tuples/AMQQueueTupleBindingTest.java
===================================================================
--- store/branches/java/0.5.x-dev/src/test/java/org/apache/qpid/server/store/berkeleydb/tuples/AMQQueueTupleBindingTest.java (rev 0)
+++ store/branches/java/0.5.x-dev/src/test/java/org/apache/qpid/server/store/berkeleydb/tuples/AMQQueueTupleBindingTest.java 2011-05-04 09:13:25 UTC (rev 4456)
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.store.berkeleydb.tuples;
+
+import junit.framework.TestCase;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.configuration.VirtualHostConfiguration;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.logging.NullRootMessageLogger;
+import org.apache.qpid.server.logging.actors.BrokerActor;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.AMQQueueFactory;
+import org.apache.qpid.server.store.berkeleydb.ExchangeTB;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+import com.sleepycat.bind.tuple.TupleBinding;
+import com.sleepycat.bind.tuple.TupleInput;
+import com.sleepycat.bind.tuple.TupleOutput;
+
+/**
+ * Tests for AMQQueue tuple binding
+ */
+public class AMQQueueTupleBindingTest extends TestCase
+{
+ // test queue tuple binding factory
+ private QueueTupleBindingFactory _factory;
+
+ // test queue
+ private AMQQueue _testQueue;
+
+ public void setUp() throws Exception
+ {
+ CurrentActor.set(new BrokerActor(new NullRootMessageLogger()));
+ Configuration config = new PropertiesConfiguration();
+ VirtualHostConfiguration hostConfiguration = new VirtualHostConfiguration("junit", config);
+ VirtualHost virtualHost = new VirtualHost(hostConfiguration);
+ _factory = new QueueTupleBindingFactory(4, virtualHost,
+ AMQShortStringTupleBindingFactory.createTupleBinding(4));
+ _testQueue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("test"), true,
+ new AMQShortString("tmp"), true, virtualHost, null);
+ }
+
+ /**
+ * Tests {@link AMQQueue} object serialization/de-serialization with the tuple binding from {@link QueueTupleBindingFactory}.
+ */
+ public void testObjectToEntryConversion()
+ {
+ // write into tuple output
+ TupleOutput tupleOutput = new TupleOutput();
+ TupleBinding<AMQQueue> queueTupleBinding = _factory.getInstance();
+
+ queueTupleBinding.objectToEntry(_testQueue, tupleOutput);
+ byte[] data = tupleOutput.getBufferBytes();
+
+ // read from tuple input
+ TupleInput tupleInput = new TupleInput(data);
+ AMQQueue storedQueue = queueTupleBinding.entryToObject(tupleInput);
+
+ assertNotNull(storedQueue);
+
+ assertEquals(_testQueue.getName(), storedQueue.getName());
+ assertEquals(_testQueue.getOwner(), storedQueue.getOwner());
+ }
+}
Added: store/branches/java/0.5.x-dev/src/test/java/org/apache/qpid/server/store/berkeleydb/tuples/AMQShortStringTupleBindingTest.java
===================================================================
--- store/branches/java/0.5.x-dev/src/test/java/org/apache/qpid/server/store/berkeleydb/tuples/AMQShortStringTupleBindingTest.java (rev 0)
+++ store/branches/java/0.5.x-dev/src/test/java/org/apache/qpid/server/store/berkeleydb/tuples/AMQShortStringTupleBindingTest.java 2011-05-04 09:13:25 UTC (rev 4456)
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.store.berkeleydb.tuples;
+
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.store.berkeleydb.tuples.AMQShortStringTB_4;
+
+import com.sleepycat.bind.tuple.TupleInput;
+import com.sleepycat.bind.tuple.TupleOutput;
+
+import junit.framework.TestCase;
+
+/**
+ * Tests encoding/decoding of AMQP short strings
+ */
+public class AMQShortStringTupleBindingTest extends TestCase
+{
+ /**
+ * Tests write and read operations for AMQP short string with length greater
+ * than 127.
+ */
+ public void testObjectToEntryForShortStringWithLengthGreater127()
+ {
+ AMQShortString testString = new AMQShortString(generateTestString(128));
+ assertObjectToEntryConversion(testString);
+ }
+
+ /**
+ * Tests write and read operations for null AMQP short string.
+ */
+ public void testObjectToEntryForNullShortString()
+ {
+ AMQShortString testString = null;
+ assertObjectToEntryConversion(testString);
+ }
+
+ /**
+ * Tests write and read operations for AMQP short string with length 255
+ * characters.
+ */
+ public void testObjectToEntryForShortStringWithLength255()
+ {
+ AMQShortString testString = new AMQShortString(generateTestString(255));
+ assertObjectToEntryConversion(testString);
+ }
+
+ /**
+ * Tests write and read operations for empty AMQP short string.
+ */
+ public void testObjectToEntryForEmptyShortString()
+ {
+ AMQShortString testString = new AMQShortString("");
+ assertObjectToEntryConversion(testString);
+ }
+
+ /**
+ * Tests
+ * {@link AMQShortStringEncoding#writeShortString(AMQShortString, TupleOutput)}
+ * and {@link AMQShortStringEncoding#readShortString(TupleInput)} to write
+ * and read AMQP short strings.
+ */
+ protected void assertObjectToEntryConversion(AMQShortString testString)
+ {
+ AMQShortStringTB_4 tupleBinding = new AMQShortStringTB_4();
+ // write string into tuple output
+ TupleOutput tupleOutput = new TupleOutput();
+ tupleBinding.objectToEntry(testString, tupleOutput);
+ byte[] data = tupleOutput.getBufferBytes();
+
+ // read string from tuple input
+ TupleInput tupleInput = new TupleInput(data);
+ AMQShortString restoredString = tupleBinding.entryToObject(tupleInput);
+
+ // assert read string against original string
+ assertEquals(testString, restoredString);
+ }
+
+ /**
+ * A helper method to generate a test string of given size
+ *
+ * @param length
+ * string length
+ * @return string
+ */
+ private String generateTestString(int length)
+ {
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < length; i++)
+ {
+ sb.append('a');
+ }
+ return sb.toString();
+ }
+}
Added: store/branches/java/0.5.x-dev/src/test/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingKeyTupleBindingTest.java
===================================================================
--- store/branches/java/0.5.x-dev/src/test/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingKeyTupleBindingTest.java (rev 0)
+++ store/branches/java/0.5.x-dev/src/test/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingKeyTupleBindingTest.java 2011-05-04 09:13:25 UTC (rev 4456)
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.store.berkeleydb.tuples;
+
+import junit.framework.TestCase;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.configuration.VirtualHostConfiguration;
+import org.apache.qpid.server.logging.NullRootMessageLogger;
+import org.apache.qpid.server.logging.actors.BrokerActor;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.store.berkeleydb.BindingKey;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+import com.sleepycat.bind.tuple.TupleBinding;
+import com.sleepycat.bind.tuple.TupleInput;
+import com.sleepycat.bind.tuple.TupleOutput;
+
+/**
+ * Tests BindingKey tuple serialization/de-serialization.
+ *
+ */
+public class BindingKeyTupleBindingTest extends TestCase
+{
+ // tested binding tuple binding factory
+ private BindingTupleBindingFactory _factory;
+
+ public void setUp() throws Exception
+ {
+ CurrentActor.set(new BrokerActor(new NullRootMessageLogger()));
+ Configuration config = new PropertiesConfiguration();
+ VirtualHostConfiguration hostConfiguration = new VirtualHostConfiguration("junit", config);
+ VirtualHost virtualHost = new VirtualHost(hostConfiguration);
+ _factory = new BindingTupleBindingFactory(4, virtualHost,
+ AMQShortStringTupleBindingFactory.createTupleBinding(4));
+ }
+
+ /**
+ * Tests {@link BindingKey} object serialization/de-serialization.
+ */
+ public void testObjectToEntryConversion()
+ {
+ BindingKey key = new BindingKey(new AMQShortString("test1"), new AMQShortString("test2"),
+ new AMQShortString("test3"), null);
+
+ // write into tuple output
+ TupleOutput tupleOutput = new TupleOutput();
+ TupleBinding<BindingKey> keyTupleBinding = _factory.getInstance();
+
+ keyTupleBinding.objectToEntry(key, tupleOutput);
+ byte[] data = tupleOutput.getBufferBytes();
+
+ // read from tuple input
+ TupleInput tupleInput = new TupleInput(data);
+ BindingKey storedKey = keyTupleBinding.entryToObject(tupleInput);
+
+ assertNotNull(storedKey);
+
+ assertEquals(key.getExchangeName(), storedKey.getExchangeName());
+ assertEquals(key.getQueueName(), storedKey.getQueueName());
+ assertEquals(key.getRoutingKey(), storedKey.getRoutingKey());
+ }
+}
Added: store/branches/java/0.5.x-dev/src/test/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueEntryKeyTupleBindingTest.java
===================================================================
--- store/branches/java/0.5.x-dev/src/test/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueEntryKeyTupleBindingTest.java (rev 0)
+++ store/branches/java/0.5.x-dev/src/test/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueEntryKeyTupleBindingTest.java 2011-05-04 09:13:25 UTC (rev 4456)
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.store.berkeleydb.tuples;
+
+import junit.framework.TestCase;
+
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.store.berkeleydb.QueueEntryKey;
+import org.apache.qpid.server.store.berkeleydb.tuples.AMQShortStringTupleBindingFactory;
+import org.apache.qpid.server.store.berkeleydb.tuples.QueueEntryKeyTupleBinding;
+
+import com.sleepycat.bind.tuple.TupleInput;
+import com.sleepycat.bind.tuple.TupleOutput;
+
+/**
+ * Tests {@link QueueEntryKeyTupleBinding}
+ */
+public class QueueEntryKeyTupleBindingTest extends TestCase
+{
+ /**
+ * Tests {@link QueueEntryKey} object serialization/de-serialization with
+ * {@link QueueEntryKeyTupleBinding}
+ */
+ public void testObjectToEntryConversion()
+ {
+ QueueEntryKey queueEntryKey = new QueueEntryKey(new AMQShortString("test queue"), 2);
+ QueueEntryKeyTupleBinding queueEntryKeyTupleBinding = new QueueEntryKeyTupleBinding(
+ AMQShortStringTupleBindingFactory.createTupleBinding(4));
+
+ // write into tuple output
+ TupleOutput tupleOutput = new TupleOutput();
+ queueEntryKeyTupleBinding.objectToEntry(queueEntryKey, tupleOutput);
+ byte[] data = tupleOutput.getBufferBytes();
+
+ // read from tuple input
+ TupleInput tupleInput = new TupleInput(data);
+ QueueEntryKey storedQueueEntryKey = queueEntryKeyTupleBinding.entryToObject(tupleInput);
+
+ assertNotNull(storedQueueEntryKey);
+
+ assertEquals(queueEntryKey.messageId, storedQueueEntryKey.messageId);
+ assertEquals(queueEntryKey.queueName, storedQueueEntryKey.queueName);
+ }
+
+}
Modified: store/branches/java/0.5.x-dev/src/test/java/org/apache/qpid/server/store/berkeleydb/utils/JNDIHelper.java
===================================================================
--- store/branches/java/0.5.x-dev/src/test/java/org/apache/qpid/server/store/berkeleydb/utils/JNDIHelper.java 2011-05-03 18:49:45 UTC (rev 4455)
+++ store/branches/java/0.5.x-dev/src/test/java/org/apache/qpid/server/store/berkeleydb/utils/JNDIHelper.java 2011-05-04 09:13:25 UTC (rev 4456)
@@ -37,7 +37,7 @@
public JNDIHelper(String broker)
{
- CONNECTION_NAME = "amqp://guest:guest@clientid/test?brokerlist='" + broker + "'";
+ CONNECTION_NAME = "amqp://guest:guest@clientid/bdbtest?brokerlist='" + broker + "'";
setupJNDI();
}
Modified: store/branches/java/0.5.x-dev/src/tools/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgrade.java
===================================================================
--- store/branches/java/0.5.x-dev/src/tools/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgrade.java 2011-05-03 18:49:45 UTC (rev 4455)
+++ store/branches/java/0.5.x-dev/src/tools/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgrade.java 2011-05-04 09:13:25 UTC (rev 4456)
@@ -22,11 +22,15 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.store.MemoryMessageStore;
+import org.apache.qpid.server.store.berkeleydb.tuples.QueueEntryKeyTupleBinding;
import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.MessageMetaData;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.configuration.VirtualHostConfiguration;
+import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.actors.BrokerActor;
import org.apache.qpid.server.logging.NullRootMessageLogger;
@@ -51,6 +55,9 @@
import com.sleepycat.je.DatabaseEntry;
import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.Database;
+import com.sleepycat.je.Environment;
+import com.sleepycat.je.EnvironmentConfig;
+import com.sleepycat.bind.EntryBinding;
import com.sleepycat.bind.tuple.TupleBinding;
import com.sleepycat.bind.tuple.ByteBinding;
@@ -95,7 +102,8 @@
private boolean _interactive;
private boolean _force;
- private static final String VERSION = "1.0";
+ private static final String VERSION = "2.0";
+ private static final String USER_ABORTED_PROCESS = "User aborted process";
private static final String OPTION_INPUT_SHORT = "i";
private static final String OPTION_INPUT = "input";
private static final String OPTION_OUTPUT_SHORT = "o";
@@ -244,7 +252,7 @@
{
if (input.equalsIgnoreCase(ANSWER_A) || input.equalsIgnoreCase(ANSWER_ABORT))
{
- throw new RuntimeException("User aborted process");
+ throw new RuntimeException(USER_ABORTED_PROCESS);
}
}
}
@@ -284,6 +292,11 @@
File backupDir, boolean force,
boolean inplace) throws Exception
{
+ if (version == BDBMessageStore.DATABASE_FORMAT_VERSION)
+ {
+ throw new IllegalArgumentException("Store on disk already upgraded to latest version "
+ + BDBMessageStore.DATABASE_FORMAT_VERSION);
+ }
_logger.info("Located store to upgrade at '" + fromDir + "'");
// Verify user has created a backup, giving option to perform backup
@@ -311,7 +324,7 @@
if (!userInteract("Are you sure wish to proceed with DB migration without backup? " +
"(For more details of the consequences check the Qpid/BDB Message Store Wiki)."))
{
- throw new IllegalArgumentException("Upgrade stopped as user request as no DB Backup performed.");
+ throw new IllegalArgumentException("Upgrade stopped at user request as no DB Backup performed.");
}
}
}
@@ -350,17 +363,22 @@
try
{
- //Load the old MessageStore
+ int upgradeVersion = 2;
switch (version)
{
default:
+ case 2:
+ upgradeVersion = 2;
+ break;
case 1:
- _oldMessageStore = new BDBMessageStore(1);
- _oldMessageStore.configure(_oldVirtualHost, fromDir, true);
- _oldMessageStore.start();
- upgradeFromVersion_1();
+ upgradeVersion = 1;
break;
}
+ //Load the old MessageStore
+ _oldMessageStore = new BDBMessageStore(upgradeVersion);
+ _oldMessageStore.configure(_oldVirtualHost, fromDir, true);
+ _oldMessageStore.start();
+ upgrade(upgradeVersion);
}
finally
{
@@ -394,15 +412,34 @@
}
}
- private void upgradeFromVersion_1() throws AMQException, DatabaseException
+ private void upgrade(int version) throws AMQException, DatabaseException
{
- _logger.info("Starting store upgrade from version 1");
+ _logger.info("Starting store upgrade from version " + version);
_logger.info("Message Metadata");
//Migrate _messageMetaDataDb;
- moveContents(_oldMessageStore.getMetaDataDb(), _newMessageStore.getMetaDataDb(), "Message MetaData");
+ final TupleBinding<AMQShortString> newShortStringTupleBinding = _newMessageStore.getShortStringTupleBindingFactory().getInstance();
+ final MessageMetaDataTB newMessageMetaDataTupleBinding = new MessageMetaDataTB(newShortStringTupleBinding);
+ final TupleBinding<AMQShortString> oldShortStringTupleBinding = _oldMessageStore.getShortStringTupleBindingFactory().getInstance();
+ final MessageMetaDataTB oldMessageMetaDataTupleBinding = new MessageMetaDataTB(oldShortStringTupleBinding);
+ DatabaseVisitor messageMetaDataDBVisitor = new DatabaseVisitor()
+ {
+ @Override
+ public void visit(DatabaseEntry entry, DatabaseEntry value) throws AMQException, DatabaseException
+ {
+ MessageMetaData metaData = oldMessageMetaDataTupleBinding.entryToObject(value);
+ DatabaseEntry data = new DatabaseEntry();
+ newMessageMetaDataTupleBinding.objectToEntry(metaData, data);
+ _newMessageStore.getMetaDataDb().put(null, entry, data);
+ _count++;
+ }
+ };
+ _oldMessageStore.visitMetaDataDb(messageMetaDataDBVisitor);
+
+ logCount(messageMetaDataDBVisitor.getVisitedCount(), "Message MetaData");
+
_logger.info("Message Contents");
//Migrate _messageContentDb;
moveContents(_oldMessageStore.getContentDb(), _newMessageStore.getContentDb(), "Message Content");
@@ -410,6 +447,7 @@
_logger.info("Queues");
//Migrate _queueDb;
//Get the oldMessageStore Tuple Binding which does the parsing
+ @SuppressWarnings({ "rawtypes" })
final TupleBinding queueTupleBinding = _oldMessageStore.getQueueTupleBindingFactory().getInstance();
//Create a visitor that will take the queues in the oldMessageStore and add them to the newMessageStore
@@ -440,16 +478,54 @@
_logger.info("Delivery Records");
//Migrate _deliveryDb;
- moveContents(_oldMessageStore.getDeliveryDb(), _newMessageStore.getDeliveryDb(), "Delivery Record");
+ final EntryBinding<QueueEntryKey> oldKeyTupleBinding = new QueueEntryKeyTupleBinding(oldShortStringTupleBinding); // decodes keys in the old store's format
+ final EntryBinding<QueueEntryKey> newKeyTupleBinding = new QueueEntryKeyTupleBinding(newShortStringTupleBinding); // encodes keys in the new store's format
+ DatabaseVisitor deliveryDBVisitor = new DatabaseVisitor()
+ {
+ @Override
+ public void visit(DatabaseEntry entry, DatabaseEntry value) throws AMQException, DatabaseException
+ {
+ QueueEntryKey keyObject = oldKeyTupleBinding.entryToObject(entry);
+ DatabaseEntry key = new DatabaseEntry();
+ newKeyTupleBinding.objectToEntry(keyObject, key); // re-encode the key; the value is copied unchanged
+ _newMessageStore.getDeliveryDb().put(null, key, value);
+ _count++;
+ }
+ };
+ _oldMessageStore.visitDelivery(deliveryDBVisitor);
+
+ logCount(deliveryDBVisitor.getVisitedCount(), "Delivery Record"); // report this visitor's count, not the queue visitor's
+
_logger.info("Exchanges");
//Migrate _exchangeDb;
- moveContents(_oldMessageStore.getExchangesDb(), _newMessageStore.getExchangesDb(), "Exchange");
+ // old exchange tuple binding
+ final ExchangeTB oldExchangeTupleBinding = new ExchangeTB(_oldVirtualHost, oldShortStringTupleBinding);
+ final ExchangeTB newExchangeTupleBinding = new ExchangeTB(_oldVirtualHost, newShortStringTupleBinding);
+ DatabaseVisitor echangeDBVisitor = new DatabaseVisitor()
+ {
+
+ @Override
+ public void visit(DatabaseEntry entry, DatabaseEntry value) throws AMQException, DatabaseException
+ {
+ Exchange exchange = oldExchangeTupleBinding.entryToObject(value); // decode the stored exchange using the old-format binding
+ DatabaseEntry key = new DatabaseEntry(); // receives the new-format key
+ DatabaseEntry data = new DatabaseEntry(); // receives the new-format value
+ newShortStringTupleBinding.objectToEntry(exchange.getName(), key); // key is rebuilt from the exchange name; the incoming 'entry' is not reused
+ newExchangeTupleBinding.objectToEntry(exchange, data); // re-encode the exchange using the new-format binding
+ _newMessageStore.getExchangesDb().put(null, key, data); // write the converted record into the new store's exchange database
+ _count++; // one more record handled; presumably what getVisitedCount() reports - confirm against DatabaseVisitor
+ }
+ };
+ _oldMessageStore.visitExchanges(echangeDBVisitor);
+
+ logCount(queueVisitor.getVisitedCount(), "Exchange");
+
+
_logger.info("QueueBindings");
//Migrate _queueBindingsDb;
- final TupleBinding bindingTupleBinding = _oldMessageStore.getBindingTupleBindingFactory().getInstance();
-
+ final TupleBinding<BindingKey> bindingTupleBinding = _oldMessageStore.getBindingTupleBindingFactory().getInstance();
//Create a visitor that to read the old format queue bindings
DatabaseVisitor queueBindings = new DatabaseVisitor()
{
@@ -458,7 +534,7 @@
BindingKey queueBinding = (BindingKey) bindingTupleBinding.entryToObject(key);
//Create a new Format TupleBinding
- TupleBinding newBindingTupleBinding = _newMessageStore.getBindingTupleBindingFactory().getInstance();
+ TupleBinding<BindingKey> newBindingTupleBinding = _newMessageStore.getBindingTupleBindingFactory().getInstance();
DatabaseEntry newKey = new DatabaseEntry();
newBindingTupleBinding.objectToEntry(queueBinding, newKey);
@@ -712,7 +788,7 @@
}
catch (RuntimeException re)
{
- if (!re.getMessage().equals("User aborted process"))
+ if (!(USER_ABORTED_PROCESS).equals(re.getMessage()))
{
re.printStackTrace();
_logger.error("Upgrade Failed: " + re.getMessage());
@@ -725,6 +801,7 @@
}
+ @SuppressWarnings("static-access")
private static void setOptions(Options options)
{
Option input =
@@ -759,22 +836,22 @@
{
_logger.info("Running BDB Message Store upgrade tool: v" + VERSION);
+ int version = getStoreVersion(fromDir);
+ if (version == 0)
+ {
+ _logger.info("Existing store version is undefined!");
+ return;
+ }
+ _logger.info("Existing store version is " + version);
try
{
- new BDBStoreUpgrade(fromDir.toString(), toDir, backupDir, interactive, force).upgradeFromVersion(1);
+ new BDBStoreUpgrade(fromDir.toString(), toDir, backupDir, interactive, force).upgradeFromVersion(version);
_logger.info("Upgrade complete.");
}
catch (IllegalArgumentException iae)
{
- if (iae.getMessage().endsWith("Error: Unable to load BDBStore as version 1. Store on disk contains version 2 data."))
- {
- System.out.println("Store '" + fromDir + "' has already been upgraded to version 2.");
- }
- else
- {
- _logger.error("Upgrade not started due to: " + iae.getMessage());
- }
+ _logger.error("Upgrade not started due to: " + iae.getMessage());
}
catch (DatabaseException de)
{
@@ -783,7 +860,7 @@
}
catch (RuntimeException re)
{
- if (!re.getMessage().equals("User aborted process"))
+ if (!(USER_ABORTED_PROCESS).equals(re.getMessage()))
{
re.printStackTrace();
_logger.error("Upgrade Failed: " + re.getMessage());
@@ -800,6 +877,64 @@
}
}
+ /**
+ * Detects the existing store version by inspecting the database names in the store
+ * environment: {@code exchangeDb_vN} yields N, any other name starting with {@code exchangeDb} yields 1.
+ *
+ * @param fromDir
+ * store folder, opened as a read-only, non-transactional environment that is never created
+ * @return the detected version, or 0 if no exchange database is found or reading the environment fails
+ */
+ public static int getStoreVersion(File fromDir)
+ {
+ int version = 0; // 0 is the "undefined" result returned when nothing is detected
+ EnvironmentConfig envConfig = new EnvironmentConfig();
+ envConfig.setAllowCreate(false); // never create an environment as a side effect of probing
+ envConfig.setTransactional(false);
+ envConfig.setReadOnly(true); // probing must not modify the existing store
+ Environment environment = null;
+ try
+ {
+
+ environment = new Environment(fromDir, envConfig);
+ List<String> databases = environment.getDatabaseNames();
+ for (String name : databases)
+ {
+ if (name.startsWith("exchangeDb"))
+ {
+ if (name.startsWith("exchangeDb_v"))
+ {
+ version = Integer.parseInt(name.substring(12)); // 12 is the length of "exchangeDb_v"; the remainder is parsed as the version number
+ }
+ else
+ {
+ version = 1; // exchange database name without a "_v" suffix is treated as version 1
+ }
+ break; // the first exchange database found decides the version
+ }
+ }
+ }
+ catch (Exception e)
+ {
+ // Any failure (opening the environment, listing names, parsing the suffix) is logged and leaves version at its current value.
+ _logger.error("Failure to open existing database: " + e.getMessage());
+ }
+ finally
+ {
+ if (environment != null)
+ {
+ try
+ {
+ environment.close();
+ }
+ catch (Exception e)
+ {
+ // Deliberately ignored: a close failure is not expected here and does not change the version already determined.
+ }
+ }
+ }
+ return version;
+ }
+
private static void fatalError(String message)
{
System.out.println(message);
13 years, 7 months
rhmessaging commits: r4455 - in store/trunk/cpp: tools and 1 other directory.
by rhmessaging-commits@lists.jboss.org
Author: tedross
Date: 2011-05-03 14:49:45 -0400 (Tue, 03 May 2011)
New Revision: 4455
Added:
store/trunk/cpp/tools/__init__.py
Modified:
store/trunk/cpp/configure.ac
store/trunk/cpp/tools/Makefile.am
store/trunk/cpp/tools/resize
store/trunk/cpp/tools/store_chk
Log:
Fix for BZ-689907 - Fix rpmdiff issues
This change moves the installed Python artifacts into an arch-specific directory to
prevent multi-lib issues.
Modified: store/trunk/cpp/configure.ac
===================================================================
--- store/trunk/cpp/configure.ac 2011-05-02 15:39:19 UTC (rev 4454)
+++ store/trunk/cpp/configure.ac 2011-05-03 18:49:45 UTC (rev 4455)
@@ -215,6 +215,9 @@
AM_CONDITIONAL([DOXYGEN], [test x$do_doxygen = xyes])
AM_CONDITIONAL([DO_CLUSTER_TESTS], [test $enable_CLUSTER_TESTS = yes])
+# Check for Python libraries for the Journal Tools
+AM_PATH_PYTHON()
+
AC_CONFIG_FILES([
Makefile
docs/Makefile
Modified: store/trunk/cpp/tools/Makefile.am
===================================================================
--- store/trunk/cpp/tools/Makefile.am 2011-05-02 15:39:19 UTC (rev 4454)
+++ store/trunk/cpp/tools/Makefile.am 2011-05-03 18:49:45 UTC (rev 4455)
@@ -20,12 +20,7 @@
# The GNU Lesser General Public License is available in the file COPYING.
qpidexecdir = $(libexecdir)/qpid
-qpidexec_SCRIPTS = jerr.py jrnl.py janal.py resize store_chk
+qpidexec_SCRIPTS = resize store_chk
-EXTRA_DIST = \
- jerr.py \
- jrnl.py \
- janal.py \
- resize \
- store_chk
-
\ No newline at end of file
+pkgpyexec_qpiddir = $(pyexecdir)/qpidstore
+pkgpyexec_qpid_PYTHON = __init__.py jerr.py jrnl.py janal.py
Added: store/trunk/cpp/tools/__init__.py
===================================================================
--- store/trunk/cpp/tools/__init__.py (rev 0)
+++ store/trunk/cpp/tools/__init__.py 2011-05-03 18:49:45 UTC (rev 4455)
@@ -0,0 +1,23 @@
+"""
+Copyright (c) 2007, 2008 Red Hat, Inc.
+
+This file is part of the Qpid async store library msgstore.so.
+
+This library is free software; you can redistribute it and/or
+modify it under the terms of the GNU Lesser General Public
+License as published by the Free Software Foundation; either
+version 2.1 of the License, or (at your option) any later version.
+
+This library is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+Lesser General Public License for more details.
+
+You should have received a copy of the GNU Lesser General Public
+License along with this library; if not, write to the Free Software
+Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
+USA
+
+The GNU Lesser General Public License is available in the file COPYING.
+"""
+
Modified: store/trunk/cpp/tools/resize
===================================================================
--- store/trunk/cpp/tools/resize 2011-05-02 15:39:19 UTC (rev 4454)
+++ store/trunk/cpp/tools/resize 2011-05-03 18:49:45 UTC (rev 4455)
@@ -22,7 +22,7 @@
The GNU Lesser General Public License is available in the file COPYING.
"""
-import jerr, jrnl, janal
+from qpidstore import jerr, jrnl, janal
import glob, optparse, os, sys, time
Modified: store/trunk/cpp/tools/store_chk
===================================================================
--- store/trunk/cpp/tools/store_chk 2011-05-02 15:39:19 UTC (rev 4454)
+++ store/trunk/cpp/tools/store_chk 2011-05-03 18:49:45 UTC (rev 4455)
@@ -22,7 +22,7 @@
The GNU Lesser General Public License is available in the file COPYING.
"""
-import jerr, jrnl, janal
+from qpidstore import jerr, jrnl, janal
import optparse, os, sys
13 years, 7 months
rhmessaging commits: r4454 - store/tags.
by rhmessaging-commits@lists.jboss.org
Author: kpvdr
Date: 2011-05-02 11:39:19 -0400 (Mon, 02 May 2011)
New Revision: 4454
Added:
store/tags/qpid-0.10-release/
Log:
Tagged Qpid 0.10 branch point
13 years, 7 months