rhmessaging commits: r2018 - mgmt/cumin/python/cumin.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2008-05-08 17:49:39 -0400 (Thu, 08 May 2008)
New Revision: 2018
Modified:
mgmt/cumin/python/cumin/test.py
Log:
Improve the logic that checks for expected redirects
Modified: mgmt/cumin/python/cumin/test.py
===================================================================
--- mgmt/cumin/python/cumin/test.py 2008-05-08 21:26:46 UTC (rev 2017)
+++ mgmt/cumin/python/cumin/test.py 2008-05-08 21:49:39 UTC (rev 2018)
@@ -211,12 +211,14 @@
redirect = p.get_redirect_url(s)
- if redirect:
- p, s = self.env.page_and_session()
- s.unmarshal(redirect)
- else:
+ if redirect is None:
+ print form.name_errors.get(s), form.addr_errors.get(s)
+
raise Exception("Expected redirect")
+ p, s = self.env.page_and_session()
+ s.unmarshal(redirect)
+
p.process(s)
p.render(s)
@@ -256,12 +258,12 @@
redirect = p.get_redirect_url(s)
- if redirect:
- p, s = self.env.page_and_session()
- s.unmarshal(redirect)
- else:
+ if redirect is None:
raise Exception("Expected redirect")
+ p, s = self.env.page_and_session()
+ s.unmarshal(redirect)
+
p.process(s)
p.render(s)
@@ -285,12 +287,12 @@
redirect = p.get_redirect_url(s)
- if redirect:
- p, s = self.env.page_and_session()
- s.unmarshal(redirect)
- else:
+ if redirect is None:
raise Exception("Expected redirect")
+ p, s = self.env.page_and_session()
+ s.unmarshal(redirect)
+
class Remove(Test):
def do_run(self, session):
p, s = self.env.page_and_session()
@@ -303,12 +305,12 @@
redirect = p.get_redirect_url(s)
- if redirect:
- p, s = self.env.page_and_session()
- s.unmarshal(redirect)
- else:
+ if redirect is None:
raise Exception("Expected redirect")
+ p, s = self.env.page_and_session()
+ s.unmarshal(redirect)
+
class BrokerTest(Test):
def __init__(self, env, parent):
super(BrokerTest, self).__init__(env, parent)
16 years, 8 months
rhmessaging commits: r2017 - mgmt/cumin/python/cumin.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2008-05-08 17:26:46 -0400 (Thu, 08 May 2008)
New Revision: 2017
Modified:
mgmt/cumin/python/cumin/quirk.py
Log:
Can't use connection if you don't start it
Modified: mgmt/cumin/python/cumin/quirk.py
===================================================================
--- mgmt/cumin/python/cumin/quirk.py 2008-05-08 20:58:02 UTC (rev 2016)
+++ mgmt/cumin/python/cumin/quirk.py 2008-05-08 21:26:46 UTC (rev 2017)
@@ -96,6 +96,7 @@
sock = qpid.util.connect(self.host, self.port)
self.pconn = qpid.connection.Connection(sock, self.spec)
+ self.pconn.start()
def close(self):
assert self.pconn
16 years, 8 months
rhmessaging commits: r2016 - mgmt/cumin/python/cumin.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2008-05-08 16:58:02 -0400 (Thu, 08 May 2008)
New Revision: 2016
Modified:
mgmt/cumin/python/cumin/client.strings
Log:
Hide the session detach action
Modified: mgmt/cumin/python/cumin/client.strings
===================================================================
--- mgmt/cumin/python/cumin/client.strings 2008-05-08 20:21:27 UTC (rev 2015)
+++ mgmt/cumin/python/cumin/client.strings 2008-05-08 20:58:02 UTC (rev 2016)
@@ -148,7 +148,6 @@
<div class="sactions">
<h2>Act on Selected Sessions:</h2>
- {detach}
{close}
</div>
16 years, 8 months
rhmessaging commits: r2015 - mgmt/cumin/python/cumin.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2008-05-08 16:21:27 -0400 (Thu, 08 May 2008)
New Revision: 2015
Modified:
mgmt/cumin/python/cumin/widgets.py
Log:
The UniqueNameField was failing to invoke the do_validate of its super
class.
Modified: mgmt/cumin/python/cumin/widgets.py
===================================================================
--- mgmt/cumin/python/cumin/widgets.py 2008-05-08 20:08:31 UTC (rev 2014)
+++ mgmt/cumin/python/cumin/widgets.py 2008-05-08 20:21:27 UTC (rev 2015)
@@ -545,19 +545,22 @@
self.__object = attr
def do_validate(self, session, errors):
- name = self.get(session)
+ super(UniqueNameField, self).do_validate(session, errors)
- args = {self.__field: name, }
- results = self.__class.selectBy(**args)
+ if not errors:
+ name = self.get(session)
- if self.__object:
- object = self.__object.get(session)
- if object:
- results = results.filter(self.__class.q.id != object.id)
+ args = {self.__field: name, }
+ results = self.__class.selectBy(**args)
- if results.count() > 0:
- errors.append(DuplicateValueError())
+ if self.__object:
+ object = self.__object.get(session)
+ if object:
+ results = results.filter(self.__class.q.id != object.id)
+ if results.count() > 0:
+ errors.append(DuplicateValueError())
+
class DuplicateValueError(Error):
def __init__(self, fld="name"):
super(Error, self).__init__()
16 years, 8 months
rhmessaging commits: r2014 - in mgmt/cumin/python: wooly and 1 other directory.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2008-05-08 16:08:31 -0400 (Thu, 08 May 2008)
New Revision: 2014
Modified:
mgmt/cumin/python/cumin/broker.py
mgmt/cumin/python/wooly/__init__.py
Log:
Because the API could be confusing, get rid of the return value for
Page.pop_current_frame.
Modified: mgmt/cumin/python/cumin/broker.py
===================================================================
--- mgmt/cumin/python/cumin/broker.py 2008-05-08 20:04:06 UTC (rev 2013)
+++ mgmt/cumin/python/cumin/broker.py 2008-05-08 20:08:31 UTC (rev 2014)
@@ -573,8 +573,8 @@
class BrokerSetAdd(BrokerSetForm):
def process_cancel(self, session):
branch = session.branch()
- frame = self.page.pop_current_frame(branch)
- frame.show_view(branch)
+ self.page.pop_current_frame(branch)
+ self.page.get_current_frame(branch).show_view(branch)
self.page.set_redirect_url(session, branch.marshal())
def process_submit(self, session):
Modified: mgmt/cumin/python/wooly/__init__.py
===================================================================
--- mgmt/cumin/python/wooly/__init__.py 2008-05-08 20:04:06 UTC (rev 2013)
+++ mgmt/cumin/python/wooly/__init__.py 2008-05-08 20:08:31 UTC (rev 2014)
@@ -328,7 +328,7 @@
def pop_current_frame(self, session):
frame = self.get_current_frame(session)
#print "Popping current frame", frame
- return self.set_current_frame(session, frame.frame)
+ self.set_current_frame(session, frame.frame)
def set_default_frame(self, frame):
self.current_frame.default = frame
16 years, 8 months
rhmessaging commits: r2013 - mgmt/cumin/python/cumin.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2008-05-08 16:04:06 -0400 (Thu, 08 May 2008)
New Revision: 2013
Modified:
mgmt/cumin/python/cumin/broker.py
Log:
Rename Peers to Linked Brokers
Modified: mgmt/cumin/python/cumin/broker.py
===================================================================
--- mgmt/cumin/python/cumin/broker.py 2008-05-08 20:02:29 UTC (rev 2012)
+++ mgmt/cumin/python/cumin/broker.py 2008-05-08 20:04:06 UTC (rev 2013)
@@ -139,7 +139,7 @@
def render_title(self, session, vhost):
count = self.get_item_count(session, vhost)
- return "Peers %s" % fmt_count(count)
+ return "Linked Brokers %s" % fmt_count(count)
def render_sql_where(self, session, vhost):
return "where v.id = %(vhost_id)r"
@@ -153,14 +153,14 @@
class FromPeerColumn(NullSortColumn, FreshDataOnlyColumn):
def render_title(self, session, data):
- return "Bytes from Peer"
+ return "Bytes from Broker"
def render_value(self, session, value):
return fmt_rate(value)
class ToPeerColumn(NullSortColumn, FreshDataOnlyColumn):
def render_title(self, session, data):
- return "Bytes to Peer"
+ return "Bytes to Broker"
def render_value(self, session, value):
return fmt_rate(value)
16 years, 8 months
rhmessaging commits: r2012 - mgmt/cumin/python/cumin.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2008-05-08 16:02:29 -0400 (Thu, 08 May 2008)
New Revision: 2012
Modified:
mgmt/cumin/python/cumin/broker.strings
Log:
Hide the queue add button
Modified: mgmt/cumin/python/cumin/broker.strings
===================================================================
--- mgmt/cumin/python/cumin/broker.strings 2008-05-08 19:48:11 UTC (rev 2011)
+++ mgmt/cumin/python/cumin/broker.strings 2008-05-08 20:02:29 UTC (rev 2012)
@@ -102,10 +102,6 @@
</script>
[BrokerQueueTab.html]
-<ul class="actions">
- <li><a class="nav" href="{add_queue_href}">Add Queue</a></li>
-</ul>
-
{items}
[BrokerDetailsTab.html]
16 years, 8 months
rhmessaging commits: r2011 - store/trunk/specs.
by rhmessaging-commits@lists.jboss.org
Author: kpvdr
Date: 2008-05-08 15:48:11 -0400 (Thu, 08 May 2008)
New Revision: 2011
Modified:
store/trunk/specs/management-schema.xml
Log:
Missing management xml file as part of previous checkin
Modified: store/trunk/specs/management-schema.xml
===================================================================
--- store/trunk/specs/management-schema.xml 2008-05-08 19:44:10 UTC (rev 2010)
+++ store/trunk/specs/management-schema.xml 2008-05-08 19:48:11 UTC (rev 2011)
@@ -43,10 +43,5 @@
<method name="expand" desc="Increase number of files allocated for this journal">
<arg name="by" type="uint32" dir="I" desc="Number of files to increase journal size by"/>
</method>
-
- <method name="reconfigure" desc="Destructively reconfigure dimensions for this journal">
- <arg name="fileCount" type="uint32" dir="I" desc="Number of files in journal"/>
- <arg name="fileSize" type="uint32" dir="I" desc="Size of files in journal"/>
- </method>
</class>
</schema>
16 years, 8 months
rhmessaging commits: r2010 - in store/trunk/cpp: lib/gen/qpid/management and 4 other directories.
by rhmessaging-commits@lists.jboss.org
Author: kpvdr
Date: 2008-05-08 15:44:10 -0400 (Thu, 08 May 2008)
New Revision: 2010
Modified:
store/trunk/cpp/lib/BdbMessageStore.cpp
store/trunk/cpp/lib/BdbMessageStore.h
store/trunk/cpp/lib/JournalImpl.cpp
store/trunk/cpp/lib/JournalImpl.h
store/trunk/cpp/lib/gen/qpid/management/Journal.cpp
store/trunk/cpp/lib/gen/qpid/management/Journal.h
store/trunk/cpp/lib/jrnl/jcfg.hpp
store/trunk/cpp/lib/jrnl/jcntl.cpp
store/trunk/cpp/lib/jrnl/jcntl.hpp
store/trunk/cpp/lib/jrnl/jinf.cpp
store/trunk/cpp/lib/jrnl/jinf.hpp
store/trunk/cpp/lib/jrnl/pmgr.cpp
store/trunk/cpp/lib/jrnl/pmgr.hpp
store/trunk/cpp/lib/jrnl/rmgr.cpp
store/trunk/cpp/lib/jrnl/wmgr.cpp
store/trunk/cpp/lib/jrnl/wmgr.hpp
store/trunk/cpp/tests/OrderingTest.cpp
store/trunk/cpp/tests/SimpleTest.cpp
store/trunk/cpp/tests/TransactionalTest.cpp
store/trunk/cpp/tests/TwoPhaseCommitTest.cpp
store/trunk/cpp/tests/jrnl/_st_helper_fns.hpp
store/trunk/cpp/tests/jrnl/_ut_jdir.cpp
store/trunk/cpp/tests/jrnl/_ut_jinf.cpp
store/trunk/cpp/tests/jrnl/jtt/jrnl_init_params.cpp
store/trunk/cpp/tests/jrnl/jtt/jrnl_init_params.hpp
store/trunk/cpp/tests/jrnl/jtt/jrnl_instance.cpp
store/trunk/cpp/tests/jrnl/jtt/jrnl_instance.hpp
store/trunk/cpp/tests/system_test.sh
Log:
BZ445661 "Add mechanism to set write page cache size so that message latency may be controlled". Functionally this works but more robust tests of the extremes still need to be added.
Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp 2008-05-08 19:26:23 UTC (rev 2009)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp 2008-05-08 19:44:10 UTC (rev 2010)
@@ -67,13 +67,14 @@
enqueueXidDb(&env, 0),
dequeueXidDb(&env, 0),
prepareXidDb(&env, 0),
- numJrnlFiles(8),
- jrnlFsizePgs(24),
+ numJrnlFiles(defNumJrnlFiles),
+ jrnlFsizePgs(defJrnlFileSizePgs),
+ wcache_pgsize_sblks(JRNL_WMGR_DEF_PAGE_SIZE),
+ wcache_num_pages(JRNL_WMGR_DEF_PAGES),
isInit(false),
envPath(envpath)
-{
-}
+{}
void BdbMessageStore::initManagement (Broker* broker)
{
@@ -96,12 +97,34 @@
}
}
-bool BdbMessageStore::init(const std::string& dir, const bool async, const bool force, u_int16_t jfiles, u_int32_t jfileSizePgs)
+bool BdbMessageStore::init(const std::string& dir, const bool async, const bool force, u_int16_t jfiles, u_int32_t jfileSizePgs, uint32_t wCachePageSize)
{
if (isInit) return true;
numJrnlFiles = jfiles;
jrnlFsizePgs = jfileSizePgs;
+
+ // set wcache_pgsize_sblks and wcache_num_pages from wCachePageSize
+ wcache_pgsize_sblks = wCachePageSize * 1024 / JRNL_DBLK_SIZE / JRNL_SBLK_SIZE; // convert from KiB to number sblks
+ u_int32_t defTotWCacheSize = JRNL_WMGR_DEF_PAGE_SIZE * JRNL_WMGR_DEF_PAGES; // in sblks. Currently 2048 sblks (1 MiB).
+ switch (wCachePageSize)
+ {
+ case 1:
+ case 2:
+ case 4:
+ // 256 KiB total cache
+ wcache_num_pages = defTotWCacheSize / wcache_pgsize_sblks / 4;
+ break;
+ case 8:
+ case 16:
+ // 512 KiB total cache
+ wcache_num_pages = defTotWCacheSize / wcache_pgsize_sblks / 2;
+ break;
+ default: // 32, 64, 128
+ // 1 MiB total cache
+ wcache_num_pages = defTotWCacheSize / wcache_pgsize_sblks;
+ }
+
useAsync = async;
if (dir.size()>0) storeDir = dir;
@@ -141,6 +164,7 @@
if (!ret) return false;
isInit = true;
+ QPID_LOG(info, "BdbMessageStore module initialized: dir=" << dir << "; async=" << (async?"T":"F") << "; force=" << (force?"T":"F") << "; jfiles=" << jfiles << "; jfileSizePgs=" << jfileSizePgs << "; wCachePageSize=" << wCachePageSize);
return true;
}
@@ -173,8 +197,47 @@
jrnlFsizePgs = jrnlMaxFsizePgs;
QPID_LOG(warning, "parameter jfile-size-pgs (" << opts->jrnlFsizePgs << ") above allowable maximum (" << jrnlFsizePgs << "); changing this parameter to maximum value.");
}
+
+ u_int32_t jrnlWrCachePageSize = opts->wCachePageSize;
+ switch (jrnlWrCachePageSize)
+ {
+ case 1:
+ case 2:
+ case 4:
+ case 8:
+ case 16:
+ case 32:
+ case 64:
+ case 128:
+ break;
+ default:
+ u_int32_t oldJrnlWrCachePageSize = jrnlWrCachePageSize;
+ if (oldJrnlWrCachePageSize == 0)
+ {
+ // For zero value, use default
+ jrnlWrCachePageSize = JRNL_WMGR_DEF_PAGE_SIZE * JRNL_DBLK_SIZE * JRNL_SBLK_SIZE / 1024;
+ QPID_LOG(warning, "parameter wcache-page-size (" << oldJrnlWrCachePageSize << ") must be a power of 2 between 1 and 128; changing this parameter to default value (" << jrnlWrCachePageSize << ")");
+ }
+ else
+ {
+ // For any positive value, use closest value
+ if (oldJrnlWrCachePageSize < 6)
+ jrnlWrCachePageSize = 4;
+ else if (oldJrnlWrCachePageSize < 12)
+ jrnlWrCachePageSize = 8;
+ else if (oldJrnlWrCachePageSize < 24)
+ jrnlWrCachePageSize = 16;
+ else if (oldJrnlWrCachePageSize < 48)
+ jrnlWrCachePageSize = 32;
+ else if (oldJrnlWrCachePageSize < 96)
+ jrnlWrCachePageSize = 64;
+ else if (oldJrnlWrCachePageSize > 128)
+ jrnlWrCachePageSize = 128;
+ QPID_LOG(warning, "parameter wcache-page-size (" << oldJrnlWrCachePageSize << ") must be a power of 2 between 1 and 128; changing this parameter to closest allowable value (" << jrnlWrCachePageSize << ")");
+ }
+ }
- return init(opts->storeDir, opts->storeAsync, opts->storeForce, numJrnlFiles, jrnlFsizePgs);
+ return init(opts->storeDir, opts->storeAsync, opts->storeForce, numJrnlFiles, jrnlFsizePgs, jrnlWrCachePageSize);
}
// true is async
@@ -298,7 +361,7 @@
queue.setExternalQueueStore(dynamic_cast<ExternalQueueStore*>(jQueue));
try {
// init will create the deque's for the init...
- jQueue->initialize(localFileCount, localFileSize * JRNL_RMGR_PAGE_SIZE);
+ jQueue->initialize(localFileCount, localFileSize * JRNL_RMGR_PAGE_SIZE, wcache_num_pages, wcache_pgsize_sblks);
} catch (const journal::jexception& e) {
THROW_STORE_EXCEPTION(std::string("Queue ") + queue.getName() +
": create() failed: " + e.what());
@@ -473,7 +536,7 @@
try
{
u_int64_t thisHighestRid = 0;
- jQueue->recover(numJrnlFiles, jrnlFsizePgs * JRNL_RMGR_PAGE_SIZE, prepared, thisHighestRid, key.id); // start recovery
+ jQueue->recover(numJrnlFiles, jrnlFsizePgs * JRNL_RMGR_PAGE_SIZE, wcache_num_pages, wcache_pgsize_sblks, prepared, thisHighestRid, key.id); // start recovery
if (thisHighestRid > highestRid)
highestRid = thisHighestRid;
recoverMessages(txn, registry, queue, prepared, messages);
@@ -1486,20 +1549,26 @@
storeAsync(true),
storeForce(false),
numJrnlFiles(8),
- jrnlFsizePgs(24)
+ jrnlFsizePgs(24),
+ wCachePageSize(JRNL_WMGR_DEF_PAGE_SIZE * JRNL_DBLK_SIZE * JRNL_SBLK_SIZE / 1024)
{
addOptions()
- ("store-directory", qpid::optValue(storeDir,"DIR"),
+ ("store-directory", qpid::optValue(storeDir, "DIR"),
"Store directory location for persistence (instead of using --data-dir value). "
"Must be supplied if --no-data-dir is also used.")
- ("store-async", qpid::optValue(storeAsync,"yes|no"),
- "Use async persistence storage - if store supports it, enables AIO O_DIRECT.")
- ("store-force", qpid::optValue(storeForce,"yes|no"),
- "Force changing modes of store, will delete all existing data if mode is changed. Be SURE you want to do this!")
+ ("store-async", qpid::optValue(storeAsync, "yes|no"),
+ "Use async persistence storage - if store supports it, enables AIO using O_DIRECT.")
+ ("store-force", qpid::optValue(storeForce, "yes|no"),
+ "Force changing modes of store (from sync to async or visa versa). "
+ "Will delete all existing data if mode is changed.")
("num-jfiles", qpid::optValue(numJrnlFiles, "N"),
"Number of files in persistence journal")
("jfile-size-pgs", qpid::optValue(jrnlFsizePgs, "N"),
"Size of each journal file in multiples of read pages (1 read page = 64kiB)")
+ ("wcache-page-size", qpid::optValue(wCachePageSize, "N"),
+ "Size of the pages in the write page cache in KiB. "
+ "Allowable values - powers of 2: 1, 2, 4, ... , 128. "
+ "Lower values decrease latency at the expense of throughput.")
;
}
Modified: store/trunk/cpp/lib/BdbMessageStore.h
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.h 2008-05-08 19:26:23 UTC (rev 2009)
+++ store/trunk/cpp/lib/BdbMessageStore.h 2008-05-08 19:44:10 UTC (rev 2010)
@@ -66,6 +66,7 @@
static const bool defForceStoreConversion = false;
static const u_int16_t defNumJrnlFiles = 8; // TODO: make configurable
static const u_int32_t defJrnlFileSizePgs = 24; // TODO: make configurable
+ static const u_int32_t defWCachePageSize = JRNL_WMGR_DEF_PAGE_SIZE * JRNL_DBLK_SIZE * JRNL_SBLK_SIZE / 1024; // TODO: make configurable
std::list<Db*> dbs;
DbEnv env;
@@ -85,6 +86,8 @@
std::string storeDir;
u_int16_t numJrnlFiles;
u_int32_t jrnlFsizePgs;
+ u_int32_t wcache_pgsize_sblks;
+ u_int16_t wcache_num_pages;
bool isInit;
const char* envPath;
static qpid::sys::Duration defJournalGetEventsTimeout;
@@ -145,7 +148,7 @@
static inline bool usingJrnl() {return useAsync;}
string getJrnlBaseDir();
inline void checkInit() {
- if (!isInit) init("/var", defUseAsync, defForceStoreConversion, defNumJrnlFiles, defJrnlFileSizePgs); isInit = true;
+ if (!isInit) init("/var", defUseAsync, defForceStoreConversion, defNumJrnlFiles, defJrnlFileSizePgs, defWCachePageSize); isInit = true;
}
public:
@@ -157,6 +160,7 @@
bool storeForce;
uint16_t numJrnlFiles;
uint32_t jrnlFsizePgs;
+ uint32_t wCachePageSize;
};
typedef boost::shared_ptr<BdbMessageStore> shared_ptr;
@@ -164,7 +168,7 @@
BdbMessageStore(const char* envpath = 0);
virtual ~BdbMessageStore();
bool init(const qpid::Options* options);
- bool init(const std::string& dir, const bool async, const bool force, u_int16_t jfiles, u_int32_t jfileSizePgs);
+ bool init(const std::string& dir, const bool async, const bool force, u_int16_t jfiles, u_int32_t jfileSizePgs, uint32_t wCachePageSize);
void initManagement (qpid::broker::Broker* broker);
void truncate();
Modified: store/trunk/cpp/lib/JournalImpl.cpp
===================================================================
--- store/trunk/cpp/lib/JournalImpl.cpp 2008-05-08 19:26:23 UTC (rev 2009)
+++ store/trunk/cpp/lib/JournalImpl.cpp 2008-05-08 19:44:10 UTC (rev 2010)
@@ -81,8 +81,6 @@
_mgmtObject->set_name(journalId);
_mgmtObject->set_journalDirectory(journalDirectory);
_mgmtObject->set_journalBaseFileName(journalBaseFilename);
- _mgmtObject->set_journalWritePageSize(JRNL_WMGR_PAGE_SIZE * JRNL_SBLK_SIZE * JRNL_DBLK_SIZE);
- _mgmtObject->set_journalWritePages(JRNL_WMGR_PAGES);
_mgmtObject->set_journalReadPageSize(JRNL_RMGR_PAGE_SIZE * JRNL_SBLK_SIZE * JRNL_DBLK_SIZE);
_mgmtObject->set_journalReadPages(JRNL_RMGR_PAGES);
@@ -126,13 +124,20 @@
}
void
-JournalImpl::initialize(const u_int16_t num_jfiles, const u_int32_t jfsize_sblks,
- const journal::rd_aio_cb rd_cb, const journal::wr_aio_cb wr_cb)
+JournalImpl::initialize(const u_int16_t num_jfiles,
+ const u_int32_t jfsize_sblks,
+ const u_int16_t wcache_num_pages,
+ const u_int32_t wcache_pgsize_sblks,
+ const journal::rd_aio_cb rd_cb,
+ const journal::wr_aio_cb wr_cb)
{
std::ostringstream oss;
oss << "Initialize; num_jfiles=" << num_jfiles << " jfsize_sblks=" << jfsize_sblks;
+ oss << " wcache_pgsize_sblks=" << wcache_pgsize_sblks;
+ oss << " wcache_num_pages=" << wcache_num_pages;
log(LOG_DEBUG, oss.str());
- jcntl::initialize(num_jfiles, jfsize_sblks, rd_cb, wr_cb);
+ jcntl::initialize(num_jfiles, jfsize_sblks, wcache_num_pages, wcache_pgsize_sblks, rd_cb,
+ wr_cb);
log(LOG_DEBUG, "Initialization complete");
if (_mgmtObject.get() != 0)
@@ -140,17 +145,27 @@
_mgmtObject->set_initialFileCount(_num_jfiles);
_mgmtObject->set_dataFileSize(_jfsize_sblks * JRNL_SBLK_SIZE * JRNL_DBLK_SIZE);
_mgmtObject->set_journalCurrentFileCount(_num_jfiles);
+ _mgmtObject->set_journalWritePageSize(wcache_pgsize_sblks * JRNL_SBLK_SIZE * JRNL_DBLK_SIZE);
+ _mgmtObject->set_journalWritePages(wcache_num_pages);
}
}
void
-JournalImpl::recover(const u_int16_t num_jfiles, const u_int32_t jfsize_sblks,
- const journal::rd_aio_cb rd_cb, const journal::wr_aio_cb wr_cb,
- boost::ptr_list<bdbstore::PreparedTransaction>& prep_tx_list, u_int64_t& highest_rid,
- u_int64_t queue_id)
+JournalImpl::recover(const u_int16_t num_jfiles,
+ const u_int32_t jfsize_sblks,
+ const u_int16_t wcache_num_pages,
+ const u_int32_t wcache_pgsize_sblks,
+ const journal::rd_aio_cb rd_cb,
+ const journal::wr_aio_cb wr_cb,
+ boost::ptr_list<bdbstore::PreparedTransaction>& prep_tx_list,
+ u_int64_t& highest_rid,
+ u_int64_t queue_id)
{
std::ostringstream oss1;
- oss1 << "Recover; num_jfiles=" << num_jfiles << " jfsize_sblks=" << jfsize_sblks << " queue_id = 0x" << std::hex << queue_id;
+ oss1 << "Recover; num_jfiles=" << num_jfiles << " jfsize_sblks=" << jfsize_sblks;
+ oss1 << " queue_id = 0x" << std::hex << queue_id << std::dec;
+ oss1 << " wcache_pgsize_sblks=" << wcache_pgsize_sblks;
+ oss1 << " wcache_num_pages=" << wcache_num_pages;
log(LOG_DEBUG, oss1.str());
// Create list of prepared xids
std::vector<std::string> prep_xid_list;
@@ -159,7 +174,8 @@
prep_xid_list.push_back(i->xid);
}
- jcntl::recover(num_jfiles, jfsize_sblks, rd_cb, wr_cb, prep_xid_list, highest_rid);
+ jcntl::recover(num_jfiles, jfsize_sblks, wcache_num_pages, wcache_pgsize_sblks, rd_cb, wr_cb,
+ prep_xid_list, highest_rid);
// Populate PreparedTransaction lists from _tmap
for (bdbstore::PreparedTransaction::list::iterator i = prep_tx_list.begin();
@@ -192,6 +208,8 @@
_mgmtObject->set_dataFileSize(_jfsize_sblks * JRNL_SBLK_SIZE * JRNL_DBLK_SIZE);
_mgmtObject->set_journalCurrentFileCount(_num_jfiles);
_mgmtObject->set_journalRecordDepth(_emap.size());
+ _mgmtObject->set_journalWritePageSize(wcache_pgsize_sblks * JRNL_SBLK_SIZE * JRNL_DBLK_SIZE);
+ _mgmtObject->set_journalWritePages(wcache_num_pages);
}
}
@@ -447,34 +465,6 @@
}
}
-void
-JournalImpl::reconfigure(const u_int16_t num_jfiles, const u_int32_t jfsize_sblks,
- const journal::rd_aio_cb rd_cb, const journal::wr_aio_cb wr_cb)
-{
- std::ostringstream oss;
- oss << "Management reconfiguration: num_jfiles=" << num_jfiles << " jfsize_sblks=" << jfsize_sblks;
- log(LOG_NOTICE, oss.str());
-
- // Make sure something is actually changing before going to all the trouble...
- if (num_jfiles == _num_jfiles && jfsize_sblks == _jfsize_sblks)
- {
- log(LOG_INFO, "Management reconfiguration parameters identical to existing; reconfiguration ignored.");
- return;
- }
-
- stop(true);
- jcntl::initialize(num_jfiles, jfsize_sblks, rd_cb, wr_cb);
- log(LOG_DEBUG, "Management reconfiguration complete");
-
- if (_mgmtObject.get() != 0)
- {
- _mgmtObject->set_initialFileCount(_num_jfiles);
- _mgmtObject->set_dataFileSize(_jfsize_sblks * JRNL_SBLK_SIZE * JRNL_DBLK_SIZE);
- _mgmtObject->set_journalCurrentFileCount(_num_jfiles);
- _mgmtObject->set_journalRecordDepth(0);
- }
-}
-
// static AIO callback fns
void
@@ -506,7 +496,7 @@
}
qpid::management::Manageable::status_t JournalImpl::ManagementMethod (uint32_t methodId,
- qpid::management::Args& args)
+ qpid::management::Args& /*args*/)
{
Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD;
@@ -520,27 +510,6 @@
status = Manageable::STATUS_NOT_IMPLEMENTED;
break;
-
- case qpid::management::Journal::METHOD_RECONFIGURE :
- qpid::management::ArgsJournalReconfigure& rArgs =
- dynamic_cast<qpid::management::ArgsJournalReconfigure&>(args);
- // Check parameter validity
- if (rArgs.i_fileCount < JRNL_MIN_NUM_FILES || rArgs.i_fileCount > JRNL_MAX_NUM_FILES)
- {
- // TODO: add text indicating nature of failure
- status = Manageable::STATUS_INVALID_PARAMETER;
- break;
- }
- u_int32_t fsize_sblks = rArgs.i_fileSize * JRNL_RMGR_PAGE_SIZE;
- if (fsize_sblks < JRNL_MIN_FILE_SIZE || fsize_sblks > JRNL_MAX_FILE_SIZE)
- {
- // TODO: add text indicating nature of failure
- status = Manageable::STATUS_INVALID_PARAMETER;
- break;
- }
- reconfigure(rArgs.i_fileCount, fsize_sblks);
- status = Manageable::STATUS_OK;
- break;
}
return status;
Modified: store/trunk/cpp/lib/JournalImpl.h
===================================================================
--- store/trunk/cpp/lib/JournalImpl.h 2008-05-08 19:26:23 UTC (rev 2009)
+++ store/trunk/cpp/lib/JournalImpl.h 2008-05-08 19:44:10 UTC (rev 2010)
@@ -93,25 +93,43 @@
const std::string& journalBaseFilename,
const qpid::sys::Duration getEventsTimeout,
const qpid::sys::Duration flushTimeout);
+
virtual ~JournalImpl();
- void initialize(const u_int16_t num_jfiles, const u_int32_t jfsize_sblks,
- const journal::rd_aio_cb rd_cb, const journal::wr_aio_cb wr_cb);
+ void initialize(const u_int16_t num_jfiles,
+ const u_int32_t jfsize_sblks,
+ const u_int16_t wcache_num_pages,
+ const u_int32_t wcache_pgsize_sblks,
+ const journal::rd_aio_cb rd_cb,
+ const journal::wr_aio_cb wr_cb);
- inline void initialize(const u_int16_t num_jfiles, const u_int32_t jfsize_sblks) {
- initialize(num_jfiles, jfsize_sblks, 0, &aio_wr_callback);
+ inline void initialize(const u_int16_t num_jfiles,
+ const u_int32_t jfsize_sblks,
+ const u_int16_t wcache_num_pages,
+ const u_int32_t wcache_pgsize_sblks) {
+ initialize(num_jfiles, jfsize_sblks, wcache_num_pages, wcache_pgsize_sblks, 0,
+ &aio_wr_callback);
}
- void recover(const u_int16_t num_jfiles, const u_int32_t jfsize_sblks,
- const journal::rd_aio_cb rd_cb, const journal::wr_aio_cb wr_cb,
- boost::ptr_list<bdbstore::PreparedTransaction>& prep_tx_list,
- u_int64_t& highest_rid, u_int64_t queue_id);
+ void recover(const u_int16_t num_jfiles,
+ const u_int32_t jfsize_sblks,
+ const u_int16_t wcache_num_pages,
+ const u_int32_t wcache_pgsize_sblks,
+ const journal::rd_aio_cb rd_cb,
+ const journal::wr_aio_cb wr_cb,
+ boost::ptr_list<bdbstore::PreparedTransaction>& prep_tx_list,
+ u_int64_t& highest_rid,
+ u_int64_t queue_id);
- inline void recover(const u_int16_t num_jfiles, const u_int32_t jfsize_sblks,
- boost::ptr_list<bdbstore::PreparedTransaction>& prep_tx_list,
- u_int64_t& highest_rid, u_int64_t queue_id) {
- recover(num_jfiles, jfsize_sblks, 0, &aio_wr_callback, prep_tx_list, highest_rid,
- queue_id);
+ inline void recover(const u_int16_t num_jfiles,
+ const u_int32_t jfsize_sblks,
+ const u_int16_t wcache_num_pages,
+ const u_int32_t wcache_pgsize_sblks,
+ boost::ptr_list<bdbstore::PreparedTransaction>& prep_tx_list,
+ u_int64_t& highest_rid,
+ u_int64_t queue_id) {
+ recover(num_jfiles, jfsize_sblks, wcache_num_pages, wcache_pgsize_sblks, 0,
+ &aio_wr_callback, prep_tx_list, highest_rid, queue_id);
}
void recover_complete();
@@ -172,11 +190,7 @@
getEventsTimerSetFlag = true;
}
void handleIoResult(const journal::iores r);
- void reconfigure(const u_int16_t num_jfiles, const u_int32_t jfsize_sblks,
- const journal::rd_aio_cb rd_cb, const journal::wr_aio_cb wr_cb);
- inline void reconfigure(const u_int16_t num_jfiles, const u_int32_t jfsize_sblks) {
- reconfigure(num_jfiles, jfsize_sblks, 0, &aio_wr_callback);
- }
+
static void aio_wr_callback(jcntl* journal, std::vector<journal::data_tok*>& dtokl);
// static void aio_rd_callback(jcntl* journal, std::vector<u_int16_t>& pil);
Modified: store/trunk/cpp/lib/gen/qpid/management/Journal.cpp
===================================================================
--- store/trunk/cpp/lib/gen/qpid/management/Journal.cpp 2008-05-08 19:26:23 UTC (rev 2009)
+++ store/trunk/cpp/lib/gen/qpid/management/Journal.cpp 2008-05-08 19:44:10 UTC (rev 2010)
@@ -26,7 +26,6 @@
#include "qpid/management/Manageable.h"
#include "Journal.h"
#include "qpid/management/ArgsJournalExpand.h"
-#include "qpid/management/ArgsJournalReconfigure.h"
using namespace qpid::management;
@@ -37,7 +36,7 @@
string Journal::packageName = string ("mrgstore");
string Journal::className = string ("journal");
uint8_t Journal::md5Sum[16] =
- {0xce,0x1f,0xf,0xaa,0xec,0x9,0x32,0x8,0x4d,0xb1,0x98,0x2e,0x8d,0x5d,0xa4,0xaf};
+ {0xef,0x11,0x33,0x24,0xfb,0x0,0x5e,0x3e,0xe5,0x3c,0x58,0x81,0x1f,0xfb,0x36,0x66};
Journal::Journal (Manageable* _core) :
ManagementObject(_core)
@@ -102,7 +101,7 @@
buf.putBin128 (md5Sum); // Schema Hash
buf.putShort (7); // Config Element Count
buf.putShort (27); // Inst Element Count
- buf.putShort (2); // Method Count
+ buf.putShort (1); // Method Count
buf.putShort (0); // Event Count
// Config Elements
@@ -371,27 +370,7 @@
ft.setString (DESC, "Number of files to increase journal size by");
buf.put (ft);
- ft = FieldTable ();
- ft.setString (NAME, "reconfigure");
- ft.setInt (ARGCOUNT, 2);
- ft.setString (DESC, "Destructively reconfigure dimensions for this journal");
- buf.put (ft);
- ft = FieldTable ();
- ft.setString (NAME, "fileCount");
- ft.setInt (TYPE, TYPE_U32);
- ft.setString (DIR, "I");
- ft.setString (DESC, "Number of files in journal");
- buf.put (ft);
-
- ft = FieldTable ();
- ft.setString (NAME, "fileSize");
- ft.setInt (TYPE, TYPE_U32);
- ft.setString (DIR, "I");
- ft.setString (DESC, "Size of files in journal");
- buf.put (ft);
-
-
// Events
}
@@ -478,17 +457,6 @@
return;
}
- if (methodName == "reconfigure")
- {
- ArgsJournalReconfigure ioArgs;
- ioArgs.i_fileCount = inBuf.getLong ();
- ioArgs.i_fileSize = inBuf.getLong ();
- status = coreObject->ManagementMethod (METHOD_RECONFIGURE, ioArgs);
- outBuf.putLong (status);
- outBuf.putShortString (Manageable::StatusText (status));
- return;
- }
-
outBuf.putLong (status);
outBuf.putShortString (Manageable::StatusText (status));
}
Modified: store/trunk/cpp/lib/gen/qpid/management/Journal.h
===================================================================
--- store/trunk/cpp/lib/gen/qpid/management/Journal.h 2008-05-08 19:26:23 UTC (rev 2009)
+++ store/trunk/cpp/lib/gen/qpid/management/Journal.h 2008-05-08 19:44:10 UTC (rev 2010)
@@ -101,7 +101,6 @@
// Method IDs
static const uint32_t METHOD_EXPAND = 1;
- static const uint32_t METHOD_RECONFIGURE = 2;
// Accessor Methods
inline void set_name (std::string val){
Modified: store/trunk/cpp/lib/jrnl/jcfg.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcfg.hpp 2008-05-08 19:26:23 UTC (rev 2009)
+++ store/trunk/cpp/lib/jrnl/jcfg.hpp 2008-05-08 19:44:10 UTC (rev 2010)
@@ -65,12 +65,11 @@
#define JRNL_MAX_NUM_FILES 64 ///< Max. number of journal files
#define JRNL_ENQ_THRESHOLD 80 ///< Percent full when enqueue connection will be closed
-// NOTE: JRNL_RMGR_PAGE_SIZE must be a multiple of JRNL_WMGR_PAGE_SIZE.
#define JRNL_RMGR_PAGE_SIZE 128 ///< Journal page size in softblocks
#define JRNL_RMGR_PAGES 16 ///< Number of pages to use in rmgr
-#define JRNL_WMGR_PAGE_SIZE 64 ///< Journal write page size in softblocks
-#define JRNL_WMGR_PAGES 32 ///< Number of pages to use in wmgr
+#define JRNL_WMGR_DEF_PAGE_SIZE 64 ///< Journal write page size in softblocks (default)
+#define JRNL_WMGR_DEF_PAGES 32 ///< Number of pages to use in wmgr (default)
#define JRNL_WMGR_MAXDTOKPP 1024 ///< Max. dtoks (data blocks) per page in wmgr
#define JRNL_WMGR_MAXWAITUS 100 ///< Max. wait time (us) before submitting AIO
Modified: store/trunk/cpp/lib/jrnl/jcntl.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.cpp 2008-05-08 19:26:23 UTC (rev 2009)
+++ store/trunk/cpp/lib/jrnl/jcntl.cpp 2008-05-08 19:44:10 UTC (rev 2010)
@@ -92,7 +92,9 @@
}
void
-jcntl::initialize(const u_int16_t num_jfiles, const u_int32_t jfsize_sblks, const rd_aio_cb rd_cb, const wr_aio_cb wr_cb)
+jcntl::initialize(const u_int16_t num_jfiles, const u_int32_t jfsize_sblks,
+ const u_int16_t wcache_num_pages, const u_int32_t wcache_pgsize_sblks,
+ const rd_aio_cb rd_cb, const wr_aio_cb wr_cb)
{
_init_flag = false;
_stop_flag = false;
@@ -140,7 +142,8 @@
_wrfc.initialize(_num_jfiles, _jfsize_sblks, (nlfh**)_datafh);
_rrfc.initialize(_num_jfiles, (nlfh**)_datafh);
_rmgr.initialize(rd_cb);
- _wmgr.initialize(wr_cb, JRNL_WMGR_MAXDTOKPP, JRNL_WMGR_MAXWAITUS);
+ _wmgr.initialize(wr_cb, wcache_pgsize_sblks, wcache_num_pages, JRNL_WMGR_MAXDTOKPP,
+ JRNL_WMGR_MAXWAITUS);
// Write info file (<basename>.jinf) to disk
write_infofile();
@@ -149,7 +152,9 @@
}
void
-jcntl::recover(const u_int16_t num_jfiles, const u_int32_t jfsize_sblks, const rd_aio_cb rd_cb, const wr_aio_cb wr_cb,
+jcntl::recover(const u_int16_t num_jfiles, const u_int32_t jfsize_sblks,
+ const u_int16_t wcache_num_pages, const u_int32_t wcache_pgsize_sblks,
+ const rd_aio_cb rd_cb, const wr_aio_cb wr_cb,
const std::vector<std::string>& prep_txn_list, u_int64_t& highest_rid)
{
_init_flag = false;
@@ -206,7 +211,8 @@
_wrfc.initialize(_num_jfiles, _jfsize_sblks, (nlfh**)_datafh, &_rcvdat);
_rrfc.initialize(_num_jfiles, (nlfh**)_datafh, _rcvdat._ffid);
_rmgr.initialize(rd_cb);
- _wmgr.initialize(wr_cb, JRNL_WMGR_MAXDTOKPP, JRNL_WMGR_MAXWAITUS, _rcvdat._eo);
+ _wmgr.initialize(wr_cb, wcache_pgsize_sblks, wcache_num_pages, JRNL_WMGR_MAXDTOKPP,
+ JRNL_WMGR_MAXWAITUS, _rcvdat._eo);
_readonly_flag = true;
_init_flag = true;
@@ -498,7 +504,8 @@
oss << "errno=" << errno;
throw jexception(jerrno::JERR__RTCLOCK, oss.str(), "jcntl", "write_infofile");
}
- jinf ji(_jid, _jdir.dirname(), _base_filename, _num_jfiles, _jfsize_sblks, JRNL_WMGR_PAGE_SIZE, JRNL_WMGR_PAGES, ts);
+ jinf ji(_jid, _jdir.dirname(), _base_filename, _num_jfiles, _jfsize_sblks,
+ _wmgr.cache_pgsize_sblks(), _wmgr.cache_num_pages(), ts);
ji.write();
}
Modified: store/trunk/cpp/lib/jrnl/jcntl.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.hpp 2008-05-08 19:26:23 UTC (rev 2009)
+++ store/trunk/cpp/lib/jrnl/jcntl.hpp 2008-05-08 19:44:10 UTC (rev 2010)
@@ -178,12 +178,15 @@
*
* \param num_jfiles The number of journal files to be created.
* \param jfsize_sblks The size of each journal file expressed in softblocks.
+ * \param wcache_num_pages The number of write cache pages to create.
+ * \param wcache_pgsize_sblks The size in sblks of each write cache page.
* \param rd_cb Function pointer to callback function for read operations. May be 0 (NULL).
* \param wr_cb Function pointer to callback function for write operations. May be 0 (NULL).
*
* \exception TODO
*/
void initialize(const u_int16_t num_jfiles, const u_int32_t jfsize_sblks,
+ const u_int16_t wcache_num_pages, const u_int32_t wcache_pgsize_sblks,
const rd_aio_cb rd_cb, const wr_aio_cb wr_cb);
/**
@@ -203,6 +206,8 @@
*
* \param num_jfiles The number of journal files to be created.
* \param jfsize_sblks The size of each journal file expressed in softblocks.
+ * \param wcache_num_pages The number of write cache pages to create.
+ * \param wcache_pgsize_sblks The size in sblks of each write cache page.
* \param rd_cb Function pointer to callback function for read operations. May be 0 (NULL).
* \param wr_cb Function pointer to callback function for write operations. May be 0 (NULL).
* \param prep_txn_list
@@ -211,6 +216,7 @@
* \exception TODO
*/
void recover(const u_int16_t num_jfiles, const u_int32_t jfsize_sblks,
+ const u_int16_t wcache_num_pages, const u_int32_t wcache_pgsize_sblks,
const rd_aio_cb rd_cb, const wr_aio_cb wr_cb,
const std::vector<std::string>& prep_txn_list, u_int64_t& highest_rid);
Modified: store/trunk/cpp/lib/jrnl/jinf.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jinf.cpp 2008-05-08 19:26:23 UTC (rev 2009)
+++ store/trunk/cpp/lib/jrnl/jinf.cpp 2008-05-08 19:44:10 UTC (rev 2010)
@@ -70,7 +70,7 @@
jinf::jinf(const std::string& jid, const std::string& jdir, const std::string& base_filename,
const u_int16_t num_jfiles, const u_int32_t jfsize_sblks,
- const u_int32_t wcache_pgsize_sblks, const u_int32_t wcache_num_pages, const timespec& ts):
+ const u_int32_t wcache_pgsize_sblks, const u_int16_t wcache_num_pages, const timespec& ts):
_jver(RHM_JDAT_VERSION),
_jid(jid),
_jdir(jdir),
Modified: store/trunk/cpp/lib/jrnl/jinf.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jinf.hpp 2008-05-08 19:26:23 UTC (rev 2009)
+++ store/trunk/cpp/lib/jrnl/jinf.hpp 2008-05-08 19:44:10 UTC (rev 2010)
@@ -57,9 +57,9 @@
u_int16_t _sblk_size_dblks;
u_int32_t _dblk_size;
u_int32_t _wcache_pgsize_sblks;
- u_int32_t _wcache_num_pages;
+ u_int16_t _wcache_num_pages;
u_int32_t _rcache_pgsize_sblks;
- u_int32_t _rcache_num_pages;
+ u_int16_t _rcache_num_pages;
std::tm* _tm_ptr;
bool _valid_flag;
bool _analyzed_flag;
@@ -74,7 +74,7 @@
// constructor for writing jinf file
jinf(const std::string& jid, const std::string& jdir, const std::string& base_filename,
const u_int16_t num_jfiles, const u_int32_t jfsize_sblks,
- const u_int32_t wcache_pgsize_sblks, const u_int32_t wcache_num_pages,
+ const u_int32_t wcache_pgsize_sblks, const u_int16_t wcache_num_pages,
const timespec& ts);
virtual ~jinf();
@@ -93,9 +93,9 @@
inline u_int16_t sblk_size_dblks() const { return _sblk_size_dblks; }
inline u_int32_t dblk_size() const { return _dblk_size; }
inline u_int32_t wcache_pgsize_sblks() const { return _wcache_pgsize_sblks; }
- inline u_int32_t wcache_num_pages() const { return _wcache_num_pages; }
+ inline u_int16_t wcache_num_pages() const { return _wcache_num_pages; }
inline u_int32_t rcache_pgsize_sblks() const { return _rcache_pgsize_sblks; }
- inline u_int32_t rcache_num_pages() const { return _rcache_num_pages; }
+ inline u_int16_t rcache_num_pages() const { return _rcache_num_pages; }
u_int16_t get_start_file();
u_int16_t get_end_file();
bool get_initial_owi();
Modified: store/trunk/cpp/lib/jrnl/pmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/pmgr.cpp 2008-05-08 19:26:23 UTC (rev 2009)
+++ store/trunk/cpp/lib/jrnl/pmgr.cpp 2008-05-08 19:44:10 UTC (rev 2010)
@@ -34,6 +34,7 @@
#include <cstdlib>
#include <cstring>
#include "jrnl/jcfg.hpp"
+#include "jrnl/jcntl.hpp"
#include "jrnl/jerrno.hpp"
#include <sstream>
@@ -73,10 +74,9 @@
const u_int32_t pmgr::_sblksize = JRNL_SBLK_SIZE * JRNL_DBLK_SIZE;
-pmgr::pmgr(jcntl* jc, enq_map& emap, txn_map& tmap, const u_int32_t pagesize,
- const u_int16_t pages):
- _pagesize(pagesize),
- _pages(pages),
+pmgr::pmgr(jcntl* jc, enq_map& emap, txn_map& tmap):
+ _cache_pgsize_sblks(0),
+ _cache_num_pages(0),
_jc(jc),
_emap(emap),
_tmap(tmap),
@@ -101,42 +101,44 @@
}
void
-pmgr::initialize()
+pmgr::initialize(const u_int32_t cache_pgsize_sblks, const u_int16_t cache_num_pages)
{
// As static use of this class keeps old values around, clean up first...
+ pmgr::clean();
_pg_index = 0;
_pg_cntr = 0;
_pg_offset_dblks = 0;
_aio_evt_rem = 0;
- pmgr::clean();
+ _cache_pgsize_sblks = cache_pgsize_sblks;
+ _cache_num_pages = cache_num_pages;
// 1. Allocate page memory (as a single block)
- std::size_t pagesize = _pages * _pagesize * _sblksize;
- if (::posix_memalign(&_page_base_ptr, _sblksize, pagesize))
+ std::size_t cache_pgsize = _cache_num_pages * _cache_pgsize_sblks * _sblksize;
+ if (::posix_memalign(&_page_base_ptr, _sblksize, cache_pgsize))
{
clean();
std::ostringstream oss;
- oss << "posix_memalign(): blksize=" << _sblksize << " size=" << pagesize;
+ oss << "posix_memalign(): blksize=" << _sblksize << " size=" << cache_pgsize;
oss << " errno=" << errno;
throw jexception(jerrno::JERR__MALLOC, oss.str(), "pmgr", "initialize");
}
// 2. Allocate array of page pointers
- _page_ptr_arr = (void**)std::malloc(_pages * sizeof(void*));
+ _page_ptr_arr = (void**)std::malloc(_cache_num_pages * sizeof(void*));
MALLOC_CHK(_page_ptr_arr, "_page_ptr_arr", "pmgr", "initialize");
// 3. Allocate and initialize page control block (page_cb) array
- _page_cb_arr = (page_cb*)std::malloc(_pages * sizeof(page_cb));
+ _page_cb_arr = (page_cb*)std::malloc(_cache_num_pages * sizeof(page_cb));
MALLOC_CHK(_page_cb_arr, "_page_cb_arr", "pmgr", "initialize");
- std::memset(_page_cb_arr, 0, _pages * sizeof(page_cb));
+ std::memset(_page_cb_arr, 0, _cache_num_pages * sizeof(page_cb));
// 5. Allocate IO control block (iocb) array
- _iocb_arr = (iocb*)std::malloc(_pages * sizeof(iocb));
+ _iocb_arr = (iocb*)std::malloc(_cache_num_pages * sizeof(iocb));
MALLOC_CHK(_iocb_arr, "_iocb_arr", "pmgr", "initialize");
// 6. Set page pointers in _page_ptr_arr, _page_cb_arr and iocbs to pages within page block
- for (u_int16_t i=0; i<_pages; i++)
+ for (u_int16_t i=0; i<_cache_num_pages; i++)
{
- _page_ptr_arr[i] = (void*)((char*)_page_base_ptr + _pagesize * _sblksize * i);
+ _page_ptr_arr[i] = (void*)((char*)_page_base_ptr + _cache_pgsize_sblks * _sblksize * i);
_page_cb_arr[i]._index = i;
_page_cb_arr[i]._state = UNUSED;
_page_cb_arr[i]._pbuff = _page_ptr_arr[i];
@@ -145,8 +147,8 @@
_iocb_arr[i].data = (void*)&_page_cb_arr[i];
}
- // 7. Allocate io_event array, max one event per cache page in rmgr and wmgr
- const u_int16_t max_aio_evts = JRNL_RMGR_PAGES + JRNL_WMGR_PAGES;
+ // 7. Allocate io_event array, max one event per cache page plus one for each file
+ const u_int16_t max_aio_evts = _cache_num_pages + _jc->num_jfiles();
_ioevt_arr = (io_event*)std::malloc(max_aio_evts * sizeof(io_event));
MALLOC_CHK(_ioevt_arr, "_ioevt_arr", "pmgr", "initialize");
@@ -172,7 +174,7 @@
if (_page_cb_arr)
{
- for (int i=0; i<_pages; i++)
+ for (int i=0; i<_cache_num_pages; i++)
delete _page_cb_arr[i]._pdtokl;
std::free(_page_ptr_arr);
_page_ptr_arr = 0;
Modified: store/trunk/cpp/lib/jrnl/pmgr.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/pmgr.hpp 2008-05-08 19:26:23 UTC (rev 2009)
+++ store/trunk/cpp/lib/jrnl/pmgr.hpp 2008-05-08 19:44:10 UTC (rev 2010)
@@ -57,7 +57,7 @@
/**
* \brief Abstract class for managing either read or write page cache of arbitrary size and
- * number of pages.
+ * number of cache pages.
*/
class pmgr
{
@@ -95,13 +95,13 @@
};
static const u_int32_t _sblksize; ///< Disk softblock size
- const u_int32_t _pagesize; ///< Size of page cache pages
- const u_int16_t _pages; ///< Number of page cache pages
+ u_int32_t _cache_pgsize_sblks; ///< Size of each cache page in softblocks
+ u_int16_t _cache_num_pages; ///< Number of pages in the page cache
jcntl* _jc; ///< Pointer to journal controller
enq_map& _emap; ///< Ref to enqueue map
txn_map& _tmap; ///< Ref to transaction map
void* _page_base_ptr; ///< Base pointer to page memory
- void** _page_ptr_arr; ///< Array of pointers to pages in page memory
+ void** _page_ptr_arr; ///< Array of pointers to cache pages in page memory
page_cb* _page_cb_arr; ///< Array of page_cb structs
iocb* _iocb_arr; ///< Array of iocb structs
io_event* _ioevt_arr; ///< Array of io_events
@@ -116,16 +116,18 @@
txn_rec _txn_rec; ///< Transaction record used for encoding/decoding
public:
- pmgr(jcntl* jc, enq_map& emap, txn_map& tmap, const u_int32_t pagesize,
- const u_int16_t pages);
+ pmgr(jcntl* jc, enq_map& emap, txn_map& tmap);
virtual ~pmgr();
virtual u_int32_t get_events(page_state state) = 0;
inline u_int32_t get_aio_evt_rem() const { return _aio_evt_rem; }
static const char* page_state_str(page_state ps);
+ inline u_int32_t cache_pgsize_sblks() const { return _cache_pgsize_sblks; }
+ inline u_int16_t cache_num_pages() const { return _cache_num_pages; }
protected:
- virtual void initialize();
+ virtual void initialize(const u_int32_t cache_pgsize_sblks,
+ const u_int16_t cache_num_pages);
virtual void rotate_page() = 0;
virtual void clean();
};
Modified: store/trunk/cpp/lib/jrnl/rmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rmgr.cpp 2008-05-08 19:26:23 UTC (rev 2009)
+++ store/trunk/cpp/lib/jrnl/rmgr.cpp 2008-05-08 19:44:10 UTC (rev 2010)
@@ -43,7 +43,7 @@
{
rmgr::rmgr(jcntl* jc, enq_map& emap, txn_map& tmap, rrfc& rrfc):
- pmgr(jc, emap, tmap, JRNL_RMGR_PAGE_SIZE, JRNL_RMGR_PAGES),
+ pmgr(jc, emap, tmap),
_rrfc(rrfc),
_hdr(),
_valid(false),
@@ -254,7 +254,7 @@
rmgr::get_events(page_state state)
{
int ret = 0;
- if ((ret = ::io_getevents(_ioctx, 0, JRNL_RMGR_PAGES + JRNL_WMGR_PAGES, _ioevt_arr, 0)) < 0)
+ if ((ret = ::io_getevents(_ioctx, 0, _cache_num_pages + _jc->num_jfiles(), _ioevt_arr, 0)) < 0)
{
if (ret == -EINTR) // No events
return 0;
@@ -334,7 +334,7 @@
if (_valid)
{
_valid = false;
- for (int i=0; i<_pages; i++)
+ for (int i=0; i<_cache_num_pages; i++)
_page_cb_arr[i]._state = UNUSED;
_rrfc.reset();
_pg_offset_dblks = 0;
@@ -344,7 +344,7 @@
void
rmgr::initialize()
{
- pmgr::initialize();
+ pmgr::initialize(JRNL_RMGR_PAGE_SIZE, JRNL_RMGR_PAGES);
}
iores
@@ -531,9 +531,9 @@
bool outstanding = false;
// Index must start with current buffer and cycle around so that first
// uninitialized buffer is initialized first
- for (u_int16_t i=_pg_index; i<_pg_index+_pages; i++)
+ for (u_int16_t i=_pg_index; i<_pg_index+_cache_num_pages; i++)
{
- int16_t ci = i % _pages;
+ int16_t ci = i % _cache_num_pages;
switch (_page_cb_arr[ci]._state)
{
case UNUSED:
@@ -586,7 +586,7 @@
u_int32_t rd_size = file_rem_dblks > pg_size_dblks ? pg_size_dblks : file_rem_dblks;
if (rd_size)
{
- int16_t pi = (i + first_uninit) % _pages;
+ int16_t pi = (i + first_uninit) % _cache_num_pages;
// TODO: For perf, combine contiguous pages into single read
// 1 or 2 AIOs needed depending on whether read block folds
iocb* this_iocb_ptr = &_iocb_arr[pi];
@@ -624,7 +624,7 @@
_pg_offset_dblks = 0;
_pg_cntr++;
}
- if (++_pg_index >= _pages)
+ if (++_pg_index >= _cache_num_pages)
_pg_index = 0;
aio_cycle();
_pg_offset_dblks = 0;
@@ -712,14 +712,14 @@
// Analyze how much of message is available
void* data_ptr = (char*)rptr + sizeof(enq_hdr) + xid_size;
- void* page_end_ptr = (char*)_page_base_ptr + _pagesize * _sblksize * _pages;
+ void* page_end_ptr = (char*)_page_base_ptr + _cache_pgsize_sblks * _sblksize * _cache_num_pages;
u_int16_t data_start_pg_index = _pg_index;
u_int16_t data_start_pg_index = _pg_index;
- for (u_int16_t i=0; i<_pages; i++)
+ for (u_int16_t i=0; i<_cache_num_pages; i++)
{
- pi = (i + _pg_index) % _pages;
+ pi = (i + _pg_index) % _cache_num_pages;
if (data_ptr >= _page_ptr_arr[pi] &&
- data_ptr < (char*)_page_ptr_arr[pi] + _pagesize * _sblksize)
+ data_ptr < (char*)_page_ptr_arr[pi] + _cache_pgsize_sblks * _sblksize)
data_end_pg_index = pi; // found start page index
}
@@ -727,7 +727,7 @@
u_int16_t last_pg_avail_index;
void* data_ptr = (char*)rptr + sizeof(enq_hdr) + xid_size;
- void* page_end_ptr = (char*)_page_base_ptr + _pagesize * _sblksize * _pages;
+ void* page_end_ptr = (char*)_page_base_ptr + _cache_pgsize_sblks * _sblksize * _cache_num_pages;
if (data_ptr >= page_end_ptr) // folded, go back to first page...
data_ptr = (char*)_page_base_ptr + data_ptr - page_end_ptr;
void* data_end_ptr = (char*)data_ptr + data_size;
Modified: store/trunk/cpp/lib/jrnl/wmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wmgr.cpp 2008-05-08 19:26:23 UTC (rev 2009)
+++ store/trunk/cpp/lib/jrnl/wmgr.cpp 2008-05-08 19:44:10 UTC (rev 2010)
@@ -45,7 +45,7 @@
{
wmgr::wmgr(jcntl* jc, enq_map& emap, txn_map& tmap, wrfc& wrfc):
- pmgr(jc, emap, tmap, JRNL_WMGR_PAGE_SIZE, JRNL_WMGR_PAGES),
+ pmgr(jc, emap, tmap),
_wrfc(wrfc),
_max_dtokpp(0),
_max_io_wait_us(0),
@@ -65,7 +65,7 @@
wmgr::wmgr(jcntl* jc, enq_map& emap, txn_map& tmap, wrfc& wrfc,
const u_int32_t max_dtokpp, const u_int32_t max_iowait_us):
- pmgr(jc, emap, tmap, JRNL_WMGR_PAGE_SIZE, JRNL_WMGR_PAGES/* , dtoklp */),
+ pmgr(jc, emap, tmap /* , dtoklp */),
_wrfc(wrfc),
_max_dtokpp(max_dtokpp),
_max_io_wait_us(max_iowait_us),
@@ -89,7 +89,8 @@
}
void
-wmgr::initialize(const wr_aio_cb wr_cb, const u_int32_t max_dtokpp, const u_int32_t max_iowait_us,
+wmgr::initialize(const wr_aio_cb wr_cb, const u_int32_t wcache_pgsize_sblks,
+ const u_int16_t wcache_num_pages, const u_int32_t max_dtokpp, const u_int32_t max_iowait_us,
std::size_t eo)
{
_enq_busy = false;
@@ -100,14 +101,15 @@
_max_io_wait_us = max_iowait_us;
_cb = wr_cb;
+ initialize(wcache_pgsize_sblks, wcache_num_pages);
+
_jfsize_dblks = _jc->jfsize_sblks() * JRNL_SBLK_SIZE;
- _jfsize_pgs = _jc->jfsize_sblks() / JRNL_WMGR_PAGE_SIZE;
+ _jfsize_pgs = _jc->jfsize_sblks() / _cache_pgsize_sblks;
assert(_jc->jfsize_sblks() % JRNL_RMGR_PAGE_SIZE == 0);
- initialize();
if (eo)
{
- const u_int32_t wr_pg_size_dblks = JRNL_WMGR_PAGE_SIZE * JRNL_SBLK_SIZE;
+ const u_int32_t wr_pg_size_dblks = _cache_pgsize_sblks * JRNL_SBLK_SIZE;
u_int32_t data_dblks = (eo / JRNL_DBLK_SIZE) - 4; // 4 dblks for file hdr
_pg_cntr = data_dblks / wr_pg_size_dblks;
_pg_offset_dblks = data_dblks - (_pg_cntr * wr_pg_size_dblks);
@@ -161,11 +163,11 @@
bool done = false;
while (!done)
{
- assert(_pg_offset_dblks < JRNL_WMGR_PAGE_SIZE * JRNL_SBLK_SIZE);
+ assert(_pg_offset_dblks < _cache_pgsize_sblks * JRNL_SBLK_SIZE);
void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks * JRNL_DBLK_SIZE);
u_int32_t data_offs_dblks = dtokp->dblocks_written();
u_int32_t ret = _enq_rec.encode(wptr, data_offs_dblks,
- (JRNL_WMGR_PAGE_SIZE * JRNL_SBLK_SIZE) - _pg_offset_dblks);
+ (_cache_pgsize_sblks * JRNL_SBLK_SIZE) - _pg_offset_dblks);
// Remember fid which contains the record header in case record is split over several files
if (data_offs_dblks == 0)
@@ -221,7 +223,7 @@
}
// Is the page full? If so, flush.
- if (_pg_offset_dblks >= JRNL_WMGR_PAGE_SIZE * JRNL_SBLK_SIZE)
+ if (_pg_offset_dblks >= _cache_pgsize_sblks * JRNL_SBLK_SIZE)
{
res = write_flush();
assert(res == RHM_IORES_SUCCESS);
@@ -301,11 +303,11 @@
bool done = false;
while (!done)
{
- assert(_pg_offset_dblks < JRNL_WMGR_PAGE_SIZE * JRNL_SBLK_SIZE);
+ assert(_pg_offset_dblks < _cache_pgsize_sblks * JRNL_SBLK_SIZE);
void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks * JRNL_DBLK_SIZE);
u_int32_t data_offs_dblks = dtokp->dblocks_written();
u_int32_t ret = _deq_rec.encode(wptr, data_offs_dblks,
- (JRNL_WMGR_PAGE_SIZE * JRNL_SBLK_SIZE) - _pg_offset_dblks);
+ (_cache_pgsize_sblks * JRNL_SBLK_SIZE) - _pg_offset_dblks);
// Remember fid which contains the record header in case record is split over several files
if (data_offs_dblks == 0)
@@ -364,7 +366,7 @@
}
// Is the page full? If so, flush.
- if (_pg_offset_dblks >= JRNL_WMGR_PAGE_SIZE * JRNL_SBLK_SIZE)
+ if (_pg_offset_dblks >= _cache_pgsize_sblks * JRNL_SBLK_SIZE)
{
res = write_flush();
assert(res == RHM_IORES_SUCCESS);
@@ -435,11 +437,11 @@
bool done = false;
while (!done)
{
- assert(_pg_offset_dblks < JRNL_WMGR_PAGE_SIZE * JRNL_SBLK_SIZE);
+ assert(_pg_offset_dblks < _cache_pgsize_sblks * JRNL_SBLK_SIZE);
void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks * JRNL_DBLK_SIZE);
u_int32_t data_offs_dblks = dtokp->dblocks_written();
u_int32_t ret = _txn_rec.encode(wptr, data_offs_dblks,
- (JRNL_WMGR_PAGE_SIZE * JRNL_SBLK_SIZE) - _pg_offset_dblks);
+ (_cache_pgsize_sblks * JRNL_SBLK_SIZE) - _pg_offset_dblks);
// Remember fid which contains the record header in case record is split over several files
if (data_offs_dblks == 0)
@@ -505,7 +507,7 @@
}
// Is the page full? If so, flush.
- if (_pg_offset_dblks >= JRNL_WMGR_PAGE_SIZE * JRNL_SBLK_SIZE)
+ if (_pg_offset_dblks >= _cache_pgsize_sblks * JRNL_SBLK_SIZE)
{
res = write_flush();
assert(res == RHM_IORES_SUCCESS);
@@ -576,11 +578,11 @@
bool done = false;
while (!done)
{
- assert(_pg_offset_dblks < JRNL_WMGR_PAGE_SIZE * JRNL_SBLK_SIZE);
+ assert(_pg_offset_dblks < _cache_pgsize_sblks * JRNL_SBLK_SIZE);
void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks * JRNL_DBLK_SIZE);
u_int32_t data_offs_dblks = dtokp->dblocks_written();
u_int32_t ret = _txn_rec.encode(wptr, data_offs_dblks,
- (JRNL_WMGR_PAGE_SIZE * JRNL_SBLK_SIZE) - _pg_offset_dblks);
+ (_cache_pgsize_sblks * JRNL_SBLK_SIZE) - _pg_offset_dblks);
// Remember fid which contains the record header in case record is split over several files
if (data_offs_dblks == 0)
@@ -642,7 +644,7 @@
}
// Is the page full? If so, flush.
- if (_pg_offset_dblks >= JRNL_WMGR_PAGE_SIZE * JRNL_SBLK_SIZE)
+ if (_pg_offset_dblks >= _cache_pgsize_sblks * JRNL_SBLK_SIZE)
{
res = write_flush();
assert(res == RHM_IORES_SUCCESS);
@@ -752,7 +754,7 @@
wmgr::get_events(page_state state)
{
int ret = 0;
- if ((ret = ::io_getevents(_ioctx, 0, JRNL_RMGR_PAGES + JRNL_WMGR_PAGES, _ioevt_arr, 0)) < 0)
+ if ((ret = ::io_getevents(_ioctx, 0, _cache_num_pages + _jc->num_jfiles(), _ioevt_arr, 0)) < 0)
{
std::ostringstream oss;
oss << "io_getevents() failed: " << std::strerror(-ret) << " (" << ret << ")";
@@ -910,9 +912,9 @@
}
void
-wmgr::initialize()
+wmgr::initialize(const u_int32_t wcache_pgsize_sblks, const u_int16_t wcache_num_pages)
{
- pmgr::initialize();
+ pmgr::initialize(wcache_pgsize_sblks, wcache_num_pages);
wmgr::clean();
_num_jfiles = _jc->num_jfiles();
if (::posix_memalign(&_fhdr_base_ptr, _sblksize, _sblksize * _num_jfiles))
@@ -1062,12 +1064,12 @@
wmgr::rotate_page()
{
_page_cb_arr[_pg_index]._state = AIO_PENDING;
- if (_pg_offset_dblks >= JRNL_WMGR_PAGE_SIZE * JRNL_SBLK_SIZE)
+ if (_pg_offset_dblks >= _cache_pgsize_sblks * JRNL_SBLK_SIZE)
{
_pg_offset_dblks = 0;
_pg_cntr++;
}
- if (++_pg_index >= _pages)
+ if (++_pg_index >= _cache_num_pages)
_pg_index = 0;
}
@@ -1098,7 +1100,7 @@
oss << " edac:" << (_enq_busy?"T":"F") << (_deq_busy?"T":"F");
oss << (_abort_busy?"T":"F") << (_commit_busy?"T":"F");
oss << " ps=[";
- for (int i=0; i<_pages; i++)
+ for (int i=0; i<_cache_num_pages; i++)
{
switch (_page_cb_arr[i]._state)
{
Modified: store/trunk/cpp/lib/jrnl/wmgr.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wmgr.hpp 2008-05-08 19:26:23 UTC (rev 2009)
+++ store/trunk/cpp/lib/jrnl/wmgr.hpp 2008-05-08 19:44:10 UTC (rev 2010)
@@ -104,8 +104,9 @@
const u_int32_t max_iowait_us);
virtual ~wmgr();
- void initialize(wr_aio_cb wr_cb, const u_int32_t max_dtokpp, const u_int32_t max_iowait_us,
- std::size_t eo = 0);
+ void initialize(wr_aio_cb wr_cb, const u_int32_t wcache_pgsize_sblks,
+ const u_int16_t wcache_num_pages, const u_int32_t max_dtokpp,
+ const u_int32_t max_iowait_us, std::size_t eo = 0);
iores enqueue(const void* const data_buff, const std::size_t tot_data_len,
const std::size_t this_data_len, data_tok* dtokp, const void* const xid_ptr,
const std::size_t xid_len, const bool transient, const bool external);
@@ -123,7 +124,7 @@
const std::string status_str() const;
private:
- void initialize();
+ void initialize(const u_int32_t wcache_pgsize_sblks, const u_int16_t wcache_num_pages);
iores pre_write_check(const _op_type op, const data_tok* const dtokp,
const std::size_t xidsize = 0, const std::size_t dsize = 0, const bool external = false)
const;
Modified: store/trunk/cpp/tests/OrderingTest.cpp
===================================================================
--- store/trunk/cpp/tests/OrderingTest.cpp 2008-05-08 19:26:23 UTC (rev 2009)
+++ store/trunk/cpp/tests/OrderingTest.cpp 2008-05-08 19:44:10 UTC (rev 2010)
@@ -56,7 +56,7 @@
void setup(bool async)
{
store = std::auto_ptr<BdbMessageStore>(new BdbMessageStore());
- store->init(TESTDIR, async, true, 4, 1);
+ store->init(TESTDIR, async, true, 4, 1, 8);
store->truncate();
queue = Queue::shared_ptr(new Queue(name, 0, store.get(), 0));
@@ -94,7 +94,7 @@
store.reset();
store = std::auto_ptr<BdbMessageStore>(new BdbMessageStore());
- store->init(TESTDIR, async, false, 4, 1);
+ store->init(TESTDIR, async, false, 4, 1, 8);
ExchangeRegistry exchanges;
DtxManager mgr;
mgr.setStore (store.get());
Modified: store/trunk/cpp/tests/SimpleTest.cpp
===================================================================
--- store/trunk/cpp/tests/SimpleTest.cpp 2008-05-08 19:26:23 UTC (rev 2009)
+++ store/trunk/cpp/tests/SimpleTest.cpp 2008-05-08 19:44:10 UTC (rev 2010)
@@ -83,7 +83,7 @@
void testCreateDelete(bool async)
{
BdbMessageStore store;
- store.init(TESTDIR, async, true, 4, 1);
+ store.init(TESTDIR, async, true, 4, 1, 8);
store.truncate();//make sure it is empty to begin with
string name("CreateDeleteQueue");
Queue queue(name, 0, &store, 0);
@@ -98,7 +98,7 @@
void testEmptyRecover(bool async)
{
BdbMessageStore store;
- store.init(TESTDIR, async, true, 4, 1);
+ store.init(TESTDIR, async, true, 4, 1, 8);
store.truncate();//make sure it is empty to begin with
QueueRegistry registry;
registry.setStore (&store);
@@ -112,7 +112,7 @@
string name("MyDurableQueue");
{
BdbMessageStore store;
- store.init(TESTDIR, async, true, 4, 1);
+ store.init(TESTDIR, async, true, 4, 1, 8);
store.truncate();//make sure it is empty to begin with
Queue queue(name, 0, &store, 0);
store.create(queue, qpid::framing::FieldTable());
@@ -121,7 +121,7 @@
}//db will be closed
{
BdbMessageStore store;
- store.init(TESTDIR, async, false, 4, 1);
+ store.init(TESTDIR, async, false, 4, 1, 8);
QueueRegistry registry;
registry.setStore (&store);
recover(store, registry);
@@ -137,7 +137,7 @@
string name("MyDurableQueue");
{
BdbMessageStore store;
- store.init(TESTDIR, async, true, 4, 1);
+ store.init(TESTDIR, async, true, 4, 1, 8);
store.truncate();//make sure it is empty to begin with
Queue queue(name, 0, &store, 0);
FieldTable settings;
@@ -147,7 +147,7 @@
}//db will be closed
{
BdbMessageStore store;
- store.init(TESTDIR, async, false, 4, 1);
+ store.init(TESTDIR, async, false, 4, 1, 8);
QueueRegistry registry;
registry.setStore (&store);
recover(store, registry);
@@ -164,7 +164,7 @@
string name("MyDurableQueue");
{
BdbMessageStore store;
- store.init(TESTDIR, async, true, 4, 1);
+ store.init(TESTDIR, async, true, 4, 1, 8);
store.truncate();//make sure it is empty to begin with
Queue queue(name, 0, &store, 0);
store.create(queue, qpid::framing::FieldTable());
@@ -172,7 +172,7 @@
}//db will be closed
{
BdbMessageStore store;
- store.init(TESTDIR, async, false, 4, 1);
+ store.init(TESTDIR, async, false, 4, 1, 8);
QueueRegistry registry;
registry.setStore (&store);
recover(store, registry);
@@ -194,7 +194,7 @@
string data2("hijklmn");
{
BdbMessageStore store;
- store.init(TESTDIR, async, true, 4, 1);
+ store.init(TESTDIR, async, true, 4, 1, 8);
store.truncate();//make sure it is empty to begin with
Queue::shared_ptr queue(new Queue(name, 0, &store, 0));
FieldTable settings;
@@ -213,7 +213,7 @@
}//db will be closed
{
BdbMessageStore store;
- store.init(TESTDIR, async, false, 4, 1);
+ store.init(TESTDIR, async, false, 4, 1, 8);
QueueRegistry registry;
registry.setStore (&store);
recover(store, registry);
@@ -250,7 +250,7 @@
Uuid messageId(true);
string data("abcdefg");
BdbMessageStore store;
- store.init(TESTDIR, async, true, 4, 1);
+ store.init(TESTDIR, async, true, 4, 1, 8);
store.truncate();//make sure it is empty to begin with
Queue::shared_ptr queue(new Queue(name, 0, &store, 0));
FieldTable settings;
@@ -265,7 +265,7 @@
}//db will be closed
{
BdbMessageStore store;
- store.init(TESTDIR, async, false, 4, 1);
+ store.init(TESTDIR, async, false, 4, 1, 8);
QueueRegistry registry;
registry.setStore (&store);
recover(store, registry);
@@ -285,7 +285,7 @@
const string data2("ABCDEFGHIJKLMNOPQRSTUVWXYZ");
{
BdbMessageStore store;
- store.init(TESTDIR, async, true, 4, 1);
+ store.init(TESTDIR, async, true, 4, 1, 8);
store.truncate();//make sure it is empty to begin with
//create & stage a message
@@ -327,7 +327,7 @@
{
//recover
BdbMessageStore store;
- store.init(TESTDIR, async, false, 4, 1);
+ store.init(TESTDIR, async, false, 4, 1, 8);
QueueRegistry registry;
registry.setStore (&store);
ExchangeRegistry exchanges;
@@ -374,7 +374,7 @@
void testDestroyStagedMessage(bool async)
{
BdbMessageStore store;
- store.init(TESTDIR, async, true, 4, 1);
+ store.init(TESTDIR, async, true, 4, 1, 8);
store.truncate();//make sure it is empty to begin with
const string data("abcdefg");
@@ -398,7 +398,7 @@
void testDestroyEnqueuedMessage(bool async)
{
BdbMessageStore store;
- store.init(TESTDIR, async, true, 4, 1);
+ store.init(TESTDIR, async, true, 4, 1, 8);
store.truncate();//make sure it is empty to begin with
const string data("abcdefg");
@@ -430,7 +430,7 @@
args.setString("a", "A");
{
BdbMessageStore store;
- store.init(TESTDIR, async, true, 4, 1);
+ store.init(TESTDIR, async, true, 4, 1, 8);
store.truncate();//make sure it is empty to begin with
ExchangeRegistry registry;
Exchange::shared_ptr exchange = registry.declare(name, type, true, args).first;
@@ -440,7 +440,7 @@
}//db will be closed
{
BdbMessageStore store;
- store.init(TESTDIR, async, false, 4, 1);
+ store.init(TESTDIR, async, false, 4, 1, 8);
ExchangeRegistry registry;
recover(store, registry);
@@ -454,7 +454,7 @@
}
{
BdbMessageStore store;
- store.init(TESTDIR, async, false, 4, 1);
+ store.init(TESTDIR, async, false, 4, 1, 8);
ExchangeRegistry registry;
recover(store, registry);
@@ -473,7 +473,7 @@
{
{
BdbMessageStore store;
- store.init(TESTDIR, async, true, 4, 1);
+ store.init(TESTDIR, async, true, 4, 1, 8);
store.truncate();//make sure it is empty to begin with
Exchange::shared_ptr exchange(new DirectExchange(exchangeName, true, args));
Queue::shared_ptr queue(new Queue(queueName, 0, &store, 0));
@@ -484,7 +484,7 @@
}//db will be closed
{
BdbMessageStore store;
- store.init(TESTDIR, async, false, 4, 1);
+ store.init(TESTDIR, async, false, 4, 1, 8);
ExchangeRegistry exchanges;
QueueRegistry queues;
@@ -498,7 +498,7 @@
}
{
BdbMessageStore store;
- store.init(TESTDIR, async, false, 4, 1);
+ store.init(TESTDIR, async, false, 4, 1, 8);
ExchangeRegistry exchanges;
QueueRegistry queues;
@@ -533,7 +533,7 @@
FieldTable args;
{
BdbMessageStore store;
- store.init(TESTDIR, async, true, 4, 1);
+ store.init(TESTDIR, async, true, 4, 1, 8);
store.truncate();//make sure it is empty to begin with
Exchange::shared_ptr exchange(new DirectExchange(exchangeName, true, args));
Queue::shared_ptr queue1(new Queue(queueName1, 0, &store, 0));
@@ -548,7 +548,7 @@
}//db will be closed
{
BdbMessageStore store;
- store.init(TESTDIR, async, false, 4, 1);
+ store.init(TESTDIR, async, false, 4, 1, 8);
ExchangeRegistry exchanges;
QueueRegistry queues;
@@ -564,7 +564,7 @@
}
{
BdbMessageStore store;
- store.init(TESTDIR, async, false, 4, 1);
+ store.init(TESTDIR, async, false, 4, 1, 8);
ExchangeRegistry exchanges;
QueueRegistry queues;
Modified: store/trunk/cpp/tests/TransactionalTest.cpp
===================================================================
--- store/trunk/cpp/tests/TransactionalTest.cpp 2008-05-08 19:26:23 UTC (rev 2009)
+++ store/trunk/cpp/tests/TransactionalTest.cpp 2008-05-08 19:44:10 UTC (rev 2010)
@@ -58,7 +58,7 @@
void setup(bool async)
{
store = std::auto_ptr<BdbMessageStore>(new BdbMessageStore());
- store->init(TESTDIR, async, true, 4, 1);
+ store->init(TESTDIR, async, true, 4, 1, 8);
store->truncate();
//create two queues:
@@ -82,7 +82,7 @@
store.reset();
store = std::auto_ptr<BdbMessageStore>(new BdbMessageStore());
- store->init(TESTDIR, async, false, 4, 1);
+ store->init(TESTDIR, async, false, 4, 1, 8);
ExchangeRegistry exchanges;
DtxManager mgr;
mgr.setStore (store.get());
Modified: store/trunk/cpp/tests/TwoPhaseCommitTest.cpp
===================================================================
--- store/trunk/cpp/tests/TwoPhaseCommitTest.cpp 2008-05-08 19:26:23 UTC (rev 2009)
+++ store/trunk/cpp/tests/TwoPhaseCommitTest.cpp 2008-05-08 19:44:10 UTC (rev 2010)
@@ -220,7 +220,7 @@
void setup()
{
store = std::auto_ptr<BdbMessageStore>(new BdbMessageStore());
- store->init(TESTDIR, async, true, 4, 1);
+ store->init(TESTDIR, async, true, 4, 1, 8);
store->truncate();
//create two queues:
@@ -246,7 +246,7 @@
store.reset();
store = std::auto_ptr<BdbMessageStore>(new BdbMessageStore());
- store->init(TESTDIR, async, false, 4, 1);
+ store->init(TESTDIR, async, false, 4, 1, 8);
ExchangeRegistry exchanges;
dtxmgr = std::auto_ptr<DtxManager>(new DtxManager);
dtxmgr->setStore (store.get());
Modified: store/trunk/cpp/tests/jrnl/_st_helper_fns.hpp
===================================================================
--- store/trunk/cpp/tests/jrnl/_st_helper_fns.hpp 2008-05-08 19:26:23 UTC (rev 2009)
+++ store/trunk/cpp/tests/jrnl/_st_helper_fns.hpp 2008-05-08 19:44:10 UTC (rev 2010)
@@ -66,10 +66,12 @@
jcntl(jid, jdir, base_filename) {}
virtual ~test_jrnl() {}
void initialize(const u_int16_t num_jfiles, const u_int32_t jfsize_sblks)
- { jcntl::initialize(num_jfiles, jfsize_sblks, 0, &aio_wr_callback); }
+ { jcntl::initialize(num_jfiles, jfsize_sblks, JRNL_WMGR_DEF_PAGES, JRNL_WMGR_DEF_PAGE_SIZE,
+ 0, &aio_wr_callback); }
void recover(const u_int16_t num_jfiles, const u_int32_t jfsize_sblks, vector<string>& txn_list,
u_int64_t& highest_rid)
- { jcntl::recover(num_jfiles, jfsize_sblks, 0, &aio_wr_callback, txn_list, highest_rid); }
+ { jcntl::recover(num_jfiles, jfsize_sblks, JRNL_WMGR_DEF_PAGES, JRNL_WMGR_DEF_PAGE_SIZE, 0,
+ &aio_wr_callback, txn_list, highest_rid); }
private:
static void aio_wr_callback(jcntl*, std::vector<data_tok*>& dtokl)
{
Modified: store/trunk/cpp/tests/jrnl/_ut_jdir.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/_ut_jdir.cpp 2008-05-08 19:26:23 UTC (rev 2009)
+++ store/trunk/cpp/tests/jrnl/_ut_jdir.cpp 2008-05-08 19:44:10 UTC (rev 2010)
@@ -87,7 +87,7 @@
timespec ts;
::clock_gettime(CLOCK_REALTIME, &ts);
jinf ji("test journal id", dirname, base_filename, NUM_JFILES, JFSIZE_SBLKS,
- JRNL_WMGR_PAGE_SIZE, JRNL_WMGR_PAGES, ts);
+ JRNL_WMGR_DEF_PAGE_SIZE, JRNL_WMGR_DEF_PAGES, ts);
ji.write();
}
Modified: store/trunk/cpp/tests/jrnl/_ut_jinf.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/_ut_jinf.cpp 2008-05-08 19:26:23 UTC (rev 2009)
+++ store/trunk/cpp/tests/jrnl/_ut_jinf.cpp 2008-05-08 19:44:10 UTC (rev 2010)
@@ -145,8 +145,8 @@
{
cout << test_filename << ".write_constructor: " << flush;
::clock_gettime(CLOCK_REALTIME, &ts);
- jinf ji(jid, jdir, base_filename, NUM_JFILES, JFSIZE_SBLKS, JRNL_WMGR_PAGE_SIZE,
- JRNL_WMGR_PAGES, ts);
+ jinf ji(jid, jdir, base_filename, NUM_JFILES, JFSIZE_SBLKS, JRNL_WMGR_DEF_PAGE_SIZE,
+ JRNL_WMGR_DEF_PAGES, ts);
BOOST_CHECK_EQUAL(ji.jver(), RHM_JDAT_VERSION);
BOOST_CHECK(ji.jid().compare(jid) == 0);
BOOST_CHECK(ji.jdir().compare(jdir) == 0);
Modified: store/trunk/cpp/tests/jrnl/jtt/jrnl_init_params.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/jtt/jrnl_init_params.cpp 2008-05-08 19:26:23 UTC (rev 2009)
+++ store/trunk/cpp/tests/jrnl/jtt/jrnl_init_params.cpp 2008-05-08 19:44:10 UTC (rev 2010)
@@ -30,12 +30,15 @@
jrnl_init_params::jrnl_init_params(const std::string& jid, const std::string& jdir,
const std::string& base_filename, const u_int16_t num_jfiles,
- const u_int32_t jfsize_sblks):
+ const u_int32_t jfsize_sblks, const u_int16_t wcache_num_pages,
+ const u_int32_t wcache_pgsize_sblks):
_jid(jid),
_jdir(jdir),
_base_filename(base_filename),
_num_jfiles(num_jfiles),
- _jfsize_sblks(jfsize_sblks)
+ _jfsize_sblks(jfsize_sblks),
+ _wcache_num_pages(wcache_num_pages),
+ _wcache_pgsize_sblks(wcache_pgsize_sblks)
{}
jrnl_init_params::jrnl_init_params(const jrnl_init_params& jp):
@@ -43,7 +46,9 @@
_jdir(jp._jdir),
_base_filename(jp._base_filename),
_num_jfiles(jp._num_jfiles),
- _jfsize_sblks(jp._jfsize_sblks)
+ _jfsize_sblks(jp._jfsize_sblks),
+ _wcache_num_pages(jp._wcache_num_pages),
+ _wcache_pgsize_sblks(jp._wcache_pgsize_sblks)
{}
jrnl_init_params::jrnl_init_params(const jrnl_init_params* const jp_ptr):
@@ -51,13 +56,17 @@
_jdir(jp_ptr->_jdir),
_base_filename(jp_ptr->_base_filename),
_num_jfiles(jp_ptr->_num_jfiles),
- _jfsize_sblks(jp_ptr->_jfsize_sblks)
+ _jfsize_sblks(jp_ptr->_jfsize_sblks),
+ _wcache_num_pages(jp_ptr->_wcache_num_pages),
+ _wcache_pgsize_sblks(jp_ptr->_wcache_pgsize_sblks)
{}
// static initializers
const u_int16_t jrnl_init_params::def_num_jfiles = 8;
const u_int32_t jrnl_init_params::def_jfsize_sblks = 0xc00;
+const u_int16_t jrnl_init_params::def_wcache_num_pages = 32;
+const u_int32_t jrnl_init_params::def_wcache_pgsize_sblks = 64;
} // namespace jtt
} // namespace rhm
Modified: store/trunk/cpp/tests/jrnl/jtt/jrnl_init_params.hpp
===================================================================
--- store/trunk/cpp/tests/jrnl/jtt/jrnl_init_params.hpp 2008-05-08 19:26:23 UTC (rev 2009)
+++ store/trunk/cpp/tests/jrnl/jtt/jrnl_init_params.hpp 2008-05-08 19:44:10 UTC (rev 2010)
@@ -38,6 +38,8 @@
public:
static const u_int16_t def_num_jfiles;
static const u_int32_t def_jfsize_sblks;
+ static const u_int16_t def_wcache_num_pages;
+ static const u_int32_t def_wcache_pgsize_sblks;
typedef boost::shared_ptr<jrnl_init_params> shared_ptr;
@@ -47,11 +49,15 @@
std::string _base_filename;
u_int16_t _num_jfiles;
u_int32_t _jfsize_sblks;
+ u_int16_t _wcache_num_pages;
+ u_int32_t _wcache_pgsize_sblks;
public:
jrnl_init_params(const std::string& jid, const std::string& jdir,
const std::string& base_filename, const u_int16_t num_jfiles = def_num_jfiles,
- const u_int32_t jfsize_sblks = def_jfsize_sblks);
+ const u_int32_t jfsize_sblks = def_jfsize_sblks,
+ const u_int16_t wcache_num_pages = def_wcache_num_pages,
+ const u_int32_t wcache_pgsize_sblks = def_wcache_pgsize_sblks);
jrnl_init_params(const jrnl_init_params& jp);
jrnl_init_params(const jrnl_init_params* const jp_ptr);
@@ -60,6 +66,8 @@
inline const std::string& base_filename() const { return _base_filename; }
inline u_int16_t num_jfiles() const { return _num_jfiles; }
inline u_int32_t jfsize_sblks() const { return _jfsize_sblks; }
+ inline u_int16_t wcache_num_pages() const { return _wcache_num_pages; }
+ inline u_int32_t wcache_pgsize_sblks() const { return _wcache_pgsize_sblks; }
};
} // namespace jtt
Modified: store/trunk/cpp/tests/jrnl/jtt/jrnl_instance.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/jtt/jrnl_instance.cpp 2008-05-08 19:26:23 UTC (rev 2009)
+++ store/trunk/cpp/tests/jrnl/jtt/jrnl_instance.cpp 2008-05-08 19:44:10 UTC (rev 2010)
@@ -38,9 +38,11 @@
{
jrnl_instance::jrnl_instance(const std::string& jid, const std::string& jdir,
- const std::string& base_filename, const u_int16_t num_jfiles, const u_int32_t jfsize_sblks):
+ const std::string& base_filename, const u_int16_t num_jfiles, const u_int32_t jfsize_sblks,
+ const u_int16_t wcache_num_pages, const u_int32_t wcache_pgsize_sblks):
rhm::journal::jcntl(jid, jdir, base_filename),
- _jpp(new jrnl_init_params(jid, jdir, base_filename, num_jfiles, jfsize_sblks)),
+ _jpp(new jrnl_init_params(jid, jdir, base_filename, num_jfiles, jfsize_sblks,
+ wcache_num_pages, wcache_pgsize_sblks)),
_args_ptr(0),
_dtok_master_enq_list(),
_dtok_master_txn_list(),
@@ -109,21 +111,23 @@
{
std::vector<std::string> prep_txn_list;
u_int64_t highest_rid;
- recover(_jpp->num_jfiles(), _jpp->jfsize_sblks(), aio_rd_callback, aio_wr_callback,
+ recover(_jpp->num_jfiles(), _jpp->jfsize_sblks(), _jpp->wcache_num_pages(),
+ _jpp->wcache_pgsize_sblks(), aio_rd_callback, aio_wr_callback,
prep_txn_list, highest_rid);
recover_complete();
}
catch (const rhm::journal::jexception& e)
{
if (e.err_code() == rhm::journal::jerrno::JERR_JDIR_STAT)
- initialize(_jpp->num_jfiles(), _jpp->jfsize_sblks(), aio_rd_callback,
- aio_wr_callback);
+ initialize(_jpp->num_jfiles(), _jpp->jfsize_sblks(), _jpp->wcache_num_pages(),
+ _jpp->wcache_pgsize_sblks(), aio_rd_callback, aio_wr_callback);
else
throw;
}
}
else
- initialize(_jpp->num_jfiles(), _jpp->jfsize_sblks(), aio_rd_callback, aio_wr_callback);
+ initialize(_jpp->num_jfiles(), _jpp->jfsize_sblks(), _jpp->wcache_num_pages(),
+ _jpp->wcache_pgsize_sblks(), aio_rd_callback, aio_wr_callback);
}
catch (const rhm::journal::jexception& e) { _tcrp->add_exception(e); }
catch (const std::exception& e) { _tcrp->add_exception(e.what()); }
Modified: store/trunk/cpp/tests/jrnl/jtt/jrnl_instance.hpp
===================================================================
--- store/trunk/cpp/tests/jrnl/jtt/jrnl_instance.hpp 2008-05-08 19:26:23 UTC (rev 2009)
+++ store/trunk/cpp/tests/jrnl/jtt/jrnl_instance.hpp 2008-05-08 19:44:10 UTC (rev 2010)
@@ -72,7 +72,9 @@
jrnl_instance(const std::string& jid, const std::string& jdir,
const std::string& base_filename,
const u_int16_t num_jfiles = jrnl_init_params::def_num_jfiles,
- const u_int32_t jfsize_sblks = jrnl_init_params::def_jfsize_sblks);
+ const u_int32_t jfsize_sblks = jrnl_init_params::def_jfsize_sblks,
+ const u_int16_t wcache_num_pages = jrnl_init_params::def_wcache_num_pages,
+ const u_int32_t wcache_pgsize_sblks = jrnl_init_params::def_wcache_pgsize_sblks);
jrnl_instance(const jrnl_init_params::shared_ptr& params);
virtual ~jrnl_instance();
Modified: store/trunk/cpp/tests/system_test.sh
===================================================================
--- store/trunk/cpp/tests/system_test.sh 2008-05-08 19:26:23 UTC (rev 2009)
+++ store/trunk/cpp/tests/system_test.sh 2008-05-08 19:44:10 UTC (rev 2010)
@@ -36,9 +36,10 @@
fail=0
# Run the tests with a given set of flags
+BROKER_OPTS="--load-module=$LIBBDBSTORE --data-dir=$TMPDIR --auth=no --store-force=yes --wcache-page-size 16"
run_tests() {
for p in `seq 1 8`; do
- $abs_srcdir/start_broker "$@" --load-module=$LIBBDBSTORE --data-dir=$TMPDIR --auth=no --store-force=yes || return 1
+ $abs_srcdir/start_broker "$@" ${BROKER_OPTS} || return 1
python "$abs_srcdir/persistence.py" -s "$xml_spec" -b localhost:`cat qpidd.port` -p $p -r 3 || fail=1;
$abs_srcdir/stop_broker
done
16 years, 8 months
rhmessaging commits: r2009 - in mgmt/cumin: python/cumin and 1 other directory.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2008-05-08 15:26:23 -0400 (Thu, 08 May 2008)
New Revision: 2009
Modified:
mgmt/cumin/bin/cumin-test
mgmt/cumin/python/cumin/quirk.py
mgmt/cumin/python/cumin/test.py
Log:
Interim check-in of updates to the test code for the 0-10 Python client.
There's still an issue with session close.
Modified: mgmt/cumin/bin/cumin-test
===================================================================
--- mgmt/cumin/bin/cumin-test 2008-05-08 18:09:50 UTC (rev 2008)
+++ mgmt/cumin/bin/cumin-test 2008-05-08 19:26:23 UTC (rev 2009)
@@ -25,7 +25,7 @@
host, port = parse_broker_addr(config.broker)
- env = TestEnvironment(app, host, port)
+ env = TestEnvironment(app, host, port, config.spec)
env.init();
session = env.run_test(MainTest(env))
Modified: mgmt/cumin/python/cumin/quirk.py
===================================================================
--- mgmt/cumin/python/cumin/quirk.py 2008-05-08 18:09:50 UTC (rev 2008)
+++ mgmt/cumin/python/cumin/quirk.py 2008-05-08 19:26:23 UTC (rev 2009)
@@ -44,7 +44,7 @@
session.psession.message_flow(self.name, 0, 0xFFFFFFFF)
session.psession.message_flow(self.name, 1, 0xFFFFFFFF)
- self.client_queue = session.client.pclient.queue(self.name)
+ self.client_queue = session.client.pconn.queue(self.name)
def get(self, session):
if self.client_queue is None:
@@ -83,22 +83,33 @@
def __str__(self):
return self.content.body
-class Client(object):
- def __init__(self, host, port):
+class Connection(object):
+ def __init__(self, host, port, spec_path):
self.host = host
self.port = port
- self.pclient = qpid.client.Client(host, port)
+ self.spec = qpid.spec.load(spec_path)
- def login(self, user, password):
- self.pclient.start({"LOGIN": user, "PASSWORD": password})
+ self.pconn = None
+ def open(self):
+ assert self.pconn is None
+
+ sock = qpid.util.connect(self.host, self.port)
+ self.pconn = qpid.connection.Connection(sock, self.spec)
+
+ def close(self):
+ assert self.pconn
+
+ self.pconn.close()
+
class Session(object):
- def __init__(self, client):
- self.client = client
- self.psession = client.pclient.session()
+ def __init__(self, conn, name):
+ self.conn = conn
+ self.name = name
+ self.psession = conn.pconn.session(name)
def open(self):
- self.psession.open()
+ pass
def close(self):
self.psession.close()
Modified: mgmt/cumin/python/cumin/test.py
===================================================================
--- mgmt/cumin/python/cumin/test.py 2008-05-08 18:09:50 UTC (rev 2008)
+++ mgmt/cumin/python/cumin/test.py 2008-05-08 19:26:23 UTC (rev 2009)
@@ -9,10 +9,11 @@
import time
class TestEnvironment(object):
- def __init__(self, app, broker_host, broker_port):
+ def __init__(self, app, broker_host, broker_port, spec_path):
self.app = app
- self.broker_client = quirk.Client(broker_host, broker_port)
+ self.broker_conn = quirk.Connection(broker_host, broker_port,
+ spec_path)
self.broker_queue = quirk.Queue("cumin.queue")
self.broker_exchange = quirk.Exchange("cumin.exchange")
@@ -23,9 +24,9 @@
self.exchange = None
def init(self):
- self.broker_client.login("guest", "guest")
+ self.broker_conn.open()
- session = quirk.Session(self.broker_client)
+ session = quirk.Session(self.broker_conn, "test")
session.open()
try:
@@ -201,7 +202,7 @@
form.names.get(s).append(session.id)
addr = "%s:%s" % \
- (self.env.broker_client.host, self.env.broker_client.port)
+ (self.env.broker_conn.host, self.env.broker_conn.port)
form.addrs.get(s).append(addr)
form.groups.get(s).append(None)
form.submit(s)
@@ -375,8 +376,8 @@
raise Exception("Not implemented")
vhost = self.env.vhost
- address = self.env.broker_client.host + ":" + \
- str(self.env.broker_client.port)
+ address = self.env.broker_conn.host + ":" + \
+ str(self.env.broker_conn.port)
self.env.client = Client.selectBy(vhost=vhost, address=address)[0]
16 years, 8 months