Author: rgemmell
Date: 2010-08-06 10:29:09 -0400 (Fri, 06 Aug 2010)
New Revision: 4191
Modified:
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
Log:
Remove use of StoreContext to wrap the BDB transaction
Modified:
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
===================================================================
--- store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java 2010-08-06 14:28:17 UTC (rev 4190)
+++ store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java 2010-08-06 14:29:09 UTC (rev 4191)
@@ -49,7 +49,6 @@
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.MessageStoreRecoveryHandler;
import org.apache.qpid.server.store.StorableMessageMetaData;
-import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.store.StoredMemoryMessage;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.store.TransactionLogRecoveryHandler;
@@ -780,8 +779,7 @@
*/
public void removeMessage(Long messageId) throws AMQStoreException
{
- // _log.debug("public void removeMessage(StoreContext context = " +
context + ", Long messageId = " + messageId
- // + "): called");
+ // _log.debug("public void removeMessage(Long messageId = " + messageId + "): called");
com.sleepycat.je.Transaction tx = null;
@@ -1162,21 +1160,19 @@
}
/**
- * Places a message onto a specified queue, in a given transactional context.
+ * Places a message onto a specified queue, in a given transaction.
*
- * @param context The transactional context for the operation.
+ * @param tx The transaction for the operation.
* @param queue The the queue to place the message on.
* @param messageId The message to enqueue.
*
* @throws AMQStoreException If the operation fails for any reason.
*/
- public void enqueueMessage(StoreContext context, final TransactionLogResource queue,
Long messageId) throws AMQStoreException
+ public void enqueueMessage(final com.sleepycat.je.Transaction tx, final
TransactionLogResource queue, Long messageId) throws AMQStoreException
{
- // _log.debug("public void enqueueMessage(StoreContext context = " +
context + ", AMQShortString name = " + name
- // + ", Long messageId): called");
+ // _log.debug("public void enqueueMessage(Transaction tx = " + tx +
", AMQShortString name = " + name + ", Long messageId): called");
AMQShortString name = new AMQShortString(queue.getResourceName());
- com.sleepycat.je.Transaction tx = (com.sleepycat.je.Transaction)
context.getPayload();
DatabaseEntry key = new DatabaseEntry();
EntryBinding keyBinding = new QueueEntryTB();
@@ -1202,20 +1198,18 @@
}
/**
- * Extracts a message from a specified queue, in a given transactional context.
+ * Extracts a message from a specified queue, in a given transaction.
*
- * @param context The transactional context for the operation.
+ * @param tx The transaction for the operation.
* @param queue The name queue to take the message from.
* @param messageId The message to dequeue.
*
* @throws AMQStoreException If the operation fails for any reason, or if the
specified message does not exist.
*/
- public void dequeueMessage(StoreContext context, final TransactionLogResource queue,
Long messageId) throws AMQStoreException
+ public void dequeueMessage(final com.sleepycat.je.Transaction tx, final
TransactionLogResource queue, Long messageId) throws AMQStoreException
{
AMQShortString name = new AMQShortString(queue.getResourceName());
- com.sleepycat.je.Transaction tx = (com.sleepycat.je.Transaction)
context.getPayload();
-
DatabaseEntry key = new DatabaseEntry();
EntryBinding keyBinding = new QueueEntryTB();
QueueEntryKey dd = new QueueEntryKey(name, messageId);
@@ -1257,16 +1251,14 @@
}
/**
- * Commits all operations performed within a given transactional context.
+ * Commits all operations performed within a given transaction.
*
- * @param context The transactional context to commit all operations for.
+ * @param tx The transaction to commit all operations for.
*
* @throws AMQStoreException If the operation fails for any reason.
*/
- private StoreFuture commitTranImpl(StoreContext context, boolean syncCommit) throws
AMQStoreException
+ private StoreFuture commitTranImpl(final com.sleepycat.je.Transaction tx, boolean
syncCommit) throws AMQStoreException
{
- com.sleepycat.je.Transaction tx = (com.sleepycat.je.Transaction)
context.getPayload();
-
//if (_log.isDebugEnabled())
//{
// _log.debug("public void commitTranImpl() called with
(Transaction=" + tx + ", syncCommit= "+ syncCommit + ")");
@@ -1274,7 +1266,7 @@
if (tx == null)
{
- throw new AMQStoreException("Fatal internal error: transactional context
is empty at commitTran");
+ throw new AMQStoreException("Fatal internal error: transactional is null
at commitTran");
}
StoreFuture result;
@@ -1291,25 +1283,19 @@
{
throw new AMQStoreException("Error commit tx: " + e.getMessage(),
e);
}
- finally
- {
- context.setPayload(null);
- }
return result;
}
/**
- * Abandons all operations performed within a given transactional context.
+ * Abandons all operations performed within a given transaction.
*
- * @param context The transactional context to abandon.
+ * @param tx The transaction to abandon.
*
* @throws AMQStoreException If the operation fails for any reason.
*/
- public void abortTran(StoreContext context) throws AMQStoreException
+ public void abortTran(final com.sleepycat.je.Transaction tx) throws
AMQStoreException
{
- com.sleepycat.je.Transaction tx = (com.sleepycat.je.Transaction)
context.getPayload();
-
if (_log.isDebugEnabled())
{
_log.debug("abortTran called for [Transaction:" + tx +
"]");
@@ -1323,10 +1309,6 @@
{
throw new AMQStoreException("Error aborting transaction: " +
e.getMessage(), e);
}
- finally
- {
- context.setPayload(null);
- }
}
/**
@@ -1403,19 +1385,16 @@
/**
* Stores a chunk of message data.
*
- * @param context The transactional context for the operation.
+ * @param tx The transaction for the operation.
* @param messageId The message to store the data for.
* @param offset The offset of the data chunk in the message.
* @param contentBody The content of the data chunk.
*
* @throws AMQStoreException If the operation fails for any reason, or if the
specified message does not exist.
*/
- protected void addContent(StoreContext context, Long messageId, int offset,
+ protected void addContent(final com.sleepycat.je.Transaction tx, Long messageId, int
offset,
ByteBuffer contentBody) throws AMQStoreException
{
-
- com.sleepycat.je.Transaction tx = (com.sleepycat.je.Transaction)
context.getPayload();
-
DatabaseEntry key = new DatabaseEntry();
TupleBinding keyBinding = new MessageContentKeyTB_3();
keyBinding.objectToEntry(new MessageContentKey_3(messageId, offset), key);
@@ -1445,23 +1424,21 @@
/**
* Stores message meta-data.
*
- * @param context The transactional context for the operation.
+ * @param tx The transaction for the operation.
* @param messageId The message to store the data for.
* @param messageMetaData The message meta data to store.
*
* @throws AMQStoreException If the operation fails for any reason, or if the
specified message does not exist.
*/
- private void storeMetaData(StoreContext context, Long messageId,
StorableMessageMetaData messageMetaData)
+ private void storeMetaData(final com.sleepycat.je.Transaction tx, Long messageId,
StorableMessageMetaData messageMetaData)
throws AMQStoreException
{
if (_log.isDebugEnabled())
{
- _log.debug("public void storeMessageMetaData(StoreContext context =
" + context + ", Long messageId = "
+ _log.debug("public void storeMetaData(Txn tx = " + tx + ",
Long messageId = "
+ messageId + ", MessageMetaData messageMetaData = " +
messageMetaData + "): called");
}
- com.sleepycat.je.Transaction tx = (com.sleepycat.je.Transaction)
context.getPayload();
-
DatabaseEntry key = new DatabaseEntry();
EntryBinding keyBinding = TupleBinding.getPrimitiveBinding(Long.class);
keyBinding.objectToEntry(messageId, key);
@@ -1975,7 +1952,6 @@
private final long _messageId;
private volatile SoftReference<StorableMessageMetaData> _metaDataRef;
- private StoreContext _ctx;
private com.sleepycat.je.Transaction _txn;
StoredBDBMessage(long messageId, StorableMessageMetaData metaData)
@@ -1994,10 +1970,8 @@
_metaDataRef = new
SoftReference<StorableMessageMetaData>(metaData);
if(persist)
{
- _ctx = new StoreContext();
_txn = _environment.beginTransaction(null, null);
- _ctx.setPayload(_txn);
- storeMetaData(_ctx, messageId, metaData);
+ storeMetaData(_txn, messageId, metaData);
}
}
catch (DatabaseException e)
@@ -2039,7 +2013,7 @@
{
try
{
- BDBMessageStore.this.addContent(_ctx, _messageId, offsetInMessage, src);
+ BDBMessageStore.this.addContent(_txn, _messageId, offsetInMessage, src);
}
catch (AMQStoreException e)
{
@@ -2063,13 +2037,13 @@
{
try
{
- if(_ctx != null)
+ if(_txn != null)
{
//if(_log.isDebugEnabled())
//{
// _log.debug("Flushing message " + _messageId + "
to store");
//}
- BDBMessageStore.this.commitTranImpl(_ctx, true);
+ BDBMessageStore.this.commitTranImpl(_txn, true);
}
}
catch (AMQStoreException e)
@@ -2079,7 +2053,6 @@
finally
{
_txn = null;
- _ctx = null;
}
return IMMEDIATE_FUTURE;
}
@@ -2101,11 +2074,9 @@
private class BDBTransaction implements Transaction
{
private com.sleepycat.je.Transaction _txn;
- private StoreContext _ctx;
private BDBTransaction()
{
- _ctx = new StoreContext();
try
{
_txn = _environment.beginTransaction(null, null);
@@ -2114,33 +2085,32 @@
{
throw new RuntimeException(e);
}
- _ctx.setPayload(_txn);
}
public void enqueueMessage(TransactionLogResource queue, Long messageId) throws
AMQStoreException
{
- BDBMessageStore.this.enqueueMessage(_ctx, queue, messageId);
+ BDBMessageStore.this.enqueueMessage(_txn, queue, messageId);
}
public void dequeueMessage(TransactionLogResource queue, Long messageId) throws
AMQStoreException
{
- BDBMessageStore.this.dequeueMessage(_ctx, queue, messageId);
+ BDBMessageStore.this.dequeueMessage(_txn, queue, messageId);
}
public void commitTran() throws AMQStoreException
{
- BDBMessageStore.this.commitTranImpl(_ctx, true);
+ BDBMessageStore.this.commitTranImpl(_txn, true);
}
public StoreFuture commitTranAsync() throws AMQStoreException
{
- return BDBMessageStore.this.commitTranImpl(_ctx, false);
+ return BDBMessageStore.this.commitTranImpl(_txn, false);
}
public void abortTran() throws AMQStoreException
{
- BDBMessageStore.this.abortTran(_ctx);
+ BDBMessageStore.this.abortTran(_txn);
}
}