[rhmessaging-commits] rhmessaging commits: r3138 - store/trunk/cpp/tests.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Thu Feb 26 12:19:48 EST 2009


Author: gordonsim
Date: 2009-02-26 12:19:48 -0500 (Thu, 26 Feb 2009)
New Revision: 3138

Modified:
   store/trunk/cpp/tests/persistence.py
Log:
Added tests for LVQ



Modified: store/trunk/cpp/tests/persistence.py
===================================================================
--- store/trunk/cpp/tests/persistence.py	2009-02-25 19:30:17 UTC (rev 3137)
+++ store/trunk/cpp/tests/persistence.py	2009-02-26 17:19:48 UTC (rev 3138)
@@ -64,6 +64,15 @@
                                  message=self.createMessage(routing_key="a", correlation_id="Msg0001", body="A_Message1"))
         session.message_transfer(destination="amq.direct",
                                  message=self.createMessage(routing_key="b", correlation_id="Msg0002", body="B_Message1"))
+
+        session.queue_declare(queue="lvq-test", durable=True, arguments={"qpid.last_value_queue":True})
+        session.message_transfer(message=self.createMessage(routing_key="lvq-test", application_headers={"qpid.LVQ_key":"B"}, body="B1"))
+        session.message_transfer(message=self.createMessage(routing_key="lvq-test", application_headers={"qpid.LVQ_key":"A"}, body="A1"))
+        session.message_transfer(message=self.createMessage(routing_key="lvq-test", application_headers={"qpid.LVQ_key":"A"}, body="A2"))
+        session.message_transfer(message=self.createMessage(routing_key="lvq-test", application_headers={"qpid.LVQ_key":"B"}, body="B2"))
+        session.message_transfer(message=self.createMessage(routing_key="lvq-test", application_headers={"qpid.LVQ_key":"B"}, body="B3"))
+        session.message_transfer(message=self.createMessage(routing_key="lvq-test", application_headers={"qpid.LVQ_key":"C"}, body="C1"))
+
         
 
     def phase2(self):    
@@ -100,9 +109,51 @@
         session.message_transfer(destination="amq.topic",
                                  message=self.createMessage(routing_key="abc", correlation_id="Msg0003", body="AB_Message2"))
 
+        #check LVQ exists and has expected messages:
+        session.queue_declare(queue="lvq-test", durable=True, passive=True)
+        session.message_subscribe(destination="lvq", queue="lvq-test")
+        lvq = session.incoming("lvq")
+        lvq.start()
+        accepted = RangedSet()
+        for m in ["A2", "B3", "C1"]:
+            msg = lvq.get(timeout=1)
+            self.assertEquals(m, msg.body)
+            accepted.add(msg.id)
+        try:
+            extra = lvq.get(timeout=1)
+            self.fail("lvq-test not empty, contains: " + extra.body)
+        except Empty: None
+        #publish some more messages while subscriber is active (no replacement):
+        session.message_transfer(message=self.createMessage(routing_key="lvq-test", application_headers={"qpid.LVQ_key":"C"}, body="C2"))
+        session.message_transfer(message=self.createMessage(routing_key="lvq-test", application_headers={"qpid.LVQ_key":"C"}, body="C3"))
+        session.message_transfer(message=self.createMessage(routing_key="lvq-test", application_headers={"qpid.LVQ_key":"A"}, body="A3"))
+        session.message_transfer(message=self.createMessage(routing_key="lvq-test", application_headers={"qpid.LVQ_key":"A"}, body="A4"))
+        session.message_transfer(message=self.createMessage(routing_key="lvq-test", application_headers={"qpid.LVQ_key":"C"}, body="C4"))
+        #check that accepting replaced messages is safe
+        session.message_accept(accepted)
 
+
     def phase3(self):    
         session = self.session
+
+        #lvq recovery validation
+        session.queue_declare(queue="lvq-test", durable=True, passive=True)
+        session.message_subscribe(destination="lvq", queue="lvq-test")
+        lvq = session.incoming("lvq")
+        lvq.start()
+        accepted = RangedSet()
+        lvq.start()
+        for m in ["C4", "A4"]:
+            msg = lvq.get(timeout=1)
+            self.assertEquals(m, msg.body)
+            accepted.add(msg.id)
+        session.message_accept(accepted)
+        try:
+            extra = lvq.get(timeout=1)
+            self.fail("lvq-test not empty, contains: " + extra.body)
+        except Empty: None
+        session.queue_delete(queue="lvq-test")
+
         
         #check queues exists
         session.queue_declare(queue="queue-a", durable=True, passive=True)




More information about the rhmessaging-commits mailing list