[rhmessaging-commits] rhmessaging commits: r3317 - in store/trunk/cpp/tests: cluster and 1 other directory.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Tue Apr 21 10:11:41 EDT 2009


Author: kpvdr
Date: 2009-04-21 10:11:41 -0400 (Tue, 21 Apr 2009)
New Revision: 3317

Modified:
   store/trunk/cpp/tests/Makefile.am
   store/trunk/cpp/tests/cluster/_st_cluster_basic.cpp
   store/trunk/cpp/tests/unit_test.h
Log:
Latest clustering tests

Modified: store/trunk/cpp/tests/Makefile.am
===================================================================
--- store/trunk/cpp/tests/Makefile.am	2009-04-21 12:34:13 UTC (rev 3316)
+++ store/trunk/cpp/tests/Makefile.am	2009-04-21 14:11:41 UTC (rev 3317)
@@ -28,7 +28,7 @@
 
 TMPDIR=$(abs_srcdir)/test_tmp
 
-SUBDIRS = cluster jrnl .
+SUBDIRS = jrnl cluster .
 
 TESTS =						    \
   SimpleTest                    \

Modified: store/trunk/cpp/tests/cluster/_st_cluster_basic.cpp
===================================================================
--- store/trunk/cpp/tests/cluster/_st_cluster_basic.cpp	2009-04-21 12:34:13 UTC (rev 3316)
+++ store/trunk/cpp/tests/cluster/_st_cluster_basic.cpp	2009-04-21 14:11:41 UTC (rev 3317)
@@ -24,6 +24,8 @@
 #include "../unit_test.h"
 #include <boost/assign.hpp>
 #include <cstdlib>
+
+#include "qpid/client/FailoverManager.h"
 #include "ClusterFixture.h"
 
 #define SET_LOG_LEVEL(level) \
@@ -34,6 +36,8 @@
 
 QPID_AUTO_TEST_SUITE(SimpleTests)
 
+// --- Helper functions ---
+
 // Timeout for tests that wait for messages
 const sys::Duration TIMEOUT=sys::TIME_SEC/4;
 
@@ -72,11 +76,35 @@
     return result;
 }
 
-QPID_AUTO_TEST_CASE(TestMessageEnqueue)
+int64_t getMsgSequence(const Message& m) {
+    return m.getMessageProperties().getApplicationHeaders().getAsInt64("qpid.msg_sequence");
+}
+
+class Sender {
+  public:
+    Sender(boost::shared_ptr<ConnectionImpl> ci, uint16_t ch) : connection(ci), channel(ch) {}
+    void send(const AMQBody& body, bool firstSeg, bool lastSeg, bool firstFrame, bool lastFrame) {
+        AMQFrame f(body);
+        f.setChannel(channel);
+        f.setFirstSegment(firstSeg);
+        f.setLastSegment(lastSeg);
+        f.setFirstFrame(firstFrame);
+        f.setLastFrame(lastFrame);
+        connection->handle(f);
+    }
+
+  private:
+    boost::shared_ptr<ConnectionImpl> connection;
+    uint16_t channel;
+};
+
+// --- Tests ---
+
+QPID_AUTO_TEST_CASE(testMessageEnqueue)
 {
     SET_LOG_LEVEL("error+"); // This only needs to be set once.
 
-    cout << test_filename << ".TestMessageEnqueue: " << flush;
+    cout << test_filename << ".testMessageEnqueue: " << flush;
     ClusterFixture::Args args = boost::assign::list_of<std::string>
         ("--load-module")(getLibPath("LIBSTORE"))
         ("--auth")("no")
@@ -196,4 +224,472 @@
     cout << " done" << endl << flush;
 }
 
