[rhmessaging-commits] rhmessaging commits: r1729 - in store/trunk/cpp: tests and 1 other directory.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Wed Feb 27 10:54:02 EST 2008


Author: gordonsim
Date: 2008-02-27 10:54:01 -0500 (Wed, 27 Feb 2008)
New Revision: 1729

Modified:
   store/trunk/cpp/lib/BdbMessageStore.cpp
   store/trunk/cpp/lib/BdbMessageStore.h
   store/trunk/cpp/tests/persistence.py
   store/trunk/cpp/tests/system_test.sh
Log:
Fix to delete bindings when queue is deleted
Fix to remove stale messages on recovery
Extra tests



Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp	2008-02-26 17:09:41 UTC (rev 1728)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp	2008-02-27 15:54:01 UTC (rev 1729)
@@ -269,6 +269,7 @@
 {
     checkInit();
     destroy(queueDb, queue);
+    deleteBindingsForQueue(queue);
     qpid::broker::ExternalQueueStore* eqs = queue.getExternalQueueStore();
     if (eqs)
     {
@@ -501,6 +502,7 @@
             exchange->second->bind(queueName, routingkey, args);
         } else {
             //stale binding, delete it
+            QPID_LOG(warning, "Deleting stale binding");
             bindings->del(0);
         }
     }
