[rhmessaging-commits] rhmessaging commits: r2202 - in store/trunk/cpp: lib and 1 other directory.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Wed Jul 16 15:20:44 EDT 2008


Author: kpvdr
Date: 2008-07-16 15:20:44 -0400 (Wed, 16 Jul 2008)
New Revision: 2202

Modified:
   store/trunk/cpp/Makefile.am
   store/trunk/cpp/configure.ac
   store/trunk/cpp/lib/BdbMessageStore.cpp
   store/trunk/cpp/lib/BdbMessageStore.h
   store/trunk/cpp/lib/TxnCtxt.h
Log:
Fixed thread sync problem for prepared list initialization; added command-line params for prepared list journal geometry to store module. Tidied up handling of command-line params.

Modified: store/trunk/cpp/Makefile.am
===================================================================
--- store/trunk/cpp/Makefile.am	2008-07-16 19:12:28 UTC (rev 2201)
+++ store/trunk/cpp/Makefile.am	2008-07-16 19:20:44 UTC (rev 2202)
@@ -27,4 +27,8 @@
 rpmbuild: $(SPEC) dist-gzip
 	mkdir -p $(RPMDIRS)
 	rpmbuild $(RPMMACROS) $(RPMOPTS) rhm.spec
+if HAS_RPMLINT
 	rpmlint `find rpm -name '*.rpm'`
+else
+	@echo "WARNING: rpmlint not found, could not validate RPMs."
+endif

Modified: store/trunk/cpp/configure.ac
===================================================================
--- store/trunk/cpp/configure.ac	2008-07-16 19:12:28 UTC (rev 2201)
+++ store/trunk/cpp/configure.ac	2008-07-16 19:20:44 UTC (rev 2202)
@@ -175,6 +175,10 @@
 # We use valgrind for the tests.  See if it's available.
 AC_CHECK_PROG([VALGRIND], [valgrind], [valgrind])
 
+# If rpmlint is available we'll run it when building RPMs.
+AC_CHECK_PROG([RPMLINT], [rpmlint], [rpmlint])
+AM_CONDITIONAL([HAS_RPMLINT], [test -n "$RPMLINT"])
+
 # Also doxygen for documentation...
 AC_CHECK_PROG([do_doxygen], [doxygen], [yes])
 AM_CONDITIONAL([DOXYGEN], [test x$do_doxygen = xyes])

Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp	2008-07-16 19:12:28 UTC (rev 2201)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp	2008-07-16 19:20:44 UTC (rev 2202)
@@ -67,17 +67,102 @@
                                  mappingDb(&env, 0),
                                  bindingDb(&env, 0),
                                  generalDb(&env, 0),
-                                 numJrnlFiles(defNumJrnlFiles),
-                                 jrnlFsizePgs(defJrnlFileSizePgs),
-                                 wcache_pgsize_sblks(JRNL_WMGR_DEF_PAGE_SIZE),
-                                 wcache_num_pages(JRNL_WMGR_DEF_PAGES),
+                                 numJrnlFiles(0),
+                                 jrnlFsizeSblks(0),
+                                 wCachePgSizeSblks(0),
+                                 wCacheNumPages(0),
+                                 tplNumJrnlFiles(0),
+                                 tplJrnlFsizeSblks(0),
+                                 tplWCachePgSizeSblks(0),
+                                 tplWCacheNumPages(0),
                                  highestRid(0),
                                  isInit(false),
                                  envPath(envpath),
                                  mgmtObject(0)
-
 {}
 
