[rhmessaging-commits] rhmessaging commits: r3261 - in store/trunk/java/bdbstore/src: test/java/org/apache/qpid/server/store/berkeleydb and 1 other directory.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Fri Apr 3 13:46:30 EDT 2009


Author: ritchiem
Date: 2009-04-03 13:46:30 -0400 (Fri, 03 Apr 2009)
New Revision: 3261

Added:
   store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/TestableBDBMessageStore.java
Modified:
   store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
Log:
QPID-1764 : Created TestableBDBMessageStore to be in line with TestableMMS using the TestTransactionLog interface. This will allow us to keep the testing code out of the main TransactionLogs.

Modified: store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
===================================================================
--- store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java	2009-04-03 17:35:57 UTC (rev 3260)
+++ store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java	2009-04-03 17:46:30 UTC (rev 3261)
@@ -86,52 +86,50 @@
  */
 public class BDBMessageStore implements TransactionLog, RoutingTable
 {
-    private static final Logger _log = Logger.getLogger(BDBMessageStore.class);
+    protected static final Logger _log = Logger.getLogger(BDBMessageStore.class);
 
     private static final int DATABASE_FORMAT_VERSION = 2;
     private static final String DATABASE_FORMAT_VERSION_PROPERTY = "version";
 
     public static final String ENVIRONMENT_PATH_PROPERTY = "environment-path";
 
-    private Environment _environment;
+    protected Environment _environment;
 
-    private String MESSAGEMETADATADB_NAME = "messageMetaDataDb";
+    protected String MESSAGEMETADATADB_NAME = "messageMetaDataDb";
 
     /**
      * Maps from messageId to an AMQMessage (note we don't use serialisation but this is what it roughly corresponds
      * to)
      */
-    private Database _messageMetaDataDb;
+    protected Database _messageMetaDataDb;
 
-    private String MESSAGECONTENTDB_NAME = "messageContentDb";
+    protected String MESSAGECONTENTDB_NAME = "messageContentDb";
 
-    private Database _messageContentDb;
+    protected Database _messageContentDb;
 
     private String QUEUEDB_NAME = "queueDb";
 
     /** Maps from name (which uniquely identifies a queue) to an AMQQueue */
-    private Database _queueDb;
+    protected Database _queueDb;
 
-    private String DELIVERYDB_NAME = "deliveryDb";
+    protected String DELIVERYDB_NAME = "deliveryDb";
 
     /** Maps from a queue name to a message id. This is what stores the pending deliveries for a given queue */
-    private Database _deliveryDb;
+    protected Database _deliveryDb;
 
-    private String EXCHANGEDB_NAME = "exchangeDb";
-    private Database _exchangeDb;
+    protected String EXCHANGEDB_NAME = "exchangeDb";
+    protected Database _exchangeDb;
 
-    private String QUEUEBINDINGSDB_NAME = "queueBindingsDb";
-    private Database _queueBindingsDb;
+    protected String QUEUEBINDINGSDB_NAME = "queueBindingsDb";
+    protected Database _queueBindingsDb;
 
-    private VirtualHost _virtualHost;
+    protected VirtualHost _virtualHost;
 
-    private final AtomicLong _messageId = new AtomicLong(1);
+    protected final AtomicLong _queueId = new AtomicLong(1);
 
-    private final AtomicLong _queueId = new AtomicLong(1);
-
     private final CommitThread _commitThread = new CommitThread("Commit-Thread");
 
-    private Map<AMQShortString, Long> _queueNameToIdMap = new ConcurrentHashMap<AMQShortString, Long>();
+    protected Map<AMQShortString, Long> _queueNameToIdMap = new ConcurrentHashMap<AMQShortString, Long>();
 
     // Factory Classes to create the TupleBinding objects that relfect the version instance of this BDBStore
     private QueueTupleBindingFactory _queueTupleBindingFactory;
@@ -1388,7 +1386,7 @@
      *
      * @throws AMQException If the operation fails for any reason, or if the specified message does not exist.
      */
-    public MessageMetaData getMessageMetaData(StoreContext context, Long messageId) throws AMQException
+    protected MessageMetaData getMessageMetaData(StoreContext context, Long messageId) throws AMQException
     {
         if (_log.isDebugEnabled())
         {
@@ -1692,8 +1690,6 @@
                 queue.enqueue(context, message);
 
             }
-
-            _messageId.set(maxId + 1);
         }
         catch (DatabaseException e)
         {

Added: store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/TestableBDBMessageStore.java
===================================================================
--- store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/TestableBDBMessageStore.java	                        (rev 0)
+++ store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/TestableBDBMessageStore.java	2009-04-03 17:46:30 UTC (rev 3261)
@@ -0,0 +1,182 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import com.sleepycat.bind.EntryBinding;
+import com.sleepycat.je.Cursor;
+import com.sleepycat.je.DatabaseEntry;
+import com.sleepycat.je.DatabaseException;
+import com.sleepycat.je.LockMode;
+import com.sleepycat.je.OperationStatus;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.abstraction.ContentChunk;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.MessageMetaData;
+import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.store.TestTransactionLog;
+import org.apache.qpid.server.transactionlog.BaseTransactionLog;
+import org.apache.qpid.server.transactionlog.TransactionLog;
+
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Test-only subclass of {@link BDBMessageStore} implementing {@link TestTransactionLog}, so that the accessors
+ * tests need can live outside the production store class.
+ */
+public class TestableBDBMessageStore extends BDBMessageStore implements TestTransactionLog
+{
+
+    /**
+     * Intentionally does nothing.
+     *
+     * @param base ignored
+     */
+    public void setBaseTransactionLog(BaseTransactionLog base)
+    {
+        //no-op as we don't care what the base TransactionLog is.
+    }
+
+    /**
+     * Collects the queues that hold a delivery record for the given message.
+     *
+     * Positions a cursor on the delivery database at the first key at or after (null queue name, messageID) and
+     * walks forward for as long as the decoded key still carries the requested message id.
+     *
+     * @param messageID id of the message to look up; must not be null
+     *
+     * @return the queues, as resolved through the virtual host's queue registry, in cursor order
+     *
+     * @throws RuntimeException if the database reports an error while reading or while closing the cursor
+     */
+    public List<AMQQueue> getMessageReferenceMap(Long messageID)
+    {
+        // Unbox once so every comparison below is numeric, whatever the declared type of QueueEntryKey.messageId.
+        final long wantedId = messageID.longValue();
+
+        Cursor cursor = null;
+        try
+        {
+            cursor = _deliveryDb.openCursor(null, null);
+
+            EntryBinding keyBinding = new QueueEntryKey.TupleBinding();
+            DatabaseEntry key = new DatabaseEntry();
+            keyBinding.objectToEntry(new QueueEntryKey(null, messageID), key);
+
+            DatabaseEntry value = new DatabaseEntry();
+
+            List<AMQQueue> queues = new LinkedList<AMQQueue>();
+
+            OperationStatus status = cursor.getSearchKeyRange(key, value, LockMode.DEFAULT);
+            while (status == OperationStatus.SUCCESS)
+            {
+                // Decode the key only after a successful cursor operation.
+                QueueEntryKey found = (QueueEntryKey) keyBinding.entryToObject(key);
+                if (found.messageId != wantedId)
+                {
+                    break;
+                }
+
+                queues.add(_virtualHost.getQueueRegistry().getQueue(found.queueName));
+                status = cursor.getNext(key, value, LockMode.DEFAULT);
+            }
+
+            return queues;
+        }
+        catch (DatabaseException e)
+        {
+            throw new RuntimeException("Database error: " + e, e);
+        }
+        finally
+        {
+            if (cursor != null)
+            {
+                try
+                {
+                    cursor.close();
+                }
+                catch (DatabaseException e)
+                {
+                    throw new RuntimeException("Error closing cursor: " + e, e);
+                }
+            }
+        }
+    }
+
+    /**
+     * Widens the inherited protected accessor to public for use by tests.
+     *
+     * @param context   passed through to the superclass
+     * @param messageId id of the message whose metadata is wanted
+     *
+     * @return whatever the superclass implementation returns
+     *
+     * @throws AMQException as thrown by the superclass implementation
+     */
+    public MessageMetaData getMessageMetaData(StoreContext context, Long messageId) throws AMQException
+    {
+        return super.getMessageMetaData(context, messageId);
+    }
+
+    /**
+     * Public pass-through to the superclass lookup of one content chunk of a message.
+     *
+     * @param context   passed through to the superclass
+     * @param messageId id of the message that owns the chunk
+     * @param index     chunk index, passed through to the superclass
+     *
+     * @return whatever the superclass implementation returns
+     *
+     * @throws AMQException as thrown by the superclass implementation
+     */
+    public ContentChunk getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException
+    {
+        return super.getContentBodyChunk(context, messageId, index);
+    }
+
+    /**
+     * Reports how many records the message metadata database currently holds.
+     *
+     * @return the value of {@code count()} on the metadata database
+     *
+     * @throws RuntimeException if the count fails; the underlying DatabaseException is attached as the cause
+     */
+    public long getMessageMetaDataSize()
+    {
+        try
+        {
+            return _messageMetaDataDb.count();
+        }
+        catch (DatabaseException e)
+        {
+            // Keep the original failure as the cause so the stack trace is not lost.
+            throw new RuntimeException("Unable to get count of database", e);
+        }
+    }
+
+    /**
+     * @return this store itself, as it is its own delegate
+     */
+    public TransactionLog getDelegate()
+    {
+        return this;
+    }
+}




More information about the rhmessaging-commits mailing list