[rhmessaging-commits] rhmessaging commits: r1437 - in store/trunk/cpp/lib: jrnl and 1 other directory.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Thu Dec 6 13:24:12 EST 2007


Author: aconway
Date: 2007-12-06 13:24:12 -0500 (Thu, 06 Dec 2007)
New Revision: 1437

Modified:
   store/trunk/cpp/lib/BdbMessageStore.cpp
   store/trunk/cpp/lib/JournalImpl.cpp
   store/trunk/cpp/lib/JournalImpl.h
   store/trunk/cpp/lib/TxnCtxt.h
   store/trunk/cpp/lib/jrnl/data_tok.cpp
   store/trunk/cpp/lib/jrnl/data_tok.hpp
   store/trunk/cpp/lib/jrnl/jcntl.cpp
Log:

Replaced TimerA with Timer, use intrusive_ptr for Timer.


Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp	2007-12-06 17:59:43 UTC (rev 1436)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp	2007-12-06 18:24:12 UTC (rev 1437)
@@ -1,24 +1,24 @@
 /*
-    Copyright (C) 2007 Red Hat Software
+  Copyright (C) 2007 Red Hat Software
 
-    This file is part of Red Hat Messaging.
+  This file is part of Red Hat Messaging.
     
-    Red Hat Messaging is free software; you can redistribute it and/or
-    modify it under the terms of the GNU Lesser General Public
-    License as published by the Free Software Foundation; either
-    version 2.1 of the License, or (at your option) any later version.
+  Red Hat Messaging is free software; you can redistribute it and/or
+  modify it under the terms of the GNU Lesser General Public
+  License as published by the Free Software Foundation; either
+  version 2.1 of the License, or (at your option) any later version.
 
-    This library is distributed in the hope that it will be useful,
-    but WITHOUT ANY WARRANTY; without even the implied warranty of
-    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
-    Lesser General Public License for more details.
+  This library is distributed in the hope that it will be useful,
+  but WITHOUT ANY WARRANTY; without even the implied warranty of
+  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+  Lesser General Public License for more details.
 
-    You should have received a copy of the GNU Lesser General Public
-    License along with this library; if not, write to the Free Software
-    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301
-    USA
+  You should have received a copy of the GNU Lesser General Public
+  License along with this library; if not, write to the Free Software
+  Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301
+  USA
 
-    The GNU Lesser General Public License is available in the file COPYING.
+  The GNU Lesser General Public License is available in the file COPYING.
 */
 
 #include "BdbMessageStore.h"
@@ -62,8 +62,8 @@
                                                         prepareXidDb(&env, 0),
                                                         numJrnlFiles(8),
                                                         jrnlFsizePgs(24),
-														isInit(false),
-														envPath(envpath)
+                                                        isInit(false),
+                                                        envPath(envpath)
 
 {
 
@@ -72,15 +72,15 @@
 
 bool BdbMessageStore::init(const std::string& dir, const bool async, const bool force, u_int16_t jfiles, u_int32_t jfileSizePgs) 
 { 
-	if (isInit) return true;
+    if (isInit) return true;
 	
     numJrnlFiles = jfiles;
     jrnlFsizePgs = jfileSizePgs;
-	useAsync = async;
-	if (dir.size()>0) storeDir = dir;
+    useAsync = async;
+    if (dir.size()>0) storeDir = dir;
 
-	string bdbdir = storeDir + "/rhm/dat/";
-	journal::jdir::create_dir(bdbdir);
+    string bdbdir = storeDir + "/rhm/dat/";
+    journal::jdir::create_dir(bdbdir);
 
 
     bool ret = false;
@@ -111,11 +111,11 @@
         txn.abort();
         throw;
     }
-	ret = mode(useAsync, force);
-	if (!ret) return false;
+    ret = mode(useAsync, force);
+    if (!ret) return false;
 
-	isInit = true;
-	return true;
+    isInit = true;
+    return true;
 }
 
 bool BdbMessageStore::init(const qpid::Options* options) 
@@ -154,17 +154,17 @@
 bool BdbMessageStore::mode(const bool async, const bool force)
 {
 
-	u_int32_t id (1); // key one in config is mode
+    u_int32_t id (1); // key one in config is mode
     Dbt key(&id, sizeof(id));
     size_t preamble_length = sizeof(u_int32_t);
     BufferValue value(preamble_length, 0);
-	u_int32_t avalue = async ? 1 : 2;
-	value.buffer.putLong( avalue );
-	bool same = false;
-	bool hasMode = false;
+    u_int32_t avalue = async ? 1 : 2;
+    value.buffer.putLong( avalue );
+    bool same = false;
+    bool hasMode = false;
     
     {	
-	    Cursor config;
+        Cursor config;
         config.open(configDb, 0);
         IdDbt rkey;
         BufferValue rvalue(preamble_length, 0);
@@ -172,21 +172,21 @@
 
         while (config.next(rkey, rvalue)) {
             if (rkey.id == 1)
-		    {
-		        hasMode = true;
-			    u_int32_t valueL = rvalue.buffer.getLong();
- 			    if (avalue == valueL){
-				    same = true;
-		        }else {
-				    break;
-		        }
+            {
+                hasMode = true;
+                u_int32_t valueL = rvalue.buffer.getLong();
+                if (avalue == valueL){
+                    same = true;
+                }else {
+                    break;
+                }
             }
-       }
+        }
     }
     if (same) return true;
-	if (!same && !force && hasMode) return false; 
-	if (!same && force && hasMode) {
-		truncate();
+    if (!same && !force && hasMode) return false; 
+    if (!same && force && hasMode) {
+        truncate();
     }
 	
     int status = configDb.put(0, &key, &value, DB_NOOVERWRITE | DB_AUTO_COMMIT );
@@ -195,12 +195,12 @@
     } else {
         return true;
     }
-	return false;
+    return false;
 }
 
 void BdbMessageStore::open(Db& db, DbTxn* txn, const char* file, bool dupKey)
 {
-	if(dupKey) db.set_flags(DB_DUPSORT);
+    if(dupKey) db.set_flags(DB_DUPSORT);
     db.open(txn, file, 0, DB_BTREE, DB_CREATE | DB_THREAD, 0);
     dbs.push_back(&db);
 }
@@ -230,43 +230,43 @@
     }
 
     txn->commit(0); 
