[rhmessaging-commits] rhmessaging commits: r3250 - store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Thu Apr 2 11:58:37 EDT 2009


Author: ritchiem
Date: 2009-04-02 11:58:37 -0400 (Thu, 02 Apr 2009)
New Revision: 3250

Modified:
   store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
Log:
Update based on QPID-1764 to bring BDBMS in line with the new TransactionLog interface.

Modified: store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
===================================================================
--- store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java	2009-04-01 19:40:50 UTC (rev 3249)
+++ store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java	2009-04-02 15:58:37 UTC (rev 3250)
@@ -54,6 +54,7 @@
 import org.apache.qpid.server.store.berkeleydb.tuples.BindingTupleBindingFactory;
 import org.apache.qpid.server.store.berkeleydb.tuples.QueueTuple;
 import org.apache.qpid.server.store.berkeleydb.tuples.QueueTupleBindingFactory;
+import org.apache.qpid.server.transactionlog.BaseTransactionLog;
 import org.apache.qpid.server.transactionlog.TransactionLog;
 import org.apache.qpid.server.txn.NonTransactionalContext;
 import org.apache.qpid.server.txn.TransactionalContext;
@@ -130,10 +131,6 @@
 
     private Map<AMQShortString, Long> _queueNameToIdMap = new ConcurrentHashMap<AMQShortString, Long>();
 
-    protected Map<Long, List<AMQQueue>> _messageOnQueueMap = new ConcurrentHashMap<Long, List<AMQQueue>>();
-
-    private final Map<Transaction, Map<Long, List<AMQQueue>>> _dequeueTxMap = new HashMap<Transaction, Map<Long, List<AMQQueue>>>();
-
     // Factory Classes to create the TupleBinding objects that relfect the version instance of this BDBStore
     private QueueTupleBindingFactory _queueTupleBindingFactory;
     private BindingTupleBindingFactory _bindingTupleBindingFactory;
@@ -196,7 +193,7 @@
      *
      * @throws Exception If any error occurs that means the store is unable to configure itself.
      */
-    public void configure(VirtualHost virtualHost, String base, VirtualHostConfiguration vHostConfig) throws Exception
+    public Object configure(VirtualHost virtualHost, String base, VirtualHostConfiguration vHostConfig) throws Exception
     {
         Configuration config = vHostConfig.getStoreConfiguration();
         File environmentPath = new File(config.getString(ENVIRONMENT_PATH_PROPERTY, "bdbEnv"));
@@ -211,15 +208,15 @@
 
         _version = config.getInt(DATABASE_FORMAT_VERSION_PROPERTY, DATABASE_FORMAT_VERSION);
 
-        configure(virtualHost, environmentPath, false);
+        return new BaseTransactionLog(configure(virtualHost, environmentPath, false));
     }
 
-    public void configure(File environmentPath) throws AMQException, DatabaseException
+    public BDBMessageStore configure(File environmentPath) throws AMQException, DatabaseException
     {
-        configure(null, environmentPath, false);
+        return configure(null, environmentPath, false);
     }
 
-    public void configure(VirtualHost virtualHost, File environmentPath, boolean readonly) throws AMQException, DatabaseException
+    public BDBMessageStore configure(VirtualHost virtualHost, File environmentPath, boolean readonly) throws AMQException, DatabaseException
     {
         stateTransition(State.INITIAL, State.CONFIGURING);
 
@@ -256,7 +253,7 @@
         {
             stateTransition(State.CONFIGURED, State.STARTED);
         }
-
+       return this;
     }
 
     /**
@@ -486,7 +483,7 @@
      *
      * @throws AMQException If the operation fails for any reason.
      */
