[rhmessaging-commits] rhmessaging commits: r1869 - in store/trunk/cpp: tests and 1 other directory.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Tue Apr 8 15:36:31 EDT 2008


Author: tedross
Date: 2008-04-08 15:36:30 -0400 (Tue, 08 Apr 2008)
New Revision: 1869

Modified:
   store/trunk/cpp/lib/BdbMessageStore.cpp
   store/trunk/cpp/lib/BdbMessageStore.h
   store/trunk/cpp/tests/SimpleTest.cpp
Log:
Use updated create interface from QPID to carry arguments for journal creation

Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp	2008-04-08 18:48:17 UTC (rev 1868)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp	2008-04-08 19:36:30 UTC (rev 1869)
@@ -268,7 +268,7 @@
     } 
 }
 
-void BdbMessageStore::create(PersistableQueue& queue)
+void BdbMessageStore::create(PersistableQueue& queue, const FieldTable& args)
 {
     checkInit();
     if (queue.getPersistenceId()) {
@@ -276,16 +276,32 @@
     }
     if (usingJrnl()) {
         JournalImpl* jQueue = 0;
+        FieldTable::ValuePtr value;
+
+        uint16_t localFileCount = numJrnlFiles;
+        uint32_t localFileSize  = jrnlFsizePgs;
+
+        value = args.get ("qpid.file_count");
+        if (value.get() != 0 && !value->empty() && value->convertsTo<int>())
+            localFileCount = (uint16_t) value->get<int>();
+
+        value = args.get ("qpid.file_size");
+        if (value.get() != 0 && !value->empty() && value->convertsTo<int>())
+            localFileSize = (uint32_t) value->get<int>();
+
         {
             qpid::sys::Mutex::ScopedLock sl(jrnlCreateLock);
-            jQueue = new JournalImpl(queue.getName(), getJrnlDir(queue), string("JournalData"), defJournalGetEventsTimeout, defJournalFlushTimeout);
+            jQueue = new JournalImpl(queue.getName(), getJrnlDir(queue),
+                                     string("JournalData"), defJournalGetEventsTimeout,
+                                     defJournalFlushTimeout);
         }
         queue.setExternalQueueStore(dynamic_cast<ExternalQueueStore*>(jQueue));
         try	{
             // init will create the deque's for the init...
-            jQueue->initialize(numJrnlFiles, jrnlFsizePgs * JRNL_RMGR_PAGE_SIZE);
+            jQueue->initialize(localFileCount, localFileSize * JRNL_RMGR_PAGE_SIZE);
         } catch (const journal::jexception& e) {
-            THROW_STORE_EXCEPTION(std::string("Queue ") + queue.getName() + ": create() failed: " + e.what());
+            THROW_STORE_EXCEPTION(std::string("Queue ") + queue.getName() +
+                                  ": create() failed: " + e.what());
         }
     }
 
@@ -312,7 +328,7 @@
     }
 }
 
