Author: kpvdr
Date: 2008-07-05 16:14:45 -0400 (Sat, 05 Jul 2008)
New Revision: 2177
Modified:
store/trunk/cpp/lib/BdbMessageStore.cpp
store/trunk/cpp/lib/BdbMessageStore.h
store/trunk/cpp/lib/TxnCtxt.h
store/trunk/cpp/lib/jrnl/data_tok.cpp
store/trunk/cpp/lib/jrnl/data_tok.hpp
store/trunk/cpp/lib/jrnl/rrfc.cpp
store/trunk/cpp/lib/jrnl/rrfc.hpp
store/trunk/cpp/lib/jrnl/txn_map.cpp
store/trunk/cpp/lib/jrnl/wrfc.cpp
store/trunk/cpp/lib/jrnl/wrfc.hpp
store/trunk/cpp/tests/TwoPhaseCommitTest.cpp
store/trunk/cpp/tests/persistence.py
Log:
Fixed problems with transaction recovery: the DataToken in TxnCtxt was not being restored; also,
the highest rid found during restore did not take into account the new preparedXid instance,
which shares the messageIdSequence. Other minor bugfixes and tidy-ups.
Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp 2008-07-03 20:46:46 UTC (rev 2176)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp 2008-07-05 20:14:45 UTC (rev 2177)
@@ -29,6 +29,9 @@
#include "qpid/log/Statement.h"
#include "qpid/management/PackageMrgstore.h"
+#define MAX_AIO_SLEEPS 1000 // ~1 second
+#define AIO_SLEEP_TIME 1000 // 1 milisecond
+
using namespace rhm::bdbstore;
using namespace qpid::broker;
using boost::static_pointer_cast;
@@ -58,6 +61,7 @@
jrnlFsizePgs(defJrnlFileSizePgs),
wcache_pgsize_sblks(JRNL_WMGR_DEF_PAGE_SIZE),
wcache_num_pages(JRNL_WMGR_DEF_PAGES),
+ highestRid(0),
isInit(false),
envPath(envpath)
@@ -460,10 +464,18 @@
txn.abort();
THROW_STORE_EXCEPTION_2("Error on recovery", e);
}
+
//recover transactions:
for (txn_list::iterator i = prepared.begin(); i != prepared.end(); i++) {
TPCTxnCtxt* tpcc = new TPCTxnCtxt(i->xid, &messageIdSequence);
+
+ // Restore data token state in TxnCtxt
+ xid_rid_map_citr citr = preparedMap.find(i->xid);
+ if (citr == preparedMap.end()) THROW_STORE_EXCEPTION("XID not found in
preparedMap");
+ tpcc->recoverDtok(citr->second, i->xid);
+ tpcc->addXidRecord(preparedXidStorePtr.get());
+
RecoverableTransaction::shared_ptr dtx = registry.recoverTransaction(i->xid,
std::auto_ptr<TPCTransactionContext>(tpcc));
if (i->enqueues.get()) {
for (LockedMappings::iterator j = i->enqueues->begin(); j !=
i->enqueues->end(); j++) {
@@ -492,7 +504,6 @@
IdDbt key;
Dbt value;
//read all queues
- u_int64_t highestRid = 0;
while (queues.next(key, value)) {
Buffer buffer(reinterpret_cast<char*>(value.get_data()),
value.get_size());
//create a Queue instance
@@ -524,7 +535,11 @@
queue_index[key.id] = queue;
maxQueueId = max(key.id, maxQueueId);
}
+
+ // NOTE: highestRid is set by both recoverQueues() and collectPreparedXids() as
+ // the messageIdSequence is used for both queue journals and the preparedXid
journal.
messageIdSequence.reset(highestRid + 1);
+
queueIdSequence.reset(maxQueueId + 1);
}
@@ -605,9 +620,6 @@
}
-#define MAX_AIO_SLEEPS 1000 // ~1 second
-#define AIO_SLEEP_TIME 1000 // 1 milisecond
-
void BdbMessageStore::recoverMessages(TxnCtxt& /*txn*/,
qpid::broker::RecoveryManager& recovery,
qpid::broker::RecoverableQueue::shared_ptr&
queue, txn_list& locked, message_index& prepared)
{
@@ -768,19 +780,6 @@
}
}
-void BdbMessageStore::readXids(Db& db, std::set<string>& xids)
-{
- Cursor c;
- c.open(db, 0);
-
- Dbt key;
- Dbt ignore;
- while (c.next(key, ignore)) {
- std::string xid(reinterpret_cast<char*>(key.get_data()), key.get_size());
- xids.insert(xid);
- }
-}
-
void BdbMessageStore::readLockedMappings(Db& db, txn_lock_map& mappings)
{
Cursor c;
@@ -798,15 +797,50 @@
{
if (journal::jdir::exists(storeDir + "/rhm/pxid/prepared_xid.jinf"))
{
- u_int64_t highest_rid;
+ u_int64_t thisHighestRid;
preparedXidStorePtr->recover(defXidStoreNumJrnlFiles,
defXidStoreJrnlFileSizePgs * JRNL_RMGR_PAGE_SIZE,
JRNL_WMGR_DEF_PAGE_SIZE * JRNL_WMGR_DEF_PAGES / wcache_pgsize_sblks,
defXidStoreWCachePageSize,
- 0, highest_rid, 0);
-
- std::vector<std::string> xv;
- preparedXidStorePtr->get_open_txn_list(xv);
- for (std::vector<std::string>::const_iterator itr = xv.begin(); itr <
xv.end(); itr++)
- xids.insert(*itr);
+ 0, thisHighestRid, 0);
+ if (thisHighestRid > highestRid)
+ highestRid = thisHighestRid;
+ try {
+ void* dbuff = NULL; size_t dbuffSize = 0;
+ void* xidbuff = NULL; size_t xidbuffSize = 0;
+ bool transientFlag = false;
+ bool externalFlag = false;
+ DataTokenImpl dtokp;
+ bool done = false;
+ long aio_sleep_cnt = 0;
+ while (!done) {
+ dtokp.reset();
+ dtokp.set_wstate(DataTokenImpl::ENQ);
+ rhm::journal::iores res =
preparedXidStorePtr->read_data_record(&dbuff, dbuffSize, &xidbuff, xidbuffSize,
transientFlag, externalFlag, &dtokp);
+ switch (res) {
+ case rhm::journal::RHM_IORES_SUCCESS:
+ if (xidbuffSize > 0) {
+ xids.insert(std::string((const char*)xidbuff, xidbuffSize));
+ preparedMap[std::string((const char*)xidbuff, xidbuffSize)] =
dtokp.rid();
+ ::free(xidbuff);
+ } else {
+ THROW_STORE_EXCEPTION("No XID found in
BdbMessageStore::collectPreparedXids()");
+ }
+ aio_sleep_cnt = 0;
+ break;
+ case rhm::journal::RHM_IORES_PAGE_AIOWAIT:
+ if (++aio_sleep_cnt > MAX_AIO_SLEEPS)
+ THROW_STORE_EXCEPTION("Timeout waiting for AIO in
BdbMessageStore::collectPreparedXids()");
+ ::usleep(AIO_SLEEP_TIME);
+ break;
+ case rhm::journal::RHM_IORES_EMPTY:
+ done = true;
+ break;
+ default:
+ assert( "Store Error: Unexpected msg state");
+ }
+ }
+ } catch (const journal::jexception& e) {
+ THROW_STORE_EXCEPTION(std::string("Prepared XID journal:
collectPreparedXids() failed: ") + e.what());
+ }
preparedXidStorePtr->recover_complete(); // start journal.
}
@@ -1148,9 +1182,13 @@
try {
// Nothing to do if not prepared
chkInitPreparedXidStore();
- if (txn.getDtok().is_enqueued())
- preparedXidStorePtr->dequeue_txn_data_record(&txn.getDtok(),
txn.getXid());
-
+ if (txn.getDtok()->is_enqueued()) {
+ txn.incrDtokRef();
+ DataTokenImpl* dtokp = txn.getDtok();
+ dtokp->set_dequeue_rid(dtokp->rid());
+ dtokp->set_rid(messageIdSequence.next());
+ preparedXidStorePtr->dequeue_txn_data_record(txn.getDtok(),
txn.getXid());
+ }
txn.complete(commit);
} catch (const std::exception& e) {
QPID_LOG(error, "Error completing xid " << txn.getXid() <<
": " << e.what());
@@ -1162,19 +1200,15 @@
{
checkInit();
// pass sequence number for c/a
- TxnCtxt* txn(new TxnCtxt(&messageIdSequence ));
- return auto_ptr<TransactionContext>(txn);
+ return auto_ptr<TransactionContext>(new TxnCtxt(&messageIdSequence));
}
std::auto_ptr<qpid::broker::TPCTransactionContext> BdbMessageStore::begin(const
std::string& xid)
{
checkInit();
- IdSequence* jtx = NULL;
- jtx = &messageIdSequence;
-
+ IdSequence* jtx = &messageIdSequence;
// pass sequence number for c/a
- TPCTxnCtxt* txn(new TPCTxnCtxt(xid, jtx));
- return auto_ptr<TPCTransactionContext>(txn);
+ return auto_ptr<TPCTransactionContext>(new TPCTxnCtxt(xid, jtx));
}
void BdbMessageStore::prepare(qpid::broker::TPCTransactionContext& ctxt)
@@ -1185,7 +1219,11 @@
try {
chkInitPreparedXidStore();
- preparedXidStorePtr->enqueue_txn_data_record(0, 0, 0, &txn->getDtok(),
txn->getXid(), false);
+ txn->incrDtokRef();
+ DataTokenImpl* dtokp = txn->getDtok();
+ dtokp->set_external_rid(true);
+ dtokp->set_rid(messageIdSequence.next());
+ preparedXidStorePtr->enqueue_txn_data_record(0, 0, 0, dtokp, txn->getXid(),
false);
txn->addXidRecord(preparedXidStorePtr.get());
// make sure all the data is written to disk before returning
Modified: store/trunk/cpp/lib/BdbMessageStore.h
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.h 2008-07-03 20:46:46 UTC (rev 2176)
+++ store/trunk/cpp/lib/BdbMessageStore.h 2008-07-05 20:14:45 UTC (rev 2177)
@@ -25,29 +25,19 @@
#define _BdbMessageStore_
#include <string>
+
#include "db-inc.h"
-//#include "BufferValue.h"
#include "Cursor.h"
#include "IdDbt.h"
-//#include "IdSequence.h"
+#include "IdSequence.h"
+#include "JournalImpl.h"
+#include "jrnl/jcfg.hpp"
#include "PreparedTransaction.h"
-//#include "StoreException.h"
-#include "TxnCtxt.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/MessageStore.h"
#include "qpid/management/Manageable.h"
-//#include <qpid/sys/Monitor.h>
-//#include <qpid/sys/Time.h>
-//#include <map>
-//#include <set>
-//#include <iostream>
-//#include <boost/format.hpp>
-//#include <boost/intrusive_ptr.hpp>
-//#include <boost/ptr_container/ptr_list.hpp>
#include "qpid/management/Store.h"
-#include "jrnl/jcfg.hpp"
-#include "JournalImpl.h"
-#include "IdSequence.h"
+#include "TxnCtxt.h"
// Assume DB_VERSION_MAJOR == 4
#if (DB_VERSION_MINOR == 2)
@@ -57,7 +47,6 @@
namespace rhm {
namespace bdbstore {
-using std::string;
/**
* An implementation of the MessageStore interface based on Berkeley DB
@@ -70,6 +59,9 @@
typedef LockedMappings::map txn_lock_map;
typedef boost::ptr_list<PreparedTransaction> txn_list;
+
+ typedef std::map<std::string, u_int64_t> xid_rid_map;
+ typedef xid_rid_map::const_iterator xid_rid_map_citr;
// Default store settings
static const u_int16_t defNumJrnlFiles = 8;
@@ -98,6 +90,8 @@
u_int32_t jrnlFsizePgs;
u_int32_t wcache_pgsize_sblks;
u_int16_t wcache_num_pages;
+ xid_rid_map preparedMap;
+ u_int64_t highestRid;
bool isInit;
const char* envPath;
static qpid::sys::Duration defJournalGetEventsTimeout;
@@ -121,7 +115,6 @@
int enqueueMessage(TxnCtxt& txn, IdDbt& msgId,
qpid::broker::RecoverableMessage::shared_ptr& msg,
queue_index& index, txn_list& locked, message_index&
prepared);
void recoverXids(txn_list& txns);
- void readXids(Db& db, std::set<string>& xids);
void readLockedMappings(Db& db, txn_lock_map& mappings);
TxnCtxt* check(qpid::broker::TransactionContext* ctxt);
void store(const qpid::broker::PersistableQueue* queue, TxnCtxt* txn,
@@ -152,21 +145,34 @@
// journal functions
void createJrnlQueue(const qpid::broker::PersistableQueue& queue);
- string getJrnlDir(const qpid::broker::PersistableQueue& queue); //for exmaple
/var/rhm/ + queueDir/
- string getJrnlDir(const char* queueName);
- string getJrnlBaseDir();
- string getBdbBaseDir();
- string getPxidBaseDir();
+ std::string getJrnlDir(const qpid::broker::PersistableQueue& queue); //for
exmaple /var/rhm/ + queueDir/
+ std::string getJrnlDir(const char* queueName);
+ std::string getJrnlBaseDir();
+ std::string getBdbBaseDir();
+ std::string getPxidBaseDir();
inline void checkInit() {
if (!isInit) init("/var", defNumJrnlFiles, defJrnlFileSizePgs,
defWCachePageSize); isInit = true;
}
void chkInitPreparedXidStore();
+    /**
+     * Debug aid for printing XIDs that may contain non-printable chars.
+     * Printable characters are copied through unchanged; every other byte is
+     * written as "/" followed by its value as two zero-padded hex digits.
+     *
+     * @param xid  XID to render (may contain arbitrary bytes).
+     * @return     The printable representation of xid.
+     */
+    static std::string xid2str(const std::string& xid) {
+        std::ostringstream oss;
+        oss << std::hex << std::setfill('0');
+        for (std::string::size_type i = 0; i < xid.size(); i++) {
+            // Work on the byte as unsigned char: passing a negative plain char
+            // to isprint() is undefined, and widening a negative char to int
+            // would print as "ffffffXX" in hex instead of two digits.
+            const unsigned char uc = static_cast<unsigned char>(xid[i]);
+            if (isprint(uc))
+                oss << xid[i];
+            else
+                oss << "/" << std::setw(2) << static_cast<unsigned>(uc);
+        }
+        return oss.str();
+    }
+
public:
struct Options : public qpid::Options {
Options(const std::string& name="Store Options");
- string clusterName;
- string storeDir;
+ std::string clusterName;
+ std::string storeDir;
bool storeAsync;
bool storeForce;
uint16_t numJrnlFiles;
@@ -222,7 +228,7 @@
u_int32_t outstandingQueueAIO(const qpid::broker::PersistableQueue& queue);
- void collectPreparedXids(std::set<string>& xids);
+ void collectPreparedXids(std::set<std::string>& xids);
std::auto_ptr<qpid::broker::TransactionContext> begin();
std::auto_ptr<qpid::broker::TPCTransactionContext> begin(const std::string&
xid);
Modified: store/trunk/cpp/lib/TxnCtxt.h
===================================================================
--- store/trunk/cpp/lib/TxnCtxt.h 2008-07-03 20:46:46 UTC (rev 2176)
+++ store/trunk/cpp/lib/TxnCtxt.h 2008-07-05 20:14:45 UTC (rev 2177)
@@ -57,7 +57,7 @@
ipqdef impactedQueues; // list of Queues used in the txn
mutable qpid::sys::Mutex Lock;
IdSequence* loggedtx;
- DataTokenImpl dtok;
+ boost::intrusive_ptr<DataTokenImpl> dtokp;
AutoScopedLock globalHolder;
/**
@@ -83,7 +83,6 @@
jc->txn_abort(dtokp.get(), getXid());
}
} catch (const journal::jexception& e) {
- //std::cout << "Error commit" << e <<
std::endl;
THROW_STORE_EXCEPTION(std::string("Error commit") +
e.what());
}
}
@@ -93,7 +92,7 @@
public:
- TxnCtxt(IdSequence* _loggedtx=NULL) : loggedtx(_loggedtx), txn(0) {
+ TxnCtxt(IdSequence* _loggedtx=NULL) : loggedtx(_loggedtx), dtokp(new DataTokenImpl),
txn(0) {
if (loggedtx) {
std::stringstream s;
s << "rhm-tid" << this;
@@ -129,7 +128,6 @@
jc->get_wr_events();
}
} catch (const journal::jexception& e) {
- //std::cout << " TxnCtxt: Error sync 3" << e
<< std::flush;
THROW_STORE_EXCEPTION(std::string("Error sync") +
e.what());
}
}
@@ -165,7 +163,14 @@
void deleteXidRecord() { impactedQueues.clear(); }
void addXidRecord(qpid::broker::ExternalQueueStore* queue) {
impactedQueues.insert(queue); }
void complete(bool commit) { completeTXN(commit); }
- DataTokenImpl& getDtok() { return dtok; }
+ DataTokenImpl* getDtok() { return dtokp.get(); }
+ void incrDtokRef() { dtokp->addRef(); }
+    /**
+     * Restore the state of this context's data token during recovery.
+     * Sets the token's rid and xid from the arguments, sets its write state
+     * to DataTokenImpl::ENQ and marks the rid as external.
+     *
+     * @param rid  Record id to store in the data token.
+     * @param xid  Transaction id to store in the data token.
+     */
+    void recoverDtok(const u_int64_t rid, const std::string& xid) {
+        dtokp->set_rid(rid);
+        dtokp->set_wstate(DataTokenImpl::ENQ);
+        dtokp->set_xid(xid);
+        dtokp->set_external_rid(true);
+    }
};
class TPCTxnCtxt : public TxnCtxt, public qpid::broker::TPCTransactionContext
Modified: store/trunk/cpp/lib/jrnl/data_tok.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/data_tok.cpp 2008-07-03 20:46:46 UTC (rev 2176)
+++ store/trunk/cpp/lib/jrnl/data_tok.cpp 2008-07-05 20:14:45 UTC (rev 2177)
@@ -30,6 +30,7 @@
#include "jrnl/data_tok.hpp"
+#include <iomanip>
#include "jrnl/jerrno.hpp"
#include "jrnl/jexception.hpp"
#include <sstream>
@@ -50,6 +51,7 @@
_dblks_written(0),
_dblks_read(0),
_pg_cnt(0),
+ _fid(0),
_rid(0),
_xid(),
_dequeue_rid(0),
@@ -163,9 +165,31 @@
_dblks_written = 0;
_dblks_read = 0;
_pg_cnt = 0;
+ _fid = 0;
_rid = 0;
_xid.clear();
}
+// Debug aid: returns a one-line summary of this token's members (_cnt, the
+// wstate_str()/rstate_str() strings, _fid, _rid, _xid, _dequeue_rid,
+// _external_rid, _dsize, _dblks_written, _dblks_read and _pg_cnt). Numeric
+// members are streamed in hexadecimal. Printable XID characters are copied
+// through; any other XID byte is written as "/" plus two zero-padded hex digits.
+std::string
+data_tok::status_str() const
+{
+    std::ostringstream oss;
+    oss << std::hex << std::setfill('0');
+    oss << "dtok id=0x" << _cnt << "; ws=" << wstate_str() << "; rs=" << rstate_str();
+    oss << "; fid=0x" << _fid << "; rid=0x" << _rid << "; xid=";
+    for (std::string::size_type i = 0; i < _xid.size(); i++)
+    {
+        // Work on the byte as unsigned char: passing a negative plain char to
+        // isprint() is undefined, and widening a negative char to int would
+        // print as "ffffffXX" in hex instead of two digits.
+        const unsigned char uc = static_cast<unsigned char>(_xid[i]);
+        if (isprint(uc))
+            oss << _xid[i];
+        else
+            oss << "/" << std::setw(2) << static_cast<unsigned>(uc);
+    }
+    oss << "; drid=0x" << _dequeue_rid << " extrid=" << (_external_rid?"T":"F");
+    oss << "; ds=0x" << _dsize << "; dw=0x" << _dblks_written << "; dr=0x" << _dblks_read;
+    oss << " pc=0x" << _pg_cnt;
+    return oss.str();
+}
+
} // namespace journal
} // namespace rhm
Modified: store/trunk/cpp/lib/jrnl/data_tok.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/data_tok.hpp 2008-07-03 20:46:46 UTC (rev 2176)
+++ store/trunk/cpp/lib/jrnl/data_tok.hpp 2008-07-05 20:14:45 UTC (rev 2177)
@@ -160,6 +160,9 @@
{ _xid.assign((const char*)xidp, xid_len); }
void reset();
+
+ // debug aid
+ std::string status_str() const;
};
} // namespace journal
Modified: store/trunk/cpp/lib/jrnl/rrfc.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rrfc.cpp 2008-07-03 20:46:46 UTC (rev 2176)
+++ store/trunk/cpp/lib/jrnl/rrfc.cpp 2008-07-05 20:14:45 UTC (rev 2177)
@@ -97,7 +97,7 @@
return _fh_arr[pg_index];
}
-const std::string
+std::string
rrfc::status_str() const
{
std::ostringstream oss;
Modified: store/trunk/cpp/lib/jrnl/rrfc.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rrfc.hpp 2008-07-03 20:46:46 UTC (rev 2176)
+++ store/trunk/cpp/lib/jrnl/rrfc.hpp 2008-07-05 20:14:45 UTC (rev 2177)
@@ -136,7 +136,7 @@
{ return _curr_fh->wr_aio_outstanding_dblks() > 0; }
// Debug aid
- const std::string status_str() const;
+ std::string status_str() const;
}; // class rrfc
} // namespace journal
Modified: store/trunk/cpp/lib/jrnl/txn_map.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/txn_map.cpp 2008-07-03 20:46:46 UTC (rev 2176)
+++ store/trunk/cpp/lib/jrnl/txn_map.cpp 2008-07-05 20:14:45 UTC (rev 2177)
@@ -91,11 +91,8 @@
const txn_data_list
txn_map::get_tdata_list(const std::string& xid)
{
- xmap_itr itr;
- {
- slock s(&_mutex);
- itr = _map.find(xid);
- }
+ slock s(&_mutex);
+ xmap_itr itr = _map.find(xid);
if (itr == _map.end()) // not found in map
{
std::ostringstream oss;
@@ -127,11 +124,8 @@
bool
txn_map::in_map(const std::string& xid)
{
- xmap_itr itr;
- {
- slock s(&_mutex);
- itr = _map.find(xid);
- }
+ slock s(&_mutex);
+ xmap_itr itr= _map.find(xid);
if (itr == _map.end()) // not found in map
return false;
return true;
@@ -140,11 +134,8 @@
u_int32_t
txn_map::get_rid_count(const std::string& xid)
{
- xmap_itr itr;
- {
- slock s(&_mutex);
- itr = _map.find(xid);
- }
+ slock s(&_mutex);
+ xmap_itr itr = _map.find(xid);
if (itr == _map.end()) // not found in map
{
std::ostringstream oss;
Modified: store/trunk/cpp/lib/jrnl/wrfc.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wrfc.cpp 2008-07-03 20:46:46 UTC (rev 2176)
+++ store/trunk/cpp/lib/jrnl/wrfc.cpp 2008-07-05 20:14:45 UTC (rev 2177)
@@ -176,7 +176,7 @@
return findex != _fh_index && in_use;
}
-const std::string
+std::string
wrfc::status_str() const
{
std::ostringstream oss;
Modified: store/trunk/cpp/lib/jrnl/wrfc.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wrfc.hpp 2008-07-03 20:46:46 UTC (rev 2176)
+++ store/trunk/cpp/lib/jrnl/wrfc.hpp 2008-07-05 20:14:45 UTC (rev 2177)
@@ -128,7 +128,7 @@
bool enq_threshold(const u_int32_t enq_dsize_dblks) const;
// Debug aid
- const std::string status_str() const;
+ std::string status_str() const;
}; // class wrfc
} // namespace journal
Modified: store/trunk/cpp/tests/TwoPhaseCommitTest.cpp
===================================================================
--- store/trunk/cpp/tests/TwoPhaseCommitTest.cpp 2008-07-03 20:46:46 UTC (rev 2176)
+++ store/trunk/cpp/tests/TwoPhaseCommitTest.cpp 2008-07-05 20:14:45 UTC (rev 2177)
@@ -160,9 +160,6 @@
swap.check(commit);
restart();
swap.check(commit);
-
- // this test leaves xids in the store
- store->truncate();
}
void commit(Strategy& strategy)
Modified: store/trunk/cpp/tests/persistence.py
===================================================================
--- store/trunk/cpp/tests/persistence.py 2008-07-03 20:46:46 UTC (rev 2176)
+++ store/trunk/cpp/tests/persistence.py 2008-07-05 20:14:45 UTC (rev 2177)
@@ -262,6 +262,19 @@
session = self.session
session.synchronous = False
+ # check xids from phase 6 are gone
+ txc = self.xid('c')
+ txd = self.xid('d')
+
+ xids = session.dtx_recover().in_doubt
+ ids = [x.global_id for x in xids] #TODO: come up with nicer way to test these
+
+ if txc.global_id in ids:
+ self.fail("Xid still present : %s" % (txc))
+ if txd.global_id in ids:
+ self.fail("Xid still present : %s" % (txc))
+ self.assertEqual(0, len(xids))
+
#test deletion of queue after publish
#create queue
session.queue_declare(queue = "q", auto_delete=True, durable=True)