+QPID_AUTO_TEST_CASE(testSequenceOptions) {
+    cout << test_filename << ".testSequenceOptions: " << flush;
+    ClusterFixture::Args args = boost::assign::list_of<std::string>
+        ("--load-module")(getLibPath("LIBSTORE"))
+        ("--auth")("no")
+        ("TMP_DATA_DIR");
+
+    // Make sure the exchange qpid.msg_sequence property is properly replicated.
+    ClusterFixture cluster(1, -1, args, getLibPath("LIBCLUSTER"));
+    Client c0(cluster[0], "c0");
+    FieldTable ftArgs;
+    ftArgs.setInt("qpid.msg_sequence", 1);
+    c0.session.queueDeclare(arg::queue="q", arg::durable=true);
+    c0.session.exchangeDeclare(arg::exchange="ex", arg::type="direct", arg::arguments=ftArgs);
+    c0.session.exchangeBind(arg::exchange="ex", arg::queue="q", arg::bindingKey="k");
+    c0.session.messageTransfer(arg::content=Message("1", "k"), arg::destination="ex", arg::durable=true);
+    c0.session.messageTransfer(arg::content=Message("2", "k"), arg::destination="ex", arg::durable=true);
+    BOOST_CHECK_EQUAL(1, getMsgSequence(c0.subs.get("q", TIMEOUT)));
+    BOOST_CHECK_EQUAL(2, getMsgSequence(c0.subs.get("q", TIMEOUT)));
+
+    cluster.add();
+    Client c1(cluster[1]);
+    c1.session.messageTransfer(arg::content=Message("3", "k"), arg::destination="ex");
+    BOOST_CHECK_EQUAL(3, getMsgSequence(c1.subs.get("q", TIMEOUT)));
+    cout << " done" << endl << flush;
+}
+
+QPID_AUTO_TEST_CASE(testTxTransaction) {
+    cout << test_filename << ".testTxTransaction: " << flush;
+    ClusterFixture::Args args = boost::assign::list_of<std::string>
+        ("--load-module")(getLibPath("LIBSTORE"))
+        ("--auth")("no")
+        ("TMP_DATA_DIR");
+
+    ClusterFixture cluster(1, -1, args, getLibPath("LIBCLUSTER"));
+    Client c0(cluster[0], "c0");
+    c0.session.queueDeclare(arg::queue="q", arg::durable=true);
+    c0.session.messageTransfer(arg::content=Message("A", "q"), arg::durable=true);
+    c0.session.messageTransfer(arg::content=Message("B", "q"), arg::durable=true);
+
+    // Start a transaction that will commit.
+    Session commitSession = c0.connection.newSession("commit");
+    SubscriptionManager commitSubs(commitSession);
+    commitSession.txSelect();
+    commitSession.messageTransfer(arg::content=Message("a", "q"), arg::durable=true);
+    commitSession.messageTransfer(arg::content=Message("b", "q"), arg::durable=true);
+    BOOST_CHECK_EQUAL(commitSubs.get("q", TIMEOUT).getData(), "A");
+
+    // Start a transaction that will roll back.
+    Session rollbackSession = c0.connection.newSession("rollback");
+    SubscriptionManager rollbackSubs(rollbackSession);
+    rollbackSession.txSelect();
+    rollbackSession.messageTransfer(arg::content=Message("1", "q"), arg::durable=true);
+    Message rollbackMessage = rollbackSubs.get("q", TIMEOUT);
+    BOOST_CHECK_EQUAL(rollbackMessage.getData(), "B");
+
+    BOOST_CHECK_EQUAL(c0.session.queueQuery("q").getMessageCount(), 0u);
+    // Add new member mid transaction.
+    cluster.add();
+    Client c1(cluster[1], "c1");
+
+    // More transactional work
+    BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u);
+    rollbackSession.messageTransfer(arg::content=Message("2", "q"), arg::durable=true);
+    commitSession.messageTransfer(arg::content=Message("c", "q"), arg::durable=true);
+    rollbackSession.messageTransfer(arg::content=Message("3", "q"), arg::durable=true);
+
+    BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u);
+
+    // Commit/roll back.
+    commitSession.txCommit();
+    rollbackSession.txRollback();
+    rollbackSession.messageRelease(rollbackMessage.getId());
+
+
+    // Verify queue status: just the committed messages and dequeues should remain.
+    BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 4u);
+    BOOST_CHECK_EQUAL(c1.subs.get("q", TIMEOUT).getData(), "B");
+    BOOST_CHECK_EQUAL(c1.subs.get("q", TIMEOUT).getData(), "a");
+    BOOST_CHECK_EQUAL(c1.subs.get("q", TIMEOUT).getData(), "b");
+    BOOST_CHECK_EQUAL(c1.subs.get("q", TIMEOUT).getData(), "c");
+    cout << " done" << endl << flush;
+}
+
+QPID_AUTO_TEST_CASE(testUnacked) {
+    cout << test_filename << ".testUnacked: " << flush;
+    ClusterFixture::Args args = boost::assign::list_of<std::string>
+        ("--load-module")(getLibPath("LIBSTORE"))
+        ("--auth")("no")
+        ("TMP_DATA_DIR");
+
+    // Verify replication of unacknowledged messages.
+    ClusterFixture cluster(1, -1, args, getLibPath("LIBCLUSTER"));
+    Client c0(cluster[0], "c0");
+
+    Message m;
+
+    // Create unacked message: acquired but not accepted.
+    SubscriptionSettings manualAccept(FlowControl::unlimited(), ACCEPT_MODE_EXPLICIT, ACQUIRE_MODE_PRE_ACQUIRED, 0);
+    c0.session.queueDeclare("q1", arg::durable=true);
+    c0.session.messageTransfer(arg::content=Message("11","q1"), arg::durable=true);
+    LocalQueue q1;
+    c0.subs.subscribe(q1, "q1", manualAccept);
+    BOOST_CHECK_EQUAL(q1.get(TIMEOUT).getData(), "11"); // Acquired but not accepted
+    BOOST_CHECK_EQUAL(c0.session.queueQuery("q1").getMessageCount(), 0u); // Gone from queue
+
+    // Create unacked message: not acquired, accepted or completeed.
+    SubscriptionSettings manualAcquire(FlowControl::unlimited(), ACCEPT_MODE_EXPLICIT, ACQUIRE_MODE_NOT_ACQUIRED, 0);
+    c0.session.queueDeclare("q2", arg::durable=true);
+    c0.session.messageTransfer(arg::content=Message("21","q2"), arg::durable=true);
+    c0.session.messageTransfer(arg::content=Message("22","q2"), arg::durable=true);
+    LocalQueue q2;
+    c0.subs.subscribe(q2, "q2", manualAcquire);
+    m = q2.get(TIMEOUT);  // Not acquired or accepted, still on queue
+    BOOST_CHECK_EQUAL(m.getData(), "21");
+    BOOST_CHECK_EQUAL(c0.session.queueQuery("q2").getMessageCount(), 2u); // Not removed
+    c0.subs.getSubscription("q2").acquire(m); // Acquire manually
+    BOOST_CHECK_EQUAL(c0.session.queueQuery("q2").getMessageCount(), 1u); // Removed
+    BOOST_CHECK_EQUAL(q2.get(TIMEOUT).getData(), "22"); // Not acquired or accepted, still on queue
+    BOOST_CHECK_EQUAL(c0.session.queueQuery("q2").getMessageCount(), 1u); // 1 not acquired.
+
+    // Create empty credit record: acquire and accept but don't complete.
+    SubscriptionSettings manualComplete(FlowControl::messageWindow(1), ACCEPT_MODE_EXPLICIT, ACQUIRE_MODE_PRE_ACQUIRED, 1, MANUAL_COMPLETION);
+    c0.session.queueDeclare("q3", arg::durable=true);
+    c0.session.messageTransfer(arg::content=Message("31", "q3"), arg::durable=true);
+    c0.session.messageTransfer(arg::content=Message("32", "q3"), arg::durable=true);
+    LocalQueue q3;
+    c0.subs.subscribe(q3, "q3", manualComplete);
+    Message m31=q3.get(TIMEOUT);
+    BOOST_CHECK_EQUAL(m31.getData(), "31"); // Automatically acquired & accepted but not completed.
+    BOOST_CHECK_EQUAL(c0.session.queueQuery("q3").getMessageCount(), 1u);
+
+    // Add new member while there are unacked messages.
+    cluster.add();
+    Client c1(cluster[1], "c1");
+
+    // Check queue counts
+    BOOST_CHECK_EQUAL(c1.session.queueQuery("q1").getMessageCount(), 0u);
+    BOOST_CHECK_EQUAL(c1.session.queueQuery("q2").getMessageCount(), 1u);
+    BOOST_CHECK_EQUAL(c1.session.queueQuery("q3").getMessageCount(), 1u);
+
+    // Complete the empty credit message, should unblock the message behind it.
+    BOOST_CHECK_THROW(q3.get(0), Exception);
+    c0.session.markCompleted(SequenceSet(m31.getId()), true);
+    BOOST_CHECK_EQUAL(q3.get(TIMEOUT).getData(), "32");
+    BOOST_CHECK_EQUAL(c0.session.queueQuery("q3").getMessageCount(), 0u);
+    BOOST_CHECK_EQUAL(c1.session.queueQuery("q3").getMessageCount(), 0u);
+
+    // Close the original session - unacked messages should be requeued.
+    c0.session.close();
+    BOOST_CHECK_EQUAL(c1.session.queueQuery("q1").getMessageCount(), 1u);
+    BOOST_CHECK_EQUAL(c1.session.queueQuery("q2").getMessageCount(), 2u);
+
+    BOOST_CHECK_EQUAL(c1.subs.get("q1", TIMEOUT).getData(), "11");
+    BOOST_CHECK_EQUAL(c1.subs.get("q2", TIMEOUT).getData(), "21");
+    BOOST_CHECK_EQUAL(c1.subs.get("q2", TIMEOUT).getData(), "22");
+    cout << " done" << endl << flush;
+}
+
+// TODO: FIXME: This test does not run under the current definition of QPID_AUTO_TEST_CASE_EXPECTED_FAILURES in unit_test.h
+QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testUpdateTxState, 1) {
+    cout << test_filename << ".testUpdateTxState: " << flush;
+    ClusterFixture::Args args = boost::assign::list_of<std::string>
+        ("--load-module")(getLibPath("LIBSTORE"))
+        ("--auth")("no")
+        ("TMP_DATA_DIR");
+
+    // Verify that we update transaction state correctly to new members.
+    ClusterFixture cluster(1, -1, args, getLibPath("LIBCLUSTER"));
+    Client c0(cluster[0], "c0");
+
+    // Do work in a transaction.
+    c0.session.txSelect();
+    c0.session.queueDeclare("q", arg::durable=true);
+    c0.session.messageTransfer(arg::content=Message("1","q"), arg::durable=true);
+    c0.session.messageTransfer(arg::content=Message("2","q"), arg::durable=true);
+    Message m;
+    BOOST_CHECK(c0.subs.get(m, "q", TIMEOUT));
+    BOOST_CHECK_EQUAL(m.getData(), "1");
+
+    // New member, TX not committed, c1 should see nothing.
+    cluster.add();
+    Client c1(cluster[1], "c1");
+    BOOST_CHECK_EQUAL(c1.session.queueQuery(arg::queue="q").getMessageCount(), 0u);
+
+    // After commit c1 should see results of tx.
+    c0.session.txCommit();
+    BOOST_CHECK_EQUAL(c1.session.queueQuery(arg::queue="q").getMessageCount(), 1u);
+    BOOST_CHECK(c1.subs.get(m, "q", TIMEOUT));
+    BOOST_CHECK_EQUAL(m.getData(), "2");
+
+    // Another transaction with both members active.
+    c0.session.messageTransfer(arg::content=Message("3","q"));
+    BOOST_CHECK_EQUAL(c1.session.queueQuery(arg::queue="q").getMessageCount(), 0u);
+    c0.session.txCommit();
+    BOOST_CHECK_EQUAL(c1.session.queueQuery(arg::queue="q").getMessageCount(), 1u);
+    BOOST_CHECK(c1.subs.get(m, "q", TIMEOUT));
+    BOOST_CHECK_EQUAL(m.getData(), "3");
+    cout << " done" << endl << flush;
+}
+
+// TODO - Message not durable, figure out how to do this.
+QPID_AUTO_TEST_CASE(testUpdateMessageBuilder) {
+    cout << test_filename << ".testUpdateMessageBuilder: " << flush;
+    ClusterFixture::Args args = boost::assign::list_of<std::string>
+        ("--load-module")(getLibPath("LIBSTORE"))
+        ("--auth")("no")
+        ("TMP_DATA_DIR");
+
+    // Verify that we update a partially recieved message to a new member.
+    ClusterFixture cluster(1, -1, args, getLibPath("LIBCLUSTER"));
+    Client c0(cluster[0], "c0");
+    c0.session.queueDeclare("q", arg::durable=true);
+    Sender sender(ConnectionAccess::getImpl(c0.connection), c0.session.getChannel());
+
+    // Send first 2 frames of message.
+    MessageTransferBody transfer(
+        ProtocolVersion(), string(), // default exchange.
+        framing::message::ACCEPT_MODE_NONE,
+        framing::message::ACQUIRE_MODE_PRE_ACQUIRED);
+    sender.send(transfer, true, false, true, true);
+    AMQHeaderBody header;
+    header.get<DeliveryProperties>(true)->setRoutingKey("q");
+    sender.send(header, false, false, true, true);
+
+    // No reliable way to ensure the partial message has arrived
+    // before we start the new broker, so we sleep.
+    sys::usleep(2500);
+    cluster.add();
+
+    // Send final 2 frames of message.
+    sender.send(AMQContentBody("ab"), false, true, true, false);
+    sender.send(AMQContentBody("cd"), false, true, false, true);
+
+    // Verify message is enqued correctly on second member.
+    Message m;
+    Client c1(cluster[1], "c1");
+    BOOST_CHECK(c1.subs.get(m, "q", TIMEOUT));
+    BOOST_CHECK_EQUAL(m.getData(), "abcd");
+    BOOST_CHECK_EQUAL(2u, knownBrokerPorts(c1.connection).size());
+    cout << " done" << endl << flush;
+}
+
+QPID_AUTO_TEST_CASE(testUpdateConsumers) {
+    cout << test_filename << ".testUpdateConsumers: " << flush;
+    ClusterFixture::Args args = boost::assign::list_of<std::string>
+        ("--load-module")(getLibPath("LIBSTORE"))
+        ("--auth")("no")
+        ("TMP_DATA_DIR");
+
+    ClusterFixture cluster(1, -1, args, getLibPath("LIBCLUSTER"));
+
+    Client c0(cluster[0], "c0");
+    c0.session.queueDeclare("p", arg::durable=true);
+    c0.session.queueDeclare("q", arg::durable=true);
+    c0.subs.subscribe(c0.lq, "q", FlowControl::zero());
+    LocalQueue lp;
+    c0.subs.subscribe(lp, "p", FlowControl::messageCredit(1));
+    c0.session.sync();
+
+    // Start new members
+    cluster.add();              // Local
+    Client c1(cluster[1], "c1");
+    cluster.add();
+    Client c2(cluster[2], "c2");
+
+    // Transfer messages
+    c0.session.messageTransfer(arg::content=Message("aaa", "q"), arg::durable=true);
+
+    c0.session.messageTransfer(arg::content=Message("bbb", "p"), arg::durable=true);
+    c0.session.messageTransfer(arg::content=Message("ccc", "p"), arg::durable=true);
+
+    // Activate the subscription, ensure message removed on all queues.
+    c0.subs.setFlowControl("q", FlowControl::unlimited());
+    Message m;
+    BOOST_CHECK(c0.lq.get(m, TIMEOUT));
+    BOOST_CHECK_EQUAL(m.getData(), "aaa");
+    BOOST_CHECK_EQUAL(c0.session.queueQuery("q").getMessageCount(), 0u);
+    BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u);
+    BOOST_CHECK_EQUAL(c2.session.queueQuery("q").getMessageCount(), 0u);
+
+    // Check second subscription's flow control: gets first message, not second.
+    BOOST_CHECK(lp.get(m, TIMEOUT));
+    BOOST_CHECK_EQUAL(m.getData(), "bbb");
+    BOOST_CHECK_EQUAL(c0.session.queueQuery("p").getMessageCount(), 1u);
+    BOOST_CHECK_EQUAL(c1.session.queueQuery("p").getMessageCount(), 1u);
+    BOOST_CHECK_EQUAL(c2.session.queueQuery("p").getMessageCount(), 1u);
+
+    BOOST_CHECK(c0.subs.get(m, "p", TIMEOUT));
+    BOOST_CHECK_EQUAL(m.getData(), "ccc");
+
+    // Kill the subscribing member, ensure further messages are not removed.
+    cluster.killWithSilencer(0,c0.connection,9);
+    BOOST_REQUIRE_EQUAL(knownBrokerPorts(c1.connection, 2).size(), 2u);
+    for (int i = 0; i < 10; ++i) {
+        c1.session.messageTransfer(arg::content=Message("xxx", "q"));
+        BOOST_REQUIRE(c1.subs.get(m, "q", TIMEOUT));
+        BOOST_REQUIRE_EQUAL(m.getData(), "xxx");
+    }
+    cout << " done" << endl << flush;
+}
+
+QPID_AUTO_TEST_CASE(testCatchupSharedState) {
+    cout << test_filename << ".testCatchupSharedState: " << flush;
+    ClusterFixture::Args args = boost::assign::list_of<std::string>
+        ("--load-module")(getLibPath("LIBSTORE"))
+        ("--auth")("no")
+        ("TMP_DATA_DIR");
+
+    ClusterFixture cluster(1, -1, args, getLibPath("LIBCLUSTER"));
+    Client c0(cluster[0], "c0");
+
+    // Create some shared state.
+    c0.session.queueDeclare("q", arg::durable=true);
+    c0.session.messageTransfer(arg::content=Message("foo","q"), arg::durable=true);
+    c0.session.messageTransfer(arg::content=Message("bar","q"), arg::durable=true);
+
+    while (c0.session.queueQuery("q").getMessageCount() != 2)
+        sys::usleep(1000);    // Wait for message to show up on broker 0.
+
+    // Add a new broker, it will catch up.
+    cluster.add();
+
+    // Do some work post-add
+    c0.session.queueDeclare("p", arg::durable=true);
+    c0.session.messageTransfer(arg::content=Message("pfoo","p"), arg::durable=true);
+
+    // Do some work post-join
+    BOOST_REQUIRE_EQUAL(knownBrokerPorts(c0.connection, 2).size(), 2u);
+    c0.session.messageTransfer(arg::content=Message("pbar","p"), arg::durable=true);
+
+    // Verify new brokers have state.
+    Message m;
+
+    Client c1(cluster[1], "c1");
+
+    BOOST_CHECK(c1.subs.get(m, "q", TIMEOUT));
+    BOOST_CHECK_EQUAL(m.getData(), "foo");
+    BOOST_CHECK_EQUAL(m.getDeliveryProperties().getExchange(), "");
+    BOOST_CHECK(c1.subs.get(m, "q", TIMEOUT));
+    BOOST_CHECK_EQUAL(m.getData(), "bar");
+    BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u);
+
+    // Add another broker, don't wait for join - should be stalled till ready.
+    cluster.add();
+    Client c2(cluster[2], "c2");
+    BOOST_CHECK(c2.subs.get(m, "p", TIMEOUT));
+    BOOST_CHECK_EQUAL(m.getData(), "pfoo");
+    BOOST_CHECK(c2.subs.get(m, "p", TIMEOUT));
+    BOOST_CHECK_EQUAL(m.getData(), "pbar");
+    BOOST_CHECK_EQUAL(c2.session.queueQuery("p").getMessageCount(), 0u);
+    cout << " done" << endl << flush;
+}
+
+QPID_AUTO_TEST_CASE(testWiringReplication) {
+    cout << test_filename << ".testWiringReplication: " << flush;
+    ClusterFixture::Args args = boost::assign::list_of<std::string>
+        ("--load-module")(getLibPath("LIBSTORE"))
+        ("--auth")("no")
+        ("TMP_DATA_DIR");
+
+    ClusterFixture cluster(3, -1, args, getLibPath("LIBCLUSTER"));
+    Client c0(cluster[0]);
+    BOOST_CHECK(c0.session.queueQuery("q").getQueue().empty());
+    BOOST_CHECK(c0.session.exchangeQuery("ex").getType().empty());
+    c0.session.queueDeclare("q", arg::durable=true);
+    c0.session.exchangeDeclare("ex", arg::type="direct");
+    c0.session.close();
+    c0.connection.close();
+    // Verify all brokers get wiring update.
+    for (size_t i = 0; i < cluster.size(); ++i) {
+        BOOST_MESSAGE("i == "<< i);
+        Client c(cluster[i]);
+        BOOST_CHECK_EQUAL("q", c.session.queueQuery("q").getQueue());
+        BOOST_CHECK_EQUAL("direct", c.session.exchangeQuery("ex").getType());
+    }
+    cout << " done" << endl << flush;
+}
+
+QPID_AUTO_TEST_CASE(testHeartbeatCancelledOnFailover)
+{
+    cout << test_filename << ".testHeartbeatCancelledOnFailover: " << flush;
+    ClusterFixture::Args args = boost::assign::list_of<std::string>
+        ("--load-module")(getLibPath("LIBSTORE"))
+        ("--auth")("no")
+        ("TMP_DATA_DIR");
+
+    struct Sender : FailoverManager::Command
+    {
+        std::string queue;
+        std::string content;
+
+        Sender(const std::string& q, const std::string& c) : queue(q), content(c) {}
+
+        void execute(AsyncSession& session, bool)
+        {
+            session.messageTransfer(arg::content=Message(content, queue), arg::durable=true);
+        }
+    };
+
+    struct Receiver : FailoverManager::Command, MessageListener, qpid::sys::Runnable
+    {
+        FailoverManager& mgr;
+        std::string queue;
+        std::string expectedContent;
+        qpid::client::Subscription subscription;
+        qpid::sys::Monitor lock;
+        bool ready;
+
+        Receiver(FailoverManager& m, const std::string& q, const std::string& c) : mgr(m), queue(q), expectedContent(c), ready(false) {}
+
+        void received(Message& message)
+        {
+            BOOST_CHECK_EQUAL(expectedContent, message.getData());
+            subscription.cancel();
+        }
+
+        void execute(AsyncSession& session, bool)
+        {
+            session.queueDeclare(arg::queue=queue, arg::durable=true);
+            SubscriptionManager subs(session);
+            subscription = subs.subscribe(*this, queue);
+            session.sync();
+            setReady();
+            subs.run();
+            //cleanup:
+            session.queueDelete(arg::queue=queue);
+        }
+
+        void run()
+        {
+            mgr.execute(*this);
+        }
+
+        void waitForReady()
+        {
+            qpid::sys::Monitor::ScopedLock l(lock);
+            while (!ready) {
+                lock.wait();
+            }
+        }
+
+        void setReady()
+        {
+            qpid::sys::Monitor::ScopedLock l(lock);
+            ready = true;
+            lock.notify();
+        }
+    };
+
+    ClusterFixture cluster(2, -1, args, getLibPath("LIBCLUSTER"));
+    ConnectionSettings settings;
+    settings.port = cluster[1];
+    settings.heartbeat = 1;
+    FailoverManager fmgr(settings);
+    Sender sender("my-queue", "my-data");
+    Receiver receiver(fmgr, "my-queue", "my-data");
+    qpid::sys::Thread runner(receiver);
+    receiver.waitForReady();
+    cluster.kill(1);
+    //sleep for 2 secs to allow the heartbeat task to fire on the now dead connection:
+    ::usleep(2*1000*1000);
+    fmgr.execute(sender);
+    runner.join();
+    fmgr.close();
+    cout << " done" << endl << flush;
+}
+
 QPID_AUTO_TEST_SUITE_END()

