[rhmessaging-commits] rhmessaging commits: r964 - store/trunk/cpp/lib.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Thu Oct 4 15:54:30 EDT 2007


Author: cctrieloff
Date: 2007-10-04 15:54:30 -0400 (Thu, 04 Oct 2007)
New Revision: 964

Modified:
   store/trunk/cpp/lib/BdbMessageStore.cpp
   store/trunk/cpp/lib/TxnCtxt.h
Log:
code clean up for async tx / dtx (disables the AIO sleep-count timeouts; adds a loggedtx flag to TxnCtxt)

Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp	2007-10-04 19:52:33 UTC (rev 963)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp	2007-10-04 19:54:30 UTC (rev 964)
@@ -391,7 +391,7 @@
 	journal::data_tok dtokp;
 	size_t readSize = 0;
 //	char** buff = 0;
-    unsigned aio_sleep_cnt = 0;
+//    unsigned aio_sleep_cnt = 0;
 	unsigned msg_count=0;
     bool read = true;
 
@@ -446,10 +446,10 @@
                     break;
 		        }
                 case rhm::journal::RHM_IORES_AIO_WAIT:
-                    if (++aio_sleep_cnt > MAX_AIO_SLEEPS)
+/*                    if (++aio_sleep_cnt > MAX_AIO_SLEEPS)
                     {
                         THROW_STORE_EXCEPTION("Store error, disk time out on recover for:" + queue->getName());
-                    }
+                    }*/
                     ::usleep(AIO_SLEEP_TIME);
                     break;
                 case rhm::journal::RHM_IORES_EMPTY:
@@ -792,7 +792,7 @@
 	    dtokp->setSourceMessage (&message);
 	    dtokp->set_rid(message.getPersistenceId()); // set the messageID into the Journal header (record-id)
 
-            unsigned aio_sleep_cnt = 0;
+//            unsigned aio_sleep_cnt = 0;
             bool written = false;
             while (!written)
             {
@@ -805,13 +805,12 @@
                             written = true;
                         break;
                     case rhm::journal::RHM_IORES_AIO_WAIT:
-                        if (aio_sleep_cnt >= MAX_AIO_SLEEPS){
+/*                        if (++aio_sleep_cnt >= MAX_AIO_SLEEPS){
 						    delete dtokp;
 			                THROW_STORE_EXCEPTION("Error storing message -- AIO timeout for: " + queue->getName());
-						}
-                        usleep(AIO_SLEEP_TIME);
+						}*/
+                        usleep(AIO_SLEEP_TIME); // TODO move sleep to wait for IO in get events
                         jc->get_wr_events();
-                        aio_sleep_cnt++;
                         break;
                     case rhm::journal::RHM_IORES_FULL:
 					    delete dtokp;
@@ -892,7 +891,7 @@
 
 void BdbMessageStore::async_dequeue(TransactionContext* ctxt, PersistableMessage& msg, const PersistableQueue& queue)
 {
-	unsigned aio_sleep_cnt = 0;
+//	unsigned aio_sleep_cnt = 0;
     bool written = false;
 	journal::data_tok* ddtokp =  new journal::data_tok;
  	ddtokp->setSourceMessage (&msg);
@@ -923,13 +922,12 @@
                  written = true;
                  break;
              case rhm::journal::RHM_IORES_AIO_WAIT:
-                 if (aio_sleep_cnt >= MAX_AIO_SLEEPS){
+/*                 if (++aio_sleep_cnt >= MAX_AIO_SLEEPS){
 				     delete ddtokp;
 			         THROW_STORE_EXCEPTION("Error dequeuing message -- AIO timeout for: " + queue.getName());
-			     }
+			     } */
+                 usleep(AIO_SLEEP_TIME); // TODO add sleep time to get events call as option
                  jc->get_wr_events();
-                 usleep(AIO_SLEEP_TIME);
-                 aio_sleep_cnt++;
                  break;
              default:
 			     delete ddtokp;
@@ -1017,7 +1015,7 @@
 
 auto_ptr<TransactionContext> BdbMessageStore::begin() 
 {
-    TxnCtxt* txn(new TxnCtxt());
+    TxnCtxt* txn(new TxnCtxt(true));
     txn->begin(env);
     return auto_ptr<TransactionContext>(txn);
 }

Modified: store/trunk/cpp/lib/TxnCtxt.h
===================================================================
--- store/trunk/cpp/lib/TxnCtxt.h	2007-10-04 19:52:33 UTC (rev 963)
+++ store/trunk/cpp/lib/TxnCtxt.h	2007-10-04 19:54:30 UTC (rev 964)
@@ -47,6 +47,7 @@
 	ipqdef impactedQueues; // list of Queues used in the txn
     static unsigned int count;
     mutable qpid::sys::Mutex Lock;
+	bool loggedtx;
 	
 	unsigned int getCount() {
     	qpid::sys::Mutex::ScopedLock locker(Lock);
@@ -73,8 +74,8 @@
 	
 public:
 	
-    TxnCtxt() : txn(0) {
-		tid = "rhm-tid" + getCount();
+    TxnCtxt(bool _loggedtx=false) : loggedtx(_loggedtx), txn(0)  {
+		if (loggedtx) tid = "rhm-tid" + getCount();
 	}
 	
 	/**
@@ -83,15 +84,12 @@
 	*@return if the data sucessfully synced.
 	*/	
 	void sync(){
-  		bool allWritten = true;
+  		bool allWritten = false;
 		bool firstloop = true;
-    	unsigned aio_sleep_cnt = 0;
-		while (!allWritten){
-            if (!firstloop) ::usleep(AIO_SLEEP_TIME);
+		while (loggedtx && !allWritten){
+            if (!firstloop) ::usleep(AIO_SLEEP_TIME); // move this into the get events call aiolib..
 			allWritten = true;
-			unsigned qcnt = 0;
 			for (TxnCtxt::ipqdef::iterator i = impactedQueues.begin(); i != impactedQueues.end(); i++) { 
-				qcnt ++;        
 	           	journal::jcntl* jc = static_cast<journal::jcntl*>((*i)->getExternalQueueStore());
 				if (jc && !(jc->is_txn_synced(getXid())))
 				{
@@ -102,10 +100,6 @@
 				}
 			}
 			firstloop = false;
-			if (++aio_sleep_cnt > MAX_AIO_SLEEPS*qcnt)
-            {
-               	THROW_STORE_EXCEPTION("Store error, disk time out on sync for:" + getXid());
-            }
 		}
 	}
 	




More information about the rhmessaging-commits mailing list