[rhmessaging-commits] rhmessaging commits: r4018 - store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Wed Jun 9 12:30:32 EDT 2010


Author: rgemmell
Date: 2010-06-09 12:30:31 -0400 (Wed, 09 Jun 2010)
New Revision: 4018

Modified:
   store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
Log:
Gather list of QueueEntry keys to be recovered and then pass close cursor before passing to the recovery handler, instead of recovering key by key.
This prevents DeadlockException when trying to acquire locks to remove invalid entries. Implement asynchronous transaction commit. Tweak debug logging.


Modified: store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
===================================================================
--- store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java	2010-06-08 19:11:00 UTC (rev 4017)
+++ store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java	2010-06-09 16:30:31 UTC (rev 4018)
@@ -23,6 +23,7 @@
 import java.io.File;
 import java.lang.ref.SoftReference;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Queue;
@@ -696,6 +697,8 @@
     {
         QueueEntryRecoveryHandler qerh = recoveryHandler.begin(this);
 
+        ArrayList<QueueEntryKey> entries = new ArrayList<QueueEntryKey>();
+        
         Cursor cursor = null;
         try
         {
@@ -705,21 +708,27 @@
 
             DatabaseEntry value = new DatabaseEntry();
 
-            OperationStatus status = cursor.getNext(key, value, LockMode.RMW);
-            
-            while (status == OperationStatus.SUCCESS)
+            while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
             {
+                QueueEntryKey qek = (QueueEntryKey) keyBinding.entryToObject(key);
 
-                QueueEntryKey dd = (QueueEntryKey) keyBinding.entryToObject(key);
+                entries.add(qek);
+            }
 
-                AMQShortString queueName = dd.getQueueName();
-                long messageId = dd.getMessageId();
+            try
+            {
+                cursor.close();
+            }
+            finally
+            {
+                cursor = null;
+            }
+            
+            for(QueueEntryKey entry : entries)
+            {
+                AMQShortString queueName = entry.getQueueName();
+                long messageId = entry.getMessageId();
                 
-                //Advance the cursor BEFORE passing the previous entry to the
-                //recovery handler. This is required in order to release the
-                //lock on the record in case the handler decides to remove it.
-                status = cursor.getNext(key, value, LockMode.RMW);
-                
                 qerh.queueEntry(queueName.asString(),messageId);
             }
         }
@@ -766,7 +775,7 @@
 
             if (_log.isDebugEnabled())
             {
-                _log.debug("Message Id: " + messageId + " Removing");
+                _log.debug("Removing message id " + messageId);
             }
 
             
@@ -833,7 +842,7 @@
             cursor.close();
             cursor = null;
             
-            commit(tx);
+            commit(tx, true);
         }
         catch (DatabaseException e)
         {
@@ -1030,8 +1039,11 @@
      */
     public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQException
     {
-        _log.debug("public void createQueue(AMQQueue queue(" + queue.getName() + ") = " + queue + "): called");
-
+        if (_log.isDebugEnabled())
+        {
+            _log.debug("public void createQueue(AMQQueue queue(" + queue.getName() + ") = " + queue + "): called");
+        }
+        
         if (_state != State.RECOVERING)
         {
             QueueRecord queueRecord= new QueueRecord(queue.getNameShortString(), 
@@ -1067,8 +1079,11 @@
     {
         AMQShortString name = queue.getNameShortString();
 
-        _log.debug("public void removeQueue(AMQShortString name = " + name + "): called");
-
+        if (_log.isDebugEnabled())
+        {
+            _log.debug("public void removeQueue(AMQShortString name = " + name + "): called");
+        }
+            
         DatabaseEntry key = new DatabaseEntry();
         EntryBinding keyBinding = new AMQShortStringTB();
         keyBinding.objectToEntry(name, key);
@@ -1149,7 +1164,7 @@
 
         if (_log.isDebugEnabled())
         {
-            _log.debug("Message Id: " + messageId + " Dequeue");
+            _log.debug("Dequeue message id " + messageId);
         }
         
         try
@@ -1188,27 +1203,28 @@
      *
      * @throws AMQException If the operation fails for any reason.
      */
-    public void commitTran(StoreContext context) throws AMQException
+    private StoreFuture commitTranImpl(StoreContext context, boolean syncCommit) throws AMQException
     {
         com.sleepycat.je.Transaction tx = (com.sleepycat.je.Transaction) context.getPayload();
 
-        if (_log.isDebugEnabled())
-        {
-            _log.debug("public void commitTran(StoreContext context = " + context + "): called with [Transaction" + tx + "]");
-        }
+        //if (_log.isDebugEnabled())
+        //{
+        //    _log.debug("public void commitTranImpl() called with (Transaction=" + tx + ", syncCommit= "+ syncCommit + ")");
+        //}
         
         if (tx == null)
         {
             throw new AMQException("Fatal internal error: transactional context is empty at commitTran");
         }
-
+        
+        StoreFuture result;
         try
         {
-            commit(tx);
+            result = commit(tx, syncCommit);
 
             if (_log.isDebugEnabled())
             {
-                _log.debug("commit tran completed");
+                _log.debug("commitTranImpl completed for [Transaction:" + tx + "]");
             }
         }
         catch (DatabaseException e)
@@ -1219,15 +1235,10 @@
         {
             context.setPayload(null);
         }
+        
+        return result;
     }
 
-    public StoreFuture commitTranAsync(StoreContext context) throws AMQException
-    {
-        //TODO - Actually create an async commit implementation.
-        commitTran(context);
-        return IMMEDIATE_FUTURE;
-    }
-
     /**
      * Abandons all operations performed within a given transactional context.
      *
@@ -1241,7 +1252,7 @@
 
         if (_log.isDebugEnabled())
         {
-            _log.debug("abort tran called: " + tx);
+            _log.debug("abortTran called for [Transaction:" + tx + "]");
         }
 
         try
@@ -1686,15 +1697,16 @@
         }
     }
 
-    private void commit(com.sleepycat.je.Transaction tx) throws DatabaseException
+    private StoreFuture commit(com.sleepycat.je.Transaction tx, boolean syncCommit) throws DatabaseException
     {
-        // _log.debug("void commit(Transaction tx = " + tx + "): called");
+        // _log.debug("void commit(Transaction tx = " + tx + ", sync = " + syncCommit + "): called");
 
         tx.commitNoSync();
 
-        Commit commit = new Commit(_commitThread, tx);
-        commit.commit();
-
+        BDBCommitFuture commitFuture = new BDBCommitFuture(_commitThread, tx, syncCommit);
+        commitFuture.commit();
+        
+        return commitFuture;
     }
 
     public void startCommitThread()
@@ -1702,55 +1714,36 @@
         _commitThread.start();
     }
 
-    private static final class Commit
+    private static final class BDBCommitFuture implements StoreFuture
     {
-        // private static final Logger _log = Logger.getLogger(Commit.class);
+        // private static final Logger _log = Logger.getLogger(BDBCommitFuture.class);
 
         private final CommitThread _commitThread;
         private final com.sleepycat.je.Transaction _tx;
         private DatabaseException _databaseException;
         private boolean _complete;
+        private boolean _syncCommit;
 
-        public Commit(CommitThread commitThread, com.sleepycat.je.Transaction tx)
+        public BDBCommitFuture(CommitThread commitThread, com.sleepycat.je.Transaction tx, boolean syncCommit)
         {
             // _log.debug("public Commit(CommitThread commitThread = " + commitThread + ", Transaction tx = " + tx
             // + "): called");
 
             _commitThread = commitThread;
             _tx = tx;
+            _syncCommit = syncCommit;
         }
 
-        public void prepare(boolean synch) throws DatabaseException
+        public synchronized void complete()
         {
-            // _log.debug("public void prepare(boolean synch = " + synch + "): called");
-
-            try
+            if (_log.isDebugEnabled())
             {
-                if (synch)
-                {
-                    // _log.debug("CommitSynch transaction: " + _tx);
-                    _tx.commitSync();
-                }
-                else
-                {
-                    // _log.debug("CommitNoSynch transaction: " + _tx);
-                    _tx.commitNoSync();
-                }
+                _log.debug("public synchronized void complete(): called (Transaction = " + _tx + ")");
             }
-            catch (DatabaseException e)
-            {
-                _databaseException = e;
-                throw e;
-            }
-        }
 
-        public synchronized void complete()
-        {
-            // _log.debug("public synchronized void complete(): called");
-
             _complete = true;
 
-            notify();
+            notifyAll();
         }
 
         public synchronized void abort(DatabaseException databaseException)
@@ -1761,16 +1754,23 @@
             _complete = true;
             _databaseException = databaseException;
 
-            notify();
+            notifyAll();
         }
 
         public void commit() throws DatabaseException
         {
-            // _log.debug("public void commit(): called");
+            //_log.debug("public void commit(): called");
 
             _commitThread.addJob(this);
-            synchronized (this)
+            
+            if(!_syncCommit)
             {
+                _log.debug("CommitAsync was requested, returning immediately.");
+                return;
+            }
+            
+            synchronized (BDBCommitFuture.this)
+            {
                 while (!_complete)
                 {
                     try
@@ -1792,13 +1792,34 @@
                 }
             }
         }
+
+        public synchronized boolean isComplete()
+        {
+            return _complete;
+        }
+
+        public void waitForCompletion()
+        {
+            while (!isComplete())
+            {
+                try
+                {
+                    wait(250);
+                }
+                catch (InterruptedException e)
+                {
+                    //TODO Should we ignore, or throw a 'StoreException'?
+                    throw new RuntimeException(e);
+                }
+            }
+        }
     }
 
     /**
-     * Implements a thread which batches and commits a queue of {@link Commit} operations. The commit operations
+     * Implements a thread which batches and commits a queue of {@link BDBCommitFuture} operations. The commit operations
      * themselves are responsible for adding themselves to the queue and waiting for the commit to happen before
      * continuing, but it is the responsibility of this thread to tell the commit operations when they have been
-     * completed by calling back on their {@link Commit#complete()} and {@link Commit#abort} methods.
+     * completed by calling back on their {@link BDBCommitFuture#complete()} and {@link BDBCommitFuture#abort} methods.
      *
      * <p/><table id="crc"><caption>CRC Card</caption> <tr><th> Responsibilities <th> Collarations </table>
      */
@@ -1807,7 +1828,7 @@
         // private final Logger _log = Logger.getLogger(CommitThread.class);
 
         private final AtomicBoolean _stopped = new AtomicBoolean(false);
-        private final AtomicReference<Queue<Commit>> _jobQueue = new AtomicReference<Queue<Commit>>(new ConcurrentLinkedQueue<Commit>());
+        private final AtomicReference<Queue<BDBCommitFuture>> _jobQueue = new AtomicReference<Queue<BDBCommitFuture>>(new ConcurrentLinkedQueue<BDBCommitFuture>());
         private final CheckpointConfig _config = new CheckpointConfig();
         private final Object _lock = new Object();
 
@@ -1848,21 +1869,21 @@
 
             // we replace the old queue atomically with a new one and this avoids any need to
             // copy elements out of the queue
-            Queue<Commit> jobs = _jobQueue.getAndSet(new ConcurrentLinkedQueue<Commit>());
+            Queue<BDBCommitFuture> jobs = _jobQueue.getAndSet(new ConcurrentLinkedQueue<BDBCommitFuture>());
 
             try
             {
                 // _environment.checkpoint(_config);
                 _environment.sync();
 
-                for (Commit commit : jobs)
+                for (BDBCommitFuture commit : jobs)
                 {
                     commit.complete();
                 }
             }
             catch (DatabaseException e)
             {
-                for (Commit commit : jobs)
+                for (BDBCommitFuture commit : jobs)
                 {
                     commit.abort(e);
                 }
@@ -1875,7 +1896,7 @@
             return !_jobQueue.get().isEmpty();
         }
 
-        public void addJob(Commit commit)
+        public void addJob(BDBCommitFuture commit)
         {
             synchronized (_lock)
             {
@@ -1995,7 +2016,11 @@
             {
                 if(_ctx != null)
                 {
-                    BDBMessageStore.this.commitTran(_ctx);
+                    //if(_log.isDebugEnabled())
+                    //{
+                    //   _log.debug("Flushing message " + _messageId + " to store");
+                    //}
+                    BDBMessageStore.this.commitTranImpl(_ctx, true);
                 }
             }
             catch (AMQException e)
@@ -2059,12 +2084,12 @@
 
         public void commitTran() throws AMQException
         {
-            BDBMessageStore.this.commitTran(_ctx);
+            BDBMessageStore.this.commitTranImpl(_ctx, true);
         }
 
         public StoreFuture commitTranAsync() throws AMQException
         {
-            return BDBMessageStore.this.commitTranAsync(_ctx);
+            return BDBMessageStore.this.commitTranImpl(_ctx, false);
         }
 
         public void abortTran() throws AMQException



More information about the rhmessaging-commits mailing list