[rhmessaging-commits] rhmessaging commits: r3916 - in store/trunk/cpp: lib/jrnl and 2 other directories.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Wed Apr 21 13:09:14 EDT 2010


Author: kpvdr
Date: 2010-04-21 13:09:13 -0400 (Wed, 21 Apr 2010)
New Revision: 3916

Modified:
   store/trunk/cpp/lib/MessageStoreImpl.cpp
   store/trunk/cpp/lib/MessageStoreImpl.h
   store/trunk/cpp/lib/jrnl/jerrno.cpp
   store/trunk/cpp/lib/jrnl/jerrno.hpp
   store/trunk/cpp/tests/SimpleTest.cpp
   store/trunk/cpp/tests/python_tests/client_persistence.py
   store/trunk/cpp/tests/python_tests/flow_to_disk.py
   store/trunk/cpp/tests/python_tests/store_test.py
   store/trunk/cpp/tests/run_python_tests
Log:
Removed the BDB messages.db database. This leaves several methods unimplemented, such as stage(), appendContent() and destroy(). However, since staging has been removed from the broker, these methods are never called. They now throw a not-implemented exception (jexception with JERR__NOTIMPL).

Modified: store/trunk/cpp/lib/MessageStoreImpl.cpp
===================================================================
--- store/trunk/cpp/lib/MessageStoreImpl.cpp	2010-04-21 15:40:23 UTC (rev 3915)
+++ store/trunk/cpp/lib/MessageStoreImpl.cpp	2010-04-21 17:09:13 UTC (rev 3916)
@@ -329,7 +329,6 @@
         queueDb.reset(new Db(dbenv.get(), 0));
         configDb.reset(new Db(dbenv.get(), 0));
         exchangeDb.reset(new Db(dbenv.get(), 0));
-        messageDb.reset(new Db(dbenv.get(), 0));
         mappingDb.reset(new Db(dbenv.get(), 0));
         bindingDb.reset(new Db(dbenv.get(), 0));
         generalDb.reset(new Db(dbenv.get(), 0));
@@ -338,7 +337,6 @@
         open(queueDb, txn.get(), "queues.db", false);
         open(configDb, txn.get(), "config.db", false);
         open(exchangeDb, txn.get(), "exchanges.db", false);
-        open(messageDb, txn.get(), "messages.db", false);
         open(mappingDb, txn.get(), "mappings.db", true);
         open(bindingDb, txn.get(), "bindings.db", true);
         open(generalDb, txn.get(), "general.db",  false);
@@ -760,7 +758,7 @@
         }
         {
             qpid::sys::Mutex::ScopedLock sl(jrnlCreateLock);
-            jQueue = new JournalImpl(queueName, getJrnlDir(queueName), std::string("JournalData"), defJournalGetEventsTimeout, defJournalFlushTimeout, agent);
+            jQueue = new JournalImpl(queueName, getJrnlHashDir(queueName), std::string("JournalData"), defJournalGetEventsTimeout, defJournalFlushTimeout, agent);
             journalList[queueName] = jQueue;
         }
         queue->setExternalQueueStore(dynamic_cast<ExternalQueueStore*>(jQueue));
@@ -1011,43 +1009,11 @@
     }
 }
 
-RecoverableMessage::shared_ptr MessageStoreImpl::getExternMessage(qpid::broker::RecoveryManager& recovery,
-                                                                 uint64_t messageId,
-                                                                 unsigned& headerSize)
+RecoverableMessage::shared_ptr MessageStoreImpl::getExternMessage(qpid::broker::RecoveryManager& /*recovery*/,
+                                                                 uint64_t /*messageId*/,
+                                                                 unsigned& /*headerSize*/)
 {
-    RecoverableMessage::shared_ptr ret;
-    Dbt key (&messageId, sizeof(messageId));
-    size_t preamble_length = sizeof(u_int32_t); /*header size*/
-
-    BufferValue value(preamble_length, 0);
-    value.buffer.record();
-
-    TxnCtxt txn;
-    txn.begin(dbenv.get(), true);
-    try {
-        if (messageDb->get(txn.get(), &key, &value, 0) == DB_NOTFOUND) {
-            txn.abort();
-            THROW_STORE_EXCEPTION("Cannot load content. Message not known to store!");
-        }
-
-        //read header only to begin with
-        headerSize = value.buffer.getLong();
-
-        BufferValue header(headerSize, preamble_length);
-        if (messageDb->get(txn.get(), &key, &header, 0) == DB_NOTFOUND) {
-            txn.abort();
-            THROW_STORE_EXCEPTION("Cannot load content. Message not known to store!");
-        }
-        ret = recovery.recoverMessage(header.buffer);
-        txn.commit();
-    } catch (const DbException& e) {
-        txn.abort();
-        THROW_STORE_EXCEPTION("Unexpected BDB error in MessageStoreImpl::getExternMessage(): " + std::string(e.what()));
-    } catch (...) {
-        txn.abort();
-        throw;
-    }
-    return ret;
+    throw mrg::journal::jexception(mrg::journal::jerrno::JERR__NOTIMPL, "MessageStoreImpl", "getExternMessage");
 }
 
 int MessageStoreImpl::enqueueMessage(TxnCtxt& txn,
@@ -1211,46 +1177,14 @@
     }
 }
 
