rhmessaging commits: r4439 - store/trunk/cpp/tests/python_tests.
by rhmessaging-commits@lists.jboss.org
Author: aconway
Date: 2011-01-21 12:49:28 -0500 (Fri, 21 Jan 2011)
New Revision: 4439
Modified:
store/trunk/cpp/tests/python_tests/client_persistence.py
store/trunk/cpp/tests/python_tests/flow_to_disk.py
store/trunk/cpp/tests/python_tests/resize.py
store/trunk/cpp/tests/python_tests/store_test.py
Log:
Move brokertest.py from qpid/python to qpid/cpp.
brokertest.py is a framework for tests using the C++ broker, so it
belongs in the cpp tree rather than the python tree.
It is installed to libexec/qpid/tests/ so it can be used from
an installation of qpid cpp.
Modified: store/trunk/cpp/tests/python_tests/client_persistence.py
===================================================================
--- store/trunk/cpp/tests/python_tests/client_persistence.py 2011-01-14 19:52:45 UTC (rev 4438)
+++ store/trunk/cpp/tests/python_tests/client_persistence.py 2011-01-21 17:49:28 UTC (rev 4439)
@@ -21,7 +21,7 @@
The GNU Lesser General Public License is available in the file COPYING.
"""
-from qpid.brokertest import EXPECT_EXIT_OK
+from brokertest import EXPECT_EXIT_OK
from store_test import StoreTest, Qmf, store_args
from qpid.messaging import *
Modified: store/trunk/cpp/tests/python_tests/flow_to_disk.py
===================================================================
--- store/trunk/cpp/tests/python_tests/flow_to_disk.py 2011-01-14 19:52:45 UTC (rev 4438)
+++ store/trunk/cpp/tests/python_tests/flow_to_disk.py 2011-01-21 17:49:28 UTC (rev 4439)
@@ -22,7 +22,7 @@
"""
import qpid
-from qpid.brokertest import EXPECT_EXIT_OK, EXPECT_UNKNOWN
+from brokertest import EXPECT_EXIT_OK, EXPECT_UNKNOWN
from qpid.datatypes import uuid4
from store_test import StoreTest, store_args
from qpid.messaging import Message, TargetCapacityExceeded, ServerError #SessionError, SendError
Modified: store/trunk/cpp/tests/python_tests/resize.py
===================================================================
--- store/trunk/cpp/tests/python_tests/resize.py 2011-01-14 19:52:45 UTC (rev 4438)
+++ store/trunk/cpp/tests/python_tests/resize.py 2011-01-21 17:49:28 UTC (rev 4439)
@@ -25,7 +25,7 @@
import os
import subprocess
-from qpid.brokertest import EXPECT_EXIT_OK
+from brokertest import EXPECT_EXIT_OK
from qpid.datatypes import uuid4
from store_test import StoreTest, store_args
from qpid.messaging import Message
Modified: store/trunk/cpp/tests/python_tests/store_test.py
===================================================================
--- store/trunk/cpp/tests/python_tests/store_test.py 2011-01-14 19:52:45 UTC (rev 4438)
+++ store/trunk/cpp/tests/python_tests/store_test.py 2011-01-21 17:49:28 UTC (rev 4439)
@@ -22,7 +22,7 @@
"""
import re
-from qpid.brokertest import BrokerTest
+from brokertest import BrokerTest
from qpid.messaging import Empty
from qmf.console import Session
13 years, 11 months
rhmessaging commits: r4438 - in store/trunk/cpp: lib/jrnl2 and 1 other directories.
by rhmessaging-commits@lists.jboss.org
Author: kpvdr
Date: 2011-01-14 14:52:45 -0500 (Fri, 14 Jan 2011)
New Revision: 4438
Added:
store/trunk/cpp/lib/jrnl2/Configuration.hpp
store/trunk/cpp/lib/jrnl2/RecordHeader.hpp
Modified:
store/trunk/cpp/lib/BindingDbt.cpp
store/trunk/cpp/lib/BufferValue.cpp
store/trunk/cpp/lib/MessageStoreImpl.cpp
store/trunk/cpp/lib/jrnl2/AioCallback.hpp
store/trunk/cpp/lib/jrnl2/DataToken.hpp
store/trunk/cpp/lib/jrnl2/Journal.cpp
store/trunk/cpp/lib/jrnl2/Journal.hpp
store/trunk/cpp/lib/jrnl2/JournalErrors.cpp
store/trunk/cpp/lib/jrnl2/JournalErrors.hpp
store/trunk/cpp/lib/jrnl2/JournalException.cpp
store/trunk/cpp/lib/jrnl2/JournalException.hpp
store/trunk/cpp/lib/jrnl2/JournalParameters.cpp
store/trunk/cpp/lib/jrnl2/JournalParameters.hpp
store/trunk/cpp/lib/jrnl2/Makefile.am
store/trunk/cpp/lib/jrnl2/README
store/trunk/cpp/perf/JournalParameters.cpp
store/trunk/cpp/perf/JournalParameters.hpp
store/trunk/cpp/perf/ScopedTimer.hpp
store/trunk/cpp/perf/TestParameters.hpp
store/trunk/cpp/perf/m
Log:
Further work in jrnl2 and perf.
Modified: store/trunk/cpp/lib/BindingDbt.cpp
===================================================================
--- store/trunk/cpp/lib/BindingDbt.cpp 2010-12-17 11:06:36 UTC (rev 4437)
+++ store/trunk/cpp/lib/BindingDbt.cpp 2011-01-14 19:52:45 UTC (rev 4438)
@@ -23,14 +23,10 @@
#include "BindingDbt.h"
-using namespace mrg::msgstore;
-using qpid::broker::PersistableExchange;
-using qpid::broker::PersistableQueue;
-using qpid::framing::FieldTable;
-using std::string;
+namespace mrg {
+namespace msgstore {
-
-BindingDbt::BindingDbt(const PersistableExchange& e, const PersistableQueue& q, const string& k, const FieldTable& a)
+BindingDbt::BindingDbt(const qpid::broker::PersistableExchange& e, const qpid::broker::PersistableQueue& q, const std::string& k, const qpid::framing::FieldTable& a)
: data(new char[encodedSize(e, q, k, a)]),
buffer(data, encodedSize(e, q, k, a))
{
@@ -48,7 +44,9 @@
delete [] data;
}
-uint32_t BindingDbt::encodedSize(const PersistableExchange& /*not used*/, const PersistableQueue& q, const string& k, const FieldTable& a)
+uint32_t BindingDbt::encodedSize(const qpid::broker::PersistableExchange& /*not used*/, const qpid::broker::PersistableQueue& q, const std::string& k, const qpid::framing::FieldTable& a)
{
return 8 /*queue id*/ + q.getName().size() + 1 + k.size() + 1 + a.encodedSize();
}
+
+}}
Modified: store/trunk/cpp/lib/BufferValue.cpp
===================================================================
--- store/trunk/cpp/lib/BufferValue.cpp 2010-12-17 11:06:36 UTC (rev 4437)
+++ store/trunk/cpp/lib/BufferValue.cpp 2011-01-14 19:52:45 UTC (rev 4438)
@@ -23,9 +23,11 @@
#include "BufferValue.h"
-using namespace mrg::msgstore;
-using qpid::broker::Persistable;
+namespace mrg {
+namespace msgstore {
+
+
BufferValue::BufferValue(u_int32_t size, u_int64_t offset)
: data(new char[size]),
buffer(data, size)
@@ -38,7 +40,7 @@
set_ulen(size);
}
-BufferValue::BufferValue(const Persistable& p)
+BufferValue::BufferValue(const qpid::broker::Persistable& p)
: data(new char[p.encodedSize()]),
buffer(data, p.encodedSize())
{
@@ -52,3 +54,5 @@
{
delete [] data;
}
+
+}}
Modified: store/trunk/cpp/lib/MessageStoreImpl.cpp
===================================================================
--- store/trunk/cpp/lib/MessageStoreImpl.cpp 2010-12-17 11:06:36 UTC (rev 4437)
+++ store/trunk/cpp/lib/MessageStoreImpl.cpp 2011-01-14 19:52:45 UTC (rev 4438)
@@ -35,18 +35,12 @@
#define MAX_AIO_SLEEPS 100000 // tot: ~1 sec
#define AIO_SLEEP_TIME_US 10 // 0.01 ms
-using namespace mrg::msgstore;
-using namespace qpid::broker;
-using boost::static_pointer_cast;
-using boost::intrusive_ptr;
-
-using std::auto_ptr;
-using std::max;
-using qpid::framing::Buffer;
-using qpid::framing::FieldTable;
-using qpid::management::ManagementAgent;
namespace _qmf = qmf::com::redhat::rhm::store;
+namespace mrg {
+namespace msgstore {
+
+
const std::string MessageStoreImpl::storeTopLevelDir("rhm"); // Sets the top-level store dir name
// FIXME aconway 2010-03-09: was 10
qpid::sys::Duration MessageStoreImpl::defJournalGetEventsTimeout(1 * qpid::sys::TIME_MSEC); // 10ms
@@ -224,7 +218,7 @@
autoJrnlExpandMaxFiles = p;
}
-void MessageStoreImpl::initManagement (Broker* broker)
+void MessageStoreImpl::initManagement (qpid::broker::Broker* broker)
{
if (broker != 0) {
agent = broker->getManagementAgent();
@@ -484,15 +478,15 @@
}
}
-void MessageStoreImpl::create(PersistableQueue& queue,
- const FieldTable& args)
+void MessageStoreImpl::create(qpid::broker::PersistableQueue& queue,
+ const qpid::framing::FieldTable& args)
{
checkInit();
if (queue.getPersistenceId()) {
THROW_STORE_EXCEPTION("Queue already created: " + queue.getName());
}
JournalImpl* jQueue = 0;
- FieldTable::ValuePtr value;
+ qpid::framing::FieldTable::ValuePtr value;
u_int16_t localFileCount = numJrnlFiles;
bool localAutoExpandFlag = autoJrnlExpand;
@@ -529,7 +523,7 @@
if (value.get() != 0 && !value->empty() && value->convertsTo<int>())
localAutoExpandMaxFileCount = (u_int16_t) value->get<int>();
- queue.setExternalQueueStore(dynamic_cast<ExternalQueueStore*>(jQueue));
+ queue.setExternalQueueStore(dynamic_cast<qpid::broker::ExternalQueueStore*>(jQueue));
try {
// init will create the deque's for the init...
jQueue->initialize(localFileCount, localAutoExpandFlag, localAutoExpandMaxFileCount, localFileSizeSblks, wCacheNumPages, wCachePgSizeSblks);
@@ -545,7 +539,7 @@
}
}
-void MessageStoreImpl::destroy(PersistableQueue& queue)
+void MessageStoreImpl::destroy(qpid::broker::PersistableQueue& queue)
{
checkInit();
destroy(queueDb, queue);
@@ -562,8 +556,8 @@
}
}
-void MessageStoreImpl::create(const PersistableExchange& exchange,
- const FieldTable& /*args*/)
+void MessageStoreImpl::create(const qpid::broker::PersistableExchange& exchange,
+ const qpid::framing::FieldTable& /*args*/)
{
checkInit();
if (exchange.getPersistenceId()) {
@@ -578,7 +572,7 @@
}
}
-void MessageStoreImpl::destroy(const PersistableExchange& exchange)
+void MessageStoreImpl::destroy(const qpid::broker::PersistableExchange& exchange)
{
checkInit();
destroy(exchangeDb, exchange);
@@ -587,7 +581,7 @@
bindingDb->del(0, &key, DB_AUTO_COMMIT);
}
-void MessageStoreImpl::create(const PersistableConfig& general)
+void MessageStoreImpl::create(const qpid::broker::PersistableConfig& general)
{
checkInit();
if (general.getPersistenceId()) {
@@ -602,7 +596,7 @@
}
}
-void MessageStoreImpl::destroy(const PersistableConfig& general)
+void MessageStoreImpl::destroy(const qpid::broker::PersistableConfig& general)
{
checkInit();
destroy(generalDb, general);
@@ -610,7 +604,7 @@
bool MessageStoreImpl::create(db_ptr db,
IdSequence& seq,
- const Persistable& p)
+ const qpid::broker::Persistable& p)
{
u_int64_t id (seq.next());
Dbt key(&id, sizeof(id));
@@ -634,17 +628,17 @@
}
}
-void MessageStoreImpl::destroy(db_ptr db, const Persistable& p)
+void MessageStoreImpl::destroy(db_ptr db, const qpid::broker::Persistable& p)
{
IdDbt key(p.getPersistenceId());
db->del(0, &key, DB_AUTO_COMMIT);
}
-void MessageStoreImpl::bind(const PersistableExchange& e,
- const PersistableQueue& q,
+void MessageStoreImpl::bind(const qpid::broker::PersistableExchange& e,
+ const qpid::broker::PersistableQueue& q,
const std::string& k,
- const FieldTable& a)
+ const qpid::framing::FieldTable& a)
{
checkInit();
IdDbt key(e.getPersistenceId());
@@ -660,16 +654,16 @@
}
}
-void MessageStoreImpl::unbind(const PersistableExchange& e,
- const PersistableQueue& q,
+void MessageStoreImpl::unbind(const qpid::broker::PersistableExchange& e,
+ const qpid::broker::PersistableQueue& q,
const std::string& k,
- const FieldTable&)
+ const qpid::framing::FieldTable&)
{
checkInit();
deleteBinding(e, q, k);
}
-void MessageStoreImpl::recover(RecoveryManager& registry)
+void MessageStoreImpl::recover(qpid::broker::RecoveryManager& registry)
{
checkInit();
txn_list prepared;
@@ -721,11 +715,11 @@
if (citr->second.tpc_flag) {
// Dtx (2PC) transaction
TPCTxnCtxt* tpcc = new TPCTxnCtxt(xid, &messageIdSequence);
- std::auto_ptr<TPCTransactionContext> txn(tpcc);
+ std::auto_ptr<qpid::broker::TPCTransactionContext> txn(tpcc);
tpcc->recoverDtok(citr->second.rid, xid);
tpcc->prepare(tplStorePtr.get());
- RecoverableTransaction::shared_ptr dtx;
+ qpid::broker::RecoverableTransaction::shared_ptr dtx;
if (!incomplTplTxnFlag) dtx = registry.recoverTransaction(xid, txn);
if (i->enqueues.get()) {
for (LockedMappings::iterator j = i->enqueues->begin(); j != i->enqueues->end(); j++) {
@@ -770,7 +764,7 @@
}
void MessageStoreImpl::recoverQueues(TxnCtxt& txn,
- RecoveryManager& registry,
+ qpid::broker::RecoveryManager& registry,
queue_index& queue_index,
txn_list& prepared,
message_index& messages)
@@ -784,9 +778,9 @@
Dbt value;
//read all queues
while (queues.next(key, value)) {
- Buffer buffer(reinterpret_cast<char*>(value.get_data()), value.get_size());
+ qpid::framing::Buffer buffer(reinterpret_cast<char*>(value.get_data()), value.get_size());
//create a Queue instance
- RecoverableQueue::shared_ptr queue = registry.recoverQueue(buffer);
+ qpid::broker::RecoverableQueue::shared_ptr queue = registry.recoverQueue(buffer);
//set the persistenceId and update max as required
queue->setPersistenceId(key.id);
@@ -804,7 +798,7 @@
qpid::sys::Mutex::ScopedLock sl(journalListLock);
journalList[queueName] = jQueue;
}
- queue->setExternalQueueStore(dynamic_cast<ExternalQueueStore*>(jQueue));
+ queue->setExternalQueueStore(dynamic_cast<qpid::broker::ExternalQueueStore*>(jQueue));
try
{
@@ -825,7 +819,7 @@
//read all messages: done on a per queue basis if using Journal
queue_index[key.id] = queue;
- maxQueueId = max(key.id, maxQueueId);
+ maxQueueId = std::max(key.id, maxQueueId);
}
// NOTE: highestRid is set by both recoverQueues() and recoverTplStore() as
@@ -838,7 +832,7 @@
void MessageStoreImpl::recoverExchanges(TxnCtxt& txn,
- RecoveryManager& registry,
+ qpid::broker::RecoveryManager& registry,
exchange_index& index)
{
//TODO: this is a copy&paste from recoverQueues - refactor!
@@ -850,15 +844,15 @@
Dbt value;
//read all exchanges
while (exchanges.next(key, value)) {
- Buffer buffer(reinterpret_cast<char*>(value.get_data()), value.get_size());
+ qpid::framing::Buffer buffer(reinterpret_cast<char*>(value.get_data()), value.get_size());
//create a Exchange instance
- RecoverableExchange::shared_ptr exchange = registry.recoverExchange(buffer);
+ qpid::broker::RecoverableExchange::shared_ptr exchange = registry.recoverExchange(buffer);
if (exchange) {
//set the persistenceId and update max as required
exchange->setPersistenceId(key.id);
index[key.id] = exchange;
}
- maxExchangeId = max(key.id, maxExchangeId);
+ maxExchangeId = std::max(key.id, maxExchangeId);
}
exchangeIdSequence.reset(maxExchangeId + 1);
}
@@ -873,7 +867,7 @@
IdDbt key;
Dbt value;
while (bindings.next(key, value)) {
- Buffer buffer(reinterpret_cast<char*>(value.get_data()), value.get_size());
+ qpid::framing::Buffer buffer(reinterpret_cast<char*>(value.get_data()), value.get_size());
if (buffer.available() < 8) {
QPID_LOG(error, "Not enough data for binding: " << buffer.available());
THROW_STORE_EXCEPTION("Not enough data for binding");
@@ -881,7 +875,7 @@
uint64_t queueId = buffer.getLongLong();
std::string queueName;
std::string routingkey;
- FieldTable args;
+ qpid::framing::FieldTable args;
buffer.getShortString(queueName);
buffer.getShortString(routingkey);
buffer.get(args);
@@ -899,7 +893,7 @@
}
void MessageStoreImpl::recoverGeneral(TxnCtxt& txn,
- RecoveryManager& registry)
+ qpid::broker::RecoveryManager& registry)
{
Cursor items;
items.open(generalDb, txn.get());
@@ -909,12 +903,12 @@
Dbt value;
//read all items
while (items.next(key, value)) {
- Buffer buffer(reinterpret_cast<char*>(value.get_data()), value.get_size());
+ qpid::framing::Buffer buffer(reinterpret_cast<char*>(value.get_data()), value.get_size());
//create instance
- RecoverableConfig::shared_ptr config = registry.recoverConfig(buffer);
+ qpid::broker::RecoverableConfig::shared_ptr config = registry.recoverConfig(buffer);
//set the persistenceId and update max as required
config->setPersistenceId(key.id);
- maxGeneralId = max(key.id, maxGeneralId);
+ maxGeneralId = std::max(key.id, maxGeneralId);
}
generalIdSequence.reset(maxGeneralId + 1);
}
@@ -958,15 +952,15 @@
{
case mrg::journal::RHM_IORES_SUCCESS: {
msg_count++;
- RecoverableMessage::shared_ptr msg;
+ qpid::broker::RecoverableMessage::shared_ptr msg;
char* data = (char*)dbuff;
unsigned headerSize;
if (externalFlag) {
msg = getExternMessage(recovery, dtok.rid(), headerSize); // large message external to jrnl
} else {
- headerSize = Buffer(data, preambleLength).getLong();
- Buffer headerBuff(data+ preambleLength, headerSize); /// do we want read size or header size ????
+ headerSize = qpid::framing::Buffer(data, preambleLength).getLong();
+ qpid::framing::Buffer headerBuff(data+ preambleLength, headerSize); /// do we want read size or header size ????
msg = recovery.recoverMessage(headerBuff);
}
msg->setPersistenceId(dtok.rid());
@@ -978,7 +972,7 @@
u_int64_t contentSize = readSize - contentOffset;
if (msg->loadContent(contentSize) && !externalFlag) {
//now read the content
- Buffer contentBuff(data + contentOffset, contentSize);
+ qpid::framing::Buffer contentBuff(data + contentOffset, contentSize);
msg->decodeContent(contentBuff);
}
@@ -1052,7 +1046,7 @@
}
}
-RecoverableMessage::shared_ptr MessageStoreImpl::getExternMessage(qpid::broker::RecoveryManager& /*recovery*/,
+qpid::broker::RecoverableMessage::shared_ptr MessageStoreImpl::getExternMessage(qpid::broker::RecoveryManager& /*recovery*/,
uint64_t /*messageId*/,
unsigned& /*headerSize*/)
{
@@ -1061,7 +1055,7 @@
int MessageStoreImpl::enqueueMessage(TxnCtxt& txn,
IdDbt& msgId,
- RecoverableMessage::shared_ptr& msg,
+ qpid::broker::RecoverableMessage::shared_ptr& msg,
queue_index& index,
txn_list& prepared,
message_index& messages)
@@ -1077,7 +1071,7 @@
QPID_LOG(warning, "Recovered message for queue that no longer exists");
mappings->del(0);
} else {
- RecoverableQueue::shared_ptr queue = index[value.id];
+ qpid::broker::RecoverableQueue::shared_ptr queue = index[value.id];
if (PreparedTransaction::isLocked(prepared, value.id, msgId.id)) {
messages[msgId.id] = msg;
} else {
@@ -1209,24 +1203,24 @@
}
}
-void MessageStoreImpl::stage(const intrusive_ptr<PersistableMessage>& /*msg*/)
+void MessageStoreImpl::stage(const boost::intrusive_ptr<qpid::broker::PersistableMessage>& /*msg*/)
{
throw mrg::journal::jexception(mrg::journal::jerrno::JERR__NOTIMPL, "MessageStoreImpl", "stage");
}
-void MessageStoreImpl::destroy(PersistableMessage& /*msg*/)
+void MessageStoreImpl::destroy(qpid::broker::PersistableMessage& /*msg*/)
{
throw mrg::journal::jexception(mrg::journal::jerrno::JERR__NOTIMPL, "MessageStoreImpl", "destroy");
}
-void MessageStoreImpl::appendContent(const intrusive_ptr<const PersistableMessage>& /*msg*/,
+void MessageStoreImpl::appendContent(const boost::intrusive_ptr<const qpid::broker::PersistableMessage>& /*msg*/,
const std::string& /*data*/)
{
throw mrg::journal::jexception(mrg::journal::jerrno::JERR__NOTIMPL, "MessageStoreImpl", "appendContent");
}
void MessageStoreImpl::loadContent(const qpid::broker::PersistableQueue& queue,
- const intrusive_ptr<const PersistableMessage>& msg,
+ const boost::intrusive_ptr<const qpid::broker::PersistableMessage>& msg,
std::string& data,
u_int64_t offset,
u_int32_t length)
@@ -1272,9 +1266,9 @@
}
}
-void MessageStoreImpl::enqueue(TransactionContext* ctxt,
- const intrusive_ptr<PersistableMessage>& msg,
- const PersistableQueue& queue)
+void MessageStoreImpl::enqueue(qpid::broker::TransactionContext* ctxt,
+ const boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg,
+ const qpid::broker::PersistableQueue& queue)
{
checkInit();
u_int64_t queueId (queue.getPersistenceId());
@@ -1303,7 +1297,7 @@
if (ctxt) txn->addXidRecord(queue.getExternalQueueStore());
}
-u_int64_t MessageStoreImpl::msgEncode(std::vector<char>& buff, const intrusive_ptr<PersistableMessage>& message)
+u_int64_t MessageStoreImpl::msgEncode(std::vector<char>& buff, const boost::intrusive_ptr<qpid::broker::PersistableMessage>& message)
{
u_int32_t headerSize = message->encodedHeaderSize();
u_int64_t size = message->encodedSize() + sizeof(u_int32_t);
@@ -1313,15 +1307,15 @@
oss << "Unable to allocate memory for encoding message; requested size: " << size << "; error: " << e.what();
THROW_STORE_EXCEPTION(oss.str());
}
- Buffer buffer(&buff[0],size);
+ qpid::framing::Buffer buffer(&buff[0],size);
buffer.putLong(headerSize);
message->encode(buffer);
return size;
}
-void MessageStoreImpl::store(const PersistableQueue* queue,
+void MessageStoreImpl::store(const qpid::broker::PersistableQueue* queue,
TxnCtxt* txn,
- const intrusive_ptr<PersistableMessage>& message,
+ const boost::intrusive_ptr<qpid::broker::PersistableMessage>& message,
bool /*newId*/)
{
std::vector<char> buff;
@@ -1358,9 +1352,9 @@
}
}
-void MessageStoreImpl::dequeue(TransactionContext* ctxt,
- const intrusive_ptr<PersistableMessage>& msg,
- const PersistableQueue& queue)
+void MessageStoreImpl::dequeue(qpid::broker::TransactionContext* ctxt,
+ const boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg,
+ const qpid::broker::PersistableQueue& queue)
{
checkInit();
u_int64_t queueId (queue.getPersistenceId());
@@ -1387,9 +1381,9 @@
msg->dequeueComplete();
}
-void MessageStoreImpl::async_dequeue(TransactionContext* ctxt,
- const intrusive_ptr<PersistableMessage>& msg,
- const PersistableQueue& queue)
+void MessageStoreImpl::async_dequeue(qpid::broker::TransactionContext* ctxt,
+ const boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg,
+ const qpid::broker::PersistableQueue& queue)
{
boost::intrusive_ptr<DataTokenImpl> ddtokp(new DataTokenImpl);
ddtokp->setSourceMessage(msg);
@@ -1451,11 +1445,11 @@
}
}
-auto_ptr<TransactionContext> MessageStoreImpl::begin()
+std::auto_ptr<qpid::broker::TransactionContext> MessageStoreImpl::begin()
{
checkInit();
// pass sequence number for c/a
- return auto_ptr<TransactionContext>(new TxnCtxt(&messageIdSequence));
+ return std::auto_ptr<qpid::broker::TransactionContext>(new TxnCtxt(&messageIdSequence));
}
std::auto_ptr<qpid::broker::TPCTransactionContext> MessageStoreImpl::begin(const std::string& xid)
@@ -1463,14 +1457,14 @@
checkInit();
IdSequence* jtx = &messageIdSequence;
// pass sequence number for c/a
- return auto_ptr<TPCTransactionContext>(new TPCTxnCtxt(xid, jtx));
+ return std::auto_ptr<qpid::broker::TPCTransactionContext>(new TPCTxnCtxt(xid, jtx));
}
void MessageStoreImpl::prepare(qpid::broker::TPCTransactionContext& ctxt)
{
checkInit();
TxnCtxt* txn = dynamic_cast<TxnCtxt*>(&ctxt);
- if(!txn) throw InvalidTransactionContextException();
+ if(!txn) throw qpid::broker::InvalidTransactionContextException();
localPrepare(txn);
}
@@ -1502,7 +1496,7 @@
}
}
-void MessageStoreImpl::commit(TransactionContext& ctxt)
+void MessageStoreImpl::commit(qpid::broker::TransactionContext& ctxt)
{
checkInit();
TxnCtxt* txn(check(&ctxt));
@@ -1513,7 +1507,7 @@
completed(*dynamic_cast<TxnCtxt*>(txn), true);
}
-void MessageStoreImpl::abort(TransactionContext& ctxt)
+void MessageStoreImpl::abort(qpid::broker::TransactionContext& ctxt)
{
checkInit();
TxnCtxt* txn(check(&ctxt));
@@ -1524,10 +1518,10 @@
completed(*dynamic_cast<TxnCtxt*>(txn), false);
}
-TxnCtxt* MessageStoreImpl::check(TransactionContext* ctxt)
+TxnCtxt* MessageStoreImpl::check(qpid::broker::TransactionContext* ctxt)
{
TxnCtxt* txn = dynamic_cast<TxnCtxt*>(ctxt);
- if(!txn) throw InvalidTransactionContextException();
+ if(!txn) throw qpid::broker::InvalidTransactionContextException();
return txn;
}
@@ -1548,7 +1542,7 @@
}
}
-void MessageStoreImpl::deleteBindingsForQueue(const PersistableQueue& queue)
+void MessageStoreImpl::deleteBindingsForQueue(const qpid::broker::PersistableQueue& queue)
{
TxnCtxt txn;
txn.begin(dbenv.get(), true);
@@ -1560,7 +1554,7 @@
IdDbt key;
Dbt value;
while (bindings.next(key, value)) {
- Buffer buffer(reinterpret_cast<char*>(value.get_data()), value.get_size());
+ qpid::framing::Buffer buffer(reinterpret_cast<char*>(value.get_data()), value.get_size());
if (buffer.available() < 8) {
THROW_STORE_EXCEPTION("Not enough data for binding");
}
@@ -1582,8 +1576,8 @@
QPID_LOG(debug, "Deleted all bindings for " << queue.getName() << ":" << queue.getPersistenceId());
}
-void MessageStoreImpl::deleteBinding(const PersistableExchange& exchange,
- const PersistableQueue& queue,
+void MessageStoreImpl::deleteBinding(const qpid::broker::PersistableExchange& exchange,
+ const qpid::broker::PersistableQueue& queue,
const std::string& bkey)
{
TxnCtxt txn;
@@ -1597,7 +1591,7 @@
Dbt value;
for (int status = bindings->get(&key, &value, DB_SET); status == 0; status = bindings->get(&key, &value, DB_NEXT_DUP)) {
- Buffer buffer(reinterpret_cast<char*>(value.get_data()), value.get_size());
+ qpid::framing::Buffer buffer(reinterpret_cast<char*>(value.get_data()), value.get_size());
if (buffer.available() < 8) {
THROW_STORE_EXCEPTION("Not enough data for binding");
}
@@ -1719,3 +1713,4 @@
;
}
+}}
Modified: store/trunk/cpp/lib/jrnl2/AioCallback.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl2/AioCallback.hpp 2010-12-17 11:06:36 UTC (rev 4437)
+++ store/trunk/cpp/lib/jrnl2/AioCallback.hpp 2011-01-14 19:52:45 UTC (rev 4438)
@@ -33,7 +33,7 @@
#define mrg_journal2_AioCallback_hpp
#include "DataToken.hpp"
-#include <sys/types.h> // u_int16_t, u_int32_t
+#include <stdint.h> // uint16_t
#include <vector>
namespace mrg
@@ -46,7 +46,7 @@
public:
virtual ~AioCallback() {}
virtual void writeAioCompleteCallback(std::vector<DataToken*>& dataTokenList) = 0;
- virtual void readAioCompleteCallback(std::vector<u_int16_t>& buffPageCtrlBlkIndexList) = 0;
+ virtual void readAioCompleteCallback(std::vector<uint16_t>& buffPageCtrlBlkIndexList) = 0;
};
} // namespace journal2
Added: store/trunk/cpp/lib/jrnl2/Configuration.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl2/Configuration.hpp (rev 0)
+++ store/trunk/cpp/lib/jrnl2/Configuration.hpp 2011-01-14 19:52:45 UTC (rev 4438)
@@ -0,0 +1,112 @@
+/**
+ * \file Configuration.hpp
+ *
+ * Qpid asynchronous store plugin library
+ *
+ * This file contains async journal code (v.2).
+ *
+ * \author Kim van der Riet
+ *
+ * Copyright (c) 2010 Red Hat, Inc.
+ *
+ * This file is part of the Qpid async store library msgstore.so.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
+ * USA
+ *
+ * The GNU Lesser General Public License is available in the file COPYING.
+ */
+
+#ifndef mrg_journal2_Configuration_hpp
+#define mrg_journal2_Configuration_hpp
+
+#include <stdint.h> // uint8_t
+
+#if defined(__i386__) /* little endian, 32 bits */
+ #define JRNL_LITTLE_ENDIAN
+// #define JRNL_32_BIT
+#elif defined(__PPC__) || defined(__s390__) /* big endian, 32 bits */
+ #define JRNL_BIG_ENDIAN
+// #define JRNL_32_BIT
+#elif defined(__ia64__) || defined(__x86_64__) || defined(__alpha__) /* little endian, 64 bits */
+ #define JRNL_LITTLE_ENDIAN
+// #define JRNL_64_BIT
+#elif defined(__powerpc64__) || defined(__s390x__) /* big endian, 64 bits */
+ #define JRNL_BIG_ENDIAN
+// #define JRNL_64_BIT
+#else
+ #error Unable to determine endianness
+#endif
+
+
+/**
+* <b>Rule:</b> Data block size (JRNL_DBLK_SIZE) MUST be a power of 2 such that
+* <pre>
+* JRNL_DBLK_SIZE * JRNL_SBLK_SIZE == n * 512 (n = 1,2,3...)
+* </pre>
+* (The disk softblock size is 512 for Linux kernels >= 2.6)
+*/
+//#define JRNL_DBLK_SIZE 128 ///< Data block size in bytes (CANNOT BE LESS THAN 32!)
+//#define JRNL_SBLK_SIZE 4 ///< Disk softblock size in multiples of JRNL_DBLK_SIZE
+//#define JRNL_MIN_FILE_SIZE 128 ///< Min. jrnl file size in sblks (excl. FileHeader)
+//#define JRNL_MAX_FILE_SIZE 4194304 ///< Max. jrnl file size in sblks (excl. FileHeader)
+//#define JRNL_MIN_NUM_FILES 4 ///< Min. number of journal files
+//#define JRNL_MAX_NUM_FILES 64 ///< Max. number of journal files
+//#define JRNL_ENQ_THRESHOLD 80 ///< Percent full when enqueue connection will be closed
+//
+//#define JRNL_RMGR_PAGE_SIZE 128 ///< Journal page size in softblocks
+//#define JRNL_RMGR_PAGES 16 ///< Number of pages to use in wmgr
+//
+//#define JRNL_WMGR_DEF_PAGE_SIZE 64 ///< Journal write page size in softblocks (default)
+//#define JRNL_WMGR_DEF_PAGES 32 ///< Number of pages to use in wmgr (default)
+//
+//#define JRNL_WMGR_MAXDTOKPP 1024 ///< Max. dtoks (data blocks) per page in wmgr
+//#define JRNL_WMGR_MAXWAITUS 100 ///< Max. wait time (us) before submitting AIO
+//
+//#define JRNL_INFO_EXTENSION "jinf" ///< Extension for journal info files
+//#define JRNL_DATA_EXTENSION "jdat" ///< Extension for journal data files
+//#define RHM_JDAT_TXA_MAGIC 0x614d4852 ///< ("RHMa" in little endian) Magic for dtx abort hdrs
+//#define RHM_JDAT_TXC_MAGIC 0x634d4852 ///< ("RHMc" in little endian) Magic for dtx commit hdrs
+//#define RHM_JDAT_DEQ_MAGIC 0x644d4852 ///< ("RHMd" in little endian) Magic for deq rec hdrs
+//#define RHM_JDAT_ENQ_MAGIC 0x654d4852 ///< ("RHMe" in little endian) Magic for enq rec hdrs
+//#define RHM_JDAT_FILE_MAGIC 0x664d4852 ///< ("RHMf" in little endian) Magic for file hdrs
+//#define RHM_JDAT_EMPTY_MAGIC 0x784d4852 ///< ("RHMx" in little endian) Magic for empty dblk
+//#define RHM_JDAT_VERSION 0x01 ///< Version (of file layout)
+//#define RHM_CLEAN_CHAR 0xff ///< Char used to clear empty space on disk
+
+#define RHM_LENDIAN_FLAG 0x0 ///< Value of little endian flag on disk
+#define RHM_BENDIAN_FLAG 0x1 ///< Value of big endian flag on disk
+
+namespace mrg
+{
+namespace journal2
+{
+
+ struct Configuration
+ {
+ static const uint8_t _s_endianValue =
+#if defined(JRNL_LITTLE_ENDIAN)
+ RHM_LENDIAN_FLAG;
+#elif defined(JRNL_BIG_ENDIAN)
+ RHM_BENDIAN_FLAG;
+#else
+#error Unknown endianness
+#endif
+ };
+
+} // namespace journal2
+} // namespace mrg
+
+#endif // ifndef mrg_journal2_Configuration_hpp
Modified: store/trunk/cpp/lib/jrnl2/DataToken.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl2/DataToken.hpp 2010-12-17 11:06:36 UTC (rev 4437)
+++ store/trunk/cpp/lib/jrnl2/DataToken.hpp 2011-01-14 19:52:45 UTC (rev 4438)
@@ -33,7 +33,7 @@
#define mrg_journal2_DataToken_hpp
#include <string>
-#include <sys/types.h> // u_int64_t
+#include <stdint.h> // uint64_t
#include "DataTokenState.hpp"
#include "ScopedLock.hpp"
@@ -43,7 +43,7 @@
namespace journal2
{
- typedef u_int64_t recordId_t;
+ typedef uint64_t recordId_t;
class AtomicRecordIdCounter
{
Modified: store/trunk/cpp/lib/jrnl2/Journal.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl2/Journal.cpp 2010-12-17 11:06:36 UTC (rev 4437)
+++ store/trunk/cpp/lib/jrnl2/Journal.cpp 2011-01-14 19:52:45 UTC (rev 4438)
@@ -48,7 +48,7 @@
}
// static
- u_int32_t Journal::_s_listSizeThreshold = 50;
+ uint32_t Journal::_s_listSizeThreshold = 50;
Journal::Journal(const std::string& jrnlId,
const std::string& jrnlDir,
@@ -119,7 +119,7 @@
return 0;
}
- u_int32_t
+ uint32_t
Journal::getWriteAioEventsRemaining() const
{
while (true) { // --- START OF CRITICAL SECTION ---
Modified: store/trunk/cpp/lib/jrnl2/Journal.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl2/Journal.hpp 2010-12-17 11:06:36 UTC (rev 4437)
+++ store/trunk/cpp/lib/jrnl2/Journal.hpp 2011-01-14 19:52:45 UTC (rev 4438)
@@ -33,7 +33,7 @@
#define mrg_journal2_Journal_hpp
#include <string>
-#include <sys/types.h> // u_int64_t, u_int32_t, etc.
+#include <stdint.h> // uint64_t, uint32_t, etc.
#include <time.h> // timespec
#include "AioCallback.hpp"
@@ -44,6 +44,7 @@
// --- temp code ---
#include "ScopedLock.hpp" // ScopedMutex
#include <vector>
+#include "RecordHeader.hpp"
// --- end temp code ---
namespace mrg
@@ -52,7 +53,7 @@
{
// TODO - decide if this is the right place to expose these codes and flags
- typedef u_int64_t ioRes; // TODO - this needs to be expressed as flags
+ typedef uint64_t ioRes; // TODO - this needs to be expressed as flags
const ioRes RHM_IORES_ENQCAPTHRESH = 0x1;
const ioRes RHM_IORES_BUSY = 0x2;
std::string g_ioResAsString(const ioRes /*res*/);
@@ -68,7 +69,7 @@
AioCallback* _aioCallbackPtr;
// --- temp code ---
- static u_int32_t _s_listSizeThreshold;
+ static uint32_t _s_listSizeThreshold;
std::vector<DataToken*> _writeDataTokenList;
std::vector<DataToken*> _callBackDataTokenList[2];
bool _callBackDataTokenListSwitch;
@@ -97,7 +98,7 @@
// aio ops and status
// --- temp code ---
- u_int32_t getWriteAioEventsRemaining() const;
+ uint32_t getWriteAioEventsRemaining() const;
// --- end of temp code ---
void flush(const bool blockTillAioCompleteFlag);
void processCompletedAioWriteEvents(timespec* const timeout);
Modified: store/trunk/cpp/lib/jrnl2/JournalErrors.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl2/JournalErrors.cpp 2010-12-17 11:06:36 UTC (rev 4437)
+++ store/trunk/cpp/lib/jrnl2/JournalErrors.cpp 2011-01-14 19:52:45 UTC (rev 4438)
@@ -36,23 +36,15 @@
namespace journal2
{
- std::map<u_int32_t, const char*> JournalErrors::_s_errorMap;
- std::map<u_int32_t, const char*>::iterator JournalErrors::_s_errorMapIterator;
+ std::map<uint32_t, const char*> JournalErrors::_s_errorMap;
+ std::map<uint32_t, const char*>::iterator JournalErrors::_s_errorMapIterator;
bool JournalErrors::_s_initializedFlag = JournalErrors::_s_initialize();
- // generic errors
- const u_int32_t JournalErrors::JERR_PTHREAD = 0x0001;
-
- // illegal states
- const u_int32_t JournalErrors::JERR_BADJRNLSTATE = 0x0101;
- const u_int32_t JournalErrors::JERR_BADDTOKOPSTATE = 0x0102;
- const u_int32_t JournalErrors::JERR_BADDTOKTXNSTATE = 0x0103;
- const u_int32_t JournalErrors::JERR_BADDTOKIOSTATE = 0x0104;
-
bool
JournalErrors::_s_initialize()
{
_s_errorMap[JERR_PTHREAD] = "JERR_PTHREAD: pthread operation failure";
+ _s_errorMap[JERR_RTCLOCK] = "JERR_RTCLOCK: realtime clock operation failure";
_s_errorMap[JERR_BADJRNLSTATE] = "JERR_BADJRNLSTATE: Illegal journal state";
_s_errorMap[JERR_BADDTOKOPSTATE] = "JERR_BADDTOKOPSTATE: Illegal data token op state";
_s_errorMap[JERR_BADDTOKTXNSTATE] = "JERR_BADDTOKTXNSTATE: Illegal data token txn state";
@@ -61,7 +53,7 @@
}
const char*
- JournalErrors::s_errorMessage(const u_int32_t err_no) throw ()
+ JournalErrors::s_errorMessage(const uint32_t err_no) throw ()
{
_s_errorMapIterator = _s_errorMap.find(err_no);
if (_s_errorMapIterator == _s_errorMap.end())
Modified: store/trunk/cpp/lib/jrnl2/JournalErrors.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl2/JournalErrors.hpp 2010-12-17 11:06:36 UTC (rev 4437)
+++ store/trunk/cpp/lib/jrnl2/JournalErrors.hpp 2011-01-14 19:52:45 UTC (rev 4438)
@@ -33,7 +33,7 @@
#define mrg_journal2_JournalErrors_hpp
#include <map>
-#include <sys/types.h> // u_int32_t
+#include <stdint.h> // uint32_t
namespace mrg
{
@@ -42,22 +42,23 @@
class JournalErrors
{
- static std::map<u_int32_t, const char*> _s_errorMap; ///< Map of error messages
- static std::map<u_int32_t, const char*>::iterator _s_errorMapIterator; ///< Iterator
- static bool _s_initializedFlag; ///< Dummy flag, used to initialize map.
- static bool _s_initialize(); ///< Static fn for initializing static data
+ static std::map<uint32_t, const char*> _s_errorMap; ///< Map of error messages
+ static std::map<uint32_t, const char*>::iterator _s_errorMapIterator; ///< Iterator
+ static bool _s_initializedFlag; ///< Dummy flag, used to initialize map.
+ static bool _s_initialize(); ///< Static fn for initializing static data
public:
// generic errors
- static const u_int32_t JERR_PTHREAD; ///< pthread operation failure
+ static const uint32_t JERR_PTHREAD = 0x0001; ///< pthread operation failure
+ static const uint32_t JERR_RTCLOCK = 0x0002; ///< realtime clock operation failure
// illegal states
- static const u_int32_t JERR_BADJRNLSTATE; ///< Illegal journal state
- static const u_int32_t JERR_BADDTOKOPSTATE; ///< Illegal data token op state
- static const u_int32_t JERR_BADDTOKTXNSTATE; ///< Illegal data token txn state
- static const u_int32_t JERR_BADDTOKIOSTATE; ///< Illegal data token io state
+ static const uint32_t JERR_BADJRNLSTATE = 0x0101; ///< Illegal journal state
+ static const uint32_t JERR_BADDTOKOPSTATE = 0x0102; ///< Illegal data token op state
+ static const uint32_t JERR_BADDTOKTXNSTATE = 0x0103; ///< Illegal data token txn state
+ static const uint32_t JERR_BADDTOKIOSTATE = 0x0104; ///< Illegal data token io state
- static const char* s_errorMessage(const u_int32_t err_no) throw ();
+ static const char* s_errorMessage(const uint32_t err_no) throw ();
};
} // namespace journal2
Modified: store/trunk/cpp/lib/jrnl2/JournalException.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl2/JournalException.cpp 2010-12-17 11:06:36 UTC (rev 4437)
+++ store/trunk/cpp/lib/jrnl2/JournalException.cpp 2011-01-14 19:52:45 UTC (rev 4438)
@@ -45,7 +45,7 @@
_formatWhatStr();
}
- JournalException::JournalException(const u_int32_t errorCode) throw ():
+ JournalException::JournalException(const uint32_t errorCode) throw ():
std::exception(),
_errorCode(errorCode)
{
@@ -68,7 +68,7 @@
_formatWhatStr();
}
- JournalException::JournalException(const u_int32_t errorCode,
+ JournalException::JournalException(const uint32_t errorCode,
const char* additionalInfo) throw ():
std::exception(),
_errorCode(errorCode),
@@ -77,7 +77,7 @@
_formatWhatStr();
}
- JournalException::JournalException(const u_int32_t errorCode,
+ JournalException::JournalException(const uint32_t errorCode,
const std::string& additionalInfo) throw ():
std::exception(),
_errorCode(errorCode),
@@ -86,7 +86,7 @@
_formatWhatStr();
}
- JournalException::JournalException(const u_int32_t errorCode,
+ JournalException::JournalException(const uint32_t errorCode,
const char* throwingClass,
const char* throwingFunction) throw ():
std::exception(),
@@ -97,7 +97,7 @@
_formatWhatStr();
}
- JournalException::JournalException(const u_int32_t errorCode,
+ JournalException::JournalException(const uint32_t errorCode,
const std::string& throwingClass,
const std::string& throwingFunction) throw ():
std::exception(),
@@ -108,7 +108,7 @@
_formatWhatStr();
}
- JournalException::JournalException(const u_int32_t errorCode,
+ JournalException::JournalException(const uint32_t errorCode,
const char* additionalInfo,
const char* throwingClass,
const char* throwingFunction) throw ():
@@ -121,7 +121,7 @@
_formatWhatStr();
}
- JournalException::JournalException(const u_int32_t errorCode,
+ JournalException::JournalException(const uint32_t errorCode,
const std::string& additionalInfo,
const std::string& throwingClass,
const std::string& throwingFunction) throw ():
Modified: store/trunk/cpp/lib/jrnl2/JournalException.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl2/JournalException.hpp 2010-12-17 11:06:36 UTC (rev 4437)
+++ store/trunk/cpp/lib/jrnl2/JournalException.hpp 2011-01-14 19:52:45 UTC (rev 4438)
@@ -35,7 +35,7 @@
#include <cstring> // std::strerror
#include <sstream>
#include <string>
-#include <sys/types.h> // u_int32_t
+#include <stdint.h> // uint32_t
#include "JournalErrors.hpp"
@@ -63,7 +63,7 @@
class JournalException : public std::exception
{
protected:
- u_int32_t _errorCode;
+ uint32_t _errorCode;
std::string _additionalInfo;
std::string _throwingClass;
std::string _throwingFunction;
@@ -74,28 +74,28 @@
public:
JournalException() throw ();
- JournalException(const u_int32_t errorCode) throw ();
+ JournalException(const uint32_t errorCode) throw ();
JournalException(const char* additionalInfo) throw ();
JournalException(const std::string& additionalInfo) throw ();
- JournalException(const u_int32_t errorCode,
+ JournalException(const uint32_t errorCode,
const char* additionalInfo) throw ();
- JournalException(const u_int32_t errorCode,
+ JournalException(const uint32_t errorCode,
const std::string& additionalInfo) throw ();
- JournalException(const u_int32_t errorCode,
+ JournalException(const uint32_t errorCode,
const char* throwingClass,
const char* throwingFunction) throw ();
- JournalException(const u_int32_t errorCode,
+ JournalException(const uint32_t errorCode,
const std::string& throwingClass,
const std::string& throwingFunction) throw ();
- JournalException(const u_int32_t errorCode,
+ JournalException(const uint32_t errorCode,
const char* additionalInfo,
const char* throwingClass,
const char* throwingFunction) throw ();
- JournalException(const u_int32_t errorCode,
+ JournalException(const uint32_t errorCode,
const std::string& additionalInfo,
const std::string& throwingClass,
const std::string& throwingFunction) throw ();
@@ -103,7 +103,7 @@
virtual ~JournalException() throw () {}
const char* what() const throw (); // override std::exception::what()
- inline u_int32_t getErrorCode() const throw () { return _errorCode; }
+ inline uint32_t getErrorCode() const throw () { return _errorCode; }
inline const std::string getAdditionalInfo() const throw () { return _additionalInfo; }
inline const std::string getThrowingClass() const throw () { return _throwingClass; }
inline const std::string getThrowingFunction() const throw () { return _throwingFunction; }
Modified: store/trunk/cpp/lib/jrnl2/JournalParameters.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl2/JournalParameters.cpp 2010-12-17 11:06:36 UTC (rev 4437)
+++ store/trunk/cpp/lib/jrnl2/JournalParameters.cpp 2011-01-14 19:52:45 UTC (rev 4438)
@@ -1,9 +1,9 @@
/**
- * \file JournalParameters.cpp
+ * \file jrnl2/JournalParameters.cpp
*
* Qpid asynchronous store plugin library
*
- * This file contains performance test code for the journal.
+ * This file contains async journal code (v.2).
*
* \author Kim van der Riet
*
@@ -41,12 +41,12 @@
// static declarations
std::string JournalParameters::_s_defaultJrnlDir = "/tmp/store";
std::string JournalParameters::_s_defaultJrnlBaseFileName = "JournalData";
- u_int16_t JournalParameters::_s_defaultNumJrnlFiles = 8;
- u_int32_t JournalParameters::_s_defaultJrnlFileSize_sblks = 3072;
+ uint16_t JournalParameters::_s_defaultNumJrnlFiles = 8;
+ uint32_t JournalParameters::_s_defaultJrnlFileSize_sblks = 3072;
bool JournalParameters::_s_defaultAutoExpand = false;
- u_int16_t JournalParameters::_s_defaultAutoExpandMaxJrnlFiles = 0;
- u_int16_t JournalParameters::_s_defaultWriteBuffNumPgs = 32;
- u_int32_t JournalParameters::_s_defaultWriteBuffPgSize_sblks = 128;
+ uint16_t JournalParameters::_s_defaultAutoExpandMaxJrnlFiles = 0;
+ uint16_t JournalParameters::_s_defaultWriteBuffNumPgs = 32;
+ uint32_t JournalParameters::_s_defaultWriteBuffPgSize_sblks = 128;
JournalParameters::JournalParameters() :
_jrnlDir(_s_defaultJrnlDir),
@@ -61,12 +61,12 @@
JournalParameters::JournalParameters(const std::string& jrnlDir,
const std::string& jrnlBaseFileName,
- const u_int16_t numJrnlFiles,
+ const uint16_t numJrnlFiles,
const bool autoExpand,
- const u_int16_t autoExpandMaxJrnlFiles,
- const u_int32_t jrnlFileSize_sblks,
- const u_int16_t writeBuffNumPgs,
- const u_int32_t writeBuffPgSize_sblks) :
+ const uint16_t autoExpandMaxJrnlFiles,
+ const uint32_t jrnlFileSize_sblks,
+ const uint16_t writeBuffNumPgs,
+ const uint32_t writeBuffPgSize_sblks) :
_jrnlDir(jrnlDir),
_jrnlBaseFileName(jrnlBaseFileName),
_numJrnlFiles(numJrnlFiles),
Modified: store/trunk/cpp/lib/jrnl2/JournalParameters.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl2/JournalParameters.hpp 2010-12-17 11:06:36 UTC (rev 4437)
+++ store/trunk/cpp/lib/jrnl2/JournalParameters.hpp 2011-01-14 19:52:45 UTC (rev 4438)
@@ -1,9 +1,9 @@
/**
- * \file JournalParameters.hpp
+ * \file jrnl2/JournalParameters.hpp
*
* Qpid asynchronous store plugin library
*
- * This file contains performance test code for the journal.
+ * This file contains async journal code (v.2).
*
* \author Kim van der Riet
*
@@ -34,7 +34,7 @@
#include <iostream>
#include <string>
-#include <sys/types.h> // u_int16_t, u_int32_t
+#include <stdint.h> // uint16_t, uint32_t
namespace mrg
{
@@ -46,31 +46,31 @@
// static default store params
static std::string _s_defaultJrnlDir;
static std::string _s_defaultJrnlBaseFileName;
- static u_int16_t _s_defaultNumJrnlFiles;
- static u_int32_t _s_defaultJrnlFileSize_sblks;
+ static uint16_t _s_defaultNumJrnlFiles;
+ static uint32_t _s_defaultJrnlFileSize_sblks;
static bool _s_defaultAutoExpand;
- static u_int16_t _s_defaultAutoExpandMaxJrnlFiles;
- static u_int16_t _s_defaultWriteBuffNumPgs;
- static u_int32_t _s_defaultWriteBuffPgSize_sblks;
+ static uint16_t _s_defaultAutoExpandMaxJrnlFiles;
+ static uint16_t _s_defaultWriteBuffNumPgs;
+ static uint32_t _s_defaultWriteBuffPgSize_sblks;
std::string _jrnlDir;
std::string _jrnlBaseFileName;
- u_int16_t _numJrnlFiles;
- u_int32_t _jrnlFileSize_sblks;
+ uint16_t _numJrnlFiles;
+ uint32_t _jrnlFileSize_sblks;
bool _autoExpand;
- u_int16_t _autoExpandMaxJrnlFiles;
- u_int16_t _writeBuffNumPgs;
- u_int32_t _writeBuffPgSize_sblks;
+ uint16_t _autoExpandMaxJrnlFiles;
+ uint16_t _writeBuffNumPgs;
+ uint32_t _writeBuffPgSize_sblks;
JournalParameters();
JournalParameters(const std::string& jrnlDir,
const std::string& jrnlBaseFileName,
- const u_int16_t numJrnlFiles,
+ const uint16_t numJrnlFiles,
const bool autoExpand,
- const u_int16_t autoExpandMaxJrnlFiles,
- const u_int32_t jrnlFileSize_sblks,
- const u_int16_t writeBuffNumPgs,
- const u_int32_t writeBuffPgSize_sblks);
+ const uint16_t autoExpandMaxJrnlFiles,
+ const uint32_t jrnlFileSize_sblks,
+ const uint16_t writeBuffNumPgs,
+ const uint32_t writeBuffPgSize_sblks);
JournalParameters(const JournalParameters& sp);
void toStream(std::ostream& os = std::cout) const;
std::string toString() const;
Modified: store/trunk/cpp/lib/jrnl2/Makefile.am
===================================================================
--- store/trunk/cpp/lib/jrnl2/Makefile.am 2010-12-17 11:06:36 UTC (rev 4437)
+++ store/trunk/cpp/lib/jrnl2/Makefile.am 2011-01-14 19:52:45 UTC (rev 4438)
@@ -33,6 +33,7 @@
JournalParameters.cpp \
JournalState.cpp \
AioCallback.hpp \
+ Configuration.hpp \
DataToken.hpp \
DataTokenState.hpp \
Journal.hpp \
@@ -41,5 +42,6 @@
JournalException.hpp \
JournalParameters.hpp \
JournalState.hpp \
+ RecordHeader.hpp \
ScopedLock.hpp
Modified: store/trunk/cpp/lib/jrnl2/README
===================================================================
--- store/trunk/cpp/lib/jrnl2/README 2010-12-17 11:06:36 UTC (rev 4437)
+++ store/trunk/cpp/lib/jrnl2/README 2011-01-14 19:52:45 UTC (rev 4438)
@@ -1,3 +1,21 @@
This directory contains experimental code. It is not (currently)
a part of the main store, and can be safely ignored for all
normal builds.
+
+This is a refactorization of the lib/jrnl directory, with completely
+renamed classes according to normal C++ naming standards used elsewhere
+in this project.
+
+Also included are the following enhancements:
+* A new record type for journal events. This will allow the redelivered
+ flag to be set correctly (amongst other things)
+* A new record tail layout which contains a checksum. This will solve the
+ issue of losing data unknowingly if the first and last pages of a
+ multi-page write occur, but some in-between pages are still pending at
+ time of failure.
+* Some minor refactoring of the FileHeader type
+* A clean-up with a proper heirarchy of these classes
+
+NOTE: these will break binary compatibility with earlier journals - an
+upgrade issue. This journal will need to have its revision number incremented
+from 1 to 2.
Added: store/trunk/cpp/lib/jrnl2/RecordHeader.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl2/RecordHeader.hpp (rev 0)
+++ store/trunk/cpp/lib/jrnl2/RecordHeader.hpp 2011-01-14 19:52:45 UTC (rev 4438)
@@ -0,0 +1,1333 @@
+/**
+ * \file RecordHeader.hpp
+ *
+ * Qpid asynchronous store plugin library
+ *
+ * This file contains async journal code (v.2).
+ *
+ * \author Kim van der Riet
+ *
+ * Copyright (c) 2010, 2011 Red Hat, Inc.
+ *
+ * This file is part of the Qpid async store library msgstore.so.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
+ * USA
+ *
+ * The GNU Lesser General Public License is available in the file COPYING.
+ *
+ * List of structs in this file:
+ * struct DequeueHeader
+ * struct EnqueueHeader
+ * struct EventHeader
+ * struct FileHeader
+ * struct RecordHeader
+ * struct RecordTail
+ * struct TransactionHeader
+ *
+ * Overview of structs in this file:
+ *
+ * <pre>
+ * +------------+ +--------------+
+ * | RecordTail | | RecordHeader |
+ * +------------+ | (abstract) |
+ * +--------------+
+ * ^
+ * |
+ * +----------------+-----------------+-------------------+
+ * | | | |
+ * +------------+ +-------------+ +---------------+ +-------------------+
+ * | FileHeader | | EventHeader | | DequeueHeader | | TransactionHeader |
+ * +------------+ +-------------+ +---------------+ +-------------------+
+ * ^
+ * |
+ * +---------------+
+ * | EnqueueHeader |
+ * +---------------+
+ * </pre>
+ */
+
+#ifndef mrg_journal2_RecordHeader_hpp
+#define mrg_journal2_RecordHeader_hpp
+
+#include <cstddef> // std::size_t
+#include <ctime> // std::time_t
+#include "Configuration.hpp"
+#include "JournalException.hpp"
+#include <stdint.h> // uint8_t, uint16_t, uint32_t, uint64_t
+
+namespace mrg
+{
+namespace journal2
+{
+
+#pragma pack(1)
+
+ /**
+ * \brief Struct for data common to the head of all journal files and records.
+ * This includes identification for the file type, the encoding version, endian
+ * indicator and a record ID.
+ *
+ * File layout in binary format (16 bytes):
+ * <pre>
+ * 0x0 0x7
+ * +-----+-----+-----+-----+-----+-----+-----+-----+
+ * 0x00 | _magic | v | e | _flags |
+ * +-----+-----+-----+-----+-----+-----+-----+-----+
+ * 0x08 | _recordId |
+ * +-----+-----+-----+-----+-----+-----+-----+-----+
+ * </pre>
+ * <table>
+ * <tr>
+ * <td>v</td>
+ * <td>file version [ <code>_version</code> ] (If the format or encoding of
+ * this file changes, then this number should be incremented)</td>
+ * </tr>
+ * <tr>
+ * <td>e</td>
+ * <td>endian flag [ <code>_bigEndianFlag</code> ], <b>false</b> (0x00) for
+ * little endian, <b>true</b> (0x01) for big endian</td>
+ * </tr>
+ * </table>
+ */
+ struct RecordHeader
+ {
+ uint32_t _magic; ///< File type identifier (magic number)
+ uint8_t _version; ///< File encoding version
+ uint8_t _bigEndianFlag; ///< Flag for determining endianness
+ uint16_t _flags; ///< User and system flags
+ uint64_t _recordId; ///< Record ID (rotating 64-bit counter)
+
+ static const uint16_t HDR_OVERWRITE_INDICATOR_MASK = 0x1;
+
+ /**
+ * \brief Default constructor, which sets all values to 0.
+ */
+ inline RecordHeader() :
+ _magic(0),
+ _version(0),
+ _bigEndianFlag(0),
+ _flags(0),
+ _recordId(0)
+ {}
+
+ /**
+ * \brief Convenience constructor which initializes values during construction.
+ *
+ * \param magic Magic for this record
+ * \param version Version of this record
+ * \param recordId Record identifier for this record
+ * \param overwriteIndicator Overwrite indicator for this record
+ */
+ inline RecordHeader(const uint32_t magic,
+ const uint8_t version,
+ const uint64_t recordId,
+ const bool overwriteIndicator) :
+ _magic(magic),
+ _version(version),
+ _bigEndianFlag(Configuration::_s_endianValue),
+ _flags(overwriteIndicator ? HDR_OVERWRITE_INDICATOR_MASK : 0),
+ _recordId(recordId)
+ {}
+
+ /**
+ * \brief Copy constructor.
+ */
+ inline RecordHeader(const RecordHeader& rh) :
+ _magic(rh._magic),
+ _version(rh._version),
+ _bigEndianFlag(rh._bigEndianFlag),
+ _flags(rh._flags),
+ _recordId(rh._recordId)
+ {}
+
+ /**
+ * \brief Destructor.
+ */
+ virtual inline ~RecordHeader()
+ {}
+
+ /**
+ * \brief Convenience copy method.
+ */
+ virtual inline void copy(const RecordHeader& rh)
+ {
+ _magic = rh._magic;
+ _version = rh._version;
+ _bigEndianFlag = rh._bigEndianFlag;
+ _flags = rh._flags;
+ _recordId = rh._recordId;
+ }
+
+ /**
+ * \brief Resets all fields to default values (mostly 0).
+ */
+ virtual inline void reset()
+ {
+ _magic = 0;
+ _version = 0;
+ _bigEndianFlag = 0;
+ _flags = 0;
+ _recordId = 0;
+ }
+
+ /**
+ * \brief Return the value of the Overwrite Indicator for this record.
+ *
+ * \return true if the Overwrite Indicator flag is set, false otherwise.
+ */
+ inline bool getOverwriteIndicator() const {
+ return _flags & HDR_OVERWRITE_INDICATOR_MASK;
+ }
+
+ /**
+ * \brief Set the value of the Overwrite Indicator for this record
+ */
+ inline void setOverwriteIndicator(const bool owi) {
+ _flags = owi ?
+ _flags | HDR_OVERWRITE_INDICATOR_MASK :
+ _flags & (~HDR_OVERWRITE_INDICATOR_MASK);
+ }
+
+ /**
+ * \brief Return the header size of this record in bytes. Must be implemented by
+ * subclasses.
+ *
+ * \return Size of record header in bytes.
+ */
+ static inline std::size_t getHeaderSize() {
+ return sizeof(RecordHeader);
+ }
+
+ /**
+ * \brief Return the body (data) size of this record in bytes. Must be implemented
+ * by subclasses.
+ *
+ * \return Size of record body in bytes.
+ */
+ virtual std::size_t getBodySize() const = 0;
+
+ /**
+ * \brief Return total size of this record in bytes, being the sum of the header,
+ * xid (if present), data (if present) and tail (if present). Must be implemented
+ * by subclasses.
+ *
+ * \returns The size of the entire record, including header, body (xid and data,
+ * if present) and record tail (if persent) in bytes.
+ */
+ virtual std::size_t getRecordSize() const = 0;
+
+ // TODO - Is this the right place for encode/decode fns?
+ ///**
+ // * \brief Encode (write) this record instance into the buffer pointed to by the buffer
+ // * pointer. Must be implemented by subclasses.
+ // */
+ //virtual std::size_t encode(char* bufferPtr,
+ // const std::size_t bufferSize,
+ // const std::size_t encodeOffset = 0) = 0;
+
+ /**
+ * \brief Return a uint32_t checksum for the header and body content of this record.
+ *
+ * \param initialValue The initial (or seed) value of the checksum.
+ *
+ * \return Checksum for header and body of record. Tail (if any) is excluded.
+ */
+ inline uint32_t getCheckSum(uint32_t initialValue = 0) const {
+ uint32_t cs = initialValue;
+ for (unsigned char* p = (unsigned char*)this;
+ p < (unsigned char*)this + getHeaderSize() + getBodySize();
+ p++) {
+ cs ^= (uint32_t)(*p);
+ bool carry = cs & uint32_t(0x80000000);
+ cs <<= 1;
+ if (carry) cs++;
+ }
+ return cs;
+ }
+ }; // struct RecordHeader
+
+
+
+
+ /**
+ * \brief Struct for data common to the tail of all records. The magic number
+ * used here is the binary inverse (1's complement) of the magic used in the
+ * record header; this minimizes possible confusion with other headers that may
+ * be present during recovery. The tail is used with all records that have either
+ * XIDs or data - ie any size-variable content. Currently the only records that
+ * do NOT use the tail are non-transactional dequeues and filler records.
+ *
+ * Record layout in binary format (16 bytes):
+ * <pre>
+ * 0x0 0x7
+ * +-----+-----+-----+-----+-----+-----+-----+-----+
+ * 0x00 | _xMagic | _checkSum |
+ * +-----+-----+-----+-----+-----+-----+-----+-----+
+ * 0x08 | _recordId |
+ * +-----+-----+-----+-----+-----+-----+-----+-----+
+ * </pre>
+ */
+ struct RecordTail
+ {
+ uint32_t _xMagic; ///< Binary inverse (1's complement) of hdr magic number
+ uint32_t _checkSum; ///< Checksum for header and body of record
+ uint64_t _recordId; ///< Record identifier matching that of the header for this record
+
+
+ /**
+ * \brief Default constructor, which sets most values to 0.
+ */
+ inline RecordTail() :
+ _xMagic(0xffffffff),
+ _checkSum(0),
+ _recordId(0)
+ {}
+
+ /**
+ * \brief Convenience constructor which initializes values during construction.
+ *
+ * \param xMagic The inverse of the record header magic (ie ~rh._magic).
+ * \param checkSum The checksum for this record header and body.
+ * \param recordId The record identifier matching the record header.
+ */
+ inline RecordTail(const uint32_t xMagic,
+ const uint32_t checkSum,
+ const uint64_t recordId) :
+ _xMagic(xMagic),
+ _checkSum(checkSum),
+ _recordId(recordId)
+ {}
+
+ /**
+ * \brief Convenience constructor which initializes values during construction from
+ * existing RecordHeader instance.
+ *
+ * \param rh Header instance for which the RecordTail is to be created.
+ */
+ inline RecordTail(const RecordHeader& rh) :
+ _xMagic(~rh._magic),
+ _checkSum(rh.getCheckSum()),
+ _recordId(rh._recordId)
+ {}
+
+ /**
+ * \brief Copy constructor.
+ *
+ * \param rt Instance to be copied.
+ */
+ inline RecordTail(const RecordTail& rt) :
+ _xMagic(rt._xMagic),
+ _checkSum(rt._checkSum),
+ _recordId(rt._recordId)
+ {}
+
+ /**
+ * \brief Destructor.
+ */
+ virtual inline ~RecordTail()
+ {}
+
+ /**
+ * \brief Convenience copy method.
+ *
+ * \param rt Instance to be copied.
+ */
+ inline void copy(const RecordTail& rt) {
+ _xMagic = rt._xMagic;
+ _checkSum = rt._checkSum;
+ _recordId = rt._recordId;
+ }
+
+ /**
+ * \brief Resets all fields to default values (mostly 0).
+ */
+ inline void reset() {
+ _xMagic = 0xffffffff;
+ _checkSum = 0;
+ _recordId = 0;
+ }
+
+ /**
+ * \brief Returns the size of the header in bytes.
+ */
+ inline static std::size_t getSize() {
+ return sizeof(RecordTail);
+ }
+ }; // struct RecordTail
+
+
+
+
+ /**
+ * \brief Struct for data common to the head of all journal files. In addition to
+ * the common data, this includes the record ID and offset of the first record in
+ * the file.
+ *
+ * This header precedes all data in journal files and occupies the first complete
+ * block in the file. The record ID and offset are updated on each overwrite of the
+ * file.
+ *
+ * File layout in binary format (48 bytes):
+ * <pre>
+ * 0x0 0x7
+ * +-----+-----+-----+-----+-----+-----+-----+-----+ -+
+ * 0x00 | _magic | v | e | _flags | |
+ * +-----+-----+-----+-----+-----+-----+-----+-----+ | struct RecordHeader
+ * 0x08 | _recordId (used to show first rid in file) | |
+ * +-----+-----+-----+-----+-----+-----+-----+-----+ -+
+ * 0x10 | _physicalFileId | _logicalFileId |
+ * +-----+-----+-----+-----+-----+-----+-----+-----+
+ * 0x18 | _firstRecordOffset |
+ * +-----+-----+-----+-----+-----+-----+-----+-----+
+ * 0x20 | _timestampSeconds |
+ * +-----+-----+-----+-----+-----+-----+-----+-----+
+ * 0x28 | _timestampNanoSeconds | _reserved |
+ * +-----+-----+-----+-----+-----+-----+-----+-----+
+ * </pre>
+ * <table>
+ * <tr>
+ * <td>v</td>
+ * <td>file version [ <code>_version</code> ] (If the format or encoding of
+ * this file changes, then this number should be incremented)</td>
+ * </tr>
+ * <tr>
+ * <td>e</td>
+ * <td>endian flag [ <code>_bigEndianFlag</code> ], <b>false</b> (0x00) for
+ * little endian, <b>true</b> (0x01) for big endian</td>
+ * </tr>
+ * </table>
+ */
+ struct FileHeader : public RecordHeader
+ {
+ uint32_t _physicalFileId; ///< Physical file ID (pfid)
+ uint32_t _logicalFileId; ///< Logical file ID (lfid)
+#if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
+ uint32_t _filler0; ///< Big-endian filler for 32-bit size_t
+#endif
+ std::size_t _firstRecordOffset; ///< First record offset
+#if defined(JRNL_LITTLE_ENDIAN) && defined(JRNL_32_BIT)
+ uint32_t _filler0; ///< Little-endian filler for 32-bit size_t
+#endif
+#if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
+ uint32_t _filler1; ///< Big-endian filler for 32-bit time_t
+#endif
+ std::time_t _timestampSeconds; ///< Timestamp of journal initialization, seconds component
+#if defined(JRNL_LITTLE_ENDIAN) && defined(JRNL_32_BIT)
+ uint32_t _filler1; ///< Little-endian filler for 32-bit time_t
+#endif
+ uint32_t _timestampNanoSeconds; ///< Timestamp of journal initialization, nanoseconds component
+ uint32_t _reserved; ///< Little-endian filler for uint32_t
+
+ /**
+ * \brief Default constructor, which sets all values to 0.
+ */
+ inline FileHeader() :
+ RecordHeader(),
+ _physicalFileId(0),
+ _logicalFileId(0),
+#if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
+ _filler0(0),
+#endif
+ _firstRecordOffset(0),
+#if defined(JRNL_LITTLE_ENDIAN) && defined(JRNL_32_BIT)
+ _filler0(0),
+#endif
+#if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
+ _filler1(0),
+#endif
+ _timestampSeconds(0),
+#if defined(JRNL_LITTLE_ENDIAN) && defined(JRNL_32_BIT)
+ _filler1(0),
+#endif
+ _timestampNanoSeconds(0),
+ _reserved(0)
+ {}
+
+ /**
+ * \brief Convenience constructor which initializes values during construction.
+ *
+ * \param magic Magic for this record
+ * \param version Version of this record
+ * \param recordId RecordId for this record
+ * \param overwriteIndicator Overwrite indicator for this record
+ * \param physicalFileId Physical file ID (file number on disk)
+ * \param logicalFileId Logical file ID (file number as seen by circular file buffer)
+ * \param firstRecordOffset First record offset in bytes from beginning of file
+ * \param setTimestampFlag If true, causes the timestamp to be initialized with the current system time
+ */
+ inline FileHeader(const uint32_t magic,
+ const uint8_t version,
+ const uint64_t recordId,
+ const bool overwriteIndicator,
+ const uint16_t physicalFileId,
+ const uint16_t logicalFileId,
+ const std::size_t firstRecordOffset,
+ const bool setTimestampFlag = false) :
+ RecordHeader(magic, version, recordId, overwriteIndicator),
+ _physicalFileId(physicalFileId),
+ _logicalFileId(logicalFileId),
+#if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
+ _filler0(0),
+#endif
+ _firstRecordOffset(firstRecordOffset),
+#if defined(JRNL_LITTLE_ENDIAN) && defined(JRNL_32_BIT)
+ _filler0(0),
+#endif
+#if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
+ _filler1(0),
+#endif
+ _timestampSeconds(0),
+#if defined(JRNL_LITTLE_ENDIAN) && defined(JRNL_32_BIT)
+ _filler1(0),
+#endif
+ _timestampNanoSeconds(0),
+ _reserved(0)
+ {
+ if (setTimestampFlag) setTimestamp();
+ }
+
+ /**
+ * \brief Copy constructor.
+ *
+ * \param fh FileHeader instance to be copied
+ */
+ inline FileHeader(const FileHeader& fh) :
+ RecordHeader(fh),
+ _physicalFileId(fh._physicalFileId),
+ _logicalFileId(fh._logicalFileId),
+#if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
+ _filler0(fh._filler0),
+#endif
+ _firstRecordOffset(fh._firstRecordOffset),
+#if defined(JRNL_LITTLE_ENDIAN) && defined(JRNL_32_BIT)
+ _filler0(fh._filler0),
+#endif
+#if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
+ _filler1(fh._filler1),
+#endif
+ _timestampSeconds(fh._timestampSeconds),
+#if defined(JRNL_LITTLE_ENDIAN) && defined(JRNL_32_BIT)
+ _filler1(fh._filler1),
+#endif
+ _timestampNanoSeconds(fh._timestampNanoSeconds),
+ _reserved(fh._reserved)
+ {}
+
+ /**
+ * \brief Destructor.
+ */
+ virtual ~FileHeader()
+ {}
+
+ /**
+ * \brief Convenience copy method.
+ *
+ * \param fh FileHeader instance to be copied
+ */
+ inline void copy(const FileHeader& fh) {
+ RecordHeader::copy(fh);
+ _physicalFileId = fh._physicalFileId;
+ _logicalFileId = fh._logicalFileId;
+ _firstRecordOffset = fh._firstRecordOffset;
+ _timestampSeconds = fh._timestampSeconds;
+ _timestampNanoSeconds = fh._timestampNanoSeconds;
+ _reserved = fh._reserved;
+#if defined(JRNL_32_BIT)
+ _filler0 = fh._filler0;
+ _filler1 = fh._filler1;
+#endif
+ }
+
+ /**
+ * \brief Resets all fields to default values (mostly 0).
+ */
+ inline void reset() {
+ RecordHeader::reset();
+ _physicalFileId = 0;
+ _logicalFileId = 0;
+ _firstRecordOffset = 0;
+ _timestampSeconds = 0;
+ _timestampNanoSeconds = 0;
+ _reserved = 0;
+#if defined(JRNL_32_BIT)
+ _filler0 = 0;
+ _filler1 = 0;
+#endif
+ }
+
+ /**
+ * \brief Return the header size of this record in bytes.
+ *
+ * \return Size of record header in bytes.
+ */
+ static inline std::size_t getHeaderSize() {
+ return sizeof(FileHeader);
+ }
+
+ /**
+ * \brief Return the body (data) size of this record in bytes.
+ *
+ * \return Size of record body in bytes. By definition, a FileHeader has no body.
+ */
+ inline std::size_t getBodySize() const {
+ return 0;
+ }
+
+ /**
+ * \brief Return total size of this record in bytes, being in the case of the
+ * FileHeader the size of the header itself only.
+ */
+ inline std::size_t getRecordSize() const {
+ return getHeaderSize();
+ }
+
+ /**
+ * \brief Gets the current time from the system clock and sets the timestamp in the struct.
+ */
+ inline void setTimestamp()
+ {
+ // TODO: Standardize on method for getting time that does not require a context switch.
+ timespec ts;
+ if (::clock_gettime(CLOCK_REALTIME, &ts))
+ {
+ std::ostringstream oss;
+ oss << FORMAT_SYSERR(errno);
+ throw JournalException(JournalErrors::JERR_RTCLOCK, oss.str(), "FileHeader", "setTimestamp");
+ }
+ setTimestamp(ts);
+ }
+
+ /**
+ * \brief Sets the timestamp in the struct to the provided value (in seconds and nanoseconds).
+ *
+ * \param ts Timestamp from which the file header time stamp is to be copied
+ */
+ inline void setTimestamp(const timespec& ts)
+ {
+ _timestampSeconds = ts.tv_sec;
+ _timestampNanoSeconds = ts.tv_nsec;
+ }
+ }; // struct FileHeader
+
+
+
+
+ /**
+ * \brief Struct for event records, which can be used to record system events in the
+ * store.
+ *
+ * The EventHeader record type may be used to store events into the journal which do
+ * not constitute data content but changes of state in the broker. These can be
+ * recovered and used to set appropriate state in the broker.
+ *
+ * This record is almost identical to the EnqueueRecord, but without the flags. I
+ * am uncertain at this time whether it is necessary to set an XID on an event
+ * record, but in case, I have left this feature in. In any event, there is only a
+ * 1 byte size penalty in the header size for doing so.
+ *
+ * Record layout in binary format (32 bytes):
+ * <pre>
+ * 0x0 0x7
+ * +-----+-----+-----+-----+-----+-----+-----+-----+ -+
+ * 0x00 | _magic | v | e | _flags | |
+ * +-----+-----+-----+-----+-----+-----+-----+-----+ | struct RecordHeader
+ * 0x08 | _recordId | |
+ * +-----+-----+-----+-----+-----+-----+-----+-----+ -+
+ * 0x10 | _xidSize |
+ * +-----+-----+-----+-----+-----+-----+-----+-----+
+ * 0x18 | _dataSize |
+ * +-----+-----+-----+-----+-----+-----+-----+-----+
+ * </pre>
+ * <table>
+ * <tr>
+ * <td>v</td>
+ * <td>file version [ <code>_version</code> ] (If the format or encoding of
+ * this file changes, then this number should be incremented)</td>
+ * </tr>
+ * <tr>
+ * <td>e</td>
+ * <td>endian flag [ <code>_bigEndianFlag</code> ], <b>false</b> (0x00) for
+ * little endian, <b>true</b> (0x01) for big endian</td>
+ * </tr>
+ * </table>
+ */
+ struct EventHeader : public RecordHeader
+ {
+#if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
+ uint32_t _filler0; ///< Big-endian filler for 32-bit size_t
+#endif
+ std::size_t _xidSize; ///< XID size
+#if defined(JRNL_LITTLE_ENDIAN) && defined(JRNL_32_BIT)
+ uint32_t _filler0; ///< Little-endian filler for 32-bit size_t
+#endif
+#if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
+ uint32_t _filler1; ///< Big-endian filler for 32-bit size_t
+#endif
+ std::size_t _dataSize; ///< Record data size
+#if defined(JRNL_LITTLE_ENDIAN) && defined(JRNL_32_BIT)
+ uint32_t _filler1; ///< Little-endian filler for 32-bit size_t
+#endif
+
+ /**
+ * \brief Default constructor, which sets all values to 0.
+ */
+ inline EventHeader() :
+ RecordHeader(),
+#if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
+ _filler0(0),
+#endif
+ _xidSize(0),
+#if defined(JRNL_LITTLE_ENDIAN) && defined(JRNL_32_BIT)
+ _filler0(0),
+#endif
+#if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
+ _filler1(0),
+#endif
+ _dataSize(0)
+#if defined(JRNL_LITTLE_ENDIAN) && defined(JRNL_32_BIT)
+ , _filler1(0)
+#endif
+ {}
+
+ /**
+ * \brief Convenience constructor which initializes values during construction.
+ *
+ * \param magic The magic for this record
+ * \param version Version of this record
+ * \param recordId Record identifier for this record
+ * \param xidSize Size of the transaction (or distributed transaction) ID for this record
+ * \param dataSize Size of the opaque data block for this record
+ * \param overwriteIndicator Flag indicating the present value of the overwrite indicator when writing this
+ * record
+ */
+ inline EventHeader(const uint32_t magic,
+ const uint8_t version,
+ const uint64_t recordId,
+ const std::size_t xidSize,
+ const std::size_t dataSize,
+ const bool overwriteIndicator) :
+ RecordHeader(magic, version, recordId, overwriteIndicator),
+#if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
+ _filler0(0),
+#endif
+ _xidSize(xidSize),
+#if defined(JRNL_LITTLE_ENDIAN) && defined(JRNL_32_BIT)
+ _filler0(0),
+#endif
+#if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
+ _filler1(0),
+#endif
+ _dataSize(dataSize)
+#if defined(JRNL_LITTLE_ENDIAN) && defined(JRNL_32_BIT)
+ , _filler1(0)
+#endif
+ {}
+
+ /**
+ * \brief Copy constructor
+ *
+ * \param eh Instance to be copied
+ */
+ inline EventHeader(const EventHeader& eh) :
+ RecordHeader(eh),
+#if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
+ _filler0(eh._filler0),
+#endif
+ _xidSize(eh._xidSize),
+#if defined(JRNL_LITTLE_ENDIAN) && defined(JRNL_32_BIT)
+ _filler0(eh._filler0),
+#endif
+#if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
+ _filler1(eh._filler1),
+#endif
+ _dataSize(eh._dataSize)
+#if defined(JRNL_LITTLE_ENDIAN) && defined(JRNL_32_BIT)
+ , _filler1(eh._filler1)
+#endif
+ {}
+
+ /**
+ * \brief Destructor.
+ */
+ virtual ~EventHeader() {}
+
+ /**
+ * \brief Convenience copy method.
+ */
+ virtual inline void copy(const EventHeader& e) {
+ RecordHeader::copy(e);
+ _xidSize = e._xidSize;
+ _dataSize = e._dataSize;
+#if defined(JRNL_32_BIT)
+ _filler0 = e._filler0;
+ _filler1 = e._filler1;
+#endif
+ }
+
+ /**
+ * \brief Reset this record to default values (mostly 0)
+ */
+ virtual inline void reset() {
+ RecordHeader::reset();
+ _xidSize = 0;
+ _dataSize = 0;
+#if defined(JRNL_32_BIT)
+ _filler0 = 0;
+ _filler1 = 0;
+#endif
+ }
+
+ /**
+ * \brief Return the header size of this record in bytes.
+ *
+ * \return Size of record header in bytes.
+ */
+ static inline std::size_t getHeaderSize() {
+ return sizeof(EventHeader);
+ }
+
+ /**
+ * \brief Return the body (data) size of this record in bytes.
+ *
+ * \return Size of record body in bytes.
+ */
+ virtual inline std::size_t getBodySize() const {
+ return _xidSize + _dataSize;
+ }
+
+ /**
+ * \brief Return total size of this record in bytes, being in the case of the
+ * enqueue record the size of the header, the size of the body (xid and data)
+ * and the size of the tail.
+ */
+ virtual inline std::size_t getRecordSize() const {
+ return getHeaderSize() + (getBodySize() ?
+ getBodySize() + RecordTail::getSize() :
+ 0);
+ }
+ }; // struct EventHeader
+
+
+
+
+ /**
+ * \brief Struct for enqueue record.
+ *
+ * Struct for enqueue record. In addition to the common data, this header includes both the
+ * xid and data blob sizes.
+ *
+ * This header precedes all enqueue data in journal files.
+ *
+ * Record layout in binary format (32 bytes):
+ * <pre>
+ * 0x0 0x7
+ * +-----+-----+-----+-----+-----+-----+-----+-----+ -+
+ * 0x00 | _magic | v | e | _flags | |
+ * +-----+-----+-----+-----+-----+-----+-----+-----+ | struct RecordHeader
+ * 0x08 | _recordId | |
+ * +-----+-----+-----+-----+-----+-----+-----+-----+ -+
+ * 0x10 | _xidSize |
+ * +-----+-----+-----+-----+-----+-----+-----+-----+
+ * 0x18 | _dataSize |
+ * +-----+-----+-----+-----+-----+-----+-----+-----+
+ * </pre>
+ * <table>
+ * <tr>
+ * <td>v</td>
+ * <td>file version [ <code>_version</code> ] (If the format or encoding of
+ * this file changes, then this number should be incremented)</td>
+ * </tr>
+ * <tr>
+ * <td>e</td>
+ * <td>endian flag [ <code>_bigEndianFlag</code> ], <b>false</b> (0x00) for
+ * little endian, <b>true</b> (0x01) for big endian</td>
+ * </tr>
+ * </table>
+ */
+ struct EnqueueHeader : public EventHeader
+ {
+ static const uint16_t ENQ_HDR_TRANSIENT_MASK = 0x10;
+ static const uint16_t ENQ_HDR_EXTERNAL_MASK = 0x20;
+
+ /**
+ * \brief Default constructor, which sets all values to 0.
+ */
+ inline EnqueueHeader() :
+ EventHeader()
+ {}
+
+ /**
+ * \brief Convenience constructor which initializes values during construction.
+ *
+ * \param magic The magic for this record
+ * \param version Version of this record
+ * \param recordId Record identifier for this record
+ * \param xidSize Size of the transaction (or distributed transaction) ID for this record
+ * \param dataSize Size of the opaque data block for this record
+ * \param overwriteIndicator Flag indicating the present value of the overwrite indicator when writing this
+ * record
+ * \param transient Flag indicating that this record is transient (ie to be discarded on recovery)
+ * \param external Flag indicating that this record's data is stored externally to the journal, the data portion
+ * of the record identifies the storage location.
+ */
+ inline EnqueueHeader(const uint32_t magic,
+ const uint8_t version,
+ const uint64_t recordId,
+ const std::size_t xidSize,
+ const std::size_t dataSize,
+ const bool overwriteIndicator,
+ const bool transient = false,
+ const bool external = false) :
+ EventHeader(magic, version, recordId, xidSize, dataSize, overwriteIndicator)
+ {
+ setTransientFlag(transient);
+ setExternalFlag(external);
+ }
+
+ /**
+ * \brief Copy constructor
+ *
+ * \param eh Instance to be copied
+ */
+ inline EnqueueHeader(const EnqueueHeader& eh) :
+ EventHeader(eh)
+ {}
+
+ /**
+ * \brief Destructor.
+ */
+ virtual ~EnqueueHeader() {}
+
+ /**
+ * \brief Return the value of the Transient flag for this record.
+ * If set, this record is ignored during recovery.
+ *
+ * \return true if the Transient flag for this record is set, false otherwise.
+ */
+ inline bool getTransientFlag() const {
+ return _flags & ENQ_HDR_TRANSIENT_MASK;
+ }
+
+ /**
+ * \brief Set the value of the Transient flag for this record.
+ *
+ * \param transient The value to be set in the transient flag.
+ */
+ inline void setTransientFlag(const bool transient = true) {
+ _flags = transient ?
+ _flags | ENQ_HDR_TRANSIENT_MASK :
+ _flags & (~ENQ_HDR_TRANSIENT_MASK);
+ }
+
+ /**
+ * \brief Return the value of the External flag for this record. If set, this record data is not within the
+ * journal but external to it. The data part of this record contains the location of the stored data.
+ *
+ * \return true if the Transient flag for this record is set, false otherwise.
+ */
+ inline bool getExternalFlag() const {
+ return _flags & ENQ_HDR_EXTERNAL_MASK;
+ }
+
+ /**
+ * \brief Set the value of the External flag for this record.
+ *
+ * \param external The value to be set in the External flag.
+ */
+ inline void setExternalFlag(const bool external = true) {
+ _flags = external ?
+ _flags | ENQ_HDR_EXTERNAL_MASK :
+ _flags & (~ENQ_HDR_EXTERNAL_MASK);
+ }
+
+ /**
+ * \brief Return the header size of this record in bytes.
+ *
+ * \return Size of record header in bytes.
+ */
+ static inline std::size_t getHeaderSize() {
+ return sizeof(EnqueueHeader);
+ }
+ }; // struct EnqueueHeader
+
+
+
+
+ /**
+ * \brief Struct for dequeue record.
+ *
+ * Struct for dequeue record. If this record has a non-zero xidsize field (i.e., there is a
+ * valid XID), then this header is followed by the XID of xidsize bytes and a rec_tail. If,
+ * on the other hand, this record has a zero xidsize (i.e., there is no XID), then the rec_tail
+ * is absent.
+ *
+ * Note that this record had its own rid distinct from the rid of the record it is dequeuing.
+ * The rid field below is the rid of the dequeue record itself; the deq-rid field is the rid of a
+ * previous enqueue record being dequeued by this record.
+ *
+ * Record layout in binary format (32 bytes):
+ * <pre>
+ * 0x0 0x7
+ * +-----+-----+-----+-----+-----+-----+-----+-----+ -+
+ * 0x00 | _magic | v | e | _flags | |
+ * +-----+-----+-----+-----+-----+-----+-----+-----+ | struct RecordHeader
+ * 0x08 | _recordId | |
+ * +-----+-----+-----+-----+-----+-----+-----+-----+ -+
+ * 0x10 | _dequeuedRecordId |
+ * +-----+-----+-----+-----+-----+-----+-----+-----+
+ * 0x18 | _xidSize |
+ * +-----+-----+-----+-----+-----+-----+-----+-----+
+ * </pre>
+ * <table>
+ * <tr>
+ * <td>v</td>
+ * <td>file version [ <code>_version</code> ] (If the format or encoding of
+ * this file changes, then this number should be incremented)</td>
+ * </tr>
+ * <tr>
+ * <td>e</td>
+ * <td>endian flag [ <code>_bigEndianFlag</code> ], <b>false</b> (0x00) for
+ * little endian, <b>true</b> (0x01) for big endian</td>
+ * </tr>
+ * </table>
+ */
+ struct DequeueHeader : public RecordHeader
+ {
+ uint64_t _dequeuedRecordId; ///< Record ID of dequeued record
+#if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
+ uint32_t _filler0; ///< Big-endian filler for 32-bit size_t
+#endif
+ std::size_t _xidSize; ///< XID size
+#if defined(JRNL_LITTLE_ENDIAN) && defined(JRNL_32_BIT)
+ uint32_t _filler0; ///< Little-endian filler for 32-bit size_t
+#endif
+
+ static const uint16_t DEQ_HDR_TPL_COMMIT_ON_TXN_COMPL_MASK = 0x10;
+
+ /**
+ * \brief Default constructor, which sets all values to 0.
+ */
+ inline DequeueHeader() :
+ RecordHeader(),
+ _dequeuedRecordId(0),
+#if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
+ _filler0(0),
+#endif
+ _xidSize(0)
+#if defined(JRNL_LITTLE_ENDIAN) && defined(JRNL_32_BIT)
+ , _filler0(0)
+#endif
+ {}
+
+ /**
+ * \brief Convenience constructor which initializes values during construction.
+ *
+ * \param magic The magic for this record
+ * \param version Version of this record
+ * \param recordId Record identifier for this record
+ * \param dequeuedRecordId Record identifier of the record being dequeued by this record
+ * \param xidSize Size of the transaction (or distributed transaction) ID for this record
+ * \param overwriteIndicator Flag indicating the present value of the overwrite indicator when writing this
+ * record
+ * \param tplCommitOnTxnComplFlag
+ */
+ inline DequeueHeader(const uint32_t magic,
+ const uint8_t version,
+ const uint64_t recordId,
+ const uint64_t dequeuedRecordId,
+ const std::size_t xidSize,
+ const bool overwriteIndicator,
+ const bool tplCommitOnTxnComplFlag = false) :
+ RecordHeader(magic, version, recordId, overwriteIndicator),
+ _dequeuedRecordId(dequeuedRecordId),
+#if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
+ _filler0(0),
+#endif
+ _xidSize(xidSize)
+#if defined(JRNL_LITTLE_ENDIAN) && defined(JRNL_32_BIT)
+ , _filler0(0)
+#endif
+ {
+ setTplCommitOnTxnComplFlag(tplCommitOnTxnComplFlag);
+ }
+
+ /**
+ * \brief Copy constructor
+ *
+ * \param dh Instance to be copied
+ */
+ inline DequeueHeader(const DequeueHeader& dh) :
+ RecordHeader(dh),
+ _dequeuedRecordId(dh._dequeuedRecordId),
+#if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
+ _filler0(dh._filler0),
+#endif
+ _xidSize(dh._xidSize)
+#if defined(JRNL_LITTLE_ENDIAN) && defined(JRNL_32_BIT)
+ , _filler0(dh._filler0)
+#endif
+ {}
+
+
+ /**
+ * \brief Destructor.
+ */
+ virtual ~DequeueHeader() {}
+
+ /**
+ * \brief Convenience copy method.
+ */
+ inline void copy(const DequeueHeader& dh) {
+ RecordHeader::copy(dh);
+ _dequeuedRecordId = dh._dequeuedRecordId;
+ _xidSize = dh._xidSize;
+#if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
+ _filler0 = dh._filler0;
+#endif
+ }
+
+ /**
+ * \brief Reset this record to default values (mostly 0)
+ */
+ inline void reset() {
+ RecordHeader::reset();
+ _dequeuedRecordId = 0;
+ _xidSize = 0;
+#if defined(JRNL_32_BIT)
+ _filler0 = 0;
+#endif
+ }
+
+ /**
+ * \brief Return the value of the tplCommitOnTxnComplFlag for this record. This
+ * flag is used only within the TPL, and if set, indicates that the transaction was
+ * closed using a commit. If not set, the transaction was closed using an abort.
+ * This is used during recovery of the transactions in the store.
+ *
+ * \return true if the tplCommitOnTxnComplFlag flag for this record is set, false
+ * otherwise.
+ */
+ inline bool getTplCommitOnTxnComplFlag() const {
+ return _flags & DEQ_HDR_TPL_COMMIT_ON_TXN_COMPL_MASK;
+ }
+
+ /**
+ * \brief Set the value of the tplCommitOnTxnComplFlag for this record. This is
+ * only used in the TPL, and is ignored elsewhere.
+ *
+ * \param commitOnTxnCompl The value to be set in the tplCommitOnTxnComplFlag. If
+ * true, the transaction was closed with a commit; if false, with an abort.
+ */
+ inline void setTplCommitOnTxnComplFlag(const bool commitOnTxnCompl) {
+ _flags = commitOnTxnCompl ?
+ _flags | DEQ_HDR_TPL_COMMIT_ON_TXN_COMPL_MASK :
+ _flags & (~DEQ_HDR_TPL_COMMIT_ON_TXN_COMPL_MASK);
+ }
+
+ /**
+ * \brief Return the header size of this record in bytes.
+ *
+ * \return Size of record header in bytes.
+ */
+ static std::size_t getHeaderSize() {
+ return sizeof(DequeueHeader);
+ }
+
+ /**
+ * \brief Return the body (xid and data) size of this record in bytes.
+ *
+ * \return Size of record body in bytes.
+ */
+ std::size_t getBodySize() const {
+ return _xidSize;
+ }
+
+ /**
+ * \brief Return total size of this record in bytes, being in the case of the
+ * dequeue record the size of the header, the size of the body (xid only) and
+ * the size of the tail.
+ */
+ inline std::size_t getRecordSize() const {
+ return getHeaderSize() + (getBodySize() ?
+ getBodySize() + RecordTail::getSize() :
+ 0);
+ }
+ }; // struct DequeueHeader
+
+
+
+
+ /**
+ * \brief Struct for transaction commit and abort records.
+ *
+ * Struct for DTX commit and abort records. Only the magic distinguishes between them. Since
+ * this record must be used in the context of a valid XID, the xidsize field must not be zero.
+ * Immediately following this record is the XID itself which is xidsize bytes long, followed by
+ * a rec_tail.
+ *
+ * Note that this record had its own rid distinct from the rids of the record(s) making up the
+ * transaction it is committing or aborting.
+ *
+ * Record layout in binary format (24 bytes):
+ * <pre>
+ * 0x0 0x7
+ * +-----+-----+-----+-----+-----+-----+-----+-----+ -+
+ * 0x00 | _magic | v | e | _flags | |
+ * +-----+-----+-----+-----+-----+-----+-----+-----+ | struct RecordHeader
+ * 0x08 | _recordId | |
+ * +-----+-----+-----+-----+-----+-----+-----+-----+ -+
+ * 0x10 | _xidSize |
+ * +-----+-----+-----+-----+-----+-----+-----+-----+
+ * </pre>
+ * <table>
+ * <tr>
+ * <td>v</td>
+ * <td>file version [ <code>_version</code> ] (If the format or encoding of
+ * this file changes, then this number should be incremented)</td>
+ * </tr>
+ * <tr>
+ * <td>e</td>
+ * <td>endian flag [ <code>_bigEndianFlag</code> ], <b>false</b> (0x00) for
+ * little endian, <b>true</b> (0x01) for big endian</td>
+ * </tr>
+ * </table>
+ */
+ struct TransactionHeader : public RecordHeader
+ {
+#if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
+ uint32_t _filler0; ///< Big-endian filler for 32-bit size_t
+#endif
+ std::size_t _xidSize; ///< XID size
+#if defined(JRNL_LITTLE_ENDIAN) && defined(JRNL_32_BIT)
+ uint32_t _filler0; ///< Little-endian filler for 32-bit size_t
+#endif
+
+ /**
+ * \brief Default constructor, which sets all values to 0.
+ */
+ TransactionHeader() :
+ RecordHeader(),
+#if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
+ _filler0(0),
+#endif
+ _xidSize(0)
+#if defined(JRNL_LITTLE_ENDIAN) && defined(JRNL_32_BIT)
+ , _filler0(0)
+#endif
+ {}
+
+ /**
+ * \brief Convenience constructor which initializes values during construction.
+ *
+ * \param magic The magic for this record
+ * \param version Version of this record
+ * \param recordId Record identifier for this record
+ * \param xidSize Size of the transaction (or distributed transaction) ID for this record
+ * \param overwriteIndicator Flag indicating the present value of the overwrite indicator when writing this
+ * record
+ */
+ TransactionHeader(const uint32_t magic,
+ const uint8_t version,
+ const uint64_t recordId,
+ const std::size_t xidSize,
+ const bool overwriteIndicator) :
+ RecordHeader(magic, version, recordId, overwriteIndicator),
+#if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
+ _filler0(0),
+#endif
+ _xidSize(xidSize)
+#if defined(JRNL_LITTLE_ENDIAN) && defined(JRNL_32_BIT)
+ , _filler0(0)
+#endif
+ {}
+
+ /**
+ * \brief Copy constructor
+ *
+ * \param th Instance to be copied
+ */
+ TransactionHeader(const TransactionHeader& th) :
+ RecordHeader(th),
+#if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
+ _filler0(th._filler0),
+#endif
+ _xidSize(th._xidSize)
+#if defined(JRNL_LITTLE_ENDIAN) && defined(JRNL_32_BIT)
+ , _filler0(th._filler0)
+#endif
+ {}
+
+
+ /**
+ * \brief Destructor.
+ */
+ virtual ~TransactionHeader()
+ {}
+
+ /**
+ * \brief Convenience copy method.
+ */
+ inline void copy(const TransactionHeader& th) {
+ RecordHeader::copy(th);
+ _xidSize = th._xidSize;
+#if defined(JRNL_32_BIT)
+ _filler0 = th._filler0;
+#endif
+ }
+
+ /**
+ * \brief Reset this record to default values (mostly 0)
+ */
+ inline void reset() {
+ RecordHeader::reset();
+ _xidSize = 0;
+#if defined(JRNL_32_BIT)
+ _filler0 = 0;
+#endif
+ }
+
+ /**
+ * \brief Return the header size of this record in bytes.
+ *
+ * \return Size of record header in bytes.
+ */
+ static std::size_t getHeaderSize() { return sizeof(TransactionHeader); }
+
+ /**
+ * \brief Return the body (data) size of this record in bytes.
+ *
+ * \return Size of record body in bytes.
+ */
+ std::size_t getBodySize() const { return _xidSize; }
+
+ /**
+ * \brief Return total size of this record in bytes, being in the case of the
+ * dequeue record the size of the header, the size of the body (xid only) and
+ * the size of the tail.
+ */
+ inline std::size_t getRecordSize() const {
+ // By definition, TransactionRecords must always have an xid, hence a record
+ // tail as well. No check on body size required in this case.
+ return getHeaderSize() + getBodySize() + RecordTail::getSize();
+ }
+ }; // struct TransactionHeader
+
+#pragma pack()
+
+} // namespace journal2
+} // namespace mrg
+
+#endif // ifndef mrg_journal2_RecordHeader_hpp
Modified: store/trunk/cpp/perf/JournalParameters.cpp
===================================================================
--- store/trunk/cpp/perf/JournalParameters.cpp 2010-12-17 11:06:36 UTC (rev 4437)
+++ store/trunk/cpp/perf/JournalParameters.cpp 2011-01-14 19:52:45 UTC (rev 4438)
@@ -1,5 +1,5 @@
/**
- * \file JournalParameters.cpp
+ * \file perf/JournalParameters.cpp
*
* Qpid asynchronous store plugin library
*
Modified: store/trunk/cpp/perf/JournalParameters.hpp
===================================================================
--- store/trunk/cpp/perf/JournalParameters.hpp 2010-12-17 11:06:36 UTC (rev 4437)
+++ store/trunk/cpp/perf/JournalParameters.hpp 2011-01-14 19:52:45 UTC (rev 4438)
@@ -1,5 +1,5 @@
/**
- * \file JournalParameters.hpp
+ * \file perf/JournalParameters.hpp
*
* Qpid asynchronous store plugin library
*
Modified: store/trunk/cpp/perf/ScopedTimer.hpp
===================================================================
--- store/trunk/cpp/perf/ScopedTimer.hpp 2010-12-17 11:06:36 UTC (rev 4437)
+++ store/trunk/cpp/perf/ScopedTimer.hpp 2011-01-14 19:52:45 UTC (rev 4438)
@@ -64,17 +64,17 @@
class ScopedTimer
{
double& _elapsed; ///< Ref to elapsed time, will be written on destruction of ScopedTimer instances
- ::timespec _startTime; ///< Start time, set on construction
+ std::timespec _startTime; ///< Start time, set on construction
/**
* \brief Convert ::timespec to seconds
*
* Static function to convert a ::timespec struct into a double representation in seconds.
*
- * \param ts ::timespec struct containing the time to be converted.
+ * \param ts std::timespec struct containing the time to be converted.
* \return A double which represents the time in parameter ts in seconds.
*/
- static double _s_getDoubleTime(const ::timespec& ts);
+ static double _s_getDoubleTime(const std::timespec& ts);
public:
/**
Modified: store/trunk/cpp/perf/TestParameters.hpp
===================================================================
--- store/trunk/cpp/perf/TestParameters.hpp 2010-12-17 11:06:36 UTC (rev 4437)
+++ store/trunk/cpp/perf/TestParameters.hpp 2011-01-14 19:52:45 UTC (rev 4438)
@@ -79,7 +79,7 @@
* \param numQueues Number of queues to test simultaneously
* \param numThreadPairsPerQueue Number of thread pairs (enq and deq) per queue
* \param enqTxnBlockSize Transaction block size for enqueues
- * \parma deqTxnBlockSize Transaction block size for dequeues
+ * \param deqTxnBlockSize Transaction block size for dequeues
*/
TestParameters(const uint32_t numMsgs,
const uint32_t msgSize,
Modified: store/trunk/cpp/perf/m
===================================================================
--- store/trunk/cpp/perf/m 2010-12-17 11:06:36 UTC (rev 4437)
+++ store/trunk/cpp/perf/m 2011-01-14 19:52:45 UTC (rev 4438)
@@ -5,7 +5,7 @@
# The variable JOURNAL2, if defined, will link with the new journal2 namespace journal. Otherwise the old journal
# namespace will be used.
-#JOURNAL2=1
+JOURNAL2=1
# Optimization options
#OPT="-O0 -ggdb"
13 years, 11 months