[rhmessaging-commits] rhmessaging commits: r1361 - in store/trunk/cpp: lib/jrnl and 1 other directories.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Mon Nov 26 16:50:03 EST 2007


Author: kpvdr
Date: 2007-11-26 16:50:03 -0500 (Mon, 26 Nov 2007)
New Revision: 1361

Modified:
   store/trunk/cpp/lib/BdbMessageStore.cpp
   store/trunk/cpp/lib/BdbMessageStore.h
   store/trunk/cpp/lib/jrnl/nlfh.cpp
   store/trunk/cpp/tests/SimpleTest.cpp
Log:
Switched all regular PersistableMessage* and PersistableMessage& to intrusive_ptr<PersistableMessage>, so as to hook into the refcount for a message while it is in the store. Matches changes made in qpid.

Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp	2007-11-26 21:09:41 UTC (rev 1360)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp	2007-11-26 21:50:03 UTC (rev 1361)
@@ -35,6 +35,7 @@
 using namespace rhm::bdbstore;
 using namespace qpid::broker;
 using boost::static_pointer_cast;
+using boost::intrusive_ptr;
 
 using std::auto_ptr;
 using std::max;
@@ -719,19 +720,19 @@
     readXids(prepareXidDb, xids);    
 }
 
-void BdbMessageStore::stage( PersistableMessage& msg)
+void BdbMessageStore::stage( intrusive_ptr<PersistableMessage>& msg)
 {
 	checkInit();
     TxnCtxt txn;
     txn.begin(env);
 
-    u_int64_t messageId (msg.getPersistenceId());
-    if (messageId == 0 || !msg.isContentReleased()) {
+    u_int64_t messageId (msg->getPersistenceId());
+    if (messageId == 0 || !msg->isContentReleased()) {
         try {
             Dbt key (&messageId, sizeof(messageId));
             messageId = messageIdSequence.next();
             store(NULL, &txn, key, msg, true);
-            msg.setPersistenceId(messageId);
+            msg->setPersistenceId(messageId);
 			txn.commit();
         } catch (std::exception& e) {
             txn.abort();
@@ -739,10 +740,10 @@
         }
     }        
 }
-void BdbMessageStore::destroy(PersistableMessage& msg)
+void BdbMessageStore::destroy(intrusive_ptr<PersistableMessage>& msg)
 {
 	checkInit();
-    u_int64_t messageId (msg.getPersistenceId());
+    u_int64_t messageId (msg->getPersistenceId());
     if (messageId) {
         Dbt key (&messageId, sizeof(messageId));
         TxnCtxt txn;
@@ -773,10 +774,10 @@
     return peek.get_size();
 }
 
-void BdbMessageStore::appendContent(const PersistableMessage& msg, const std::string& data)
+void BdbMessageStore::appendContent(intrusive_ptr<const PersistableMessage>& msg, const std::string& data)
 {
 	checkInit();
-    u_int64_t messageId (msg.getPersistenceId());
+    u_int64_t messageId (msg->getPersistenceId());
     if (messageId != 0) {
         try {
             Dbt key (&messageId, sizeof(messageId));
@@ -803,11 +804,11 @@
 }
 
 void BdbMessageStore::loadContent(const qpid::broker::PersistableQueue& queue,
-             const PersistableMessage& msg, std::string& data, u_int64_t offset, u_int32_t length)
+             intrusive_ptr<const PersistableMessage>& msg, std::string& data, u_int64_t offset, u_int32_t length)
 {
 	checkInit();
-    u_int64_t realOffset = offset + sizeof(u_int32_t)/*header length*/+ msg.encodedHeaderSize();
-    u_int64_t messageId (msg.getPersistenceId());
+    u_int64_t realOffset = offset + sizeof(u_int32_t)/*header length*/+ msg->encodedHeaderSize();
+    u_int64_t messageId (msg->getPersistenceId());
 	
     if (messageId != 0) {
         try {
@@ -857,11 +858,12 @@
 	}
 }
 
-void BdbMessageStore::enqueue(TransactionContext* ctxt, PersistableMessage& msg, const PersistableQueue& queue)
+void BdbMessageStore::enqueue(TransactionContext* ctxt, intrusive_ptr<PersistableMessage>& msg,
+        const PersistableQueue& queue)
 {
 	checkInit();
     u_int64_t queueId (queue.getPersistenceId());
-    u_int64_t messageId (msg.getPersistenceId());
+    u_int64_t messageId (msg->getPersistenceId());
     if (queueId == 0) {
         THROW_STORE_EXCEPTION("Queue not created: " + queue.getName());
     }
@@ -882,7 +884,7 @@
         bool newId = false;
         if (messageId == 0) {
             messageId = messageIdSequence.next();
-            msg.setPersistenceId(messageId);
+            msg->setPersistenceId(messageId);
 	        newId = true;
 	    }
         store(&queue, txn, key, msg, newId);
@@ -890,9 +892,9 @@
 	    if (usingJrnl()){
 			// add queue* to the txn map..
 			if (ctxt) txn->addXidRecord(queue.getExternalQueueStore());
-			if (msg.isContentReleased()) put(mappingDb, txn->get(), key, value); // TODO - remove once jrnl is used for transient policy see **
+			if (msg->isContentReleased()) put(mappingDb, txn->get(), key, value); // TODO - remove once jrnl is used for transient policy see **
 		}else{
-       	    msg.enqueueComplete();  // set enqueued for ack
+       	    msg->enqueueComplete();  // set enqueued for ack
             put(mappingDb, txn->get(), key, value);
         
 			// cct if using Journal do we need to wait for IO to complete before calling thus???
@@ -911,27 +913,28 @@
 
 void BdbMessageStore::store(const PersistableQueue* queue, 
 			TxnCtxt* txn, Dbt& messageId, 
-			PersistableMessage& message, 
+			intrusive_ptr<PersistableMessage>& message, 
 			bool newId)
 {
-    u_int32_t headerSize = message.encodedHeaderSize();
-    u_int64_t size = message.encodedSize() + sizeof(u_int32_t);
+    u_int32_t headerSize = message->encodedHeaderSize();
+    u_int64_t size = message->encodedSize() + sizeof(u_int32_t);
     char* buff= 0;
-	if (!message.isContentReleased() )
+	if (!message->isContentReleased() )
 	{
 	    buff = static_cast<char*>(::alloca(size)); // long + headers + content
         Buffer buffer(buff,size);
         buffer.putLong(headerSize);
-        message.encode(buffer);
+        message->encode(buffer);
 	}
 
     try {
 
      if ( queue && usingJrnl()){
+//std::cout << "E" << std::flush;
         boost::intrusive_ptr<DataTokenImpl> dtokp(new DataTokenImpl);
 		dtokp->ref();
-	    dtokp->setSourceMessage (&message);
-	    dtokp->set_rid(message.getPersistenceId()); // set the messageID into the Journal header (record-id)
+	    dtokp->setSourceMessage (message.get());
+	    dtokp->set_rid(message->getPersistenceId()); // set the messageID into the Journal header (record-id)
 
             bool written = false;
             unsigned aio_sleep_cnt = 0;
@@ -941,13 +944,13 @@
 	            JournalImpl* jc = static_cast<JournalImpl*>(queue->getExternalQueueStore());
 				rhm::journal::iores eres;
 				if (txn->getXid().empty()){
-					if (message.isContentReleased()){
+					if (message->isContentReleased()){
 						eres = jc->enqueue_extern_data_record(size, dtokp.get(), false);
 					}else {
 						eres = jc->enqueue_data_record(buff, size, size, dtokp.get(), false);
 					}
 				}else {
-					if (message.isContentReleased()){
+					if (message->isContentReleased()){
 					   eres = jc->enqueue_extern_txn_data_record(size, dtokp.get(), txn->getXid(), false);
 					} else {
 					    eres = jc->enqueue_txn_data_record(buff, size, size, dtokp.get(), txn->getXid(), false);
@@ -956,18 +959,21 @@
                 switch (eres)
                 {
                     case rhm::journal::RHM_IORES_SUCCESS:
+//std::cout << "." << std::flush;
                         if (dtokp.get()->wstate() >= DataTokenImpl::ENQ_SUBM)
                             written = true;
                         aio_sleep_cnt = 0;
                         busy_sleep_cnt = 0;
                         break;
                     case rhm::journal::RHM_IORES_AIO_WAIT:
+//std::cout << "w" << std::flush;
                         if (++aio_sleep_cnt > MAX_AIO_SLEEPS)
                             THROW_STORE_EXCEPTION("Timeout waiting for AIO: RHM_IORES_AIO_WAIT");
                         usleep(AIO_SLEEP_TIME); // TODO move sleep to wait for IO in get events
                         jc->get_wr_events();
                         break;
                    case rhm::journal::RHM_IORES_BUSY:
+//std::cout << "b" << std::flush;
                         if (++busy_sleep_cnt > MAX_AIO_SLEEPS)
                             THROW_STORE_EXCEPTION("Timeout waiting for mutex: RHM_IORES_BUSY");
                         usleep(AIO_SLEEP_TIME); // TODO add sleep time to get events call as option
@@ -997,11 +1003,12 @@
     }
 }
 
-void BdbMessageStore::dequeue(TransactionContext* ctxt, PersistableMessage& msg, const PersistableQueue& queue)
+void BdbMessageStore::dequeue(TransactionContext* ctxt, intrusive_ptr<PersistableMessage>& msg,
+        const PersistableQueue& queue)
 {    
 	checkInit();
     u_int64_t queueId (queue.getPersistenceId());
-    u_int64_t messageId (msg.getPersistenceId());
+    u_int64_t messageId (msg->getPersistenceId());
     if (messageId == 0) {
         THROW_STORE_EXCEPTION("Error dequeing message, persistence id not set");
     }
@@ -1025,16 +1032,16 @@
 			if (ctxt) txn->addXidRecord(queue.getExternalQueueStore());
 			async_dequeue(ctxt, msg, queue); 
      	    // added here as we are not doing it async on call back
-			if (msg.isContentReleased())  // TODO remove this code once jrnl is used for transient policy see **
+			if (msg->isContentReleased())  // TODO remove this code once jrnl is used for transient policy see **
 			{
 			    Dbt key (&messageId, sizeof(messageId));
                 Dbt value (&queueId, sizeof(queueId));
                 dequeue(txn->get(), key, value);
 			}
 			
-			msg.dequeueComplete();
-		    if ( msg.isDequeueComplete()  ) // clear id after last dequeue
-		         msg.setPersistenceId(0);
+			msg->dequeueComplete();
+		    if ( msg->isDequeueComplete()  ) // clear id after last dequeue
+		         msg->setPersistenceId(0);
 			
 		} else if (txn->isTPC()) {
             //if this is part of a 2pc transaction, then only record the dequeue now,
@@ -1044,8 +1051,8 @@
             Dbt key (&messageId, sizeof(messageId));
             Dbt value (&queueId, sizeof(queueId));
             if (dequeue(txn->get(), key, value)) {
-                msg.setPersistenceId(0);//clear id as we have now removed the message from the store
-                msg.dequeueComplete(); // set dequeued for ack
+                msg->setPersistenceId(0);//clear id as we have now removed the message from the store
+                msg->dequeueComplete(); // set dequeued for ack
 	        }
         }
         if (!ctxt) txn->commit();
@@ -1059,14 +1066,16 @@
     }   
 }
 
-void BdbMessageStore::async_dequeue(TransactionContext* ctxt, PersistableMessage& msg, const PersistableQueue& queue)
+void BdbMessageStore::async_dequeue(TransactionContext* ctxt, intrusive_ptr<PersistableMessage>& msg,
+        const PersistableQueue& queue)
 {
+//std::cout << "D" << std::flush;
     bool written = false;
     boost::intrusive_ptr<DataTokenImpl> ddtokp(new DataTokenImpl);
 	ddtokp->ref();
- 	ddtokp->setSourceMessage (&msg);
+ 	ddtokp->setSourceMessage (msg.get());
 	ddtokp->set_rid(messageIdSequence.next()); 
-	ddtokp->set_dequeue_rid(msg.getPersistenceId());
+	ddtokp->set_dequeue_rid(msg->getPersistenceId());
 	ddtokp->set_wstate(DataTokenImpl::ENQ);
 	JournalImpl* jc = static_cast<JournalImpl*>(queue.getExternalQueueStore());
 	string tid;
@@ -1092,17 +1101,20 @@
          switch (dres)
          {
              case rhm::journal::RHM_IORES_SUCCESS:
+//std::cout << "." << std::flush;
                  aio_sleep_cnt = 0;
                  busy_sleep_cnt = 0;
                  written = true;
                  break;
              case rhm::journal::RHM_IORES_AIO_WAIT:
+//std::cout << "w" << std::flush;
                  if (++aio_sleep_cnt > MAX_AIO_SLEEPS)
                      THROW_STORE_EXCEPTION("Timeout waiting for AIO: RHM_IORES_AIO_WAIT");
                  usleep(AIO_SLEEP_TIME); // TODO add sleep time to get events call as option
                  jc->get_wr_events();
                  break;
              case rhm::journal::RHM_IORES_BUSY:
+//std::cout << "b" << std::flush;
                  if (++busy_sleep_cnt > MAX_AIO_SLEEPS)
                      THROW_STORE_EXCEPTION("Timeout waiting for mutex: RHM_IORES_BUSY");
                  usleep(AIO_SLEEP_TIME); // TODO add sleep time to get events call as option

Modified: store/trunk/cpp/lib/BdbMessageStore.h
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.h	2007-11-26 21:09:41 UTC (rev 1360)
+++ store/trunk/cpp/lib/BdbMessageStore.h	2007-11-26 21:50:03 UTC (rev 1361)
@@ -39,6 +39,7 @@
 #include <set>
 #include <iostream>
 #include <boost/format.hpp>
+#include <boost/intrusive_ptr.hpp>
 #include <boost/ptr_container/ptr_list.hpp>
 #include "JournalImpl.h"
 #include "DataTokenImpl.h"
@@ -82,31 +83,31 @@
 
             bool mode(const bool mode, const bool force);
             void recoverQueues(TxnCtxt& txn, qpid::broker::RecoveryManager& recovery, queue_index& index,
-	    			 txn_list& locked, message_index& messages);
+	    	        txn_list& locked, message_index& messages);
             void recoverMessages(TxnCtxt& txn, qpid::broker::RecoveryManager& recovery, queue_index& index, 
-                                 txn_list& locked, message_index& prepared);
+                    txn_list& locked, message_index& prepared);
             void recoverMessages(TxnCtxt& txn, qpid::broker::RecoveryManager& recovery, 
-	    			 qpid::broker::RecoverableQueue::shared_ptr& queue, 
-                                 txn_list& locked, message_index& prepared);
+	    	        qpid::broker::RecoverableQueue::shared_ptr& queue, 
+                    txn_list& locked, message_index& prepared);
             qpid::broker::RecoverableMessage::shared_ptr  getExternMessage(qpid::broker::RecoveryManager& recovery, 
-			                     uint64_t mId, unsigned& headerSize);
+			        uint64_t mId, unsigned& headerSize);
             void recoverExchanges(TxnCtxt& txn, qpid::broker::RecoveryManager& recovery, exchange_index& index);
             void recoverBindings(TxnCtxt& txn, exchange_index& exchanges, queue_index& queues);
             int enqueueMessage(TxnCtxt& txn, IdDbt& msgId, qpid::broker::RecoverableMessage::shared_ptr& msg, 
-                               queue_index& index, txn_list& locked, message_index& prepared);
+                    queue_index& index, txn_list& locked, message_index& prepared);
             void recoverXids(txn_list& txns);
             void readXids(Db& db, std::set<string>& xids);
             void readLockedMappings(Db& db, txn_lock_map& mappings);
             TxnCtxt* check(qpid::broker::TransactionContext* ctxt);
             void store(const qpid::broker::PersistableQueue* queue, TxnCtxt* txn, 
-	    		Dbt& messageId, 
-			qpid::broker::PersistableMessage& message,
-			bool newId);
+	    	        Dbt& messageId, 
+			        boost::intrusive_ptr<qpid::broker::PersistableMessage>& message,
+			        bool newId);
             void enqueue(DbTxn* txn, Dbt& messageId, Dbt& queueId);
             bool dequeue(DbTxn* txn, Dbt& messageId, Dbt& queueId);
             void async_dequeue(qpid::broker::TransactionContext* ctxt, 
-	    			qpid::broker::PersistableMessage& msg, 
-				const qpid::broker::PersistableQueue& queue);
+	    	        boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg, 
+			        const qpid::broker::PersistableQueue& queue);
             bool deleteIfUnused(Cursor& cursor, DbTxn* txn, Dbt& messageId);
             bool deleteIfUnused(DbTxn* txn, Dbt& messageId);
             void destroy(Db& db, const qpid::broker::Persistable& p);
@@ -142,26 +143,27 @@
             void destroy(const qpid::broker::PersistableExchange& queue);
 
             void bind(const qpid::broker::PersistableExchange& exchange, 
-                      const qpid::broker::PersistableQueue& queue, 
-                      const std::string& key, const qpid::framing::FieldTable& args);
+                    const qpid::broker::PersistableQueue& queue, 
+                    const std::string& key, const qpid::framing::FieldTable& args);
             void unbind(const qpid::broker::PersistableExchange& exchange, 
-                        const qpid::broker::PersistableQueue& queue, 
-                        const std::string& key, const qpid::framing::FieldTable& args);
+                    const qpid::broker::PersistableQueue& queue, 
+                    const std::string& key, const qpid::framing::FieldTable& args);
 
             void recover(qpid::broker::RecoveryManager& queues);
 
-            void stage(qpid::broker::PersistableMessage& msg);
-            void destroy(qpid::broker::PersistableMessage& msg);
-            void appendContent(const qpid::broker::PersistableMessage& msg, const std::string& data);
-            void loadContent(const qpid::broker::PersistableQueue& queue, const qpid::broker::PersistableMessage& msg, 
-			             std::string& data, u_int64_t offset, u_int32_t length);
+            void stage(boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg);
+            void destroy(boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg);
+            void appendContent(boost::intrusive_ptr<const qpid::broker::PersistableMessage>& msg, const std::string& data);
+            void loadContent(const qpid::broker::PersistableQueue& queue,
+                    boost::intrusive_ptr<const qpid::broker::PersistableMessage>& msg, 
+			        std::string& data, u_int64_t offset, u_int32_t length);
 
             void enqueue(qpid::broker::TransactionContext* ctxt, 
-                         qpid::broker::PersistableMessage& msg, 
-                         const qpid::broker::PersistableQueue& queue);
+                    boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg, 
+                    const qpid::broker::PersistableQueue& queue);
             void dequeue(qpid::broker::TransactionContext* ctxt, 
-                         qpid::broker::PersistableMessage& msg, 
-                         const qpid::broker::PersistableQueue& queue);
+                    boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg, 
+                    const qpid::broker::PersistableQueue& queue);
             void flush(const qpid::broker::PersistableQueue& queue);
 
     	    u_int32_t outstandingQueueAIO(const qpid::broker::PersistableQueue& queue);

Modified: store/trunk/cpp/lib/jrnl/nlfh.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/nlfh.cpp	2007-11-26 21:09:41 UTC (rev 1360)
+++ store/trunk/cpp/lib/jrnl/nlfh.cpp	2007-11-26 21:50:03 UTC (rev 1361)
@@ -209,6 +209,7 @@
             }
             _rec_enqcnt = ro->_enq_cnt_list[_fid];
             return true;
