[rhmessaging-commits] rhmessaging commits: r4438 - in store/trunk/cpp: lib/jrnl2 and 1 other directories.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Fri Jan 14 14:52:46 EST 2011


Author: kpvdr
Date: 2011-01-14 14:52:45 -0500 (Fri, 14 Jan 2011)
New Revision: 4438

Added:
   store/trunk/cpp/lib/jrnl2/Configuration.hpp
   store/trunk/cpp/lib/jrnl2/RecordHeader.hpp
Modified:
   store/trunk/cpp/lib/BindingDbt.cpp
   store/trunk/cpp/lib/BufferValue.cpp
   store/trunk/cpp/lib/MessageStoreImpl.cpp
   store/trunk/cpp/lib/jrnl2/AioCallback.hpp
   store/trunk/cpp/lib/jrnl2/DataToken.hpp
   store/trunk/cpp/lib/jrnl2/Journal.cpp
   store/trunk/cpp/lib/jrnl2/Journal.hpp
   store/trunk/cpp/lib/jrnl2/JournalErrors.cpp
   store/trunk/cpp/lib/jrnl2/JournalErrors.hpp
   store/trunk/cpp/lib/jrnl2/JournalException.cpp
   store/trunk/cpp/lib/jrnl2/JournalException.hpp
   store/trunk/cpp/lib/jrnl2/JournalParameters.cpp
   store/trunk/cpp/lib/jrnl2/JournalParameters.hpp
   store/trunk/cpp/lib/jrnl2/Makefile.am
   store/trunk/cpp/lib/jrnl2/README
   store/trunk/cpp/perf/JournalParameters.cpp
   store/trunk/cpp/perf/JournalParameters.hpp
   store/trunk/cpp/perf/ScopedTimer.hpp
   store/trunk/cpp/perf/TestParameters.hpp
   store/trunk/cpp/perf/m
Log:
Further work in jrnl2 and perf.

Modified: store/trunk/cpp/lib/BindingDbt.cpp
===================================================================
--- store/trunk/cpp/lib/BindingDbt.cpp	2010-12-17 11:06:36 UTC (rev 4437)
+++ store/trunk/cpp/lib/BindingDbt.cpp	2011-01-14 19:52:45 UTC (rev 4438)
@@ -23,14 +23,10 @@
 
 #include "BindingDbt.h"
 
-using namespace mrg::msgstore;
-using qpid::broker::PersistableExchange;
-using qpid::broker::PersistableQueue;
-using qpid::framing::FieldTable;
-using std::string;
+namespace mrg {
+namespace msgstore {
 
-
-BindingDbt::BindingDbt(const PersistableExchange& e, const PersistableQueue& q, const string& k, const FieldTable& a)
+BindingDbt::BindingDbt(const qpid::broker::PersistableExchange& e, const qpid::broker::PersistableQueue& q, const std::string& k, const qpid::framing::FieldTable& a)
   : data(new char[encodedSize(e, q, k, a)]),
     buffer(data, encodedSize(e, q, k, a))
 {
@@ -48,7 +44,9 @@
   delete [] data;
 }
 
-uint32_t BindingDbt::encodedSize(const PersistableExchange& /*not used*/, const PersistableQueue& q, const string& k, const FieldTable& a)
+uint32_t BindingDbt::encodedSize(const qpid::broker::PersistableExchange& /*not used*/, const qpid::broker::PersistableQueue& q, const std::string& k, const qpid::framing::FieldTable& a)
 {
     return 8 /*queue id*/ + q.getName().size() + 1 + k.size() + 1 + a.encodedSize();
 }
+
+}}

Modified: store/trunk/cpp/lib/BufferValue.cpp
===================================================================
--- store/trunk/cpp/lib/BufferValue.cpp	2010-12-17 11:06:36 UTC (rev 4437)
+++ store/trunk/cpp/lib/BufferValue.cpp	2011-01-14 19:52:45 UTC (rev 4438)
@@ -23,9 +23,11 @@
 
 #include "BufferValue.h"
 
-using namespace mrg::msgstore;
-using qpid::broker::Persistable;
+namespace mrg {
+namespace msgstore {
 
+
+
 BufferValue::BufferValue(u_int32_t size, u_int64_t offset)
     : data(new char[size]),
       buffer(data, size)
@@ -38,7 +40,7 @@
     set_ulen(size);
 }
 
-BufferValue::BufferValue(const Persistable& p)
+BufferValue::BufferValue(const qpid::broker::Persistable& p)
   : data(new char[p.encodedSize()]),
     buffer(data, p.encodedSize())
 {
@@ -52,3 +54,5 @@
 {
     delete [] data;
 }
+
+}}

Modified: store/trunk/cpp/lib/MessageStoreImpl.cpp
===================================================================
--- store/trunk/cpp/lib/MessageStoreImpl.cpp	2010-12-17 11:06:36 UTC (rev 4437)
+++ store/trunk/cpp/lib/MessageStoreImpl.cpp	2011-01-14 19:52:45 UTC (rev 4438)
@@ -35,18 +35,12 @@
 #define MAX_AIO_SLEEPS 100000 // tot: ~1 sec
 #define AIO_SLEEP_TIME_US  10 // 0.01 ms
 
-using namespace mrg::msgstore;
-using namespace qpid::broker;
-using boost::static_pointer_cast;
-using boost::intrusive_ptr;
-
-using std::auto_ptr;
-using std::max;
-using qpid::framing::Buffer;
-using qpid::framing::FieldTable;
-using qpid::management::ManagementAgent;
 namespace _qmf = qmf::com::redhat::rhm::store;
 
+namespace mrg {
+namespace msgstore {
+
+
 const std::string MessageStoreImpl::storeTopLevelDir("rhm"); // Sets the top-level store dir name
 // FIXME aconway 2010-03-09: was 10
 qpid::sys::Duration MessageStoreImpl::defJournalGetEventsTimeout(1 * qpid::sys::TIME_MSEC); // 1ms (was 10ms)
@@ -224,7 +218,7 @@
     autoJrnlExpandMaxFiles = p;
 }
 
-void MessageStoreImpl::initManagement (Broker* broker)
+void MessageStoreImpl::initManagement (qpid::broker::Broker* broker)
 {
     if (broker != 0) {
         agent = broker->getManagementAgent();
@@ -484,15 +478,15 @@
     }
 }
 
-void MessageStoreImpl::create(PersistableQueue& queue,
-                             const FieldTable& args)
+void MessageStoreImpl::create(qpid::broker::PersistableQueue& queue,
+                             const qpid::framing::FieldTable& args)
 {
     checkInit();
     if (queue.getPersistenceId()) {
         THROW_STORE_EXCEPTION("Queue already created: " + queue.getName());
     }
     JournalImpl* jQueue = 0;
-    FieldTable::ValuePtr value;
+    qpid::framing::FieldTable::ValuePtr value;
 
     u_int16_t localFileCount = numJrnlFiles;
     bool      localAutoExpandFlag = autoJrnlExpand;
@@ -529,7 +523,7 @@
     if (value.get() != 0 && !value->empty() && value->convertsTo<int>())
         localAutoExpandMaxFileCount = (u_int16_t) value->get<int>();
 
-    queue.setExternalQueueStore(dynamic_cast<ExternalQueueStore*>(jQueue));
+    queue.setExternalQueueStore(dynamic_cast<qpid::broker::ExternalQueueStore*>(jQueue));
     try {
         // init will create the deque's for the init...
         jQueue->initialize(localFileCount, localAutoExpandFlag, localAutoExpandMaxFileCount, localFileSizeSblks, wCacheNumPages, wCachePgSizeSblks);
@@ -545,7 +539,7 @@
     }
 }
 
-void MessageStoreImpl::destroy(PersistableQueue& queue)
+void MessageStoreImpl::destroy(qpid::broker::PersistableQueue& queue)
 {
     checkInit();
     destroy(queueDb, queue);
@@ -562,8 +556,8 @@
     }
 }
 
-void MessageStoreImpl::create(const PersistableExchange& exchange,
-                             const FieldTable& /*args*/)
+void MessageStoreImpl::create(const qpid::broker::PersistableExchange& exchange,
+                             const qpid::framing::FieldTable& /*args*/)
 {
     checkInit();
     if (exchange.getPersistenceId()) {
@@ -578,7 +572,7 @@
     }
 }
 
-void MessageStoreImpl::destroy(const PersistableExchange& exchange)
+void MessageStoreImpl::destroy(const qpid::broker::PersistableExchange& exchange)
 {
     checkInit();
     destroy(exchangeDb, exchange);
@@ -587,7 +581,7 @@
     bindingDb->del(0, &key, DB_AUTO_COMMIT);
 }
 
-void MessageStoreImpl::create(const PersistableConfig& general)
+void MessageStoreImpl::create(const qpid::broker::PersistableConfig& general)
 {
     checkInit();
     if (general.getPersistenceId()) {
@@ -602,7 +596,7 @@
     }
 }
 
-void MessageStoreImpl::destroy(const PersistableConfig& general)
+void MessageStoreImpl::destroy(const qpid::broker::PersistableConfig& general)
 {
     checkInit();
     destroy(generalDb, general);
@@ -610,7 +604,7 @@
 
 bool MessageStoreImpl::create(db_ptr db,
                              IdSequence& seq,
-                             const Persistable& p)
+                             const qpid::broker::Persistable& p)
 {
     u_int64_t id (seq.next());
     Dbt key(&id, sizeof(id));
@@ -634,17 +628,17 @@
     }
 }
 
-void MessageStoreImpl::destroy(db_ptr db, const Persistable& p)
+void MessageStoreImpl::destroy(db_ptr db, const qpid::broker::Persistable& p)
 {
     IdDbt key(p.getPersistenceId());
     db->del(0, &key, DB_AUTO_COMMIT);
 }
 
 
-void MessageStoreImpl::bind(const PersistableExchange& e,
-                           const PersistableQueue& q,
+void MessageStoreImpl::bind(const qpid::broker::PersistableExchange& e,
+                           const qpid::broker::PersistableQueue& q,
                            const std::string& k,
-                           const FieldTable& a)
+                           const qpid::framing::FieldTable& a)
 {
     checkInit();
     IdDbt key(e.getPersistenceId());
@@ -660,16 +654,16 @@
     }
 }
 
-void MessageStoreImpl::unbind(const PersistableExchange& e,
-                             const PersistableQueue& q,
+void MessageStoreImpl::unbind(const qpid::broker::PersistableExchange& e,
+                             const qpid::broker::PersistableQueue& q,
                              const std::string& k,
-                             const FieldTable&)
+                             const qpid::framing::FieldTable&)
 {
     checkInit();
     deleteBinding(e, q, k);
 }
 