-    void removeMessage(StoreContext context, Long messageId) throws AMQException
+    public void removeMessage(StoreContext context, Long messageId) throws AMQException
     {
         // _log.debug("public void removeMessage(StoreContext context = " + context + ", Long messageId = " + messageId
         // + "): called");
@@ -562,6 +559,8 @@
 
             if (localTx)
             {
+                // Question (2009-03-31): Will this not perform the environment-default commit? Should this be done
+                // asynchronously, given that remove will only occur once the message has been fully dequeued?
                 tx.commit();
                 context.setPayload(null);
             }
@@ -932,13 +931,21 @@
      * Places a message onto a specified queue, in a given transactional context.
      *
      * @param context   The transactional context for the operation.
-     * @param queue     The the queue to place the message on.
+     * @param queues    The list of queues to place the message on.
      * @param messageId The message to enqueue.
      *
      * @throws AMQException If the operation fails for any reason.
      */
-    public void enqueueMessage(StoreContext context, AMQQueue queue, Long messageId) throws AMQException
+    public void enqueueMessage(StoreContext context, ArrayList<AMQQueue> queues, Long messageId) throws AMQException
     {
+        for (AMQQueue q : queues)
+        {
+            enqueueMessage(context, q, messageId);
+        }
+    }
+
+    void enqueueMessage(StoreContext context, AMQQueue queue, Long messageId) throws AMQException
+    {
         // _log.debug("public void enqueueMessage(StoreContext context = " + context + ", AMQShortString name = " + name
         // + ", Long messageId): called");
 
@@ -951,8 +958,6 @@
         DatabaseEntry value = new DatabaseEntry();
         ByteBinding.byteToEntry((byte) 0, value);
 
-        recordEnqueue(messageId, queue);
-
         try
         {
             _deliveryDb.put(tx, key, value);
@@ -977,6 +982,9 @@
         {
             try
             {
+                // Question (2009-03-31): Why is this providing a null TransactionConfig? Should we not be using
+                // _transactionConfig, or at least providing some config rather than the environment default?
+                // What is the environment default?
                 tx = _environment.beginTransaction(null, null);
                 _log.info("Creating local transaction:" + tx);
                 context.setPayload(tx);
@@ -1037,27 +1045,6 @@
 
             }
 
-            //Record the delete for processing AFTER the commit has taken place.
-            synchronized (_dequeueTxMap)
-            {
-                Map<Long, List<AMQQueue>> transactionMap = _dequeueTxMap.get(tx);
-                if (transactionMap == null)
-                {
-                    transactionMap = new HashMap<Long, List<AMQQueue>>();
-                    _dequeueTxMap.put(tx, transactionMap);
-                }
-
-                List<AMQQueue> queueList = transactionMap.get(messageId);
-
-                if (queueList == null)
-                {
-                    queueList = new LinkedList<AMQQueue>();
-                    transactionMap.put(messageId, queueList);
-                }
-
-                queueList.add(queue);
-            }
-
             if (isLocal)
             {
                 commit(tx);
@@ -1080,13 +1067,6 @@
                 {
                     throw new AMQException("Error rolling back transaction: " + e1, e1);
                 }
-                finally
-                {
-                    synchronized (_dequeueTxMap)
-                    {
-                        _dequeueTxMap.remove(tx);
-                    }
-                }
             }
 
             throw new AMQException("Error accessing database while dequeuing message: " + e, e);
@@ -1097,7 +1077,6 @@
      * Begins a transactional context.
      *
      * @param context The transactional context to begin.
-     *
      * @throws AMQException If the operation fails for any reason.
      */
     public void beginTran(StoreContext context) throws AMQException
@@ -1154,11 +1133,18 @@
 
         try
         {
-            commit(tx);
+            if (context.isAsync())
+            {
+                tx.commitNoSync();
+            }
+            else
+            {
+                commit(tx);
+            }
 
             if (_log.isDebugEnabled())
             {
-                _log.debug("commit tran completed");
+                _log.debug("commit tran Async(" + context.isAsync() + ") completed");
             }
         }
         catch (DatabaseException e)
@@ -1695,7 +1681,6 @@
 
                 }
 
-                recordEnqueue(messageId, queue);
                 queue.enqueue(context, message);
 
             }
@@ -1749,13 +1734,8 @@
 
         tx.commitNoSync();
 
-        Map<Long, List<AMQQueue>> dequeueMap = null;
-        synchronized (_dequeueTxMap)
-        {
-            dequeueMap = _dequeueTxMap.remove(tx);
-        }
 
-        Commit commit = new Commit(_commitThread, tx, dequeueMap, this);
+        Commit commit = new Commit(_commitThread, tx, this);
 
         commit.commit();
 
@@ -1774,19 +1754,14 @@
         private final Transaction _tx;
         private DatabaseException _databaseException;
         private boolean _complete;
-        private Map<Long, List<AMQQueue>> _messageDequeueMap;
-        private TransactionLog _transactionLog;
 
-        public Commit(CommitThread commitThread, Transaction tx, Map<Long,
-                List<AMQQueue>> messageDequeueMap, TransactionLog transactionLog)
+        public Commit(CommitThread commitThread, Transaction tx, TransactionLog transactionLog)
         {
             // _log.debug("public Commit(CommitThread commitThread = " + commitThread + ", Transaction tx = " + tx
             // + "): called");
 
             _commitThread = commitThread;
             _tx = tx;
-            _messageDequeueMap = messageDequeueMap;
-            _transactionLog = transactionLog;
         }
 
         public void prepare(boolean synch) throws DatabaseException
@@ -1819,22 +1794,6 @@
 
             _complete = true;
 
-            // If we have dequeuedMessages so update our internal state
-            if (_messageDequeueMap != null)
-            {
-                _log.info("Transaction(" + _tx + ") Complete : Dequeuing messages used.");
-                StoreContext dequeueMessageContext = new StoreContext();
-
-                for (Map.Entry<Long, List<AMQQueue>> entry : _messageDequeueMap.entrySet())
-                {
-                    Long id = entry.getKey();
-                    for (AMQQueue queue : entry.getValue())
-                    {
-                        ((BDBMessageStore) _transactionLog).recordDequeue(dequeueMessageContext, id, queue);
-                    }
-                }
-            }
-
             notify();
 
         }
@@ -1855,6 +1814,8 @@
             // _log.debug("public void commit(): called");
 
             _commitThread.addJob(this);
+            // Question (2009-04-01): Is it not possible that we could be notified here that the _commitThread has
+            // committed this job? Should we also sync around the addJob call, or synchronize this method?
             synchronized (this)
             {
                 while (!_complete)
@@ -1881,72 +1842,6 @@
     }
 
     /**
-     * Record that the give message is enqueued on the specified queue.
-     *
-     * @param messageId The message id to enqueue
-     * @param queue     The queue it is enqueued upon.
-     */
-    private void recordEnqueue(Long messageId, AMQQueue queue)
-    {
-        List<AMQQueue> queues = _messageOnQueueMap.get(messageId);
-
-        if (queues == null)
-        {
-            queues = new LinkedList<AMQQueue>();
-        }
-
-        queues.add(queue);
-
-        _messageOnQueueMap.put(messageId, queues);
-    }
-
-    /**
-     * Update our records that the given message is nolonger on the specified queue.
-     * If the message no longer has any queue references then we can discard the content.
-     *
-     * @param context
-     * @param messageId
-     * @param queue
-     */
-    private void recordDequeue(StoreContext context, Long messageId, AMQQueue queue)
-    {
-        _log.info("Dequeue Message(" + messageId + ") from queue(" + queue.getName() + ") context=" + context);
-        List<AMQQueue> queues = _messageOnQueueMap.get(messageId);
-
-        if (queues == null)
-        {
-            throw new RuntimeException("Error, Tried to dequeue a message that is not enqueued");
-        }
-
-        if (queues.remove(queue))
-        {
-            // if we now have no more references to this message we can dispose of it
-            if (queues.size() == 0)
-            {
-                try
-                {
-                    _messageOnQueueMap.remove(messageId);
-                    _log.info("Removing Message(" + messageId + ") from Tlog context=" + context);
-
-                    removeMessage(context, messageId);
-                }
-                catch (AMQException e)
-                {
-                    //todo As we are jus swallowing exception need to add clean up in load().
-                    // This should purge any message content that doesn't have any delivery records.
-                    _log.debug("Error occured removing unreferenced message:" + e.getMessage());
-                }
-
-            }
-        }
-        else
-        {
-            throw new RuntimeException("Error, Tried to dequeue a message from a queue, upon which it is not enqueued");
-        }
-
-    }
-
-    /**
      * Implements a thread which batches and commits a queue of {@link Commit} operations. The commit operations
      * themselves are responsible for adding themselves to the queue and waiting for the commit to happen before
      * continuing, but it is the responsibility of this thread to tell the commit operations when they have been




More information about the rhmessaging-commits mailing list