-void MessageStoreImpl::stage(const intrusive_ptr<PersistableMessage>& msg)
+void MessageStoreImpl::stage(const intrusive_ptr<PersistableMessage>& /*msg*/)
 {
-    checkInit();
-    TxnCtxt txn;
-    txn.begin(dbenv.get(), true);
-
-    u_int64_t messageId (msg->getPersistenceId());
-    if (messageId == 0 || !msg->isContentReleased()) {
-        try {
-            Dbt key (&messageId, sizeof(messageId));
-            messageId = messageIdSequence.next();
-            store(NULL, &txn, key, msg, true);
-            msg->setPersistenceId(messageId);
-            txn.commit();
-        } catch (...) {
-            txn.abort();
-            throw;
-        }
-    }
+    throw mrg::journal::jexception(mrg::journal::jerrno::JERR__NOTIMPL, "MessageStoreImpl", "stage");
 }
 
-void MessageStoreImpl::destroy(PersistableMessage& msg)
+void MessageStoreImpl::destroy(PersistableMessage& /*msg*/)
 {
-    checkInit();
-    u_int64_t messageId (msg.getPersistenceId());
-    if (messageId) {
-        Dbt key (&messageId, sizeof(messageId));
-        TxnCtxt txn;
-        txn.begin(dbenv.get(), true);
-        try {
-            deleteIfUnused(txn.get(), key);
-            txn.commit();
-        } catch (const DbException& e) {
-            txn.abort();
-            THROW_STORE_EXCEPTION_2("Error destroying message", e);
-        } catch (...) {
-            txn.abort();
-            throw;
-        }
-    }
+    throw mrg::journal::jexception(mrg::journal::jerrno::JERR__NOTIMPL, "MessageStoreImpl", "destroy");
 }
 
 u_int64_t MessageStoreImpl::getRecordSize(db_ptr db,
@@ -1287,41 +1221,10 @@
     return peek.get_size();
 }
 