-void MessageStoreImpl::recover(RecoveryManager& registry)
+void MessageStoreImpl::recover(qpid::broker::RecoveryManager& registry)
 {
     checkInit();
     txn_list prepared;
@@ -721,11 +715,11 @@
         if (citr->second.tpc_flag) {
             // Dtx (2PC) transaction
             TPCTxnCtxt* tpcc = new TPCTxnCtxt(xid, &messageIdSequence);
-            std::auto_ptr<TPCTransactionContext> txn(tpcc);
+            std::auto_ptr<qpid::broker::TPCTransactionContext> txn(tpcc);
             tpcc->recoverDtok(citr->second.rid, xid);
             tpcc->prepare(tplStorePtr.get());
 
-            RecoverableTransaction::shared_ptr dtx;
+            qpid::broker::RecoverableTransaction::shared_ptr dtx;
             if (!incomplTplTxnFlag) dtx = registry.recoverTransaction(xid, txn);
             if (i->enqueues.get()) {
                 for (LockedMappings::iterator j = i->enqueues->begin(); j != i->enqueues->end(); j++) {
@@ -770,7 +764,7 @@
 }
 
 void MessageStoreImpl::recoverQueues(TxnCtxt& txn,
-                                    RecoveryManager& registry,
+                                    qpid::broker::RecoveryManager& registry,
                                     queue_index& queue_index,
                                     txn_list& prepared,
                                     message_index& messages)
@@ -784,9 +778,9 @@
     Dbt value;
     //read all queues
     while (queues.next(key, value)) {
-        Buffer buffer(reinterpret_cast<char*>(value.get_data()), value.get_size());
+        qpid::framing::Buffer buffer(reinterpret_cast<char*>(value.get_data()), value.get_size());
         //create a Queue instance
-        RecoverableQueue::shared_ptr queue = registry.recoverQueue(buffer);
+        qpid::broker::RecoverableQueue::shared_ptr queue = registry.recoverQueue(buffer);
         //set the persistenceId and update max as required
         queue->setPersistenceId(key.id);
 
@@ -804,7 +798,7 @@
             qpid::sys::Mutex::ScopedLock sl(journalListLock);
             journalList[queueName] = jQueue;
         }
-        queue->setExternalQueueStore(dynamic_cast<ExternalQueueStore*>(jQueue));
+        queue->setExternalQueueStore(dynamic_cast<qpid::broker::ExternalQueueStore*>(jQueue));
 
         try
         {
@@ -825,7 +819,7 @@
         //read all messages: done on a per queue basis if using Journal
 
         queue_index[key.id] = queue;
-        maxQueueId = max(key.id, maxQueueId);
+        maxQueueId = std::max(key.id, maxQueueId);
     }
 
     // NOTE: highestRid is set by both recoverQueues() and recoverTplStore() as
@@ -838,7 +832,7 @@
 
 
 void MessageStoreImpl::recoverExchanges(TxnCtxt& txn,
-                                       RecoveryManager& registry,
+                                       qpid::broker::RecoveryManager& registry,
                                        exchange_index& index)
 {
     //TODO: this is a copy&paste from recoverQueues - refactor!
@@ -850,15 +844,15 @@
     Dbt value;
     //read all exchanges
     while (exchanges.next(key, value)) {
-        Buffer buffer(reinterpret_cast<char*>(value.get_data()), value.get_size());
+        qpid::framing::Buffer buffer(reinterpret_cast<char*>(value.get_data()), value.get_size());
         //create a Exchange instance
-        RecoverableExchange::shared_ptr exchange = registry.recoverExchange(buffer);
+        qpid::broker::RecoverableExchange::shared_ptr exchange = registry.recoverExchange(buffer);
         if (exchange) {
             //set the persistenceId and update max as required
             exchange->setPersistenceId(key.id);
             index[key.id] = exchange;
         }
-        maxExchangeId = max(key.id, maxExchangeId);
+        maxExchangeId = std::max(key.id, maxExchangeId);
     }
     exchangeIdSequence.reset(maxExchangeId + 1);
 }
@@ -873,7 +867,7 @@
     IdDbt key;
     Dbt value;
     while (bindings.next(key, value)) {
-        Buffer buffer(reinterpret_cast<char*>(value.get_data()), value.get_size());
+        qpid::framing::Buffer buffer(reinterpret_cast<char*>(value.get_data()), value.get_size());
         if (buffer.available() < 8) {
             QPID_LOG(error, "Not enough data for binding: " << buffer.available());
             THROW_STORE_EXCEPTION("Not enough data for binding");
@@ -881,7 +875,7 @@
         uint64_t queueId = buffer.getLongLong();
         std::string queueName;
         std::string routingkey;
-        FieldTable args;
+        qpid::framing::FieldTable args;
         buffer.getShortString(queueName);
         buffer.getShortString(routingkey);
         buffer.get(args);
@@ -899,7 +893,7 @@
 }
 
 void MessageStoreImpl::recoverGeneral(TxnCtxt& txn,
-                                     RecoveryManager& registry)
+                                     qpid::broker::RecoveryManager& registry)
 {
     Cursor items;
     items.open(generalDb, txn.get());
@@ -909,12 +903,12 @@
     Dbt value;
     //read all items
     while (items.next(key, value)) {
-        Buffer buffer(reinterpret_cast<char*>(value.get_data()), value.get_size());
+        qpid::framing::Buffer buffer(reinterpret_cast<char*>(value.get_data()), value.get_size());
         //create instance
-        RecoverableConfig::shared_ptr config = registry.recoverConfig(buffer);
+        qpid::broker::RecoverableConfig::shared_ptr config = registry.recoverConfig(buffer);
         //set the persistenceId and update max as required
         config->setPersistenceId(key.id);
-        maxGeneralId = max(key.id, maxGeneralId);
+        maxGeneralId = std::max(key.id, maxGeneralId);
     }
     generalIdSequence.reset(maxGeneralId + 1);
 }
@@ -958,15 +952,15 @@
             {
               case mrg::journal::RHM_IORES_SUCCESS: {
                 msg_count++;
-                RecoverableMessage::shared_ptr msg;
+                qpid::broker::RecoverableMessage::shared_ptr msg;
                 char* data = (char*)dbuff;
 
                 unsigned headerSize;
                 if (externalFlag) {
                     msg = getExternMessage(recovery, dtok.rid(), headerSize); // large message external to jrnl
                 } else {
-                    headerSize = Buffer(data, preambleLength).getLong();
-                    Buffer headerBuff(data+ preambleLength, headerSize); /// do we want read size or header size ????
+                    headerSize = qpid::framing::Buffer(data, preambleLength).getLong();
+                    qpid::framing::Buffer headerBuff(data+ preambleLength, headerSize); /// do we want read size or header size ????
                     msg = recovery.recoverMessage(headerBuff);
                 }
                 msg->setPersistenceId(dtok.rid());
@@ -978,7 +972,7 @@
                 u_int64_t contentSize = readSize - contentOffset;
                 if (msg->loadContent(contentSize) && !externalFlag) {
                     //now read the content
-                    Buffer contentBuff(data + contentOffset, contentSize);
+                    qpid::framing::Buffer contentBuff(data + contentOffset, contentSize);
                     msg->decodeContent(contentBuff);
                 }
 
@@ -1052,7 +1046,7 @@
     }
 }
 
-RecoverableMessage::shared_ptr MessageStoreImpl::getExternMessage(qpid::broker::RecoveryManager& /*recovery*/,
+qpid::broker::RecoverableMessage::shared_ptr MessageStoreImpl::getExternMessage(qpid::broker::RecoveryManager& /*recovery*/,
                                                                  uint64_t /*messageId*/,
                                                                  unsigned& /*headerSize*/)
 {
@@ -1061,7 +1055,7 @@
 
 int MessageStoreImpl::enqueueMessage(TxnCtxt& txn,
                                     IdDbt& msgId,
-                                    RecoverableMessage::shared_ptr& msg,
+                                    qpid::broker::RecoverableMessage::shared_ptr& msg,
                                     queue_index& index,
                                     txn_list& prepared,
                                     message_index& messages)
@@ -1077,7 +1071,7 @@
             QPID_LOG(warning, "Recovered message for queue that no longer exists");
             mappings->del(0);
         } else {
-            RecoverableQueue::shared_ptr queue = index[value.id];
+            qpid::broker::RecoverableQueue::shared_ptr queue = index[value.id];
             if (PreparedTransaction::isLocked(prepared, value.id, msgId.id)) {
                 messages[msgId.id] = msg;
             } else {
@@ -1209,24 +1203,24 @@
     }
 }
 
-void MessageStoreImpl::stage(const intrusive_ptr<PersistableMessage>& /*msg*/)
+void MessageStoreImpl::stage(const boost::intrusive_ptr<qpid::broker::PersistableMessage>& /*msg*/)
 {
     throw mrg::journal::jexception(mrg::journal::jerrno::JERR__NOTIMPL, "MessageStoreImpl", "stage");
 }
 
-void MessageStoreImpl::destroy(PersistableMessage& /*msg*/)
+void MessageStoreImpl::destroy(qpid::broker::PersistableMessage& /*msg*/)
 {
     throw mrg::journal::jexception(mrg::journal::jerrno::JERR__NOTIMPL, "MessageStoreImpl", "destroy");
 }
 
-void MessageStoreImpl::appendContent(const intrusive_ptr<const PersistableMessage>& /*msg*/,
+void MessageStoreImpl::appendContent(const boost::intrusive_ptr<const qpid::broker::PersistableMessage>& /*msg*/,
                                     const std::string& /*data*/)
 {
     throw mrg::journal::jexception(mrg::journal::jerrno::JERR__NOTIMPL, "MessageStoreImpl", "appendContent");
 }
 
 void MessageStoreImpl::loadContent(const qpid::broker::PersistableQueue& queue,
-                                  const intrusive_ptr<const PersistableMessage>& msg,
+                                  const boost::intrusive_ptr<const qpid::broker::PersistableMessage>& msg,
                                   std::string& data,
                                   u_int64_t offset,
                                   u_int32_t length)
@@ -1272,9 +1266,9 @@
     }
 }
 
-void MessageStoreImpl::enqueue(TransactionContext* ctxt,
-                              const intrusive_ptr<PersistableMessage>& msg,
-                              const PersistableQueue& queue)
+void MessageStoreImpl::enqueue(qpid::broker::TransactionContext* ctxt,
+                              const boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg,
+                              const qpid::broker::PersistableQueue& queue)
 {
     checkInit();
     u_int64_t queueId (queue.getPersistenceId());
@@ -1303,7 +1297,7 @@
     if (ctxt) txn->addXidRecord(queue.getExternalQueueStore());
 }
 
-u_int64_t MessageStoreImpl::msgEncode(std::vector<char>& buff, const intrusive_ptr<PersistableMessage>& message)
+u_int64_t MessageStoreImpl::msgEncode(std::vector<char>& buff, const boost::intrusive_ptr<qpid::broker::PersistableMessage>& message)
 {
     u_int32_t headerSize = message->encodedHeaderSize();
     u_int64_t size = message->encodedSize() + sizeof(u_int32_t);
@@ -1313,15 +1307,15 @@
         oss << "Unable to allocate memory for encoding message; requested size: " << size << "; error: " << e.what();
         THROW_STORE_EXCEPTION(oss.str());
     }
-    Buffer buffer(&buff[0],size);
+    qpid::framing::Buffer buffer(&buff[0],size);
     buffer.putLong(headerSize);
     message->encode(buffer);
     return size;
 }
 
-void MessageStoreImpl::store(const PersistableQueue* queue,
+void MessageStoreImpl::store(const qpid::broker::PersistableQueue* queue,
                             TxnCtxt* txn,
-                            const intrusive_ptr<PersistableMessage>& message,
+                            const boost::intrusive_ptr<qpid::broker::PersistableMessage>& message,
                             bool /*newId*/)
 {
     std::vector<char> buff;
@@ -1358,9 +1352,9 @@
     }
 }
 
-void MessageStoreImpl::dequeue(TransactionContext* ctxt,
-                              const intrusive_ptr<PersistableMessage>& msg,
-                              const PersistableQueue& queue)
+void MessageStoreImpl::dequeue(qpid::broker::TransactionContext* ctxt,
+                              const boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg,
+                              const qpid::broker::PersistableQueue& queue)
 {
     checkInit();
     u_int64_t queueId (queue.getPersistenceId());
@@ -1387,9 +1381,9 @@
     msg->dequeueComplete();
 }
 
-void MessageStoreImpl::async_dequeue(TransactionContext* ctxt,
-                                    const intrusive_ptr<PersistableMessage>& msg,
-                                    const PersistableQueue& queue)
+void MessageStoreImpl::async_dequeue(qpid::broker::TransactionContext* ctxt,
+                                    const boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg,
+                                    const qpid::broker::PersistableQueue& queue)
 {
     boost::intrusive_ptr<DataTokenImpl> ddtokp(new DataTokenImpl);
     ddtokp->setSourceMessage(msg);
@@ -1451,11 +1445,11 @@
     }
 }
 
-auto_ptr<TransactionContext> MessageStoreImpl::begin()
+std::auto_ptr<qpid::broker::TransactionContext> MessageStoreImpl::begin()
 {
     checkInit();
     // pass sequence number for c/a
-    return auto_ptr<TransactionContext>(new TxnCtxt(&messageIdSequence));
+    return std::auto_ptr<qpid::broker::TransactionContext>(new TxnCtxt(&messageIdSequence));
 }
 
 std::auto_ptr<qpid::broker::TPCTransactionContext> MessageStoreImpl::begin(const std::string& xid)
@@ -1463,14 +1457,14 @@
     checkInit();
     IdSequence* jtx = &messageIdSequence;
     // pass sequence number for c/a
-    return auto_ptr<TPCTransactionContext>(new TPCTxnCtxt(xid, jtx));
+    return std::auto_ptr<qpid::broker::TPCTransactionContext>(new TPCTxnCtxt(xid, jtx));
 }
 
 void MessageStoreImpl::prepare(qpid::broker::TPCTransactionContext& ctxt)
 {
     checkInit();
     TxnCtxt* txn = dynamic_cast<TxnCtxt*>(&ctxt);
-    if(!txn) throw InvalidTransactionContextException();
+    if(!txn) throw qpid::broker::InvalidTransactionContextException();
     localPrepare(txn);
 }
 
@@ -1502,7 +1496,7 @@
     }
 }
 
-void MessageStoreImpl::commit(TransactionContext& ctxt)
+void MessageStoreImpl::commit(qpid::broker::TransactionContext& ctxt)
 {
     checkInit();
     TxnCtxt* txn(check(&ctxt));
@@ -1513,7 +1507,7 @@
     completed(*dynamic_cast<TxnCtxt*>(txn), true);
 }
 
-void MessageStoreImpl::abort(TransactionContext& ctxt)
+void MessageStoreImpl::abort(qpid::broker::TransactionContext& ctxt)
 {
     checkInit();
     TxnCtxt* txn(check(&ctxt));
@@ -1524,10 +1518,10 @@
     completed(*dynamic_cast<TxnCtxt*>(txn), false);
 }
 
-TxnCtxt* MessageStoreImpl::check(TransactionContext* ctxt)
+TxnCtxt* MessageStoreImpl::check(qpid::broker::TransactionContext* ctxt)
 {
     TxnCtxt* txn = dynamic_cast<TxnCtxt*>(ctxt);
-    if(!txn) throw InvalidTransactionContextException();
+    if(!txn) throw qpid::broker::InvalidTransactionContextException();
     return txn;
 }
 
@@ -1548,7 +1542,7 @@
     }
 }
 