+u_int16_t BdbMessageStore::chkJrnlNumFilesParam(const u_int16_t param, const std::string paramName)
+{
+    u_int16_t p = param;
+    if (p < JRNL_MIN_NUM_FILES) {
+        p = JRNL_MIN_NUM_FILES;
+        QPID_LOG(warning, "parameter " << paramName << " (" << param << ") is below allowable minimum (" << JRNL_MIN_NUM_FILES << "); changing this parameter to minimum value.");
+    } else if (p > JRNL_MAX_NUM_FILES) {
+        p = JRNL_MAX_NUM_FILES;
+        QPID_LOG(warning, "parameter " << paramName << " (" << param << ") is above allowable maximum (" << JRNL_MAX_NUM_FILES << "); changing this parameter to maximum value.");
+    }
+    return p;
+}
+
+u_int32_t BdbMessageStore::chkJrnlFileSizeParam(const u_int32_t param, const std::string paramName)
+{
+    u_int32_t p = param;
+    u_int32_t min = JRNL_MIN_FILE_SIZE / JRNL_RMGR_PAGE_SIZE;
+    u_int32_t max = JRNL_MAX_FILE_SIZE / JRNL_RMGR_PAGE_SIZE;
+    if (p < min) {
+        p = min;
+        QPID_LOG(warning, "parameter " << paramName << " (" << param << ") is below allowable minimum (" << min << "); changing this parameter to minimum value.");
+    } else if (p > max) {
+        p = max;
+        QPID_LOG(warning, "parameter " << paramName << " (" << param << ") is above allowable maximum (" << max << "); changing this parameter to maximum value.");
+    }
+    return p;
+}
+
+u_int32_t BdbMessageStore::chkJrnlWrPageCacheSize(const u_int32_t param, const std::string paramName)
+{
+    u_int32_t p = param;
+    switch (p)
+    {
+      case 1:
+      case 2:
+      case 4:
+      case 8:
+      case 16:
+      case 32:
+      case 64:
+      case 128:
+        break;
+      default:
+        if (p == 0) {
+            // For zero value, use default
+            p = JRNL_WMGR_DEF_PAGE_SIZE * JRNL_DBLK_SIZE * JRNL_SBLK_SIZE / 1024;
+            QPID_LOG(warning, "parameter " << paramName << " (" << param << ") must be a power of 2 between 1 and 128; changing this parameter to default value (" << p << ")");
+        } else {
+            // For any positive value, use closest value
+            if      (p <   6)   p =   4;
+            else if (p <  12)   p =   8;
+            else if (p <  24)   p =  16;
+            else if (p <  48)   p =  32;
+            else if (p <  96)   p =  64;
+            else if (p > 128)   p = 128;
+            QPID_LOG(warning, "parameter " << paramName << " (" << param << ") must be a power of 2 between 1 and 128; changing this parameter to closest allowable value (" << p << ")");
+        }
+    }
+    return p;
+}
+
+u_int16_t BdbMessageStore::getJrnlWrNumPages(const u_int32_t wrPageSizeKib)
+{
+    u_int32_t wrPageSizeSblks = wrPageSizeKib * 1024 / JRNL_DBLK_SIZE / JRNL_SBLK_SIZE; // convert from KiB to number sblks
+    u_int32_t defTotWCacheSize = JRNL_WMGR_DEF_PAGE_SIZE * JRNL_WMGR_DEF_PAGES; // in sblks. Currently 2014 sblks (1 MiB).
+    switch (wrPageSizeKib)
+    {
+      case 1:
+      case 2:
+      case 4:
+        // 256 KiB total cache
+        return defTotWCacheSize / wrPageSizeSblks / 4;
+      case 8:
+      case 16:
+        // 512 KiB total cache
+        return defTotWCacheSize / wrPageSizeSblks / 2;
+      default: // 32, 64, 128
+        // 1 MiB total cache
+        return defTotWCacheSize / wrPageSizeSblks;
+    }
+}
+
 void BdbMessageStore::initManagement (Broker* broker)
 {
     if (broker != 0) {
@@ -90,44 +175,49 @@
 
             mgmtObject->set_location(storeDir);
             mgmtObject->set_defaultInitialFileCount(numJrnlFiles);
-            mgmtObject->set_defaultDataFileSize(jrnlFsizePgs);
+            mgmtObject->set_defaultDataFileSize(jrnlFsizeSblks / JRNL_RMGR_PAGE_SIZE);
 
             agent->addObject(mgmtObject, 50, 1);
         }
     }
 }
 
