Author: ritchiem
Date: 2009-04-03 13:46:30 -0400 (Fri, 03 Apr 2009)
New Revision: 3261
Added:
store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/TestableBDBMessageStore.java
Modified:
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
Log:
QPID-1764 : Created TestableBDBMessageStore to be in line with TestableMMS using the
TestTransactionLog interface. This will allow us to keep the testing code out of the main
TransactionLogs.
Modified:
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
===================================================================
---
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java 2009-04-03
17:35:57 UTC (rev 3260)
+++
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java 2009-04-03
17:46:30 UTC (rev 3261)
@@ -86,52 +86,50 @@
*/
public class BDBMessageStore implements TransactionLog, RoutingTable
{
- private static final Logger _log = Logger.getLogger(BDBMessageStore.class);
+ protected static final Logger _log = Logger.getLogger(BDBMessageStore.class);
private static final int DATABASE_FORMAT_VERSION = 2;
private static final String DATABASE_FORMAT_VERSION_PROPERTY = "version";
public static final String ENVIRONMENT_PATH_PROPERTY = "environment-path";
- private Environment _environment;
+ protected Environment _environment;
- private String MESSAGEMETADATADB_NAME = "messageMetaDataDb";
+ protected String MESSAGEMETADATADB_NAME = "messageMetaDataDb";
/**
* Maps from messageId to an AMQMessage (note we don't use serialisation but this
is what it roughly corresponds
* to)
*/
- private Database _messageMetaDataDb;
+ protected Database _messageMetaDataDb;
- private String MESSAGECONTENTDB_NAME = "messageContentDb";
+ protected String MESSAGECONTENTDB_NAME = "messageContentDb";
- private Database _messageContentDb;
+ protected Database _messageContentDb;
private String QUEUEDB_NAME = "queueDb";
/** Maps from name (which uniquely identifies a queue) to an AMQQueue */
- private Database _queueDb;
+ protected Database _queueDb;
- private String DELIVERYDB_NAME = "deliveryDb";
+ protected String DELIVERYDB_NAME = "deliveryDb";
/** Maps from a queue name to a message id. This is what stores the pending
deliveries for a given queue */
- private Database _deliveryDb;
+ protected Database _deliveryDb;
- private String EXCHANGEDB_NAME = "exchangeDb";
- private Database _exchangeDb;
+ protected String EXCHANGEDB_NAME = "exchangeDb";
+ protected Database _exchangeDb;
- private String QUEUEBINDINGSDB_NAME = "queueBindingsDb";
- private Database _queueBindingsDb;
+ protected String QUEUEBINDINGSDB_NAME = "queueBindingsDb";
+ protected Database _queueBindingsDb;
- private VirtualHost _virtualHost;
+ protected VirtualHost _virtualHost;
- private final AtomicLong _messageId = new AtomicLong(1);
+ protected final AtomicLong _queueId = new AtomicLong(1);
- private final AtomicLong _queueId = new AtomicLong(1);
-
private final CommitThread _commitThread = new
CommitThread("Commit-Thread");
- private Map<AMQShortString, Long> _queueNameToIdMap = new
ConcurrentHashMap<AMQShortString, Long>();
+ protected Map<AMQShortString, Long> _queueNameToIdMap = new
ConcurrentHashMap<AMQShortString, Long>();
// Factory Classes to create the TupleBinding objects that relfect the version
instance of this BDBStore
private QueueTupleBindingFactory _queueTupleBindingFactory;
@@ -1388,7 +1386,7 @@
*
* @throws AMQException If the operation fails for any reason, or if the specified
message does not exist.
*/
- public MessageMetaData getMessageMetaData(StoreContext context, Long messageId)
throws AMQException
+ protected MessageMetaData getMessageMetaData(StoreContext context, Long messageId)
throws AMQException
{
if (_log.isDebugEnabled())
{
@@ -1692,8 +1690,6 @@
queue.enqueue(context, message);
}
-
- _messageId.set(maxId + 1);
}
catch (DatabaseException e)
{
Added:
store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/TestableBDBMessageStore.java
===================================================================
---
store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/TestableBDBMessageStore.java
(rev 0)
+++
store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/TestableBDBMessageStore.java 2009-04-03
17:46:30 UTC (rev 3261)
@@ -0,0 +1,128 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import com.sleepycat.bind.EntryBinding;
+import com.sleepycat.je.Cursor;
+import com.sleepycat.je.DatabaseEntry;
+import com.sleepycat.je.DatabaseException;
+import com.sleepycat.je.LockMode;
+import com.sleepycat.je.OperationStatus;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.abstraction.ContentChunk;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.MessageMetaData;
+import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.store.TestTransactionLog;
+import org.apache.qpid.server.transactionlog.BaseTransactionLog;
+import org.apache.qpid.server.transactionlog.TransactionLog;
+
+import java.util.LinkedList;
+import java.util.List;
+
+public class TestableBDBMessageStore extends BDBMessageStore implements
TestTransactionLog
+{
+
+ public void setBaseTransactionLog(BaseTransactionLog base)
+ {
+ //no-op as we don't care what the base TransactionLog is.
+ }
+
+ public List<AMQQueue> getMessageReferenceMap(Long messageID)
+ {
+ Cursor cursor = null;
+ try
+ {
+ cursor = _deliveryDb.openCursor(null, null);
+
+ DatabaseEntry key = new DatabaseEntry();
+
+ QueueEntryKey dd = new QueueEntryKey(null, messageID);
+
+ EntryBinding keyBinding = new QueueEntryKey.TupleBinding();
+ keyBinding.objectToEntry(dd, key);
+
+ DatabaseEntry value = new DatabaseEntry();
+
+ List<AMQQueue> queues = new LinkedList<AMQQueue>();
+
+ OperationStatus status = cursor.getSearchKeyRange(key, value,
LockMode.DEFAULT);
+ dd = (QueueEntryKey) keyBinding.entryToObject(key);
+
+ while ((status == OperationStatus.SUCCESS) && dd.messageId ==
messageID)
+ {
+ queues.add(_virtualHost.getQueueRegistry().getQueue(dd.queueName));
+ status = cursor.getNext(key, value, LockMode.DEFAULT);
+ if (status == OperationStatus.SUCCESS)
+ {
+ dd = (QueueEntryKey) keyBinding.entryToObject(key);
+ }
+ }
+
+ return queues;
+ }
+ catch (DatabaseException e)
+ {
+ throw new RuntimeException("Database error: " + e, e);
+ }
+ finally
+ {
+ if (cursor != null)
+ {
+ try
+ {
+ cursor.close();
+ }
+ catch (DatabaseException e)
+ {
+ throw new RuntimeException("Error closing cursor: " + e,
e);
+ }
+ }
+ }
+ }
+
+ public MessageMetaData getMessageMetaData(StoreContext context, Long messageId)
throws AMQException
+ {
+ return super.getMessageMetaData(context, messageId);
+ }
+
+ public ContentChunk getContentBodyChunk(StoreContext context, Long messageId, int
index) throws AMQException
+ {
+ return super.getContentBodyChunk(context, messageId, index);
+ }
+
+ public long getMessageMetaDataSize()
+ {
+ try
+ {
+ return _messageMetaDataDb.count();
+ }
+ catch (DatabaseException e)
+ {
+ throw new RuntimeException("Unable to get count of database");
+ }
+ }
+
+ public TransactionLog getDelegate()
+ {
+ return this;
+ }
+}