-	try{    
+    try{    
         journal::jdir::delete_dir(getJrnlBaseDir(),true);
     }    
-	catch (const journal::jexception& e) {
+    catch (const journal::jexception& e) {
         THROW_STORE_EXCEPTION(std::string("truncate() failed: ") + e.what() );
     } 
 }
 
 void BdbMessageStore::create(PersistableQueue& queue)
 {
-	checkInit();
+    checkInit();
     if (queue.getPersistenceId()) {
         THROW_STORE_EXCEPTION("Queue already created: " + queue.getName());
     }
     if (usingJrnl()) {
         JournalImpl* jQueue = new JournalImpl(queue.getName(), getJrnlDir(queue), string("JournalData"), numJrnlFiles, jrnlFsizePgs * JRNL_RMGR_PAGE_SIZE, defJournalGetEventsTimeout, defJournalFlushTimeout);
         queue.setExternalQueueStore(dynamic_cast<ExternalQueueStore*>(jQueue));
-		try	{
-	     	// init will create the deque's for the init...
-	     	jQueue->initialize();
-		} catch (const journal::jexception& e) {
+        try	{
+            // init will create the deque's for the init...
+            jQueue->initialize();
+        } catch (const journal::jexception& e) {
             THROW_STORE_EXCEPTION(std::string("Queue ") + queue.getName() + ": create() failed: " + e.what());
         }
     }
 
     try {
         if (!create(queueDb, queueIdSequence, queue)) {
-	     	THROW_STORE_EXCEPTION("Queue already exists: " + queue.getName());
+            THROW_STORE_EXCEPTION("Queue already exists: " + queue.getName());
         }
     } catch (const DbException& e) {
-	 	THROW_STORE_EXCEPTION_2("Error creating queue named  " + queue.getName(), e);
+        THROW_STORE_EXCEPTION_2("Error creating queue named  " + queue.getName(), e);
     }
 }
 
 void BdbMessageStore::destroy(PersistableQueue& queue)
 {
-	checkInit();
+    checkInit();
     destroy(queueDb, queue);
     qpid::broker::ExternalQueueStore* eqs = queue.getExternalQueueStore();
     if (eqs)
@@ -280,7 +280,7 @@
 
 void BdbMessageStore::create(const PersistableExchange& exchange)
 {
-	checkInit();
+    checkInit();
     if (exchange.getPersistenceId()) {
         THROW_STORE_EXCEPTION("Exchange already created: " + exchange.getName());
     }
@@ -296,7 +296,7 @@
 
 void BdbMessageStore::destroy(const PersistableExchange& exchange)
 {
-	checkInit();
+    checkInit();
     destroy(exchangeDb, exchange);
     //need to also delete bindings
     IdDbt key(exchange.getPersistenceId());
@@ -326,9 +326,9 @@
 
 
 void BdbMessageStore::bind(const PersistableExchange& e, const PersistableQueue& q, 
-                              const std::string& k, const FieldTable& a)
+                           const std::string& k, const FieldTable& a)
 {
-	checkInit();
+    checkInit();
     IdDbt key(e.getPersistenceId());    
     BindingDbt value(e, q, k, a);
     TxnCtxt txn;
@@ -338,9 +338,9 @@
 }
 
 void BdbMessageStore::unbind(const PersistableExchange& e, const PersistableQueue& q, 
-                                const std::string& k, const FieldTable& a)
+                             const std::string& k, const FieldTable& a)
 {
-	checkInit();
+    checkInit();
     IdDbt key(e.getPersistenceId());    
     BindingDbt value(e, q, k, a);
 
@@ -357,7 +357,7 @@
 
 void BdbMessageStore::recover(RecoveryManager& registry)
 {
-	checkInit();
+    checkInit();
     txn_list prepared;
     recoverXids(prepared);
 
@@ -384,17 +384,17 @@
     //recover transactions:
     for (txn_list::iterator i = prepared.begin(); i != prepared.end(); i++) {        
         
-		TPCTxnCtxt* tpcc = new TPCTxnCtxt(i->xid, &messageIdSequence);
-		RecoverableTransaction::shared_ptr dtx = registry.recoverTransaction(i->xid, std::auto_ptr<TPCTransactionContext>(tpcc));
+        TPCTxnCtxt* tpcc = new TPCTxnCtxt(i->xid, &messageIdSequence);
+        RecoverableTransaction::shared_ptr dtx = registry.recoverTransaction(i->xid, std::auto_ptr<TPCTransactionContext>(tpcc));
         if (i->enqueues.get()) {
             for (LockedMappings::iterator j = i->enqueues->begin(); j != i->enqueues->end(); j++) {
-				tpcc->addXidRecord(queues[j->first]->getExternalQueueStore());
+                tpcc->addXidRecord(queues[j->first]->getExternalQueueStore());
                 dtx->enqueue(queues[j->first], messages[j->second]);
             }
         }
         if (i->dequeues.get()) {
             for (LockedMappings::iterator j = i->dequeues->begin(); j != i->dequeues->end(); j++) {
-				tpcc->addXidRecord(queues[j->first]->getExternalQueueStore());
+                tpcc->addXidRecord(queues[j->first]->getExternalQueueStore());
                 dtx->dequeue(queues[j->first], messages[j->second]);
             }
         }
@@ -403,7 +403,7 @@
 }
 
 void BdbMessageStore::recoverQueues(TxnCtxt& txn, RecoveryManager& registry, queue_index& queue_index, txn_list&
-prepared, message_index& messages)
+                                    prepared, message_index& messages)
 {
     Cursor queues;
     queues.open(queueDb, txn.get());
@@ -423,22 +423,22 @@
 	
         if (usingJrnl())
         {
-	          const char* queueName = queue->getName().c_str();
-              JournalImpl* jQueue = new JournalImpl(queueName, getJrnlDir(queueName), string("JournalData"), numJrnlFiles, jrnlFsizePgs * JRNL_RMGR_PAGE_SIZE, defJournalGetEventsTimeout, defJournalFlushTimeout);
-              queue->setExternalQueueStore(dynamic_cast<ExternalQueueStore*>(jQueue));
+            const char* queueName = queue->getName().c_str();
+            JournalImpl* jQueue = new JournalImpl(queueName, getJrnlDir(queueName), string("JournalData"), numJrnlFiles, jrnlFsizePgs * JRNL_RMGR_PAGE_SIZE, defJournalGetEventsTimeout, defJournalFlushTimeout);
+            queue->setExternalQueueStore(dynamic_cast<ExternalQueueStore*>(jQueue));
 	
-	          try
-	          {
-                  u_int64_t thisHighestRid = 0;
-                  jQueue->recover(prepared, thisHighestRid, key.id); // start recovery
-                  if (thisHighestRid > highestRid)
-                      highestRid = thisHighestRid;
-                  recoverMessages(txn, registry, queue, prepared, messages); 
-				  jQueue->recover_complete(); // start journal.
-	          } catch (const journal::jexception& e) {
-                 THROW_STORE_EXCEPTION(std::string("Queue ") + queueName + ": recoverQueues() failed: " + e.what());
-              }
-			  //read all messages: done on a per queue basis if using Journal
+            try
+            {
+                u_int64_t thisHighestRid = 0;
+                jQueue->recover(prepared, thisHighestRid, key.id); // start recovery
+                if (thisHighestRid > highestRid)
+                    highestRid = thisHighestRid;
+                recoverMessages(txn, registry, queue, prepared, messages); 
+                jQueue->recover_complete(); // start journal.
+            } catch (const journal::jexception& e) {
+                THROW_STORE_EXCEPTION(std::string("Queue ") + queueName + ": recoverQueues() failed: " + e.what());
+            }
+            //read all messages: done on a per queue basis if using Journal
         }
 
         queue_index[key.id] = queue;
@@ -447,7 +447,7 @@
     messageIdSequence.reset(highestRid + 1);
     queueIdSequence.reset(maxQueueId + 1);
 	
-	if (!usingJrnl()) //read all messages: 
+    if (!usingJrnl()) //read all messages: 
         recoverMessages(txn, registry, queue_index, prepared, messages);
 }
 
@@ -508,15 +508,15 @@
 
 // async IO version.
 void BdbMessageStore::recoverMessages(TxnCtxt& /*txn*/, qpid::broker::RecoveryManager& recovery, 
-	    			 qpid::broker::RecoverableQueue::shared_ptr& queue, txn_list& locked, message_index& prepared)
+                                      qpid::broker::RecoverableQueue::shared_ptr& queue, txn_list& locked, message_index& prepared)
 {
 
     size_t preambleLength = sizeof(u_int32_t)/*header size*/;
  
     JournalImpl* jc = static_cast<JournalImpl*>(queue->getExternalQueueStore());
-	DataTokenImpl dtokp;
-	size_t readSize = 0;
-	unsigned msg_count=0;
+    DataTokenImpl dtokp;
+    size_t readSize = 0;
+    unsigned msg_count=0;
     bool read = true;
 
     void* dbuff = NULL; size_t dbuffSize = 0;
@@ -526,77 +526,77 @@
 
 
     dtokp.set_wstate(DataTokenImpl::ENQ);
-	// read the message from the Journal.
+    // read the message from the Journal.
     try {
         
-//std::cout << jc->dirname() <<"-queueName:" << queue->getName() << "-enq count:" << jc->get_enq_cnt() << std::endl;
+        //std::cout << jc->dirname() <<"-queueName:" << queue->getName() << "-enq count:" << jc->get_enq_cnt() << std::endl;
 		
         unsigned aio_sleep_cnt = 0;
-		while (read) {
+        while (read) {
             rhm::journal::iores res = jc->read_data_record(&dbuff, dbuffSize, &xidbuff, xidbuffSize, transientFlag, externalFlag, &dtokp);
-    		readSize = dtokp.dsize();
+            readSize = dtokp.dsize();
 		
             switch (res)
             {
-                case rhm::journal::RHM_IORES_SUCCESS:{
-                    msg_count++;
-	                RecoverableMessage::shared_ptr msg;
-					char* data = (char*)dbuff;
+              case rhm::journal::RHM_IORES_SUCCESS:{
+                  msg_count++;
+                  RecoverableMessage::shared_ptr msg;
+                  char* data = (char*)dbuff;
 					
-					unsigned headerSize;
-					if (externalFlag){
-					    msg = getExternMessage(recovery, dtokp.rid(), headerSize); // large message external to jrnl 
-					} else {
-				        headerSize = Buffer(data, preambleLength).getLong();
-                        Buffer headerBuff(data+ preambleLength, headerSize); /// do we want read size or header size ????
-            		    msg = recovery.recoverMessage(headerBuff);
-                    }
-            		msg->setPersistenceId(dtokp.rid());
+                  unsigned headerSize;
+                  if (externalFlag){
+                      msg = getExternMessage(recovery, dtokp.rid(), headerSize); // large message external to jrnl 
+                  } else {
+                      headerSize = Buffer(data, preambleLength).getLong();
+                      Buffer headerBuff(data+ preambleLength, headerSize); /// do we want read size or header size ????
+                      msg = recovery.recoverMessage(headerBuff);
+                  }
+                  msg->setPersistenceId(dtokp.rid());
 				 
-                    u_int32_t contentOffset = headerSize + preambleLength;
-                    u_int64_t contentSize = readSize - contentOffset;
-                    if (msg->loadContent(contentSize) && !externalFlag) {
-                         //now read the content
-		    			 Buffer contentBuff(data + contentOffset, contentSize);
-                         msg->decodeContent(contentBuff);
-                    }
+                  u_int32_t contentOffset = headerSize + preambleLength;
+                  u_int64_t contentSize = readSize - contentOffset;
+                  if (msg->loadContent(contentSize) && !externalFlag) {
+                      //now read the content
+                      Buffer contentBuff(data + contentOffset, contentSize);
+                      msg->decodeContent(contentBuff);
+                  }
 
-                 	if (PreparedTransaction::isLocked(locked, queue->getPersistenceId(), dtokp.rid()) ) {
-                        prepared[dtokp.rid()] = msg;
-                    } else {
-                         queue->recover(msg);
-                    }
+                  if (PreparedTransaction::isLocked(locked, queue->getPersistenceId(), dtokp.rid()) ) {
+                      prepared[dtokp.rid()] = msg;
+                  } else {
+                      queue->recover(msg);
+                  }
     
-    				dtokp.reset();
-	    			dtokp.set_wstate(DataTokenImpl::ENQ);
+                  dtokp.reset();
+                  dtokp.set_wstate(DataTokenImpl::ENQ);
 					
-    				if (xidbuff)
-						::free(xidbuff);
-					else if (dbuff)
-						::free(dbuff);
-                    aio_sleep_cnt = 0;
-                    break;
-		        }
-                case rhm::journal::RHM_IORES_AIO_WAIT:
-                    if (++aio_sleep_cnt > MAX_AIO_SLEEPS)
-                        THROW_STORE_EXCEPTION("Timeout waiting for AIO");
-                    ::usleep(AIO_SLEEP_TIME);
-                    break;
-                case rhm::journal::RHM_IORES_EMPTY:
-                    read = false;
-				    break; // done with all messages. ((add call in jrnl to test that _emap is empty. 
-                default:
-				    assert( "Store Error: Unexpected msg state");
+                  if (xidbuff)
+                      ::free(xidbuff);
+                  else if (dbuff)
+                      ::free(dbuff);
+                  aio_sleep_cnt = 0;
+                  break;
+              }
+              case rhm::journal::RHM_IORES_AIO_WAIT:
+                if (++aio_sleep_cnt > MAX_AIO_SLEEPS)
+                    THROW_STORE_EXCEPTION("Timeout waiting for AIO");
+                ::usleep(AIO_SLEEP_TIME);
+                break;
+              case rhm::journal::RHM_IORES_EMPTY:
+                read = false;
+                break; // done with all messages. ((add call in jrnl to test that _emap is empty. 
+              default:
+                assert( "Store Error: Unexpected msg state");
             } // switch
         } // while
     } catch (const journal::jexception& e) {
-		THROW_STORE_EXCEPTION(std::string("Queue ") + queue->getName() +
-                ": recoverMessages() failed: " + e.what());
-	}
+        THROW_STORE_EXCEPTION(std::string("Queue ") + queue->getName() +
+                              ": recoverMessages() failed: " + e.what());
+    }
 }
 
 RecoverableMessage::shared_ptr  BdbMessageStore::getExternMessage(qpid::broker::RecoveryManager& recovery, 
-     uint64_t messageId, unsigned& headerSize)
+                                                                  uint64_t messageId, unsigned& headerSize)
 {
     Dbt key (&messageId, sizeof(messageId));
     size_t preamble_length = sizeof(u_int32_t)/*header size*/;
@@ -604,17 +604,17 @@
     BufferValue value(preamble_length, 0);
     value.buffer.record();
     if (messageDb.get(0, &key, &value, 0) == DB_NOTFOUND) {
-                THROW_STORE_EXCEPTION("Cannot load content. Message not known to store!");                
+        THROW_STORE_EXCEPTION("Cannot load content. Message not known to store!");                
     }
-	//read header only to begin with
+    //read header only to begin with
     headerSize = value.buffer.getLong();
 
     BufferValue header(headerSize, preamble_length);
     if (messageDb.get(0, &key, &header, 0) == DB_NOTFOUND) {
-                THROW_STORE_EXCEPTION("Cannot load content. Message not known to store!");                
+        THROW_STORE_EXCEPTION("Cannot load content. Message not known to store!");                
     }
 
-     return recovery.recoverMessage(header.buffer);
+    return recovery.recoverMessage(header.buffer);
 } 
 
 
@@ -695,34 +695,34 @@
     std::set<string> prepared;
     collectPreparedXids(prepared);
 
-	//when using the async journal, it will abort unprepaired xids and populate the locked maps
-	if (!usingJrnl()){
-	    txn_lock_map enqueues;
- 	    txn_lock_map dequeues;
+    //when using the async journal, it will abort unprepared xids and populate the locked maps
+    if (!usingJrnl()){
+        txn_lock_map enqueues;
+        txn_lock_map dequeues;
         std::set<string> known;
     	readXids(enqueueXidDb, known);
     	readXids(dequeueXidDb, known);
 
     	//abort all known but unprepared xids:
     	for (std::set<string>::iterator i = known.begin(); i != known.end(); i++) {
-	        if (prepared.find(*i) == prepared.end()) {
+            if (prepared.find(*i) == prepared.end()) {
             	TPCTxnCtxt txn(*i, NULL);
-        	    completed(txn, dequeueXidDb, enqueueXidDb, false);
-        	}
+                completed(txn, dequeueXidDb, enqueueXidDb, false);
+            }
     	}
-	    readLockedMappings(enqueueXidDb, enqueues);
- 	    readLockedMappings(dequeueXidDb, dequeues);
-  	    for (std::set<string>::iterator i = prepared.begin(); i != prepared.end(); i++) {
-  	        txns.push_back(new PreparedTransaction(*i, enqueues[*i], dequeues[*i]));
-  	    }
-	} else {
-  	    for (std::set<string>::iterator i = prepared.begin(); i != prepared.end(); i++) {
+        readLockedMappings(enqueueXidDb, enqueues);
+        readLockedMappings(dequeueXidDb, dequeues);
+        for (std::set<string>::iterator i = prepared.begin(); i != prepared.end(); i++) {
+            txns.push_back(new PreparedTransaction(*i, enqueues[*i], dequeues[*i]));
+        }
+    } else {
+        for (std::set<string>::iterator i = prepared.begin(); i != prepared.end(); i++) {
             LockedMappings::shared_ptr enq_ptr;
             enq_ptr.reset(new LockedMappings);
             LockedMappings::shared_ptr deq_ptr;
             deq_ptr.reset(new LockedMappings);
-  	        txns.push_back(new PreparedTransaction(*i, enq_ptr, deq_ptr));
-  	    }
+            txns.push_back(new PreparedTransaction(*i, enq_ptr, deq_ptr));
+        }
         
     }
 }
@@ -751,12 +751,12 @@
         std::string xid(reinterpret_cast<char*>(key.get_data()), key.get_size());
         LockedMappings::add(mappings, xid, value.queueId(), value.messageId());
         /*
-        txn_lock_map::iterator i = mappings.find(xid);
-        if (i == mappings.end()) {
-            LockedMappings::shared_ptr ptr(new LockedMappings());
-            i = mappings.insert(std::make_pair(xid, ptr)).first;
-        }
-        i->second->add(value.queueId(), value.messageId());
+          txn_lock_map::iterator i = mappings.find(xid);
+          if (i == mappings.end()) {
+          LockedMappings::shared_ptr ptr(new LockedMappings());
+          i = mappings.insert(std::make_pair(xid, ptr)).first;
+          }
+          i->second->add(value.queueId(), value.messageId());
         */
     }
 }
