Author: ritchiem
Date: 2009-04-02 12:52:13 -0400 (Thu, 02 Apr 2009)
New Revision: 3251
Removed:
store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/InspectableBDBMessageStore.java
Modified:
store/trunk/java/bdbstore/build.xml
store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreTest.java
store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java
store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessagePersistenceTest.java
Log:
Update based on QPID-1764 to correct tests and ensure testing can be carried out
correctly
Modified: store/trunk/java/bdbstore/build.xml
===================================================================
--- store/trunk/java/bdbstore/build.xml 2009-04-02 15:58:37 UTC (rev 3250)
+++ store/trunk/java/bdbstore/build.xml 2009-04-02 16:52:13 UTC (rev 3251)
@@ -17,6 +17,8 @@
<property name="java.target" value="1.5"/>
<property name="java.source" value="1.5"/>
+ <dirname property="project.root"
file="${ant.file.common}"/>
+
<property name="build.classes" location="build/classes"/>
<property name="build.test.classes"
location="build/test/classes"/>
<property name="build.tools.classes"
location="build/tools/classes"/>
@@ -28,14 +30,13 @@
<property name="release.tar"
location="${release.dir}/${project.namever}.tar"/>
<property name="release.tgz"
location="${release.dir}/${project.namever}.tgz"/>
<property name="release.bz2"
location="${release.dir}/${project.namever}.bz2"/>
+ <property name="qpid.work.dir"
location="${project.root}/build/test-work"/>
<property name="java.naming.factory.initial"
value="org.apache.qpid.jndi.PropertiesFileInitialContextFactory"/>
<available property="src.test.dir.exists"
file="${src.test.dir}"/>
- <dirname property="project.root"
file="${ant.file.common}"/>
-
<property file="${project.root}/default.testprofile"/>
<path id="class.path">
@@ -102,7 +103,7 @@
<jar destfile="${bdbtools.jar}"
basedir="${build.tools.classes}"/>
</target>
- <target name="test" depends="build-tests"
if="src.test.dir.exists"
+ <target name="test" depends="build-tests,prepare-tests"
if="src.test.dir.exists"
unless="${dontruntest}" description="execute unit
tests">
<delete file="${module.failed}"/>
@@ -110,7 +111,6 @@
<junit fork="${test.fork}" maxmemory="${test.mem}"
reloading="no"
haltonfailure="${haltonfailure}"
haltonerror="${haltonerror}"
failureproperty="test.failures" printsummary="on"
timeout="600000" >
-
<sysproperty key="amqj.logging.level"
value="${amqj.logging.level}"/>
<sysproperty key="root.logging.level"
value="${root.logging.level}"/>
<sysproperty key="log4j.configuration"
value="${log4j.configuration}"/>
@@ -125,8 +125,8 @@
<sysproperty key="max_prefetch" value
="${max_prefetch}"/>
<sysproperty key="example.plugin.target"
value="${project.root}/build/lib/plugins"/>
<sysproperty key="QPID_HOME" value="${project.root}"/>
- <sysproperty key="QPID_WORK"
value="${project.root}/build/test-work"/>
- <sysproperty key="BDB_WORK"
value="${project.root}/build/test-work/bdbstore"/>
+ <sysproperty key="QPID_WORK" value="${qpid.work.dir}"/>
+ <sysproperty key="BDB_WORK"
value="${qpid.work.dir}/bdbstore"/>
<sysproperty key="BDB_HOME" value="${project.root}"/>
<sysproperty key="test.excludes" value="false"/>
@@ -160,6 +160,10 @@
<mkdir dir="${release.dir}"/>
</target>
+ <target name="prepare-tests">
+ <mkdir dir="${qpid.work.dir}"/>
+ </target>
+
<target name="zip" depends="build,prepare"
description="build release archive">
<zip destfile="${release.zip}">
<zipfileset dir="${bin.dir}" prefix="${project.namever}/bin"
filemode="755">
Modified:
store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreTest.java
===================================================================
---
store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreTest.java 2009-04-02
15:58:37 UTC (rev 3250)
+++
store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreTest.java 2009-04-02
16:52:13 UTC (rev 3251)
@@ -34,46 +34,39 @@
import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
-import org.apache.qpid.framing.amqp_8_0.BasicConsumeBodyImpl;
-import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.RequiredDeliveryException;
+import org.apache.qpid.server.routing.RoutingTable;
+import org.apache.qpid.server.transactionlog.TestableTransactionLog;
+import org.apache.qpid.server.transactionlog.TransactionLog;
import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.exchange.DefaultExchangeFactory;
import org.apache.qpid.server.exchange.DirectExchange;
import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.flow.LimitlessCreditManager;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.AMQPriorityQueue;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.AMQQueueFactory;
-import org.apache.qpid.server.queue.IncomingMessage;
import org.apache.qpid.server.queue.MessageMetaData;
import org.apache.qpid.server.queue.MockContentChunk;
-import org.apache.qpid.server.queue.MockPersistentAMQMessage;
-import org.apache.qpid.server.queue.MockProtocolSession;
-import org.apache.qpid.server.queue.MockQueueEntry;
-import org.apache.qpid.server.queue.SimpleAMQQueue;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.store.berkeleydb.utils.BDBVMTestCase;
-import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
import org.apache.qpid.server.txn.NonTransactionalContext;
import org.apache.qpid.server.txn.TransactionalContext;
import org.apache.qpid.server.util.NullApplicationRegistry;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.io.File;
-import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
+import java.util.ArrayList;
public class BDBStoreTest extends BDBVMTestCase
{
private static final Logger _log = Logger.getLogger(BDBStoreTest.class);
- private BDBMessageStore _transactionLog;
+ private BDBMessageStore _bdbMessageStore;
+ private TransactionLog _transactionLog;
+ private RoutingTable _routingTable;
private String STORE_LOCATION = System.getProperty("BDB_WORK") +
"/bdbTestEnv";
private StoreContext _storeContext = new StoreContext();
@@ -104,12 +97,18 @@
deleteDirectory(bdbDir);
BDB_DIR.mkdirs();
- _transactionLog = new InspectableBDBMessageStore();
- _transactionLog.configure(BDB_DIR);
+ _bdbMessageStore= new BDBMessageStore();
- _virtualHost = new VirtualHost(new VirtualHostConfiguration("test", new
PropertiesConfiguration()), _transactionLog);
- _transactionLog.setVirtualHost(_virtualHost);
+ _routingTable = _bdbMessageStore;
+ VirtualHostConfiguration vhostConfig = new
VirtualHostConfiguration("test", new PropertiesConfiguration());
+
+ _transactionLog = new
TestableTransactionLog(_bdbMessageStore.configure(BDB_DIR));
+
+ _virtualHost = new VirtualHost(vhostConfig, _transactionLog);
+
+ _bdbMessageStore.setVirtualHost(_virtualHost);
+
_txnContext = new NonTransactionalContext(_transactionLog, _storeContext, null,
new LinkedList<RequiredDeliveryException>());
}
@@ -120,10 +119,12 @@
PropertiesConfiguration env = new PropertiesConfiguration();
env.addProperty("store.environment-path", STORE_LOCATION);
- env.addProperty("store.class",
"org.apache.qpid.server.store.berkeleydb.InspectableBDBMessageStore");
+ env.addProperty("store.class",
"org.apache.qpid.server.transactionlog.TestableTransactionLog");
+ env.addProperty("store.delegate",
"org.apache.qpid.server.store.berkeleydb.BDBMessageStore");
+
_virtualHost = new VirtualHost(new VirtualHostConfiguration("test",
env));
- _transactionLog = (BDBMessageStore) _virtualHost.getTransactionLog();
+ _transactionLog = _virtualHost.getTransactionLog();
}
public void tearDown() throws Exception
@@ -170,13 +171,13 @@
AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(QUEUE1, true, ME, false,
_virtualHost, queueArguments);
- _transactionLog.createQueue(queue, queueArguments);
+ _routingTable.createQueue(queue, queueArguments);
AMQShortString routingKey = new AMQShortString("Test-Key");
FieldTable bindArguments = new FieldTable();
bindArguments.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), "Test =
'MST'");
-
_transactionLog.bindQueue(_virtualHost.getExchangeRegistry().getDefaultExchange(),
routingKey, queue, bindArguments);
+ _routingTable.bindQueue(_virtualHost.getExchangeRegistry().getDefaultExchange(),
routingKey, queue, bindArguments);
reload();
@@ -252,7 +253,7 @@
_transactionLog.storeMessageMetaData(_storeContext, 14L, new
MessageMetaData(pubBody, chb, 1));
_transactionLog.storeContentBodyChunk(_storeContext, 14L, 0, body, true);
- MessageMetaData mmd = _transactionLog.getMessageMetaData(_storeContext, 14L);
+ MessageMetaData mmd = _bdbMessageStore.getMessageMetaData(_storeContext, 14L);
MessagePublishInfo returnedPubBody = mmd.getMessagePublishInfo();
Assert.assertEquals("Message exchange has changed",
pubBody.getExchange(), returnedPubBody.getExchange());
Assert.assertEquals("Immediate flag has changed",
pubBody.isImmediate(), returnedPubBody.isImmediate());
@@ -267,7 +268,7 @@
Assert.assertEquals("Property ContentType has changed",
props.getContentTypeAsString(), returnedProperties.getContentTypeAsString());
Assert.assertEquals("Property MessageID has changed",
props.getMessageIdAsString(), returnedProperties.getMessageIdAsString());
Assert.assertEquals("MessageMD ChunkCount has changed",
mmd.getContentChunkCount(), 1);
- ContentChunk returnedContentBody =
_transactionLog.getContentBodyChunk(_storeContext, 14L, 0);
+ ContentChunk returnedContentBody =
_bdbMessageStore.getContentBodyChunk(_storeContext, 14L, 0);
ByteBuffer returnedPayloadAsBytes = returnedContentBody.getData();
byte[] returnedPayload = new byte[returnedPayloadAsBytes.remaining()];
returnedPayloadAsBytes.get(returnedPayload);
@@ -285,13 +286,13 @@
ContentHeaderBody chb = createContentHeaderBody(props, body.getSize());
_transactionLog.storeMessageMetaData(_storeContext, 15L, new
MessageMetaData(pubBody, chb, 1));
_transactionLog.storeContentBodyChunk(_storeContext, 15L, 0, body, true);
- _transactionLog.getContentBodyChunk(_storeContext, 15L, 0);
+ _bdbMessageStore.getContentBodyChunk(_storeContext, 15L, 0);
_transactionLog.removeMessage(_storeContext, 15L);
// the next line should throw since the message id should not be found
try
{
- _transactionLog.getMessageMetaData(_storeContext, 15L);
+ _bdbMessageStore.getMessageMetaData(_storeContext, 15L);
Assert.fail("No exception thrown when message id not found getting
metadata");
}
catch (AMQException e)
@@ -301,7 +302,7 @@
try
{
- _transactionLog.getContentBodyChunk(_storeContext, 15L, 0);
+ _bdbMessageStore.getContentBodyChunk(_storeContext, 15L, 0);
Assert.fail("No exception thrown when message id not found getting
content chunk");
}
catch (AMQException e)
@@ -324,14 +325,16 @@
_transactionLog.storeMessageMetaData(_storeContext, 22L, new
MessageMetaData(pubBody, chb, 0));
AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(QUEUE1, true, ME, false,
_virtualHost, null);
- _transactionLog.createQueue(queue);
+ _routingTable.createQueue(queue);
_transactionLog.beginTran(_storeContext);
- _transactionLog.enqueueMessage(_storeContext, queue, 20L);
- _transactionLog.enqueueMessage(_storeContext, queue, 21L);
+ ArrayList<AMQQueue> queues = new ArrayList<AMQQueue>();
+ queues.add(queue);
+ _transactionLog.enqueueMessage(_storeContext, queues, 20L);
+ _transactionLog.enqueueMessage(_storeContext, queues, 21L);
_transactionLog.commitTran(_storeContext);
- List<Long> enqueuedIds = _transactionLog.getEnqueuedMessages(QUEUE1);
+ List<Long> enqueuedIds = _bdbMessageStore.getEnqueuedMessages(QUEUE1);
Assert.assertEquals("Enqueued messages have changed", 2,
enqueuedIds.size());
Long val = enqueuedIds.get(0);
Assert.assertEquals("First Message is incorrect", 20L,
val.longValue());
@@ -342,7 +345,7 @@
public void testTranRollback1() throws Exception
{
- List<Long> enqueuedIds = _transactionLog.getEnqueuedMessages(QUEUE1);
+ List<Long> enqueuedIds = _bdbMessageStore.getEnqueuedMessages(QUEUE1);
assertTrue("Last Test Messages are still present",
enqueuedIds.isEmpty());
MessagePublishInfo pubBody = createPublishBody();
@@ -357,21 +360,23 @@
_transactionLog.storeMessageMetaData(_storeContext, 32L, new
MessageMetaData(pubBody, chb, 0));
AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(QUEUE1, true, ME, false,
_virtualHost, null);
- _transactionLog.createQueue(queue);
+ _routingTable.createQueue(queue);
_transactionLog.beginTran(_storeContext);
- _transactionLog.enqueueMessage(_storeContext, queue, 30L);
- _transactionLog.enqueueMessage(_storeContext, queue, 31L);
+ ArrayList<AMQQueue> queues = new ArrayList<AMQQueue>();
+ queues.add(queue);
+ _transactionLog.enqueueMessage(_storeContext, queues, 30L);
+ _transactionLog.enqueueMessage(_storeContext, queues, 31L);
_transactionLog.commitTran(_storeContext);
_transactionLog.beginTran(_storeContext);
- _transactionLog.enqueueMessage(_storeContext, queue, 32L);
+ _transactionLog.enqueueMessage(_storeContext, queues, 32L);
_transactionLog.abortTran(_storeContext);
_transactionLog.beginTran(_storeContext);
_transactionLog.commitTran(_storeContext);
- enqueuedIds = _transactionLog.getEnqueuedMessages(QUEUE1);
+ enqueuedIds = _bdbMessageStore.getEnqueuedMessages(QUEUE1);
assertTrue("Last Test Message is still present",
!enqueuedIds.contains(20L));
assertEquals("Incorrect Enqueued Message Count:", 2,
enqueuedIds.size());
Long val = enqueuedIds.get(0);
@@ -383,7 +388,7 @@
public void testTranRollback2() throws Exception
{
- List<Long> enqueuedIds = _transactionLog.getEnqueuedMessages(QUEUE1);
+ List<Long> enqueuedIds = _bdbMessageStore.getEnqueuedMessages(QUEUE1);
assertTrue("Last Test Messages are still present",
enqueuedIds.isEmpty());
MessagePublishInfo pubBody = createPublishBody();
@@ -398,18 +403,21 @@
_transactionLog.storeMessageMetaData(_storeContext, 32L, new
MessageMetaData(pubBody, chb, 0));
AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(QUEUE1, true, ME, false,
_virtualHost, null);
- _transactionLog.createQueue(queue);
+ _routingTable.createQueue(queue);
+ ArrayList<AMQQueue> queues = new ArrayList<AMQQueue>();
+ queues.add(queue);
+
_transactionLog.beginTran(_storeContext);
- _transactionLog.enqueueMessage(_storeContext, queue, 30L);
+ _transactionLog.enqueueMessage(_storeContext, queues, 30L);
_transactionLog.abortTran(_storeContext);
_transactionLog.beginTran(_storeContext);
- _transactionLog.enqueueMessage(_storeContext, queue, 31L);
- _transactionLog.enqueueMessage(_storeContext, queue, 32L);
+ _transactionLog.enqueueMessage(_storeContext, queues, 31L);
+ _transactionLog.enqueueMessage(_storeContext, queues, 32L);
_transactionLog.commitTran(_storeContext);
- enqueuedIds = _transactionLog.getEnqueuedMessages(QUEUE1);
+ enqueuedIds = _bdbMessageStore.getEnqueuedMessages(QUEUE1);
Assert.assertEquals("Incorrect Enqueued Message Count", 2,
enqueuedIds.size());
Long val = enqueuedIds.get(0);
Assert.assertEquals("First Message is incorrect", 31L,
val.longValue());
@@ -419,7 +427,7 @@
public void testRecovery() throws Exception
{
- List<Long> enqueuedIds = _transactionLog.getEnqueuedMessages(QUEUE1);
+ List<Long> enqueuedIds = _bdbMessageStore.getEnqueuedMessages(QUEUE1);
assertTrue("Last Test Messages are still present",
enqueuedIds.isEmpty());
MessagePublishInfo pubBody = createPublishBody();
@@ -438,16 +446,22 @@
AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(QUEUE1, true, ME, false,
_virtualHost, null);
AMQQueue queue2 = AMQQueueFactory.createAMQQueueImpl(QUEUE2, true, HIM, false,
_virtualHost, null);
- _transactionLog.createQueue(queue);
- _transactionLog.createQueue(queue2);
+ _routingTable.createQueue(queue);
+ _routingTable.createQueue(queue2);
+ ArrayList<AMQQueue> queues = new ArrayList<AMQQueue>();
+ queues.add(queue);
+
_transactionLog.beginTran(_storeContext);
- _transactionLog.enqueueMessage(_storeContext, queue, 40L);
- _transactionLog.enqueueMessage(_storeContext, queue, 41L);
- _transactionLog.enqueueMessage(_storeContext, queue2, 42L);
+ _transactionLog.enqueueMessage(_storeContext, queues, 40L);
+ _transactionLog.enqueueMessage(_storeContext, queues, 41L);
+ ArrayList<AMQQueue> queues2 = new ArrayList<AMQQueue>();
+ queues2.add(queue2);
+
+ _transactionLog.enqueueMessage(_storeContext, queues2, 42L);
_transactionLog.commitTran(_storeContext);
- _transactionLog.enqueueMessage(_storeContext, queue, 42L);
+ _transactionLog.enqueueMessage(_storeContext, queues, 42L);
reload();
@@ -481,20 +495,23 @@
_transactionLog.storeMessageMetaData(_storeContext, 50L, new
MessageMetaData(pubBody, chb, 0));
AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(QUEUE1, true, ME, false,
_virtualHost, null);
- _transactionLog.createQueue(queue);
+ _routingTable.createQueue(queue);
- _transactionLog.enqueueMessage(_storeContext, queue, 50L);
+ ArrayList<AMQQueue> queues = new ArrayList<AMQQueue>();
+ queues.add(queue);
+
+ _transactionLog.enqueueMessage(_storeContext, queues, 50L);
_transactionLog.dequeueMessage(_storeContext, queue, 50L);
}
public void testQueueRemove() throws AMQException
{
AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(QUEUE1, true, ME, false,
_virtualHost, null);
- _transactionLog.createQueue(queue);
- _transactionLog.removeQueue(queue);
+ _routingTable.createQueue(queue);
+ _routingTable.removeQueue(queue);
try
{
- _transactionLog.removeQueue(queue);
+ _routingTable.removeQueue(queue);
Assert.fail("No exception thrown when deleting non-existant
queue");
}
catch (AMQException e)
Modified:
store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java
===================================================================
---
store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java 2009-04-02
15:58:37 UTC (rev 3250)
+++
store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java 2009-04-02
16:52:13 UTC (rev 3251)
@@ -178,6 +178,7 @@
startBroker(1, VERSION_2);
+ ///* This test is currently broken due to QPID-1275
//Ensure that the selector was preseved on restart and caused all msgs to be
removed.
sendAndCheckDurableSubscriber(broker, false, true, 0, null);
stopBroker(1);
Deleted:
store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/InspectableBDBMessageStore.java
===================================================================
---
store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/InspectableBDBMessageStore.java 2009-04-02
15:58:37 UTC (rev 3250)
+++
store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/InspectableBDBMessageStore.java 2009-04-02
16:52:13 UTC (rev 3251)
@@ -1,34 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *
http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store.berkeleydb;
-
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.store.TestTransactionLog;
-
-import java.util.List;
-
-public class InspectableBDBMessageStore extends BDBMessageStore implements
TestTransactionLog
-{
- public List<AMQQueue> getMessageReferenceMap(Long messageId)
- {
- return _messageOnQueueMap.get(messageId);
- }
-}
Modified:
store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessagePersistenceTest.java
===================================================================
---
store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessagePersistenceTest.java 2009-04-02
15:58:37 UTC (rev 3250)
+++
store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessagePersistenceTest.java 2009-04-02
16:52:13 UTC (rev 3251)
@@ -29,6 +29,8 @@
import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
import org.apache.qpid.framing.amqp_8_0.BasicConsumeBodyImpl;
import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.transactionlog.TransactionLog;
+import org.apache.qpid.server.transactionlog.TestableTransactionLog;
import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.flow.LimitlessCreditManager;
import org.apache.qpid.server.protocol.AMQProtocolSession;
@@ -39,6 +41,7 @@
import org.apache.qpid.server.queue.SimpleAMQQueue;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.store.TestTransactionLog;
import org.apache.qpid.server.store.berkeleydb.utils.BDBVMTestCase;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
@@ -56,7 +59,8 @@
private static final Logger _log = Logger.getLogger(MessagePersistenceTest.class);
- protected InspectableBDBMessageStore _transactionLog;
+ protected TestTransactionLog _transactionLog;
+ protected BDBMessageStore _messageStore;
private String STORE_LOCATION = System.getProperty("BDB_WORK") +
"/bdbTestEnv";
protected VirtualHost _virtualHost;
@@ -74,6 +78,7 @@
protected boolean _transactional;
protected boolean _ack;
+
public void setUp() throws Exception
{
if (BDB_DIR.exists())
@@ -88,11 +93,12 @@
deleteDirectory(bdbDir);
BDB_DIR.mkdirs();
- _transactionLog = new InspectableBDBMessageStore();
- _transactionLog.configure(BDB_DIR);
+ _messageStore = new BDBMessageStore();
+ _transactionLog = new TestableTransactionLog(_messageStore.configure(BDB_DIR));
+
_virtualHost = new VirtualHost(new
VirtualHostConfiguration("bdbtest",new PropertiesConfiguration()),
_transactionLog);
- _transactionLog.setVirtualHost(_virtualHost);
+ _messageStore.setVirtualHost(_virtualHost);
applicationRegistry.getVirtualHostRegistry().registerVirtualHost(_virtualHost);
@@ -105,6 +111,13 @@
_ack = true;
}
+ @Override
+ public void tearDown() throws Exception
+ {
+ ApplicationRegistry.removeAll();
+ super.tearDown();
+ }
+
protected IncomingMessage createMessage(MessagePublishInfo info) throws AMQException
{
IncomingMessage msg = new IncomingMessage(info, _messageDeliveryContext,
@@ -153,11 +166,11 @@
checkMessageMetaDataExists(messageId);
// Check that it is enqueued
- List<AMQQueue> queueList =
_transactionLog.getMessageReferenceMap(messageId);
- assertNotNull(queueList);
- assertEquals("Message should be enqueued on both queues.", 1,
queueList.size());
- assertTrue("Queue1 not contained in list.",
queueList.contains(_queue1));
+ List<Long> queueList =
_messageStore.getEnqueuedMessages(_queue1.getName());
+ assertTrue("Message not enqueued as
expected.",queueList.contains(messageId));
+ assertEquals("Queue should only have one message.", 1,
queueList.size());
+
// Create consumer to correctly consume message
AMQProtocolSession session1 = new MockProtocolSession(_transactionLog);
AMQChannel channel1 = new AMQChannel(session1, 1, _transactionLog);
@@ -176,7 +189,7 @@
_queue1.registerSubscription(sub1, true);
// Give the delivery thread time to deliver the message
- Thread.sleep(200);
+ Thread.sleep(300);
Thread.yield();
if (_ack)
@@ -189,11 +202,12 @@
channel1.commit();
}
}
- // Check that it is now dequeued
- queueList = _transactionLog.getMessageReferenceMap(messageId);
- assertNull("Queue List was not empty:" + (queueList != null ?
queueList.size() : ""), queueList);
checkMessageMetaDataRemoved(messageId);
+
+ // Check that it is now dequeued
+ queueList = _messageStore.getEnqueuedMessages(_queue1.getName());
+ assertTrue("Queue List was not empty:" + queueList.size() ,
queueList.isEmpty());
}
/**
@@ -292,18 +306,21 @@
channel1.commit();
}
}
+
+
+ checkMessageMetaDataRemoved(messageId);
+
// Check that it is now dequeued
queueList = _transactionLog.getMessageReferenceMap(messageId);
assertNull("Queue List was not empty:" + (queueList != null ?
queueList.size() : ""), queueList);
- checkMessageMetaDataRemoved(messageId);
}
protected void checkMessageMetaDataExists(long messageId)
{
try
{
- _transactionLog.getMessageMetaData(_messageDeliveryContext.getStoreContext(),
messageId);
+ _messageStore.getMessageMetaData(_messageDeliveryContext.getStoreContext(),
messageId);
}
catch (AMQException amqe)
{
@@ -315,7 +332,7 @@
{
try
{
- _transactionLog.getMessageMetaData(_messageDeliveryContext.getStoreContext(),
messageId);
+ _messageStore.getMessageMetaData(_messageDeliveryContext.getStoreContext(),
messageId);
fail("Message MetaData still exists for message:" + messageId);
}
catch (AMQException amqe)