-void MessageStoreImpl::appendContent(const intrusive_ptr<const PersistableMessage>& msg,
-                                    const std::string& data)
+void MessageStoreImpl::appendContent(const intrusive_ptr<const PersistableMessage>& /*msg*/,
+                                    const std::string& /*data*/)
 {
-    checkInit();
-    u_int64_t messageId (msg->getPersistenceId());
-    if (messageId != 0) {
-        TxnCtxt txn;
-        txn.begin(dbenv.get(), true);
-        try {
-            Dbt key (&messageId, sizeof(messageId));
-            u_int64_t offset = getRecordSize(messageDb, key);
-            const int size(data.length());
-
-            //don't want to have to copy this and shouldn't need to as
-            //it will be used for reading only. but have to cast away
-            //the const-ness, which is nasty...
-            //alternative involves copying:
-            //char buffer[size];
-            //data.copy(buffer, size);
-            Dbt value((void*) data.data(), size);
-            value.set_flags(DB_DBT_USERMEM | DB_DBT_PARTIAL);
-            value.set_doff(offset);
-            value.set_dlen(size);
-            messageDb->put(txn.get(), &key, &value, 0);
-            txn.commit();
-        } catch (const DbException& e) {
-            txn.abort();
-            THROW_STORE_EXCEPTION_2("Error appending content", e);
-        } catch (...) {
-            txn.abort();
-            throw;
-        }
-    } else {
-        THROW_STORE_EXCEPTION("Cannot append content. Message not known to store!");
-    }
+    throw mrg::journal::jexception(mrg::journal::jerrno::JERR__NOTIMPL, "MessageStoreImpl", "appendContent");
 }
 
 void MessageStoreImpl::loadContent(const qpid::broker::PersistableQueue& queue,
@@ -1337,53 +1240,19 @@
         try {
             JournalImpl* jc = static_cast<JournalImpl*>(queue.getExternalQueueStore());
             if (jc && jc->is_enqueued(messageId) ) {
-                if (jc->loadMsgContent(messageId, data, length, offset)) {
-                    return;
+                if (!jc->loadMsgContent(messageId, data, length, offset)) {
+                    std::ostringstream oss;
+                    oss << "Queue " << queue.getName() << ": loadContent() failed: Message " << messageId << " is extern";
+                    THROW_STORE_EXCEPTION(oss.str());
                 }
+            } else {
+                std::ostringstream oss;
+                oss << "Queue " << queue.getName() << ": loadContent() failed: Message " << messageId << " not enqueued";
+                THROW_STORE_EXCEPTION(oss.str());
             }
         } catch (const journal::jexception& e) {
             THROW_STORE_EXCEPTION(std::string("Queue ") + queue.getName() + ": loadContent() failed: " + e.what());
         }
-        TxnCtxt txn;
-        txn.begin(dbenv.get(), true);
-        try {
-            Dbt key (&messageId, sizeof(messageId));
-            char *buffer = new char[length];
-            Dbt value(buffer, length);
-
-            // Read the first 4 bytes (u_int32_t) which is the header size.
-            value.set_flags(DB_DBT_USERMEM | DB_DBT_PARTIAL);
-            value.set_ulen(sizeof(u_int32_t));
-            value.set_doff(0);
-            value.set_dlen(sizeof(u_int32_t));
-            int status = messageDb->get(txn.get(), &key, &value, 0);
-            if (status == DB_NOTFOUND) {
-                delete [] buffer;
-                THROW_STORE_EXCEPTION("Cannot load content. Message not known to store!");
-            }
-            u_int32_t hdr_size = Buffer(buffer, sizeof(u_int32_t)).getLong();
-
-            // Now read the data.
-            u_int64_t realOffset = sizeof(u_int32_t) + hdr_size + offset;
-            value.set_flags(DB_DBT_USERMEM | DB_DBT_PARTIAL);
-            value.set_ulen(length);
-            value.set_doff(realOffset);
-            value.set_dlen(length);
-            status = messageDb->get(txn.get(), &key, &value, 0);
-            if (status == DB_NOTFOUND) {
-                delete [] buffer;
-                THROW_STORE_EXCEPTION("Cannot load content. Message not known to store!");
-            }
-            data.assign(buffer, value.get_size());
-            delete [] buffer;
-            txn.commit();
-        } catch (const DbException& e) {
-            txn.abort();
-            THROW_STORE_EXCEPTION_2("Error loading content", e);
-        } catch (...) {
-            txn.abort();
-            throw;
-        }
     } else {
         THROW_STORE_EXCEPTION("Cannot load content. Message not known to store!");
     }
@@ -1415,7 +1284,6 @@
     if (queueId == 0) {
         THROW_STORE_EXCEPTION("Queue not created: " + queue.getName());
     }
-    Dbt key (&messageId, sizeof(messageId));
 
     TxnCtxt implicit;
     TxnCtxt* txn = 0;
@@ -1431,33 +1299,36 @@
         msg->setPersistenceId(messageId);
         newId = true;
     }
-    store(&queue, txn, key, msg, newId);
+    store(&queue, txn, msg, newId);
 
     // add queue* to the txn map..
     if (ctxt) txn->addXidRecord(queue.getExternalQueueStore());
 }
 
