[rhmessaging-commits] rhmessaging commits: r1270 - in store/trunk/cpp: tests and 1 other directory.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Thu Nov 8 13:48:57 EST 2007


Author: cctrieloff
Date: 2007-11-08 13:48:57 -0500 (Thu, 08 Nov 2007)
New Revision: 1270

Modified:
   store/trunk/cpp/lib/BdbMessageStore.cpp
   store/trunk/cpp/lib/BdbMessageStore.h
   store/trunk/cpp/lib/JournalImpl.cpp
   store/trunk/cpp/tests/SimpleTest.cpp
Log:
- full support staging in async
- transient staging not optimized for AIO yet.



Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp	2007-11-08 15:25:15 UTC (rev 1269)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp	2007-11-08 18:48:57 UTC (rev 1270)
@@ -722,7 +722,7 @@
     readXids(prepareXidDb, xids);    
 }
 
-void BdbMessageStore::stage(PersistableMessage& msg)
+void BdbMessageStore::stage( PersistableMessage& msg)
 {
 	checkInit();
     TxnCtxt txn;
@@ -804,13 +804,22 @@
         THROW_STORE_EXCEPTION("Cannot append content. Message not known to store!");
     }    
 }
-void BdbMessageStore::loadContent(const PersistableMessage& msg, std::string& data, u_int64_t offset, u_int32_t length)
+
+void BdbMessageStore::loadContent(const qpid::broker::PersistableQueue& queue,
+             const PersistableMessage& msg, std::string& data, u_int64_t offset, u_int32_t length)
 {
 	checkInit();
     u_int64_t realOffset = offset + sizeof(u_int32_t)/*header length*/+ msg.encodedHeaderSize();
     u_int64_t messageId (msg.getPersistenceId());
+	
     if (messageId != 0) {
         try {
+            JournalImpl* jc = static_cast<JournalImpl*>(queue.getExternalQueueStore());
+            if (jc && jc->is_enqueued(messageId) ){
+			    if (jc->loadMsgContent(messageId, data, realOffset, length)){
+		             return;
+			    }
+			}
             Dbt key (&messageId, sizeof(messageId));
             char *buffer = new char[length];
             Dbt value(buffer, length);
@@ -820,15 +829,17 @@
             value.set_dlen(length);
             int status = messageDb.get(0, &key, &value, 0); 
             if (status == DB_NOTFOUND) {
-		delete [] buffer;
+		        delete [] buffer;
                 THROW_STORE_EXCEPTION("Cannot load content. Message not known to store!");                
             } else {
                 data.assign(buffer, value.get_size());
-		delete [] buffer;
+		        delete [] buffer;
             }
         } catch (DbException& e) {
             THROW_STORE_EXCEPTION_2("Error loading content", e);
-        }
+        } catch (journal::jexception& e) {
+            THROW_STORE_EXCEPTION("Error loading content" /*, e*/);
+        }		
     } else {
         THROW_STORE_EXCEPTION("Cannot load content. Message not known to store!");
     }

Modified: store/trunk/cpp/lib/BdbMessageStore.h
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.h	2007-11-08 15:25:15 UTC (rev 1269)
+++ store/trunk/cpp/lib/BdbMessageStore.h	2007-11-08 18:48:57 UTC (rev 1270)
@@ -153,7 +153,8 @@
             void stage(qpid::broker::PersistableMessage& msg);
             void destroy(qpid::broker::PersistableMessage& msg);
             void appendContent(const qpid::broker::PersistableMessage& msg, const std::string& data);
-            void loadContent(const qpid::broker::PersistableMessage& msg, std::string& data, u_int64_t offset, u_int32_t length);
+            void loadContent(const qpid::broker::PersistableQueue& queue, const qpid::broker::PersistableMessage& msg, 
+			             std::string& data, u_int64_t offset, u_int32_t length);
 
             void enqueue(qpid::broker::TransactionContext* ctxt, 
                          qpid::broker::PersistableMessage& msg, 

Modified: store/trunk/cpp/lib/JournalImpl.cpp
===================================================================
--- store/trunk/cpp/lib/JournalImpl.cpp	2007-11-08 15:25:15 UTC (rev 1269)
+++ store/trunk/cpp/lib/JournalImpl.cpp	2007-11-08 18:48:57 UTC (rev 1270)
@@ -153,14 +153,14 @@
         }
     }
     if (_external)