+bool BdbMessageStore::init(const qpid::Options* options)
+{
+    // Extract and check options
+    const Options* opts = static_cast<const Options*>(options);
+    u_int16_t numJrnlFiles = chkJrnlNumFilesParam(opts->numJrnlFiles, "num-jfiles");
+    u_int32_t jrnlFsizePgs = chkJrnlFileSizeParam(opts->jrnlFsizePgs, "jfile-size-pgs");
+    u_int32_t jrnlWrCachePageSizeKib = chkJrnlWrPageCacheSize(opts->wCachePageSizeKib, "wcache-page-size");
+    u_int16_t tplNumJrnlFiles = chkJrnlNumFilesParam(opts->tplNumJrnlFiles, "tpl-num-jfiles");
+    u_int32_t tplJrnlFSizePgs = chkJrnlFileSizeParam(opts->tplJrnlFsizePgs, "tpl-jfile-size-pgs");
+    u_int32_t tplJrnlWrCachePageSizeKib = chkJrnlWrPageCacheSize(opts->tplWCachePageSizeKib, "tpl-wcache-page-size");
+    
+    // Pass option values to init(...)
+    return init(opts->storeDir, numJrnlFiles, jrnlFsizePgs, jrnlWrCachePageSizeKib, tplNumJrnlFiles, tplJrnlFSizePgs, tplJrnlWrCachePageSizeKib);
+}
+
+// These params, taken from options, are assumed to be correct and verified
 bool BdbMessageStore::init(const std::string& dir,
                            u_int16_t jfiles,
                            u_int32_t jfileSizePgs,
-                           uint32_t wCachePageSize)
+                           u_int32_t wCachePageSizeKib,
+                           u_int16_t tplJfiles,
+                           u_int32_t tplJfileSizePgs,
+                           u_int32_t tplWCachePageSizeKib)
 {
     if (isInit) return true;
 
+    // Set geometry members (converting to correct units where req'd)
     numJrnlFiles = jfiles;
-    jrnlFsizePgs = jfileSizePgs;
+    jrnlFsizeSblks = jfileSizePgs * JRNL_RMGR_PAGE_SIZE;
+    wCachePgSizeSblks = wCachePageSizeKib * 1024 / JRNL_DBLK_SIZE / JRNL_SBLK_SIZE; // convert from KiB to number sblks
+    wCacheNumPages = getJrnlWrNumPages(wCachePageSizeKib);
+    tplNumJrnlFiles = tplJfiles;
+    tplJrnlFsizeSblks = tplJfileSizePgs * JRNL_RMGR_PAGE_SIZE;
+    tplWCachePgSizeSblks = tplWCachePageSizeKib * 1024 / JRNL_DBLK_SIZE / JRNL_SBLK_SIZE; // convert from KiB to number sblks
+    tplWCacheNumPages = getJrnlWrNumPages(tplWCachePageSizeKib);
 
-    // set wcache_pgsize_sblks and wcache_num_pages from wCachePageSize
-    wcache_pgsize_sblks = wCachePageSize * 1024 / JRNL_DBLK_SIZE / JRNL_SBLK_SIZE; // convert from KiB to number sblks
-    u_int32_t defTotWCacheSize = JRNL_WMGR_DEF_PAGE_SIZE * JRNL_WMGR_DEF_PAGES; // in sblks. Currently 2014 sblks (1 MiB).
-    switch (wCachePageSize)
-    {
-      case 1:
-      case 2:
-      case 4:
-        // 256 KiB total cache
-        wcache_num_pages = defTotWCacheSize / wcache_pgsize_sblks / 4;
-        break;
-      case 8:
-      case 16:
-        // 512 KiB total cache
-        wcache_num_pages = defTotWCacheSize / wcache_pgsize_sblks / 2;
-        break;
-      default: // 32, 64, 128
-        // 1 MiB total cache
-        wcache_num_pages = defTotWCacheSize / wcache_pgsize_sblks;
-    }
-
     if (dir.size()>0) storeDir = dir;
 
     journal::jdir::create_dir(getBdbBaseDir());
@@ -167,81 +257,19 @@
     }
 
     isInit = true;
