Author: gordonsim
Date: 2007-08-28 15:40:03 -0400 (Tue, 28 Aug 2007)
New Revision: 897
Added:
store/trunk/cpp/tests/MessageUtils.h
Modified:
store/trunk/cpp/lib/BdbMessageStore.cpp
store/trunk/cpp/tests/Makefile.am
store/trunk/cpp/tests/OrderingTest.cpp
store/trunk/cpp/tests/SimpleTest.cpp
store/trunk/cpp/tests/TransactionalTest.cpp
store/trunk/cpp/tests/TwoPhaseCommitTest.cpp
store/trunk/cpp/tests/persistence.py
Log:
Updates for change to message class on qpid trunk.
Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp 2007-08-28 19:14:13 UTC (rev 896)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp 2007-08-28 19:40:03 UTC (rev 897)
@@ -23,8 +23,7 @@
#include "BdbMessageStore.h"
#include <qpid/broker/RecoveryManager.h>
-#include <qpid/broker/BrokerMessage.h>
-#include <qpid/broker/BrokerMessageMessage.h>
+#include <qpid/broker/Message.h>
#include <qpid/framing/Buffer.h>
#include <algorithm>
#include <sstream>
Modified: store/trunk/cpp/tests/Makefile.am
===================================================================
--- store/trunk/cpp/tests/Makefile.am 2007-08-28 19:14:13 UTC (rev 896)
+++ store/trunk/cpp/tests/Makefile.am 2007-08-28 19:40:03 UTC (rev 897)
@@ -24,7 +24,7 @@
abs_srcdir=$(abs_srcdir) \
LIBBDBSTORE=$(abs_builddir)/../lib/.libs/libbdbstore.so
-EXTRA_DIST += $(TESTS) test_plugin.h setup .vg-supp
+EXTRA_DIST += $(TESTS) test_plugin.h MessageUtils.h setup .vg-supp
include gen.mk
Added: store/trunk/cpp/tests/MessageUtils.h
===================================================================
--- store/trunk/cpp/tests/MessageUtils.h (rev 0)
+++ store/trunk/cpp/tests/MessageUtils.h 2007-08-28 19:40:03 UTC (rev 897)
@@ -0,0 +1,56 @@
+/*
+ Copyright (C) 2007 Red Hat Software
+
+ This file is part of Red Hat Messaging.
+
+ Red Hat Messaging is free software; you can redistribute it and/or
+ modify it under the terms of the GNU Lesser General Public
+ License as published by the Free Software Foundation; either
+ version 2.1 of the License, or (at your option) any later version.
+
+ This file is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public
+ License along with this library; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
+ USA
+
+ The GNU Lesser General Public License is available in the file COPYING.
+*/
+
+
+#include <qpid/broker/Message.h>
+#include <qpid/broker/MessageDelivery.h>
+#include <qpid/framing/AMQFrame.h>
+
+using namespace qpid::broker;
+using namespace qpid::framing;
+
+struct MessageUtils
+{
+ static Message::shared_ptr createMessage(const string& exchange, const
string& routingKey,
+ const string& messageId="",
uint64_t contentSize = 0)
+ {
+ Message::shared_ptr msg(new Message());
+
+ AMQFrame method(0, MessageTransferBody(ProtocolVersion(), 0, exchange, 0, 0));
+ AMQFrame header(0, AMQHeaderBody());
+
+ msg->getFrames().append(method);
+ msg->getFrames().append(header);
+ MessageProperties* props =
msg->getFrames().getHeaders()->get<MessageProperties>(true);
+ props->setContentLength(contentSize);
+ props->setMessageId(messageId);
+
msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setRoutingKey(routingKey);
+ return msg;
+ }
+
+ static void addContent(Message::shared_ptr msg, const string& data)
+ {
+ AMQFrame content(0, AMQContentBody(data));
+ msg->getFrames().append(content);
+ }
+};
Property changes on: store/trunk/cpp/tests/MessageUtils.h
___________________________________________________________________
Name: svn:keywords
+ Rev Date
Name: svn:eol-style
+ native
Modified: store/trunk/cpp/tests/OrderingTest.cpp
===================================================================
--- store/trunk/cpp/tests/OrderingTest.cpp 2007-08-28 19:14:13 UTC (rev 896)
+++ store/trunk/cpp/tests/OrderingTest.cpp 2007-08-28 19:40:03 UTC (rev 897)
@@ -22,9 +22,10 @@
*/
#include "BdbMessageStore.h"
+#include "MessageUtils.h"
#include <qpid/framing/AMQMethodBody.h>
#include <qpid/framing/AMQHeaderBody.h>
-#include <qpid/broker/BrokerMessage.h>
+#include <qpid/broker/Message.h>
#include <qpid/broker/BrokerQueue.h>
#include <qpid/broker/RecoveryManagerImpl.h>
#include "test_plugin.h"
@@ -97,12 +98,8 @@
string messageId(id.str());
ids.push(messageId);
- Message::shared_ptr msg = Message::shared_ptr(new BasicMessage(0,
"exchange", "routing_key", false, false));
- AMQHeaderBody header(BASIC);
- header.setContentSize(0);
- msg->setHeader(&header);
- msg->getHeaderProperties()->setMessageId(messageId);
- msg->getHeaderProperties()->setDeliveryMode(PERSISTENT);
+ Message::shared_ptr msg = MessageUtils::createMessage("exchange",
"routing_key", messageId, 0);
+
msg->getProperties<DeliveryProperties>()->setDeliveryMode(PERSISTENT);
queue->deliver(msg);
}
@@ -114,7 +111,7 @@
queue->dequeue(0, msg);
string expected = ids.front();
ids.pop();
- CPPUNIT_ASSERT_EQUAL(expected,
msg->getHeaderProperties()->getMessageId());
+ CPPUNIT_ASSERT_EQUAL(expected,
msg->getProperties<MessageProperties>()->getMessageId());
return true;
} else {
return false;
Modified: store/trunk/cpp/tests/SimpleTest.cpp
===================================================================
--- store/trunk/cpp/tests/SimpleTest.cpp 2007-08-28 19:14:13 UTC (rev 896)
+++ store/trunk/cpp/tests/SimpleTest.cpp 2007-08-28 19:40:03 UTC (rev 897)
@@ -23,14 +23,14 @@
#include "BdbMessageStore.h"
#include "test_plugin.h"
+#include "MessageUtils.h"
#include <qpid/Exception.h>
#include <qpid/framing/AMQP_HighestVersion.h>
#include <qpid/framing/AMQFrame.h>
#include <qpid/framing/AMQHeaderBody.h>
#include <qpid/framing/AMQMethodBody.h>
#include <qpid/framing/ChannelAdapter.h>
-#include <qpid/broker/BrokerMessage.h>
-#include <qpid/broker/BrokerMessageMessage.h>
+#include <qpid/broker/Message.h>
#include <qpid/broker/BrokerQueue.h>
#include <qpid/broker/DirectExchange.h>
#include <qpid/broker/RecoveryManagerImpl.h>
@@ -79,7 +79,6 @@
CPPUNIT_TEST(testStaging);
CPPUNIT_TEST(testDestroyStagedMessage);
CPPUNIT_TEST(testDestroyEnqueuedMessage);
- CPPUNIT_TEST(testEnqueueMessageMessage);
CPPUNIT_TEST(testExchangeCreateAndDestroy);
CPPUNIT_TEST(testExchangeBindAndUnbind);
CPPUNIT_TEST(testExchangeImplicitUnbind);
@@ -199,18 +198,14 @@
FieldTable settings;
queue.create(settings);
- Message::shared_ptr msg = Message::shared_ptr(newBasicMessage(0, exchange,
routingKey, false, false));
- AMQHeaderBody header(BASIC);
- header.setContentSize(14);
- AMQContentBody part1(data1);
- AMQContentBody part2(data2);
- msg->setHeader(&header);
- msg->addContent(&part1);
- msg->addContent(&part2);
+ Message::shared_ptr msg = MessageUtils::createMessage(exchange, routingKey,
messageId, 14);
+ MessageUtils::addContent(msg, data1);
+ MessageUtils::addContent(msg, data2);
- msg->getHeaderProperties()->setMessageId(messageId);
- msg->getHeaderProperties()->setDeliveryMode(PERSISTENT);
- msg->getHeaderProperties()->getHeaders().setString("abc",
"xyz");
+
msg->getProperties<DeliveryProperties>()->setDeliveryMode(PERSISTENT);
+ FieldTable table;
+ table.setString("abc", "xyz");
+
msg->getProperties<MessageProperties>()->setApplicationHeaders(table);
queue.enqueue(0, msg);
}//db will be closed
@@ -223,16 +218,16 @@
CPPUNIT_ASSERT_EQUAL((u_int32_t) 1, queue->getMessageCount());
Message::shared_ptr msg = queue->dequeue();
- CPPUNIT_ASSERT_EQUAL(exchange, msg->getExchange());
+ CPPUNIT_ASSERT_EQUAL(exchange, msg->getExchangeName());
CPPUNIT_ASSERT_EQUAL(routingKey, msg->getRoutingKey());
- CPPUNIT_ASSERT_EQUAL(messageId,
msg->getHeaderProperties()->getMessageId());
- CPPUNIT_ASSERT_EQUAL(PERSISTENT,
msg->getHeaderProperties()->getDeliveryMode());
- CPPUNIT_ASSERT_EQUAL(string("xyz"),
msg->getHeaderProperties()->getHeaders().getString("abc"));
+ CPPUNIT_ASSERT_EQUAL(messageId,
msg->getProperties<MessageProperties>()->getMessageId());
+ CPPUNIT_ASSERT_EQUAL((uint8_t) PERSISTENT,
msg->getProperties<DeliveryProperties>()->getDeliveryMode());
+ CPPUNIT_ASSERT_EQUAL(string("xyz"),
msg->getProperties<MessageProperties>()->getApplicationHeaders().getString("abc"));
CPPUNIT_ASSERT_EQUAL((u_int64_t) 14, msg->contentSize());
DummyHandler handler;
DummyAdapter adapter(0, &handler,
qpid::framing::highestProtocolVersion);
- msg->deliver(adapter, 0,
BasicMessage::createConsumeToken("ignore"), 100);
+ MessageDelivery::deliver(msg, adapter, 0,
MessageDelivery::getBasicConsumeToken("ignore"), 100);
CPPUNIT_ASSERT_EQUAL((size_t) 3, handler.frames.size());
AMQContentBody*
contentBody(dynamic_cast<AMQContentBody*>(handler.frames[2].getBody()));
CPPUNIT_ASSERT(contentBody);
@@ -256,16 +251,10 @@
FieldTable settings;
queue.create(settings);
- Message::shared_ptr msg = Message::shared_ptr(newBasicMessage(0, exchange,
routingKey, false, false));
- AMQHeaderBody header(BASIC);
- header.setContentSize(7);
- AMQContentBody content(data);
- msg->setHeader(&header);
- msg->addContent(&content);
+ Message::shared_ptr msg = MessageUtils::createMessage(exchange, routingKey,
messageId, 7);
+ MessageUtils::addContent(msg, data);
+
msg->getProperties<DeliveryProperties>()->setDeliveryMode(PERSISTENT);
- msg->getHeaderProperties()->setMessageId(messageId);
- msg->getHeaderProperties()->setDeliveryMode(PERSISTENT);
-
queue.enqueue(0, msg);
queue.dequeue(0, msg);
}//db will be closed
@@ -292,23 +281,22 @@
store.truncate();//make sure it is empty to begin with
//create & stage a message
- Message::shared_ptr msg = Message::shared_ptr(newBasicMessage(0, exchange,
routingKey, false, false));
- AMQHeaderBody header(BASIC);
- header.setContentSize(data1.size() + data2.size());
- msg->setHeader(&header);
- msg->getHeaderProperties()->setMessageId(messageId);
- msg->getHeaderProperties()->setDeliveryMode(PERSISTENT);
- msg->getHeaderProperties()->getHeaders().setString("abc",
"xyz");
+ Message::shared_ptr msg = MessageUtils::createMessage(exchange, routingKey,
messageId, (data1.size() + data2.size()));
+
msg->getProperties<DeliveryProperties>()->setDeliveryMode(PERSISTENT);
+ FieldTable table;
+ table.setString("abc", "xyz");
+
msg->getProperties<MessageProperties>()->setApplicationHeaders(table);
store.stage(*msg);
//append to it
msg->releaseContent(&store);//ensure that data is not held in memory
but is appended to disk when added
+ store.appendContent(*msg, data1);
+ store.appendContent(*msg, data2);
- AMQContentBody part1(data1);
- msg->addContent(&part1);
-
- AMQContentBody part2(data2);
- msg->addContent(&part2);
+ //AMQContentBody part1(data1);
+ //msg->addContent(&part1);FIXME
+ //AMQContentBody part2(data2);
+ //msg->addContent(&part2);FIXME
//enqueue it
Queue queue(name, 0, &store, 0);
@@ -319,10 +307,11 @@
//load it (without recovery)
DummyHandler handler;
DummyAdapter adapter(0, &handler,
qpid::framing::highestProtocolVersion);
- msg->deliver(adapter, 0,
BasicMessage::createConsumeToken("ignore"), 20);//52 chars of data, i.e. 2
chunks of 20 and one of 12
- CPPUNIT_ASSERT_EQUAL((size_t) 5, handler.frames.size());
+ MessageDelivery::deliver(msg, adapter, 0,
+
MessageDelivery::getBasicConsumeToken("ignore"), 20);//52 chars of data, i.e. 2
chunks of 20 and one of 12
+ CPPUNIT_ASSERT(handler.frames.size() > 2);
string loaded;
- for (int i = 2; i < 5; i++) {
+ for (uint i = 2; i < handler.frames.size(); i++) {
AMQContentBody*
contentBody(dynamic_cast<AMQContentBody*>(handler.frames[i].getBody()));
CPPUNIT_ASSERT(contentBody);
loaded += contentBody->getData();
@@ -346,22 +335,23 @@
Message::shared_ptr msg = queue->dequeue();
//check headers
- CPPUNIT_ASSERT_EQUAL(exchange, msg->getExchange());
+ CPPUNIT_ASSERT_EQUAL(exchange, msg->getExchangeName());
CPPUNIT_ASSERT_EQUAL(routingKey, msg->getRoutingKey());
- CPPUNIT_ASSERT_EQUAL(messageId,
msg->getHeaderProperties()->getMessageId());
- CPPUNIT_ASSERT_EQUAL(PERSISTENT,
msg->getHeaderProperties()->getDeliveryMode());
- CPPUNIT_ASSERT_EQUAL(string("xyz"),
msg->getHeaderProperties()->getHeaders().getString("abc"));
- CPPUNIT_ASSERT_EQUAL((u_int64_t) (data1.size() + data2.size()),
msg->expectedContentSize());
+ CPPUNIT_ASSERT_EQUAL(messageId,
msg->getProperties<MessageProperties>()->getMessageId());
+ CPPUNIT_ASSERT_EQUAL((uint8_t) PERSISTENT,
msg->getProperties<DeliveryProperties>()->getDeliveryMode());
+ CPPUNIT_ASSERT_EQUAL(string("xyz"),
msg->getProperties<MessageProperties>()->getApplicationHeaders().getString("abc"));
+ CPPUNIT_ASSERT_EQUAL((u_int64_t) (data1.size() + data2.size()),
msg->getFrames().getHeaders()->getContentLength());
- CPPUNIT_ASSERT_EQUAL((u_int32_t) 0, msg->encodedContentSize());//ensure it
is being lazily loaded
+ CPPUNIT_ASSERT_EQUAL((u_int64_t) 0, msg->contentSize());//ensure it is
being lazily loaded
//load lazily
DummyHandler handler;
DummyAdapter adapter(0, &handler,
qpid::framing::highestProtocolVersion);
- msg->deliver(adapter, 0,
BasicMessage::createConsumeToken("ignore"), 20);//52 chars of data, i.e. 2
chunks of 20 and one of 12
- CPPUNIT_ASSERT_EQUAL((size_t) 5, handler.frames.size());
+ MessageDelivery::deliver(msg, adapter, 0,
MessageDelivery::getBasicConsumeToken("ignore"), 20);//52 chars of data, i.e. 2
chunks of 20 and one of 12
+
+ CPPUNIT_ASSERT(handler.frames.size() > 2);
string loaded;
- for (int i = 2; i < 5; i++) {
+ for (uint i = 2; i < handler.frames.size(); i++) {
AMQContentBody*
contentBody(dynamic_cast<AMQContentBody*>(handler.frames[i].getBody()));
CPPUNIT_ASSERT(contentBody);
loaded += contentBody->getData();
@@ -379,9 +369,8 @@
store.truncate();//make sure it is empty to begin with
const string data("abcdefg");
- std::auto_ptr<Message> msg(createMessage("my_exchange",
"my_routing_key", "my_message", data.length()));
- AMQContentBody content(data);
- msg->addContent(&content);
+ Message::shared_ptr msg(MessageUtils::createMessage("my_exchange",
"my_routing_key", "my_message", data.length()));
+ MessageUtils::addContent(msg, data);
store.stage(*msg);
store.destroy(*msg);
@@ -400,9 +389,8 @@
store.truncate();//make sure it is empty to begin with
const string data("abcdefg");
- std::auto_ptr<Message> msg(createMessage("my_exchange",
"my_routing_key", "my_message", data.length()));
- AMQContentBody content(data);
- msg->addContent(&content);
+ Message::shared_ptr msg(MessageUtils::createMessage("my_exchange",
"my_routing_key", "my_message", data.length()));
+ MessageUtils::addContent(msg, data);
Queue queue("my_queue", 0, &store, 0);
store.create(queue);
@@ -419,73 +407,6 @@
}
- void testEnqueueMessageMessage()
- {
- string name("MyDurableQueue");
- string exchange("MyExchange");
- string routingKey("MyRoutingKey");
- string messageId = "MyMessage";
- FieldTable headers;
- headers.setString("abc", "xyz");
- qpid::framing::Content data(INLINE, "abcdefghijklmn");
- {
- BdbMessageStore store;
- store.truncate();//make sure it is empty to begin with
- Queue queue(name, 0, &store, 0);
- FieldTable settings;
- queue.create(settings);
-
- MessageTransferBody body(highestProtocolVersion,
- 0, //ticket,
- exchange, //destination,
- false, //redelivered,
- false, //reject-unroutable
- false, //immediate,
- 0, //ttl,
- 0, //priority,
- 0, //timestamp,
- PERSISTENT, //deliveryMode,
- 0, //expiration,
- exchange, //exchange,
- routingKey, //routingKey,
- messageId, //messageId,
- "", //correlationId,
- "", //replyTo,
- "", //contentType,
- "", //contentEncoding,
- 0, //contentLength,
- "", //type
- "", //userId,
- "", //appId,
- "", //transactionId,
- "", //securityToken,
- headers, //applicationHeaders,
- data //body
- );
- Message::shared_ptr msg = Message::shared_ptr(new MessageMessage(0,
&body));
-
-
- queue.enqueue(0, msg);
- }//db will be closed
- {
- BdbMessageStore store;
- QueueRegistry registry(&store);
- recover(store, registry);
- Queue::shared_ptr queue = registry.find(name);
- CPPUNIT_ASSERT(queue);
- CPPUNIT_ASSERT_EQUAL((u_int32_t) 1, queue->getMessageCount());
- Message::shared_ptr base_msg = queue->dequeue();
- MessageMessage::shared_ptr msg = dynamic_pointer_cast<MessageMessage,
Message>(base_msg);
- CPPUNIT_ASSERT(msg);
- CPPUNIT_ASSERT_EQUAL(exchange, msg->getTransfer()->getExchange());
- CPPUNIT_ASSERT_EQUAL(routingKey, msg->getTransfer()->getRoutingKey());
- CPPUNIT_ASSERT_EQUAL(messageId, msg->getTransfer()->getMessageId());
- CPPUNIT_ASSERT_EQUAL((u_int8_t) PERSISTENT,
msg->getTransfer()->getDeliveryMode());
- CPPUNIT_ASSERT_EQUAL(string("xyz"),
msg->getApplicationHeaders().getString("abc"));
- CPPUNIT_ASSERT_EQUAL(data.getValue(),
msg->getTransfer()->getBody().getValue());
- }
- }
-
void testExchangeCreateAndDestroy()
{
uint64_t id(0);
@@ -628,21 +549,6 @@
}
}
- Message* createMessage(const string& exchange, const string& routingKey,
const string& messageId, u_int64_t contentSize)
- {
- std::auto_ptr<Message> msg(newBasicMessage(0, exchange, routingKey, false,
false));
- AMQHeaderBody header(BASIC);
- header.setContentSize(contentSize);
- msg->setHeader(&header);
- msg->getHeaderProperties()->setMessageId(messageId);
- return msg.release();
- }
-
- Message* newBasicMessage(const ConnectionToken* const publisher, const string&
exchange, const string& routingKey, bool mandatory, bool immediate)
- {
- return new BasicMessage(publisher, exchange, routingKey, mandatory, immediate);
- }
-
};
// Make this test suite a plugin.
Modified: store/trunk/cpp/tests/TransactionalTest.cpp
===================================================================
--- store/trunk/cpp/tests/TransactionalTest.cpp 2007-08-28 19:14:13 UTC (rev 896)
+++ store/trunk/cpp/tests/TransactionalTest.cpp 2007-08-28 19:40:03 UTC (rev 897)
@@ -22,10 +22,11 @@
*/
#include "BdbMessageStore.h"
+#include "MessageUtils.h"
#include "test_plugin.h"
#include <qpid/framing/AMQHeaderBody.h>
#include <qpid/framing/AMQMethodBody.h>
-#include <qpid/broker/BrokerMessage.h>
+#include <qpid/broker/Message.h>
#include <qpid/broker/BrokerQueue.h>
#include <qpid/broker/RecoveryManagerImpl.h>
#include <iostream>
@@ -100,12 +101,8 @@
queueB->create(settings);
//create message and enqueue it onto first queue:
- Message::shared_ptr msg = Message::shared_ptr(new BasicMessage(0,
"exchange", "routing_key", false, false));
- AMQHeaderBody header(BASIC);
- header.setContentSize(0);
- msg->setHeader(&header);
- msg->getHeaderProperties()->setMessageId(messageId);
- msg->getHeaderProperties()->setDeliveryMode(PERSISTENT);
+ Message::shared_ptr msg = MessageUtils::createMessage("exchange",
"routing_key", messageId, 0);
+
msg->getProperties<DeliveryProperties>()->setDeliveryMode(PERSISTENT);
queueA->deliver(msg);
}
@@ -146,7 +143,7 @@
CPPUNIT_ASSERT_EQUAL((u_int32_t) 1, y->getMessageCount());
Message::shared_ptr msg = y->dequeue();
CPPUNIT_ASSERT(msg);
- CPPUNIT_ASSERT_EQUAL(messageId,
msg->getHeaderProperties()->getMessageId());
+ CPPUNIT_ASSERT_EQUAL(messageId,
msg->getProperties<MessageProperties>()->getMessageId());
}
};
Modified: store/trunk/cpp/tests/TwoPhaseCommitTest.cpp
===================================================================
--- store/trunk/cpp/tests/TwoPhaseCommitTest.cpp 2007-08-28 19:14:13 UTC (rev 896)
+++ store/trunk/cpp/tests/TwoPhaseCommitTest.cpp 2007-08-28 19:40:03 UTC (rev 897)
@@ -23,9 +23,10 @@
#include "test_plugin.h"
#include "BdbMessageStore.h"
+#include "MessageUtils.h"
#include <qpid/framing/AMQHeaderBody.h>
#include <qpid/framing/AMQMethodBody.h>
-#include <qpid/broker/BrokerMessage.h>
+#include <qpid/broker/Message.h>
#include <qpid/broker/BrokerQueue.h>
#include <qpid/broker/RecoveryManagerImpl.h>
#include <iostream>
@@ -300,12 +301,8 @@
Message::shared_ptr createMessage(const string& id, const string&
exchange="exchange", const string& key="routing_key")
{
- Message::shared_ptr msg = Message::shared_ptr(new BasicMessage(0, exchange, key,
false, false));
- AMQHeaderBody header(BASIC);
- header.setContentSize(0);
- msg->setHeader(&header);
- msg->getHeaderProperties()->setMessageId(id);
- msg->getHeaderProperties()->setDeliveryMode(PERSISTENT);
+ Message::shared_ptr msg = MessageUtils::createMessage(exchange, key, id, 0);
+
msg->getProperties<DeliveryProperties>()->setDeliveryMode(PERSISTENT);
return msg;
}
@@ -345,7 +342,7 @@
CPPUNIT_ASSERT_EQUAL((u_int32_t) 1, y->getMessageCount());
Message::shared_ptr msg = y->dequeue();
CPPUNIT_ASSERT(msg);
- CPPUNIT_ASSERT_EQUAL(msgid, msg->getHeaderProperties()->getMessageId());
+ CPPUNIT_ASSERT_EQUAL(msgid,
msg->getProperties<MessageProperties>()->getMessageId());
}
void checkA(u_int32_t size, const string& msgid = "<none>")
@@ -355,7 +352,7 @@
if (size > 0) {
Message::shared_ptr msg = queueA->dequeue();
CPPUNIT_ASSERT(msg);
- CPPUNIT_ASSERT_EQUAL(msgid,
msg->getHeaderProperties()->getMessageId());
+ CPPUNIT_ASSERT_EQUAL(msgid,
msg->getProperties<MessageProperties>()->getMessageId());
}
}
};
Modified: store/trunk/cpp/tests/persistence.py
===================================================================
--- store/trunk/cpp/tests/persistence.py 2007-08-28 19:14:13 UTC (rev 896)
+++ store/trunk/cpp/tests/persistence.py 2007-08-28 19:40:03 UTC (rev 897)
@@ -24,6 +24,7 @@
from getopt import getopt, GetoptError
import qpid.client, qpid.spec, qpid.content, qpid.testlib
from qpid.client import Closed
+from qpid.content import Content
from struct import *
from time import sleep
@@ -37,8 +38,10 @@
channel.queue_bind(queue="queue-a", exchange="amq.direct",
routing_key="a")
channel.queue_bind(queue="queue-b", exchange="amq.direct",
routing_key="b")
- channel.message_transfer(destination="amq.direct",
routing_key="a", message_id="Msg0001", delivery_mode=2,
body="A_Message1")
- channel.message_transfer(destination="amq.direct",
routing_key="b", message_id="Msg0002", delivery_mode=2,
body="B_Message1")
+ channel.message_transfer(destination="amq.direct",
+
content=Content(properties={'routing_key':"a",
'message_id':"Msg0001", 'delivery_mode':2},
body="A_Message1"))
+ channel.message_transfer(destination="amq.direct",
+
content=Content(properties={'routing_key':"b",
'message_id':"Msg0002", 'delivery_mode':2},
body="B_Message1"))
def phase2(self):
@@ -65,7 +68,8 @@
channel.queue_bind(queue="queue-b", exchange="amq.topic",
routing_key="abc")
channel.queue_bind(queue="queue-c", exchange="amq.topic",
routing_key="abc")
- channel.message_transfer(destination="amq.topic",
routing_key="abc", message_id="Msg0003", delivery_mode=2,
body="AB_Message2")
+ channel.message_transfer(destination="amq.topic",
+
content=Content(properties={'routing_key':"abc",
'message_id':"Msg0003", 'delivery_mode':2},
body="AB_Message2"))
def phase3(self):
@@ -92,9 +96,15 @@
self.assertEmptyQueue("queue-c")
#note: default bindings must be restored for this to work
- channel.message_transfer(routing_key="queue-a",
message_id="Msg0004", delivery_mode=2, body="A_Message3")
- channel.message_transfer(routing_key="queue-a",
message_id="Msg0005", delivery_mode=2, body="A_Message4")
- channel.message_transfer(routing_key="queue-a",
message_id="Msg0006", delivery_mode=2, body="A_Message5")
+ channel.message_transfer(content=Content(
+ properties={'routing_key':"queue-a",
'message_id':"Msg0004", 'delivery_mode':2},
+ body="A_Message3"))
+ channel.message_transfer(content=Content(
+ properties={'routing_key':"queue-a",
'message_id':"Msg0005", 'delivery_mode':2},
+ body="A_Message4"))
+ channel.message_transfer(content=Content(
+ properties={'routing_key':"queue-a",
'message_id':"Msg0006", 'delivery_mode':2},
+ body="A_Message5"))
channel.tx_commit()
@@ -111,7 +121,9 @@
msg = included.get(timeout=1)
self.assertExpectedContent(msg, "Msg0006",
"A_Message5").complete()
- channel.message_transfer(destination="amq.direct",
routing_key="queue-b", message_id="Msg0007", delivery_mode=2,
body="B_Message3")
+ channel.message_transfer(destination="amq.direct", content=Content(
+ properties={'routing_key':"queue-b",
'message_id':"Msg0007", 'delivery_mode':2},
+ body="B_Message3"))
channel.tx_rollback()
@@ -150,10 +162,18 @@
for q in queues:
channel.queue_declare(queue=q, durable=True)
- channel.message_transfer(routing_key="queue-a1",
message_id="MsgA", delivery_mode=2, body="MessageA")
- channel.message_transfer(routing_key="queue-b1",
message_id="MsgB", delivery_mode=2, body="MessageB")
- channel.message_transfer(routing_key="queue-c1",
message_id="MsgC", delivery_mode=2, body="MessageC")
- channel.message_transfer(routing_key="queue-d1",
message_id="MsgD", delivery_mode=2, body="MessageD")
+ channel.message_transfer(content=Content(
+ properties={'routing_key':"queue-a1",
'message_id':"MsgA", 'delivery_mode':2},
+ body="MessageA"))
+ channel.message_transfer(content=Content(
+ properties={'routing_key':"queue-b1",
'message_id':"MsgB", 'delivery_mode':2},
+ body="MessageB"))
+ channel.message_transfer(content=Content(
+ properties={'routing_key':"queue-c1",
'message_id':"MsgC", 'delivery_mode':2},
+ body="MessageC"))
+ channel.message_transfer(content=Content(
+ properties={'routing_key':"queue-d1",
'message_id':"MsgD", 'delivery_mode':2},
+ body="MessageD"))
channel.dtx_demarcation_select()
txa = self.xid('a')
@@ -242,7 +262,8 @@
self.assertEqual(8, self.channel.dtx_demarcation_start(xid=tx).status)
self.channel.message_get(destination="temp-swap", queue=src)
msg = self.client.queue("temp-swap").get(timeout=1)
- self.channel.message_transfer(routing_key=dest, message_id=msg.message_id,
delivery_mode=2, body=msg.body)
+
self.channel.message_transfer(content=Content(properties={'routing_key':dest,
'message_id':msg.content['message_id'], 'delivery_mode':2},
+ body=msg.content.body))
msg.complete();
self.assertEqual(8, self.channel.dtx_demarcation_end(xid=tx).status)
@@ -264,8 +285,8 @@
self.assertEqual(method, reply.method.name)
def assertExpectedContent(self, content, id, body):
- self.assertEqual(id, content.message_id)
- self.assertEqual(body, content.body)
+ self.assertEqual(id, content.content['message_id'])
+ self.assertEqual(body, content.content.body)
return content
def assertExpectedGetResult(self, id, body):