[rhmessaging-commits] rhmessaging commits: r3251 - in store/trunk/java/bdbstore: src/test/java/org/apache/qpid/server/store/berkeleydb and 1 other directory.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Thu Apr 2 12:52:14 EDT 2009


Author: ritchiem
Date: 2009-04-02 12:52:13 -0400 (Thu, 02 Apr 2009)
New Revision: 3251

Removed:
   store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/InspectableBDBMessageStore.java
Modified:
   store/trunk/java/bdbstore/build.xml
   store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreTest.java
   store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java
   store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessagePersistenceTest.java
Log:
Update based on QPID-1764 to correct tests and ensure testing can be carried out correctly

Modified: store/trunk/java/bdbstore/build.xml
===================================================================
--- store/trunk/java/bdbstore/build.xml	2009-04-02 15:58:37 UTC (rev 3250)
+++ store/trunk/java/bdbstore/build.xml	2009-04-02 16:52:13 UTC (rev 3251)
@@ -17,6 +17,8 @@
     <property name="java.target" value="1.5"/>
     <property name="java.source" value="1.5"/>
 
+    <dirname property="project.root" file="${ant.file.common}"/>
+
     <property name="build.classes" location="build/classes"/>
     <property name="build.test.classes" location="build/test/classes"/>
     <property name="build.tools.classes" location="build/tools/classes"/>
@@ -28,14 +30,13 @@
     <property name="release.tar"       location="${release.dir}/${project.namever}.tar"/>
     <property name="release.tgz"       location="${release.dir}/${project.namever}.tgz"/>
     <property name="release.bz2"       location="${release.dir}/${project.namever}.bz2"/>
+    <property name="qpid.work.dir"     location="${project.root}/build/test-work"/>
 
 
     <property name="java.naming.factory.initial" value="org.apache.qpid.jndi.PropertiesFileInitialContextFactory"/>    
 
     <available property="src.test.dir.exists" file="${src.test.dir}"/>
 
-    <dirname property="project.root" file="${ant.file.common}"/>
-
     <property file="${project.root}/default.testprofile"/>
 
     <path id="class.path">
@@ -102,7 +103,7 @@
 	<jar destfile="${bdbtools.jar}" basedir="${build.tools.classes}"/>
     </target>
 
-  <target name="test" depends="build-tests" if="src.test.dir.exists"
+  <target name="test" depends="build-tests,prepare-tests" if="src.test.dir.exists"
          unless="${dontruntest}" description="execute unit tests">
 
     <delete file="${module.failed}"/>
@@ -110,7 +111,6 @@
     <junit fork="${test.fork}" maxmemory="${test.mem}" reloading="no"
            haltonfailure="${haltonfailure}" haltonerror="${haltonerror}"
            failureproperty="test.failures" printsummary="on" timeout="600000" >
-
       <sysproperty key="amqj.logging.level" value="${amqj.logging.level}"/>
       <sysproperty key="root.logging.level" value="${root.logging.level}"/>
       <sysproperty key="log4j.configuration" value="${log4j.configuration}"/>
@@ -125,8 +125,8 @@
       <sysproperty key="max_prefetch" value ="${max_prefetch}"/>
       <sysproperty key="example.plugin.target" value="${project.root}/build/lib/plugins"/>
       <sysproperty key="QPID_HOME" value="${project.root}"/>
-      <sysproperty key="QPID_WORK" value="${project.root}/build/test-work"/>
-      <sysproperty key="BDB_WORK" value="${project.root}/build/test-work/bdbstore"/>
+      <sysproperty key="QPID_WORK" value="${qpid.work.dir}"/>
+      <sysproperty key="BDB_WORK" value="${qpid.work.dir}/bdbstore"/>
       <sysproperty key="BDB_HOME" value="${project.root}"/>
       <sysproperty key="test.excludes" value="false"/>
 
@@ -160,6 +160,10 @@
      <mkdir dir="${release.dir}"/>
   </target>
 