-    QPID_LOG(info, "BdbMessageStore module initialized: dir=" << dir << "; jfiles=" << jfiles << "; jfileSizePgs=" << jfileSizePgs << "; wCachePageSize=" << wCachePageSize);
+    QPID_LOG(info, "BdbMessageStore module initialized: store-dir=" << dir << "; Default files per journal: " << jfiles <<
+                   "; Default jrournal file size: " << jfileSizePgs << "(wpgs); Default write cache page size: " << wCachePageSizeKib << "(Kib)");
     return true;
 }
 
 void BdbMessageStore::chkInitPreparedXidStore()
 {
+    qpid::sys::Mutex::ScopedLock sl(jrnlCreateLock);
     if (!preparedXidStorePtr->is_ready()) {
-        u_int32_t defTotWCacheSize = JRNL_WMGR_DEF_PAGE_SIZE * JRNL_WMGR_DEF_PAGES; // in sblks. Currently 2014 sblks (1 MiB).
-        preparedXidStorePtr->initialize(defXidStoreNumJrnlFiles, defXidStoreJrnlFileSizePgs * JRNL_RMGR_PAGE_SIZE, defTotWCacheSize / wcache_pgsize_sblks, defXidStoreWCachePageSize);
+        preparedXidStorePtr->initialize(tplNumJrnlFiles, tplJrnlFsizeSblks, tplWCachePgSizeSblks, tplWCacheNumPages);
     }
 }
 
