[rhmessaging-commits] rhmessaging commits: r1869 - in store/trunk/cpp: tests and 1 other directory.
rhmessaging-commits at lists.jboss.org
rhmessaging-commits at lists.jboss.org
Tue Apr 8 15:36:31 EDT 2008
Author: tedross
Date: 2008-04-08 15:36:30 -0400 (Tue, 08 Apr 2008)
New Revision: 1869
Modified:
store/trunk/cpp/lib/BdbMessageStore.cpp
store/trunk/cpp/lib/BdbMessageStore.h
store/trunk/cpp/tests/SimpleTest.cpp
Log:
Use updated create interface from QPID to carry arguments for journal creation
Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp 2008-04-08 18:48:17 UTC (rev 1868)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp 2008-04-08 19:36:30 UTC (rev 1869)
@@ -268,7 +268,7 @@
}
}
-void BdbMessageStore::create(PersistableQueue& queue)
+void BdbMessageStore::create(PersistableQueue& queue, const FieldTable& args)
{
checkInit();
if (queue.getPersistenceId()) {
@@ -276,16 +276,32 @@
}
if (usingJrnl()) {
JournalImpl* jQueue = 0;
+ FieldTable::ValuePtr value;
+
+ uint16_t localFileCount = numJrnlFiles;
+ uint32_t localFileSize = jrnlFsizePgs;
+
+ value = args.get ("qpid.file_count");
+ if (value.get() != 0 && !value->empty() && value->convertsTo<int>())
+ localFileCount = (uint16_t) value->get<int>();
+
+ value = args.get ("qpid.file_size");
+ if (value.get() != 0 && !value->empty() && value->convertsTo<int>())
+ localFileSize = (uint32_t) value->get<int>();
+
{
qpid::sys::Mutex::ScopedLock sl(jrnlCreateLock);
- jQueue = new JournalImpl(queue.getName(), getJrnlDir(queue), string("JournalData"), defJournalGetEventsTimeout, defJournalFlushTimeout);
+ jQueue = new JournalImpl(queue.getName(), getJrnlDir(queue),
+ string("JournalData"), defJournalGetEventsTimeout,
+ defJournalFlushTimeout);
}
queue.setExternalQueueStore(dynamic_cast<ExternalQueueStore*>(jQueue));
try {
// init will create the deque's for the init...
- jQueue->initialize(numJrnlFiles, jrnlFsizePgs * JRNL_RMGR_PAGE_SIZE);
+ jQueue->initialize(localFileCount, localFileSize * JRNL_RMGR_PAGE_SIZE);
} catch (const journal::jexception& e) {
- THROW_STORE_EXCEPTION(std::string("Queue ") + queue.getName() + ": create() failed: " + e.what());
+ THROW_STORE_EXCEPTION(std::string("Queue ") + queue.getName() +
+ ": create() failed: " + e.what());
}
}
@@ -312,7 +328,7 @@
}
}
-void BdbMessageStore::create(const PersistableExchange& exchange)
+void BdbMessageStore::create(const PersistableExchange& exchange, const FieldTable& /*args*/)
{
checkInit();
if (exchange.getPersistenceId()) {
Modified: store/trunk/cpp/lib/BdbMessageStore.h
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.h 2008-04-08 18:48:17 UTC (rev 1868)
+++ store/trunk/cpp/lib/BdbMessageStore.h 2008-04-08 19:36:30 UTC (rev 1869)
@@ -169,10 +169,12 @@
void truncate();
- void create(qpid::broker::PersistableQueue& queue);
+ void create(qpid::broker::PersistableQueue& queue,
+ const qpid::framing::FieldTable& args);
void destroy(qpid::broker::PersistableQueue& queue);
- void create(const qpid::broker::PersistableExchange& queue);
+ void create(const qpid::broker::PersistableExchange& queue,
+ const qpid::framing::FieldTable& args);
void destroy(const qpid::broker::PersistableExchange& queue);
void bind(const qpid::broker::PersistableExchange& exchange,
Modified: store/trunk/cpp/tests/SimpleTest.cpp
===================================================================
--- store/trunk/cpp/tests/SimpleTest.cpp 2008-04-08 18:48:17 UTC (rev 1868)
+++ store/trunk/cpp/tests/SimpleTest.cpp 2008-04-08 19:36:30 UTC (rev 1869)
@@ -33,6 +33,7 @@
#include <qpid/broker/Queue.h>
#include <qpid/broker/RecoveryManagerImpl.h>
#include <qpid/framing/AMQHeaderBody.h>
+#include <qpid/framing/FieldTable.h>
#define TESTDIR "/tmp"
@@ -86,7 +87,7 @@
store.truncate();//make sure it is empty to begin with
string name("CreateDeleteQueue");
Queue queue(name, 0, &store, 0);
- store.create(queue);
+ store.create(queue, qpid::framing::FieldTable());
// TODO - check dir exists
BOOST_REQUIRE(queue.getPersistenceId());
store.destroy(queue);
@@ -114,7 +115,7 @@
store.init(TESTDIR, async, true, 4, 1);
store.truncate();//make sure it is empty to begin with
Queue queue(name, 0, &store, 0);
- store.create(queue);
+ store.create(queue, qpid::framing::FieldTable());
BOOST_REQUIRE(queue.getPersistenceId());
id = queue.getPersistenceId();
}//db will be closed
@@ -166,7 +167,7 @@
store.init(TESTDIR, async, true, 4, 1);
store.truncate();//make sure it is empty to begin with
Queue queue(name, 0, &store, 0);
- store.create(queue);
+ store.create(queue, qpid::framing::FieldTable());
store.destroy(queue);
}//db will be closed
{
@@ -407,7 +408,7 @@
MessageUtils::addContent(msg, data);
Queue queue("my_queue", 0, &store, 0);
- store.create(queue);
+ store.create(queue, qpid::framing::FieldTable());
store.enqueue(0, pmsg, queue);
store.destroy(*pmsg);
@@ -433,7 +434,7 @@
store.truncate();//make sure it is empty to begin with
ExchangeRegistry registry;
Exchange::shared_ptr exchange = registry.declare(name, type, true, args).first;
- store.create(*exchange);
+ store.create(*exchange, qpid::framing::FieldTable());
id = exchange->getPersistenceId();
BOOST_REQUIRE(id);
}//db will be closed
@@ -476,8 +477,8 @@
store.truncate();//make sure it is empty to begin with
Exchange::shared_ptr exchange(new DirectExchange(exchangeName, true, args));
Queue::shared_ptr queue(new Queue(queueName, 0, &store, 0));
- store.create(*exchange);
- store.create(*queue);
+ store.create(*exchange, qpid::framing::FieldTable());
+ store.create(*queue, qpid::framing::FieldTable());
BOOST_REQUIRE(exchange->bind(queue, key, &args));
store.bind(*exchange, *queue, key, args);
}//db will be closed
@@ -537,9 +538,9 @@
Exchange::shared_ptr exchange(new DirectExchange(exchangeName, true, args));
Queue::shared_ptr queue1(new Queue(queueName1, 0, &store, 0));
Queue::shared_ptr queue2(new Queue(queueName2, 0, &store, 0));
- store.create(*exchange);
- store.create(*queue1);
- store.create(*queue2);
+ store.create(*exchange, qpid::framing::FieldTable());
+ store.create(*queue1, qpid::framing::FieldTable());
+ store.create(*queue2, qpid::framing::FieldTable());
store.bind(*exchange, *queue1, key, args);
store.bind(*exchange, *queue2, key, args);
//delete queue1:
More information about the rhmessaging-commits
mailing list