+  <target name="prepare-tests">
+     <mkdir dir="${qpid.work.dir}"/>
+  </target>
+
   <target name="zip" depends="build,prepare" description="build release archive">
     <zip destfile="${release.zip}">
       <zipfileset dir="${bin.dir}" prefix="${project.namever}/bin" filemode="755">

Modified: store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreTest.java
===================================================================
--- store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreTest.java	2009-04-02 15:58:37 UTC (rev 3250)
+++ store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreTest.java	2009-04-02 16:52:13 UTC (rev 3251)
@@ -34,46 +34,39 @@
 import org.apache.qpid.framing.abstraction.ContentChunk;
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
 import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
-import org.apache.qpid.framing.amqp_8_0.BasicConsumeBodyImpl;
-import org.apache.qpid.server.AMQChannel;
 import org.apache.qpid.server.RequiredDeliveryException;
+import org.apache.qpid.server.routing.RoutingTable;
+import org.apache.qpid.server.transactionlog.TestableTransactionLog;
+import org.apache.qpid.server.transactionlog.TransactionLog;
 import org.apache.qpid.server.configuration.VirtualHostConfiguration;
 import org.apache.qpid.server.exchange.DefaultExchangeFactory;
 import org.apache.qpid.server.exchange.DirectExchange;
 import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.flow.LimitlessCreditManager;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.AMQMessage;
 import org.apache.qpid.server.queue.AMQPriorityQueue;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.AMQQueueFactory;
-import org.apache.qpid.server.queue.IncomingMessage;
 import org.apache.qpid.server.queue.MessageMetaData;
 import org.apache.qpid.server.queue.MockContentChunk;
-import org.apache.qpid.server.queue.MockPersistentAMQMessage;
-import org.apache.qpid.server.queue.MockProtocolSession;
-import org.apache.qpid.server.queue.MockQueueEntry;
-import org.apache.qpid.server.queue.SimpleAMQQueue;
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.store.StoreContext;
 import org.apache.qpid.server.store.berkeleydb.utils.BDBVMTestCase;
-import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
 import org.apache.qpid.server.txn.NonTransactionalContext;
 import org.apache.qpid.server.txn.TransactionalContext;
 import org.apache.qpid.server.util.NullApplicationRegistry;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
 import java.io.File;
-import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.ArrayList;
 
 public class BDBStoreTest extends BDBVMTestCase
 {
     private static final Logger _log = Logger.getLogger(BDBStoreTest.class);
 
-    private BDBMessageStore _transactionLog;
+    private BDBMessageStore _bdbMessageStore;
+    private TransactionLog _transactionLog;
+    private RoutingTable _routingTable;
     private String STORE_LOCATION = System.getProperty("BDB_WORK") + "/bdbTestEnv";
 
     private StoreContext _storeContext = new StoreContext();
@@ -104,12 +97,18 @@
         deleteDirectory(bdbDir);
         BDB_DIR.mkdirs();
 
-        _transactionLog = new InspectableBDBMessageStore();
-        _transactionLog.configure(BDB_DIR);
+        _bdbMessageStore= new BDBMessageStore();
 
-        _virtualHost = new VirtualHost(new VirtualHostConfiguration("test", new PropertiesConfiguration()), _transactionLog);
-        _transactionLog.setVirtualHost(_virtualHost);
+        _routingTable = _bdbMessageStore;
 
+        VirtualHostConfiguration vhostConfig = new VirtualHostConfiguration("test", new PropertiesConfiguration());
+
+        _transactionLog = new TestableTransactionLog(_bdbMessageStore.configure(BDB_DIR));
+
+        _virtualHost = new VirtualHost(vhostConfig, _transactionLog);
+
+        _bdbMessageStore.setVirtualHost(_virtualHost);
+
         _txnContext = new NonTransactionalContext(_transactionLog, _storeContext, null, new LinkedList<RequiredDeliveryException>());
     }
 
