[rhmessaging-commits] rhmessaging commits: r1810 - in store/trunk/cpp: tests and 1 other directory.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Fri Mar 28 15:47:15 EDT 2008


Author: gordonsim
Date: 2008-03-28 15:47:15 -0400 (Fri, 28 Mar 2008)
New Revision: 1810

Modified:
   store/trunk/cpp/lib/BdbMessageStore.cpp
   store/trunk/cpp/lib/BdbMessageStore.h
   store/trunk/cpp/tests/SimpleTest.cpp
   store/trunk/cpp/tests/persistence.py
Log:
Unbind using binding key only.



Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp	2008-03-27 21:43:45 UTC (rev 1809)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp	2008-03-28 19:47:15 UTC (rev 1810)
@@ -339,21 +339,10 @@
 }
 
 void BdbMessageStore::unbind(const PersistableExchange& e, const PersistableQueue& q, 
-                             const std::string& k, const FieldTable& a)
+                             const std::string& k, const FieldTable&)
 {
     checkInit();
-    IdDbt key(e.getPersistenceId());    
-    BindingDbt value(e, q, k, a);
-
-    TxnCtxt txn;
-    txn.begin(env, true);
-
-    if (deleteKeyValuePair(bindingDb, txn.get(), key, value)) {
-        txn.commit();
-    } else {
-        txn.abort();
-        THROW_STORE_EXCEPTION("Can't find binding");
-    }
+    deleteBinding(e, q, k);
 }
 
 void BdbMessageStore::recover(RecoveryManager& registry)
@@ -1365,6 +1354,40 @@
     QPID_LOG(debug, "Deleted all bindings for " << queue.getName() << ":" << queue.getPersistenceId());
 }
 
+void BdbMessageStore::deleteBinding(const PersistableExchange& exchange, const PersistableQueue& queue, const std::string& bkey)
+{
+    TxnCtxt txn;
+    txn.begin(env, true);
+    try {
+        Cursor bindings;
+        bindings.open(bindingDb, txn.get());
+        
+        IdDbt key(exchange.getPersistenceId());
+        Dbt value;        
+
+        for (int status = bindings->get(&key, &value, DB_SET); status == 0; status = bindings->get(&key, &value, DB_NEXT_DUP)) {
+            Buffer buffer(reinterpret_cast<char*>(value.get_data()), value.get_size());
+            if (buffer.available() < 8) {
+                THROW_STORE_EXCEPTION("Not enough data for binding");
+            }
+            uint64_t queueId = buffer.getLongLong();
+            if (queue.getPersistenceId() == queueId) {
+                std::string q;
+                std::string k;
+                buffer.getShortString(q);
+                buffer.getShortString(k);
+                if (bkey == k) {
+                    bindings->del(0);
+                    QPID_LOG(debug, "Deleting binding for " << queue.getName() << " " << key.id << "->" << queueId);
+                }
+            }
+        }
+    } catch (const std::exception& e) {
+        THROW_STORE_EXCEPTION_2("Error deleting bindings", e.what());
+    }
+    txn.commit();
+}
+
 string BdbMessageStore::getJrnlBaseDir() 
 {
     std::stringstream dir;

Modified: store/trunk/cpp/lib/BdbMessageStore.h
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.h	2008-03-27 21:43:45 UTC (rev 1809)
+++ store/trunk/cpp/lib/BdbMessageStore.h	2008-03-28 19:47:15 UTC (rev 1810)
@@ -123,6 +123,9 @@
             void completed(TPCTxnCtxt& txn, Db& discard, Db& apply, bool commit);
             void record2pcOp(Db& db, TPCTxnCtxt& txn, u_int64_t messageId, u_int64_t queueId);
             void deleteBindingsForQueue(const qpid::broker::PersistableQueue& queue);
+            void deleteBinding(const qpid::broker::PersistableExchange& exchange, 
+                               const qpid::broker::PersistableQueue& queue, 
+                               const std::string& key);
 
             u_int64_t getRecordSize(Db& db, Dbt& key);
             u_int64_t getRecordSize(DbTxn* txn, Db& db, Dbt& key);

Modified: store/trunk/cpp/tests/SimpleTest.cpp
===================================================================
--- store/trunk/cpp/tests/SimpleTest.cpp	2008-03-27 21:43:45 UTC (rev 1809)
+++ store/trunk/cpp/tests/SimpleTest.cpp	2008-03-28 19:47:15 UTC (rev 1810)
@@ -467,12 +467,9 @@
     }
 }
 
-void testExchangeBindAndUnbind(bool async)
+void bindAndUnbind(const string& exchangeName, const string& queueName, 
+                   const string& key, const FieldTable& args, bool async)
 {
-    string exchangeName("MyDurableExchange");
-    string queueName("MyDurableQueue");
-    string key("my-routing-key");
-    FieldTable args;
     {
         BdbMessageStore store;
         store.init(TESTDIR, async, true, 4, 1);
@@ -513,6 +510,19 @@
     }
 }
 
+void testExchangeBindAndUnbind(bool async)
+{
+    bindAndUnbind("MyDurableExchange", "MyDurableQueue", "my-routing-key", FieldTable(), async);
+}
+
+void testExchangeBindAndUnbindWithArgs(bool async)
+{
+    FieldTable args;
+    args.setString("a", "A");
+    args.setString("b", "B");
+    bindAndUnbind("MyDurableExchange", "MyDurableQueue", "my-routing-key", args, async);
+}
+
 void testExchangeImplicitUnbind(bool async)
 {
     string exchangeName("MyDurableExchange");
@@ -742,6 +752,20 @@
     cout << "ok" << endl;
 }
 
