Author: gordonsim
Date: 2008-03-28 15:47:15 -0400 (Fri, 28 Mar 2008)
New Revision: 1810
Modified:
store/trunk/cpp/lib/BdbMessageStore.cpp
store/trunk/cpp/lib/BdbMessageStore.h
store/trunk/cpp/tests/SimpleTest.cpp
store/trunk/cpp/tests/persistence.py
Log:
Unbind using binding key only.
Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp 2008-03-27 21:43:45 UTC (rev 1809)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp 2008-03-28 19:47:15 UTC (rev 1810)
@@ -339,21 +339,10 @@
}
void BdbMessageStore::unbind(const PersistableExchange& e, const PersistableQueue& q,
- const std::string& k, const FieldTable& a)
+ const std::string& k, const FieldTable&)
{
checkInit();
- IdDbt key(e.getPersistenceId());
- BindingDbt value(e, q, k, a);
-
- TxnCtxt txn;
- txn.begin(env, true);
-
- if (deleteKeyValuePair(bindingDb, txn.get(), key, value)) {
- txn.commit();
- } else {
- txn.abort();
- THROW_STORE_EXCEPTION("Can't find binding");
- }
+ deleteBinding(e, q, k);
}
void BdbMessageStore::recover(RecoveryManager& registry)
@@ -1365,6 +1354,40 @@
QPID_LOG(debug, "Deleted all bindings for " << queue.getName()
<< ":" << queue.getPersistenceId());
}
+void BdbMessageStore::deleteBinding(const PersistableExchange& exchange, const PersistableQueue& queue, const std::string& bkey)
+{
+ TxnCtxt txn;
+ txn.begin(env, true);
+ try {
+ Cursor bindings;
+ bindings.open(bindingDb, txn.get());
+
+ IdDbt key(exchange.getPersistenceId());
+ Dbt value;
+
+ for (int status = bindings->get(&key, &value, DB_SET); status == 0; status = bindings->get(&key, &value, DB_NEXT_DUP)) {
+ Buffer buffer(reinterpret_cast<char*>(value.get_data()), value.get_size());
+ if (buffer.available() < 8) {
+ THROW_STORE_EXCEPTION("Not enough data for binding");
+ }
+ uint64_t queueId = buffer.getLongLong();
+ if (queue.getPersistenceId() == queueId) {
+ std::string q;
+ std::string k;
+ buffer.getShortString(q);
+ buffer.getShortString(k);
+ if (bkey == k) {
+ bindings->del(0);
+ QPID_LOG(debug, "Deleting binding for " << queue.getName() << " " << key.id << "->" << queueId);
+ }
+ }
+ }
+ } catch (const std::exception& e) {
+ THROW_STORE_EXCEPTION_2("Error deleting bindings", e.what());
+ }
+ txn.commit();
+}
+
string BdbMessageStore::getJrnlBaseDir()
{
std::stringstream dir;
Modified: store/trunk/cpp/lib/BdbMessageStore.h
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.h 2008-03-27 21:43:45 UTC (rev 1809)
+++ store/trunk/cpp/lib/BdbMessageStore.h 2008-03-28 19:47:15 UTC (rev 1810)
@@ -123,6 +123,9 @@
void completed(TPCTxnCtxt& txn, Db& discard, Db& apply, bool
commit);
void record2pcOp(Db& db, TPCTxnCtxt& txn, u_int64_t messageId,
u_int64_t queueId);
void deleteBindingsForQueue(const qpid::broker::PersistableQueue&
queue);
+ void deleteBinding(const qpid::broker::PersistableExchange& exchange,
+ const qpid::broker::PersistableQueue& queue,
+ const std::string& key);
u_int64_t getRecordSize(Db& db, Dbt& key);
u_int64_t getRecordSize(DbTxn* txn, Db& db, Dbt& key);
Modified: store/trunk/cpp/tests/SimpleTest.cpp
===================================================================
--- store/trunk/cpp/tests/SimpleTest.cpp 2008-03-27 21:43:45 UTC (rev 1809)
+++ store/trunk/cpp/tests/SimpleTest.cpp 2008-03-28 19:47:15 UTC (rev 1810)
@@ -467,12 +467,9 @@
}
}
-void testExchangeBindAndUnbind(bool async)
+void bindAndUnbind(const string& exchangeName, const string& queueName,
+ const string& key, const FieldTable& args, bool async)
{
- string exchangeName("MyDurableExchange");
- string queueName("MyDurableQueue");
- string key("my-routing-key");
- FieldTable args;
{
BdbMessageStore store;
store.init(TESTDIR, async, true, 4, 1);
@@ -513,6 +510,19 @@
}
}
+void testExchangeBindAndUnbind(bool async)
+{
+ bindAndUnbind("MyDurableExchange", "MyDurableQueue", "my-routing-key", FieldTable(), async);
+}
+
+void testExchangeBindAndUnbindWithArgs(bool async)
+{
+ FieldTable args;
+ args.setString("a", "A");
+ args.setString("b", "B");
+ bindAndUnbind("MyDurableExchange", "MyDurableQueue", "my-routing-key", args, async);
+}
+
void testExchangeImplicitUnbind(bool async)
{
string exchangeName("MyDurableExchange");
@@ -742,6 +752,20 @@
cout << "ok" << endl;
}
+QPID_AUTO_TEST_CASE(ExchangeBindAndUnbindWithArgsSync)
+{
+ cout << test_filename << ".ExchangeBindAndUnbindWithArgsSync: " << flush;
+ testExchangeBindAndUnbindWithArgs(false);
+ cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(ExchangeBindAndUnbindWithArgsAsync)
+{
+ cout << test_filename << ".ExchangeBindAndUnbindWithArgsAsync: " << flush;
+ testExchangeBindAndUnbindWithArgs(true);
+ cout << "ok" << endl;
+}
+
QPID_AUTO_TEST_CASE(ExchangeImplicitUnbindSync)
{
cout << test_filename << ".ExchangeImplicitUnbindSync: "
<< flush;
Modified: store/trunk/cpp/tests/persistence.py
===================================================================
--- store/trunk/cpp/tests/persistence.py 2008-03-27 21:43:45 UTC (rev 1809)
+++ store/trunk/cpp/tests/persistence.py 2008-03-28 19:47:15 UTC (rev 1810)
@@ -24,6 +24,7 @@
from getopt import getopt, GetoptError
import qpid.client, qpid.spec, qpid.content, qpid.testlib
from qpid.client import Closed
+from qpid.queue import Empty
from qpid.content import Content
from struct import *
from time import sleep
@@ -287,9 +288,68 @@
channel.message_transfer(destination= "amq.topic",
content=Content(properties={'routing_key' : "xyz",
'delivery_mode':2}, body = "my-message"))
channel.queue_delete(queue = "durable-subscriber-queue")
+ #test unbind:
+ #create a series of bindings to a queue
+ channel.queue_declare(queue = "binding-test-queue", durable=True)
+ channel.queue_bind(exchange="amq.direct",
queue="binding-test-queue", routing_key="abc")
+ channel.queue_bind(exchange="amq.direct",
queue="binding-test-queue", routing_key="pqr")
+ channel.queue_bind(exchange="amq.direct",
queue="binding-test-queue", routing_key="xyz")
+ channel.queue_bind(exchange="amq.match",
queue="binding-test-queue", routing_key="a",
arguments={"x-match":"all", "p":"a"})
+ channel.queue_bind(exchange="amq.match",
queue="binding-test-queue", routing_key="b",
arguments={"x-match":"all", "p":"b"})
+ channel.queue_bind(exchange="amq.match",
queue="binding-test-queue", routing_key="c",
arguments={"x-match":"all", "p":"c"})
+ #then restart broker...
+
+
def phase8(self):
channel = self.channel
+ #continue testing unbind:
+ #send messages to the queue via each of the bindings
+ for k in ["abc", "pqr", "xyz"]:
+ data = "first %s" % (k)
+ channel.message_transfer(destination= "amq.direct",
content=Content(data, properties={'routing_key': k,'delivery_mode':2}))
+ for a in [{"p":"a"}, {"p":"b"}, {"p":"c"}]:
+ data = "first %s" % (a["p"])
+ channel.message_transfer(destination="amq.match",
content=Content(data, properties={'application_headers':a }))
+ #unbind some bindings (using final 0-10 semantics)
+ channel.queue_unbind(exchange="amq.direct",
queue="binding-test-queue", routing_key="pqr")
+ channel.queue_unbind(exchange="amq.match",
queue="binding-test-queue", routing_key="b")
+ #send messages again
+ for k in ["abc", "pqr", "xyz"]:
+ data = "second %s" % (k)
+ channel.message_transfer(destination= "amq.direct",
content=Content(data, properties={'routing_key': k,'delivery_mode':2}))
+ for a in [{"p":"a"}, {"p":"b"}, {"p":"c"}]:
+ data = "second %s" % (a["p"])
+ channel.message_transfer(destination="amq.match",
content=Content(data, properties={'application_headers':a }))
+
+ #check that only the correct messages are received
+ expected = []
+ for k in ["abc", "pqr", "xyz"]:
+ expected.append("first %s" % (k))
+ for a in [{"p":"a"}, {"p":"b"}, {"p":"c"}]:
+ expected.append("first %s" % (a["p"]))
+ for k in ["abc", "xyz"]:
+ expected.append("second %s" % (k))
+ for a in [{"p":"a"}, {"p":"c"}]:
+ expected.append("second %s" % (a["p"]))
+
+ channel.message_subscribe(queue = "binding-test-queue", destination =
"binding-test")
+ channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination =
"binding-test")
+ channel.message_flow(unit = 0, value = 10, destination =
"binding-test")
+ queue = self.client.queue("binding-test")
+
+ while len(expected):
+ msg = queue.get(timeout=1)
+ if msg.content.body not in expected:
+ self.fail("Missing message: %s" % msg.content.body)
+ expected.remove(msg.content.body)
+ try:
+ msg = queue.get(timeout=1)
+ self.fail("Got extra message: %s" % msg.content.body)
+ except Empty: pass
+
+
+
channel.queue_declare(queue = "durable-subscriber-queue",
exclusive=True, durable=True)
channel.queue_bind(exchange="amq.topic",
queue="durable-subscriber-queue", routing_key="xyz")
channel.message_transfer(destination= "amq.topic",
content=Content(properties={'routing_key' : "xyz",
'delivery_mode':2}, body = "my-message"))