Author: gordonsim
Date: 2009-02-26 12:19:48 -0500 (Thu, 26 Feb 2009)
New Revision: 3138
Modified:
store/trunk/cpp/tests/persistence.py
Log:
Added tests for LVQ
Modified: store/trunk/cpp/tests/persistence.py
===================================================================
--- store/trunk/cpp/tests/persistence.py 2009-02-25 19:30:17 UTC (rev 3137)
+++ store/trunk/cpp/tests/persistence.py 2009-02-26 17:19:48 UTC (rev 3138)
@@ -64,6 +64,15 @@
message=self.createMessage(routing_key="a",
correlation_id="Msg0001", body="A_Message1"))
session.message_transfer(destination="amq.direct",
message=self.createMessage(routing_key="b",
correlation_id="Msg0002", body="B_Message1"))
+
+ session.queue_declare(queue="lvq-test", durable=True,
arguments={"qpid.last_value_queue":True})
+
session.message_transfer(message=self.createMessage(routing_key="lvq-test",
application_headers={"qpid.LVQ_key":"B"}, body="B1"))
+
session.message_transfer(message=self.createMessage(routing_key="lvq-test",
application_headers={"qpid.LVQ_key":"A"}, body="A1"))
+
session.message_transfer(message=self.createMessage(routing_key="lvq-test",
application_headers={"qpid.LVQ_key":"A"}, body="A2"))
+
session.message_transfer(message=self.createMessage(routing_key="lvq-test",
application_headers={"qpid.LVQ_key":"B"}, body="B2"))
+
session.message_transfer(message=self.createMessage(routing_key="lvq-test",
application_headers={"qpid.LVQ_key":"B"}, body="B3"))
+
session.message_transfer(message=self.createMessage(routing_key="lvq-test",
application_headers={"qpid.LVQ_key":"C"}, body="C1"))
+
def phase2(self):
@@ -100,9 +109,51 @@
session.message_transfer(destination="amq.topic",
message=self.createMessage(routing_key="abc",
correlation_id="Msg0003", body="AB_Message2"))
+ #check LVQ exists and has exepected messages:
+ session.queue_declare(queue="lvq-test", durable=True, passive=True)
+ session.message_subscribe(destination="lvq",
queue="lvq-test")
+ lvq = session.incoming("lvq")
+ lvq.start()
+ accepted = RangedSet()
+ for m in ["A2", "B3", "C1"]:
+ msg = lvq.get(timeout=1)
+ self.assertEquals(m, msg.body)
+ accepted.add(msg.id)
+ try:
+ extra = lvq.get(timeout=1)
+ self.fail("lvq-test not empty, contains: " + extra.body)
+ except Empty: None
+ #publish some more messages while subscriber is active (no replacement):
+
session.message_transfer(message=self.createMessage(routing_key="lvq-test",
application_headers={"qpid.LVQ_key":"C"}, body="C2"))
+
session.message_transfer(message=self.createMessage(routing_key="lvq-test",
application_headers={"qpid.LVQ_key":"C"}, body="C3"))
+
session.message_transfer(message=self.createMessage(routing_key="lvq-test",
application_headers={"qpid.LVQ_key":"A"}, body="A3"))
+
session.message_transfer(message=self.createMessage(routing_key="lvq-test",
application_headers={"qpid.LVQ_key":"A"}, body="A4"))
+
session.message_transfer(message=self.createMessage(routing_key="lvq-test",
application_headers={"qpid.LVQ_key":"C"}, body="C4"))
+ #check that accepting replaced messages is safe
+ session.message_accept(accepted)
+
def phase3(self):
session = self.session
+
+ #lvq recovery validation
+ session.queue_declare(queue="lvq-test", durable=True, passive=True)
+ session.message_subscribe(destination="lvq",
queue="lvq-test")
+ lvq = session.incoming("lvq")
+ lvq.start()
+ accepted = RangedSet()
+ lvq.start()
+ for m in ["C4", "A4"]:
+ msg = lvq.get(timeout=1)
+ self.assertEquals(m, msg.body)
+ accepted.add(msg.id)
+ session.message_accept(accepted)
+ try:
+ extra = lvq.get(timeout=1)
+ self.fail("lvq-test not empty, contains: " + extra.body)
+ except Empty: None
+ session.queue_delete(queue="lvq-test")
+
#check queues exists
session.queue_declare(queue="queue-a", durable=True, passive=True)