+            //return _fid == ro->_ffid ? ro->_full : true;
         }
     }
 #ifndef RHM_WRONLY

Modified: store/trunk/cpp/tests/SimpleTest.cpp
===================================================================
--- store/trunk/cpp/tests/SimpleTest.cpp	2007-11-26 21:09:41 UTC (rev 1360)
+++ store/trunk/cpp/tests/SimpleTest.cpp	2007-11-26 21:50:03 UTC (rev 1361)
@@ -40,6 +40,8 @@
 
 using boost::static_pointer_cast;
 using boost::dynamic_pointer_cast;
+using boost::intrusive_ptr;
+using boost::static_pointer_cast;
 using namespace qpid;
 using namespace rhm::bdbstore;
 using namespace qpid::broker;
@@ -332,16 +334,18 @@
 
             //create & stage a message
             Message::shared_ptr msg = MessageUtils::createMessage(exchange, routingKey, messageId, (data1.size() + data2.size()));
+            intrusive_ptr<PersistableMessage> pmsg = static_pointer_cast<PersistableMessage>(msg);
+            intrusive_ptr<const PersistableMessage> cpmsg = static_pointer_cast<const PersistableMessage>(msg);
             msg->getProperties<DeliveryProperties>()->setDeliveryMode(PERSISTENT);
             FieldTable table;
             table.setString("abc", "xyz");
             msg->getProperties<MessageProperties>()->setApplicationHeaders(table);
