Author: ritchiem
Date: 2009-02-20 08:34:27 -0500 (Fri, 20 Feb 2009)
New Revision: 3125
Added:
store/branches/java/broker-queue-refactor/java/bdbstore/etc/persistent_config.xml
store/branches/java/broker-queue-refactor/java/bdbstore/etc/virtualhosts.xml
store/branches/java/broker-queue-refactor/java/bdbstore/lib/commons-logging-1.0.4.jar
store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/InspectableBDBMessageStore.java
store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessagePersistenceTest.java
store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessagePersistenceTransactionalTest.java
store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessagePersistentNoAckTest.java
Modified:
store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreTest.java
store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java
store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessageReSendTest.java
store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/QueueDeleteWhilstRoutingTest.java
store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/utils/JNDIHelper.java
store/branches/java/broker-queue-refactor/java/bdbstore/src/tools/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgrade.java
Log:
QPID-1632 : Update to perform internal reference counting. Additional testing via
MessagePersistent*Tests
Added: store/branches/java/broker-queue-refactor/java/bdbstore/etc/persistent_config.xml
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/etc/persistent_config.xml
(rev 0)
+++
store/branches/java/broker-queue-refactor/java/bdbstore/etc/persistent_config.xml 2009-02-20
13:34:27 UTC (rev 3125)
@@ -0,0 +1,122 @@
+<?xml version="1.0" encoding="ISO-8859-1"?>
+<!--
+ -
+ - Licensed to the Apache Software Foundation (ASF) under one
+ - or more contributor license agreements. See the NOTICE file
+ - distributed with this work for additional information
+ - regarding copyright ownership. The ASF licenses this file
+ - to you under the Apache License, Version 2.0 (the
+ - "License"); you may not use this file except in compliance
+ - with the License. You may obtain a copy of the License at
+ -
+ -
http://www.apache.org/licenses/LICENSE-2.0
+ -
+ - Unless required by applicable law or agreed to in writing,
+ - software distributed under the License is distributed on an
+ - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ - KIND, either express or implied. See the License for the
+ - specific language governing permissions and limitations
+ - under the License.
+ -
+
+ This is an example config using the BDBMessageStore available from
+ the Red Hat Messaging project at
etp.108.redhat.com and distributed under GPL.
+ -->
+
+<broker>
+ <prefix>${QPID_HOME}</prefix>
+ <work>${QPID_WORK}</work>
+ <conf>${prefix}/etc</conf>
+ <connector>
+ <transport>nio</transport>
+ <port>5672</port>
+ <sslport>8672</sslport>
+ <socketReceiveBuffer>32768</socketReceiveBuffer>
+ <socketSendBuffer>32768</socketSendBuffer>
+ </connector>
+ <management>
+ <enabled>true</enabled>
+ <jmxport>8999</jmxport>
+ <security-enabled>false</security-enabled>
+ <ssl>
+ <enabled>true</enabled>
+ <!-- Update below path to your keystore location, eg ${conf}/qpid.keystore
-->
+
<keyStorePath>${prefix}/../test_resources/ssl/keystore.jks</keyStorePath>
+ <keyStorePassword>password</keyStorePassword>
+ </ssl>
+ </management>
+ <advanced>
+ <filterchain enableExecutorPool="true"/>
+ <enablePooledAllocator>false</enablePooledAllocator>
+ <enableDirectBuffers>false</enableDirectBuffers>
+ <framesize>65535</framesize>
+ <compressBufferOnQueue>false</compressBufferOnQueue>
+ </advanced>
+
+ <security>
+ <principal-databases>
+ <principal-database>
+ <name>passwordfile</name>
+
<class>org.apache.qpid.server.security.auth.database.PlainPasswordFilePrincipalDatabase</class>
+ <attributes>
+ <attribute>
+ <name>passwordFile</name>
+ <value>${conf}/passwd</value>
+ </attribute>
+ </attributes>
+ </principal-database>
+ </principal-databases>
+
+ <access>
+
<class>org.apache.qpid.server.security.access.plugins.AllowAll</class>
+ </access>
+ <jmx>
+ <access>${conf}/jmxremote.access</access>
+ <principal-database>passwordfile</principal-database>
+ </jmx>
+ </security>
+
+ <virtualhosts>
+ <virtualhost>
+ <name>localhost</name>
+ <localhost>
+ <store>
+
<class>org.apache.qpid.server.store.berkeleydb.BDBMessageStore</class>
+
<environment-path>${work}/bdbstore/localhost-store</environment-path>
+ </store>
+ </localhost>
+ </virtualhost>
+
+ <virtualhost>
+ <name>development</name>
+ <development>
+ <store>
+
<class>org.apache.qpid.server.store.berkeleydb.BDBMessageStore</class>
+
<environment-path>${work}/bdbstore/development-store</environment-path>
+ </store>
+ </development>
+ </virtualhost>
+
+ <virtualhost>
+ <name>test</name>
+ <test>
+ <store>
+
<class>org.apache.qpid.server.store.berkeleydb.BDBMessageStore</class>
+
<environment-path>${work}/bdbstore/test-store</environment-path>
+ </store>
+ </test>
+ </virtualhost>
+
+ </virtualhosts>
+ <heartbeat>
+ <delay>0</delay>
+ <timeoutFactor>2.0</timeoutFactor>
+ </heartbeat>
+ <queue>
+ <auto_register>true</auto_register>
+ </queue>
+
+ <virtualhosts>${conf}/virtualhosts.xml</virtualhosts>
+</broker>
+
+
Added: store/branches/java/broker-queue-refactor/java/bdbstore/etc/virtualhosts.xml
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/etc/virtualhosts.xml
(rev 0)
+++
store/branches/java/broker-queue-refactor/java/bdbstore/etc/virtualhosts.xml 2009-02-20
13:34:27 UTC (rev 3125)
@@ -0,0 +1,123 @@
+<?xml version="1.0" encoding="ISO-8859-1"?>
+<!--
+ -
+ - Licensed to the Apache Software Foundation (ASF) under one
+ - or more contributor license agreements. See the NOTICE file
+ - distributed with this work for additional information
+ - regarding copyright ownership. The ASF licenses this file
+ - to you under the Apache License, Version 2.0 (the
+ - "License"); you may not use this file except in compliance
+ - with the License. You may obtain a copy of the License at
+ -
+ -
http://www.apache.org/licenses/LICENSE-2.0
+ -
+ - Unless required by applicable law or agreed to in writing,
+ - software distributed under the License is distributed on an
+ - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ - KIND, either express or implied. See the License for the
+ - specific language governing permissions and limitations
+ - under the License.
+ -
+ -->
+<virtualhosts>
+ <default>test</default>
+ <virtualhost>
+ <name>localhost</name>
+ <localhost>
+ <exchanges>
+ <exchange>
+ <type>direct</type>
+ <name>test.direct</name>
+ <durable>true</durable>
+ </exchange>
+ <exchange>
+ <type>topic</type>
+ <name>test.topic</name>
+ </exchange>
+ </exchanges>
+ <queues>
+ <exchange>amq.direct</exchange>
+ <maximumQueueDepth>4235264</maximumQueueDepth> <!-- 4Mb
-->
+ <maximumMessageSize>2117632</maximumMessageSize> <!-- 2Mb
-->
+ <maximumMessageAge>600000</maximumMessageAge> <!-- 10
mins -->
+
+ <queue>
+ <name>queue</name>
+ </queue>
+ <queue>
+ <name>ping</name>
+ </queue>
+ <queue>
+ <name>test-queue</name>
+ <test-queue>
+ <exchange>test.direct</exchange>
+ <durable>true</durable>
+ </test-queue>
+ </queue>
+ <queue>
+ <name>test-ping</name>
+ <test-ping>
+ <exchange>test.direct</exchange>
+ </test-ping>
+ </queue>
+
+ </queues>
+ </localhost>
+ </virtualhost>
+
+
+ <virtualhost>
+ <name>development</name>
+ <development>
+ <queues>
+ <minimumAlertRepeatGap>30000</minimumAlertRepeatGap>
+ <maximumMessageCount>5000</maximumMessageCount>
+ <queue>
+ <name>queue</name>
+ <queue>
+ <exchange>amq.direct</exchange>
+ <maximumQueueDepth>4235264</maximumQueueDepth>
<!-- 4Mb -->
+ <maximumMessageSize>2117632</maximumMessageSize>
<!-- 2Mb -->
+ <maximumMessageAge>600000</maximumMessageAge>
<!-- 10 mins -->
+ </queue>
+ </queue>
+ <queue>
+ <name>ping</name>
+ <ping>
+ <exchange>amq.direct</exchange>
+ <maximumQueueDepth>4235264</maximumQueueDepth>
<!-- 4Mb -->
+ <maximumMessageSize>2117632</maximumMessageSize>
<!-- 2Mb -->
+ <maximumMessageAge>600000</maximumMessageAge>
<!-- 10 mins -->
+ </ping>
+ </queue>
+ </queues>
+ </development>
+ </virtualhost>
+ <virtualhost>
+ <name>test</name>
+ <test>
+ <queues>
+ <minimumAlertRepeatGap>30000</minimumAlertRepeatGap>
+ <maximumMessageCount>5000</maximumMessageCount>
+ <queue>
+ <name>queue</name>
+ <queue>
+ <exchange>amq.direct</exchange>
+ <maximumQueueDepth>4235264</maximumQueueDepth>
<!-- 4Mb -->
+ <maximumMessageSize>2117632</maximumMessageSize>
<!-- 2Mb -->
+ <maximumMessageAge>600000</maximumMessageAge>
<!-- 10 mins -->
+ </queue>
+ </queue>
+ <queue>
+ <name>ping</name>
+ <ping>
+ <exchange>amq.direct</exchange>
+ <maximumQueueDepth>4235264</maximumQueueDepth>
<!-- 4Mb -->
+ <maximumMessageSize>2117632</maximumMessageSize>
<!-- 2Mb -->
+ <maximumMessageAge>600000</maximumMessageAge>
<!-- 10 mins -->
+ </ping>
+ </queue>
+ </queues>
+ </test>
+ </virtualhost>
+</virtualhosts>
Added:
store/branches/java/broker-queue-refactor/java/bdbstore/lib/commons-logging-1.0.4.jar
===================================================================
(Binary files differ)
Property changes on:
store/branches/java/broker-queue-refactor/java/bdbstore/lib/commons-logging-1.0.4.jar
___________________________________________________________________
Name: svn:mime-type
+ application/octet-stream
Modified:
store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
===================================================================
---
store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java 2009-02-19
19:02:07 UTC (rev 3124)
+++
store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java 2009-02-20
13:34:27 UTC (rev 3125)
@@ -49,15 +49,16 @@
import org.apache.qpid.server.queue.MessageMetaData;
import org.apache.qpid.server.queue.PersistentAMQMessage;
import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.routing.RoutingTable;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.store.berkeleydb.tuples.BindingTupleBindingFactory;
import org.apache.qpid.server.store.berkeleydb.tuples.QueueTuple;
import org.apache.qpid.server.store.berkeleydb.tuples.QueueTupleBindingFactory;
+import org.apache.qpid.server.transactionlog.TransactionLog;
import org.apache.qpid.server.txn.NonTransactionalContext;
import org.apache.qpid.server.txn.TransactionalContext;
import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.transactionlog.TransactionLog;
-import org.apache.qpid.server.routing.RoutingTable;
+import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import java.io.File;
import java.util.ArrayList;
@@ -130,8 +131,11 @@
private Map<AMQShortString, Long> _queueNameToIdMap = new
ConcurrentHashMap<AMQShortString, Long>();
+ protected Map<Long, List<AMQQueue>> _messageOnQueueMap = new
ConcurrentHashMap<Long, List<AMQQueue>>();
+
+ private final Map<Transaction, Map<Long, List<AMQQueue>>>
_dequeueTxMap = new HashMap<Transaction, Map<Long, List<AMQQueue>>>();
+
// Factory Classes to create the TupleBinding objects that relfect the version
instance of this BDBStore
-
private QueueTupleBindingFactory _queueTupleBindingFactory;
private BindingTupleBindingFactory _bindingTupleBindingFactory;
@@ -188,16 +192,15 @@
* whatever parameters it wants.
*
* @param virtualHost The virtual host using by this store
- * @param base The base element identifier from which all configuration items
are relative. For example, if
- * the base element is "store", the all elements used by
concrete classes will be "store.foo"
- * etc.
- * @param config The apache commons configuration object.
+ * @param base Not used
+ * @param vHostConfig The configuration for this virtualhost
*
* @throws Exception If any error occurs that means the store is unable to configure
itself.
*/
- public void configure(VirtualHost virtualHost, String base, Configuration config)
throws Exception
+ public void configure(VirtualHost virtualHost, String base, VirtualHostConfiguration
vHostConfig) throws Exception
{
- File environmentPath = new File(config.getString(base + "." +
ENVIRONMENT_PATH_PROPERTY, "bdbEnv"));
+ Configuration config = vHostConfig.getStoreConfiguration();
+ File environmentPath = new File(config.getString(ENVIRONMENT_PATH_PROPERTY,
"bdbEnv"));
if (!environmentPath.exists())
{
if (!environmentPath.mkdirs())
@@ -207,7 +210,7 @@
}
}
- _version = config.getInt(base + "." + DATABASE_FORMAT_VERSION_PROPERTY,
DATABASE_FORMAT_VERSION);
+ _version = config.getInt(DATABASE_FORMAT_VERSION_PROPERTY,
DATABASE_FORMAT_VERSION);
configure(virtualHost, environmentPath, false);
}
@@ -477,13 +480,14 @@
/**
* Removes the specified message from the store in the given transactional store
context.
- *
+ * Internal method that is package scoped to allow testing.
+ *
* @param context The transactional context to remove the message in.
* @param messageId Identifies the message to remove.
*
* @throws AMQException If the operation fails for any reason.
*/
- public void removeMessage(StoreContext context, Long messageId) throws AMQException
+ void removeMessage(StoreContext context, Long messageId) throws AMQException
{
// _log.debug("public void removeMessage(StoreContext context = " +
context + ", Long messageId = " + messageId
// + "): called");
@@ -559,7 +563,7 @@
if (localTx)
{
- commit(tx);
+ tx.commit();
context.setPayload(null);
}
}
@@ -943,6 +947,8 @@
DatabaseEntry value = new DatabaseEntry();
ByteBinding.byteToEntry((byte) 0, value);
+ recordEnqueue(messageId, queue);
+
try
{
_deliveryDb.put(tx, key, value);
@@ -968,6 +974,7 @@
try
{
tx = _environment.beginTransaction(null, null);
+ _log.info("Creating local transaction:" + tx);
context.setPayload(tx);
return true;
@@ -1026,11 +1033,32 @@
}
+ //Record the delete for processing AFTER the commit has taken place.
+ synchronized (_dequeueTxMap)
+ {
+ Map<Long, List<AMQQueue>> transactionMap =
_dequeueTxMap.get(tx);
+ if (transactionMap == null)
+ {
+ transactionMap = new HashMap<Long, List<AMQQueue>>();
+ _dequeueTxMap.put(tx, transactionMap);
+ }
+
+ List<AMQQueue> queueList = transactionMap.get(messageId);
+
+ if (queueList == null)
+ {
+ queueList = new LinkedList<AMQQueue>();
+ transactionMap.put(messageId, queueList);
+ }
+
+ queueList.add(queue);
+ }
+
if (isLocal)
{
commit(tx);
context.setPayload(null);
- }
+ }
}
catch (DatabaseException e)
{
@@ -1048,17 +1076,19 @@
{
throw new AMQException("Error rolling back transaction: " +
e1, e1);
}
+ finally
+ {
+ synchronized (_dequeueTxMap)
+ {
+ _dequeueTxMap.remove(tx);
+ }
+ }
}
throw new AMQException("Error accessing database while dequeuing
message: " + e, e);
}
}
- private boolean isLocalTransaction(StoreContext context)
- {
- return context.getPayload() == null;
- }
-
/**
* Begins a transactional context.
*
@@ -1260,8 +1290,9 @@
catch (Throwable ioobe)
{
abortTran(context);
+
throw new AMQException("Invalid database format. Please use upgrade tool
for store in Virtualhost:'"
- + _virtualHost.getName() + "'", ioobe);
+ + _virtualHost.getName() + "'",
ioobe.getCause() != null ? ioobe.getCause() : ioobe);
}
stateTransition(State.RECOVERING, State.STARTED);
@@ -1599,31 +1630,11 @@
}
}
- private static final class ProcessAction
- {
- private final AMQQueue _queue;
- private final StoreContext _context;
- private final AMQMessage _message;
- public ProcessAction(AMQQueue queue, StoreContext context, AMQMessage message)
- {
- _queue = queue;
- _context = context;
- _message = message;
- }
-
- public void process() throws AMQException
- {
- _queue.enqueue(_context, _message);
- }
-
- }
-
private void deliverMessages(final StoreContext context, Map<AMQShortString,
AMQQueue> queues)
throws DatabaseException, AMQException
{
Map<Long, PersistentAMQMessage> msgMap = new HashMap<Long,
PersistentAMQMessage>();
- List<ProcessAction> actions = new ArrayList<ProcessAction>();
Map<AMQShortString, Integer> queueRecoveries = new
TreeMap<AMQShortString, Integer>();
@@ -1660,13 +1671,8 @@
maxId = Math.max(maxId, messageId);
PersistentAMQMessage message = msgMap.get(messageId);
- if (message != null)
+ if (message == null)
{
- message.incrementReference();
- }
- else
- {
-
message = (PersistentAMQMessage)
_messageFactory.createMessage(messageId, this);
msgMap.put(messageId, message);
@@ -1682,7 +1688,6 @@
ContentChunk cc = getContentBodyChunk(context, messageId,
index);
message.recoverContentBodyFrame(cc, index == count);
}
-
}
if (_log.isDebugEnabled())
@@ -1702,15 +1707,11 @@
}
- actions.add(new ProcessAction(queue, context, message));
+ recordEnqueue(messageId, queue);
+ queue.enqueue(context, message);
}
- for (ProcessAction action : actions)
- {
- action.process();
- }
-
_messageId.set(maxId + 1);
}
catch (DatabaseException e)
@@ -1753,9 +1754,21 @@
{
// _log.debug("void commit(Transaction tx = " + tx + "):
called");
+ if (tx == null)
+ {
+ throw new DatabaseException("Fatal internal error: transactional context
is empty at commitTran");
+ }
+
tx.commitNoSync();
- Commit commit = new Commit(_commitThread, tx);
+ Map<Long, List<AMQQueue>> dequeueMap = null;
+ synchronized (_dequeueTxMap)
+ {
+ dequeueMap = _dequeueTxMap.remove(tx);
+ }
+
+ Commit commit = new Commit(_commitThread, tx, dequeueMap, this);
+
commit.commit();
}
@@ -1773,14 +1786,19 @@
private final Transaction _tx;
private DatabaseException _databaseException;
private boolean _complete;
+ private Map<Long, List<AMQQueue>> _messageDequeueMap;
+ private TransactionLog _transactionLog;
- public Commit(CommitThread commitThread, Transaction tx)
+ public Commit(CommitThread commitThread, Transaction tx, Map<Long,
+ List<AMQQueue>> messageDequeueMap, TransactionLog
transactionLog)
{
// _log.debug("public Commit(CommitThread commitThread = " +
commitThread + ", Transaction tx = " + tx
// + "): called");
_commitThread = commitThread;
_tx = tx;
+ _messageDequeueMap = messageDequeueMap;
+ _transactionLog = transactionLog;
}
public void prepare(boolean synch) throws DatabaseException
@@ -1813,7 +1831,24 @@
_complete = true;
+ // If we have dequeued messages then update our internal state
+ if (_messageDequeueMap != null)
+ {
+ _log.info("Transaction(" + _tx + ") Complete : Dequeuing
messages used.");
+ StoreContext dequeueMessageContext = new StoreContext();
+
+ for (Map.Entry<Long, List<AMQQueue>> entry :
_messageDequeueMap.entrySet())
+ {
+ Long id = entry.getKey();
+ for (AMQQueue queue : entry.getValue())
+ {
+ ((BDBMessageStore)
_transactionLog).recordDequeue(dequeueMessageContext, id, queue);
+ }
+ }
+ }
+
notify();
+
}
public synchronized void abort(DatabaseException databaseException)
@@ -1858,6 +1893,72 @@
}
/**
+ * Record that the given message is enqueued on the specified queue.
+ *
+ * @param messageId The message id to enqueue
+ * @param queue The queue it is enqueued upon.
+ */
+ private void recordEnqueue(Long messageId, AMQQueue queue)
+ {
+ List<AMQQueue> queues = _messageOnQueueMap.get(messageId);
+
+ if (queues == null)
+ {
+ queues = new LinkedList<AMQQueue>();
+ }
+
+ queues.add(queue);
+
+ _messageOnQueueMap.put(messageId, queues);
+ }
+
+ /**
+ * Update our records that the given message is no longer on the specified queue.
+ * If the message no longer has any queue references then we can discard the
content.
+ *
+ * @param context
+ * @param messageId
+ * @param queue
+ */
+ private void recordDequeue(StoreContext context, Long messageId, AMQQueue queue)
+ {
+ _log.info("Dequeue Message(" + messageId + ") from queue(" +
queue.getName() + ") context=" + context);
+ List<AMQQueue> queues = _messageOnQueueMap.get(messageId);
+
+ if (queues == null)
+ {
+ throw new RuntimeException("Error, Tried to dequeue a message that is
not enqueued");
+ }
+
+ if (queues.remove(queue))
+ {
+ // if we now have no more references to this message we can dispose of it
+ if (queues.size() == 0)
+ {
+ try
+ {
+ _messageOnQueueMap.remove(messageId);
+ _log.info("Removing Message(" + messageId + ") from
Tlog context=" + context);
+
+ removeMessage(context, messageId);
+ }
+ catch (AMQException e)
+ {
+ //todo As we are just swallowing the exception we need to add clean up in
recover().
+ // This should purge any message content that doesn't have any
delivery records.
+ _log.debug("Error occured removing unreferenced message:" +
e.getMessage());
+ }
+
+ }
+ }
+ else
+ {
+ throw new RuntimeException("Error, Tried to dequeue a message from a
queue, upon which it is not enqueued");
+ }
+
+ }
+
+ /**
* Implements a thread which batches and commits a queue of {@link Commit}
operations. The commit operations
* themselves are responsible for adding themselves to the queue and waiting for the
commit to happen before
* continuing, but it is the responsibility of this thread to tell the commit
operations when they have been
Modified:
store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreTest.java
===================================================================
---
store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreTest.java 2009-02-19
19:02:07 UTC (rev 3124)
+++
store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreTest.java 2009-02-20
13:34:27 UTC (rev 3125)
@@ -20,33 +20,52 @@
import com.sleepycat.je.DatabaseException;
import junit.framework.Assert;
import junit.framework.TestSuite;
+import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.log4j.Logger;
import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
import org.apache.qpid.common.AMQPFilterTypes;
-import org.apache.qpid.framing.*;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.MethodRegistry;
+import org.apache.qpid.framing.ProtocolVersion;
+import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
+import org.apache.qpid.framing.amqp_8_0.BasicConsumeBodyImpl;
+import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.RequiredDeliveryException;
-import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.exchange.DefaultExchangeFactory;
import org.apache.qpid.server.exchange.DirectExchange;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.store.StoreContext;
-import org.apache.qpid.server.store.berkeleydb.utils.BDBVMTestCase;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.flow.LimitlessCreditManager;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.AMQPriorityQueue;
import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.AMQQueueFactory;
+import org.apache.qpid.server.queue.IncomingMessage;
import org.apache.qpid.server.queue.MessageMetaData;
-import org.apache.qpid.server.queue.AMQQueueFactory;
-import org.apache.qpid.server.queue.AMQPriorityQueue;
import org.apache.qpid.server.queue.MockContentChunk;
+import org.apache.qpid.server.queue.MockPersistentAMQMessage;
+import org.apache.qpid.server.queue.MockProtocolSession;
+import org.apache.qpid.server.queue.MockQueueEntry;
+import org.apache.qpid.server.queue.SimpleAMQQueue;
import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.store.berkeleydb.utils.BDBVMTestCase;
+import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
import org.apache.qpid.server.txn.NonTransactionalContext;
import org.apache.qpid.server.txn.TransactionalContext;
import org.apache.qpid.server.util.NullApplicationRegistry;
-import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.qpid.server.virtualhost.VirtualHost;
import java.io.File;
+import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
@@ -54,7 +73,7 @@
{
private static final Logger _log = Logger.getLogger(BDBStoreTest.class);
- private BDBMessageStore _store;
+ private BDBMessageStore _transactionLog;
private String STORE_LOCATION = System.getProperty("BDB_WORK") +
"/bdbTestEnv";
private StoreContext _storeContext = new StoreContext();
@@ -85,13 +104,13 @@
deleteDirectory(bdbDir);
BDB_DIR.mkdirs();
- _store = new BDBMessageStore();
- _store.configure(BDB_DIR);
+ _transactionLog = new InspectableBDBMessageStore();
+ _transactionLog.configure(BDB_DIR);
- _virtualHost = new VirtualHost("test", _store);
- _store.setVirtualHost(_virtualHost);
+ _virtualHost = new VirtualHost(new VirtualHostConfiguration("test", new
PropertiesConfiguration()), _transactionLog);
+ _transactionLog.setVirtualHost(_virtualHost);
- _txnContext = new NonTransactionalContext(_store, _storeContext, null, new
LinkedList<RequiredDeliveryException>());
+ _txnContext = new NonTransactionalContext(_transactionLog, _storeContext, null,
new LinkedList<RequiredDeliveryException>());
}
private void reload() throws Exception
@@ -101,10 +120,10 @@
PropertiesConfiguration env = new PropertiesConfiguration();
env.addProperty("store.environment-path", STORE_LOCATION);
- env.addProperty("store.class",
"org.apache.qpid.server.store.berkeleydb.BDBMessageStore");
+ env.addProperty("store.class",
"org.apache.qpid.server.store.berkeleydb.InspectableBDBMessageStore");
- _virtualHost = new VirtualHost("test", env);
- _store = (BDBMessageStore) _virtualHost.getTransactionLog();
+ _virtualHost = new VirtualHost(new VirtualHostConfiguration("test",
env));
+ _transactionLog = (BDBMessageStore) _virtualHost.getTransactionLog();
}
public void tearDown() throws Exception
@@ -151,13 +170,13 @@
AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(QUEUE1, true, ME, false,
_virtualHost, queueArguments);
- _store.createQueue(queue, queueArguments);
+ _transactionLog.createQueue(queue, queueArguments);
AMQShortString routingKey = new AMQShortString("Test-Key");
FieldTable bindArguments = new FieldTable();
bindArguments.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), "Test =
'MST'");
- _store.bindQueue(_virtualHost.getExchangeRegistry().getDefaultExchange(),
routingKey, queue, bindArguments);
+
_transactionLog.bindQueue(_virtualHost.getExchangeRegistry().getDefaultExchange(),
routingKey, queue, bindArguments);
reload();
@@ -177,7 +196,7 @@
private MessagePublishInfo createPublishBody()
{
- return new MessagePublishInfoImpl(MYEXCHANGE,false,true,RK);
+ return new MessagePublishInfoImpl(MYEXCHANGE, false, true, RK);
}
private BasicContentHeaderProperties createContentHeaderProperties()
@@ -230,10 +249,10 @@
ContentHeaderBody chb = createContentHeaderBody(props, body.getSize());
- _store.storeMessageMetaData(_storeContext, 14L, new MessageMetaData(pubBody, chb,
1));
- _store.storeContentBodyChunk(_storeContext, 14L, 0, body, true);
+ _transactionLog.storeMessageMetaData(_storeContext, 14L, new
MessageMetaData(pubBody, chb, 1));
+ _transactionLog.storeContentBodyChunk(_storeContext, 14L, 0, body, true);
- MessageMetaData mmd = _store.getMessageMetaData(_storeContext, 14L);
+ MessageMetaData mmd = _transactionLog.getMessageMetaData(_storeContext, 14L);
MessagePublishInfo returnedPubBody = mmd.getMessagePublishInfo();
Assert.assertEquals("Message exchange has changed",
pubBody.getExchange(), returnedPubBody.getExchange());
Assert.assertEquals("Immediate flag has changed",
pubBody.isImmediate(), returnedPubBody.isImmediate());
@@ -248,7 +267,7 @@
Assert.assertEquals("Property ContentType has changed",
props.getContentTypeAsString(), returnedProperties.getContentTypeAsString());
Assert.assertEquals("Property MessageID has changed",
props.getMessageIdAsString(), returnedProperties.getMessageIdAsString());
Assert.assertEquals("MessageMD ChunkCount has changed",
mmd.getContentChunkCount(), 1);
- ContentChunk returnedContentBody = _store.getContentBodyChunk(_storeContext, 14L,
0);
+ ContentChunk returnedContentBody =
_transactionLog.getContentBodyChunk(_storeContext, 14L, 0);
ByteBuffer returnedPayloadAsBytes = returnedContentBody.getData();
byte[] returnedPayload = new byte[returnedPayloadAsBytes.remaining()];
returnedPayloadAsBytes.get(returnedPayload);
@@ -264,15 +283,15 @@
ContentChunk body = createContentChunk(bodyText);
ContentHeaderBody chb = createContentHeaderBody(props, body.getSize());
- _store.storeMessageMetaData(_storeContext, 15L, new MessageMetaData(pubBody, chb,
1));
- _store.storeContentBodyChunk(_storeContext, 15L, 0, body, true);
- _store.getContentBodyChunk(_storeContext, 15L, 0);
- _store.removeMessage(_storeContext, 15L);
+ _transactionLog.storeMessageMetaData(_storeContext, 15L, new
MessageMetaData(pubBody, chb, 1));
+ _transactionLog.storeContentBodyChunk(_storeContext, 15L, 0, body, true);
+ _transactionLog.getContentBodyChunk(_storeContext, 15L, 0);
+ _transactionLog.removeMessage(_storeContext, 15L);
// the next line should throw since the message id should not be found
try
{
- _store.getMessageMetaData(_storeContext, 15L);
+ _transactionLog.getMessageMetaData(_storeContext, 15L);
Assert.fail("No exception thrown when message id not found getting
metadata");
}
catch (AMQException e)
@@ -282,7 +301,7 @@
try
{
- _store.getContentBodyChunk(_storeContext, 15L, 0);
+ _transactionLog.getContentBodyChunk(_storeContext, 15L, 0);
Assert.fail("No exception thrown when message id not found getting
content chunk");
}
catch (AMQException e)
@@ -300,19 +319,19 @@
ContentChunk body = createContentChunk(bodyText);
ContentHeaderBody chb = createContentHeaderBody(props, body.getSize());
- _store.storeMessageMetaData(_storeContext, 20L, new MessageMetaData(pubBody, chb,
0));
- _store.storeMessageMetaData(_storeContext, 21L, new MessageMetaData(pubBody, chb,
0));
- _store.storeMessageMetaData(_storeContext, 22L, new MessageMetaData(pubBody, chb,
0));
+ _transactionLog.storeMessageMetaData(_storeContext, 20L, new
MessageMetaData(pubBody, chb, 0));
+ _transactionLog.storeMessageMetaData(_storeContext, 21L, new
MessageMetaData(pubBody, chb, 0));
+ _transactionLog.storeMessageMetaData(_storeContext, 22L, new
MessageMetaData(pubBody, chb, 0));
AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(QUEUE1, true, ME, false,
_virtualHost, null);
- _store.createQueue(queue);
+ _transactionLog.createQueue(queue);
- _store.beginTran(_storeContext);
- _store.enqueueMessage(_storeContext, queue, 20L);
- _store.enqueueMessage(_storeContext, queue, 21L);
- _store.commitTran(_storeContext);
+ _transactionLog.beginTran(_storeContext);
+ _transactionLog.enqueueMessage(_storeContext, queue, 20L);
+ _transactionLog.enqueueMessage(_storeContext, queue, 21L);
+ _transactionLog.commitTran(_storeContext);
- List<Long> enqueuedIds = _store.getEnqueuedMessages(QUEUE1);
+ List<Long> enqueuedIds = _transactionLog.getEnqueuedMessages(QUEUE1);
Assert.assertEquals("Enqueued messages have changed", 2,
enqueuedIds.size());
Long val = enqueuedIds.get(0);
Assert.assertEquals("First Message is incorrect", 20L,
val.longValue());
@@ -323,7 +342,7 @@
public void testTranRollback1() throws Exception
{
- List<Long> enqueuedIds = _store.getEnqueuedMessages(QUEUE1);
+ List<Long> enqueuedIds = _transactionLog.getEnqueuedMessages(QUEUE1);
assertTrue("Last Test Messages are still present",
enqueuedIds.isEmpty());
MessagePublishInfo pubBody = createPublishBody();
@@ -333,26 +352,26 @@
ContentHeaderBody chb = createContentHeaderBody(props, body.getSize());
- _store.storeMessageMetaData(_storeContext, 30L, new MessageMetaData(pubBody, chb,
0));
- _store.storeMessageMetaData(_storeContext, 31L, new MessageMetaData(pubBody, chb,
0));
- _store.storeMessageMetaData(_storeContext, 32L, new MessageMetaData(pubBody, chb,
0));
+ _transactionLog.storeMessageMetaData(_storeContext, 30L, new
MessageMetaData(pubBody, chb, 0));
+ _transactionLog.storeMessageMetaData(_storeContext, 31L, new
MessageMetaData(pubBody, chb, 0));
+ _transactionLog.storeMessageMetaData(_storeContext, 32L, new
MessageMetaData(pubBody, chb, 0));
AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(QUEUE1, true, ME, false,
_virtualHost, null);
- _store.createQueue(queue);
+ _transactionLog.createQueue(queue);
- _store.beginTran(_storeContext);
- _store.enqueueMessage(_storeContext, queue, 30L);
- _store.enqueueMessage(_storeContext, queue, 31L);
- _store.commitTran(_storeContext);
+ _transactionLog.beginTran(_storeContext);
+ _transactionLog.enqueueMessage(_storeContext, queue, 30L);
+ _transactionLog.enqueueMessage(_storeContext, queue, 31L);
+ _transactionLog.commitTran(_storeContext);
- _store.beginTran(_storeContext);
- _store.enqueueMessage(_storeContext, queue, 32L);
- _store.abortTran(_storeContext);
+ _transactionLog.beginTran(_storeContext);
+ _transactionLog.enqueueMessage(_storeContext, queue, 32L);
+ _transactionLog.abortTran(_storeContext);
- _store.beginTran(_storeContext);
- _store.commitTran(_storeContext);
+ _transactionLog.beginTran(_storeContext);
+ _transactionLog.commitTran(_storeContext);
- enqueuedIds = _store.getEnqueuedMessages(QUEUE1);
+ enqueuedIds = _transactionLog.getEnqueuedMessages(QUEUE1);
assertTrue("Last Test Message is still present",
!enqueuedIds.contains(20L));
assertEquals("Incorrect Enqueued Message Count:", 2,
enqueuedIds.size());
Long val = enqueuedIds.get(0);
@@ -364,7 +383,7 @@
public void testTranRollback2() throws Exception
{
- List<Long> enqueuedIds = _store.getEnqueuedMessages(QUEUE1);
+ List<Long> enqueuedIds = _transactionLog.getEnqueuedMessages(QUEUE1);
assertTrue("Last Test Messages are still present",
enqueuedIds.isEmpty());
MessagePublishInfo pubBody = createPublishBody();
@@ -374,23 +393,23 @@
ContentHeaderBody chb = createContentHeaderBody(props, body.getSize());
- _store.storeMessageMetaData(_storeContext, 30L, new MessageMetaData(pubBody, chb,
0));
- _store.storeMessageMetaData(_storeContext, 31L, new MessageMetaData(pubBody, chb,
0));
- _store.storeMessageMetaData(_storeContext, 32L, new MessageMetaData(pubBody, chb,
0));
+ _transactionLog.storeMessageMetaData(_storeContext, 30L, new
MessageMetaData(pubBody, chb, 0));
+ _transactionLog.storeMessageMetaData(_storeContext, 31L, new
MessageMetaData(pubBody, chb, 0));
+ _transactionLog.storeMessageMetaData(_storeContext, 32L, new
MessageMetaData(pubBody, chb, 0));
AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(QUEUE1, true, ME, false,
_virtualHost, null);
- _store.createQueue(queue);
+ _transactionLog.createQueue(queue);
- _store.beginTran(_storeContext);
- _store.enqueueMessage(_storeContext, queue, 30L);
- _store.abortTran(_storeContext);
+ _transactionLog.beginTran(_storeContext);
+ _transactionLog.enqueueMessage(_storeContext, queue, 30L);
+ _transactionLog.abortTran(_storeContext);
- _store.beginTran(_storeContext);
- _store.enqueueMessage(_storeContext, queue, 31L);
- _store.enqueueMessage(_storeContext, queue, 32L);
- _store.commitTran(_storeContext);
+ _transactionLog.beginTran(_storeContext);
+ _transactionLog.enqueueMessage(_storeContext, queue, 31L);
+ _transactionLog.enqueueMessage(_storeContext, queue, 32L);
+ _transactionLog.commitTran(_storeContext);
- enqueuedIds = _store.getEnqueuedMessages(QUEUE1);
+ enqueuedIds = _transactionLog.getEnqueuedMessages(QUEUE1);
Assert.assertEquals("Incorrect Enqueued Message Count", 2,
enqueuedIds.size());
Long val = enqueuedIds.get(0);
Assert.assertEquals("First Message is incorrect", 31L,
val.longValue());
@@ -400,7 +419,7 @@
public void testRecovery() throws Exception
{
- List<Long> enqueuedIds = _store.getEnqueuedMessages(QUEUE1);
+ List<Long> enqueuedIds = _transactionLog.getEnqueuedMessages(QUEUE1);
assertTrue("Last Test Messages are still present",
enqueuedIds.isEmpty());
MessagePublishInfo pubBody = createPublishBody();
@@ -410,25 +429,25 @@
ContentHeaderBody chb = createContentHeaderBody(props, body.getSize());
- _store.storeMessageMetaData(_storeContext, 40L, new MessageMetaData(pubBody, chb,
0));
- _store.storeMessageMetaData(_storeContext, 41L, new MessageMetaData(pubBody, chb,
0));
- _store.storeMessageMetaData(_storeContext, 42L, new MessageMetaData(pubBody, chb,
1));
+ _transactionLog.storeMessageMetaData(_storeContext, 40L, new
MessageMetaData(pubBody, chb, 0));
+ _transactionLog.storeMessageMetaData(_storeContext, 41L, new
MessageMetaData(pubBody, chb, 0));
+ _transactionLog.storeMessageMetaData(_storeContext, 42L, new
MessageMetaData(pubBody, chb, 1));
- _store.storeContentBodyChunk(_storeContext, 42L, 0, new MockContentChunk(),
true);
+ _transactionLog.storeContentBodyChunk(_storeContext, 42L, 0, new
MockContentChunk(), true);
AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(QUEUE1, true, ME, false,
_virtualHost, null);
AMQQueue queue2 = AMQQueueFactory.createAMQQueueImpl(QUEUE2, true, HIM, false,
_virtualHost, null);
- _store.createQueue(queue);
- _store.createQueue(queue2);
+ _transactionLog.createQueue(queue);
+ _transactionLog.createQueue(queue2);
- _store.beginTran(_storeContext);
- _store.enqueueMessage(_storeContext, queue, 40L);
- _store.enqueueMessage(_storeContext, queue, 41L);
- _store.enqueueMessage(_storeContext, queue2, 42L);
- _store.commitTran(_storeContext);
+ _transactionLog.beginTran(_storeContext);
+ _transactionLog.enqueueMessage(_storeContext, queue, 40L);
+ _transactionLog.enqueueMessage(_storeContext, queue, 41L);
+ _transactionLog.enqueueMessage(_storeContext, queue2, 42L);
+ _transactionLog.commitTran(_storeContext);
- _store.enqueueMessage(_storeContext, queue, 42L);
+ _transactionLog.enqueueMessage(_storeContext, queue, 42L);
reload();
@@ -459,23 +478,23 @@
ContentHeaderBody chb = createContentHeaderBody(props, body.getSize());
- _store.storeMessageMetaData(_storeContext, 50L, new MessageMetaData(pubBody, chb,
0));
+ _transactionLog.storeMessageMetaData(_storeContext, 50L, new
MessageMetaData(pubBody, chb, 0));
AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(QUEUE1, true, ME, false,
_virtualHost, null);
- _store.createQueue(queue);
+ _transactionLog.createQueue(queue);
- _store.enqueueMessage(_storeContext, queue, 50L);
- _store.dequeueMessage(_storeContext, queue, 50L);
+ _transactionLog.enqueueMessage(_storeContext, queue, 50L);
+ _transactionLog.dequeueMessage(_storeContext, queue, 50L);
}
public void testQueueRemove() throws AMQException
{
AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(QUEUE1, true, ME, false,
_virtualHost, null);
- _store.createQueue(queue);
- _store.removeQueue(queue);
+ _transactionLog.createQueue(queue);
+ _transactionLog.removeQueue(queue);
try
{
- _store.removeQueue(queue);
+ _transactionLog.removeQueue(queue);
Assert.fail("No exception thrown when deleting non-existant
queue");
}
catch (AMQException e)
Modified:
store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java
===================================================================
---
store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java 2009-02-19
19:02:07 UTC (rev 3124)
+++
store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java 2009-02-20
13:34:27 UTC (rev 3125)
@@ -24,9 +24,13 @@
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.store.berkeleydb.utils.Publisher;
import org.apache.qpid.server.store.berkeleydb.utils.DurableSubscriber;
+import org.apache.qpid.server.queue.MessageFactory;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.client.transport.TransportConnection;
import org.apache.qpid.util.FileUtils;
import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.PropertiesConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -42,9 +46,9 @@
{
protected static final Logger _logger =
LoggerFactory.getLogger(BDBUpgradeTest.class);
- final String BDBHome = System.getProperty("BDB_HOME");
- final File _configFile = new File(BDBHome, "etc/config.xml");
-
+ protected final String QpidHome = System.getProperty("QPID_HOME");
+ protected final File _configFile = new File(QpidHome,
"etc/persistent_config.xml");
+
private String VIRTUALHOST = "test";
private static final String VERSION_1 = "1";
@@ -113,57 +117,69 @@
}
}
+ public void testDurabability() throws Exception
+ {
+ String broker = "vm://:1";
+
+ startBroker(1, VERSION_1);
+
+ sendAndCheckDurableSubscriber(broker, true, false, 10, null);
+
+ stopBroker(1);
+
+ startBroker(1, VERSION_1);
+
+ sendAndCheckDurableSubscriber(broker, false, true, 10, null);
+
+ stopBroker(1);
+ }
+
+ /**
+ * This test is currently broken due to QPID-1275
+ * @throws Exception
+ */
public void testDurababilitySelectors() throws Exception
{
String broker = "vm://:1";
+ System.err.println("Start V1 Broker");
startBroker(1, VERSION_1);
new DurableSubscriber(broker, _topic, null).close();
+ System.err.println("Send 10 message to topic:"+_topic);
sendMessages(broker, _topic, 10);
+ //don't actually send any messages just check we get 5 odd messages
sendAndCheckDurableSubscriber(broker, false, true, 5, "odd=true");
Thread.sleep(2000);
stopBroker(1);
+ System.err.println("Upgrade Broker");
upgradeBroker();
- broker = "vm://:2";
+ System.err.println("Start V2 Broker");
+ startBroker(1, VERSION_2);
- startBroker(2, VERSION_2);
-
+ System.err.println("Ensure we can receive all the messagse messages but
don't ack them");
//Ensure msg were transitioned to new broker
+ //don't actually send any messages just check we get 5 odd messages
sendAndCheckDurableSubscriber(broker, false, false, 5, null);
- //Reset the Selector Pattern
+ System.err.println("Change selector so on recovery the messages are
dropped");
+ //Reset the Selector Pattern; this should recreate the queue
new DurableSubscriber(broker, _topic, "odd=true").close();
- stopBroker(2);
-
- startBroker(2, VERSION_2);
-
- //Ensure that the selector was preseved on restart and caused all msgs to be
removed.
- sendAndCheckDurableSubscriber(broker, false, false, 0, null);
- stopBroker(2);
- }
-
- public void testDurabability() throws Exception
- {
- String broker = "vm://:1";
-
- startBroker(1, VERSION_1);
-
- sendAndCheckDurableSubscriber(broker, true, false, 10, null);
-
stopBroker(1);
- startBroker(1, VERSION_1);
+ System.err.println("Start V2 Broker");
- sendAndCheckDurableSubscriber(broker, false, true, 10, null);
+ startBroker(1, VERSION_2);
+ //Ensure that the selector was preserved on restart and caused all msgs to be
removed.
+ sendAndCheckDurableSubscriber(broker, false, true, 0, null);
stopBroker(1);
}
@@ -184,16 +200,24 @@
ConfigurationFileApplicationRegistry config = new
ConfigurationFileApplicationRegistry(_configFile);
//Disable management on broker.
- config.getConfiguration().setProperty("management.enabled",
"false");
+ config.getConfiguration().setManagementEnabled(false);
+
+ //There can be only one MF so we need to reset it here or recovery will not work
+ MessageFactory.getInstance().reset();
+ //todo need to change MF so it is per AppRegistry not a singleton
- Configuration testVirtualhost =
config.getConfiguration().subset("virtualhosts.virtualhost." + VIRTUALHOST);
+ ApplicationRegistry.initialise(config, port);
+
+ Configuration testVirtualhost = new PropertiesConfiguration();
testVirtualhost.setProperty("store.class",
"org.apache.qpid.server.store.berkeleydb.BDBMessageStore");
- testVirtualhost.setProperty("store." +
BDBMessageStore.ENVIRONMENT_PATH_PROPERTY, "${work}/version" + port +
"Store");
+ testVirtualhost.setProperty("store." +
BDBMessageStore.ENVIRONMENT_PATH_PROPERTY,
+ System.getProperty("QPID_WORK")+
"/version" + version + "Store");
testVirtualhost.setProperty("store.version", version);
- ApplicationRegistry.initialise(config, port);
+ ApplicationRegistry.getInstance(port).getVirtualHostRegistry().
+ registerVirtualHost(new VirtualHost(new
VirtualHostConfiguration("bdbtest",testVirtualhost)));
+
TransportConnection.createVMBroker(port);
-
}
public void sendAndCheckDurableSubscriber(String broker, boolean send, boolean
commitRecieved, int count, String selector)
Added:
store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/InspectableBDBMessageStore.java
===================================================================
---
store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/InspectableBDBMessageStore.java
(rev 0)
+++
store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/InspectableBDBMessageStore.java 2009-02-20
13:34:27 UTC (rev 3125)
@@ -0,0 +1,34 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.store.TestTransactionLog;
+
+import java.util.List;
+
+public class InspectableBDBMessageStore extends BDBMessageStore implements
TestTransactionLog
+{
+ public List<AMQQueue> getMessageReferenceMap(Long messageId)
+ {
+ return _messageOnQueueMap.get(messageId);
+ }
+}
Added:
store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessagePersistenceTest.java
===================================================================
---
store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessagePersistenceTest.java
(rev 0)
+++
store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessagePersistenceTest.java 2009-02-20
13:34:27 UTC (rev 3125)
@@ -0,0 +1,327 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
+import org.apache.qpid.framing.amqp_8_0.BasicConsumeBodyImpl;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.configuration.VirtualHostConfiguration;
+import org.apache.qpid.server.flow.LimitlessCreditManager;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.AMQQueueFactory;
+import org.apache.qpid.server.queue.IncomingMessage;
+import org.apache.qpid.server.queue.MockProtocolSession;
+import org.apache.qpid.server.queue.SimpleAMQQueue;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.store.berkeleydb.utils.BDBVMTestCase;
+import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
+import org.apache.qpid.server.txn.NonTransactionalContext;
+import org.apache.qpid.server.util.NullApplicationRegistry;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.commons.configuration.PropertiesConfiguration;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+
+public class MessagePersistenceTest extends BDBVMTestCase
+{
+
+ private static final Logger _log = Logger.getLogger(MessagePersistenceTest.class);
+
+ protected InspectableBDBMessageStore _transactionLog;
+ private String STORE_LOCATION = System.getProperty("BDB_WORK") +
"/bdbTestEnv";
+ protected VirtualHost _virtualHost;
+
+ protected SimpleAMQQueue _queue1, _queue2;
+ protected AMQShortString _q1name = new AMQShortString("q1name");
+ protected AMQShortString _q2name = new AMQShortString("q2name");
+ protected AMQShortString _owner = new AMQShortString("owner");
+ protected AMQShortString _routingKey = new AMQShortString("routing key");
+ protected NonTransactionalContext _messageDeliveryContext;
+
+ protected static final long MESSAGE_SIZE = 0L;
+
+ File BDB_DIR = new File(STORE_LOCATION);
+
+ protected boolean _transactional;
+ protected boolean _ack;
+
+ public void setUp() throws Exception
+ {
+ if (BDB_DIR.exists())
+ {
+ deleteDirectory(BDB_DIR);
+ }
+
+ ApplicationRegistry.initialise(new NullApplicationRegistry());
+ ApplicationRegistry applicationRegistry = (ApplicationRegistry)
ApplicationRegistry.getInstance(1);
+
+ File bdbDir = new File(STORE_LOCATION);
+ deleteDirectory(bdbDir);
+ BDB_DIR.mkdirs();
+
+ _transactionLog = new InspectableBDBMessageStore();
+ _transactionLog.configure(BDB_DIR);
+
+ _virtualHost = new VirtualHost(new
VirtualHostConfiguration("bdbtest",new PropertiesConfiguration()),
_transactionLog);
+ _transactionLog.setVirtualHost(_virtualHost);
+
+ applicationRegistry.getVirtualHostRegistry().registerVirtualHost(_virtualHost);
+
+ _queue1 = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(_q1name, false,
_owner, false, _virtualHost, null);
+ _queue2 = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(_q2name, false,
_owner, false, _virtualHost, null);
+ // Create IncomingMessage and nondurable queue
+ _messageDeliveryContext = new NonTransactionalContext(_transactionLog, new
StoreContext(), null, null);
+
+ _transactional = false;
+ _ack = true;
+ }
+
+ protected IncomingMessage createMessage(MessagePublishInfo info) throws AMQException
+ {
+ IncomingMessage msg = new IncomingMessage(info, _messageDeliveryContext,
+ new
MockProtocolSession(_transactionLog), _transactionLog);
+
+ // equivalent to amqChannel.publishContentHeader
+ ContentHeaderBody contentHeaderBody = new ContentHeaderBody();
+ contentHeaderBody.classId = BasicConsumeBodyImpl.CLASS_ID;
+ // This message has no bodies
+ contentHeaderBody.bodySize = MESSAGE_SIZE;
+ contentHeaderBody.properties = new BasicContentHeaderProperties();
+ ((BasicContentHeaderProperties)
contentHeaderBody.properties).setDeliveryMode((byte) 2);
+
+ msg.setContentHeaderBody(contentHeaderBody);
+ msg.setExpiration();
+
+ return msg;
+ }
+
+
+ /**
+ * Tests delivery of a single persistent message to a queue. Validates through
reference counting that the
+ * Message is removed from the TransactionLog once all references to it have
been removed.
+ *
+ * @throws Exception
+ */
+ public void testSingleConsume() throws Exception
+ {
+ MessagePublishInfo info = new MessagePublishInfoImpl();
+ IncomingMessage msg = createMessage(info);
+
+ // Send persistent message
+ ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>();
+ qs.add(_queue1);
+
+ // equivalent to amqChannel.routeMessage()
+ msg.enqueue(qs);
+
+ msg.routingComplete(_transactionLog);
+
+ // equivalent to amqChannel.deliverCurrentMessageIfComplete
+ msg.deliverToQueues();
+
+ // Check that data has been stored to disk
+ long messageId = msg.getMessageId();
+ checkMessageMetaDataExists(messageId);
+
+ // Check that it is enqueued
+ List<AMQQueue> queueList =
_transactionLog.getMessageReferenceMap(messageId);
+ assertNotNull(queueList);
+ assertEquals("Message should be enqueued on both queues.", 1,
queueList.size());
+ assertTrue("Queue1 not contained in list.",
queueList.contains(_queue1));
+
+ // Create consumer to correctly consume message
+ AMQProtocolSession session1 = new MockProtocolSession(_transactionLog);
+ AMQChannel channel1 = new AMQChannel(session1, 1, _transactionLog);
+ session1.addChannel(channel1);
+
+ if (_transactional && _ack)
+ {
+ channel1.setLocalTransactional();
+ }
+
+ Subscription sub1 = SubscriptionFactoryImpl.INSTANCE.createSubscription(1,
session1,
+ new
AMQShortString("cTag1"), _ack,
+ null,
false,
+ new
LimitlessCreditManager());
+
+ _queue1.registerSubscription(sub1, true);
+
+ // Give the delivery thread time to deliver the message
+ Thread.sleep(200);
+ Thread.yield();
+
+ if (_ack)
+ {
+ // Acknowledge the message
+ channel1.acknowledgeMessage(1, false);
+
+ if (_transactional)
+ {
+ channel1.commit();
+ }
+ }
+ // Check that it is now dequeued
+ queueList = _transactionLog.getMessageReferenceMap(messageId);
+ assertNull("Queue List was not empty:" + (queueList != null ?
queueList.size() : ""), queueList);
+
+ checkMessageMetaDataRemoved(messageId);
+ }
+
+ /**
+ * Tests delivery of a single persistent message to two queues and validates through
reference counting that the
+ * Message is removed from the TransactionLog when all references have been removed.
+ *
+ * @throws Exception
+ */
+ public void testTwoQueues() throws Exception
+ {
+ MessagePublishInfo info = new MessagePublishInfoImpl();
+ IncomingMessage msg = createMessage(info);
+
+ // Send persistent message
+ ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>();
+ qs.add(_queue1);
+ qs.add(_queue2);
+
+ // equivalent to amqChannel.routeMessage()
+ msg.enqueue(qs);
+
+ msg.routingComplete(_transactionLog);
+
+ // equivalent to amqChannel.deliverCurrentMessageIfComplete
+ msg.deliverToQueues();
+
+ // Check that data has been stored to disk
+ long messageId = msg.getMessageId();
+ checkMessageMetaDataExists(messageId);
+
+ // Check that it is enqueued
+ List<AMQQueue> queueList =
_transactionLog.getMessageReferenceMap(messageId);
+ assertNotNull(queueList);
+ assertEquals("Message should be enqueued on both queues.", 2,
queueList.size());
+ assertTrue("Queue1 not contained in list.",
queueList.contains(_queue1));
+ assertTrue("Queue2 not contained in list.",
queueList.contains(_queue2));
+
+ // Create consumer to correctly consume message
+ AMQProtocolSession session1 = new MockProtocolSession(_transactionLog);
+ AMQChannel channel1 = new AMQChannel(session1, 1, _transactionLog);
+ session1.addChannel(channel1);
+
+ if (_transactional & _ack)
+ {
+ channel1.setLocalTransactional();
+ }
+
+ Subscription sub1 = SubscriptionFactoryImpl.INSTANCE.createSubscription(1,
session1,
+ new
AMQShortString("cTag1"), _ack,
+ null,
false,
+ new
LimitlessCreditManager());
+ _queue1.registerSubscription(sub1, true);
+
+ // Give the delivery thread time to deliver the message
+ Thread.sleep(200);
+ Thread.yield();
+
+ if (_ack)
+ {
+ // Acknowledge the message
+ channel1.acknowledgeMessage(1, false);
+
+ if (_transactional)
+ {
+ channel1.commit();
+ }
+ }
+ // Check that data still exists
+ checkMessageMetaDataExists(messageId);
+
+ // Check that message was consumed
+ queueList = _transactionLog.getMessageReferenceMap(messageId);
+ assertNotNull(queueList);
+ assertEquals("Message Should only be enqueued on one queue.", 1,
queueList.size());
+ assertTrue("Queue2 not contained in list.",
queueList.contains(_queue2));
+
+ Subscription sub2 = SubscriptionFactoryImpl.INSTANCE.createSubscription(1,
session1,
+ new
AMQShortString("cTag2"), _ack,
+ null,
false,
+ new
LimitlessCreditManager());
+
+ //Consume from second queue
+ _queue2.registerSubscription(sub2, true);
+
+ // Give the delivery thread time to deliver the message
+ Thread.sleep(200);
+ Thread.yield();
+
+ if (_ack)
+ {
+ // Acknowledge the message
+ channel1.acknowledgeMessage(2, false);
+
+ if (_transactional)
+ {
+ channel1.commit();
+ }
+ }
+ // Check that it is now dequeued
+ queueList = _transactionLog.getMessageReferenceMap(messageId);
+ assertNull("Queue List was not empty:" + (queueList != null ?
queueList.size() : ""), queueList);
+
+ checkMessageMetaDataRemoved(messageId);
+ }
+
+ protected void checkMessageMetaDataExists(long messageId)
+ {
+ try
+ {
+ _transactionLog.getMessageMetaData(_messageDeliveryContext.getStoreContext(),
messageId);
+ }
+ catch (AMQException amqe)
+ {
+ fail("Message MetaData does not exist for message:" + messageId);
+ }
+ }
+
+ protected void checkMessageMetaDataRemoved(long messageId)
+ {
+ try
+ {
+ _transactionLog.getMessageMetaData(_messageDeliveryContext.getStoreContext(),
messageId);
+ fail("Message MetaData still exists for message:" + messageId);
+ }
+ catch (AMQException amqe)
+ {
+ assertEquals("Metadata not found for message with id " + messageId,
amqe.getMessage());
+ }
+ }
+
+}
Added:
store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessagePersistenceTransactionalTest.java
===================================================================
---
store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessagePersistenceTransactionalTest.java
(rev 0)
+++
store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessagePersistenceTransactionalTest.java 2009-02-20
13:34:27 UTC (rev 3125)
@@ -0,0 +1,239 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.flow.LimitlessCreditManager;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.IncomingMessage;
+import org.apache.qpid.server.queue.MockProtocolSession;
+import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class MessagePersistenceTransactionalTest extends MessagePersistenceTest
+{
+ public void setUp() throws Exception
+ {
+ super.setUp();
+ _transactional = true;
+ }
+
+ /**
+ * Tests delivery of a single persistent message to two queues and validates through
reference counting that the
+ * Message is removed from the TransactionLog when all references have been removed.
+ *
+ * @throws Exception
+ */
+ public void testAckTwoQueuesInOneTx() throws Exception
+ {
+ MessagePublishInfo info = new MessagePublishInfoImpl();
+ IncomingMessage msg = createMessage(info);
+
+ // Send persistent message
+ ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>();
+ qs.add(_queue1);
+ qs.add(_queue2);
+
+ // equivalent to amqChannel.routeMessage()
+ msg.enqueue(qs);
+
+ msg.routingComplete(_transactionLog);
+
+ // equivalent to amqChannel.deliverCurrentMessageIfComplete
+ msg.deliverToQueues();
+
+ // Check that data has been stored to disk
+ long messageId = msg.getMessageId();
+ checkMessageMetaDataExists(messageId);
+
+ // Check that it is enqueued
+ List<AMQQueue> queueList =
_transactionLog.getMessageReferenceMap(messageId);
+ assertNotNull(queueList);
+ assertEquals("Message should be enqueued on both queues.", 2,
queueList.size());
+ assertTrue("Queue1 not contained in list.",
queueList.contains(_queue1));
+ assertTrue("Queue2 not contained in list.",
queueList.contains(_queue2));
+
+ // Create consumer to correctly consume message
+ AMQProtocolSession session1 = new MockProtocolSession(_transactionLog);
+ AMQChannel channel1 = new AMQChannel(session1, 1, _transactionLog);
+ session1.addChannel(channel1);
+
+ channel1.setLocalTransactional();
+
+ Subscription sub1 = SubscriptionFactoryImpl.INSTANCE.createSubscription(1,
session1,
+ new
AMQShortString("cTag1"), true,
+ null,
false,
+ new
LimitlessCreditManager());
+ _queue1.registerSubscription(sub1, true);
+
+ // Give the delivery thread time to deliver the message
+ Thread.sleep(250);
+ Thread.yield();
+
+ // Acknowledge the message
+ channel1.acknowledgeMessage(1, false);
+
+ // Check that data still exists
+ checkMessageMetaDataExists(messageId);
+
+ // Check that it is enqueued
+ queueList = _transactionLog.getMessageReferenceMap(messageId);
+ assertNotNull(queueList);
+ assertEquals("Message should be enqueued on both queues.", 2,
queueList.size());
+ assertTrue("Queue1 not contained in list.",
queueList.contains(_queue1));
+ assertTrue("Queue2 not contained in list.",
queueList.contains(_queue2));
+
+ Subscription sub2 = SubscriptionFactoryImpl.INSTANCE.createSubscription(1,
session1,
+ new
AMQShortString("cTag2"), true,
+ null,
false,
+ new
LimitlessCreditManager());
+
+ //Consume from second queue
+ _queue2.registerSubscription(sub2, true);
+
+ // Give the delivery thread time to deliver the message
+ Thread.sleep(250);
+ Thread.yield();
+
+ // Acknowledge the message
+ channel1.acknowledgeMessage(2, false);
+
+ // Check that it is enqueued
+ queueList = _transactionLog.getMessageReferenceMap(messageId);
+ assertNotNull(queueList);
+ assertEquals("Message should be enqueued on both queues.", 2,
queueList.size());
+ assertTrue("Queue1 not contained in list.",
queueList.contains(_queue1));
+ assertTrue("Queue2 not contained in list.",
queueList.contains(_queue2));
+
+ if (_transactional)
+ {
+ channel1.commit();
+ }
+
+ // Check that it is now dequeued
+ queueList = _transactionLog.getMessageReferenceMap(messageId);
+ assertNull("Queue List was not empty:" + (queueList != null ?
queueList.size() : ""), queueList);
+
+ checkMessageMetaDataRemoved(messageId);
+ }
+
+ /**
+ * Routes a single message to both {@code _queue1} and {@code _queue2}, then
+ * acknowledges the queue1 copy on a channel that has had
+ * {@code setLocalTransactional()} called and the queue2 copy on a second channel
+ * that has not. Finally the first channel is rolled back.
+ *
+ * The test asserts that, both before and after the rollback, the reference map
+ * for the message contains only {@code _queue1}, and that the message metadata
+ * is still present at the end.
+ *
+ * @throws Exception if any of the broker or store operations fails
+ */
+ public void testEnqueueOnTwoQueuesAckWithRollback() throws Exception
+ {
+ MessagePublishInfo info = new MessagePublishInfoImpl();
+ IncomingMessage msg = createMessage(info);
+
+ // Send persistent message
+ ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>();
+ qs.add(_queue1);
+ qs.add(_queue2);
+
+ // equivalent to amqChannel.routeMessage()
+ msg.enqueue(qs);
+
+ msg.routingComplete(_transactionLog);
+
+ // equivalent to amqChannel.deliverCurrentMessageIfComplete
+ msg.deliverToQueues();
+
+ // Check that data has been stored to disk
+ long messageId = msg.getMessageId();
+ checkMessageMetaDataExists(messageId);
+
+ // Check that it is enqueued
+ List<AMQQueue> queueList = _transactionLog.getMessageReferenceMap(messageId);
+ assertNotNull(queueList);
+ assertEquals("Message should be enqueued on both queues.", 2, queueList.size());
+ assertTrue("Queue1 not contained in list.", queueList.contains(_queue1));
+ assertTrue("Queue2 not contained in list.", queueList.contains(_queue2));
+
+ // Create consumer to correctly consume message
+ AMQProtocolSession session1 = new MockProtocolSession(_transactionLog);
+ AMQChannel channel1 = new AMQChannel(session1, 1, _transactionLog);
+ session1.addChannel(channel1);
+
+ channel1.setLocalTransactional();
+
+ Subscription sub1 = SubscriptionFactoryImpl.INSTANCE.createSubscription(1, session1,
+ new AMQShortString("cTag1"), true,
+ null, false,
+ new LimitlessCreditManager());
+ _queue1.registerSubscription(sub1, true);
+
+ // Give the delivery thread time to deliver the message
+ Thread.sleep(250);
+ Thread.yield();
+
+ // Acknowledge the message
+ channel1.acknowledgeMessage(1, false);
+
+ // Check that data still exists
+ checkMessageMetaDataExists(messageId);
+
+ // Check that it is enqueued
+ queueList = _transactionLog.getMessageReferenceMap(messageId);
+ assertNotNull(queueList);
+ assertEquals("Message should be enqueued on both queues.", 2, queueList.size());
+ assertTrue("Queue1 not contained in list.", queueList.contains(_queue1));
+ assertTrue("Queue2 not contained in list.", queueList.contains(_queue2));
+
+ // Second session/channel; setLocalTransactional() is deliberately not called on it
+ AMQProtocolSession session2 = new MockProtocolSession(_transactionLog);
+ AMQChannel channel2 = new AMQChannel(session2, 1, _transactionLog);
+ session2.addChannel(channel2);
+
+ Subscription sub2 = SubscriptionFactoryImpl.INSTANCE.createSubscription(1, session2,
+ new AMQShortString("cTag2"), true,
+ null, false,
+ new LimitlessCreditManager());
+
+ //Consume from second queue
+ _queue2.registerSubscription(sub2, true);
+
+ // Give the delivery thread time to deliver the message
+ Thread.sleep(250);
+ Thread.yield();
+
+ // Acknowledge the message
+ channel2.acknowledgeMessage(1, false);
+
+ // Check that it is still enqueued for queue1
+ queueList = _transactionLog.getMessageReferenceMap(messageId);
+ assertNotNull(queueList);
+ assertEquals("Message should only be enqueued on queue1.", 1, queueList.size());
+ assertTrue("Queue1 not contained in list.", queueList.contains(_queue1));
+
+ // Roll back channel1; the queue1 reference is expected to remain
+ channel1.rollback();
+
+ queueList = _transactionLog.getMessageReferenceMap(messageId);
+ assertNotNull(queueList);
+ assertEquals("Message should only be enqueued on queue1 after rollback.", 1, queueList.size());
+ assertTrue("Queue1 not contained in list.", queueList.contains(_queue1));
+
+ checkMessageMetaDataExists(messageId);
+ }
+
+}
Added:
store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessagePersistentNoAckTest.java
===================================================================
---
store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessagePersistentNoAckTest.java
(rev 0)
+++
store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessagePersistentNoAckTest.java 2009-02-20
13:34:27 UTC (rev 3125)
@@ -0,0 +1,30 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+/**
+ * Variant of {@link MessagePersistenceTest} that runs with the inherited
+ * {@code _ack} flag set to {@code false}.
+ *
+ * NOTE(review): presumably this makes the inherited tests exercise the no-ack
+ * path; confirm against how MessagePersistenceTest reads {@code _ack}.
+ */
+public class MessagePersistentNoAckTest extends MessagePersistenceTest
+{
+ /**
+ * Performs the parent set-up and then clears {@code _ack}. The assignment
+ * follows {@code super.setUp()}, so it takes effect regardless of any value
+ * the parent set-up assigns.
+ *
+ * @throws Exception if the parent set-up fails
+ */
+ @Override
+ public void setUp() throws Exception
+ {
+ super.setUp();
+ _ack = false;
+ }
+}
Modified:
store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessageReSendTest.java
===================================================================
---
store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessageReSendTest.java 2009-02-19
19:02:07 UTC (rev 3124)
+++
store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessageReSendTest.java 2009-02-20
13:34:27 UTC (rev 3125)
@@ -30,6 +30,9 @@
import org.apache.qpid.server.store.berkeleydb.utils.BDBVMTestCase;
+/**
+ * Note: this test uses QpidTestCase, and running 'ant test' does not set up the
required properties for this.
+ */
public class MessageReSendTest extends BDBVMTestCase
{
protected static final String MESSAGE_ID_PROPERTY = "MessageIDProperty";
Modified:
store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/QueueDeleteWhilstRoutingTest.java
===================================================================
---
store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/QueueDeleteWhilstRoutingTest.java 2009-02-19
19:02:07 UTC (rev 3124)
+++
store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/QueueDeleteWhilstRoutingTest.java 2009-02-20
13:34:27 UTC (rev 3125)
@@ -37,6 +37,10 @@
import javax.naming.NamingException;
import java.io.File;
+/**
+ * Note: this test uses QpidTestCase, and running 'ant test' does not set up the
required properties for this.
+ */
+
public class QueueDeleteWhilstRoutingTest extends BDBVMTestCase
{
private static final Logger _logger =
Logger.getLogger(QueueDeleteWhilstRoutingTest.class);
Modified:
store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/utils/JNDIHelper.java
===================================================================
---
store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/utils/JNDIHelper.java 2009-02-19
19:02:07 UTC (rev 3124)
+++
store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/utils/JNDIHelper.java 2009-02-20
13:34:27 UTC (rev 3125)
@@ -37,7 +37,7 @@
public JNDIHelper(String broker)
{
- CONNECTION_NAME = "amqp://guest:guest@clientid/test?brokerlist='" +
broker + "'";
+ CONNECTION_NAME =
"amqp://guest:guest@clientid/bdbtest?brokerlist='" + broker +
"'";
setupJNDI();
}
Modified:
store/branches/java/broker-queue-refactor/java/bdbstore/src/tools/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgrade.java
===================================================================
---
store/branches/java/broker-queue-refactor/java/bdbstore/src/tools/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgrade.java 2009-02-19
19:02:07 UTC (rev 3124)
+++
store/branches/java/broker-queue-refactor/java/bdbstore/src/tools/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgrade.java 2009-02-20
13:34:27 UTC (rev 3125)
@@ -26,6 +26,7 @@
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.AMQException;
import org.apache.qpid.util.FileUtils;
import org.apache.commons.cli.PosixParser;
@@ -34,6 +35,7 @@
import org.apache.commons.cli.ParseException;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.configuration.PropertiesConfiguration;
import java.io.File;
import java.io.BufferedReader;
@@ -327,8 +329,8 @@
}
// Note: the names of the vhosts are not important; the store does not record the
name of the vhost.
- _newVirtualHost = new VirtualHost("Upgraded", new
MemoryMessageStore());
- _oldVirtualHost = new VirtualHost("Old", new MemoryMessageStore());
+ _newVirtualHost = new VirtualHost(new
VirtualHostConfiguration("Upgraded", new PropertiesConfiguration()), new
MemoryMessageStore());
+ _oldVirtualHost = new VirtualHost(new VirtualHostConfiguration("Old",
new PropertiesConfiguration()), new MemoryMessageStore());
//Create a new messageStore
_newMessageStore = new BDBMessageStore();