@@ -675,10 +677,11 @@
 
     int count(0);
     for (int status = mappings->get(&msgId, &value, DB_SET); status == 0; status = mappings->get(&msgId, &value, DB_NEXT_DUP)) {
-        RecoverableQueue::shared_ptr queue = index[value.id];
-        if (!queue) {
+        if (index.find(value.id) == index.end()) {
             QPID_LOG(warning, "Recovered message for queue that no longer exists");
+            mappings->del(0);
         } else {            
+            RecoverableQueue::shared_ptr queue = index[value.id];
             if (PreparedTransaction::isLocked(locked, value.id, msgId.id)) {
                 prepared[msgId.id] = msg;
             } else {
@@ -1334,6 +1337,34 @@
     }
 }
 
+void BdbMessageStore::deleteBindingsForQueue(const PersistableQueue& queue)
+{
+    TxnCtxt txn;
+    txn.begin(env, true);
+    try {
+        Cursor bindings;
+        bindings.open(bindingDb, txn.get());
+        
+        IdDbt key;
+        Dbt value;        
+        while (bindings.next(key, value)) {
+            Buffer buffer(reinterpret_cast<char*>(value.get_data()), value.get_size());
+            if (buffer.available() < 8) {
+                THROW_STORE_EXCEPTION("Not enough data for binding");
+            }
+            uint64_t queueId = buffer.getLongLong();
+            if (queue.getPersistenceId() == queueId) {
+                bindings->del(0);
+                QPID_LOG(debug, "Deleting binding for " << queue.getName() << " " << key.id << "->" << queueId);
+            }
+        }
+    } catch (const std::exception& e) {
+        THROW_STORE_EXCEPTION_2("Error deleting bindings", e.what());
+    }
+    txn.commit();
+    QPID_LOG(debug, "Deleted all bindings for " << queue.getName() << ":" << queue.getPersistenceId());
+}
+
 string BdbMessageStore::getJrnlBaseDir() 
 {
     std::stringstream dir;

Modified: store/trunk/cpp/lib/BdbMessageStore.h
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.h	2008-02-26 17:09:41 UTC (rev 1728)
+++ store/trunk/cpp/lib/BdbMessageStore.h	2008-02-27 15:54:01 UTC (rev 1729)
@@ -122,6 +122,7 @@
             bool create(Db& db, IdSequence& seq, const qpid::broker::Persistable& p);
             void completed(TPCTxnCtxt& txn, Db& discard, Db& apply, bool commit);
             void record2pcOp(Db& db, TPCTxnCtxt& txn, u_int64_t messageId, u_int64_t queueId);
+            void deleteBindingsForQueue(const qpid::broker::PersistableQueue& queue);
 
             u_int64_t getRecordSize(Db& db, Dbt& key);
             u_int64_t getRecordSize(DbTxn* txn, Db& db, Dbt& key);

Modified: store/trunk/cpp/tests/persistence.py
===================================================================
--- store/trunk/cpp/tests/persistence.py	2008-02-26 17:09:41 UTC (rev 1728)
+++ store/trunk/cpp/tests/persistence.py	2008-02-27 15:54:01 UTC (rev 1729)
@@ -247,7 +247,55 @@
             else:
                 self.assertEqual(0, channel.queue_query(queue=q).message_count)
 
+    def phase7(self):
+        channel = self.channel
+        channel.synchronous = False
 
+        #test deletion of queue after publish
+        #create queue
+        channel.queue_declare(queue = "q", auto_delete=True, durable=True)
+
+        #send message
+        for i in range(1, 10):
+            channel.message_transfer(content=Content(properties={'routing_key' : "q", 'delivery_mode':2}, body = "my-message"))
+
+        channel.synchronous = True
+        #explicitly delete queue
+        channel.queue_delete(queue = "q")
+
+        #test acking of message from auto-deleted queue
+        #create queue
+        channel.queue_declare(queue = "q", auto_delete=True, durable=True)
+
+        #send message
+        channel.message_transfer(content=Content(properties={'routing_key' : "q", 'delivery_mode':2}, body = "my-message"))
+
+        #create consumer
+        channel.message_subscribe(queue = "q", destination = "a", confirm_mode = 1, acquire_mode=0)
+        channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "a")
+        channel.message_flow(unit = 0, value = 10, destination = "a")
+        queue = self.client.queue("a")
+
+        #consume the message, cancel subscription (triggering auto-delete), then ack it
+        msg = queue.get(timeout = 5)
+        channel.message_cancel(destination = "a")        
+        msg.complete()
+
+        #test implicit deletion of bindings when queue is deleted
+        channel.queue_declare(queue = "durable-subscriber-queue", exclusive=True, durable=True)
+        channel.queue_bind(exchange="amq.topic", queue="durable-subscriber-queue", routing_key="xyz")
+        channel.message_transfer(destination= "amq.topic", content=Content(properties={'routing_key' : "xyz", 'delivery_mode':2}, body = "my-message"))
+        channel.queue_delete(queue = "durable-subscriber-queue")
+
+    def phase8(self):
+        channel = self.channel
+
+        channel.queue_declare(queue = "durable-subscriber-queue", exclusive=True, durable=True)
+        channel.queue_bind(exchange="amq.topic", queue="durable-subscriber-queue", routing_key="xyz")
+        channel.message_transfer(destination= "amq.topic", content=Content(properties={'routing_key' : "xyz", 'delivery_mode':2}, body = "my-message"))
+        channel.queue_delete(queue = "durable-subscriber-queue")
+
+
     def xid(self, txid, branchqual = ''):
         return pack('!LBB', 0, len(txid), len(branchqual)) + txid + branchqual
         

Modified: store/trunk/cpp/tests/system_test.sh
===================================================================
--- store/trunk/cpp/tests/system_test.sh	2008-02-26 17:09:41 UTC (rev 1728)
+++ store/trunk/cpp/tests/system_test.sh	2008-02-27 15:54:01 UTC (rev 1729)
@@ -72,7 +72,7 @@
         mode='bdb'
         echo 'BDB persistence...'
     fi
-    for p in `seq 1 6`; do
+    for p in `seq 1 8`; do
         log="$abs_srcdir/vg-log.$mode.$p"
         #echo "$vg $QPIDD -m 0 --data dir "" --load-module $LIBBDBSTORE $JRNLFLAGS"
         $vg $QPIDD -m 0 --data-dir "" --load-module $LIBBDBSTORE $JRNLFLAGS >> "$abs_srcdir/qpid.log" 2> $log & pid=$!




More information about the rhmessaging-commits mailing list