-void MessageStoreImpl::deleteBindingsForQueue(const PersistableQueue& queue)
+void MessageStoreImpl::deleteBindingsForQueue(const qpid::broker::PersistableQueue& queue)
 {
     TxnCtxt txn;
     txn.begin(dbenv.get(), true);
@@ -1560,7 +1554,7 @@
             IdDbt key;
             Dbt value;
             while (bindings.next(key, value)) {
-                Buffer buffer(reinterpret_cast<char*>(value.get_data()), value.get_size());
+                qpid::framing::Buffer buffer(reinterpret_cast<char*>(value.get_data()), value.get_size());
                 if (buffer.available() < 8) {
                     THROW_STORE_EXCEPTION("Not enough data for binding");
                 }
@@ -1582,8 +1576,8 @@
     QPID_LOG(debug, "Deleted all bindings for " << queue.getName() << ":" << queue.getPersistenceId());
 }
 
-void MessageStoreImpl::deleteBinding(const PersistableExchange& exchange,
-                                    const PersistableQueue& queue,
+void MessageStoreImpl::deleteBinding(const qpid::broker::PersistableExchange& exchange,
+                                    const qpid::broker::PersistableQueue& queue,
                                     const std::string& bkey)
 {
     TxnCtxt txn;
@@ -1597,7 +1591,7 @@
             Dbt value;
 
             for (int status = bindings->get(&key, &value, DB_SET); status == 0; status = bindings->get(&key, &value, DB_NEXT_DUP)) {
-                Buffer buffer(reinterpret_cast<char*>(value.get_data()), value.get_size());
+                qpid::framing::Buffer buffer(reinterpret_cast<char*>(value.get_data()), value.get_size());
                 if (buffer.available() < 8) {
                     THROW_STORE_EXCEPTION("Not enough data for binding");
                 }
@@ -1719,3 +1713,4 @@
         ;
 }
 
+}}

Modified: store/trunk/cpp/lib/jrnl2/AioCallback.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl2/AioCallback.hpp	2010-12-17 11:06:36 UTC (rev 4437)
+++ store/trunk/cpp/lib/jrnl2/AioCallback.hpp	2011-01-14 19:52:45 UTC (rev 4438)
@@ -33,7 +33,7 @@
 #define mrg_journal2_AioCallback_hpp
 
 #include "DataToken.hpp"
-#include <sys/types.h> // u_int16_t, u_int32_t
+#include <stdint.h> // uint16_t
 #include <vector>
 
 namespace mrg
@@ -46,7 +46,7 @@
     public:
         virtual ~AioCallback() {}
         virtual void writeAioCompleteCallback(std::vector<DataToken*>& dataTokenList) = 0;
-        virtual void readAioCompleteCallback(std::vector<u_int16_t>& buffPageCtrlBlkIndexList) = 0;
+        virtual void readAioCompleteCallback(std::vector<uint16_t>& buffPageCtrlBlkIndexList) = 0;
     };
 
 } // namespace journal2

Added: store/trunk/cpp/lib/jrnl2/Configuration.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl2/Configuration.hpp	                        (rev 0)
+++ store/trunk/cpp/lib/jrnl2/Configuration.hpp	2011-01-14 19:52:45 UTC (rev 4438)
@@ -0,0 +1,112 @@
+/**
+ * \file Configuration.hpp
+ *
+ * Qpid asynchronous store plugin library
+ *
+ * This file contains async journal code (v.2).
+ *
+ * \author Kim van der Riet
+ *
+ * Copyright (c) 2010 Red Hat, Inc.
+ *
+ * This file is part of the Qpid async store library msgstore.so.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301
+ * USA
+ *
+ * The GNU Lesser General Public License is available in the file COPYING.
+ */
+
+#ifndef mrg_journal2_Configuration_hpp
+#define mrg_journal2_Configuration_hpp
+
+#include <stdint.h> // uint8_t
+
+#if defined(__i386__) /* little endian, 32 bits */
+  #define JRNL_LITTLE_ENDIAN
+//  #define JRNL_32_BIT
+#elif defined(__PPC__) || defined(__s390__)  /* big endian, 32 bits */
+  #define JRNL_BIG_ENDIAN
+//  #define JRNL_32_BIT
+#elif defined(__ia64__) || defined(__x86_64__) || defined(__alpha__) /* little endian, 64 bits */
+  #define JRNL_LITTLE_ENDIAN
+//  #define JRNL_64_BIT
+#elif defined(__powerpc64__) || defined(__s390x__) /* big endian, 64 bits */
+  #define JRNL_BIG_ENDIAN
+//  #define JRNL_64_BIT
+#else
+  #error Unable to determine endianness
+#endif
+
+
+/**
+* <b>Rule:</b> Data block size (JRNL_DBLK_SIZE) MUST be a power of 2 such that
+* <pre>
+* JRNL_DBLK_SIZE * JRNL_SBLK_SIZE == n * 512 (n = 1,2,3...)
+* </pre>
+* (The disk softblock size is 512 for Linux kernels >= 2.6)
+*/
+//#define JRNL_DBLK_SIZE          128         ///< Data block size in bytes (CANNOT BE LESS THAN 32!)
+//#define JRNL_SBLK_SIZE          4           ///< Disk softblock size in multiples of JRNL_DBLK_SIZE
+//#define JRNL_MIN_FILE_SIZE      128         ///< Min. jrnl file size in sblks (excl. FileHeader)
+//#define JRNL_MAX_FILE_SIZE      4194304     ///< Max. jrnl file size in sblks (excl. FileHeader)
+//#define JRNL_MIN_NUM_FILES      4           ///< Min. number of journal files
+//#define JRNL_MAX_NUM_FILES      64          ///< Max. number of journal files
+//#define JRNL_ENQ_THRESHOLD      80          ///< Percent full when enqueue connection will be closed
+//
+//#define JRNL_RMGR_PAGE_SIZE     128         ///< Journal page size in softblocks
+//#define JRNL_RMGR_PAGES         16          ///< Number of pages to use in rmgr
+//
+//#define JRNL_WMGR_DEF_PAGE_SIZE 64          ///< Journal write page size in softblocks (default)
+//#define JRNL_WMGR_DEF_PAGES     32          ///< Number of pages to use in wmgr (default)
+//
+//#define JRNL_WMGR_MAXDTOKPP     1024        ///< Max. dtoks (data blocks) per page in wmgr
+//#define JRNL_WMGR_MAXWAITUS     100         ///< Max. wait time (us) before submitting AIO
+//
+//#define JRNL_INFO_EXTENSION     "jinf"      ///< Extension for journal info files
+//#define JRNL_DATA_EXTENSION     "jdat"      ///< Extension for journal data files
+//#define RHM_JDAT_TXA_MAGIC      0x614d4852  ///< ("RHMa" in little endian) Magic for dtx abort hdrs
+//#define RHM_JDAT_TXC_MAGIC      0x634d4852  ///< ("RHMc" in little endian) Magic for dtx commit hdrs
+//#define RHM_JDAT_DEQ_MAGIC      0x644d4852  ///< ("RHMd" in little endian) Magic for deq rec hdrs
+//#define RHM_JDAT_ENQ_MAGIC      0x654d4852  ///< ("RHMe" in little endian) Magic for enq rec hdrs
+//#define RHM_JDAT_FILE_MAGIC     0x664d4852  ///< ("RHMf" in little endian) Magic for file hdrs
+//#define RHM_JDAT_EMPTY_MAGIC    0x784d4852  ///< ("RHMx" in little endian) Magic for empty dblk
+//#define RHM_JDAT_VERSION        0x01        ///< Version (of file layout)
+//#define RHM_CLEAN_CHAR          0xff        ///< Char used to clear empty space on disk
+
+#define RHM_LENDIAN_FLAG 0x0                  ///< Value of little endian flag on disk
+#define RHM_BENDIAN_FLAG 0x1                  ///< Value of big endian flag on disk
+
+namespace mrg
+{
+namespace journal2
+{
+
+    struct Configuration
+    {
+        static const uint8_t _s_endianValue =
+#if defined(JRNL_LITTLE_ENDIAN)
+            RHM_LENDIAN_FLAG;
+#elif defined(JRNL_BIG_ENDIAN)
+            RHM_BENDIAN_FLAG;
+#else
+#error Unknown endianness
+#endif
+    };
+
+} // namespace journal2
+} // namespace mrg
+
+#endif // ifndef mrg_journal2_Configuration_hpp

Modified: store/trunk/cpp/lib/jrnl2/DataToken.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl2/DataToken.hpp	2010-12-17 11:06:36 UTC (rev 4437)
+++ store/trunk/cpp/lib/jrnl2/DataToken.hpp	2011-01-14 19:52:45 UTC (rev 4438)
@@ -33,7 +33,7 @@
 #define mrg_journal2_DataToken_hpp
 
 #include <string>
-#include <sys/types.h> // u_int64_t
+#include <stdint.h> // uint64_t
 
 #include "DataTokenState.hpp"
 #include "ScopedLock.hpp"