-void MessageStoreImpl::store(const PersistableQueue* queue,
-                            TxnCtxt* txn, Dbt& messageId,
-                            const intrusive_ptr<PersistableMessage>& message,
-                            bool newId)
+u_int64_t MessageStoreImpl::msgEncode(std::vector<char>& buff, const intrusive_ptr<PersistableMessage>& message)
 {
     u_int32_t headerSize = message->encodedHeaderSize();
     u_int64_t size = message->encodedSize() + sizeof(u_int32_t);
-    std::vector<char> buff;
-    if (!message->isContentReleased() )
-    {
-        try { buff = std::vector<char>(size); } // long + headers + content
-        catch (const std::exception& e) {
-            std::ostringstream oss;
-            oss << "Unable to allocate memory for encoding message; requested size: " << size << "; error: " << e.what();
-            THROW_STORE_EXCEPTION(oss.str());
-        }
-        Buffer buffer(&buff[0],size);
-        buffer.putLong(headerSize);
-        message->encode(buffer);
+    try { buff = std::vector<char>(size); } // long + headers + content
+    catch (const std::exception& e) {
+        std::ostringstream oss;
+        oss << "Unable to allocate memory for encoding message; requested size: " << size << "; error: " << e.what();
+        THROW_STORE_EXCEPTION(oss.str());
     }
+    Buffer buffer(&buff[0],size);
+    buffer.putLong(headerSize);
+    message->encode(buffer);
+    return size;
+}
 
+void MessageStoreImpl::store(const PersistableQueue* queue,
+                            TxnCtxt* txn,
+                            const intrusive_ptr<PersistableMessage>& message,
+                            bool /*newId*/)
+{
+    std::vector<char> buff;
+    u_int64_t size = msgEncode(buff, message);
+
     try {
         if (queue) {
             boost::intrusive_ptr<DataTokenImpl> dtokp(new DataTokenImpl);
@@ -1481,12 +1352,8 @@
                 }
             }
         } else {
-            /// cct message db
-            if (newId) {  // only store in Bd if first time message is stored
-                Dbt data(&buff[0],size);
-                messageDb->put(txn->get(), &messageId, &data, DB_NOOVERWRITE);
-            }
-        }
+            THROW_STORE_EXCEPTION(std::string("Queue ") + queue->getName() + ": store() failed: queue NULL.");
+       }
     } catch (const journal::jexception& e) {
         THROW_STORE_EXCEPTION(std::string("Queue ") + queue->getName() + ": store() failed: " +
                               e.what());
@@ -1558,39 +1425,6 @@
     return 0;
 }
 
-bool MessageStoreImpl::deleteIfUnused(DbTxn* txn,
-                                     Dbt& messageId)
-{
-    Cursor cursor;
-    cursor.open(mappingDb, txn);
-    return deleteIfUnused(cursor, txn, messageId);
-}
-
-bool MessageStoreImpl::deleteIfUnused(Cursor& cursor,
-                                     DbTxn* txn,
-                                     Dbt& messageId)
-{
-    if (isUnused(cursor, messageId)) {
-        messageDb->del(txn, &messageId, 0);
-        return true;
-    } else {
-        return false;
-    }
-}
-
-bool MessageStoreImpl::isUnused(Cursor& cursor, Dbt& messageId)
-{
-    Dbt empty;
-    int status = cursor->get(&messageId, &empty, DB_SET);
-    if (status == DB_NOTFOUND) {
-        return true;
-    } else if (status == 0) {
-        return false;
-    } else {
-        THROW_STORE_EXCEPTION("Dequeue failed (in isUnused()) with status = " + status);
-    }
-}
-
 void MessageStoreImpl::completed(TxnCtxt& txn,
                                 bool commit)
 {
@@ -1833,7 +1667,7 @@
 
 std::string MessageStoreImpl::getJrnlDir(const qpid::broker::PersistableQueue& queue) //for exmaple /var/rhm/ + queueDir/
 {
-    return getJrnlDir(queue.getName().c_str());
+    return getJrnlHashDir(queue.getName().c_str());
 }
 
 u_int32_t MessageStoreImpl::bHash(const std::string str)
@@ -1845,7 +1679,7 @@
     return h;
 }
 
