[rhmessaging-commits] rhmessaging commits: r4191 - store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Fri Aug 6 10:29:11 EDT 2010


Author: rgemmell
Date: 2010-08-06 10:29:09 -0400 (Fri, 06 Aug 2010)
New Revision: 4191

Modified:
   store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
Log:
Remove use of StoreContext to wrap the BDB transaction


Modified: store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
===================================================================
--- store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java	2010-08-06 14:28:17 UTC (rev 4190)
+++ store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java	2010-08-06 14:29:09 UTC (rev 4191)
@@ -49,7 +49,6 @@
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.MessageStoreRecoveryHandler;
 import org.apache.qpid.server.store.StorableMessageMetaData;
-import org.apache.qpid.server.store.StoreContext;
 import org.apache.qpid.server.store.StoredMemoryMessage;
 import org.apache.qpid.server.store.StoredMessage;
 import org.apache.qpid.server.store.TransactionLogRecoveryHandler;
@@ -780,8 +779,7 @@
      */
     public void removeMessage(Long messageId) throws AMQStoreException
     {
-        // _log.debug("public void removeMessage(StoreContext context = " + context + ", Long messageId = " + messageId
-        // + "): called");
+        // _log.debug("public void removeMessage(Long messageId = " + messageId): called");
 
         com.sleepycat.je.Transaction tx = null;
         
@@ -1162,21 +1160,19 @@
     }
 
     /**
-     * Places a message onto a specified queue, in a given transactional context.
+     * Places a message onto a specified queue, in a given transaction.
      *
-     * @param context   The transactional context for the operation.
+     * @param tx   The transaction for the operation.
      * @param queue     The the queue to place the message on.
      * @param messageId The message to enqueue.
      *
      * @throws AMQStoreException If the operation fails for any reason.
      */
-    public void enqueueMessage(StoreContext context, final TransactionLogResource queue, Long messageId) throws AMQStoreException
+    public void enqueueMessage(final com.sleepycat.je.Transaction tx, final TransactionLogResource queue, Long messageId) throws AMQStoreException
     {
-        // _log.debug("public void enqueueMessage(StoreContext context = " + context + ", AMQShortString name = " + name
-        // + ", Long messageId): called");
+        // _log.debug("public void enqueueMessage(Transaction tx = " + tx + ", AMQShortString name = " + name + ", Long messageId): called");
 
         AMQShortString name = new AMQShortString(queue.getResourceName());
-        com.sleepycat.je.Transaction tx = (com.sleepycat.je.Transaction) context.getPayload();
         
         DatabaseEntry key = new DatabaseEntry();
         EntryBinding keyBinding = new QueueEntryTB();
@@ -1202,20 +1198,18 @@
     }
 
     /**
-     * Extracts a message from a specified queue, in a given transactional context.
+     * Extracts a message from a specified queue, in a given transaction.
      *
-     * @param context   The transactional context for the operation.
+     * @param tx   The transaction for the operation.
      * @param queue     The name queue to take the message from.
      * @param messageId The message to dequeue.
      *
      * @throws AMQStoreException If the operation fails for any reason, or if the specified message does not exist.
      */
-    public void dequeueMessage(StoreContext context, final TransactionLogResource queue, Long messageId) throws AMQStoreException
+    public void dequeueMessage(final com.sleepycat.je.Transaction tx, final TransactionLogResource queue, Long messageId) throws AMQStoreException
     {
         AMQShortString name = new AMQShortString(queue.getResourceName());
 
-        com.sleepycat.je.Transaction tx = (com.sleepycat.je.Transaction) context.getPayload();
-
         DatabaseEntry key = new DatabaseEntry();
         EntryBinding keyBinding = new QueueEntryTB();
         QueueEntryKey dd = new QueueEntryKey(name, messageId);
@@ -1257,16 +1251,14 @@
     }
 
     /**
-     * Commits all operations performed within a given transactional context.
+     * Commits all operations performed within a given transaction.
      *
-     * @param context The transactional context to commit all operations for.
+     * @param tx The transaction to commit all operations for.
      *
      * @throws AMQStoreException If the operation fails for any reason.
      */
