Author: ritchiem
Date: 2009-04-02 11:58:37 -0400 (Thu, 02 Apr 2009)
New Revision: 3250
Modified:
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
Log:
Update based on QPID-1764 to bring BDBMS in line with the new TransactionLog interface.
Modified:
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
===================================================================
---
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java 2009-04-01
19:40:50 UTC (rev 3249)
+++
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java 2009-04-02
15:58:37 UTC (rev 3250)
@@ -54,6 +54,7 @@
import org.apache.qpid.server.store.berkeleydb.tuples.BindingTupleBindingFactory;
import org.apache.qpid.server.store.berkeleydb.tuples.QueueTuple;
import org.apache.qpid.server.store.berkeleydb.tuples.QueueTupleBindingFactory;
+import org.apache.qpid.server.transactionlog.BaseTransactionLog;
import org.apache.qpid.server.transactionlog.TransactionLog;
import org.apache.qpid.server.txn.NonTransactionalContext;
import org.apache.qpid.server.txn.TransactionalContext;
@@ -130,10 +131,6 @@
private Map<AMQShortString, Long> _queueNameToIdMap = new
ConcurrentHashMap<AMQShortString, Long>();
- protected Map<Long, List<AMQQueue>> _messageOnQueueMap = new
ConcurrentHashMap<Long, List<AMQQueue>>();
-
- private final Map<Transaction, Map<Long, List<AMQQueue>>>
_dequeueTxMap = new HashMap<Transaction, Map<Long, List<AMQQueue>>>();
-
// Factory Classes to create the TupleBinding objects that relfect the version
instance of this BDBStore
private QueueTupleBindingFactory _queueTupleBindingFactory;
private BindingTupleBindingFactory _bindingTupleBindingFactory;
@@ -196,7 +193,7 @@
*
* @throws Exception If any error occurs that means the store is unable to configure
itself.
*/
- public void configure(VirtualHost virtualHost, String base, VirtualHostConfiguration
vHostConfig) throws Exception
+ public Object configure(VirtualHost virtualHost, String base,
VirtualHostConfiguration vHostConfig) throws Exception
{
Configuration config = vHostConfig.getStoreConfiguration();
File environmentPath = new File(config.getString(ENVIRONMENT_PATH_PROPERTY,
"bdbEnv"));
@@ -211,15 +208,15 @@
_version = config.getInt(DATABASE_FORMAT_VERSION_PROPERTY,
DATABASE_FORMAT_VERSION);
- configure(virtualHost, environmentPath, false);
+ return new BaseTransactionLog(configure(virtualHost, environmentPath, false));
}
- public void configure(File environmentPath) throws AMQException, DatabaseException
+ public BDBMessageStore configure(File environmentPath) throws AMQException,
DatabaseException
{
- configure(null, environmentPath, false);
+ return configure(null, environmentPath, false);
}
- public void configure(VirtualHost virtualHost, File environmentPath, boolean
readonly) throws AMQException, DatabaseException
+ public BDBMessageStore configure(VirtualHost virtualHost, File environmentPath,
boolean readonly) throws AMQException, DatabaseException
{
stateTransition(State.INITIAL, State.CONFIGURING);
@@ -256,7 +253,7 @@
{
stateTransition(State.CONFIGURED, State.STARTED);
}
-
+ return this;
}
/**
@@ -486,7 +483,7 @@
*
* @throws AMQException If the operation fails for any reason.
*/
- void removeMessage(StoreContext context, Long messageId) throws AMQException
+ public void removeMessage(StoreContext context, Long messageId) throws AMQException
{
// _log.debug("public void removeMessage(StoreContext context = " +
context + ", Long messageId = " + messageId
// + "): called");
@@ -562,6 +559,8 @@
if (localTx)
{
+ // ? Will this not perform the environment default commit? Should we not
be doing this async as
+ // remove will only occur when message has been fully dequeued?
2009-03-31
tx.commit();
context.setPayload(null);
}
@@ -932,13 +931,21 @@
* Places a message onto a specified queue, in a given transactional context.
*
* @param context The transactional context for the operation.
- * @param queue The the queue to place the message on.
+ * @param queues The the List of queues to place the message on.
* @param messageId The message to enqueue.
*
* @throws AMQException If the operation fails for any reason.
*/
- public void enqueueMessage(StoreContext context, AMQQueue queue, Long messageId)
throws AMQException
+ public void enqueueMessage(StoreContext context, ArrayList<AMQQueue> queues,
Long messageId) throws AMQException
{
+ for (AMQQueue q : queues)
+ {
+ enqueueMessage(context, q, messageId);
+ }
+ }
+
+ void enqueueMessage(StoreContext context, AMQQueue queue, Long messageId) throws
AMQException
+ {
// _log.debug("public void enqueueMessage(StoreContext context = " +
context + ", AMQShortString name = " + name
// + ", Long messageId): called");
@@ -951,8 +958,6 @@
DatabaseEntry value = new DatabaseEntry();
ByteBinding.byteToEntry((byte) 0, value);
- recordEnqueue(messageId, queue);
-
try
{
_deliveryDb.put(tx, key, value);
@@ -977,6 +982,9 @@
{
try
{
+ // ? Why is this providing a Null TransactionConfig should we not be
using _transactionConfig
+ // or at least providing some config rather than the environment
default.
+ // What is the environment Default? : 2009-03-31
tx = _environment.beginTransaction(null, null);
_log.info("Creating local transaction:" + tx);
context.setPayload(tx);
@@ -1037,27 +1045,6 @@
}
- //Record the delete for processing AFTER the commit has taken place.
- synchronized (_dequeueTxMap)
- {
- Map<Long, List<AMQQueue>> transactionMap =
_dequeueTxMap.get(tx);
- if (transactionMap == null)
- {
- transactionMap = new HashMap<Long, List<AMQQueue>>();
- _dequeueTxMap.put(tx, transactionMap);
- }
-
- List<AMQQueue> queueList = transactionMap.get(messageId);
-
- if (queueList == null)
- {
- queueList = new LinkedList<AMQQueue>();
- transactionMap.put(messageId, queueList);
- }
-
- queueList.add(queue);
- }
-
if (isLocal)
{
commit(tx);
@@ -1080,13 +1067,6 @@
{
throw new AMQException("Error rolling back transaction: " +
e1, e1);
}
- finally
- {
- synchronized (_dequeueTxMap)
- {
- _dequeueTxMap.remove(tx);
- }
- }
}
throw new AMQException("Error accessing database while dequeuing
message: " + e, e);
@@ -1097,7 +1077,6 @@
* Begins a transactional context.
*
* @param context The transactional context to begin.
- *
* @throws AMQException If the operation fails for any reason.
*/
public void beginTran(StoreContext context) throws AMQException
@@ -1154,11 +1133,18 @@
try
{
- commit(tx);
+ if (context.isAsync())
+ {
+ tx.commitNoSync();
+ }
+ else
+ {
+ commit(tx);
+ }
if (_log.isDebugEnabled())
{
- _log.debug("commit tran completed");
+ _log.debug("commit tran Async(" + context.isAsync() + ")
completed");
}
}
catch (DatabaseException e)
@@ -1695,7 +1681,6 @@
}
- recordEnqueue(messageId, queue);
queue.enqueue(context, message);
}
@@ -1749,13 +1734,8 @@
tx.commitNoSync();
- Map<Long, List<AMQQueue>> dequeueMap = null;
- synchronized (_dequeueTxMap)
- {
- dequeueMap = _dequeueTxMap.remove(tx);
- }
- Commit commit = new Commit(_commitThread, tx, dequeueMap, this);
+ Commit commit = new Commit(_commitThread, tx, this);
commit.commit();
@@ -1774,19 +1754,14 @@
private final Transaction _tx;
private DatabaseException _databaseException;
private boolean _complete;
- private Map<Long, List<AMQQueue>> _messageDequeueMap;
- private TransactionLog _transactionLog;
- public Commit(CommitThread commitThread, Transaction tx, Map<Long,
- List<AMQQueue>> messageDequeueMap, TransactionLog
transactionLog)
+ public Commit(CommitThread commitThread, Transaction tx, TransactionLog
transactionLog)
{
// _log.debug("public Commit(CommitThread commitThread = " +
commitThread + ", Transaction tx = " + tx
// + "): called");
_commitThread = commitThread;
_tx = tx;
- _messageDequeueMap = messageDequeueMap;
- _transactionLog = transactionLog;
}
public void prepare(boolean synch) throws DatabaseException
@@ -1819,22 +1794,6 @@
_complete = true;
- // If we have dequeuedMessages so update our internal state
- if (_messageDequeueMap != null)
- {
- _log.info("Transaction(" + _tx + ") Complete : Dequeuing
messages used.");
- StoreContext dequeueMessageContext = new StoreContext();
-
- for (Map.Entry<Long, List<AMQQueue>> entry :
_messageDequeueMap.entrySet())
- {
- Long id = entry.getKey();
- for (AMQQueue queue : entry.getValue())
- {
- ((BDBMessageStore)
_transactionLog).recordDequeue(dequeueMessageContext, id, queue);
- }
- }
- }
-
notify();
}
@@ -1855,6 +1814,8 @@
// _log.debug("public void commit(): called");
_commitThread.addJob(this);
+ //? Is it not possible that we could be notified here that the _commitThread
committed this job?
+ // Should we also sync around the addJob 2009-04-01 or synchronize this
method ?
synchronized (this)
{
while (!_complete)
@@ -1881,72 +1842,6 @@
}
/**
- * Record that the give message is enqueued on the specified queue.
- *
- * @param messageId The message id to enqueue
- * @param queue The queue it is enqueued upon.
- */
- private void recordEnqueue(Long messageId, AMQQueue queue)
- {
- List<AMQQueue> queues = _messageOnQueueMap.get(messageId);
-
- if (queues == null)
- {
- queues = new LinkedList<AMQQueue>();
- }
-
- queues.add(queue);
-
- _messageOnQueueMap.put(messageId, queues);
- }
-
- /**
- * Update our records that the given message is nolonger on the specified queue.
- * If the message no longer has any queue references then we can discard the
content.
- *
- * @param context
- * @param messageId
- * @param queue
- */
- private void recordDequeue(StoreContext context, Long messageId, AMQQueue queue)
- {
- _log.info("Dequeue Message(" + messageId + ") from queue(" +
queue.getName() + ") context=" + context);
- List<AMQQueue> queues = _messageOnQueueMap.get(messageId);
-
- if (queues == null)
- {
- throw new RuntimeException("Error, Tried to dequeue a message that is
not enqueued");
- }
-
- if (queues.remove(queue))
- {
- // if we now have no more references to this message we can dispose of it
- if (queues.size() == 0)
- {
- try
- {
- _messageOnQueueMap.remove(messageId);
- _log.info("Removing Message(" + messageId + ") from
Tlog context=" + context);
-
- removeMessage(context, messageId);
- }
- catch (AMQException e)
- {
- //todo As we are jus swallowing exception need to add clean up in
load().
- // This should purge any message content that doesn't have any
delivery records.
- _log.debug("Error occured removing unreferenced message:" +
e.getMessage());
- }
-
- }
- }
- else
- {
- throw new RuntimeException("Error, Tried to dequeue a message from a
queue, upon which it is not enqueued");
- }
-
- }
-
- /**
* Implements a thread which batches and commits a queue of {@link Commit}
operations. The commit operations
* themselves are responsible for adding themselves to the queue and waiting for the
commit to happen before
* continuing, but it is the responsibility of this thread to tell the commit
operations when they have been