-            store.stage(*msg);
+            store.stage(pmsg);
             
             //append to it
             msg->releaseContent(&store);//ensure that data is not held in memory but is appended to disk when added
-            store.appendContent(*msg, data1);
-            store.appendContent(*msg, data2);
+            store.appendContent(cpmsg, data1);
+            store.appendContent(cpmsg, data2);
 
             //AMQContentBody part1(data1);
             //msg->addContent(&part1);FIXME
@@ -424,15 +428,17 @@
         
         const string data("abcdefg");
         Message::shared_ptr msg(MessageUtils::createMessage("my_exchange", "my_routing_key", "my_message", data.length()));
+        intrusive_ptr<PersistableMessage> pmsg = static_pointer_cast<PersistableMessage>(msg);
+        intrusive_ptr<const PersistableMessage> cpmsg = static_pointer_cast<const PersistableMessage>(msg);
         MessageUtils::addContent(msg, data);
         
-        store.stage(*msg);
-        store.destroy(*msg);
+        store.stage(pmsg);
+        store.destroy(pmsg);
 
         try {
             string loaded;
             Queue queue("dummy", 0, &store, 0);
-            store.loadContent(queue, *msg, loaded, 0, data.length());
+            store.loadContent(queue, cpmsg, loaded, 0, data.length());
             CPPUNIT_ASSERT(false);
         } catch (StoreException& e) {
         }
@@ -448,19 +454,21 @@
         
         const string data("abcdefg");
         Message::shared_ptr msg(MessageUtils::createMessage("my_exchange", "my_routing_key", "my_message", data.length()));
+        intrusive_ptr<PersistableMessage> pmsg = static_pointer_cast<PersistableMessage>(msg);
+        intrusive_ptr<const PersistableMessage> cpmsg = static_pointer_cast<const PersistableMessage>(msg);
         MessageUtils::addContent(msg, data);
 
         Queue queue("my_queue", 0, &store, 0);
         store.create(queue);
         
-        store.enqueue(0, *msg, queue);
-        store.destroy(*msg);
+        store.enqueue(0, pmsg, queue);
+        store.destroy(pmsg);
 
         string loaded;
-        store.loadContent(queue, *msg, loaded, 0, data.length());
+        store.loadContent(queue, cpmsg, loaded, 0, data.length());
         CPPUNIT_ASSERT_EQUAL(data, loaded);
 
-        store.dequeue(0, *msg, queue);
+        store.dequeue(0, pmsg, queue);
         store.destroy(queue);
     }
 




More information about the rhmessaging-commits mailing list