-    private StoreFuture commitTranImpl(StoreContext context, boolean syncCommit) throws AMQStoreException
+    private StoreFuture commitTranImpl(final com.sleepycat.je.Transaction tx, boolean syncCommit) throws AMQStoreException
     {
-        com.sleepycat.je.Transaction tx = (com.sleepycat.je.Transaction) context.getPayload();
-
         //if (_log.isDebugEnabled())
         //{
         //    _log.debug("public void commitTranImpl() called with (Transaction=" + tx + ", syncCommit= "+ syncCommit + ")");
@@ -1274,7 +1266,7 @@
         
         if (tx == null)
         {
-            throw new AMQStoreException("Fatal internal error: transactional context is empty at commitTran");
+            throw new AMQStoreException("Fatal internal error: transactional is null at commitTran");
         }
         
         StoreFuture result;
@@ -1291,25 +1283,19 @@
         {
             throw new AMQStoreException("Error commit tx: " + e.getMessage(), e);
         }
-        finally
-        {
-            context.setPayload(null);
-        }
         
         return result;
     }
 
     /**
-     * Abandons all operations performed within a given transactional context.
+     * Abandons all operations performed within a given transaction.
      *
-     * @param context The transactional context to abandon.
+     * @param tx The transaction to abandon.
      *
      * @throws AMQStoreException If the operation fails for any reason.
      */