-void BdbMessageStore::create(const PersistableExchange& exchange)
+void BdbMessageStore::create(const PersistableExchange& exchange, const FieldTable& /*args*/)
 {
     checkInit();
     if (exchange.getPersistenceId()) {

Modified: store/trunk/cpp/lib/BdbMessageStore.h
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.h	2008-04-08 18:48:17 UTC (rev 1868)
+++ store/trunk/cpp/lib/BdbMessageStore.h	2008-04-08 19:36:30 UTC (rev 1869)
@@ -169,10 +169,12 @@
 
             void truncate();
 
-            void create(qpid::broker::PersistableQueue& queue);
+            void create(qpid::broker::PersistableQueue& queue,
+                        const qpid::framing::FieldTable& args);
             void destroy(qpid::broker::PersistableQueue& queue);
 
-            void create(const qpid::broker::PersistableExchange& queue);
+            void create(const qpid::broker::PersistableExchange& queue,
+                        const qpid::framing::FieldTable& args);
             void destroy(const qpid::broker::PersistableExchange& queue);
 
             void bind(const qpid::broker::PersistableExchange& exchange, 

Modified: store/trunk/cpp/tests/SimpleTest.cpp
===================================================================
--- store/trunk/cpp/tests/SimpleTest.cpp	2008-04-08 18:48:17 UTC (rev 1868)
+++ store/trunk/cpp/tests/SimpleTest.cpp	2008-04-08 19:36:30 UTC (rev 1869)
@@ -33,6 +33,7 @@
 #include <qpid/broker/Queue.h>
 #include <qpid/broker/RecoveryManagerImpl.h>
 #include <qpid/framing/AMQHeaderBody.h>
+#include <qpid/framing/FieldTable.h>
 
 #define TESTDIR "/tmp"
 
@@ -86,7 +87,7 @@
     store.truncate();//make sure it is empty to begin with
     string name("CreateDeleteQueue");
     Queue queue(name, 0, &store, 0);
-    store.create(queue);
+    store.create(queue, qpid::framing::FieldTable());
 // TODO - check dir exists
     BOOST_REQUIRE(queue.getPersistenceId());
     store.destroy(queue);
@@ -114,7 +115,7 @@
         store.init(TESTDIR, async, true, 4, 1);
         store.truncate();//make sure it is empty to begin with
         Queue queue(name, 0, &store, 0);
-        store.create(queue);
+        store.create(queue, qpid::framing::FieldTable());
         BOOST_REQUIRE(queue.getPersistenceId());
         id = queue.getPersistenceId();
     }//db will be closed
@@ -166,7 +167,7 @@
         store.init(TESTDIR, async, true, 4, 1);
         store.truncate();//make sure it is empty to begin with
         Queue queue(name, 0, &store, 0);
-        store.create(queue);
+        store.create(queue, qpid::framing::FieldTable());
         store.destroy(queue);
     }//db will be closed
     {
@@ -407,7 +408,7 @@
     MessageUtils::addContent(msg, data);
 
     Queue queue("my_queue", 0, &store, 0);
-    store.create(queue);
+    store.create(queue, qpid::framing::FieldTable());
     
     store.enqueue(0, pmsg, queue);
     store.destroy(*pmsg);
@@ -433,7 +434,7 @@
         store.truncate();//make sure it is empty to begin with
         ExchangeRegistry registry;
         Exchange::shared_ptr exchange = registry.declare(name, type, true, args).first;
-        store.create(*exchange);
+        store.create(*exchange, qpid::framing::FieldTable());
         id = exchange->getPersistenceId();
         BOOST_REQUIRE(id);
     }//db will be closed
@@ -476,8 +477,8 @@
         store.truncate();//make sure it is empty to begin with
         Exchange::shared_ptr exchange(new DirectExchange(exchangeName, true, args));
         Queue::shared_ptr queue(new Queue(queueName, 0, &store, 0));
-        store.create(*exchange);
-        store.create(*queue);
+        store.create(*exchange, qpid::framing::FieldTable());
+        store.create(*queue, qpid::framing::FieldTable());
         BOOST_REQUIRE(exchange->bind(queue, key, &args));
         store.bind(*exchange, *queue, key, args);
     }//db will be closed
@@ -537,9 +538,9 @@
         Exchange::shared_ptr exchange(new DirectExchange(exchangeName, true, args));
         Queue::shared_ptr queue1(new Queue(queueName1, 0, &store, 0));
         Queue::shared_ptr queue2(new Queue(queueName2, 0, &store, 0));
-        store.create(*exchange);
-        store.create(*queue1);
-        store.create(*queue2);
+        store.create(*exchange, qpid::framing::FieldTable());
+        store.create(*queue1, qpid::framing::FieldTable());
+        store.create(*queue2, qpid::framing::FieldTable());
         store.bind(*exchange, *queue1, key, args);
         store.bind(*exchange, *queue2, key, args);
         //delete queue1:




More information about the rhmessaging-commits mailing list