Author: gordonsim
Date: 2008-02-27 10:54:01 -0500 (Wed, 27 Feb 2008)
New Revision: 1729
Modified:
store/trunk/cpp/lib/BdbMessageStore.cpp
store/trunk/cpp/lib/BdbMessageStore.h
store/trunk/cpp/tests/persistence.py
store/trunk/cpp/tests/system_test.sh
Log:
Fix to delete bindings when queue is deleted
Fix to remove stale messages on recovery
Extra tests
Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp 2008-02-26 17:09:41 UTC (rev 1728)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp 2008-02-27 15:54:01 UTC (rev 1729)
@@ -269,6 +269,7 @@
{
checkInit();
destroy(queueDb, queue);
+ deleteBindingsForQueue(queue);
qpid::broker::ExternalQueueStore* eqs = queue.getExternalQueueStore();
if (eqs)
{
@@ -501,6 +502,7 @@
exchange->second->bind(queueName, routingkey, args);
} else {
//stale binding, delete it
+ QPID_LOG(warning, "Deleting stale binding");
bindings->del(0);
}
}
@@ -675,10 +677,11 @@
int count(0);
for (int status = mappings->get(&msgId, &value, DB_SET); status == 0;
status = mappings->get(&msgId, &value, DB_NEXT_DUP)) {
- RecoverableQueue::shared_ptr queue = index[value.id];
- if (!queue) {
+ if (index.find(value.id) == index.end()) {
QPID_LOG(warning, "Recovered message for queue that no longer
exists");
+ mappings->del(0);
} else {
+ RecoverableQueue::shared_ptr queue = index[value.id];
if (PreparedTransaction::isLocked(locked, value.id, msgId.id)) {
prepared[msgId.id] = msg;
} else {
@@ -1334,6 +1337,34 @@
}
}
+void BdbMessageStore::deleteBindingsForQueue(const PersistableQueue& queue) // Removes every record in bindingDb whose stored queue id equals this queue's persistence id.
+{
+ TxnCtxt txn;
+ txn.begin(env, true); // NOTE(review): the meaning of the 'true' flag is not visible here - confirm against TxnCtxt::begin
+ try {
+ Cursor bindings;
+ bindings.open(bindingDb, txn.get()); // the cursor operates inside the transaction started above
+
+ IdDbt key;
+ Dbt value;
+ while (bindings.next(key, value)) { // visit each binding record in turn
+ Buffer buffer(reinterpret_cast<char*>(value.get_data()),
value.get_size());
+ if (buffer.available() < 8) { // at least 8 bytes are required before getLongLong() is called below
+ THROW_STORE_EXCEPTION("Not enough data for binding");
+ }
+ uint64_t queueId = buffer.getLongLong(); // the first value decoded from the record is compared as the bound queue's id
+ if (queue.getPersistenceId() == queueId) {
+ bindings->del(0); // presumably deletes the record the cursor is currently positioned on - confirm against Dbc::del
+ QPID_LOG(debug, "Deleting binding for " <<
queue.getName() << " " << key.id << "->" <<
queueId);
+ }
+ }
+ } catch (const std::exception& e) { // re-thrown with added context; NOTE(review): presumably also catches the store exception thrown in the loop - confirm
+ THROW_STORE_EXCEPTION_2("Error deleting bindings", e.what());
+ }
+ txn.commit(); // reached only when the scan above finished without throwing
+ QPID_LOG(debug, "Deleted all bindings for " << queue.getName()
<< ":" << queue.getPersistenceId());
+}
+
string BdbMessageStore::getJrnlBaseDir()
{
std::stringstream dir;
Modified: store/trunk/cpp/lib/BdbMessageStore.h
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.h 2008-02-26 17:09:41 UTC (rev 1728)
+++ store/trunk/cpp/lib/BdbMessageStore.h 2008-02-27 15:54:01 UTC (rev 1729)
@@ -122,6 +122,7 @@
bool create(Db& db, IdSequence& seq, const
qpid::broker::Persistable& p);
void completed(TPCTxnCtxt& txn, Db& discard, Db& apply, bool
commit);
void record2pcOp(Db& db, TPCTxnCtxt& txn, u_int64_t messageId,
u_int64_t queueId);
+ void deleteBindingsForQueue(const qpid::broker::PersistableQueue&
queue);
u_int64_t getRecordSize(Db& db, Dbt& key);
u_int64_t getRecordSize(DbTxn* txn, Db& db, Dbt& key);
Modified: store/trunk/cpp/tests/persistence.py
===================================================================
--- store/trunk/cpp/tests/persistence.py 2008-02-26 17:09:41 UTC (rev 1728)
+++ store/trunk/cpp/tests/persistence.py 2008-02-27 15:54:01 UTC (rev 1729)
@@ -247,7 +247,55 @@
else:
self.assertEqual(0, channel.queue_query(queue=q).message_count)
+ def phase7(self): # Three scenarios: delete a queue after publishing to it, ack a message after its auto-delete queue's subscription is cancelled, and delete a durable queue that still has a binding.
+ channel = self.channel
+ channel.synchronous = False # set back to True below, before the explicit queue_delete
+ #test deletion of queue after publish
+ #create queue
+ channel.queue_declare(queue = "q", auto_delete=True, durable=True)
+
+ #send message
+ for i in range(1, 10): # range(1, 10) yields 9 iterations, so 9 transfers are issued
+ channel.message_transfer(content=Content(properties={'routing_key' :
"q", 'delivery_mode':2}, body = "my-message"))
+
+ channel.synchronous = True
+ #explicitly delete queue
+ channel.queue_delete(queue = "q")
+
+ #test acking of message from auto-deleted queue
+ #create queue
+ channel.queue_declare(queue = "q", auto_delete=True, durable=True)
+
+ #send message
+ channel.message_transfer(content=Content(properties={'routing_key' :
"q", 'delivery_mode':2}, body = "my-message"))
+
+ #create consumer
+ channel.message_subscribe(queue = "q", destination = "a",
confirm_mode = 1, acquire_mode=0)
+ channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "a") # presumably byte credit (unit 1) - confirm against the AMQP 0-10 message.flow definition
+ channel.message_flow(unit = 0, value = 10, destination = "a") # presumably message credit (unit 0) - confirm
+ queue = self.client.queue("a") # local queue for destination "a", from which the delivered message is read
+
+ #consume the message, cancel subscription (triggering auto-delete), then ack it
+ msg = queue.get(timeout = 5)
+ channel.message_cancel(destination = "a")
+ msg.complete()
+
+ #test implicit deletion of bindings when queue is deleted
+ channel.queue_declare(queue = "durable-subscriber-queue",
exclusive=True, durable=True)
+ channel.queue_bind(exchange="amq.topic",
queue="durable-subscriber-queue", routing_key="xyz")
+ channel.message_transfer(destination= "amq.topic",
content=Content(properties={'routing_key' : "xyz",
'delivery_mode':2}, body = "my-message"))
+ channel.queue_delete(queue = "durable-subscriber-queue") # the binding made above is never explicitly unbound before this delete
+
+ def phase8(self): # Repeats the declare / bind / publish / delete sequence that ends phase7 for "durable-subscriber-queue".
+ channel = self.channel
+
+ channel.queue_declare(queue = "durable-subscriber-queue",
exclusive=True, durable=True) # same queue name as in phase7; presumably runs against a restarted broker (system_test.sh launches qpidd inside its per-phase loop) - confirm
+ channel.queue_bind(exchange="amq.topic",
queue="durable-subscriber-queue", routing_key="xyz")
+ channel.message_transfer(destination= "amq.topic",
content=Content(properties={'routing_key' : "xyz",
'delivery_mode':2}, body = "my-message"))
+ channel.queue_delete(queue = "durable-subscriber-queue")
+
+
def xid(self, txid, branchqual = ''):
return pack('!LBB', 0, len(txid), len(branchqual)) + txid + branchqual
Modified: store/trunk/cpp/tests/system_test.sh
===================================================================
--- store/trunk/cpp/tests/system_test.sh 2008-02-26 17:09:41 UTC (rev 1728)
+++ store/trunk/cpp/tests/system_test.sh 2008-02-27 15:54:01 UTC (rev 1729)
@@ -72,7 +72,7 @@
mode='bdb'
echo 'BDB persistence...'
fi
- for p in `seq 1 6`; do
+ for p in `seq 1 8`; do
log="$abs_srcdir/vg-log.$mode.$p"
#echo "$vg $QPIDD -m 0 --data dir "" --load-module $LIBBDBSTORE
$JRNLFLAGS"
$vg $QPIDD -m 0 --data-dir "" --load-module $LIBBDBSTORE $JRNLFLAGS
>> "$abs_srcdir/qpid.log" 2> $log & pid=$!