Author: cctrieloff
Date: 2007-10-23 08:55:04 -0400 (Tue, 23 Oct 2007)
New Revision: 1141
Modified:
store/trunk/cpp/lib/jrnl/jcntl.cpp
store/trunk/cpp/tests/SimpleTest.cpp
Log:
- updated test to honor dir set
- disable dequeue callback until AMQP can support it
Modified: store/trunk/cpp/lib/jrnl/jcntl.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.cpp 2007-10-23 00:56:26 UTC (rev 1140)
+++ store/trunk/cpp/lib/jrnl/jcntl.cpp 2007-10-23 12:55:04 UTC (rev 1141)
@@ -579,7 +579,6 @@
jcntl::aio_wr_callback(jcntl* journal, u_int32_t num_dtoks)
{
//kpvdr TODO -- this list needs to be mutexed...???
-// need to delete the dtok's
std::deque<rhm::journal::data_tok*>
this_dtok_list(journal->_aio_wr_cmpl_dtok_list.begin(),
journal->_aio_wr_cmpl_dtok_list.end());
@@ -595,9 +594,11 @@
dtokp->getSourceMessage()->enqueueComplete();
break;
case data_tok::DEQ:
+/* Don't need to signal until we have a way to ack completion of dequeue in AMQP
dtokp->getSourceMessage()->dequeueComplete();
if ( dtokp->getSourceMessage()->isDequeueComplete() ) // clear id after
last dequeue
dtokp->getSourceMessage()->setPersistenceId(0);
+*/
break;
default:
;
@@ -613,7 +614,6 @@
{
//kpvdr TODO -- can we get rid of the copy???
-// need to delete the dtok's
std::deque<rhm::journal::data_tok*>
this_dtok_list(journal->_aio_rd_cmpl_dtok_list.begin(),
journal->_aio_rd_cmpl_dtok_list.end());
journal->_aio_rd_cmpl_dtok_list.clear();
Modified: store/trunk/cpp/tests/SimpleTest.cpp
===================================================================
--- store/trunk/cpp/tests/SimpleTest.cpp 2007-10-23 00:56:26 UTC (rev 1140)
+++ store/trunk/cpp/tests/SimpleTest.cpp 2007-10-23 12:55:04 UTC (rev 1141)
@@ -313,9 +313,9 @@
}
}
- void testStagingSync() {testStaging();}
+ void testStagingSync() {testStaging(false);}
void testStagingAsync() {std::cout << std::endl << "Missing Async
test!!" << std::endl << std:: flush;}
- void testStaging()
+ void testStaging(bool async)
{
const string name("MyDurableQueue");
const string exchange("MyExchange");
@@ -325,6 +325,7 @@
const string data2("ABCDEFGHIJKLMNOPQRSTUVWXYZ");
{
BdbMessageStore store;
+ if (async) store.init("/var",async);
store.truncate();//make sure it is empty to begin with
//create & stage a message
@@ -368,6 +369,7 @@
{
//recover
BdbMessageStore store;
+ if (async) store.init("/var",async);
QueueRegistry registry(&store);
ExchangeRegistry exchanges;
DtxManager dtx(&store);
@@ -408,11 +410,12 @@
}
}
- void testDestroyStagedMessageSync() {testDestroyStagedMessage();}
+ void testDestroyStagedMessageSync() {testDestroyStagedMessage(false);}
void testDestroyStagedMessageAsync() {std::cout << std::endl <<
"Missing Async test!!" << std::endl << std:: flush;}
- void testDestroyStagedMessage()
+ void testDestroyStagedMessage(bool async)
{
BdbMessageStore store;
+ if (async) store.init("/var",async);
store.truncate();//make sure it is empty to begin with
const string data("abcdefg");
@@ -430,11 +433,12 @@
}
}
- void testDestroyEnqueuedMessageSync() {testDestroyEnqueuedMessage();}
+ void testDestroyEnqueuedMessageSync() {testDestroyEnqueuedMessage(false);}
void testDestroyEnqueuedMessageAsync() {std::cout << std::endl <<
"Missing Async test!!" << std::endl << std:: flush;}
- void testDestroyEnqueuedMessage()
+ void testDestroyEnqueuedMessage(bool async)
{
BdbMessageStore store;
+ if (async) store.init("/var",async);
store.truncate();//make sure it is empty to begin with
const string data("abcdefg");