-    public void abortTran(StoreContext context) throws AMQStoreException
+    public void abortTran(final com.sleepycat.je.Transaction tx) throws AMQStoreException
     {
-        com.sleepycat.je.Transaction tx = (com.sleepycat.je.Transaction) context.getPayload();
-
         if (_log.isDebugEnabled())
         {
             _log.debug("abortTran called for [Transaction:" + tx + "]");
@@ -1323,10 +1309,6 @@
         {
             throw new AMQStoreException("Error aborting transaction: " + e.getMessage(), e);
         }
-        finally
-        {
-            context.setPayload(null);
-        }
     }
 
     /**
@@ -1403,19 +1385,16 @@
     /**
      * Stores a chunk of message data.
      *
-     * @param context         The transactional context for the operation.
+     * @param tx         The transaction for the operation.
      * @param messageId       The message to store the data for.
      * @param offset          The offset of the data chunk in the message.
      * @param contentBody     The content of the data chunk.
      *
      * @throws AMQStoreException If the operation fails for any reason, or if the specified message does not exist.
      */
-    protected void addContent(StoreContext context, Long messageId, int offset, 
+    protected void addContent(final com.sleepycat.je.Transaction tx, Long messageId, int offset, 
                                       ByteBuffer contentBody) throws AMQStoreException
     {
-
-        com.sleepycat.je.Transaction tx = (com.sleepycat.je.Transaction) context.getPayload();
-        
         DatabaseEntry key = new DatabaseEntry();
         TupleBinding keyBinding = new MessageContentKeyTB_3();
         keyBinding.objectToEntry(new MessageContentKey_3(messageId, offset), key);
@@ -1445,23 +1424,21 @@
     /**
      * Stores message meta-data.
      *
-     * @param context         The transactional context for the operation.
+     * @param tx         The transaction for the operation.
      * @param messageId       The message to store the data for.
      * @param messageMetaData The message meta data to store.
      *
      * @throws AMQStoreException If the operation fails for any reason, or if the specified message does not exist.
      */
-    private void storeMetaData(StoreContext context, Long messageId, StorableMessageMetaData messageMetaData)
+    private void storeMetaData(final com.sleepycat.je.Transaction tx, Long messageId, StorableMessageMetaData messageMetaData)
             throws AMQStoreException
     {
         if (_log.isDebugEnabled())
         {
-            _log.debug("public void storeMessageMetaData(StoreContext context = " + context + ", Long messageId = "
+            _log.debug("public void storeMetaData(Txn tx = " + tx + ", Long messageId = "
                        + messageId + ", MessageMetaData messageMetaData = " + messageMetaData + "): called");
         }
 
-        com.sleepycat.je.Transaction tx = (com.sleepycat.je.Transaction) context.getPayload();
-        
         DatabaseEntry key = new DatabaseEntry();
         EntryBinding keyBinding = TupleBinding.getPrimitiveBinding(Long.class);
         keyBinding.objectToEntry(messageId, key);
@@ -1975,7 +1952,6 @@
 
         private final long _messageId;
         private volatile SoftReference<StorableMessageMetaData> _metaDataRef;
-        private StoreContext _ctx;
         private com.sleepycat.je.Transaction _txn;
 
         StoredBDBMessage(long messageId, StorableMessageMetaData metaData)
@@ -1994,10 +1970,8 @@
                 _metaDataRef = new SoftReference<StorableMessageMetaData>(metaData);
                 if(persist)
                 {
-                    _ctx = new StoreContext();
                     _txn = _environment.beginTransaction(null, null);
-                    _ctx.setPayload(_txn);
-                    storeMetaData(_ctx, messageId, metaData);
+                    storeMetaData(_txn, messageId, metaData);
                 }
             }
             catch (DatabaseException e)
@@ -2039,7 +2013,7 @@
         {
             try
             {
-                BDBMessageStore.this.addContent(_ctx, _messageId, offsetInMessage, src);
+                BDBMessageStore.this.addContent(_txn, _messageId, offsetInMessage, src);
             }
             catch (AMQStoreException e)
             {
@@ -2063,13 +2037,13 @@
         {
             try
             {
-                if(_ctx != null)
+                if(_txn != null)
                 {
                     //if(_log.isDebugEnabled())
                     //{
                     //   _log.debug("Flushing message " + _messageId + " to store");
                     //}
-                    BDBMessageStore.this.commitTranImpl(_ctx, true);
+                    BDBMessageStore.this.commitTranImpl(_txn, true);
                 }
             }
             catch (AMQStoreException e)
@@ -2079,7 +2053,6 @@
             finally
             {
                 _txn = null;
-                _ctx = null;
             }
             return IMMEDIATE_FUTURE;
         }
@@ -2101,11 +2074,9 @@
     private class BDBTransaction implements Transaction
     {
         private com.sleepycat.je.Transaction _txn;
-        private StoreContext _ctx;
 
         private BDBTransaction()
         {
-            _ctx = new StoreContext();
             try
             {
                 _txn = _environment.beginTransaction(null, null);
@@ -2114,33 +2085,32 @@
             {
                 throw new RuntimeException(e);
             }
-            _ctx.setPayload(_txn);
         }
 
         public void enqueueMessage(TransactionLogResource queue, Long messageId) throws AMQStoreException
         {
-            BDBMessageStore.this.enqueueMessage(_ctx, queue, messageId);
+            BDBMessageStore.this.enqueueMessage(_txn, queue, messageId);
         }
 
         public void dequeueMessage(TransactionLogResource queue, Long messageId) throws AMQStoreException
         {
-            BDBMessageStore.this.dequeueMessage(_ctx, queue, messageId);
+            BDBMessageStore.this.dequeueMessage(_txn, queue, messageId);
 
         }
 
         public void commitTran() throws AMQStoreException
         {
-            BDBMessageStore.this.commitTranImpl(_ctx, true);
+            BDBMessageStore.this.commitTranImpl(_txn, true);
         }
 
         public StoreFuture commitTranAsync() throws AMQStoreException
         {
-            return BDBMessageStore.this.commitTranImpl(_ctx, false);
+            return BDBMessageStore.this.commitTranImpl(_txn, false);
         }
 
         public void abortTran() throws AMQStoreException
         {
-            BDBMessageStore.this.abortTran(_ctx);
+            BDBMessageStore.this.abortTran(_txn);
         }
     }
 



More information about the rhmessaging-commits mailing list