-std::string MessageStoreImpl::getJrnlDir(const std::string& queueName) //for exmaple /var/rhm/ + queueDir/
+std::string MessageStoreImpl::getJrnlHashDir(const std::string& queueName) //for exmaple /var/rhm/ + queueDir/
 {
     std::stringstream dir;
     dir << getJrnlBaseDir() << std::hex << std::setfill('0') << std::setw(4);

Modified: store/trunk/cpp/lib/MessageStoreImpl.h
===================================================================
--- store/trunk/cpp/lib/MessageStoreImpl.h	2010-04-21 15:40:23 UTC (rev 3915)
+++ store/trunk/cpp/lib/MessageStoreImpl.h	2010-04-21 17:09:13 UTC (rev 3916)
@@ -118,7 +118,6 @@
     db_ptr queueDb;
     db_ptr configDb;
     db_ptr exchangeDb;
-    db_ptr messageDb;
     db_ptr mappingDb;
     db_ptr bindingDb;
     db_ptr generalDb;
@@ -207,21 +206,14 @@
     void recoverTplStore();
     void recoverLockedMappings(txn_list& txns);
     TxnCtxt* check(qpid::broker::TransactionContext* ctxt);
+    u_int64_t msgEncode(std::vector<char>& buff, const boost::intrusive_ptr<qpid::broker::PersistableMessage>& message);
     void store(const qpid::broker::PersistableQueue* queue,
                TxnCtxt* txn,
-               Dbt& messageId,
                const boost::intrusive_ptr<qpid::broker::PersistableMessage>& message,
                bool newId);
     void async_dequeue(qpid::broker::TransactionContext* ctxt,
                        const boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg,
                        const qpid::broker::PersistableQueue& queue);
-    bool deleteIfUnused(Cursor& cursor,
-                        DbTxn* txn,
-                        Dbt& messageId);
-    bool deleteIfUnused(DbTxn* txn,
-                        Dbt& messageId);
-    bool isUnused(Cursor& cursor,
-                  Dbt& messageId);
     void destroy(db_ptr db,
                  const qpid::broker::Persistable& p);
     bool create(db_ptr db,
@@ -260,7 +252,7 @@
     void createJrnlQueue(const qpid::broker::PersistableQueue& queue);
     u_int32_t bHash(const std::string str);
     std::string getJrnlDir(const qpid::broker::PersistableQueue& queue); //for exmaple /var/rhm/ + queueDir/
-    std::string getJrnlDir(const std::string& queueName);
+    std::string getJrnlHashDir(const std::string& queueName);
     std::string getJrnlBaseDir();
     std::string getBdbBaseDir();
     std::string getTplBaseDir();

Modified: store/trunk/cpp/lib/jrnl/jerrno.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jerrno.cpp	2010-04-21 15:40:23 UTC (rev 3915)
+++ store/trunk/cpp/lib/jrnl/jerrno.cpp	2010-04-21 17:09:13 UTC (rev 3916)
@@ -54,6 +54,7 @@
 const u_int32_t jerrno::JERR__TIMEOUT           = 0x0107;
 const u_int32_t jerrno::JERR__UNEXPRESPONSE     = 0x0108;
 const u_int32_t jerrno::JERR__RECNFOUND         = 0x0109;
+const u_int32_t jerrno::JERR__NOTIMPL           = 0x010a;
 
 // class jcntl
 const u_int32_t jerrno::JERR_JCNTL_STOPPED      = 0x0200;
@@ -144,6 +145,7 @@
     _err_map[JERR__TIMEOUT] = "JERR__TIMEOUT: Timeout waiting for event.";
     _err_map[JERR__UNEXPRESPONSE] = "JERR__UNEXPRESPONSE: Unexpected response to call or event.";
     _err_map[JERR__RECNFOUND] = "JERR__RECNFOUND: Record not found.";
+    _err_map[JERR__NOTIMPL] = "JERR__NOTIMPL: Not implemented";
 
     // class jcntl
     _err_map[JERR_JCNTL_STOPPED] = "JERR_JCNTL_STOPPED: Operation on stopped journal.";
@@ -152,7 +154,7 @@
     _err_map[JERR_JCNTL_UNKNOWNMAGIC] = "JERR_JCNTL_UNKNOWNMAGIC: Found record with unknown magic.";
     _err_map[JERR_JCNTL_NOTRECOVERED] = "JERR_JCNTL_NOTRECOVERED: Operation requires recover() to be run first.";
     _err_map[JERR_JCNTL_RECOVERJFULL] = "JERR_JCNTL_RECOVERJFULL: Journal data files full, cannot write.";
-    _err_map[JERR_JCNTL_OWIMISMATCH] = "JERR_JCNTL_OWIMISMATCH: Overwrite Indecator (OWI) change found in unexpected location.";
+    _err_map[JERR_JCNTL_OWIMISMATCH] = "JERR_JCNTL_OWIMISMATCH: Overwrite Indicator (OWI) change found in unexpected location.";
 
     // class jdir
     _err_map[JERR_JDIR_NOTDIR] = "JERR_JDIR_NOTDIR: Directory name exists but is not a directory.";

Modified: store/trunk/cpp/lib/jrnl/jerrno.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jerrno.hpp	2010-04-21 15:40:23 UTC (rev 3915)
+++ store/trunk/cpp/lib/jrnl/jerrno.hpp	2010-04-21 17:09:13 UTC (rev 3916)
@@ -72,6 +72,7 @@
         static const u_int32_t JERR__TIMEOUT;           ///< Timeout waiting for an event
         static const u_int32_t JERR__UNEXPRESPONSE;     ///< Unexpected response to call or event
         static const u_int32_t JERR__RECNFOUND;         ///< Record not found
+        static const u_int32_t JERR__NOTIMPL;           ///< Not implemented
 
         // class jcntl
         static const u_int32_t JERR_JCNTL_STOPPED;      ///< Operation on stopped journal

Modified: store/trunk/cpp/tests/SimpleTest.cpp
===================================================================
--- store/trunk/cpp/tests/SimpleTest.cpp	2010-04-21 15:40:23 UTC (rev 3915)
+++ store/trunk/cpp/tests/SimpleTest.cpp	2010-04-21 17:09:13 UTC (rev 3916)
@@ -360,35 +360,6 @@
     cout << "ok" << endl;
 }
 
-QPID_AUTO_TEST_CASE(DestroyEnqueuedMessage)
-{
-    cout << test_filename << ".DestroyEnqueuedMessage: " << flush;
-
-    MessageStoreImpl store;
-    store.init(test_dir, 4, 1, true); // truncate store
-
-    const string data("abcdefg");
-    boost::intrusive_ptr<Message> msg(MessageUtils::createMessage("my_exchange", "my_routing_key", "my_message", data.length()));
-    intrusive_ptr<PersistableMessage> pmsg = static_pointer_cast<PersistableMessage>(msg);
-    intrusive_ptr<const PersistableMessage> cpmsg = static_pointer_cast<const PersistableMessage>(msg);
-    MessageUtils::addContent(msg, data);
-
-    Queue queue("my_queue", 0, &store, 0);
-    store.create(queue, qpid::framing::FieldTable());
-
-    store.enqueue(0, pmsg, queue);
-    store.destroy(*pmsg);
-
-    string loaded;
-    store.loadContent(queue, cpmsg, loaded, 0, data.length());
-    BOOST_CHECK_EQUAL(data, loaded);
-
-    store.dequeue(0, pmsg, queue);
-    store.destroy(queue);
-
-    cout << "ok" << endl;
-}
-
 QPID_AUTO_TEST_CASE(ExchangeCreateAndDestroy)
 {
     cout << test_filename << ".ExchangeCreateAndDestroy: " << flush;

Modified: store/trunk/cpp/tests/python_tests/client_persistence.py
===================================================================
--- store/trunk/cpp/tests/python_tests/client_persistence.py	2010-04-21 15:40:23 UTC (rev 3915)
+++ store/trunk/cpp/tests/python_tests/client_persistence.py	2010-04-21 17:09:13 UTC (rev 3916)
@@ -129,6 +129,7 @@
         qmf = Qmf(broker)
         qmf.add_exchange("altExch", "direct", durable=True) # Serves as alternate exchange instance
         qmf.add_exchange("testExch", "direct", durable=True, alt_exchange_name="altExch")
+        qmf.close()
         broker.terminate()
 
         broker = self.broker(store_args(), name="testExchangeBroker")
@@ -143,6 +144,7 @@
             self.fail("Test exchange (\"testExch\") instance not recovered: %s" % error)
         self.assertTrue(qmf.query_exchange("testExch", alt_exchange_name = "altExch"),
                         "Alternate exchange property not found or is incorrect on exchange \"testExch\".")
+        qmf.close()
         
     def test_queue(self):
         """Queue alternate exchange property persistexchangeNamece test"""
@@ -150,6 +152,7 @@
         qmf = Qmf(broker)
         qmf.add_exchange("altExch", "direct", durable=True) # Serves as alternate exchange instance
         qmf.add_queue("testQueue", durable=True, alt_exchange_name="altExch")
+        qmf.close()
         broker.terminate()
 
         broker = self.broker(store_args(), name="testQueueBroker")
@@ -164,6 +167,7 @@
             self.fail("Test queue (\"testQueue\") instance not recovered: %s" % error)
         self.assertTrue(qmf.query_queue("testQueue", alt_exchange_name = "altExch"),
                         "Alternate exchange property not found or is incorrect on queue \"testQueue\".")
+        qmf.close()
 
 
 class RedeliveredTests(StoreTest):

Modified: store/trunk/cpp/tests/python_tests/flow_to_disk.py
===================================================================
--- store/trunk/cpp/tests/python_tests/flow_to_disk.py	2010-04-21 15:40:23 UTC (rev 3915)
+++ store/trunk/cpp/tests/python_tests/flow_to_disk.py	2010-04-21 17:09:13 UTC (rev 3916)
@@ -294,11 +294,12 @@
         
     def test_durable(self):
         """Durable message test"""
-        self.simple_limit("MaxSizeMaxCountDurable", max_count=10, max_size=1000, msg_size=250)
+        self.simple_limit("MaxSizeMaxCountDurable", max_count=10, max_size=1000, msg_size=250, msg_durable=True)
         
     def test_durable_recover(self):
         """Durable message recover test"""
-        self.simple_limit("MaxSizeMaxCountDurableRecover", max_count=10, max_size=1000, msg_size=250, recover=True)
+        self.simple_limit("MaxSizeMaxCountDurableRecover", max_count=10, max_size=1000, msg_size=250, msg_durable=True,
+                          recover=True)
         
     def test_browse(self):
         """Browse test"""
@@ -310,12 +311,13 @@
         
     def test_durable_browse(self):
         """Browse durable message test"""
-        self.simple_limit("MaxSizeMaxCountDurableBrowse", max_count=10, max_size=1000, msg_size=250, browse=True)
+        self.simple_limit("MaxSizeMaxCountDurableBrowse", max_count=10, max_size=1000, msg_size=250, msg_durable=True,
+                          browse=True)
         
     def test_durable_browse_recover(self):
         """Browse durable messages before and after recover"""
-        self.simple_limit("MaxSizeMaxCountDurableBrowseRecover", max_count=10, max_size=1000, msg_size=250, browse=True,
-                          recover=True)
+        self.simple_limit("MaxSizeMaxCountDurableBrowseRecover", max_count=10, max_size=1000, msg_size=250,
+                          msg_durable=True, browse=True, recover=True)
 
 # ======================================================================================================================
 

Modified: store/trunk/cpp/tests/python_tests/store_test.py
===================================================================
--- store/trunk/cpp/tests/python_tests/store_test.py	2010-04-21 15:40:23 UTC (rev 3915)
+++ store/trunk/cpp/tests/python_tests/store_test.py	2010-04-21 17:09:13 UTC (rev 3916)
@@ -107,6 +107,10 @@
     def queue_empty(self, queue_name):
         """Check if a queue is empty (has no messages waiting)"""
         return self.queue_message_count(queue_name) == 0
+    
+    def close(self):
+        self.__session.delBroker(self.__broker)
+        self.__session = None
 
 
 class StoreTest(BrokerTest):

Modified: store/trunk/cpp/tests/run_python_tests
===================================================================
--- store/trunk/cpp/tests/run_python_tests	2010-04-21 15:40:23 UTC (rev 3915)
+++ store/trunk/cpp/tests/run_python_tests	2010-04-21 17:09:13 UTC (rev 3916)
@@ -42,7 +42,7 @@
 
 case x$1 in
     xSHORT_TEST)
-        DEFAULT_PYTHON_TESTS="*.client_persistence.ExchangeQueueTests.* *.flow_to_disk.SimpleMaxSizeCountTest.test_durable_browse_recover *.flow_to_disk.MultiDurableQueueDurableMsgBrowseRecoverTxPTxCTest.test_mixed_limit_2" ;;
+        DEFAULT_PYTHON_TESTS="*.client_persistence.ExchangeQueueTests.* *.flow_to_disk.SimpleMaxSizeCountTest.test_browse_recover *.flow_to_disk.SimpleMaxSizeCountTest.test_durable_browse_recover *.flow_to_disk.MultiDurableQueueDurableMsgBrowseRecoverTxPTxCTest.test_mixed_limit_2" ;;
     xLONG_TEST)
         DEFAULT_PYTHON_TESTS= ;;
     x)



More information about the rhmessaging-commits mailing list