Modified: store/trunk/cpp/tests/unit_test.h
===================================================================
--- store/trunk/cpp/tests/unit_test.h	2009-04-21 12:34:13 UTC (rev 3316)
+++ store/trunk/cpp/tests/unit_test.h	2009-04-21 14:11:41 UTC (rev 3317)
@@ -30,29 +30,42 @@
 
 #include <boost/version.hpp>
 
-#if (BOOST_VERSION < 103300)
-
+#if (BOOST_VERSION < 103400) // v.1.33 and earlier
 # include <boost/test/auto_unit_test.hpp>
+#else // v.1.34 and later
+# include <boost/test/unit_test.hpp>
+#endif
 
+// Keep the test function for compilation but do not not register it.
+// TODO aconway 2008-04-23: better workaround for expected failures.
+// The following causes the test testUpdateTxState not to run at all.
+# define QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(test_name,n)             \
+    namespace { struct test_name { void test_method(); };  }            \
+    void test_name::test_method()
+// The following runs the test testUpdateTxState, but it fails.
+/*#define QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(test_name,n)             \
+    namespace { struct test_name { void test_method(); };  }            \
+    BOOST_AUTO_TEST_CASE(name)*/
+
+#if (BOOST_VERSION < 103300) // v.1.32 and earlier
+
 # define QPID_AUTO_TEST_SUITE(name)
 # define QPID_AUTO_TEST_CASE(name)  BOOST_AUTO_UNIT_TEST(name)
 # define QPID_AUTO_TEST_SUITE_END()
 
-#elif (BOOST_VERSION < 103400)
+#elif (BOOST_VERSION < 103400) // v.1.33
 
-# include <boost/test/auto_unit_test.hpp>
-
+// Note the trailing ';'
 # define QPID_AUTO_TEST_SUITE(name) BOOST_AUTO_TEST_SUITE(name);
 # define QPID_AUTO_TEST_CASE(name)  BOOST_AUTO_TEST_CASE(name)
 # define QPID_AUTO_TEST_SUITE_END() BOOST_AUTO_TEST_SUITE_END();
 
-#else
+#else // v.1.34 and later
 
 # define QPID_AUTO_TEST_SUITE(name) BOOST_AUTO_TEST_SUITE(name)
 # define QPID_AUTO_TEST_CASE(name)  BOOST_AUTO_TEST_CASE(name)
 # define QPID_AUTO_TEST_SUITE_END() BOOST_AUTO_TEST_SUITE_END()
 
-# include <boost/test/unit_test.hpp>
 #endif
 
 #endif  /*!QPIPD_TEST_UNIT_TEST_H_*/




More information about the rhmessaging-commits mailing list