@@ -768,7 +768,7 @@
 
 void BdbMessageStore::stage( intrusive_ptr<PersistableMessage>& msg)
 {
-	checkInit();
+    checkInit();
     TxnCtxt txn;
     txn.begin(env, true);
 
@@ -779,7 +779,7 @@
             messageId = messageIdSequence.next();
             store(NULL, &txn, key, msg, true);
             msg->setPersistenceId(messageId);
-			txn.commit();
+            txn.commit();
         } catch (const std::exception& e) {
             txn.abort();
             throw;
@@ -788,7 +788,7 @@
 }
 void BdbMessageStore::destroy(intrusive_ptr<PersistableMessage>& msg)
 {
-	checkInit();
+    checkInit();
     u_int64_t messageId (msg->getPersistenceId());
     if (messageId) {
         Dbt key (&messageId, sizeof(messageId));
@@ -822,7 +822,7 @@
 
 void BdbMessageStore::appendContent(intrusive_ptr<const PersistableMessage>& msg, const std::string& data)
 {
-	checkInit();
+    checkInit();
     u_int64_t messageId (msg->getPersistenceId());
     if (messageId != 0) {
         try {
@@ -850,9 +850,9 @@
 }
 
 void BdbMessageStore::loadContent(const qpid::broker::PersistableQueue& queue,
-             intrusive_ptr<const PersistableMessage>& msg, std::string& data, u_int64_t offset, u_int32_t length)
+                                  intrusive_ptr<const PersistableMessage>& msg, std::string& data, u_int64_t offset, u_int32_t length)
 {
-	checkInit();
+    checkInit();
     u_int64_t realOffset = offset + sizeof(u_int32_t)/*header length*/+ msg->encodedHeaderSize();
     u_int64_t messageId (msg->getPersistenceId());
 	
@@ -860,10 +860,10 @@
         try {
             JournalImpl* jc = static_cast<JournalImpl*>(queue.getExternalQueueStore());
             if (jc && jc->is_enqueued(messageId) ){
-			    if (jc->loadMsgContent(messageId, data, realOffset, length)){
-		             return;
-			    }
-			}
+                if (jc->loadMsgContent(messageId, data, realOffset, length)){
+                    return;
+                }
+            }
             Dbt key (&messageId, sizeof(messageId));
             char *buffer = new char[length];
             Dbt value(buffer, length);
@@ -873,17 +873,17 @@
             value.set_dlen(length);
             int status = messageDb.get(0, &key, &value, 0); 
             if (status == DB_NOTFOUND) {
-		        delete [] buffer;
+                delete [] buffer;
                 THROW_STORE_EXCEPTION("Cannot load content. Message not known to store!");                
             } else {
                 data.assign(buffer, value.get_size());
-		        delete [] buffer;
+                delete [] buffer;
             }
         } catch (const DbException& e) {
             THROW_STORE_EXCEPTION_2("Error loading content", e);
         } catch (const journal::jexception& e) {
             THROW_STORE_EXCEPTION(std::string("Queue ") + queue.getName() +
-                    ": loadContent() failed: " + e.what());
+                                  ": loadContent() failed: " + e.what());
         }		
     } else {
         THROW_STORE_EXCEPTION("Cannot load content. Message not known to store!");
@@ -893,21 +893,21 @@
 void BdbMessageStore::flush(const qpid::broker::PersistableQueue& queue)
 {
     if (!usingJrnl()) return;
-	checkInit();
+    checkInit();
     try {
-		JournalImpl* jc = static_cast<JournalImpl*>(queue.getExternalQueueStore());
-		if (jc){
-		    jc->flush();
-		}
+        JournalImpl* jc = static_cast<JournalImpl*>(queue.getExternalQueueStore());
+        if (jc){
+            jc->flush();
+        }
     }catch (const journal::jexception& e) {
-       THROW_STORE_EXCEPTION(std::string("Queue ") + queue.getName() + ": flush() failed: " + e.what() );
-	}
+        THROW_STORE_EXCEPTION(std::string("Queue ") + queue.getName() + ": flush() failed: " + e.what() );
+    }
 }
 
 void BdbMessageStore::enqueue(TransactionContext* ctxt, intrusive_ptr<PersistableMessage>& msg,
-        const PersistableQueue& queue)
+                              const PersistableQueue& queue)
 {
-	checkInit();
+    checkInit();
     u_int64_t queueId (queue.getPersistenceId());
     u_int64_t messageId (msg->getPersistenceId());
     if (queueId == 0) {
@@ -931,24 +931,24 @@
         if (messageId == 0) {
             messageId = messageIdSequence.next();
             msg->setPersistenceId(messageId);
-	        newId = true;
-	    }
+            newId = true;
+        }
         store(&queue, txn, key, msg, newId);
 
-	    if (usingJrnl()){
-			// add queue* to the txn map..
-			if (ctxt) txn->addXidRecord(queue.getExternalQueueStore());
-			if (msg->isContentReleased()) put(mappingDb, txn->get(), key, value); // TODO - remove once jrnl is used for transient policy see **
-		}else{
+        if (usingJrnl()){
+            // add queue* to the txn map..
+            if (ctxt) txn->addXidRecord(queue.getExternalQueueStore());
+            if (msg->isContentReleased()) put(mappingDb, txn->get(), key, value); // TODO - remove once jrnl is used for transient policy see **
+        }else{
        	    msg->enqueueComplete();  // set enqueued for ack
             put(mappingDb, txn->get(), key, value);
         
-			// cct if using Journal do we need to wait for IO to complete before calling thus???
-			// set enqueue comple on callback msg.enqueueComplete();
-			if (txn->isTPC()) {
-		        record2pcOp(enqueueXidDb, dynamic_cast<TPCTxnCtxt&>(*txn), messageId, queueId);
-		    }
-		}
+            // cct if using Journal do we need to wait for IO to complete before calling this???
+            // set enqueue complete on callback msg.enqueueComplete();
+            if (txn->isTPC()) {
+                record2pcOp(enqueueXidDb, dynamic_cast<TPCTxnCtxt&>(*txn), messageId, queueId);
+            }
+        }
         
         if (!ctxt) txn->commit();
     } catch (const std::exception& e) {
@@ -958,27 +958,27 @@
 }
 
 void BdbMessageStore::store(const PersistableQueue* queue, 
-			TxnCtxt* txn, Dbt& messageId, 
-			intrusive_ptr<PersistableMessage>& message, 
-			bool newId)
+                            TxnCtxt* txn, Dbt& messageId, 
+                            intrusive_ptr<PersistableMessage>& message, 
+                            bool newId)
 {
     u_int32_t headerSize = message->encodedHeaderSize();
     u_int64_t size = message->encodedSize() + sizeof(u_int32_t);
     char* buff= 0;
-	if (!message->isContentReleased() )
-	{
-	    buff = static_cast<char*>(::alloca(size)); // long + headers + content
+    if (!message->isContentReleased() )
+    {
+        buff = static_cast<char*>(::alloca(size)); // long + headers + content
         Buffer buffer(buff,size);
         buffer.putLong(headerSize);
         message->encode(buffer);
-	}
+    }
 
     try {
 
-     if ( queue && usingJrnl()){
-//std::cout << "E" << std::flush;
-        boost::intrusive_ptr<DataTokenImpl> dtokp(new DataTokenImpl);
-		dtokp->ref();
+        if ( queue && usingJrnl()){
+            //std::cout << "E" << std::flush;
+            boost::intrusive_ptr<DataTokenImpl> dtokp(new DataTokenImpl);
+            dtokp->addRef();
 	    dtokp->setSourceMessage(message);
 	    dtokp->set_rid(message->getPersistenceId()); // set the messageID into the Journal header (record-id)
 
@@ -987,71 +987,71 @@
             unsigned busy_sleep_cnt = 0;
             while (!written)
             {
-	            JournalImpl* jc = static_cast<JournalImpl*>(queue->getExternalQueueStore());
-				rhm::journal::iores eres;
-				if (txn->getXid().empty()){
-					if (message->isContentReleased()){
-						eres = jc->enqueue_extern_data_record(size, dtokp.get(), false);
-					}else {
-						eres = jc->enqueue_data_record(buff, size, size, dtokp.get(), false);
-					}
-				}else {
-					if (message->isContentReleased()){
-					   eres = jc->enqueue_extern_txn_data_record(size, dtokp.get(), txn->getXid(), false);
-					} else {
-					    eres = jc->enqueue_txn_data_record(buff, size, size, dtokp.get(), txn->getXid(), false);
-					}
-				}
+                JournalImpl* jc = static_cast<JournalImpl*>(queue->getExternalQueueStore());
+                rhm::journal::iores eres;
+                if (txn->getXid().empty()){
+                    if (message->isContentReleased()){
+                        eres = jc->enqueue_extern_data_record(size, dtokp.get(), false);
+                    }else {
+                        eres = jc->enqueue_data_record(buff, size, size, dtokp.get(), false);
+                    }
+                }else {
+                    if (message->isContentReleased()){
+                        eres = jc->enqueue_extern_txn_data_record(size, dtokp.get(), txn->getXid(), false);
+                    } else {
+                        eres = jc->enqueue_txn_data_record(buff, size, size, dtokp.get(), txn->getXid(), false);
+                    }
+                }
                 switch (eres)
                 {
-                    case rhm::journal::RHM_IORES_SUCCESS:
-//std::cout << "." << std::flush;
-                        if (dtokp.get()->wstate() >= DataTokenImpl::ENQ_SUBM)
-                            written = true;
-                        aio_sleep_cnt = 0;
-                        busy_sleep_cnt = 0;
-                        break;
-                    case rhm::journal::RHM_IORES_AIO_WAIT:
-//std::cout << "w" << std::flush;
-                        if (++aio_sleep_cnt > MAX_AIO_SLEEPS)
-                            THROW_STORE_EXCEPTION("Timeout waiting for AIO: RHM_IORES_AIO_WAIT");
-                        usleep(AIO_SLEEP_TIME); // TODO move sleep to wait for IO in get events
-                        jc->get_wr_events();
-                        break;
-                   case rhm::journal::RHM_IORES_BUSY:
-//std::cout << "b" << std::flush;
-                        if (++busy_sleep_cnt > MAX_AIO_SLEEPS)
-                            THROW_STORE_EXCEPTION("Timeout waiting for mutex: RHM_IORES_BUSY");
-                        usleep(AIO_SLEEP_TIME); // TODO add sleep time to get events call as option
-                        break;
-                    case rhm::journal::RHM_IORES_FULL:
-                        std::cerr << "Error storing message -- Journal full on queue \"" << queue->getName() << "\"." << std::endl << std::flush;
-        		        THROW_STORE_FULL_EXCEPTION("Error storing message -- Journal full :" + queue->getName());
-                        break;
-                    default:
-                        assert( "Store Error: Unexpected msg state");
+                  case rhm::journal::RHM_IORES_SUCCESS:
+                    //std::cout << "." << std::flush;
+                    if (dtokp.get()->wstate() >= DataTokenImpl::ENQ_SUBM)
+                        written = true;
+                    aio_sleep_cnt = 0;
+                    busy_sleep_cnt = 0;
+                    break;
+                  case rhm::journal::RHM_IORES_AIO_WAIT:
+                    //std::cout << "w" << std::flush;
+                    if (++aio_sleep_cnt > MAX_AIO_SLEEPS)
+                        THROW_STORE_EXCEPTION("Timeout waiting for AIO: RHM_IORES_AIO_WAIT");
+                    usleep(AIO_SLEEP_TIME); // TODO move sleep to wait for IO in get events
+                    jc->get_wr_events();
+                    break;
+                  case rhm::journal::RHM_IORES_BUSY:
+                    //std::cout << "b" << std::flush;
+                    if (++busy_sleep_cnt > MAX_AIO_SLEEPS)
+                        THROW_STORE_EXCEPTION("Timeout waiting for mutex: RHM_IORES_BUSY");
+                    usleep(AIO_SLEEP_TIME); // TODO add sleep time to get events call as option
+                    break;
+                  case rhm::journal::RHM_IORES_FULL:
+                    std::cerr << "Error storing message -- Journal full on queue \"" << queue->getName() << "\"." << std::endl << std::flush;
+                    THROW_STORE_FULL_EXCEPTION("Error storing message -- Journal full :" + queue->getName());
+                    break;
+                  default:
+                    assert( "Store Error: Unexpected msg state");
                 }
             }
 	 
-      } else {
-    	   /// cct message db
-           if (newId){  // only store in Bd if first time message is stored
-               Dbt data(buff,size);
-	           messageDb.put(txn->get(), &messageId, &data, DB_NOOVERWRITE);
-	      }
-      }
+        } else {
+            /// cct message db
+            if (newId){  // only store in Bd if first time message is stored
+                Dbt data(buff,size);
+                messageDb.put(txn->get(), &messageId, &data, DB_NOOVERWRITE);
+            }
+        }
     } catch (const journal::jexception& e) {
-       THROW_STORE_EXCEPTION(std::string("Queue ") + queue->getName() + ": store() failed: " +
-               e.what());
+        THROW_STORE_EXCEPTION(std::string("Queue ") + queue->getName() + ": store() failed: " +
+                              e.what());
     } catch (const DbException& e) {
         THROW_STORE_EXCEPTION_2("Error storing message", e);
     }
 }
 
 void BdbMessageStore::dequeue(TransactionContext* ctxt, intrusive_ptr<PersistableMessage>& msg,
-        const PersistableQueue& queue)
+                              const PersistableQueue& queue)
 {    
-	checkInit();
+    checkInit();
     u_int64_t queueId (queue.getPersistenceId());
     u_int64_t messageId (msg->getPersistenceId());
     if (messageId == 0) {
@@ -1072,23 +1072,23 @@
     
     try {
         
-		if (usingJrnl()){
-			// add queue* to the txn map..
-			if (ctxt) txn->addXidRecord(queue.getExternalQueueStore());
-			async_dequeue(ctxt, msg, queue); 
+        if (usingJrnl()){
+            // add queue* to the txn map..
+            if (ctxt) txn->addXidRecord(queue.getExternalQueueStore());
+            async_dequeue(ctxt, msg, queue); 
      	    // added here as we are not doing it async on call back
-			if (msg->isContentReleased())  // TODO remove this code once jrnl is used for transient policy see **
-			{
-			    Dbt key (&messageId, sizeof(messageId));
+            if (msg->isContentReleased())  // TODO remove this code once jrnl is used for transient policy see **
+            {
+                Dbt key (&messageId, sizeof(messageId));
                 Dbt value (&queueId, sizeof(queueId));
                 dequeue(txn->get(), key, value);
-			}
+            }
 			
-			msg->dequeueComplete();
-// 		    if ( msg->isDequeueComplete()  ) // clear id after last dequeue
-// 		         msg->setPersistenceId(0);
+            msg->dequeueComplete();
+            // 		    if ( msg->isDequeueComplete()  ) // clear id after last dequeue
+            // 		         msg->setPersistenceId(0);
 			
-		} else if (txn->isTPC()) {
+        } else if (txn->isTPC()) {
             //if this is part of a 2pc transaction, then only record the dequeue now,
             //it will be applied on commit
             record2pcOp(dequeueXidDb, dynamic_cast<TPCTxnCtxt&>(*txn), messageId, queueId);
@@ -1098,7 +1098,7 @@
             if (dequeue(txn->get(), key, value)) {
                 msg->setPersistenceId(0);//clear id as we have now removed the message from the store
                 msg->dequeueComplete(); // set dequeued for ack
-	        }
+            }
         }
         if (!ctxt) txn->commit();
         
@@ -1111,62 +1111,64 @@
     }   
 }
 
-void BdbMessageStore::async_dequeue(TransactionContext* ctxt, intrusive_ptr<PersistableMessage>& msg,
-        const PersistableQueue& queue)
+void BdbMessageStore::async_dequeue(
+    TransactionContext* ctxt,
+    intrusive_ptr<PersistableMessage>& msg,
+    const PersistableQueue& queue)
 {
-//std::cout << "D" << std::flush;
+    //std::cout << "D" << std::flush;
     bool written = false;
     boost::intrusive_ptr<DataTokenImpl> ddtokp(new DataTokenImpl);
-	ddtokp->ref();
- 	ddtokp->setSourceMessage(msg);
-	ddtokp->set_rid(messageIdSequence.next()); 
-	ddtokp->set_dequeue_rid(msg->getPersistenceId());
-	ddtokp->set_wstate(DataTokenImpl::ENQ);
-	JournalImpl* jc = static_cast<JournalImpl*>(queue.getExternalQueueStore());
-	string tid;
+    ddtokp->addRef();
+    ddtokp->setSourceMessage(msg);
+    ddtokp->set_rid(messageIdSequence.next()); 
+    ddtokp->set_dequeue_rid(msg->getPersistenceId());
+    ddtokp->set_wstate(DataTokenImpl::ENQ);
+    JournalImpl* jc = static_cast<JournalImpl*>(queue.getExternalQueueStore());
+    string tid;
     if (ctxt){
-		TxnCtxt* txn = check(ctxt);
-		tid = txn->getXid();
-	}
+        TxnCtxt* txn = check(ctxt);
+        tid = txn->getXid();
+    }
 
     unsigned aio_sleep_cnt = 0;
     unsigned busy_sleep_cnt = 0;
     while (!written)
     {
-         rhm::journal::iores dres;
-         try {
-		      if (tid.empty()){
-			  	  dres = jc->dequeue_data_record(ddtokp.get());
-			  } else {
-			  	  dres = jc->dequeue_txn_data_record(ddtokp.get(), tid);
-			  }
-         } catch (const journal::jexception& e) { 
-			  THROW_STORE_EXCEPTION(std::string("Queue ") + queue.getName() + ": async_dequeue() failed: " + e.what());
-	     }
-         switch (dres)
-         {
-             case rhm::journal::RHM_IORES_SUCCESS:
-//std::cout << "." << std::flush;
-                 aio_sleep_cnt = 0;
-                 busy_sleep_cnt = 0;
-                 written = true;
-                 break;
-             case rhm::journal::RHM_IORES_AIO_WAIT:
-//std::cout << "w" << std::flush;
-                 if (++aio_sleep_cnt > MAX_AIO_SLEEPS)
-                     THROW_STORE_EXCEPTION("Timeout waiting for AIO: RHM_IORES_AIO_WAIT");
-                 usleep(AIO_SLEEP_TIME); // TODO add sleep time to get events call as option
-                 jc->get_wr_events();
-                 break;
-             case rhm::journal::RHM_IORES_BUSY:
-//std::cout << "b" << std::flush;
-                 if (++busy_sleep_cnt > MAX_AIO_SLEEPS)
-                     THROW_STORE_EXCEPTION("Timeout waiting for mutex: RHM_IORES_BUSY");
-                 usleep(AIO_SLEEP_TIME); // TODO add sleep time to get events call as option
-                 break;
-             default:
-                 assert( "Store Error: Unexpected msg state");
-         }
+        rhm::journal::iores dres;
+        try {
+            if (tid.empty()){
+                dres = jc->dequeue_data_record(ddtokp.get());
+            } else {
+                dres = jc->dequeue_txn_data_record(ddtokp.get(), tid);
+            }
+        } catch (const journal::jexception& e) { 
+            THROW_STORE_EXCEPTION(std::string("Queue ") + queue.getName() + ": async_dequeue() failed: " + e.what());
+        }
+        switch (dres)
+        {
+          case rhm::journal::RHM_IORES_SUCCESS:
+            //std::cout << "." << std::flush;
+            aio_sleep_cnt = 0;
+            busy_sleep_cnt = 0;
+            written = true;
+            break;
+          case rhm::journal::RHM_IORES_AIO_WAIT:
+            //std::cout << "w" << std::flush;
+            if (++aio_sleep_cnt > MAX_AIO_SLEEPS)
+                THROW_STORE_EXCEPTION("Timeout waiting for AIO: RHM_IORES_AIO_WAIT");
+            usleep(AIO_SLEEP_TIME); // TODO add sleep time to get events call as option
+            jc->get_wr_events();
+            break;
+          case rhm::journal::RHM_IORES_BUSY:
+            //std::cout << "b" << std::flush;
+            if (++busy_sleep_cnt > MAX_AIO_SLEEPS)
+                THROW_STORE_EXCEPTION("Timeout waiting for mutex: RHM_IORES_BUSY");
+            usleep(AIO_SLEEP_TIME); // TODO add sleep time to get events call as option
+            break;
+          default:
+            assert( "Store Error: Unexpected msg state");
+        }
     }
 }
 