-bool BdbMessageStore::init(const qpid::Options* options)
-{
-    const Options* opts = static_cast<const Options*>(options);
-
-    u_int16_t numJrnlFiles = opts->numJrnlFiles;
-    if (numJrnlFiles < JRNL_MIN_NUM_FILES) {
-        numJrnlFiles = JRNL_MIN_NUM_FILES;
-        QPID_LOG(warning, "parameter num-jfiles (" << opts->numJrnlFiles << ") below allowable minimum (" << numJrnlFiles << "); changing this parameter to minimum value.");
-    } else if (numJrnlFiles > JRNL_MAX_NUM_FILES) {
-        numJrnlFiles = JRNL_MAX_NUM_FILES;
-        QPID_LOG(warning, "parameter num-jfiles (" << opts->numJrnlFiles << ") above allowable maximum (" << numJrnlFiles << "); changing this parameter to maximum value.");
-    }
-
-    u_int32_t jrnlFsizePgs = opts->jrnlFsizePgs;
-    u_int32_t jrnlMinFsizePgs = JRNL_MIN_FILE_SIZE / JRNL_RMGR_PAGE_SIZE;
-    u_int32_t jrnlMaxFsizePgs = JRNL_MAX_FILE_SIZE / JRNL_RMGR_PAGE_SIZE;
-    if (jrnlFsizePgs < jrnlMinFsizePgs) {
-        jrnlFsizePgs = jrnlMinFsizePgs;
-        QPID_LOG(warning, "parameter jfile-size-pgs (" << opts->jrnlFsizePgs << ") below allowable minimum (" << jrnlFsizePgs << "); changing this parameter to minimum value.");
-    } else if (jrnlFsizePgs > jrnlMaxFsizePgs) {
-        jrnlFsizePgs = jrnlMaxFsizePgs;
-        QPID_LOG(warning, "parameter jfile-size-pgs (" << opts->jrnlFsizePgs << ") above allowable maximum (" << jrnlFsizePgs << "); changing this parameter to maximum value.");
-    }
-
-    u_int32_t jrnlWrCachePageSize = opts->wCachePageSize;
-    switch (jrnlWrCachePageSize)
-    {
-      case 1:
-      case 2:
-      case 4:
-      case 8:
-      case 16:
-      case 32:
-      case 64:
-      case 128:
-        break;
-      default:
-        u_int32_t oldJrnlWrCachePageSize = jrnlWrCachePageSize;
-        if (oldJrnlWrCachePageSize == 0) {
-            // For zero value, use default
-            jrnlWrCachePageSize = JRNL_WMGR_DEF_PAGE_SIZE * JRNL_DBLK_SIZE * JRNL_SBLK_SIZE / 1024;
-            QPID_LOG(warning, "parameter wcache-page-size (" << oldJrnlWrCachePageSize << ") must be a power of 2 between 1 and 128; changing this parameter to default value (" << jrnlWrCachePageSize << ")");
-        } else {
-            // For any positive value, use closest value
-            if (oldJrnlWrCachePageSize < 6)
-                jrnlWrCachePageSize = 4;
-            else if (oldJrnlWrCachePageSize < 12)
-                jrnlWrCachePageSize = 8;
-            else if (oldJrnlWrCachePageSize < 24)
-                jrnlWrCachePageSize = 16;
-            else if (oldJrnlWrCachePageSize < 48)
-                jrnlWrCachePageSize = 32;
-            else if (oldJrnlWrCachePageSize < 96)
-                jrnlWrCachePageSize = 64;
-            else if (oldJrnlWrCachePageSize > 128)
-                jrnlWrCachePageSize = 128;
-            QPID_LOG(warning, "parameter wcache-page-size (" << oldJrnlWrCachePageSize << ") must be a power of 2 between 1 and 128; changing this parameter to closest allowable value (" << jrnlWrCachePageSize << ")");
-        }
-    }
-
-    return init(opts->storeDir, numJrnlFiles, jrnlFsizePgs, jrnlWrCachePageSize);
-}
-
 void BdbMessageStore::open(Db& db,
                            DbTxn* txn,
                            const char* file,
@@ -303,16 +331,16 @@
     JournalImpl* jQueue = 0;
     FieldTable::ValuePtr value;
 
-    uint16_t localFileCount = numJrnlFiles;
-    uint32_t localFileSize  = jrnlFsizePgs;
+    u_int16_t localFileCount = numJrnlFiles;
+    u_int32_t localFileSizeSblks  = jrnlFsizeSblks;
 
     value = args.get ("qpid.file_count");
     if (value.get() != 0 && !value->empty() && value->convertsTo<int>())
-        localFileCount = (uint16_t) value->get<int>();
+        localFileCount = (u_int16_t) value->get<int>();
 
     value = args.get ("qpid.file_size");
     if (value.get() != 0 && !value->empty() && value->convertsTo<int>())
-        localFileSize = (uint32_t) value->get<int>();
+        localFileSizeSblks = (u_int32_t) value->get<int>() * JRNL_RMGR_PAGE_SIZE;
 
     {
         qpid::sys::Mutex::ScopedLock sl(jrnlCreateLock);
@@ -324,10 +352,9 @@
     queue.setExternalQueueStore(dynamic_cast<ExternalQueueStore*>(jQueue));
     try {
         // init will create the deque's for the init...
-        jQueue->initialize(localFileCount, localFileSize * JRNL_RMGR_PAGE_SIZE, wcache_num_pages, wcache_pgsize_sblks);
+        jQueue->initialize(localFileCount, localFileSizeSblks, wCacheNumPages, wCachePgSizeSblks);
     } catch (const journal::jexception& e) {
-        THROW_STORE_EXCEPTION(std::string("Queue ") + queue.getName() +
-                              ": create() failed: " + e.what());
+        THROW_STORE_EXCEPTION(std::string("Queue ") + queue.getName() + ": create() failed: " + e.what());
     }
     try {
         if (!create(queueDb, queueIdSequence, queue)) {
@@ -541,7 +568,7 @@
         try
         {
             u_int64_t thisHighestRid = 0;
-            jQueue->recover(numJrnlFiles, jrnlFsizePgs * JRNL_RMGR_PAGE_SIZE, wcache_num_pages, wcache_pgsize_sblks, &prepared, thisHighestRid, key.id); // start recovery
+            jQueue->recover(numJrnlFiles, jrnlFsizeSblks, wCacheNumPages, wCachePgSizeSblks, &prepared, thisHighestRid, key.id); // start recovery
             if (thisHighestRid > highestRid)
                 highestRid = thisHighestRid;
             recoverMessages(txn, registry, queue, prepared, messages);
@@ -735,7 +762,7 @@
                 read = false;
                 break; // done with all messages. (add call in jrnl to test that _emap is empty.)
               default:
-                assert( "Store Error: Unexpected msg state");
+                assert("Store Error: Unexpected msg state");
             } // switch
         } // while
     } catch (const journal::jexception& e) {
@@ -804,9 +831,7 @@
 {
     if (journal::jdir::exists(storeDir + "/rhm/pxid/prepared_xid.jinf")) {
         u_int64_t thisHighestRid;
-        preparedXidStorePtr->recover(defXidStoreNumJrnlFiles, defXidStoreJrnlFileSizePgs * JRNL_RMGR_PAGE_SIZE,
-                JRNL_WMGR_DEF_PAGE_SIZE * JRNL_WMGR_DEF_PAGES / wcache_pgsize_sblks, defXidStoreWCachePageSize,
-                0, thisHighestRid, 0);
+        preparedXidStorePtr->recover(tplNumJrnlFiles, tplJrnlFsizeSblks, tplWCachePgSizeSblks, tplWCacheNumPages, 0, thisHighestRid, 0);
         if (thisHighestRid > highestRid)
             highestRid = thisHighestRid;
         preparedXidStorePtr->recover_complete(); // start journal.
@@ -1454,21 +1479,32 @@
 
 BdbMessageStore::Options::Options(const std::string& name) :
                                   qpid::Options(name),
-                                  numJrnlFiles(8),
-                                  jrnlFsizePgs(24),
-                                  wCachePageSize(JRNL_WMGR_DEF_PAGE_SIZE * JRNL_DBLK_SIZE * JRNL_SBLK_SIZE / 1024)
+                                  numJrnlFiles(defNumJrnlFiles),
+                                  jrnlFsizePgs(defJrnlFileSizePgs),
+                                  wCachePageSizeKib(defWCachePageSize),
+                                  tplNumJrnlFiles(defTplNumJrnlFiles),
+                                  tplJrnlFsizePgs(defTplJrnlFileSizePgs),
+                                  tplWCachePageSizeKib(defTplWCachePageSize)
 {
     addOptions()
         ("store-dir", qpid::optValue(storeDir, "DIR"),
          "Store directory location for persistence (instead of using --data-dir value). "
          "Must be supplied if --no-data-dir is also used.")
         ("num-jfiles", qpid::optValue(numJrnlFiles, "N"),
-         "Number of files in persistence journal")
+         "Default number of files for each journal instance")
         ("jfile-size-pgs", qpid::optValue(jrnlFsizePgs, "N"),
-         "Size of each journal file in multiples of read pages (1 read page = 64kiB)")
-        ("wcache-page-size", qpid::optValue(wCachePageSize, "N"),
+         "Default size for each journal file in multiples of read pages (1 read page = 64kiB)")
+        ("wcache-page-size", qpid::optValue(wCachePageSizeKib, "N"),
          "Size of the pages in the write page cache in KiB. "
          "Allowable values - powers of 2: 1, 2, 4, ... , 128. "
          "Lower values decrease latency at the expense of throughput.")
+        ("tpl-num-jfiles", qpid::optValue(tplNumJrnlFiles, "N"),
+         "Number of files for transaction prepared list journal instance")
+        ("tpl-jfile-size-pgs", qpid::optValue(tplJrnlFsizePgs, "N"),
+         "Size of each transaction prepared list journal file in multiples of read pages (1 read page = 64kiB)")
+        ("tpl-wcache-page-size", qpid::optValue(tplWCachePageSizeKib, "N"),
+         "Size of the pages in the transaction prepared list write page cache in KiB. "
+         "Allowable values - powers of 2: 1, 2, 4, ... , 128. "
+         "Lower values decrease latency at the expense of throughput.")
         ;
 }

Modified: store/trunk/cpp/lib/BdbMessageStore.h
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.h	2008-07-16 19:12:28 UTC (rev 2201)
+++ store/trunk/cpp/lib/BdbMessageStore.h	2008-07-16 19:20:44 UTC (rev 2202)
@@ -77,9 +77,9 @@
     static const u_int16_t defNumJrnlFiles = 8;
     static const u_int32_t defJrnlFileSizePgs = 24;
     static const u_int32_t defWCachePageSize = JRNL_WMGR_DEF_PAGE_SIZE * JRNL_DBLK_SIZE * JRNL_SBLK_SIZE / 1024;
-    static const u_int16_t defXidStoreNumJrnlFiles = 8;
-    static const u_int32_t defXidStoreJrnlFileSizePgs = 24;
-    static const u_int32_t defXidStoreWCachePageSize = JRNL_WMGR_DEF_PAGE_SIZE * JRNL_DBLK_SIZE * JRNL_SBLK_SIZE / 1024;
+    static const u_int16_t defTplNumJrnlFiles = 8;
+    static const u_int32_t defTplJrnlFileSizePgs = 24;
+    static const u_int32_t defTplWCachePageSize = defWCachePageSize / 8;
 
     std::list<Db*> dbs;
     DbEnv env;
@@ -101,9 +101,13 @@
     IdSequence messageIdSequence;
     std::string storeDir;
     u_int16_t numJrnlFiles;
-    u_int32_t jrnlFsizePgs;
-    u_int32_t wcache_pgsize_sblks;
-    u_int16_t wcache_num_pages;
+    u_int32_t jrnlFsizeSblks;
+    u_int32_t wCachePgSizeSblks;
+    u_int16_t wCacheNumPages;
+    u_int16_t tplNumJrnlFiles;
+    u_int32_t tplJrnlFsizeSblks;
+    u_int32_t tplWCachePgSizeSblks;
+    u_int16_t tplWCacheNumPages;
     u_int64_t highestRid;
     bool isInit;
     const char* envPath;
@@ -112,6 +116,15 @@
     qpid::management::Store* mgmtObject;
     qpid::sys::Mutex jrnlCreateLock;
 
+    // Parameter validation and calculation
+    static u_int16_t chkJrnlNumFilesParam(const u_int16_t param,
+                                          const std::string paramName);
+    static u_int32_t chkJrnlFileSizeParam(const u_int32_t param,
+                                          const std::string paramName);
+    static u_int32_t chkJrnlWrPageCacheSize(const u_int32_t param,
+                                            const std::string paramName);
+    static u_int16_t getJrnlWrNumPages(const u_int32_t wrPageSizeKib);
+
     void recoverQueues(TxnCtxt& txn,
                        qpid::broker::RecoveryManager& recovery,
                        queue_index& index,
@@ -128,7 +141,8 @@
                          txn_list& locked,
                          message_index& prepared);
     qpid::broker::RecoverableMessage::shared_ptr getExternMessage(qpid::broker::RecoveryManager& recovery,
-                                                                   uint64_t mId, unsigned& headerSize);
+                                                                  uint64_t mId,
+                                                                  unsigned& headerSize);
     void recoverExchanges(TxnCtxt& txn,
                           qpid::broker::RecoveryManager& recovery,
                           exchange_index& index);
@@ -206,7 +220,8 @@
     std::string getBdbBaseDir();
     std::string getPxidBaseDir();
     inline void checkInit() {
-        if (!isInit) init("/var", defNumJrnlFiles, defJrnlFileSizePgs, defWCachePageSize); isInit = true;
+        // TODO: change the default dir to ~/.qpidd
+        if (!isInit) init("/tmp", defNumJrnlFiles, defJrnlFileSizePgs, defWCachePageSize); isInit = true;
     }
     void chkInitPreparedXidStore();
 
@@ -228,11 +243,12 @@
         Options(const std::string& name="Store Options");
         std::string clusterName;
         std::string storeDir;
-        bool storeAsync;
-        bool storeForce;
-        uint16_t numJrnlFiles;
-        uint32_t jrnlFsizePgs;
-        uint32_t wCachePageSize;
+        u_int16_t numJrnlFiles;
+        u_int32_t jrnlFsizePgs;
+        u_int32_t wCachePageSizeKib;
+        u_int16_t tplNumJrnlFiles;
+        u_int32_t tplJrnlFsizePgs;
+        u_int32_t tplWCachePageSizeKib;
     };
 
     typedef boost::shared_ptr<BdbMessageStore> shared_ptr;
@@ -244,9 +260,12 @@
     bool init(const qpid::Options* options);
 
     bool init(const std::string& dir,
-              u_int16_t jfiles,
-              u_int32_t jfileSizePgs,
-              uint32_t wCachePageSize);
+              u_int16_t jfiles = defNumJrnlFiles,
+              u_int32_t jfileSizePgs = defJrnlFileSizePgs,
+              u_int32_t wCachePageSize = defWCachePageSize,
+              u_int16_t tplJfiles = defTplNumJrnlFiles,
+              u_int32_t tplJfileSizePgs = defTplJrnlFileSizePgs,
+              u_int32_t tplWCachePageSize = defTplWCachePageSize);
 
     void initManagement (qpid::broker::Broker* broker);
 
@@ -318,9 +337,9 @@
     void abort(qpid::broker::TransactionContext& ctxt);
 
     qpid::management::ManagementObject* GetManagementObject (void) const
-    { return mgmtObject; }
+        { return mgmtObject; }
 
-    inline qpid::management::Manageable::status_t ManagementMethod (uint32_t, qpid::management::Args&)
+    inline qpid::management::Manageable::status_t ManagementMethod (u_int32_t, qpid::management::Args&)
         { return qpid::management::Manageable::STATUS_OK; }
 }; // class BdbMessageStore
 