@@ -43,7 +43,7 @@
 namespace journal2
 {
 
-    typedef u_int64_t recordId_t;
+    typedef uint64_t recordId_t;
 
     class AtomicRecordIdCounter
     {

Modified: store/trunk/cpp/lib/jrnl2/Journal.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl2/Journal.cpp	2010-12-17 11:06:36 UTC (rev 4437)
+++ store/trunk/cpp/lib/jrnl2/Journal.cpp	2011-01-14 19:52:45 UTC (rev 4438)
@@ -48,7 +48,7 @@
     }
 
     // static
-    u_int32_t Journal::_s_listSizeThreshold = 50;
+    uint32_t Journal::_s_listSizeThreshold = 50;
 
     Journal::Journal(const std::string& jrnlId,
                const std::string& jrnlDir,
@@ -119,7 +119,7 @@
         return 0;
     }
 
-    u_int32_t
+    uint32_t
     Journal::getWriteAioEventsRemaining() const
     {
         while (true) { // --- START OF CRITICAL SECTION ---

Modified: store/trunk/cpp/lib/jrnl2/Journal.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl2/Journal.hpp	2010-12-17 11:06:36 UTC (rev 4437)
+++ store/trunk/cpp/lib/jrnl2/Journal.hpp	2011-01-14 19:52:45 UTC (rev 4438)
@@ -33,7 +33,7 @@
 #define mrg_journal2_Journal_hpp
 
 #include <string>
-#include <sys/types.h> // u_int64_t, u_int32_t, etc.
+#include <stdint.h> // uint64_t, uint32_t, etc.
 #include <time.h> // timespec
 
 #include "AioCallback.hpp"
@@ -44,6 +44,7 @@
 // --- temp code ---
 #include "ScopedLock.hpp" // ScopedMutex
 #include <vector>
+#include "RecordHeader.hpp"
 // --- end temp code ---
 
 namespace mrg
@@ -52,7 +53,7 @@
 {
 
     // TODO - decide if this is the right place to expose these codes and flags
-    typedef u_int64_t ioRes; // TODO - this needs to be expressed as flags
+    typedef uint64_t ioRes; // TODO - this needs to be expressed as flags
     const ioRes RHM_IORES_ENQCAPTHRESH = 0x1;
     const ioRes RHM_IORES_BUSY = 0x2;
     std::string g_ioResAsString(const ioRes /*res*/);
@@ -68,7 +69,7 @@
         AioCallback* _aioCallbackPtr;
 
         // --- temp code ---
-        static u_int32_t _s_listSizeThreshold;
+        static uint32_t _s_listSizeThreshold;
         std::vector<DataToken*> _writeDataTokenList;
         std::vector<DataToken*> _callBackDataTokenList[2];
         bool _callBackDataTokenListSwitch;
@@ -97,7 +98,7 @@
 
         // aio ops and status
         // --- temp code ---
-        u_int32_t getWriteAioEventsRemaining() const;
+        uint32_t getWriteAioEventsRemaining() const;
         // --- end of temp code ---
         void flush(const bool blockTillAioCompleteFlag);
         void processCompletedAioWriteEvents(timespec* const timeout);

Modified: store/trunk/cpp/lib/jrnl2/JournalErrors.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl2/JournalErrors.cpp	2010-12-17 11:06:36 UTC (rev 4437)
+++ store/trunk/cpp/lib/jrnl2/JournalErrors.cpp	2011-01-14 19:52:45 UTC (rev 4438)
@@ -36,23 +36,15 @@
 namespace journal2
 {
 
-    std::map<u_int32_t, const char*> JournalErrors::_s_errorMap;
-    std::map<u_int32_t, const char*>::iterator JournalErrors::_s_errorMapIterator;
+    std::map<uint32_t, const char*> JournalErrors::_s_errorMap;
+    std::map<uint32_t, const char*>::iterator JournalErrors::_s_errorMapIterator;
     bool JournalErrors::_s_initializedFlag = JournalErrors::_s_initialize();
 
-    // generic errors
-    const u_int32_t JournalErrors::JERR_PTHREAD            = 0x0001;
-
-    // illegal states
-    const u_int32_t JournalErrors::JERR_BADJRNLSTATE       = 0x0101;
-    const u_int32_t JournalErrors::JERR_BADDTOKOPSTATE     = 0x0102;
-    const u_int32_t JournalErrors::JERR_BADDTOKTXNSTATE    = 0x0103;
-    const u_int32_t JournalErrors::JERR_BADDTOKIOSTATE     = 0x0104;
-
     bool
     JournalErrors::_s_initialize()
     {
         _s_errorMap[JERR_PTHREAD] = "JERR_PTHREAD: pthread operation failure";
+        _s_errorMap[JERR_RTCLOCK] = "JERR_RTCLOCK: realtime clock operation failure";
         _s_errorMap[JERR_BADJRNLSTATE] = "JERR_BADJRNLSTATE: Illegal journal state";
         _s_errorMap[JERR_BADDTOKOPSTATE] = "JERR_BADDTOKOPSTATE: Illegal data token op state";
         _s_errorMap[JERR_BADDTOKTXNSTATE] = "JERR_BADDTOKTXNSTATE: Illegal data token txn state";
@@ -61,7 +53,7 @@
     }
 
     const char*
-    JournalErrors::s_errorMessage(const u_int32_t err_no) throw ()
+    JournalErrors::s_errorMessage(const uint32_t err_no) throw ()
     {
         _s_errorMapIterator = _s_errorMap.find(err_no);
         if (_s_errorMapIterator == _s_errorMap.end())

Modified: store/trunk/cpp/lib/jrnl2/JournalErrors.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl2/JournalErrors.hpp	2010-12-17 11:06:36 UTC (rev 4437)
+++ store/trunk/cpp/lib/jrnl2/JournalErrors.hpp	2011-01-14 19:52:45 UTC (rev 4438)
@@ -33,7 +33,7 @@
 #define mrg_journal2_JournalErrors_hpp
 
 #include <map>
-#include <sys/types.h> // u_int32_t
+#include <stdint.h> // uint32_t
 
 namespace mrg
 {
@@ -42,22 +42,23 @@
 
     class JournalErrors
     {
-        static std::map<u_int32_t, const char*> _s_errorMap;            ///< Map of error messages
-        static std::map<u_int32_t, const char*>::iterator _s_errorMapIterator; ///< Iterator
-        static bool _s_initializedFlag;                                 ///< Dummy flag, used to initialize map.
-        static bool _s_initialize();                                    ///< Static fn for initializing static data
+        static std::map<uint32_t, const char*> _s_errorMap; ///< Map of error messages
+        static std::map<uint32_t, const char*>::iterator _s_errorMapIterator; ///< Iterator
+        static bool _s_initializedFlag;                         ///< Dummy flag, used to initialize map.
+        static bool _s_initialize();                            ///< Static fn for initializing static data
     public:
         // generic errors
-        static const u_int32_t JERR_PTHREAD;                            ///< pthread operation failure
+        static const uint32_t JERR_PTHREAD = 0x0001;            ///< pthread operation failure
+        static const uint32_t JERR_RTCLOCK = 0x0002;            ///< realtime clock operation failure
 
         // illegal states
-        static const u_int32_t JERR_BADJRNLSTATE;                       ///< Illegal journal state
-        static const u_int32_t JERR_BADDTOKOPSTATE;                     ///< Illegal data token op state
-        static const u_int32_t JERR_BADDTOKTXNSTATE;                    ///< Illegal data token txn state
-        static const u_int32_t JERR_BADDTOKIOSTATE;                     ///< Illegal data token io state
+        static const uint32_t JERR_BADJRNLSTATE = 0x0101;       ///< Illegal journal state
+        static const uint32_t JERR_BADDTOKOPSTATE = 0x0102;     ///< Illegal data token op state
+        static const uint32_t JERR_BADDTOKTXNSTATE = 0x0103;    ///< Illegal data token txn state
+        static const uint32_t JERR_BADDTOKIOSTATE = 0x0104;     ///< Illegal data token io state
 
 
-        static const char* s_errorMessage(const u_int32_t err_no) throw ();
+        static const char* s_errorMessage(const uint32_t err_no) throw ();
     };
 
 } // namespace journal2

Modified: store/trunk/cpp/lib/jrnl2/JournalException.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl2/JournalException.cpp	2010-12-17 11:06:36 UTC (rev 4437)
+++ store/trunk/cpp/lib/jrnl2/JournalException.cpp	2011-01-14 19:52:45 UTC (rev 4438)
@@ -45,7 +45,7 @@
         _formatWhatStr();
     }
 
-    JournalException::JournalException(const u_int32_t errorCode) throw ():
+    JournalException::JournalException(const uint32_t errorCode) throw ():
         std::exception(),
         _errorCode(errorCode)
     {
@@ -68,7 +68,7 @@
         _formatWhatStr();
     }
 
-    JournalException::JournalException(const u_int32_t errorCode,
+    JournalException::JournalException(const uint32_t errorCode,
                                        const char* additionalInfo) throw ():
         std::exception(),
         _errorCode(errorCode),
@@ -77,7 +77,7 @@
         _formatWhatStr();
     }
 
-    JournalException::JournalException(const u_int32_t errorCode,
+    JournalException::JournalException(const uint32_t errorCode,
                                        const std::string& additionalInfo) throw ():
         std::exception(),
         _errorCode(errorCode),
@@ -86,7 +86,7 @@
         _formatWhatStr();
     }
 
-    JournalException::JournalException(const u_int32_t errorCode,
+    JournalException::JournalException(const uint32_t errorCode,
                                        const char* throwingClass,
                                        const char* throwingFunction) throw ():
         std::exception(),
@@ -97,7 +97,7 @@
         _formatWhatStr();
     }
 
-    JournalException::JournalException(const u_int32_t errorCode,
+    JournalException::JournalException(const uint32_t errorCode,
                                        const std::string& throwingClass,
                                        const std::string& throwingFunction) throw ():
         std::exception(),
@@ -108,7 +108,7 @@
         _formatWhatStr();
     }
 
-    JournalException::JournalException(const u_int32_t errorCode,
+    JournalException::JournalException(const uint32_t errorCode,
                                        const char* additionalInfo,
                                        const char* throwingClass,
                                        const char* throwingFunction) throw ():
@@ -121,7 +121,7 @@
         _formatWhatStr();
     }
 
-    JournalException::JournalException(const u_int32_t errorCode,
+    JournalException::JournalException(const uint32_t errorCode,
                                        const std::string& additionalInfo,
                                        const std::string& throwingClass,
                                        const std::string& throwingFunction) throw ():

Modified: store/trunk/cpp/lib/jrnl2/JournalException.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl2/JournalException.hpp	2010-12-17 11:06:36 UTC (rev 4437)
+++ store/trunk/cpp/lib/jrnl2/JournalException.hpp	2011-01-14 19:52:45 UTC (rev 4438)
@@ -35,7 +35,7 @@
 #include <cstring> // std::strerror
 #include <sstream>
 #include <string>
-#include <sys/types.h> // u_int32_t
+#include <stdint.h> // uint32_t
 
 #include "JournalErrors.hpp"
 
@@ -63,7 +63,7 @@
     class JournalException : public std::exception
     {
     protected:
-        u_int32_t _errorCode;
+        uint32_t _errorCode;
         std::string _additionalInfo;
         std::string _throwingClass;
         std::string _throwingFunction;
@@ -74,28 +74,28 @@
     public:
         JournalException() throw ();
 
-        JournalException(const u_int32_t errorCode) throw ();
+        JournalException(const uint32_t errorCode) throw ();
 
         JournalException(const char* additionalInfo) throw ();
         JournalException(const std::string& additionalInfo) throw ();
 
-        JournalException(const u_int32_t errorCode,
+        JournalException(const uint32_t errorCode,
                          const char* additionalInfo) throw ();
-        JournalException(const u_int32_t errorCode,
+        JournalException(const uint32_t errorCode,
                          const std::string& additionalInfo) throw ();
 
-        JournalException(const u_int32_t errorCode,
+        JournalException(const uint32_t errorCode,
                          const char* throwingClass,
                          const char* throwingFunction) throw ();
-        JournalException(const u_int32_t errorCode,
+        JournalException(const uint32_t errorCode,
                          const std::string& throwingClass,
                          const std::string& throwingFunction) throw ();
 
-        JournalException(const u_int32_t errorCode,
+        JournalException(const uint32_t errorCode,
                          const char* additionalInfo,
                          const char* throwingClass,
                          const char* throwingFunction) throw ();
-        JournalException(const u_int32_t errorCode,
+        JournalException(const uint32_t errorCode,
                          const std::string& additionalInfo,
                          const std::string& throwingClass,
                          const std::string& throwingFunction) throw ();
@@ -103,7 +103,7 @@
         virtual ~JournalException() throw () {}
         const char* what() const throw (); // override std::exception::what()
 
-        inline u_int32_t getErrorCode() const throw () { return _errorCode; }
+        inline uint32_t getErrorCode() const throw () { return _errorCode; }
         inline const std::string getAdditionalInfo() const throw () { return _additionalInfo; }
         inline const std::string getThrowingClass() const throw () { return _throwingClass; }
         inline const std::string getThrowingFunction() const throw () { return _throwingFunction; }

Modified: store/trunk/cpp/lib/jrnl2/JournalParameters.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl2/JournalParameters.cpp	2010-12-17 11:06:36 UTC (rev 4437)
+++ store/trunk/cpp/lib/jrnl2/JournalParameters.cpp	2011-01-14 19:52:45 UTC (rev 4438)
@@ -1,9 +1,9 @@
 /**
- * \file JournalParameters.cpp
+ * \file jrnl2/JournalParameters.cpp
  *
  * Qpid asynchronous store plugin library
  *
- * This file contains performance test code for the journal.
+ * This file contains async journal code (v.2).
  *
  * \author Kim van der Riet
  *
@@ -41,12 +41,12 @@
     // static declarations
     std::string JournalParameters::_s_defaultJrnlDir = "/tmp/store";
     std::string JournalParameters::_s_defaultJrnlBaseFileName = "JournalData";
-    u_int16_t JournalParameters::_s_defaultNumJrnlFiles = 8;
-    u_int32_t JournalParameters::_s_defaultJrnlFileSize_sblks = 3072;
+    uint16_t JournalParameters::_s_defaultNumJrnlFiles = 8;
+    uint32_t JournalParameters::_s_defaultJrnlFileSize_sblks = 3072;
     bool JournalParameters::_s_defaultAutoExpand = false;
-    u_int16_t JournalParameters::_s_defaultAutoExpandMaxJrnlFiles = 0;
-    u_int16_t JournalParameters::_s_defaultWriteBuffNumPgs = 32;
-    u_int32_t JournalParameters::_s_defaultWriteBuffPgSize_sblks = 128;
+    uint16_t JournalParameters::_s_defaultAutoExpandMaxJrnlFiles = 0;
+    uint16_t JournalParameters::_s_defaultWriteBuffNumPgs = 32;
+    uint32_t JournalParameters::_s_defaultWriteBuffPgSize_sblks = 128;
 
     JournalParameters::JournalParameters() :
         _jrnlDir(_s_defaultJrnlDir),
@@ -61,12 +61,12 @@
 
     JournalParameters::JournalParameters(const std::string& jrnlDir,
                                          const std::string& jrnlBaseFileName,
-                                         const u_int16_t numJrnlFiles,
+                                         const uint16_t numJrnlFiles,
                                          const bool autoExpand,
-                                         const u_int16_t autoExpandMaxJrnlFiles,
-                                         const u_int32_t jrnlFileSize_sblks,
-                                         const u_int16_t writeBuffNumPgs,
-                                         const u_int32_t writeBuffPgSize_sblks) :
+                                         const uint16_t autoExpandMaxJrnlFiles,
+                                         const uint32_t jrnlFileSize_sblks,
+                                         const uint16_t writeBuffNumPgs,
+                                         const uint32_t writeBuffPgSize_sblks) :
         _jrnlDir(jrnlDir),
         _jrnlBaseFileName(jrnlBaseFileName),
         _numJrnlFiles(numJrnlFiles),

Modified: store/trunk/cpp/lib/jrnl2/JournalParameters.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl2/JournalParameters.hpp	2010-12-17 11:06:36 UTC (rev 4437)
+++ store/trunk/cpp/lib/jrnl2/JournalParameters.hpp	2011-01-14 19:52:45 UTC (rev 4438)
@@ -1,9 +1,9 @@
 /**
- * \file  JournalParameters.hpp
+ * \file jrnl2/JournalParameters.hpp
  *
  * Qpid asynchronous store plugin library
  *
- * This file contains performance test code for the journal.
+ * This file contains async journal code (v.2).
  *
  * \author Kim van der Riet
  *
@@ -34,7 +34,7 @@
 
 #include <iostream>
 #include <string>
-#include <sys/types.h> // u_int16_t, u_int32_t
+#include <stdint.h> // uint16_t, uint32_t
 
 namespace mrg
 {
@@ -46,31 +46,31 @@
         // static default store params
         static std::string _s_defaultJrnlDir;
         static std::string _s_defaultJrnlBaseFileName;
-        static u_int16_t _s_defaultNumJrnlFiles;
-        static u_int32_t _s_defaultJrnlFileSize_sblks;
+        static uint16_t _s_defaultNumJrnlFiles;
+        static uint32_t _s_defaultJrnlFileSize_sblks;
         static bool _s_defaultAutoExpand;
-        static u_int16_t _s_defaultAutoExpandMaxJrnlFiles;
-        static u_int16_t _s_defaultWriteBuffNumPgs;
-        static u_int32_t _s_defaultWriteBuffPgSize_sblks;
+        static uint16_t _s_defaultAutoExpandMaxJrnlFiles;
+        static uint16_t _s_defaultWriteBuffNumPgs;
+        static uint32_t _s_defaultWriteBuffPgSize_sblks;
 
         std::string _jrnlDir;
         std::string _jrnlBaseFileName;
-        u_int16_t _numJrnlFiles;
-        u_int32_t _jrnlFileSize_sblks;
+        uint16_t _numJrnlFiles;
+        uint32_t _jrnlFileSize_sblks;
         bool _autoExpand;
-        u_int16_t _autoExpandMaxJrnlFiles;
-        u_int16_t _writeBuffNumPgs;
-        u_int32_t _writeBuffPgSize_sblks;
+        uint16_t _autoExpandMaxJrnlFiles;
+        uint16_t _writeBuffNumPgs;
+        uint32_t _writeBuffPgSize_sblks;
 
         JournalParameters();
         JournalParameters(const std::string& jrnlDir,
                           const std::string& jrnlBaseFileName,
-                          const u_int16_t numJrnlFiles,
+                          const uint16_t numJrnlFiles,
                           const bool autoExpand,
-                          const u_int16_t autoExpandMaxJrnlFiles,
-                          const u_int32_t jrnlFileSize_sblks,
-                          const u_int16_t writeBuffNumPgs,
-                          const u_int32_t writeBuffPgSize_sblks);
+                          const uint16_t autoExpandMaxJrnlFiles,
+                          const uint32_t jrnlFileSize_sblks,
+                          const uint16_t writeBuffNumPgs,
+                          const uint32_t writeBuffPgSize_sblks);
         JournalParameters(const JournalParameters& sp);
         void toStream(std::ostream& os = std::cout) const;
         std::string toString() const;

Modified: store/trunk/cpp/lib/jrnl2/Makefile.am
===================================================================
--- store/trunk/cpp/lib/jrnl2/Makefile.am	2010-12-17 11:06:36 UTC (rev 4437)
+++ store/trunk/cpp/lib/jrnl2/Makefile.am	2011-01-14 19:52:45 UTC (rev 4438)
@@ -33,6 +33,7 @@
 	JournalParameters.cpp \
 	JournalState.cpp \
 	AioCallback.hpp \
+	Configuration.hpp \
 	DataToken.hpp \
 	DataTokenState.hpp \
 	Journal.hpp \
@@ -41,5 +42,6 @@
 	JournalException.hpp \
 	JournalParameters.hpp \
 	JournalState.hpp \
+	RecordHeader.hpp \
 	ScopedLock.hpp
 

Modified: store/trunk/cpp/lib/jrnl2/README
===================================================================
--- store/trunk/cpp/lib/jrnl2/README	2010-12-17 11:06:36 UTC (rev 4437)
+++ store/trunk/cpp/lib/jrnl2/README	2011-01-14 19:52:45 UTC (rev 4438)
@@ -1,3 +1,21 @@
 This directory contains experimental code. It is not (currently)
 a part of the main store, and can be safely ignored for all
 normal builds.
+
+This is a refactorization of the lib/jrnl directory, with completely
+renamed classes according to normal C++ naming standards used elsewhere
+in this project.
+
+Also included are the following enhancements:
+* A new record type for journal events. This will allow the redelivered
+  flag to be set correctly (amongst other things)
+* A new record tail layout which contains a checksum. This will solve the
+  issue of losing data unknowingly if the first and last pages of a
+  multi-page write complete, but some in-between pages are still pending at
+  the time of failure.
+* Some minor refactoring of the FileHeader type
+* A clean-up with a proper hierarchy of these classes
+
+NOTE: these will break binary compatibility with earlier journals - an
+upgrade issue. This journal will need to have its revision number incremented
+from 1 to 2.

Added: store/trunk/cpp/lib/jrnl2/RecordHeader.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl2/RecordHeader.hpp	                        (rev 0)
+++ store/trunk/cpp/lib/jrnl2/RecordHeader.hpp	2011-01-14 19:52:45 UTC (rev 4438)
@@ -0,0 +1,1333 @@
+/**
+ * \file RecordHeader.hpp
+ *
+ * Qpid asynchronous store plugin library
+ *
+ * This file contains async journal code (v.2).
+ *
+ * \author Kim van der Riet
+ *
+ * Copyright (c) 2010, 2011 Red Hat, Inc.
+ *
+ * This file is part of the Qpid async store library msgstore.so.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301
+ * USA
+ *
+ * The GNU Lesser General Public License is available in the file COPYING.
+ *
+ * List of structs in this file:
+ *   struct DequeueHeader
+ *   struct EnqueueHeader
+ *   struct EventHeader
+ *   struct FileHeader
+ *   struct RecordHeader
+ *   struct RecordTail
+ *   struct TransactionHeader
+ *
+ * Overview of structs in this file:
+ *
+ * <pre>
+ *  +------------+                   +--------------+
+ *  | RecordTail |                   | RecordHeader |
+ *  +------------+                   |  (abstract)  |
+ *                                   +--------------+
+ *                                           ^
+ *                                           |
+ *        +----------------+-----------------+-------------------+
+ *        |                |                 |                   |
+ *  +------------+  +-------------+  +---------------+  +-------------------+
+ *  | FileHeader |  | EventHeader |  | DequeueHeader |  | TransactionHeader |
+ *  +------------+  +-------------+  +---------------+  +-------------------+
+ *                         ^
+ *                         |
+ *                 +---------------+
+ *                 | EnqueueHeader |
+ *                 +---------------+
+ *  </pre>
+ */
+
+#ifndef mrg_journal2_RecordHeader_hpp
+#define mrg_journal2_RecordHeader_hpp
+
+#include <cstddef> // std::size_t
+#include <ctime> // std::time_t
+#include "Configuration.hpp"
+#include "JournalException.hpp"
+#include <stdint.h> // uint8_t, uint16_t, uint32_t, uint64_t
+
+namespace mrg
+{
+namespace journal2
+{
+
+#pragma pack(1)
+
+    /**
+     * \brief Struct for data common to the head of all journal files and records.
+     * This includes identification for the file type, the encoding version, endian
+     * indicator and a record ID.
+     *
+     * File layout in binary format (16 bytes):
+     * <pre>
+     *        0x0                                       0x7
+     *      +-----+-----+-----+-----+-----+-----+-----+-----+
+     * 0x00 |        _magic         |  v  |  e  |  _flags   |
+     *      +-----+-----+-----+-----+-----+-----+-----+-----+
+     * 0x08 |                  _recordId                    |
+     *      +-----+-----+-----+-----+-----+-----+-----+-----+
+     * </pre>
+     * <table>
+     * <tr>
+     *     <td>v</td>
+     *     <td>file version [ <code>_version</code> ] (If the format or encoding of
+     *     this file changes, then this number should be incremented)</td>
+     * </tr>
+     * <tr>
+     *     <td>e</td>
+     *     <td>endian flag [ <code>_bigEndianFlag</code> ], <b>false</b> (0x00) for
+     *     little endian, <b>true</b> (0x01) for big endian</td>
+     * </tr>
+     * </table>
+     */
+    struct RecordHeader
+    {
+        uint32_t _magic;                ///< File type identifier (magic number)
+        uint8_t _version;               ///< File encoding version
+        uint8_t _bigEndianFlag;         ///< Endian flag: 0x00 for little endian, 0x01 for big endian
+        uint16_t _flags;                ///< User and system flags
+        uint64_t _recordId;             ///< Record ID (rotating 64-bit counter)
+
+        static const uint16_t HDR_OVERWRITE_INDICATOR_MASK = 0x1; ///< Bit in _flags holding the overwrite indicator
+
+        /**
+         * \brief Default constructor, which sets all values to 0.
+         */
+        inline RecordHeader() :
+            _magic(0),
+            _version(0),
+            _bigEndianFlag(0),
+            _flags(0),
+            _recordId(0)
+        {}
+
+        /**
+         * \brief Convenience constructor which initializes values during construction.
+         *
+         * \param magic Magic for this record
+         * \param version Version of this record
+         * \param recordId Record identifier for this record
+         * \param overwriteIndicator Overwrite indicator for this record
+         */
+        inline RecordHeader(const uint32_t magic,
+                            const uint8_t version,
+                            const uint64_t recordId,
+                            const bool overwriteIndicator) :
+            _magic(magic),
+            _version(version),
+            _bigEndianFlag(Configuration::_s_endianValue), // endianness is fixed at compile time
+            _flags(overwriteIndicator ? HDR_OVERWRITE_INDICATOR_MASK : 0), // only the overwrite bit may be set here
+            _recordId(recordId)
+        {}
+
+        /**
+         * \brief Copy constructor. Copies every field of rh, including its endian flag.
+         */
+        inline RecordHeader(const RecordHeader& rh) :
+            _magic(rh._magic),
+            _version(rh._version),
+            _bigEndianFlag(rh._bigEndianFlag),
+            _flags(rh._flags),
+            _recordId(rh._recordId)
+        {}
+
+        /**
+         * \brief Destructor. Virtual, as this struct declares virtual member functions.
+         */
+        virtual inline ~RecordHeader()
+        {}
+
+        /**
+         * \brief Convenience copy method. Overwrites every field of this instance from rh.
+         */
+        virtual inline void copy(const RecordHeader& rh)
+        {
+            _magic = rh._magic;
+            _version = rh._version;
+            _bigEndianFlag = rh._bigEndianFlag;
+            _flags = rh._flags;
+            _recordId = rh._recordId;
+        }
+
+        /**
+         * \brief Resets all fields of this header to 0.
+         */
+        virtual inline void reset()
+        {
+            _magic = 0;
+            _version = 0;
+            _bigEndianFlag = 0;
+            _flags = 0;
+            _recordId = 0;
+        }
+
+        /**
+         * \brief Return the value of the Overwrite Indicator for this record.
+         *
+         * \return true if the Overwrite Indicator flag is set, false otherwise.
+         */
+        inline bool getOverwriteIndicator() const {
+            return _flags & HDR_OVERWRITE_INDICATOR_MASK; // non-zero masked bit converts to true
+        }
+
+        /**
+         * \brief Set the value of the Overwrite Indicator for this record
+         */
+        inline void setOverwriteIndicator(const bool owi) {
+            _flags = owi ?
+                     _flags | HDR_OVERWRITE_INDICATOR_MASK : // set the bit, other flags untouched
+                     _flags & (~HDR_OVERWRITE_INDICATOR_MASK); // clear the bit, other flags untouched
+        }
+
+        /**
+         * \brief Return the header size of this record in bytes, as sizeof(RecordHeader).
+         * NOTE(review): static, so subclasses can only hide (not override) this - confirm intent.
+         *
+         * \return Size of record header in bytes.
+         */
+        static inline std::size_t getHeaderSize() {
+            return sizeof(RecordHeader); // NOTE(review): presumably includes a vtable pointer, as this struct has virtual members - confirm
+        }
+
+        /**
+         * \brief Return the body (data) size of this record in bytes. Must be implemented
+         * by subclasses.
+         *
+         * \return Size of record body in bytes.
+         */
+        virtual std::size_t getBodySize() const = 0;
+
+        /**
+         * \brief Return total size of this record in bytes, being the sum of the header,
+         * xid (if present), data (if present) and tail (if present). Must be implemented
+         * by subclasses.
+         *
+         * \returns The size of the entire record, including header, body (xid and data,
+         * if present) and record tail (if present) in bytes.
+         */
+        virtual std::size_t getRecordSize() const = 0;
+
+        // TODO - Is this the right place for encode/decode fns?
+        ///**
+        // * \brief Encode (write) this record instance into the buffer pointed to by the buffer
+        // * pointer. Must be implemented by subclasses.
+        // */
+        //virtual std::size_t encode(char* bufferPtr,
+        //                           const std::size_t bufferSize,
+        //                           const std::size_t encodeOffset = 0) = 0;
+
+        /**
+         * \brief Return a uint32_t checksum over getHeaderSize() + getBodySize() bytes from this.
+         *
+         * \param initialValue The initial (or seed) value of the checksum.
+         *
+         * \return Checksum for header and body of record. Tail (if any) is excluded.
+         */
+        inline uint32_t getCheckSum(uint32_t initialValue = 0) const {
+            uint32_t cs = initialValue;
+            for (unsigned char* p = (unsigned char*)this; // walk the raw bytes starting at this object
+                                p < (unsigned char*)this + getHeaderSize() + getBodySize(); // NOTE(review): assumes the body bytes directly follow this object in memory - confirm
+                                p++) {
+                cs ^= (uint32_t)(*p); // fold the next byte into the low 8 bits
+                bool carry = cs & uint32_t(0x80000000); // remember the top bit before it is shifted out
+                cs <<= 1;
+                if (carry) cs++; // re-insert the top bit at bit 0: together a 1-bit left rotate
+            }
+            return cs;
+        }
+    }; // struct RecordHeader
+
+
+
+
+    /**
+     * \brief Struct for data common to the tail of all records. The magic number
+     * used here is the binary inverse (1's complement) of the magic used in the
+     * record header; this minimizes possible confusion with other headers that may
+     * be present during recovery. The tail is used with all records that have either
+     * XIDs or data - ie any size-variable content. Currently the only records that
+     * do NOT use the tail are non-transactional dequeues and filler records.
+     *
+     * Record layout in binary format (16 bytes):
+     * <pre>
+     *        0x0                                       0x7
+     *      +-----+-----+-----+-----+-----+-----+-----+-----+
+     * 0x00 |       _xMagic         |       _checkSum       |
+     *      +-----+-----+-----+-----+-----+-----+-----+-----+
+     * 0x08 |                   _recordId                   |
+     *      +-----+-----+-----+-----+-----+-----+-----+-----+
+     * </pre>
+     */
+    struct RecordTail
+    {
+        uint32_t _xMagic;               ///< One's complement of the magic number of the matching record header
+        uint32_t _checkSum;             ///< Checksum covering the header and body of the record
+        uint64_t _recordId;             ///< Record ID, matching the record ID held in the header of this record
+
+
+        /**
+         * \brief Default constructor. Sets _xMagic to 0xffffffff and the remaining fields to 0.
+         */
+        RecordTail() :
+            _xMagic(0xffffffff), _checkSum(0), _recordId(0)
+        {}
+
+        /**
+         * \brief Construct a tail from explicitly supplied field values.
+         *
+         * \param xMagic Inverted record header magic (ie ~rh._magic).
+         * \param checkSum Checksum of the record header and body.
+         * \param recordId Record identifier, as held in the record header.
+         */
+        RecordTail(const uint32_t xMagic, const uint32_t checkSum, const uint64_t recordId) :
+            _xMagic(xMagic), _checkSum(checkSum), _recordId(recordId)
+        {}
+
+        /**
+         * \brief Construct the tail that belongs to an existing RecordHeader.
+         *
+         * The inverted magic and the record ID are taken from the header fields; the checksum
+         * is obtained by calling getCheckSum() on the header.
+         *
+         * \param rh Header instance for which the tail is created.
+         */
+        RecordTail(const RecordHeader& rh) :
+            _xMagic(~rh._magic), _checkSum(rh.getCheckSum()), _recordId(rh._recordId)
+        {}
+
+        /**
+         * \brief Copy constructor; performs a field-by-field copy.
+         *
+         * \param rt Instance to be copied.
+         */
+        RecordTail(const RecordTail& rt) :
+            _xMagic(rt._xMagic), _checkSum(rt._checkSum), _recordId(rt._recordId)
+        {}
+
+        /**
+         * \brief Destructor.
+         *
+         * NOTE(review): a virtual destructor presumably adds a vtable pointer to this struct, in
+         * which case sizeof(RecordTail) would exceed the 16-byte binary layout documented for
+         * the tail - confirm whether this is intended.
+         */
+        virtual ~RecordTail() {}
+
+        /**
+         * \brief Overwrite every field of this tail with the values held by another tail.
+         *
+         * \param rt Instance to be copied.
+         */
+        void copy(const RecordTail& rt) {
+            _recordId = rt._recordId;
+            _checkSum = rt._checkSum;
+            _xMagic = rt._xMagic;
+        }
+
+        /**
+         * \brief Restore the default-constructed state (_xMagic 0xffffffff, other fields 0).
+         */
+        void reset() {
+            _recordId = 0;
+            _checkSum = 0;
+            _xMagic = 0xffffffff;
+        }
+
+        /**
+         * \brief Returns the size of the record tail in bytes.
+         *
+         * \return sizeof(RecordTail).
+         */
+        static std::size_t getSize() {
+            return sizeof(RecordTail);
+        }
+    }; // struct RecordTail
+
+
+
+
+    /**
+     * \brief Struct for data common to the head of all journal files. In addition to
+     * the common data, this includes the record ID and offset of the first record in
+     * the file.
+     *
+     * This header precedes all data in journal files and occupies the first complete
+     * block in the file. The record ID and offset are updated on each overwrite of the
+     * file.
+     *
+     * File layout in binary format (48 bytes):
+     * <pre>
+     *        0x0                                       0x7
+     *      +-----+-----+-----+-----+-----+-----+-----+-----+  -+
+     * 0x00 |        _magic         |  v  |  e  |  _flags   |   |
+     *      +-----+-----+-----+-----+-----+-----+-----+-----+   | struct RecordHeader
+     * 0x08 |   _recordId (used to show first rid in file)  |   |
+     *      +-----+-----+-----+-----+-----+-----+-----+-----+  -+
+     * 0x10 |    _physicalFileId    |    _logicalFileId     |
+     *      +-----+-----+-----+-----+-----+-----+-----+-----+
+     * 0x18 |              _firstRecordOffset               |
+     *      +-----+-----+-----+-----+-----+-----+-----+-----+
+     * 0x20 |              _timestampSeconds                |
+     *      +-----+-----+-----+-----+-----+-----+-----+-----+
+     * 0x28 | _timestampNanoSeconds |       _reserved       |
+     *      +-----+-----+-----+-----+-----+-----+-----+-----+
+     * </pre>
+     * <table>
+     * <tr>
+     *     <td>v</td>
+     *     <td>file version [ <code>_version</code> ] (If the format or encoding of
+     *     this file changes, then this number should be incremented)</td>
+     * </tr>
+     * <tr>
+     *     <td>e</td>
+     *     <td>endian flag [ <code>_bigEndianFlag</code> ], <b>false</b> (0x00) for
+     *     little endian, <b>true</b> (0x01) for big endian</td>
+     * </tr>
+     * </table>
+     */
+    struct FileHeader : public RecordHeader
+    {
+        uint32_t _physicalFileId;       ///< Physical file ID (pfid)
+        uint32_t _logicalFileId;        ///< Logical file ID (lfid)
+#if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
+        uint32_t _filler0;              ///< Big-endian filler for 32-bit size_t
+#endif
+        std::size_t _firstRecordOffset; ///< First record offset
+#if defined(JRNL_LITTLE_ENDIAN) && defined(JRNL_32_BIT)
+        uint32_t _filler0;              ///< Little-endian filler for 32-bit size_t
+#endif
+#if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
+        uint32_t _filler1;              ///< Big-endian filler for 32-bit time_t
+#endif
+        std::time_t _timestampSeconds;  ///< Timestamp of journal initialization, seconds component
+#if defined(JRNL_LITTLE_ENDIAN) && defined(JRNL_32_BIT)
+        uint32_t _filler1;              ///< Little-endian filler for 32-bit time_t
+#endif
+        uint32_t _timestampNanoSeconds; ///< Timestamp of journal initialization, nanoseconds component
+        uint32_t _reserved;             ///< Reserved field; always initialized to 0 by this struct
+
+        /**
+         * \brief Default constructor, which sets all FileHeader fields to 0 and
+         * default-constructs the RecordHeader base.
+         */
+        inline FileHeader() :
+            RecordHeader(),
+            _physicalFileId(0),
+            _logicalFileId(0),
+#if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
+            _filler0(0),
+#endif
+            _firstRecordOffset(0),
+#if defined(JRNL_LITTLE_ENDIAN) && defined(JRNL_32_BIT)
+            _filler0(0),
+#endif
+#if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
+            _filler1(0),
+#endif
+            _timestampSeconds(0),
+#if defined(JRNL_LITTLE_ENDIAN) && defined(JRNL_32_BIT)
+            _filler1(0),
+#endif
+            _timestampNanoSeconds(0),
+            _reserved(0)
+        {}
+
+        /**
+         * \brief Convenience constructor which initializes values during construction.
+         *
+         * The file ID parameters are 32 bits wide so that they match the width of the
+         * _physicalFileId and _logicalFileId members; narrower parameters would silently
+         * truncate any ID that does not fit in 16 bits.
+         *
+         * \param magic Magic for this record
+         * \param version Version of this record
+         * \param recordId RecordId for this record
+         * \param overwriteIndicator Overwrite indicator for this record
+         * \param physicalFileId Physical file ID (file number on disk)
+         * \param logicalFileId  Logical file ID (file number as seen by circular file buffer)
+         * \param firstRecordOffset First record offset in bytes from beginning of file
+         * \param setTimestampFlag If true, causes the timestamp to be initialized with the current system time
+         * \throw JournalException (JERR_RTCLOCK) if setTimestampFlag is true and the system clock cannot be read
+         */
+        inline FileHeader(const uint32_t magic,
+                          const uint8_t version,
+                          const uint64_t recordId,
+                          const bool overwriteIndicator,
+                          const uint32_t physicalFileId,
+                          const uint32_t logicalFileId,
+                          const std::size_t firstRecordOffset,
+                          const bool setTimestampFlag = false) :
+            RecordHeader(magic, version, recordId, overwriteIndicator),
+            _physicalFileId(physicalFileId),
+            _logicalFileId(logicalFileId),
+#if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
+            _filler0(0),
+#endif
+            _firstRecordOffset(firstRecordOffset),
+#if defined(JRNL_LITTLE_ENDIAN) && defined(JRNL_32_BIT)
+            _filler0(0),
+#endif
+#if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
+            _filler1(0),
+#endif
+            _timestampSeconds(0),
+#if defined(JRNL_LITTLE_ENDIAN) && defined(JRNL_32_BIT)
+            _filler1(0),
+#endif
+            _timestampNanoSeconds(0),
+            _reserved(0)
+        {
+            // The timestamp stays at 0 unless the caller explicitly asks for the current time.
+            if (setTimestampFlag) setTimestamp();
+        }
+
+        /**
+         * \brief Copy constructor.
+         *
+         * \param fh FileHeader instance to be copied
+         */
+        inline FileHeader(const FileHeader& fh) :
+            RecordHeader(fh),
+            _physicalFileId(fh._physicalFileId),
+            _logicalFileId(fh._logicalFileId),
+#if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
+            _filler0(fh._filler0),
+#endif
+            _firstRecordOffset(fh._firstRecordOffset),
+#if defined(JRNL_LITTLE_ENDIAN) && defined(JRNL_32_BIT)
+            _filler0(fh._filler0),
+#endif
+#if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
+            _filler1(fh._filler1),
+#endif
+            _timestampSeconds(fh._timestampSeconds),
+#if defined(JRNL_LITTLE_ENDIAN) && defined(JRNL_32_BIT)
+            _filler1(fh._filler1),
+#endif
+            _timestampNanoSeconds(fh._timestampNanoSeconds),
+            _reserved(fh._reserved)
+        {}
+
+        /**
+         * \brief Destructor.
+         */
+        virtual ~FileHeader()
+        {}
+
+        /**
+         * \brief Convenience copy method. Copies the RecordHeader part via RecordHeader::copy()
+         * and then every FileHeader field, including the 32-bit filler fields when present.
+         *
+         * \param fh FileHeader instance to be copied
+         */
+        inline void copy(const FileHeader& fh) {
+            RecordHeader::copy(fh);
+            _physicalFileId = fh._physicalFileId;
+            _logicalFileId = fh._logicalFileId;
+            _firstRecordOffset = fh._firstRecordOffset;
+            _timestampSeconds = fh._timestampSeconds;
+            _timestampNanoSeconds = fh._timestampNanoSeconds;
+            _reserved = fh._reserved;
+#if defined(JRNL_32_BIT)
+            _filler0 = fh._filler0;
+            _filler1 = fh._filler1;
+#endif
+        }
+
+        /**
+         * \brief Resets all fields to default values (mostly 0). The RecordHeader part is reset
+         * through RecordHeader::reset().
+         */
+        inline void reset() {
+            RecordHeader::reset();
+            _physicalFileId = 0;
+            _logicalFileId = 0;
+            _firstRecordOffset = 0;
+            _timestampSeconds = 0;
+            _timestampNanoSeconds = 0;
+            _reserved = 0;
+#if defined(JRNL_32_BIT)
+            _filler0 = 0;
+            _filler1 = 0;
+#endif
+        }
+
+        /**
+         * \brief Return the header size of this record in bytes.
+         *
+         * \return Size of record header in bytes, ie sizeof(FileHeader).
+         */
+        static inline std::size_t getHeaderSize() {
+            return sizeof(FileHeader);
+        }
+
+        /**
+         * \brief Return the body (data) size of this record in bytes.
+         *
+         * \return Size of record body in bytes. By definition, a FileHeader has no body.
+         */
+        inline std::size_t getBodySize() const {
+            return 0;
+        }
+
+        /**
+         * \brief Return total size of this record in bytes, being in the case of the
+         * FileHeader the size of the header itself only.
+         *
+         * \return Same value as getHeaderSize().
+         */
+        inline std::size_t getRecordSize() const {
+            return getHeaderSize();
+        }
+
+        /**
+         * \brief Gets the current time from the system clock and sets the timestamp in the struct.
+         *
+         * \throw JournalException (JERR_RTCLOCK) if ::clock_gettime() reports an error; the
+         * exception message carries the formatted errno value.
+         */
+        inline void setTimestamp()
+        {
+            // TODO: Standardize on method for getting time that does not require a context switch.
+            timespec ts;
+            if (::clock_gettime(CLOCK_REALTIME, &ts))
+            {
+                std::ostringstream oss;
+                oss << FORMAT_SYSERR(errno);
+                throw JournalException(JournalErrors::JERR_RTCLOCK, oss.str(), "FileHeader", "setTimestamp");
+            }
+            setTimestamp(ts);
+        }
+
+        /**
+         * \brief Sets the timestamp in the struct to the provided value (in seconds and nanoseconds).
+         *
+         * \param ts Timestamp from which the file header time stamp is to be copied
+         */
+        inline void setTimestamp(const timespec& ts)
+        {
+            _timestampSeconds = ts.tv_sec;
+            // tv_nsec is a signed long; the conversion to the 32-bit field is made explicit.
+            _timestampNanoSeconds = static_cast<uint32_t>(ts.tv_nsec);
+        }
+    }; // struct FileHeader
+
+
+
+
+    /**
+     * \brief Struct for event records, which can be used to record system events in the
+     * store.
+     *
+     * The EventHeader record type may be used to store events into the journal which do
+     * not constitute data content but changes of state in the broker. These can be
+     * recovered and used to set appropriate state in the broker.
+     *
+     * This record is almost identical to the EnqueueRecord, but without the flags. I
+     * am uncertain at this time whether it is necessary to set an XID on an event
+     * record, but in case, I have left this feature in. In any event, there is only a
+     * 1 byte size penalty in the header size for doing so.
+     *
+     * Record layout in binary format (32 bytes):
+     * <pre>
+     *        0x0                                       0x7
+     *      +-----+-----+-----+-----+-----+-----+-----+-----+  -+
+     * 0x00 |        _magic         |  v  |  e  |  _flags   |   |
+     *      +-----+-----+-----+-----+-----+-----+-----+-----+   | struct RecordHeader
+     * 0x08 |                   _recordId                   |   |
+     *      +-----+-----+-----+-----+-----+-----+-----+-----+  -+
+     * 0x10 |                   _xidSize                    |
+     *      +-----+-----+-----+-----+-----+-----+-----+-----+
+     * 0x18 |                   _dataSize                   |
+     *      +-----+-----+-----+-----+-----+-----+-----+-----+
+     * </pre>
+     * <table>
+     * <tr>
+     *     <td>v</td>
+     *     <td>file version [ <code>_version</code> ] (If the format or encoding of
+     *     this file changes, then this number should be incremented)</td>
+     * </tr>
+     * <tr>
+     *     <td>e</td>
+     *     <td>endian flag [ <code>_bigEndianFlag</code> ], <b>false</b> (0x00) for
+     *     little endian, <b>true</b> (0x01) for big endian</td>
+     * </tr>
+     * </table>
+     */
+    struct EventHeader : public RecordHeader
+    {
+#if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
+        uint32_t _filler0;              ///< Filler ahead of _xidSize (big-endian, 32-bit size_t)
+#endif
+        std::size_t _xidSize;           ///< Size of the XID
+#if defined(JRNL_LITTLE_ENDIAN) && defined(JRNL_32_BIT)
+        uint32_t _filler0;              ///< Filler after _xidSize (little-endian, 32-bit size_t)
+#endif
+#if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
+        uint32_t _filler1;              ///< Filler ahead of _dataSize (big-endian, 32-bit size_t)
+#endif
+        std::size_t _dataSize;          ///< Size of the record data
+#if defined(JRNL_LITTLE_ENDIAN) && defined(JRNL_32_BIT)
+        uint32_t _filler1;              ///< Filler after _dataSize (little-endian, 32-bit size_t)
+#endif
+
+        /**
+         * \brief Default constructor. Default-constructs the RecordHeader base and zeroes both
+         * size fields (and the fillers, when present).
+         */
+        EventHeader() :
+            RecordHeader(),
+#if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
+            _filler0(0),
+#endif
+            _xidSize(0),
+#if defined(JRNL_LITTLE_ENDIAN) && defined(JRNL_32_BIT)
+            _filler0(0),
+#endif
+#if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
+            _filler1(0),
+#endif
+            _dataSize(0)
+#if defined(JRNL_LITTLE_ENDIAN) && defined(JRNL_32_BIT)
+            , _filler1(0)
+#endif
+        {}
+
+        /**
+         * \brief Construct an event header from explicitly supplied values.
+         *
+         * \param magic Magic number of the record
+         * \param version Version of the record
+         * \param recordId Identifier of the record
+         * \param xidSize Size of the transaction (or distributed transaction) ID of the record
+         * \param dataSize Size of the opaque data block of the record
+         * \param overwriteIndicator Value of the overwrite indicator at the time the record is written
+         */
+        EventHeader(const uint32_t magic,
+                    const uint8_t version,
+                    const uint64_t recordId,
+                    const std::size_t xidSize,
+                    const std::size_t dataSize,
+                    const bool overwriteIndicator) :
+            RecordHeader(magic, version, recordId, overwriteIndicator),
+#if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
+            _filler0(0),
+#endif
+            _xidSize(xidSize),
+#if defined(JRNL_LITTLE_ENDIAN) && defined(JRNL_32_BIT)
+            _filler0(0),
+#endif
+#if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
+            _filler1(0),
+#endif
+            _dataSize(dataSize)
+#if defined(JRNL_LITTLE_ENDIAN) && defined(JRNL_32_BIT)
+            , _filler1(0)
+#endif
+        {}
+
+        /**
+         * \brief Copy constructor; copies the RecordHeader base and every EventHeader field.
+         *
+         * \param eh Instance to be copied
+         */
+        EventHeader(const EventHeader& eh) :
+            RecordHeader(eh),
+#if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
+            _filler0(eh._filler0),
+#endif
+            _xidSize(eh._xidSize),
+#if defined(JRNL_LITTLE_ENDIAN) && defined(JRNL_32_BIT)
+            _filler0(eh._filler0),
+#endif
+#if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
+            _filler1(eh._filler1),
+#endif
+            _dataSize(eh._dataSize)
+#if defined(JRNL_LITTLE_ENDIAN) && defined(JRNL_32_BIT)
+            , _filler1(eh._filler1)
+#endif
+        {}
+
+        /**
+         * \brief Destructor.
+         */
+        virtual ~EventHeader() {}
+
+        /**
+         * \brief Overwrite this header with the contents of another EventHeader.
+         *
+         * The RecordHeader part is copied through RecordHeader::copy().
+         *
+         * \param e Instance to be copied
+         */
+        virtual void copy(const EventHeader& e) {
+            RecordHeader::copy(e);
+#if defined(JRNL_32_BIT)
+            _filler0 = e._filler0;
+            _filler1 = e._filler1;
+#endif
+            _xidSize = e._xidSize;
+            _dataSize = e._dataSize;
+        }
+
+        /**
+         * \brief Return this header to its default state: RecordHeader::reset() for the base
+         * part, 0 for the size fields and fillers.
+         */
+        virtual void reset() {
+            RecordHeader::reset();
+#if defined(JRNL_32_BIT)
+            _filler0 = 0;
+            _filler1 = 0;
+#endif
+            _xidSize = 0;
+            _dataSize = 0;
+        }
+
+        /**
+         * \brief Size of the event record header in bytes.
+         *
+         * \return sizeof(EventHeader).
+         */
+        static std::size_t getHeaderSize() {
+            return sizeof(EventHeader);
+        }
+
+        /**
+         * \brief Size of the record body in bytes.
+         *
+         * \return Sum of the XID size and the data size.
+         */
+        virtual std::size_t getBodySize() const {
+            return _xidSize + _dataSize;
+        }
+
+        /**
+         * \brief Total size of this record in bytes.
+         *
+         * A record without a body consists of the header only. When a body (XID and/or data)
+         * is present, the total is header + body + record tail.
+         *
+         * \return Total record size in bytes.
+         */
+        virtual std::size_t getRecordSize() const {
+            if (!getBodySize()) {
+                return getHeaderSize();
+            }
+            return getHeaderSize() + (getBodySize() + RecordTail::getSize());
+        }
+    }; // struct EventHeader
+
+
+
+
+    /**
+     * \brief Struct for enqueue record.
+     *
+     * Struct for enqueue record. In addition to the common data, this header includes both the
+     * xid and data blob sizes.
+     *
+     * This header precedes all enqueue data in journal files.
+     *
+     * Record layout in binary format (32 bytes):
+     * <pre>
+     *        0x0                                       0x7
+     *      +-----+-----+-----+-----+-----+-----+-----+-----+  -+
+     * 0x00 |        _magic         |  v  |  e  |  _flags   |   |
+     *      +-----+-----+-----+-----+-----+-----+-----+-----+   | struct RecordHeader
+     * 0x08 |                   _recordId                   |   |
+     *      +-----+-----+-----+-----+-----+-----+-----+-----+  -+
+     * 0x10 |                   _xidSize                    |
+     *      +-----+-----+-----+-----+-----+-----+-----+-----+
+     * 0x18 |                   _dataSize                   |
+     *      +-----+-----+-----+-----+-----+-----+-----+-----+
+     * </pre>
+     * <table>
+     * <tr>
+     *     <td>v</td>
+     *     <td>file version [ <code>_version</code> ] (If the format or encoding of
+     *     this file changes, then this number should be incremented)</td>
+     * </tr>
+     * <tr>
+     *     <td>e</td>
+     *     <td>endian flag [ <code>_bigEndianFlag</code> ], <b>false</b> (0x00) for
+     *     little endian, <b>true</b> (0x01) for big endian</td>
+     * </tr>
+     * </table>
+     */
+    struct EnqueueHeader : public EventHeader
+    {
+        static const uint16_t ENQ_HDR_TRANSIENT_MASK = 0x10;    ///< Bit in _flags holding the Transient flag
+        static const uint16_t ENQ_HDR_EXTERNAL_MASK = 0x20;     ///< Bit in _flags holding the External flag
+
+        /**
+         * \brief Default constructor; delegates to the EventHeader default constructor.
+         */
+        EnqueueHeader() : EventHeader() {}
+
+        /**
+         * \brief Construct an enqueue header from explicitly supplied values.
+         *
+         * \param magic Magic number of the record
+         * \param version Version of the record
+         * \param recordId Identifier of the record
+         * \param xidSize Size of the transaction (or distributed transaction) ID of the record
+         * \param dataSize Size of the opaque data block of the record
+         * \param overwriteIndicator Value of the overwrite indicator at the time the record is written
+         * \param transient If true, marks the record as transient (ie to be discarded on recovery)
+         * \param external If true, marks the record's data as stored outside the journal; the data portion of
+         *        the record then identifies the storage location.
+         */
+        EnqueueHeader(const uint32_t magic,
+                      const uint8_t version,
+                      const uint64_t recordId,
+                      const std::size_t xidSize,
+                      const std::size_t dataSize,
+                      const bool overwriteIndicator,
+                      const bool transient = false,
+                      const bool external = false) :
+            EventHeader(magic, version, recordId, xidSize, dataSize, overwriteIndicator)
+        {
+            // Both flags live in the inherited _flags field and are applied after base construction.
+            setTransientFlag(transient);
+            setExternalFlag(external);
+        }
+
+        /**
+         * \brief Copy constructor; delegates to the EventHeader copy constructor.
+         *
+         * \param eh Instance to be copied
+         */
+        EnqueueHeader(const EnqueueHeader& eh) : EventHeader(eh) {}
+
+        /**
+         * \brief Destructor.
+         */
+        virtual ~EnqueueHeader() {}
+
+        /**
+         * \brief Query the Transient flag of this record. A record with this flag set is
+         * ignored during recovery.
+         *
+         * \return true if the Transient flag for this record is set, false otherwise.
+         */
+        bool getTransientFlag() const {
+            return (_flags & ENQ_HDR_TRANSIENT_MASK) != 0;
+        }
+
+        /**
+         * \brief Set or clear the Transient flag of this record.
+         *
+         * \param transient New value of the Transient flag.
+         */
+        void setTransientFlag(const bool transient = true) {
+            if (transient) {
+                _flags |= ENQ_HDR_TRANSIENT_MASK;
+            } else {
+                _flags &= ~ENQ_HDR_TRANSIENT_MASK;
+            }
+        }
+
+        /**
+         * \brief Query the External flag of this record. When set, the record data is held outside the journal
+         * and the data part of this record contains the location of the stored data.
+         *
+         * \return true if the External flag for this record is set, false otherwise.
+         */
+        bool getExternalFlag() const {
+            return (_flags & ENQ_HDR_EXTERNAL_MASK) != 0;
+        }
+
+        /**
+         * \brief Set or clear the External flag of this record.
+         *
+         * \param external New value of the External flag.
+         */
+        void setExternalFlag(const bool external = true) {
+            if (external) {
+                _flags |= ENQ_HDR_EXTERNAL_MASK;
+            } else {
+                _flags &= ~ENQ_HDR_EXTERNAL_MASK;
+            }
+        }
+
+        /**
+         * \brief Size of the enqueue record header in bytes.
+         *
+         * \return sizeof(EnqueueHeader).
+         */
+        static std::size_t getHeaderSize() {
+            return sizeof(EnqueueHeader);
+        }
+    }; // struct EnqueueHeader
+
+
+
+
+    /**
+     * \brief Struct for dequeue record.
+     *
+     * Struct for dequeue record. If this record has a non-zero _xidSize field (i.e., there is a
+     * valid XID), then this header is followed by the XID of _xidSize bytes and a RecordTail. If,
+     * on the other hand, this record has a zero _xidSize (i.e., there is no XID), then the RecordTail
+     * is absent.
+     *
+     * Note that this record has its own record ID, distinct from the record ID of the record it is
+     * dequeuing. The _recordId field below is the ID of the dequeue record itself; the
+     * _dequeuedRecordId field is the ID of a previous enqueue record being dequeued by this record.
+     *
+     * Record layout in binary format (32 bytes):
+     * <pre>
+     *        0x0                                       0x7
+     *      +-----+-----+-----+-----+-----+-----+-----+-----+  -+
+     * 0x00 |        _magic         |  v  |  e  |  _flags   |   |
+     *      +-----+-----+-----+-----+-----+-----+-----+-----+   | struct RecordHeader
+     * 0x08 |                   _recordId                   |   |
+     *      +-----+-----+-----+-----+-----+-----+-----+-----+  -+
+     * 0x10 |               _dequeuedRecordId               |
+     *      +-----+-----+-----+-----+-----+-----+-----+-----+
+     * 0x18 |                   _xidSize                    |
+     *      +-----+-----+-----+-----+-----+-----+-----+-----+
+     * </pre>
+     * <table>
+     * <tr>
+     *     <td>v</td>
+     *     <td>file version [ <code>_version</code> ] (If the format or encoding of
+     *     this file changes, then this number should be incremented)</td>
+     * </tr>
+     * <tr>
+     *     <td>e</td>
+     *     <td>endian flag [ <code>_bigEndianFlag</code> ], <b>false</b> (0x00) for
+     *     little endian, <b>true</b> (0x01) for big endian</td>
+     * </tr>
+     * </table>
+     */
+    struct DequeueHeader : public RecordHeader
+    {
+        uint64_t _dequeuedRecordId;     ///< Record ID of dequeued record
+#if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
+        uint32_t _filler0;              ///< Big-endian filler for 32-bit size_t
+#endif
+        std::size_t _xidSize;           ///< XID size
+#if defined(JRNL_LITTLE_ENDIAN) && defined(JRNL_32_BIT)
+        uint32_t _filler0;              ///< Little-endian filler for 32-bit size_t
+#endif
+
+        static const uint16_t DEQ_HDR_TPL_COMMIT_ON_TXN_COMPL_MASK = 0x10;
+
+        /**
+         * \brief Default constructor, which sets all values to 0.
+         */
+        inline DequeueHeader() :
+            RecordHeader(),
+            _dequeuedRecordId(0),
+#if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
+            _filler0(0),
+#endif
+            _xidSize(0)
+#if defined(JRNL_LITTLE_ENDIAN) && defined(JRNL_32_BIT)
+            , _filler0(0)
+#endif
+        {}
+
+        /**
+         * \brief Convenience constructor which initializes values during construction.
+         *
+         * \param magic The magic for this record
+         * \param version Version of this record
+         * \param recordId Record identifier for this record
+         * \param dequeuedRecordId  Record identifier of the record being dequeued by this record
+         * \param xidSize Size of the transaction (or distributed transaction) ID for this record
+         * \param overwriteIndicator Flag indicating the present value of the overwrite indicator when writing this
+         *        record
+         * \param tplCommitOnTxnComplFlag
+         */
+        inline DequeueHeader(const uint32_t magic,
+                             const uint8_t version,
+                             const uint64_t recordId,
+                             const uint64_t dequeuedRecordId,
+                             const std::size_t xidSize,
+                             const bool overwriteIndicator,
+                             const bool tplCommitOnTxnComplFlag = false) :
+            RecordHeader(magic, version, recordId, overwriteIndicator),
+            _dequeuedRecordId(dequeuedRecordId),
+#if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
+            _filler0(0),
+#endif
+            _xidSize(xidSize)
+#if defined(JRNL_LITTLE_ENDIAN) && defined(JRNL_32_BIT)
+            , _filler0(0)
+#endif
+        {
+            setTplCommitOnTxnComplFlag(tplCommitOnTxnComplFlag);
+        }
+
+        /**
+         * \brief Copy constructor
+         *
+         * \param dh Instance to be copied
+         */
+        inline DequeueHeader(const DequeueHeader& dh) :
+            RecordHeader(dh),
+            _dequeuedRecordId(dh._dequeuedRecordId),
+#if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
+            _filler0(dh._filler0),
+#endif
+            _xidSize(dh._xidSize)
+#if defined(JRNL_LITTLE_ENDIAN) && defined(JRNL_32_BIT)
+            , _filler0(dh._filler0)
+#endif
+        {}
+
+
+        /**
+         * \brief Destructor.
+         */
+        virtual ~DequeueHeader() {}
+
+        /**
+         * \brief Convenience copy method.
+         */
+        inline void copy(const DequeueHeader& dh) {
+            RecordHeader::copy(dh);
+            _dequeuedRecordId = dh._dequeuedRecordId;
+            _xidSize = dh._xidSize;
+#if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
+            _filler0 = dh._filler0;
+#endif
+        }
+
+        /**
+         * \brief Reset this record to default values (mostly 0)
+         */
+        inline void reset() {
+            RecordHeader::reset();
+            _dequeuedRecordId = 0;
+            _xidSize = 0;
+#if defined(JRNL_32_BIT)
+            _filler0 = 0;
+#endif
+        }
+
+        /**
+         * \brief Return the value of the tplCommitOnTxnComplFlag for this record. This
+         * flag is used only within the TPL, and if set, indicates that the transaction was
+         * closed using a commit. If not set, the transaction was closed using an abort.
+         * This is used during recovery of the transactions in the store.
+         *
+         * \return true if the tplCommitOnTxnComplFlag flag for this record is set, false
+         * otherwise.
+         */
+        inline bool getTplCommitOnTxnComplFlag() const {
+            return _flags & DEQ_HDR_TPL_COMMIT_ON_TXN_COMPL_MASK;
+        }
+
+        /**
+         * \brief Set the value of the tplCommitOnTxnComplFlag for this record. This is
+         * only used in the TPL, and is ignored elsewhere.
+         *
+         * \param commitOnTxnCompl The value to be set in the tplCommitOnTxnComplFlag. If
+         * true, the transaction was closed with a commit; if false, with an abort.
+         */
+        inline void setTplCommitOnTxnComplFlag(const bool commitOnTxnCompl) {
+            _flags = commitOnTxnCompl ?
+                     _flags | DEQ_HDR_TPL_COMMIT_ON_TXN_COMPL_MASK :
+                     _flags & (~DEQ_HDR_TPL_COMMIT_ON_TXN_COMPL_MASK);
+        }
+
+        /**
+         * \brief Return the header size of this record in bytes.
+         *
+         * \return Size of record header in bytes.
+         */
+        static std::size_t getHeaderSize() {
+            return sizeof(DequeueHeader);
+        }
+
+        /**
+         * \brief Return the body (xid and data) size of this record in bytes.
+         *
+         * \return Size of record body in bytes.
+         */
+        std::size_t getBodySize() const {
+            return _xidSize;
+        }
+
+        /**
+         * \brief Return total size of this record in bytes: the size of the header
+         * plus, only when an xid is present, the size of the body (xid only) and the
+         * size of the tail.
+         */
+        std::size_t getRecordSize() const {
+            const std::size_t xidBytes = getBodySize();
+            // The body and the tail are only counted when an xid is present.
+            return getHeaderSize() + (xidBytes ? xidBytes + RecordTail::getSize() : 0);
+        }
+    }; // struct DequeueHeader
+
+
+
+
+    /**
+     * \brief Struct for transaction commit and abort records.
+     *
+     * Struct for DTX commit and abort records. Only the magic distinguishes between them. Since
+     * this record must be used in the context of a valid XID, the _xidSize field must not be zero.
+     * Immediately following this record is the XID itself, which is _xidSize bytes long, followed
+     * by a RecordTail.
+     *
+     * Note that this record has its own rid distinct from the rids of the record(s) making up the
+     * transaction it is committing or aborting.
+     *
+     * Record layout in binary format (24 bytes):
+     * <pre>
+     *        0x0                                       0x7
+     *      +-----+-----+-----+-----+-----+-----+-----+-----+  -+
+     * 0x00 |        _magic         |  v  |  e  |  _flags   |   |
+     *      +-----+-----+-----+-----+-----+-----+-----+-----+   | struct RecordHeader
+     * 0x08 |                   _recordId                   |   |
+     *      +-----+-----+-----+-----+-----+-----+-----+-----+  -+
+     * 0x10 |                   _xidSize                    |
+     *      +-----+-----+-----+-----+-----+-----+-----+-----+
+     * </pre>
+     * <table>
+     * <tr>
+     *     <td>v</td>
+     *     <td>file version [ <code>_version</code> ] (If the format or encoding of
+     *     this file changes, then this number should be incremented)</td>
+     * </tr>
+     * <tr>
+     *     <td>e</td>
+     *     <td>endian flag [ <code>_bigEndianFlag</code> ], <b>false</b> (0x00) for
+     *     little endian, <b>true</b> (0x01) for big endian</td>
+     * </tr>
+     * </table>
+     */
+    struct TransactionHeader : public RecordHeader
+    {
+#if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
+        uint32_t _filler0;              ///< Filler placed before _xidSize (big-endian, 32-bit size_t)
+#endif
+        std::size_t _xidSize;           ///< Size of the XID in bytes
+#if defined(JRNL_LITTLE_ENDIAN) && defined(JRNL_32_BIT)
+        uint32_t _filler0;              ///< Filler placed after _xidSize (little-endian, 32-bit size_t)
+#endif
+
+        /**
+         * \brief Default constructor. The RecordHeader base is default-constructed and
+         * the XID size (plus the filler, where it exists) is set to 0.
+         */
+        TransactionHeader() :
+            RecordHeader(),
+#if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
+            _filler0(0),
+#endif
+            _xidSize(0)
+#if defined(JRNL_LITTLE_ENDIAN) && defined(JRNL_32_BIT)
+            , _filler0(0)
+#endif
+        {}
+
+        /**
+         * \brief Constructor which sets the header fields from the supplied values.
+         *
+         * The magic, version, record id and overwrite indicator are passed on to the
+         * RecordHeader constructor; the filler (where it exists) is set to 0.
+         *
+         * \param magic The magic for this record
+         * \param version Version of this record
+         * \param recordId Record identifier for this record
+         * \param xidSize Size of the transaction (or distributed transaction) ID for this record
+         * \param overwriteIndicator Flag indicating the present value of the overwrite indicator
+         *        when writing this record
+         */
+        TransactionHeader(const uint32_t magic,
+                          const uint8_t version,
+                          const uint64_t recordId,
+                          const std::size_t xidSize,
+                          const bool overwriteIndicator) :
+            RecordHeader(magic, version, recordId, overwriteIndicator),
+#if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
+            _filler0(0),
+#endif
+            _xidSize(xidSize)
+#if defined(JRNL_LITTLE_ENDIAN) && defined(JRNL_32_BIT)
+            , _filler0(0)
+#endif
+        {}
+
+        /**
+         * \brief Copy constructor. Copies the RecordHeader base, the XID size and
+         * (where it exists) the filler.
+         *
+         * \param th Instance to be copied
+         */
+        TransactionHeader(const TransactionHeader& th) :
+            RecordHeader(th),
+#if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
+            _filler0(th._filler0),
+#endif
+            _xidSize(th._xidSize)
+#if defined(JRNL_LITTLE_ENDIAN) && defined(JRNL_32_BIT)
+            , _filler0(th._filler0)
+#endif
+        {}
+
+        /**
+         * \brief Destructor.
+         */
+        virtual ~TransactionHeader() {}
+
+        /**
+         * \brief Copy all fields of another TransactionHeader into this one.
+         *
+         * \param th Instance whose values are copied
+         */
+        void copy(const TransactionHeader& th) {
+            // Base-class fields first, then the fields declared in this struct.
+            RecordHeader::copy(th);
+#if defined(JRNL_32_BIT)
+            _filler0 = th._filler0;
+#endif
+            _xidSize = th._xidSize;
+        }
+
+        /**
+         * \brief Reset this record: the base class is reset through
+         * RecordHeader::reset() and the fields declared here are set to 0.
+         */
+        void reset() {
+            RecordHeader::reset();
+#if defined(JRNL_32_BIT)
+            _filler0 = 0;
+#endif
+            _xidSize = 0;
+        }
+
+        /**
+         * \brief Return the header size of this record in bytes.
+         *
+         * \return sizeof(TransactionHeader).
+         */
+        static std::size_t getHeaderSize() { return sizeof(TransactionHeader); }
+
+        /**
+         * \brief Return the body (xid only) size of this record in bytes.
+         *
+         * \return The value of _xidSize.
+         */
+        std::size_t getBodySize() const { return _xidSize; }
+
+        /**
+         * \brief Return total size of this record in bytes, being in the case of the
+         * transaction record the size of the header, the size of the body (xid only)
+         * and the size of the tail.
+         *
+         * \return Sum of the header size, the body size and the record tail size.
+         */
+        std::size_t getRecordSize() const {
+            // By definition a transaction record always carries an xid, and hence a
+            // record tail as well, so the body size is not tested here.
+            const std::size_t headerAndBody = getHeaderSize() + getBodySize();
+            return headerAndBody + RecordTail::getSize();
+        }
+    }; // struct TransactionHeader
+
+#pragma pack()
+
+} // namespace journal2
+} // namespace mrg
+
+#endif // ifndef mrg_journal2_RecordHeader_hpp

Modified: store/trunk/cpp/perf/JournalParameters.cpp
===================================================================
--- store/trunk/cpp/perf/JournalParameters.cpp	2010-12-17 11:06:36 UTC (rev 4437)
+++ store/trunk/cpp/perf/JournalParameters.cpp	2011-01-14 19:52:45 UTC (rev 4438)
@@ -1,5 +1,5 @@
 /**
- * \file JournalParameters.cpp
+ * \file perf/JournalParameters.cpp
  *
  * Qpid asynchronous store plugin library
  *

Modified: store/trunk/cpp/perf/JournalParameters.hpp
===================================================================
--- store/trunk/cpp/perf/JournalParameters.hpp	2010-12-17 11:06:36 UTC (rev 4437)
+++ store/trunk/cpp/perf/JournalParameters.hpp	2011-01-14 19:52:45 UTC (rev 4438)
@@ -1,5 +1,5 @@
 /**
- * \file  JournalParameters.hpp
+ * \file perf/JournalParameters.hpp
  *
  * Qpid asynchronous store plugin library
  *

Modified: store/trunk/cpp/perf/ScopedTimer.hpp
===================================================================
--- store/trunk/cpp/perf/ScopedTimer.hpp	2010-12-17 11:06:36 UTC (rev 4437)
+++ store/trunk/cpp/perf/ScopedTimer.hpp	2011-01-14 19:52:45 UTC (rev 4438)
@@ -64,17 +64,17 @@
     class ScopedTimer
     {
         double& _elapsed;           ///< Ref to elapsed time, will be written on destruction of ScopedTimer instances
-        ::timespec  _startTime;     ///< Start time, set on construction
+        std::timespec _startTime;   ///< Start time, set on construction
 
         /**
          * \brief Convert ::timespec to seconds
          *
          * Static function to convert a ::timespec struct into a double representation in seconds.
          *
-         * \param ts ::timespec struct containing the time to be converted.
+         * \param ts std::timespec struct containing the time to be converted.
          * \return A double which represents the time in parameter ts in seconds.
          */
-        static double _s_getDoubleTime(const ::timespec& ts);
+        static double _s_getDoubleTime(const std::timespec& ts);
 
     public:
         /**

Modified: store/trunk/cpp/perf/TestParameters.hpp
===================================================================
--- store/trunk/cpp/perf/TestParameters.hpp	2010-12-17 11:06:36 UTC (rev 4437)
+++ store/trunk/cpp/perf/TestParameters.hpp	2011-01-14 19:52:45 UTC (rev 4438)
@@ -79,7 +79,7 @@
          * \param numQueues Number of queues to test simultaneously
          * \param numThreadPairsPerQueue Number of thread pairs (enq and deq) per queue
          * \param enqTxnBlockSize Transaction block size for enqueues
-         * \parma deqTxnBlockSize Transaction block size for dequeues
+         * \param deqTxnBlockSize Transaction block size for dequeues
          */
         TestParameters(const uint32_t numMsgs,
                        const uint32_t msgSize,

Modified: store/trunk/cpp/perf/m
===================================================================
--- store/trunk/cpp/perf/m	2010-12-17 11:06:36 UTC (rev 4437)
+++ store/trunk/cpp/perf/m	2011-01-14 19:52:45 UTC (rev 4438)
@@ -5,7 +5,7 @@
 
 # The variable JOURNAL2, if defined, will link with the new journal2 namespace journal. Otherwise the old journal
 # namespace will be used.
-#JOURNAL2=1
+JOURNAL2=1
 
 # Optimization options
 #OPT="-O0 -ggdb"



More information about the rhmessaging-commits mailing list