@@ -1215,7 +1217,7 @@
 
 u_int32_t BdbMessageStore::outstandingQueueAIO(const qpid::broker::PersistableQueue& /*queue*/)
 {
-	checkInit();
+    checkInit();
     return 0;
 }
 
@@ -1285,8 +1287,8 @@
 
 auto_ptr<TransactionContext> BdbMessageStore::begin() 
 {
-	checkInit();
-	// pass sequence number for c/a when using jrnl
+    checkInit();
+    // pass sequence number for c/a when using jrnl
     TxnCtxt* txn(new TxnCtxt(&messageIdSequence ));
     txn->begin(env, !usingJrnl());
     return auto_ptr<TransactionContext>(txn);
@@ -1294,11 +1296,11 @@
 
 std::auto_ptr<qpid::broker::TPCTransactionContext> BdbMessageStore::begin(const std::string& xid)
 {
-	checkInit();
-	IdSequence* jtx = NULL;
-	if (usingJrnl()) jtx = &messageIdSequence;
+    checkInit();
+    IdSequence* jtx = NULL;
+    if (usingJrnl()) jtx = &messageIdSequence;
 
-	// pass sequence number for c/a when using jrnl
+    // pass sequence number for c/a when using jrnl
     TPCTxnCtxt* txn(new TPCTxnCtxt(xid, jtx));
     txn->begin(env, !usingJrnl());
     return auto_ptr<TPCTransactionContext>(txn);
@@ -1306,7 +1308,7 @@
 
 void BdbMessageStore::prepare(qpid::broker::TPCTransactionContext& ctxt)
 {
-	checkInit();
+    checkInit();
     TPCTxnCtxt* txn = dynamic_cast<TPCTxnCtxt*>(&ctxt);
     if(!txn) throw InvalidTransactionContextException();
     
@@ -1316,8 +1318,8 @@
         Dbt key ((void*) xid.data(), xid.length());
         Dbt value(&dummy, sizeof(dummy));
 
-		// make sure all the data is written to disk before returning
- 		txn->sync();
+        // make sure all the data is written to disk before returning
+        txn->sync();
         prepareXidDb.put(txn->get(), &key, &value, 0);
 
         txn->commit();
@@ -1329,8 +1331,8 @@
 
 void BdbMessageStore::commit(TransactionContext& ctxt) 
 {
-	checkInit();
-     TxnCtxt* txn(check(&ctxt));
+    checkInit();
+    TxnCtxt* txn(check(&ctxt));
     if (txn->isTPC()) {
         completed(*dynamic_cast<TPCTxnCtxt*>(txn), enqueueXidDb, dequeueXidDb, true);        
     } else {
@@ -1340,7 +1342,7 @@
 
 void BdbMessageStore::abort(TransactionContext& ctxt) 
 {
-	checkInit();
+    checkInit();
     TxnCtxt* txn(check(&ctxt));
     if (txn->isTPC()) {
         completed(*dynamic_cast<TPCTxnCtxt*>(txn), dequeueXidDb, enqueueXidDb, false);

Modified: store/trunk/cpp/lib/JournalImpl.cpp
===================================================================
--- store/trunk/cpp/lib/JournalImpl.cpp	2007-12-06 17:59:43 UTC (rev 1436)
+++ store/trunk/cpp/lib/JournalImpl.cpp	2007-12-06 18:24:12 UTC (rev 1437)
@@ -29,10 +29,13 @@
 using namespace rhm::bdbstore;
 using namespace rhm::journal;
 
-qpid::broker::TimerA JournalImpl::journalTimer;
+qpid::broker::Timer JournalImpl::journalTimer;
 
 void InactivityFireEvent::fire() { if (parent) parent->flushFire(); }
-void GetEventsFireEvent::fire() { if (parent) parent->getEventsFire(); unref(); }
+void GetEventsFireEvent::fire() {
+    if (parent) parent->getEventsFire();
+    release();
+}
 
 JournalImpl::JournalImpl(const std::string& journalId,
                          const std::string& journalDirectory,
@@ -244,7 +247,7 @@
 {
     jcntl::flush();
     if (_wmgr.get_aio_evt_rem() && !getEventsTimerSetFlag) {
-        intrusive_ptr_add_ref(getEventsFireEventsPtr.get());
+        getEventsFireEventsPtr->addRef();
         journalTimer.add(getEventsFireEventsPtr);
         getEventsTimerSetFlag = true;
     }
@@ -265,7 +268,7 @@
     }
     getEventsTimerSetFlag = false;
     if (_wmgr.get_aio_evt_rem()) {
-        intrusive_ptr_add_ref(getEventsFireEventsPtr.get());
+        getEventsFireEventsPtr->addRef();
         journalTimer.add(getEventsFireEventsPtr);
         getEventsTimerSetFlag = true;
     }

Modified: store/trunk/cpp/lib/JournalImpl.h
===================================================================
--- store/trunk/cpp/lib/JournalImpl.h	2007-12-06 17:59:43 UTC (rev 1436)
+++ store/trunk/cpp/lib/JournalImpl.h	2007-12-06 18:24:12 UTC (rev 1437)
@@ -38,25 +38,25 @@
 
         class JournalImpl;
 
-        class InactivityFireEvent : public virtual qpid::broker::TimerTaskA
+        class InactivityFireEvent : public virtual qpid::broker::TimerTask
         {
             JournalImpl*    parent;
 
         public:
 	        InactivityFireEvent(JournalImpl* p, const qpid::sys::Duration timeout):
-                qpid::broker::TimerTaskA(timeout), parent(p) {}
+                qpid::broker::TimerTask(timeout), parent(p) {}
             virtual ~InactivityFireEvent() {}
             void fire();
 		    inline void cancel() { parent=0; }
         };
 
-        class GetEventsFireEvent : public virtual qpid::broker::TimerTaskA
+        class GetEventsFireEvent : public virtual qpid::broker::TimerTask
         {
             JournalImpl*    parent;
 
         public:
 	        GetEventsFireEvent(JournalImpl* p, const qpid::sys::Duration timeout):
-                qpid::broker::TimerTaskA(timeout), parent(p) {}
+                qpid::broker::TimerTask(timeout), parent(p) {}
             virtual ~GetEventsFireEvent() {}
             void fire();
 		    inline void cancel() { parent=0; }
@@ -65,14 +65,14 @@
         class JournalImpl : public journal::jcntl 
         {            
         private:
-            static qpid::broker::TimerA journalTimer;
+            static qpid::broker::Timer journalTimer;
 
             bool getEventsTimerSetFlag;
-            qpid::broker::TimerTaskA::intrusive_ptr getEventsFireEventsPtr;
+            qpid::intrusive_ptr<qpid::broker::TimerTask> getEventsFireEventsPtr;
 
             bool writeActivityFlag;
             bool flushTriggeredFlag;
-            qpid::broker::TimerTaskA::intrusive_ptr inactivityFireEventPtr;
+            qpid::intrusive_ptr<qpid::broker::TimerTask> inactivityFireEventPtr;
             
             // temp local vars for loadMsgContent below
             void* _xidp;

Modified: store/trunk/cpp/lib/TxnCtxt.h
===================================================================
--- store/trunk/cpp/lib/TxnCtxt.h	2007-12-06 17:59:43 UTC (rev 1436)
+++ store/trunk/cpp/lib/TxnCtxt.h	2007-12-06 18:24:12 UTC (rev 1437)
@@ -72,7 +72,7 @@
             JournalImpl* jc = static_cast<JournalImpl*>(*i);
             if (jc && loggedtx) { /* if using journal */
                 boost::intrusive_ptr<DataTokenImpl> dtokp(new DataTokenImpl);
-                dtokp->ref();
+                dtokp->addRef();
                 dtokp->set_rid(loggedtx->next());
                 try{
                     if (commit)

Modified: store/trunk/cpp/lib/jrnl/data_tok.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/data_tok.cpp	2007-12-06 17:59:43 UTC (rev 1436)
+++ store/trunk/cpp/lib/jrnl/data_tok.cpp	2007-12-06 18:24:12 UTC (rev 1437)
@@ -41,25 +41,11 @@
 namespace journal
 {
 
-void intrusive_ptr_add_ref(data_tok* tok)
-{
-    tok->ref();
-}
-
-void intrusive_ptr_release(data_tok* tok)
-{
-    tok->unref();
-    if (tok->refcnt() == 0)
-        delete tok;
-}
-
-
 // Static members
 
 u_int64_t data_tok::_cnt = 0;
 
 data_tok::data_tok():
-	_ref_cnt(0),
     _wstate(NONE),
     _rstate(UNREAD),
     _dsize(0),

Modified: store/trunk/cpp/lib/jrnl/data_tok.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/data_tok.hpp	2007-12-06 17:59:43 UTC (rev 1436)
+++ store/trunk/cpp/lib/jrnl/data_tok.hpp	2007-12-06 18:24:12 UTC (rev 1437)
@@ -33,6 +33,8 @@
 #ifndef rhm_journal_data_tok_hpp
 #define rhm_journal_data_tok_hpp
 
+#include <qpid/RefCounted.h>
+
 namespace rhm
 {
 namespace journal
@@ -66,7 +68,7 @@
     * \brief Data block token (data_tok) used to track wstate of a data block through asynchronous
     *     I/O process
     */
-    class data_tok
+class data_tok : public qpid::RefCounted
     {
     public:
         // TODO: Fix this, separate write state from operation
@@ -102,7 +104,6 @@
         };
 
     private:
-		size_t      _ref_cnt;       ///< Ref count for auto cleanup
         pthread_mutex_t _mutex;
         static u_int64_t _cnt;
         u_int64_t   _icnt;
@@ -121,9 +122,6 @@
         data_tok();
         virtual ~data_tok();
 
-        inline size_t refcnt(void) { return _ref_cnt;}
-	    inline void ref(void) { _ref_cnt++; }
-        inline void unref(void) { _ref_cnt--; }
         inline boost::intrusive_ptr<qpid::broker::PersistableMessage>& getSourceMessage()
                 { return _sourceMsg; }
         inline void setSourceMessage(boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg)
@@ -171,10 +169,6 @@
 
         void reset();
     };
-	
-	void intrusive_ptr_add_ref(data_tok* r);
-    void intrusive_ptr_release(data_tok* r);
-	
 
 } // namespace journal
 } // namespace rhm

Modified: store/trunk/cpp/lib/jrnl/jcntl.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.cpp	2007-12-06 17:59:43 UTC (rev 1436)
+++ store/trunk/cpp/lib/jrnl/jcntl.cpp	2007-12-06 18:24:12 UTC (rev 1437)
@@ -1,34 +1,34 @@
 /**
-* \file jcntl.cpp
-*
-* Red Hat Messaging - Message Journal
-*
-* Messaging journal top-level control and interface class
-* rhm::journal::jcntl.  See comments in file jcntl.hpp for details.
-*
-* \author Kim van der Riet
-*
-* Copyright 2007 Red Hat, Inc.
-*
-* This file is part of Red Hat Messaging.
-*
-* Red Hat Messaging is free software; you can redistribute it and/or
-* modify it under the terms of the GNU Lesser General Public
-* License as published by the Free Software Foundation; either
-* version 2.1 of the License, or (at your option) any later version.
-*
-* This library is distributed in the hope that it will be useful,
-* but WITHOUT ANY WARRANTY; without even the implied warranty of
-* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
-* Lesser General Public License for more details.
-*
-* You should have received a copy of the GNU Lesser General Public
-* License along with this library; if not, write to the Free Software
-* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301
-* USA
-*
-* The GNU Lesser General Public License is available in the file COPYING.
-*/
+ * \file jcntl.cpp
+ *
+ * Red Hat Messaging - Message Journal
+ *
+ * Messaging journal top-level control and interface class
+ * rhm::journal::jcntl.  See comments in file jcntl.hpp for details.
+ *
+ * \author Kim van der Riet
+ *
+ * Copyright 2007 Red Hat, Inc.
+ *
+ * This file is part of Red Hat Messaging.
+ *
+ * Red Hat Messaging is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301
+ * USA
+ *
+ * The GNU Lesser General Public License is available in the file COPYING.
+ */
 
 
 #include <jrnl/jcntl.hpp>
@@ -51,7 +51,7 @@
 // Functions
 
 jcntl::jcntl(const std::string& jid, const std::string& jdir, const std::string& base_filename,
-        const u_int16_t num_jfiles, const u_int32_t jfsize_sblks):
+             const u_int16_t num_jfiles, const u_int32_t jfsize_sblks):
     _jid(jid),
     _jdir(jdir, base_filename),
     _base_filename(base_filename),
@@ -90,7 +90,7 @@
 
 void
 jcntl::initialize(std::deque<data_tok*>* rdtoklp, const aio_cb rd_cb,
-        std::deque<data_tok*>* wdtoklp, const aio_cb wr_cb) throw (jexception)
+                  std::deque<data_tok*>* wdtoklp, const aio_cb wr_cb) throw (jexception)
 {
     // Prepare journal dir, journal files and file handles
     _jdir.clear_dir();
@@ -132,8 +132,8 @@
 
 void
 jcntl::recover(std::deque<data_tok*>* rdtoklp, const aio_cb rd_cb, std::deque<data_tok*>* wdtoklp,
-        const aio_cb wr_cb, const std::vector<std::string>& prep_txn_list, u_int64_t& highest_rid)
-        throw (jexception)
+               const aio_cb wr_cb, const std::vector<std::string>& prep_txn_list, u_int64_t& highest_rid)
+    throw (jexception)
 {
     // Verify journal dir and journal files
     _jdir.verify_dir();
@@ -145,8 +145,8 @@
     if (_rcvdat._full)
         throw jexception(jerrno::JERR_JCNTL_RECOVERJFULL, "jcntl", "recover_complete");
 
-// Debug info, but may be useful to print with a flag
-//_rcvdat.print(_jid);
+    // Debug info, but may be useful to print with a flag
+    //_rcvdat.print(_jid);
 
     if (_datafh)
     {
@@ -190,7 +190,7 @@
     _rrfc.initialize(_num_jfiles, (nlfh**)_datafh, _rcvdat._ffid);
     _rmgr.recover_complete(_rcvdat._fro);
     _readonly_flag = false;
-//std::cout << "Journal revovery complete." << std::endl;
+    //std::cout << "Journal revovery complete." << std::endl;
 }
 
 void 
@@ -203,8 +203,8 @@
 
 const iores
 jcntl::enqueue_data_record(const void* const data_buff, const size_t tot_data_len,
-        const size_t this_data_len, data_tok* dtokp, const bool transient)
-        throw (jexception)
+                           const size_t this_data_len, data_tok* dtokp, const bool transient)
+    throw (jexception)
 {
     iores res;
     check_wstatus("enqueue_data_record");
@@ -212,7 +212,7 @@
     try
     {
         res = _wmgr.enqueue(data_buff, tot_data_len, this_data_len, dtokp, NULL, 0, transient,
-                false);
+                            false);
     }
     catch(const std::exception& e) { pthread_mutex_unlock(&_mutex); throw; }
     pthread_mutex_unlock(&_mutex);
@@ -221,7 +221,7 @@
 
 const iores
 jcntl::enqueue_extern_data_record(const size_t tot_data_len, data_tok* dtokp, const bool transient)
-        throw (jexception)
+    throw (jexception)
 {
     iores res;
     check_wstatus("enqueue_extern_data_record");
@@ -237,8 +237,8 @@
 
 const iores
 jcntl::enqueue_txn_data_record(const void* const data_buff, const size_t tot_data_len,
-        const size_t this_data_len, data_tok* dtokp, const std::string& xid,
-        const bool transient) throw (jexception)
+                               const size_t this_data_len, data_tok* dtokp, const std::string& xid,
+                               const bool transient) throw (jexception)
 {
     iores res;
     check_wstatus("enqueue_tx_data_record");
@@ -246,7 +246,7 @@
     try
     {
         res = _wmgr.enqueue(data_buff, tot_data_len, this_data_len, dtokp, xid.data(), xid.size(),
-                transient, false);
+                            transient, false);
     }
     catch(const std::exception& e) { pthread_mutex_unlock(&_mutex); throw; }
     pthread_mutex_unlock(&_mutex);
@@ -255,7 +255,7 @@
 
 const iores
 jcntl::enqueue_extern_txn_data_record(const size_t tot_data_len, data_tok* dtokp,
-        const std::string& xid, const bool transient) throw (jexception)
+                                      const std::string& xid, const bool transient) throw (jexception)
 {
     iores res;
     check_wstatus("enqueue_extern_txn_data_record");
@@ -271,7 +271,7 @@
 
 const iores
 jcntl::get_data_record(const u_int64_t& rid, const size_t& dsize, const size_t& dsize_avail,
-        const void** const data, bool auto_discard) throw (jexception)
+                       const void** const data, bool auto_discard) throw (jexception)
 {
     check_rstatus("get_data_record");
     return _rmgr.get(rid, dsize, dsize_avail, data, auto_discard);
@@ -286,7 +286,7 @@
 
 const iores
 jcntl::read_data_record(void** const datapp, size_t& dsize, void** const xidpp, size_t& xidsize,
-        bool& transient, bool& external, data_tok* const dtokp) throw (jexception)
+                        bool& transient, bool& external, data_tok* const dtokp) throw (jexception)
 {
     check_rstatus("read_data");
     return _rmgr.read(datapp, dsize, xidpp, xidsize, transient, external, dtokp);
@@ -466,14 +466,14 @@
         _num_jfiles = ji.num_jfiles();
         _rcvdat._enq_cnt_list.resize(_num_jfiles);
         std::cout << "WARNING: Recovery found " << _num_jfiles <<
-                " files (different from --num-jfiles parameter value)." << std::endl;
+            " files (different from --num-jfiles parameter value)." << std::endl;
     }
     if (_jfsize_sblks != ji.jfsize_sblks())
     {
         _jfsize_sblks = ji.jfsize_sblks();
         std::cout << "WARNING: Recovery found file size = " <<
-                (_jfsize_sblks / JRNL_RMGR_PAGE_SIZE) <<
-                " (different from --jfile-size-pgs parameter value)." << std::endl;
+            (_jfsize_sblks / JRNL_RMGR_PAGE_SIZE) <<
+            " (different from --jfile-size-pgs parameter value)." << std::endl;
     }
 
     try
@@ -499,16 +499,16 @@
         if (rd._ffid == next_wr_fid && rd._enq_cnt_list[next_wr_fid])
             rd._full = true;
         
-		std::vector<std::string> xid_list;
-		_tmap.xid_list(xid_list);
-		for (std::vector<std::string>::iterator itr = xid_list.begin(); itr != xid_list.end();
-                itr++)
-		{
-		    std::vector<std::string>::const_iterator pitr = std::find(prep_txn_list.begin(),
-                    prep_txn_list.end(), *itr);
-			if (pitr == prep_txn_list.end())
-				_tmap.get_remove_tdata_list(*itr);
-		} 
+        std::vector<std::string> xid_list;
+        _tmap.xid_list(xid_list);
+        for (std::vector<std::string>::iterator itr = xid_list.begin(); itr != xid_list.end();
+             itr++)
+        {
+            std::vector<std::string>::const_iterator pitr = std::find(prep_txn_list.begin(),
+                                                                      prep_txn_list.end(), *itr);
+            if (pitr == prep_txn_list.end())
+                _tmap.get_remove_tdata_list(*itr);
+        } 
     }
 }
 
@@ -525,157 +525,157 @@
     ifsp->read((char*)&h, sizeof(hdr));
     switch(h._magic)
     {
-        case RHM_JDAT_ENQ_MAGIC:
+      case RHM_JDAT_ENQ_MAGIC:
+        {
+            if (!check_owi(fid, h, rd, read_pos))
+                return false;
+            enq_rec er;
+            while (!done)
             {
-                if (!check_owi(fid, h, rd, read_pos))
+                done = er.rcv_decode(h, ifsp, cum_size_read);
+                if (!jfile_cycle(fid, ifsp, rd, true))
                     return false;
-                enq_rec er;
-                while (!done)
-                {
-                    done = er.rcv_decode(h, ifsp, cum_size_read);
-                    if (!jfile_cycle(fid, ifsp, rd, true))
-                        return false;
-                }
-                if (!er.is_transient()) // Ignore transient msgs
-                {
-                    rd._enq_cnt_list[fid]++;
-                    if (er.xid_size())
-                    {
-                        er.get_xid(&xidp);
-                        assert(xidp != NULL);
-                        std::string xid((char*)xidp, er.xid_size());
-                        _tmap.insert_txn_data(xid, txn_data(h._rid, 0, fid, true));
-                        ::free(xidp);
-                    }
-                    else
-                        _emap.insert_fid(h._rid, fid);
-                }
             }
-            break;
-        case RHM_JDAT_DEQ_MAGIC:
+            if (!er.is_transient()) // Ignore transient msgs
             {
-                if (!check_owi(fid, h, rd, read_pos))
-                    return false;
-                deq_rec dr;
-                while (!done)
+                rd._enq_cnt_list[fid]++;
+                if (er.xid_size())
                 {
-                    done = dr.rcv_decode(h, ifsp, cum_size_read);
-                    if (!jfile_cycle(fid, ifsp, rd, true))
-                        return false;
-                }
-                if (dr.xid_size())
-                {
-                    // If the enqueue is part of a pending txn, it will not yet be in emap
-                    try { _emap.lock(dr.deq_rid()); }
-                    catch(const jexception& e)
-                    {
-                        if (e.err_code() != jerrno::JERR_MAP_NOTFOUND) throw;
-                    }
-                    dr.get_xid(&xidp);
+                    er.get_xid(&xidp);
                     assert(xidp != NULL);
-                    std::string xid((char*)xidp, dr.xid_size());
-                    _tmap.insert_txn_data(xid, txn_data(dr.rid(), dr.deq_rid(), fid, false));
+                    std::string xid((char*)xidp, er.xid_size());
+                    _tmap.insert_txn_data(xid, txn_data(h._rid, 0, fid, true));
                     ::free(xidp);
                 }
                 else
-                {
-                    try
-                    {
-                        u_int16_t enq_fid = _emap.get_remove_fid(dr.deq_rid());
-                        rd._enq_cnt_list[enq_fid]--;
-                    }
-                    catch(const jexception& e)
-                    {
-                        if (e.err_code() != jerrno::JERR_MAP_NOTFOUND) throw;
-                    }
-                }
+                    _emap.insert_fid(h._rid, fid);
             }
-            break;
-        case RHM_JDAT_TXA_MAGIC:
+        }
+        break;
+      case RHM_JDAT_DEQ_MAGIC:
+        {
+            if (!check_owi(fid, h, rd, read_pos))
+                return false;
+            deq_rec dr;
+            while (!done)
             {
-                if (!check_owi(fid, h, rd, read_pos))
+                done = dr.rcv_decode(h, ifsp, cum_size_read);
+                if (!jfile_cycle(fid, ifsp, rd, true))
                     return false;
-                txn_rec ar;
-                while (!done)
+            }
+            if (dr.xid_size())
+            {
+                // If the enqueue is part of a pending txn, it will not yet be in emap
+                try { _emap.lock(dr.deq_rid()); }
+                catch(const jexception& e)
                 {
-                   done = ar.rcv_decode(h, ifsp, cum_size_read);
-                    if (!jfile_cycle(fid, ifsp, rd, true))
-                        return false;
+                    if (e.err_code() != jerrno::JERR_MAP_NOTFOUND) throw;
                 }
-                // Delete this txn from tmap, unlock any locked records in emap
-                ar.get_xid(&xidp);
+                dr.get_xid(&xidp);
                 assert(xidp != NULL);
-                std::string xid((char*)xidp, ar.xid_size());
-                txn_data_list tdl = _tmap.get_remove_tdata_list(xid);
-                for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++)
+                std::string xid((char*)xidp, dr.xid_size());
+                _tmap.insert_txn_data(xid, txn_data(dr.rid(), dr.deq_rid(), fid, false));
+                ::free(xidp);
+            }
+            else
+            {
+                try
                 {
-                    try 
-					{ 
-						if (!itr->_enq_flag)
-							_emap.unlock(itr->_drid);
-					}
-                    catch(const jexception& e)
-                    {
-                        if (e.err_code() != jerrno::JERR_MAP_NOTFOUND) throw;
-                    }
-                    if (itr->_enq_flag)
-                        rd._enq_cnt_list[itr->_fid]--;
+                    u_int16_t enq_fid = _emap.get_remove_fid(dr.deq_rid());
+                    rd._enq_cnt_list[enq_fid]--;
                 }
-                ::free(xidp);
+                catch(const jexception& e)
+                {
+                    if (e.err_code() != jerrno::JERR_MAP_NOTFOUND) throw;
+                }
             }
-            break;
-        case RHM_JDAT_TXC_MAGIC:
+        }
+        break;
+      case RHM_JDAT_TXA_MAGIC:
+        {
+            if (!check_owi(fid, h, rd, read_pos))
+                return false;
+            txn_rec ar;
+            while (!done)
             {
-                if (!check_owi(fid, h, rd, read_pos))
+                done = ar.rcv_decode(h, ifsp, cum_size_read);
+                if (!jfile_cycle(fid, ifsp, rd, true))
                     return false;
-                txn_rec cr;
-                while (!done)
-                {
-                    done = cr.rcv_decode(h, ifsp, cum_size_read);
-                    if (!jfile_cycle(fid, ifsp, rd, true))
-                        return false;
+            }
+            // Delete this txn from tmap, unlock any locked records in emap
+            ar.get_xid(&xidp);
+            assert(xidp != NULL);
+            std::string xid((char*)xidp, ar.xid_size());
+            txn_data_list tdl = _tmap.get_remove_tdata_list(xid);
+            for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++)
+            {
+                try 
+                { 
+                    if (!itr->_enq_flag)
+                        _emap.unlock(itr->_drid);
                 }
-                // Delete this txn from tmap, process records into emap
-                cr.get_xid(&xidp);
-                assert(xidp != NULL);
-                std::string xid((char*)xidp, cr.xid_size());
-                txn_data_list tdl = _tmap.get_remove_tdata_list(xid);
-                for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++)
+                catch(const jexception& e)
                 {
-                    if (itr->_enq_flag) // txn enqueue
-                        _emap.insert_fid(itr->_rid, itr->_fid);
-                    else // txn dequeue
-                    {
-                        u_int16_t fid = _emap.get_remove_fid(itr->_drid, true);
-                        rd._enq_cnt_list[fid]--;
-                    }
+                    if (e.err_code() != jerrno::JERR_MAP_NOTFOUND) throw;
                 }
-                ::free(xidp);
+                if (itr->_enq_flag)
+                    rd._enq_cnt_list[itr->_fid]--;
             }
-            break;
-        case RHM_JDAT_EMPTY_MAGIC:
+            ::free(xidp);
+        }
+        break;
+      case RHM_JDAT_TXC_MAGIC:
+        {
+            if (!check_owi(fid, h, rd, read_pos))
+                return false;
+            txn_rec cr;
+            while (!done)
             {
-                u_int32_t rec_dblks = jrec::size_dblks(sizeof(hdr));
-                ifsp->ignore(rec_dblks * JRNL_DBLK_SIZE - sizeof(hdr));
+                done = cr.rcv_decode(h, ifsp, cum_size_read);
+                if (!jfile_cycle(fid, ifsp, rd, true))
+                    return false;
             }
+            // Delete this txn from tmap, process records into emap
+            cr.get_xid(&xidp);
+            assert(xidp != NULL);
+            std::string xid((char*)xidp, cr.xid_size());
+            txn_data_list tdl = _tmap.get_remove_tdata_list(xid);
+            for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++)
+            {
+                if (itr->_enq_flag) // txn enqueue
+                    _emap.insert_fid(itr->_rid, itr->_fid);
+                else // txn dequeue
+                {
+                    u_int16_t fid = _emap.get_remove_fid(itr->_drid, true);
+                    rd._enq_cnt_list[fid]--;
+                }
+            }
+            ::free(xidp);
+        }
         break;
-        case 0:
+      case RHM_JDAT_EMPTY_MAGIC:
+        {
+            u_int32_t rec_dblks = jrec::size_dblks(sizeof(hdr));
+            ifsp->ignore(rec_dblks * JRNL_DBLK_SIZE - sizeof(hdr));
+        }
+        break;
+      case 0:
+        rd._lfid = fid;
+        rd._eo = ifsp->tellg();
+        return false;
+      default:
+        // Is this the last file, if so, stop as this is the overwrite boundary.
+        if (fid == (rd._ffid ? rd._ffid - 1 : _num_jfiles - 1))
+        {
             rd._lfid = fid;
-            rd._eo = ifsp->tellg();
+            rd._eo = read_pos;
             return false;
-        default:
-            // Is this the last file, if so, stop as this is the overwrite boundary.
-            if (fid == (rd._ffid ? rd._ffid - 1 : _num_jfiles - 1))
-            {
-                rd._lfid = fid;
-                rd._eo = read_pos;
-                return false;
-            }
-            std::stringstream ss;
-            ss << std::hex << std::setfill('0') << "Magic=0x" << std::setw(8) << h._magic;
-            ss << " fid=" << fid << " foffs=0x" << std::setw(8) << read_pos;
-            throw jexception(jerrno::JERR_JCNTL_UNKNOWNMAGIC, ss.str().c_str(), "jcntl",
-                    "rcvr_get_next_record");
+        }
+        std::stringstream ss;
+        ss << std::hex << std::setfill('0') << "Magic=0x" << std::setw(8) << h._magic;
+        ss << " fid=" << fid << " foffs=0x" << std::setw(8) << read_pos;
+        throw jexception(jerrno::JERR_JCNTL_UNKNOWNMAGIC, ss.str().c_str(), "jcntl",
+                         "rcvr_get_next_record");
     }
 
     return true;
@@ -747,7 +747,7 @@
         ss << " foffs=0x" << std::setw(8) << read_pos;
         ss << " expected_fid=0x" << std::setw(4) << expected_fid;
         throw jexception(jerrno::JERR_JCNTL_OWIMISMATCH, ss.str().c_str(), "jcntl",
-                "check_owi");
+                         "check_owi");
     }
     if (rd._h_rid < h._rid)
         rd._h_rid = h._rid;
@@ -757,33 +757,33 @@
 void
 jcntl::aio_wr_callback(jcntl*  journal, u_int32_t num_dtoks)
 {
-//kpvdr TODO -- this list needs to be mutexed...???
+    //kpvdr TODO -- this list needs to be mutexed...???
     std::deque<rhm::journal::data_tok*> this_dtok_list(journal->_aio_wr_cmpl_dtok_list.begin(),
-            journal->_aio_wr_cmpl_dtok_list.end());
+                                                       journal->_aio_wr_cmpl_dtok_list.end());
 	    
     journal->_aio_wr_cmpl_dtok_list.clear();
     for (u_int32_t i=0; i<num_dtoks; i++)
     {
         data_tok*& dtokp = this_dtok_list.front();
-		if (!journal->is_stopped() && dtokp->getSourceMessage())
-		{
-			switch (dtokp->wstate())
-			{
- 				case data_tok::ENQ:
-     	         	dtokp->getSourceMessage()->enqueueComplete();
- 					break;
-				case data_tok::DEQ:
-/* Don't need to signal until we have a way to ack completion of dequeue in AMQP
-     	        	dtokp->getSourceMessage()->dequeueComplete();
-		     		if ( dtokp->getSourceMessage()->isDequeueComplete()  ) // clear id after last dequeue
-		         		dtokp->getSourceMessage()->setPersistenceId(0);
-*/
-					break;
-				default:
-					;
-			}
-		}
-		intrusive_ptr_release(dtokp);
+        if (!journal->is_stopped() && dtokp->getSourceMessage())
+        {
+            switch (dtokp->wstate())
+            {
+              case data_tok::ENQ:
+                dtokp->getSourceMessage()->enqueueComplete();
+                break;
+              case data_tok::DEQ:
+                /* Don't need to signal until we have a way to ack completion of dequeue in AMQP
+                   dtokp->getSourceMessage()->dequeueComplete();
+                   if ( dtokp->getSourceMessage()->isDequeueComplete()  ) // clear id after last dequeue
+                   dtokp->getSourceMessage()->setPersistenceId(0);
+                */
+                break;
+              default:
+                ;
+            }
+        }
+        dtokp->release();
         this_dtok_list.pop_front();
     }
 }