+QPID_AUTO_TEST_CASE(ExchangeBindAndUnbindWithArgsSync)
+{
+    cout << test_filename << ".ExchangeBindAndUnbindWithArgsSync: " << flush;
+    testExchangeBindAndUnbindWithArgs(false);
+    cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(ExchangeBindAndUnbindWithArgsAsync)
+{
+    cout << test_filename << ".ExchangeBindAndUnbindWithArgsAsync: " << flush;
+    testExchangeBindAndUnbindWithArgs(true);
+    cout << "ok" << endl;
+}
+
 QPID_AUTO_TEST_CASE(ExchangeImplicitUnbindSync)
 {
     cout << test_filename << ".ExchangeImplicitUnbindSync: " << flush;

Modified: store/trunk/cpp/tests/persistence.py
===================================================================
--- store/trunk/cpp/tests/persistence.py	2008-03-27 21:43:45 UTC (rev 1809)
+++ store/trunk/cpp/tests/persistence.py	2008-03-28 19:47:15 UTC (rev 1810)
@@ -24,6 +24,7 @@
 from getopt import getopt, GetoptError
 import qpid.client, qpid.spec, qpid.content, qpid.testlib
 from qpid.client import Closed
+from qpid.queue import Empty
 from qpid.content import Content
 from struct import *
 from time import sleep
@@ -287,9 +288,68 @@
         channel.message_transfer(destination= "amq.topic", content=Content(properties={'routing_key' : "xyz", 'delivery_mode':2}, body = "my-message"))
         channel.queue_delete(queue = "durable-subscriber-queue")
 
+        #test unbind:
+        #create a series of bindings to a queue
+        channel.queue_declare(queue = "binding-test-queue", durable=True)
+        channel.queue_bind(exchange="amq.direct", queue="binding-test-queue", routing_key="abc")
+        channel.queue_bind(exchange="amq.direct", queue="binding-test-queue", routing_key="pqr")
+        channel.queue_bind(exchange="amq.direct", queue="binding-test-queue", routing_key="xyz")
+        channel.queue_bind(exchange="amq.match", queue="binding-test-queue", routing_key="a", arguments={"x-match":"all", "p":"a"})
+        channel.queue_bind(exchange="amq.match", queue="binding-test-queue", routing_key="b", arguments={"x-match":"all", "p":"b"})
+        channel.queue_bind(exchange="amq.match", queue="binding-test-queue", routing_key="c", arguments={"x-match":"all", "p":"c"})
+        #then restart broker...            
+        
+
     def phase8(self):
         channel = self.channel
 
+        #continue testing unbind:
+        #send messages to the queue via each of the bindings
+        for k in ["abc", "pqr", "xyz"]:
+            data = "first %s" % (k)
+            channel.message_transfer(destination= "amq.direct", content=Content(data, properties={'routing_key': k,'delivery_mode':2}))
+        for a in [{"p":"a"}, {"p":"b"}, {"p":"c"}]:    
+            data = "first %s" % (a["p"])
+            channel.message_transfer(destination="amq.match", content=Content(data, properties={'application_headers':a }))
+        #unbind some bindings (using final 0-10 semantics)
+        channel.queue_unbind(exchange="amq.direct", queue="binding-test-queue", routing_key="pqr")
+        channel.queue_unbind(exchange="amq.match", queue="binding-test-queue", routing_key="b")
+        #send messages again
+        for k in ["abc", "pqr", "xyz"]:
+            data = "second %s" % (k)
+            channel.message_transfer(destination= "amq.direct", content=Content(data, properties={'routing_key': k,'delivery_mode':2}))
+        for a in [{"p":"a"}, {"p":"b"}, {"p":"c"}]:    
+            data = "second %s" % (a["p"])
+            channel.message_transfer(destination="amq.match", content=Content(data, properties={'application_headers':a }))
+
+        #check that only the correct messages are received
+        expected = []
+        for k in ["abc", "pqr", "xyz"]:
+            expected.append("first %s" % (k))
+        for a in [{"p":"a"}, {"p":"b"}, {"p":"c"}]:    
+            expected.append("first %s" % (a["p"]))
+        for k in ["abc", "xyz"]:
+            expected.append("second %s" % (k))
+        for a in [{"p":"a"}, {"p":"c"}]:    
+            expected.append("second %s" % (a["p"]))
+
+        channel.message_subscribe(queue = "binding-test-queue", destination = "binding-test")
+        channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "binding-test")
+        channel.message_flow(unit = 0, value = 10, destination = "binding-test")
+        queue = self.client.queue("binding-test")
+
+        while len(expected):
+            msg = queue.get(timeout=1)
+            if msg.content.body not in expected:
+                self.fail("Missing message: %s" % msg.content.body)                
+            expected.remove(msg.content.body)
+        try:
+            msg = queue.get(timeout=1)
+            self.fail("Got extra message: %s" % msg.content.body)
+        except Empty: pass
+
+        
+
         channel.queue_declare(queue = "durable-subscriber-queue", exclusive=True, durable=True)
         channel.queue_bind(exchange="amq.topic", queue="durable-subscriber-queue", routing_key="xyz")
         channel.message_transfer(destination= "amq.topic", content=Content(properties={'routing_key' : "xyz", 'delivery_mode':2}, body = "my-message"))




More information about the rhmessaging-commits mailing list