[rhmessaging-commits] rhmessaging commits: r1258 - store/trunk/cpp/lib.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Wed Nov 7 10:05:52 EST 2007


Author: cctrieloff
Date: 2007-11-07 10:05:52 -0500 (Wed, 07 Nov 2007)
New Revision: 1258

Modified:
   store/trunk/cpp/lib/BdbMessageStore.cpp
   store/trunk/cpp/lib/BdbMessageStore.h
Log:

- staging support
- no staged enqueue, content released, then loaded not yet supported on async
mode



Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp	2007-11-06 21:52:47 UTC (rev 1257)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp	2007-11-07 15:05:52 UTC (rev 1258)
@@ -492,16 +492,22 @@
             {
                 case rhm::journal::RHM_IORES_SUCCESS:{
                     msg_count++;
-	                char* data = (char*)dbuff;
-				    unsigned headerSize = Buffer(data, preambleLength).getLong();
-                    Buffer headerBuff(data+ preambleLength, headerSize); /// do we want read size or header size ????
-
-            		RecoverableMessage::shared_ptr msg = recovery.recoverMessage(headerBuff);
+	                RecoverableMessage::shared_ptr msg;
+					char* data = (char*)dbuff;
+					
+					unsigned headerSize;
+					if (externalFlag){
+					    msg = getExternMessage(recovery, dtokp.rid(), headerSize); // large message external to jrnl 
+					} else {
+				        headerSize = Buffer(data, preambleLength).getLong();
+                        Buffer headerBuff(data+ preambleLength, headerSize); /// do we want read size or header size ????
+            		    msg = recovery.recoverMessage(headerBuff);
+                    }
             		msg->setPersistenceId(dtokp.rid());
 				 
                     u_int32_t contentOffset = headerSize + preambleLength;
                     u_int64_t contentSize = readSize - contentOffset;
-                    if (msg->loadContent(contentSize)) {
+                    if (msg->loadContent(contentSize) && !externalFlag) {
                          //now read the content
 		    			 Buffer contentBuff(data + contentOffset, contentSize);
                          msg->decodeContent(contentBuff);
@@ -545,6 +551,29 @@
 	
 }
 
+RecoverableMessage::shared_ptr  BdbMessageStore::getExternMessage(qpid::broker::RecoveryManager& recovery, 
+     uint64_t messageId, unsigned& headerSize)
+{
+    Dbt key (&messageId, sizeof(messageId));
+    size_t preamble_length = sizeof(u_int32_t)/*header size*/;
+
+    BufferValue value(preamble_length, 0);
+    value.buffer.record();
+    if (messageDb.get(0, &key, &value, 0) == DB_NOTFOUND) {
+                THROW_STORE_EXCEPTION("Cannot load content. Message not known to store!");                
+    }
+	//read header only to begin with
+    headerSize = value.buffer.getLong();
+
+    BufferValue header(headerSize, preamble_length);
+    if (messageDb.get(0, &key, &header, 0) == DB_NOTFOUND) {
+                THROW_STORE_EXCEPTION("Cannot load content. Message not known to store!");                
+    }
+
+     return recovery.recoverMessage(header.buffer);
+} 
+
+
 // bdb version
 void BdbMessageStore::recoverMessages(TxnCtxt& txn, RecoveryManager& recovery, queue_index& index,
                                       txn_list& locked, message_index& prepared)
@@ -700,13 +729,13 @@
     txn.begin(env);
 
     u_int64_t messageId (msg.getPersistenceId());
-    if (messageId == 0) {
+    if (messageId == 0 || !msg.isContentReleased()) {
         try {
             Dbt key (&messageId, sizeof(messageId));
             messageId = messageIdSequence.next();
             store(NULL, &txn, key, msg, true);
             msg.setPersistenceId(messageId);
-            txn.commit();
+			txn.commit();
         } catch (std::exception& e) {
             txn.abort();
             throw e;
@@ -813,8 +842,6 @@
 		JournalImpl* jc = static_cast<JournalImpl*>(queue.getExternalQueueStore());
 		if (jc){
 		    jc->flush();
-// 			::usleep(10000);  /////////////// hack ----------- FIX!!
-// 			jc->get_wr_events();
 		}
     }catch ( journal::jexception& e) {
        THROW_STORE_EXCEPTION("Flush failed: " + e.to_string() );
@@ -854,6 +881,7 @@
 	    if (usingJrnl()){
 			// add queue* to the txn map..
 			if (ctxt) txn->addXidRecord(queue.getExternalQueueStore());
+			if (msg.isContentReleased()) put(mappingDb, txn->get(), key, value); // TODO - remove once jrnl is used for transient policy see **
 		}else{
        	    msg.enqueueComplete();  // set enqueued for ack
             put(mappingDb, txn->get(), key, value);
@@ -879,10 +907,14 @@
 {
     u_int32_t headerSize = message.encodedHeaderSize();
     u_int64_t size = message.encodedSize() + sizeof(u_int32_t);
-    char* buff = static_cast<char*>(::alloca(size)); // long + headers + content
-    Buffer buffer(buff,size);
-    buffer.putLong(headerSize);
-    message.encode(buffer);
+    char* buff= 0;
+	if (!message.isContentReleased() )
+	{
+	    buff = static_cast<char*>(::alloca(size)); // long + headers + content
+        Buffer buffer(buff,size);
+        buffer.putLong(headerSize);
+        message.encode(buffer);
+	}
 
     try {
 
@@ -898,9 +930,17 @@
 	            JournalImpl* jc = static_cast<JournalImpl*>(queue->getExternalQueueStore());
 				rhm::journal::iores eres;
 				if (txn->getXid().empty()){
-					eres = jc->enqueue_data_record(buff, size, size, dtokp.get(), false);
+					if (message.isContentReleased()){
+						eres = jc->enqueue_extern_data_record(0, dtokp.get(), false);
+					}else {
+						eres = jc->enqueue_data_record(buff, size, size, dtokp.get(), false);
+					}
 				}else {
-					eres = jc->enqueue_txn_data_record(buff, size, size, dtokp.get(), txn->getXid(), false);
+					if (message.isContentReleased()){
+					   eres = jc->enqueue_extern_txn_data_record(0, dtokp.get(), txn->getXid(), false);
+					} else {
+					    eres = jc->enqueue_txn_data_record(buff, size, size, dtokp.get(), txn->getXid(), false);
+					}
 				}
                 switch (eres)
                 {
@@ -963,6 +1003,13 @@
 			if (ctxt) txn->addXidRecord(queue.getExternalQueueStore());
 			async_dequeue(ctxt, msg, queue); 
      	    // added here as we are not doing it async on call back
+			if (msg.isContentReleased())  // TODO remove this code once jrnl is used for transient policy see **
+			{
+			    Dbt key (&messageId, sizeof(messageId));
+                Dbt value (&queueId, sizeof(queueId));
+                dequeue(txn->get(), key, value);
+			}
+			
 			msg.dequeueComplete();
 		    if ( msg.isDequeueComplete()  ) // clear id after last dequeue
 		         msg.setPersistenceId(0);

Modified: store/trunk/cpp/lib/BdbMessageStore.h
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.h	2007-11-06 21:52:47 UTC (rev 1257)
+++ store/trunk/cpp/lib/BdbMessageStore.h	2007-11-07 15:05:52 UTC (rev 1258)
@@ -88,6 +88,8 @@
             void recoverMessages(TxnCtxt& txn, qpid::broker::RecoveryManager& recovery, 
 	    			 qpid::broker::RecoverableQueue::shared_ptr& queue, 
                                  txn_list& locked, message_index& prepared);
+            qpid::broker::RecoverableMessage::shared_ptr  getExternMessage(qpid::broker::RecoveryManager& recovery, 
+			                     uint64_t mId, unsigned& headerSize);
             void recoverExchanges(TxnCtxt& txn, qpid::broker::RecoveryManager& recovery, exchange_index& index);
             void recoverBindings(TxnCtxt& txn, exchange_index& exchanges, queue_index& queues);
             int enqueueMessage(TxnCtxt& txn, IdDbt& msgId, qpid::broker::RecoverableMessage::shared_ptr& msg, 




More information about the rhmessaging-commits mailing list