@@ -120,10 +119,12 @@
         PropertiesConfiguration env = new PropertiesConfiguration();
 
         env.addProperty("store.environment-path", STORE_LOCATION);
-        env.addProperty("store.class", "org.apache.qpid.server.store.berkeleydb.InspectableBDBMessageStore");
 
+        env.addProperty("store.class", "org.apache.qpid.server.transactionlog.TestableTransactionLog");
+        env.addProperty("store.delegate", "org.apache.qpid.server.store.berkeleydb.BDBMessageStore");
+
         _virtualHost = new VirtualHost(new VirtualHostConfiguration("test", env));
-        _transactionLog = (BDBMessageStore) _virtualHost.getTransactionLog();
+        _transactionLog = _virtualHost.getTransactionLog();
     }
 
     public void tearDown() throws Exception
@@ -170,13 +171,13 @@
 
         AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(QUEUE1, true, ME, false, _virtualHost, queueArguments);
 
-        _transactionLog.createQueue(queue, queueArguments);
+        _routingTable.createQueue(queue, queueArguments);
 
         AMQShortString routingKey = new AMQShortString("Test-Key");
         FieldTable bindArguments = new FieldTable();
         bindArguments.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), "Test = 'MST'");
 
-        _transactionLog.bindQueue(_virtualHost.getExchangeRegistry().getDefaultExchange(), routingKey, queue, bindArguments);
+        _routingTable.bindQueue(_virtualHost.getExchangeRegistry().getDefaultExchange(), routingKey, queue, bindArguments);
 
         reload();
 
@@ -252,7 +253,7 @@
         _transactionLog.storeMessageMetaData(_storeContext, 14L, new MessageMetaData(pubBody, chb, 1));
         _transactionLog.storeContentBodyChunk(_storeContext, 14L, 0, body, true);
 
-        MessageMetaData mmd = _transactionLog.getMessageMetaData(_storeContext, 14L);
+        MessageMetaData mmd = _bdbMessageStore.getMessageMetaData(_storeContext, 14L);
         MessagePublishInfo returnedPubBody = mmd.getMessagePublishInfo();
         Assert.assertEquals("Message exchange has changed", pubBody.getExchange(), returnedPubBody.getExchange());
         Assert.assertEquals("Immediate flag has changed", pubBody.isImmediate(), returnedPubBody.isImmediate());
@@ -267,7 +268,7 @@
         Assert.assertEquals("Property ContentType has changed", props.getContentTypeAsString(), returnedProperties.getContentTypeAsString());
         Assert.assertEquals("Property MessageID has changed", props.getMessageIdAsString(), returnedProperties.getMessageIdAsString());
         Assert.assertEquals("MessageMD ChunkCount has changed", mmd.getContentChunkCount(), 1);
-        ContentChunk returnedContentBody = _transactionLog.getContentBodyChunk(_storeContext, 14L, 0);
+        ContentChunk returnedContentBody = _bdbMessageStore.getContentBodyChunk(_storeContext, 14L, 0);
         ByteBuffer returnedPayloadAsBytes = returnedContentBody.getData();
         byte[] returnedPayload = new byte[returnedPayloadAsBytes.remaining()];
         returnedPayloadAsBytes.get(returnedPayload);
@@ -285,13 +286,13 @@
         ContentHeaderBody chb = createContentHeaderBody(props, body.getSize());
         _transactionLog.storeMessageMetaData(_storeContext, 15L, new MessageMetaData(pubBody, chb, 1));
         _transactionLog.storeContentBodyChunk(_storeContext, 15L, 0, body, true);
-        _transactionLog.getContentBodyChunk(_storeContext, 15L, 0);
+        _bdbMessageStore.getContentBodyChunk(_storeContext, 15L, 0);
         _transactionLog.removeMessage(_storeContext, 15L);
 
         // the next line should throw since the message id should not be found
         try
         {
-            _transactionLog.getMessageMetaData(_storeContext, 15L);
+            _bdbMessageStore.getMessageMetaData(_storeContext, 15L);
             Assert.fail("No exception thrown when message id not found getting metadata");
         }
         catch (AMQException e)