@@ -792,21 +792,21 @@
 jcntl::aio_rd_callback(jcntl*  journal, u_int32_t num_dtoks)
 {
 
-//kpvdr TODO -- can we get rid of the copy???
+    //kpvdr TODO -- can we get rid of the copy???
     std::deque<rhm::journal::data_tok*> this_dtok_list(journal->_aio_rd_cmpl_dtok_list.begin(),
-            journal->_aio_rd_cmpl_dtok_list.end());
+                                                       journal->_aio_rd_cmpl_dtok_list.end());
     journal->_aio_rd_cmpl_dtok_list.clear();
     for (u_int32_t i=0; i<num_dtoks; i++)
     {
         data_tok*& dtokp = this_dtok_list.front();
-		if (!journal->is_stopped() && dtokp->getSourceMessage())
-		{
-        	if (dtokp->wstate() == data_tok::ENQ && dtokp->rstate() == data_tok::READ)
-         	{
+        if (!journal->is_stopped() && dtokp->getSourceMessage())
+        {
+            if (dtokp->wstate() == data_tok::ENQ && dtokp->rstate() == data_tok::READ)
+            {
                 // cct call the recovery manager. / lazyload.. 
-        	}
-		}
-	    intrusive_ptr_release( dtokp);
+            }
+        }
+        dtokp->release();
         this_dtok_list.pop_front();
     }
     




More information about the rhmessaging-commits mailing list