Author: kpvdr
Date: 2010-04-15 11:58:32 -0400 (Thu, 15 Apr 2010)
New Revision: 3913
Modified:
store/trunk/cpp/tests/OrderingTest.cpp
store/trunk/cpp/tests/SimpleTest.cpp
store/trunk/cpp/tests/TransactionalTest.cpp
store/trunk/cpp/tests/TwoPhaseCommitTest.cpp
Log:
Implementation of QPID-2509 (Remove message staging from C++ broker) - syncs with qpid r.934463
Modified: store/trunk/cpp/tests/OrderingTest.cpp
===================================================================
--- store/trunk/cpp/tests/OrderingTest.cpp 2010-04-14 17:34:58 UTC (rev 3912)
+++ store/trunk/cpp/tests/OrderingTest.cpp 2010-04-15 15:58:32 UTC (rev 3913)
@@ -105,7 +105,7 @@
sys::Timer t;
DtxManager mgr(t);
mgr.setStore (store.get());
- RecoveryManagerImpl recoveryMgr(queues, exchanges, links, mgr, 0);
+ RecoveryManagerImpl recoveryMgr(queues, exchanges, links, mgr);
store->recover(recoveryMgr);
queue = queues.find(name);
Modified: store/trunk/cpp/tests/SimpleTest.cpp
===================================================================
--- store/trunk/cpp/tests/SimpleTest.cpp 2010-04-14 17:34:58 UTC (rev 3912)
+++ store/trunk/cpp/tests/SimpleTest.cpp 2010-04-15 15:58:32 UTC (rev 3913)
@@ -70,7 +70,7 @@
sys::Timer t;
DtxManager mgr(t);
mgr.setStore (&store);
- RecoveryManagerImpl recovery(queues, exchanges, links, mgr, 0);
+ RecoveryManagerImpl recovery(queues, exchanges, links, mgr);
store.recover(recovery);
}
@@ -360,136 +360,6 @@
cout << "ok" << endl;
}
-QPID_AUTO_TEST_CASE(Staging)
-{
- cout << test_filename << ".Staging: " << flush;
-
- const string name("MyDurableQueue");
- const string exchange("MyExchange");
- const string routingKey("MyRoutingKey");
- const Uuid messageId(true);
- const string data1("abcdefghijklmnopqrstuvwxyz");
- const string data2("ABCDEFGHIJKLMNOPQRSTUVWXYZ");
- {
- MessageStoreImpl store;
- store.init(test_dir, 4, 1, true); // truncate store
-
- //create & stage a message
- boost::intrusive_ptr<Message> msg = MessageUtils::createMessage(exchange,
routingKey, messageId, (data1.size() + data2.size()));
- intrusive_ptr<PersistableMessage> pmsg =
static_pointer_cast<PersistableMessage>(msg);
- intrusive_ptr<const PersistableMessage> cpmsg =
static_pointer_cast<const PersistableMessage>(msg);
-
msg->getProperties<DeliveryProperties>()->setDeliveryMode(PERSISTENT);
- FieldTable table;
- table.setString("abc", "xyz");
-
msg->getProperties<MessageProperties>()->setApplicationHeaders(table);
- store.stage(pmsg);
-
- //append to it
- msg->setStore(&store);
- msg->releaseContent();//ensure that data is not held in memory but is appended
to disk when added
- store.appendContent(cpmsg, data1);
- store.appendContent(cpmsg, data2);
-
- //enqueue it
- Queue::shared_ptr queue(new Queue(name, 0, &store, 0));
- FieldTable settings;
- queue->create(settings);
- queue->enqueue(0, msg);
-
- //load it (without recovery)
- DummyHandler handler;
- QueuedMessage qm(queue.get(), msg, 0);
- //52 chars of data, i.e. 2 chunks of 20 and one of 12
- MessageUtils::deliver(qm, handler, 20);
- BOOST_REQUIRE(handler.frames.size() > 1);
- string loaded;
- for (uint i = 1; i < handler.frames.size(); i++) {
- AMQContentBody*
contentBody(dynamic_cast<AMQContentBody*>(handler.frames[i].getBody()));
- BOOST_REQUIRE(contentBody);
- loaded += contentBody->getData();
- }
- BOOST_CHECK_EQUAL(data1 + data2, loaded);
-
- }//db will be closed
- {
- //recover
- MessageStoreImpl store;
- store.init(test_dir, 4, 1);
- QueueRegistry registry;
- registry.setStore (&store);
- ExchangeRegistry exchanges;
- LinkRegistry links;
- sys::Timer t;
- DtxManager dtx(t);
- dtx.setStore (&store);
- RecoveryManagerImpl recovery(registry, exchanges, links, dtx, 10);
- store.recover(recovery);
-
- //get message instance from queue
- Queue::shared_ptr queue = registry.find(name);
- BOOST_REQUIRE(queue);
- BOOST_CHECK_EQUAL((u_int32_t) 1, queue->getMessageCount());
- boost::intrusive_ptr<Message> msg = queue->get().payload;
-
- //check headers
- BOOST_CHECK_EQUAL(exchange, msg->getExchangeName());
- BOOST_CHECK_EQUAL(routingKey, msg->getRoutingKey());
- BOOST_CHECK_EQUAL(messageId,
msg->getProperties<MessageProperties>()->getMessageId());
- BOOST_CHECK_EQUAL((uint8_t) PERSISTENT,
msg->getProperties<DeliveryProperties>()->getDeliveryMode());
- BOOST_REQUIRE(Str16Value("xyz") ==
*msg->getProperties<MessageProperties>()->getApplicationHeaders().get("abc"));
- BOOST_CHECK_EQUAL((u_int64_t) (data1.size() + data2.size()),
msg->getFrames().getHeaders()->getContentLength());
-
- BOOST_CHECK_EQUAL((u_int64_t) 0, msg->contentSize());//ensure it is being
lazily loaded
-
- //load lazily
- DummyHandler handler;
- QueuedMessage qm(queue.get(),msg,0);
- //52 chars of data, i.e. 2 chunks of 20 and one of 12
- MessageUtils::deliver(qm, handler, 20);
-
- BOOST_REQUIRE(handler.frames.size() > 1);
- string loaded;
- for (uint i = 1; i < handler.frames.size(); i++) {
- AMQContentBody*
contentBody(dynamic_cast<AMQContentBody*>(handler.frames[i].getBody()));
- BOOST_REQUIRE(contentBody);
- loaded += contentBody->getData();
- }
- BOOST_CHECK_EQUAL(data1 + data2, loaded);
-
- //dequeue
- queue->dequeue(0, qm);
- }
-
- cout << "ok" << endl;
-}
-
-QPID_AUTO_TEST_CASE(DestroyStagedMessage)
-{
- cout << test_filename << ".DestroyStagedMessage: " <<
flush;
-
- MessageStoreImpl store;
- store.init(test_dir, 4, 1, true); // truncate store
-
- const string data("abcdefg");
- boost::intrusive_ptr<Message>
msg(MessageUtils::createMessage("my_exchange", "my_routing_key",
"my_message", data.length()));
- intrusive_ptr<PersistableMessage> pmsg =
static_pointer_cast<PersistableMessage>(msg);
- intrusive_ptr<const PersistableMessage> cpmsg = static_pointer_cast<const
PersistableMessage>(msg);
- MessageUtils::addContent(msg, data);
-
- store.stage(pmsg);
- store.destroy(*pmsg);
-
- try {
- string loaded;
- Queue queue("dummy", 0, &store, 0);
- store.loadContent(queue, cpmsg, loaded, 0, data.length());
- BOOST_FAIL("store.loadContent() did not throw StoreException as
expected.");
- } catch (StoreException& e) {
- }
-
- cout << "ok" << endl;
-}
-
QPID_AUTO_TEST_CASE(DestroyEnqueuedMessage)
{
cout << test_filename << ".DestroyEnqueuedMessage: " <<
flush;
Modified: store/trunk/cpp/tests/TransactionalTest.cpp
===================================================================
--- store/trunk/cpp/tests/TransactionalTest.cpp 2010-04-14 17:34:58 UTC (rev 3912)
+++ store/trunk/cpp/tests/TransactionalTest.cpp 2010-04-15 15:58:32 UTC (rev 3913)
@@ -136,7 +136,7 @@
sys::Timer t;
DtxManager mgr(t);
mgr.setStore (store.get());
- RecoveryManagerImpl recovery(*queues, exchanges, links, mgr, 0);
+ RecoveryManagerImpl recovery(*queues, exchanges, links, mgr);
store->recover(recovery);
queueA = queues->find(nameA);
Modified: store/trunk/cpp/tests/TwoPhaseCommitTest.cpp
===================================================================
--- store/trunk/cpp/tests/TwoPhaseCommitTest.cpp 2010-04-14 17:34:58 UTC (rev 3912)
+++ store/trunk/cpp/tests/TwoPhaseCommitTest.cpp 2010-04-15 15:58:32 UTC (rev 3913)
@@ -361,7 +361,7 @@
links = std::auto_ptr<LinkRegistry>(new LinkRegistry);
dtxmgr = std::auto_ptr<DtxManager>(new DtxManager(t));
dtxmgr->setStore (store.get());
- RecoveryManagerImpl recovery(*queues, exchanges, *links, *dtxmgr, 0);
+ RecoveryManagerImpl recovery(*queues, exchanges, *links, *dtxmgr);
store->recover(recovery);
queueA = queues->find(nameA);