@@ -301,7 +302,7 @@
 
         try
         {
-            _transactionLog.getContentBodyChunk(_storeContext, 15L, 0);
+            _bdbMessageStore.getContentBodyChunk(_storeContext, 15L, 0);
             Assert.fail("No exception thrown when message id not found getting content chunk");
         }
         catch (AMQException e)
@@ -324,14 +325,16 @@
         _transactionLog.storeMessageMetaData(_storeContext, 22L, new MessageMetaData(pubBody, chb, 0));
 
         AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(QUEUE1, true, ME, false, _virtualHost, null);
-        _transactionLog.createQueue(queue);
+        _routingTable.createQueue(queue);
 
         _transactionLog.beginTran(_storeContext);
-        _transactionLog.enqueueMessage(_storeContext, queue, 20L);
-        _transactionLog.enqueueMessage(_storeContext, queue, 21L);
+        ArrayList<AMQQueue> queues = new ArrayList<AMQQueue>();
+        queues.add(queue);
+        _transactionLog.enqueueMessage(_storeContext, queues, 20L);
+        _transactionLog.enqueueMessage(_storeContext, queues, 21L);
         _transactionLog.commitTran(_storeContext);
 
-        List<Long> enqueuedIds = _transactionLog.getEnqueuedMessages(QUEUE1);
+        List<Long> enqueuedIds = _bdbMessageStore.getEnqueuedMessages(QUEUE1);
         Assert.assertEquals("Enqueued messages have changed", 2, enqueuedIds.size());
         Long val = enqueuedIds.get(0);
         Assert.assertEquals("First Message is incorrect", 20L, val.longValue());