-        return true;
+        return false;
     if (offset + length > _dlen) {
         std::stringstream ss;
         ss << "loadMsgContent(): offset + length exceeds available message size";
         throw jexception(ss.str());
     }
     data.append((const char*)_datap + offset, length);
-    return false;
+    return true;
 }
 
 const iores

Modified: store/trunk/cpp/tests/SimpleTest.cpp
===================================================================
--- store/trunk/cpp/tests/SimpleTest.cpp	2007-11-08 15:25:15 UTC (rev 1269)
+++ store/trunk/cpp/tests/SimpleTest.cpp	2007-11-08 18:48:57 UTC (rev 1270)
@@ -268,7 +268,8 @@
             CPPUNIT_ASSERT_EQUAL((u_int64_t) 14, msg->contentSize());
         
             DummyHandler handler;
-            MessageDelivery::deliver(msg, handler, 0, MessageDelivery::getBasicConsumeToken("ignore"), 100); 
+			QueuedMessage qm(queue.get(),msg,0);
+            MessageDelivery::deliver(qm, handler, 0, MessageDelivery::getBasicConsumeToken("ignore"), 100); 
             CPPUNIT_ASSERT_EQUAL((size_t) 3, handler.frames.size());
             AMQContentBody* contentBody(dynamic_cast<AMQContentBody*>(handler.frames[2].getBody()));
             CPPUNIT_ASSERT(contentBody);
@@ -314,7 +315,7 @@
     }
 
     void testStagingSync() {testStaging(false);}
-    void testStagingAsync() {std::cout << std::endl << "Missing Async test!!" << std::endl << std:: flush;}
+    void testStagingAsync() {testStaging(true);}
     void testStaging(bool async)
     {
         const string name("MyDurableQueue");
@@ -354,7 +355,8 @@
 
             //load it (without recovery)
             DummyHandler handler;
-            MessageDelivery::deliver(msg, handler, 0, 
+			QueuedMessage qm(&queue, msg, 0);
+            MessageDelivery::deliver(qm, handler, 0, 
                                      MessageDelivery::getBasicConsumeToken("ignore"), 20);//52 chars of data, i.e. 2 chunks of 20 and one of 12 
             CPPUNIT_ASSERT(handler.frames.size() > 2);
             string loaded;
@@ -394,7 +396,8 @@
 
             //load lazily
             DummyHandler handler;
-            MessageDelivery::deliver(msg, handler, 0, MessageDelivery::getBasicConsumeToken("ignore"), 20);//52 chars of data, i.e. 2 chunks of 20 and one of 12 
+			QueuedMessage qm(queue.get(),msg,0);
+            MessageDelivery::deliver(qm, handler, 0, MessageDelivery::getBasicConsumeToken("ignore"), 20);//52 chars of data, i.e. 2 chunks of 20 and one of 12 
 
             CPPUNIT_ASSERT(handler.frames.size() > 2);
             string loaded;
@@ -411,7 +414,7 @@
     }
 
     void testDestroyStagedMessageSync() {testDestroyStagedMessage(false);}
-    void testDestroyStagedMessageAsync() {std::cout << std::endl << "Missing Async test!!" << std::endl << std:: flush;}
+    void testDestroyStagedMessageAsync() {testDestroyStagedMessage(true);}
     void testDestroyStagedMessage(bool async)
     {
         BdbMessageStore store;
@@ -427,14 +430,15 @@
 
         try {
             string loaded;
-            store.loadContent(*msg, loaded, 0, data.length());
+            Queue queue("dummy", 0, &store, 0);
+            store.loadContent(queue, *msg, loaded, 0, data.length());
             CPPUNIT_ASSERT(false);
         } catch (StoreException& e) {
         }
     }
 
     void testDestroyEnqueuedMessageSync() {testDestroyEnqueuedMessage(false);}
-    void testDestroyEnqueuedMessageAsync() {std::cout << std::endl << "Missing Async test!!" << std::endl << std:: flush;}
+    void testDestroyEnqueuedMessageAsync() {testDestroyEnqueuedMessage(true);}
     void testDestroyEnqueuedMessage(bool async)
     {
         BdbMessageStore store;
@@ -452,7 +456,7 @@
         store.destroy(*msg);
 
         string loaded;
-        store.loadContent(*msg, loaded, 0, data.length());
+        store.loadContent(queue, *msg, loaded, 0, data.length());
         CPPUNIT_ASSERT_EQUAL(data, loaded);
 
         store.dequeue(0, *msg, queue);




More information about the rhmessaging-commits mailing list