Modified: store/trunk/cpp/lib/TxnCtxt.h
===================================================================
--- store/trunk/cpp/lib/TxnCtxt.h	2008-07-16 19:12:28 UTC (rev 2201)
+++ store/trunk/cpp/lib/TxnCtxt.h	2008-07-16 19:20:44 UTC (rev 2202)
@@ -1,24 +1,24 @@
 /*
-    Copyright (C) 2007 Red Hat Software
+  Copyright (C) 2007 Red Hat Software
 
-    This file is part of Red Hat Messaging.
+  This file is part of Red Hat Messaging.
 
-    Red Hat Messaging is free software; you can redistribute it and/or
-    modify it under the terms of the GNU Lesser General Public
-    License as published by the Free Software Foundation; either
-    version 2.1 of the License, or (at your option) any later version.
+  Red Hat Messaging is free software; you can redistribute it and/or
+  modify it under the terms of the GNU Lesser General Public
+  License as published by the Free Software Foundation; either
+  version 2.1 of the License, or (at your option) any later version.
 
-    This library is distributed in the hope that it will be useful,
-    but WITHOUT ANY WARRANTY; without even the implied warranty of
-    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
-    Lesser General Public License for more details.
+  This library is distributed in the hope that it will be useful,
+  but WITHOUT ANY WARRANTY; without even the implied warranty of
+  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+  Lesser General Public License for more details.
 
-    You should have received a copy of the GNU Lesser General Public
-    License along with this library; if not, write to the Free Software
-    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301
-    USA
+  You should have received a copy of the GNU Lesser General Public
+  License along with this library; if not, write to the Free Software
+  Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301
+  USA
 
-    The GNU Lesser General Public License is available in the file COPYING.
+  The GNU Lesser General Public License is available in the file COPYING.
 */
 
 #ifndef _TxnCtxt_
@@ -170,7 +170,7 @@
     DbTxn* get() { return txn; }
     virtual bool isTPC() { return false; }
     virtual const std::string& getXid() { return tid; }
-    
+
     void addXidRecord(qpid::broker::ExternalQueueStore* queue) { impactedQueues.insert(queue); }
     inline void prepare(JournalImpl* _preparedXidStorePtr) { preparedXidStorePtr = _preparedXidStorePtr; }
     void complete(bool commit) { completeTxn(commit); }




More information about the rhmessaging-commits mailing list