@@ -342,7 +345,7 @@
 
     public void testTranRollback1() throws Exception
     {
-        List<Long> enqueuedIds = _transactionLog.getEnqueuedMessages(QUEUE1);
+        List<Long> enqueuedIds = _bdbMessageStore.getEnqueuedMessages(QUEUE1);
         assertTrue("Last Test Messages are still present", enqueuedIds.isEmpty());
 
         MessagePublishInfo pubBody = createPublishBody();
@@ -357,21 +360,23 @@
         _transactionLog.storeMessageMetaData(_storeContext, 32L, new MessageMetaData(pubBody, chb, 0));
 
         AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(QUEUE1, true, ME, false, _virtualHost, null);
-        _transactionLog.createQueue(queue);
+        _routingTable.createQueue(queue);
 
         _transactionLog.beginTran(_storeContext);
-        _transactionLog.enqueueMessage(_storeContext, queue, 30L);
-        _transactionLog.enqueueMessage(_storeContext, queue, 31L);
+        ArrayList<AMQQueue> queues = new ArrayList<AMQQueue>();
+        queues.add(queue);
+        _transactionLog.enqueueMessage(_storeContext, queues, 30L);
+        _transactionLog.enqueueMessage(_storeContext, queues, 31L);
         _transactionLog.commitTran(_storeContext);
 
         _transactionLog.beginTran(_storeContext);
-        _transactionLog.enqueueMessage(_storeContext, queue, 32L);
+        _transactionLog.enqueueMessage(_storeContext, queues, 32L);
         _transactionLog.abortTran(_storeContext);
 
         _transactionLog.beginTran(_storeContext);
         _transactionLog.commitTran(_storeContext);
 
-        enqueuedIds = _transactionLog.getEnqueuedMessages(QUEUE1);
+        enqueuedIds = _bdbMessageStore.getEnqueuedMessages(QUEUE1);
         assertTrue("Last Test Message is still present", !enqueuedIds.contains(20L));
         assertEquals("Incorrect Enqueued Message Count:", 2, enqueuedIds.size());
         Long val = enqueuedIds.get(0);
@@ -383,7 +388,7 @@
 
     public void testTranRollback2() throws Exception
     {
-        List<Long> enqueuedIds = _transactionLog.getEnqueuedMessages(QUEUE1);
+        List<Long> enqueuedIds = _bdbMessageStore.getEnqueuedMessages(QUEUE1);
         assertTrue("Last Test Messages are still present", enqueuedIds.isEmpty());
 
         MessagePublishInfo pubBody = createPublishBody();
@@ -398,18 +403,21 @@
         _transactionLog.storeMessageMetaData(_storeContext, 32L, new MessageMetaData(pubBody, chb, 0));
 
         AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(QUEUE1, true, ME, false, _virtualHost, null);
-        _transactionLog.createQueue(queue);
+        _routingTable.createQueue(queue);
 
+        ArrayList<AMQQueue> queues = new ArrayList<AMQQueue>();
+        queues.add(queue);
+
         _transactionLog.beginTran(_storeContext);
-        _transactionLog.enqueueMessage(_storeContext, queue, 30L);
+        _transactionLog.enqueueMessage(_storeContext, queues, 30L);
         _transactionLog.abortTran(_storeContext);
 
         _transactionLog.beginTran(_storeContext);
-        _transactionLog.enqueueMessage(_storeContext, queue, 31L);
-        _transactionLog.enqueueMessage(_storeContext, queue, 32L);
+        _transactionLog.enqueueMessage(_storeContext, queues, 31L);
+        _transactionLog.enqueueMessage(_storeContext, queues, 32L);
         _transactionLog.commitTran(_storeContext);
 
-        enqueuedIds = _transactionLog.getEnqueuedMessages(QUEUE1);
+        enqueuedIds = _bdbMessageStore.getEnqueuedMessages(QUEUE1);
         Assert.assertEquals("Incorrect Enqueued Message Count", 2, enqueuedIds.size());
         Long val = enqueuedIds.get(0);
         Assert.assertEquals("First Message is incorrect", 31L, val.longValue());
@@ -419,7 +427,7 @@
 
     public void testRecovery() throws Exception
     {
-        List<Long> enqueuedIds = _transactionLog.getEnqueuedMessages(QUEUE1);
+        List<Long> enqueuedIds = _bdbMessageStore.getEnqueuedMessages(QUEUE1);
         assertTrue("Last Test Messages are still present", enqueuedIds.isEmpty());
 
         MessagePublishInfo pubBody = createPublishBody();
@@ -438,16 +446,22 @@
         AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(QUEUE1, true, ME, false, _virtualHost, null);
         AMQQueue queue2 = AMQQueueFactory.createAMQQueueImpl(QUEUE2, true, HIM, false, _virtualHost, null);
 
-        _transactionLog.createQueue(queue);
-        _transactionLog.createQueue(queue2);
+        _routingTable.createQueue(queue);
+        _routingTable.createQueue(queue2);
 
+        ArrayList<AMQQueue> queues = new ArrayList<AMQQueue>();
+        queues.add(queue);
+
         _transactionLog.beginTran(_storeContext);
-        _transactionLog.enqueueMessage(_storeContext, queue, 40L);
-        _transactionLog.enqueueMessage(_storeContext, queue, 41L);
-        _transactionLog.enqueueMessage(_storeContext, queue2, 42L);
+        _transactionLog.enqueueMessage(_storeContext, queues, 40L);
+        _transactionLog.enqueueMessage(_storeContext, queues, 41L);
+        ArrayList<AMQQueue> queues2 = new ArrayList<AMQQueue>();
+        queues2.add(queue2);
+
+        _transactionLog.enqueueMessage(_storeContext, queues2, 42L);
         _transactionLog.commitTran(_storeContext);
 
-        _transactionLog.enqueueMessage(_storeContext, queue, 42L);
+        _transactionLog.enqueueMessage(_storeContext, queues, 42L);
 
         reload();
 
@@ -481,20 +495,23 @@
         _transactionLog.storeMessageMetaData(_storeContext, 50L, new MessageMetaData(pubBody, chb, 0));
 
         AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(QUEUE1, true, ME, false, _virtualHost, null);
-        _transactionLog.createQueue(queue);
+        _routingTable.createQueue(queue);
 
-        _transactionLog.enqueueMessage(_storeContext, queue, 50L);
+        ArrayList<AMQQueue> queues = new ArrayList<AMQQueue>();
+        queues.add(queue);
+
+        _transactionLog.enqueueMessage(_storeContext, queues, 50L);
         _transactionLog.dequeueMessage(_storeContext, queue, 50L);
     }
 
     public void testQueueRemove() throws AMQException
     {
         AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(QUEUE1, true, ME, false, _virtualHost, null);
-        _transactionLog.createQueue(queue);
-        _transactionLog.removeQueue(queue);
+        _routingTable.createQueue(queue);
+        _routingTable.removeQueue(queue);
         try
         {
-            _transactionLog.removeQueue(queue);
+            _routingTable.removeQueue(queue);
             Assert.fail("No exception thrown when deleting non-existant queue");
         }
         catch (AMQException e)

Modified: store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java
===================================================================
--- store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java	2009-04-02 15:58:37 UTC (rev 3250)
+++ store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java	2009-04-02 16:52:13 UTC (rev 3251)
@@ -178,6 +178,7 @@
 
         startBroker(1, VERSION_2);
 
+        // NOTE: This test is currently broken due to QPID-1275.
         //Ensure that the selector was preseved on restart and caused all msgs to be removed.
         sendAndCheckDurableSubscriber(broker, false, true, 0, null);
         stopBroker(1);

Deleted: store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/InspectableBDBMessageStore.java
===================================================================
--- store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/InspectableBDBMessageStore.java	2009-04-02 15:58:37 UTC (rev 3250)
+++ store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/InspectableBDBMessageStore.java	2009-04-02 16:52:13 UTC (rev 3251)
@@ -1,34 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store.berkeleydb;
-
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.store.TestTransactionLog;
-
-import java.util.List;
-
-public class InspectableBDBMessageStore extends BDBMessageStore implements TestTransactionLog
-{
-    public List<AMQQueue> getMessageReferenceMap(Long messageId)
-    {
-        return _messageOnQueueMap.get(messageId);
-    }
-}

Modified: store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessagePersistenceTest.java
===================================================================
--- store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessagePersistenceTest.java	2009-04-02 15:58:37 UTC (rev 3250)
+++ store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessagePersistenceTest.java	2009-04-02 16:52:13 UTC (rev 3251)
@@ -29,6 +29,8 @@
 import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
 import org.apache.qpid.framing.amqp_8_0.BasicConsumeBodyImpl;
 import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.transactionlog.TransactionLog;
+import org.apache.qpid.server.transactionlog.TestableTransactionLog;
 import org.apache.qpid.server.configuration.VirtualHostConfiguration;
 import org.apache.qpid.server.flow.LimitlessCreditManager;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
@@ -39,6 +41,7 @@
 import org.apache.qpid.server.queue.SimpleAMQQueue;
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.store.TestTransactionLog;
 import org.apache.qpid.server.store.berkeleydb.utils.BDBVMTestCase;
 import org.apache.qpid.server.subscription.Subscription;
 import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
@@ -56,7 +59,8 @@
 
     private static final Logger _log = Logger.getLogger(MessagePersistenceTest.class);
 
-    protected InspectableBDBMessageStore _transactionLog;
+    protected TestTransactionLog _transactionLog;
+    protected BDBMessageStore _messageStore;
     private String STORE_LOCATION = System.getProperty("BDB_WORK") + "/bdbTestEnv";
     protected VirtualHost _virtualHost;
 
@@ -74,6 +78,7 @@
     protected boolean _transactional;
     protected boolean _ack;
 
+
     public void setUp() throws Exception
     {
         if (BDB_DIR.exists())
@@ -88,11 +93,12 @@
         deleteDirectory(bdbDir);
         BDB_DIR.mkdirs();
 
-        _transactionLog = new InspectableBDBMessageStore();
-        _transactionLog.configure(BDB_DIR);
+        _messageStore = new BDBMessageStore();
 
+        _transactionLog = new TestableTransactionLog(_messageStore.configure(BDB_DIR));
+
         _virtualHost = new VirtualHost(new VirtualHostConfiguration("bdbtest",new PropertiesConfiguration()), _transactionLog);
-        _transactionLog.setVirtualHost(_virtualHost);
+        _messageStore.setVirtualHost(_virtualHost);
 
         applicationRegistry.getVirtualHostRegistry().registerVirtualHost(_virtualHost);
 
@@ -105,6 +111,13 @@
         _ack = true;
     }
 
+    @Override
+    public void tearDown() throws Exception
+    {
+        ApplicationRegistry.removeAll();
+        super.tearDown();
+    }
+    
     protected IncomingMessage createMessage(MessagePublishInfo info) throws AMQException
     {
         IncomingMessage msg = new IncomingMessage(info, _messageDeliveryContext,
@@ -153,11 +166,11 @@
         checkMessageMetaDataExists(messageId);
 
         // Check that it is enqueued
-        List<AMQQueue> queueList = _transactionLog.getMessageReferenceMap(messageId);
-        assertNotNull(queueList);
-        assertEquals("Message should be enqueued on both queues.", 1, queueList.size());
-        assertTrue("Queue1 not contained in list.", queueList.contains(_queue1));
+        List<Long> queueList = _messageStore.getEnqueuedMessages(_queue1.getName());
 
+        assertTrue("Message not enqueued as expected.",queueList.contains(messageId));
+        assertEquals("Queue should only have one message.", 1, queueList.size());
+
         // Create consumer to correctly consume message
         AMQProtocolSession session1 = new MockProtocolSession(_transactionLog);
         AMQChannel channel1 = new AMQChannel(session1, 1, _transactionLog);
@@ -176,7 +189,7 @@
         _queue1.registerSubscription(sub1, true);
 
         // Give the delivery thread time to deliver the message
-        Thread.sleep(200);
+        Thread.sleep(300);
         Thread.yield();
 
         if (_ack)
@@ -189,11 +202,12 @@
                 channel1.commit();
             }
         }
-        // Check that it is now dequeued
-        queueList = _transactionLog.getMessageReferenceMap(messageId);
-        assertNull("Queue List was not empty:" + (queueList != null ? queueList.size() : ""), queueList);
 
         checkMessageMetaDataRemoved(messageId);
+
+        // Check that it is now dequeued
+        queueList = _messageStore.getEnqueuedMessages(_queue1.getName());
+        assertTrue("Queue List was not empty:" + queueList.size() , queueList.isEmpty());
     }
 
     /**
@@ -292,18 +306,21 @@
                 channel1.commit();
             }
         }
+
+        
+        checkMessageMetaDataRemoved(messageId);
+
         // Check that it is now dequeued
         queueList = _transactionLog.getMessageReferenceMap(messageId);
         assertNull("Queue List was not empty:" + (queueList != null ? queueList.size() : ""), queueList);
 
-        checkMessageMetaDataRemoved(messageId);
     }
 
     protected void checkMessageMetaDataExists(long messageId)
     {
         try
         {
-            _transactionLog.getMessageMetaData(_messageDeliveryContext.getStoreContext(), messageId);
+            _messageStore.getMessageMetaData(_messageDeliveryContext.getStoreContext(), messageId);
         }
         catch (AMQException amqe)
         {
@@ -315,7 +332,7 @@
     {
         try
         {
-            _transactionLog.getMessageMetaData(_messageDeliveryContext.getStoreContext(), messageId);
+            _messageStore.getMessageMetaData(_messageDeliveryContext.getStoreContext(), messageId);
             fail("Message MetaData still exists for message:" + messageId);
         }
         catch (AMQException amqe)




More information about the rhmessaging-commits mailing list