[rhmessaging-commits] rhmessaging commits: r2260 - in store/branches/java/broker-queue-refactor/java/bdbstore: src and 10 other directories.
rhmessaging-commits at lists.jboss.org
rhmessaging-commits at lists.jboss.org
Thu Aug 7 09:34:22 EDT 2008
Author: ritchiem
Date: 2008-08-07 09:34:21 -0400 (Thu, 07 Aug 2008)
New Revision: 2260
Added:
store/branches/java/broker-queue-refactor/java/bdbstore/src/integrationTests/
store/branches/java/broker-queue-refactor/java/bdbstore/src/integrationTests/java/
store/branches/java/broker-queue-refactor/java/bdbstore/src/integrationTests/java/org/
store/branches/java/broker-queue-refactor/java/bdbstore/src/integrationTests/java/org/apache/
store/branches/java/broker-queue-refactor/java/bdbstore/src/integrationTests/java/org/apache/qpid/
store/branches/java/broker-queue-refactor/java/bdbstore/src/integrationTests/java/org/apache/qpid/server/
store/branches/java/broker-queue-refactor/java/bdbstore/src/integrationTests/java/org/apache/qpid/server/store/
store/branches/java/broker-queue-refactor/java/bdbstore/src/integrationTests/java/org/apache/qpid/server/store/berkeleydb/
store/branches/java/broker-queue-refactor/java/bdbstore/src/integrationTests/java/org/apache/qpid/server/store/berkeleydb/MessageStoreTest.java
Modified:
store/branches/java/broker-queue-refactor/java/bdbstore/build.xml
store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BindingKey.java
store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BindingTB.java
store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/FieldTableEncoding.java
store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/QueueTB.java
store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreTest.java
Log:
RHM-4 : Store binding/queue arguments in the store. Added a test to validate this, and a further test that utilises the Broker's MessageStoreTest to perform integration tests against the BDB store.
Modified: store/branches/java/broker-queue-refactor/java/bdbstore/build.xml
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/build.xml 2008-08-06 13:39:16 UTC (rev 2259)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/build.xml 2008-08-07 13:34:21 UTC (rev 2260)
@@ -36,7 +36,7 @@
classpathref="class.path"/>
</target>
- <target name="build-tests" depends="init,build">
+ <target name="build-tests" depends="build">
<javac srcdir="${src.test.dir}"
destdir="${build.test.classes}"
classpathref="test.class.path"/>
@@ -102,7 +102,7 @@
</target>
- <target name="release" depends="build, jar"/>
+ <target name="release" depends="jar"/>
</project>
Added: store/branches/java/broker-queue-refactor/java/bdbstore/src/integrationTests/java/org/apache/qpid/server/store/berkeleydb/MessageStoreTest.java
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/src/integrationTests/java/org/apache/qpid/server/store/berkeleydb/MessageStoreTest.java (rev 0)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/src/integrationTests/java/org/apache/qpid/server/store/berkeleydb/MessageStoreTest.java 2008-08-07 13:34:21 UTC (rev 2260)
@@ -0,0 +1,38 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import org.apache.commons.configuration.PropertiesConfiguration;
+
+public class MessageStoreTest extends org.apache.qpid.server.store.MessageStoreTest
+{
+
+ public void testBDBMessageStore()
+ {
+ PropertiesConfiguration config = new PropertiesConfiguration();
+
+ config.addProperty("store.environment-path", "BDB_MST");
+ config.addProperty("store.class", "org.apache.qpid.server.store.berkeleydb.BDBMessageStore");
+
+ runTestWithStore(config);
+ }
+
+}
Modified: store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java 2008-08-06 13:39:16 UTC (rev 2259)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java 2008-08-07 13:34:21 UTC (rev 2260)
@@ -108,7 +108,6 @@
private static final String NEW_EXCHANGE_DB_NAME = "EXCHANGE";
-
private static final String QUEUEBINDINGSDB_NAME = "queueBindingsDb";
private Database _queueBindingsDb;
@@ -120,10 +119,6 @@
private static final AMQShortString EMPTY_SHORT_STRING = new AMQShortString("");
-
-
-
-
private final CommitThread _commitThread = new CommitThread("Commit-Thread");
private Map<AMQShortString, Long> _queueNameToIdMap = new ConcurrentHashMap<AMQShortString, Long>();
@@ -158,7 +153,6 @@
stateTransition(State.INITIAL, State.CONFIGURING);
_log.info("Configuring BDB message store");
- QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
File environmentPath = new File(config.getString(base + "." + ENVIRONMENT_PATH_PROPERTY, "bdbEnv"));
if (!environmentPath.exists())
@@ -166,7 +160,7 @@
if (!environmentPath.mkdirs())
{
throw new IllegalArgumentException("Environment path " + environmentPath + " could not be read or created. "
- + "Ensure the path is correct and that the permissions are correct.");
+ + "Ensure the path is correct and that the permissions are correct.");
}
}
@@ -180,7 +174,6 @@
upgradeIfNecessary();
// this recovers durable queues and persistent messages
-
recover();
stateTransition(State.RECOVERING, State.STARTED);
@@ -196,7 +189,7 @@
if (_state != requiredState)
{
throw new AMQException("Cannot transition to the state: " + newState + "; need to be in state: " + requiredState
- + "; currently in state: " + _state);
+ + "; currently in state: " + _state);
}
_state = newState;
@@ -476,15 +469,15 @@
if (queue == null)
{
_log.error("Unkown queue: " + binding.getQueueName() + " cannot be bound to exchange: "
- + exchange.getName());
+ + exchange.getName());
}
else
{
_log.info("Restoring binding: (Exchange: " + binding.getExchangeName() + ", Queue: " + binding
- .getQueueName() + ", Routing Key: " + binding.getRoutingKey() + ", Arguments: " + binding.getArguments()
- + ")");
+ .getQueueName() + ", Routing Key: " + binding.getRoutingKey() + ", Arguments: " + binding.getArguments()
+ + ")");
- queue.bind(exchange, binding.getRoutingKey(), binding.getArguments() );
+ queue.bind(exchange, binding.getRoutingKey(), binding.getArguments());
}
}
}
@@ -503,19 +496,25 @@
BindingTB binding = new BindingTB(_virtualHost);
BindingKey queueBinding =
- new BindingKey(exchange.getName(), new AMQShortString(""), new AMQShortString(""), null);
+ new BindingKey(exchange.getName(), null, null, null);
EntryBinding keyBinding = new BindingTB(_virtualHost);
keyBinding.objectToEntry(queueBinding, key);
OperationStatus opStatus = cursor.getSearchKeyRange(key, value, LockMode.RMW);
- while ((opStatus == OperationStatus.SUCCESS)
- && ((queueBinding = (BindingKey) binding.entryToObject(key)).getExchangeName().equals(
- exchange.getName())))
+ while (opStatus == OperationStatus.SUCCESS)
{
- queueBindings.add(queueBinding);
- opStatus = cursor.getNext(key, value, LockMode.RMW);
+ queueBinding = (BindingKey) binding.entryToObject(key);
+ if (queueBinding.getExchangeName().equals(exchange.getName()))
+ {
+ queueBindings.add(queueBinding);
+ opStatus = cursor.getNext(key, value, LockMode.RMW);
+ }
+ else
+ {
+ break;
+ }
}
return queueBindings;
@@ -592,7 +591,7 @@
catch (DatabaseException e)
{
throw new AMQException("Error writing binding for AMQQueue with name " + queue.getName() + " to exchange "
- + exchange.getName() + " to database: " + e, e);
+ + exchange.getName() + " to database: " + e, e);
}
}
}
@@ -608,7 +607,7 @@
* @throws AMQException If the operation fails for any reason.
*/
public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args)
- throws AMQException
+ throws AMQException
{
DatabaseEntry key = new DatabaseEntry();
EntryBinding keyBinding = new BindingTB(_virtualHost);
@@ -620,37 +619,38 @@
if (status == OperationStatus.NOTFOUND)
{
throw new AMQException("Queue binding for queue with name " + queue.getName() + " to exchange "
- + exchange.getName() + " not found");
+ + exchange.getName() + " not found");
}
}
catch (DatabaseException e)
{
throw new AMQException("Error deleting queue binding for queue with name " + queue.getName() + " to exchange "
- + exchange.getName() + " from database: " + e, e);
+ + exchange.getName() + " from database: " + e, e);
}
}
/**
* Makes the specified queue persistent.
*
- * @param queue The queue to store.
+ * @param queue The queue to store.
+ * @param arguments
*
* @throws AMQException If the operation fails for any reason.
*/
- public void createQueue(AMQQueue queue) throws AMQException
+ public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQException
{
- _log.debug("public void createQueue(AMQQueue queue = " + queue + "): called");
+ _log.debug("public void createQueue(AMQQueue queue(" + queue.getName() + ") = " + queue + "): called");
if (_state != State.RECOVERING)
{
long queueId = _queueId.getAndIncrement();
- _queueNameToIdMap.put(queue.getName(),queueId);
-
+ _queueNameToIdMap.put(queue.getName(), queueId);
+
DatabaseEntry key = new DatabaseEntry();
EntryBinding keyBinding = new AMQShortStringTB();
keyBinding.objectToEntry(queue.getName(), key);
DatabaseEntry value = new DatabaseEntry();
- TupleBinding queueBinding = new QueueTB(_virtualHost);
+ TupleBinding queueBinding = new QueueTB(_virtualHost, arguments);
queueBinding.objectToEntry(queue, value);
try
{
@@ -667,6 +667,7 @@
* Removes the specified queue from the persistent store.
*
* @param queue The queue to remove.
+ *
* @throws AMQException If the operation fails for any reason.
*/
public void removeQueue(final AMQQueue queue) throws AMQException
@@ -712,7 +713,7 @@
try
{
_queueDb.get(null, key, value, LockMode.RMW);
- QueueTB binding = new QueueTB(_virtualHost);
+ QueueTB binding = new QueueTB(_virtualHost, null);
return (AMQQueue) binding.entryToObject(value);
}
@@ -757,7 +758,7 @@
{
_log.error("Failed to enqueue: " + e, e);
throw new AMQException("Error writing enqueued message with id " + messageId + " for queue " + name
- + " to database", e);
+ + " to database", e);
}
}
@@ -789,7 +790,8 @@
* @param context The transactional context for the operation.
* @param queue The name queue to take the message from.
* @param messageId The message to dequeue.
- * @throws AMQException If the operation fails for any reason, or if the specified message does not exist.
+ *
+ * @throws AMQException If the operation fails for any reason, or if the specified message does not exist.
*/
public void dequeueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException
{
@@ -874,7 +876,7 @@
if (context.getPayload() != null)
{
throw new AMQException("Fatal internal error: transactional context is not empty at beginTran: "
- + context.getPayload());
+ + context.getPayload());
}
else
{
@@ -1076,7 +1078,7 @@
* @throws AMQException If the operation fails for any reason, or if the specified message does not exist.
*/
public void storeContentBodyChunk(StoreContext context, Long messageId, int index, ContentChunk contentBody,
- boolean lastContentBody) throws AMQException
+ boolean lastContentBody) throws AMQException
{
Transaction tx = (Transaction) context.getPayload();
@@ -1092,7 +1094,7 @@
if (status != OperationStatus.SUCCESS)
{
throw new AMQException("Error adding content chunk " + index + " for message id " + messageId + ": "
- + status);
+ + status);
}
if (_log.isDebugEnabled())
@@ -1116,12 +1118,12 @@
* @throws AMQException If the operation fails for any reason, or if the specified message does not exist.
*/
public void storeMessageMetaData(StoreContext context, Long messageId, MessageMetaData messageMetaData)
- throws AMQException
+ throws AMQException
{
if (_log.isDebugEnabled())
{
_log.debug("public void storeMessageMetaData(StoreContext context = " + context + ", Long messageId = "
- + messageId + ", MessageMetaData messageMetaData = " + messageMetaData + "): called");
+ + messageId + ", MessageMetaData messageMetaData = " + messageMetaData + "): called");
}
//This call breaking tests - not sure where the txn it creates should be committed ??
//getOrCreateTransaction(context);
@@ -1161,7 +1163,7 @@
if (_log.isDebugEnabled())
{
_log.debug("public MessageMetaData getMessageMetaData(StoreContext context = " + context + ", Long messageId = "
- + messageId + "): called");
+ + messageId + "): called");
}
DatabaseEntry key = new DatabaseEntry();
@@ -1233,6 +1235,11 @@
}
}
+ public boolean isPersistent()
+ {
+ return true;
+ }
+
Map<AMQShortString, AMQQueue> loadQueues() throws DatabaseException, AMQException
{
Cursor cursor = null;
@@ -1242,14 +1249,14 @@
cursor = _queueDb.openCursor(null, null);
DatabaseEntry key = new DatabaseEntry();
DatabaseEntry value = new DatabaseEntry();
- QueueTB binding = new QueueTB(_virtualHost);
+ QueueTB binding = new QueueTB(_virtualHost, null);
while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
{
AMQQueue queue = (AMQQueue) binding.entryToObject(value);
_virtualHost.getQueueRegistry().registerQueue(queue);
queues.put(queue.getName(), queue);
_log.info("Recovering queue " + queue.getName() + " with owner:"
- + ((queue.getOwner() == null) ? " <<null>> " : (" \"" + queue.getOwner() + "\" ")));
+ + ((queue.getOwner() == null) ? " <<null>> " : (" \"" + queue.getOwner() + "\" ")));
}
return queues;
@@ -1284,9 +1291,9 @@
}
private void deliverMessages(final StoreContext context, Map<AMQShortString, AMQQueue> queues)
- throws DatabaseException, AMQException
+ throws DatabaseException, AMQException
{
- Map<Long, AMQMessage> msgMap = new HashMap<Long,AMQMessage>();
+ Map<Long, AMQMessage> msgMap = new HashMap<Long, AMQMessage>();
List<ProcessAction> actions = new ArrayList<ProcessAction>();
Map<AMQShortString, Integer> queueRecoveries = new TreeMap<AMQShortString, Integer>();
@@ -1316,7 +1323,7 @@
AMQQueue queue = queues.get(queueName);
if (queue == null)
{
- queue = AMQQueueFactory.createAMQQueueImpl(queueName, false, null, false, _virtualHost, null);
+ queue = AMQQueueFactory.createAMQQueueImpl(queueName, false, null, false, _virtualHost, null);
_virtualHost.getQueueRegistry().registerQueue(queue);
queues.put(queueName, queue);
}
@@ -1325,19 +1332,19 @@
maxId = Math.max(maxId, messageId);
AMQMessage message = msgMap.get(messageId);
- if(message != null)
+ if (message != null)
{
message.incrementReference();
}
else
{
message = new AMQMessage(messageId, this, messageHandleFactory, txnContext);
- msgMap.put(messageId,message);
+ msgMap.put(messageId, message);
}
if (_log.isDebugEnabled())
{
- _log.debug("On recovery, delivering " + message.getMessageId() + " to " + queue.getName());
+ _log.debug("On recovery, delivering Message ID:" + message.getMessageId() + " to " + queue.getName());
}
if (_log.isInfoEnabled())
@@ -1356,7 +1363,7 @@
}
- for(ProcessAction action : actions)
+ for (ProcessAction action : actions)
{
action.process();
}
@@ -1560,15 +1567,15 @@
{
// _environment.checkpoint(_config);
_environment.sync();
-
+
for (Commit commit : jobs)
{
commit.complete();
}
- if(_jobQueue.get().isEmpty())
+ if (_jobQueue.get().isEmpty())
{
_hasJobs.set(false);
- if(!_jobQueue.get().isEmpty())
+ if (!_jobQueue.get().isEmpty())
{
_hasJobs.set(true);
}
@@ -1593,9 +1600,9 @@
public void addJob(Commit commit)
{
_jobQueue.get().add(commit);
- if(_hasJobs.compareAndSet(false, true))
+ if (_hasJobs.compareAndSet(false, true))
{
- synchronized(_lock)
+ synchronized (_lock)
{
_lock.notifyAll();
}
@@ -1605,7 +1612,7 @@
public void close()
{
_stopped.set(true);
- synchronized(_lock)
+ synchronized (_lock)
{
_lock.notifyAll();
}
Modified: store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BindingKey.java
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BindingKey.java 2008-08-06 13:39:16 UTC (rev 2259)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BindingKey.java 2008-08-07 13:34:21 UTC (rev 2260)
@@ -1,3 +1,23 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
package org.apache.qpid.server.store.berkeleydb;
import org.apache.qpid.framing.AMQShortString;
@@ -3,11 +23,4 @@
import org.apache.qpid.framing.FieldTable;
-/**
- * Created by IntelliJ IDEA.
- * User: U146758
- * Date: 19-Feb-2007
- * Time: 14:11:01
- * To change this template use File | Settings | File Templates.
- */
public class BindingKey extends Object
{
Modified: store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BindingTB.java
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BindingTB.java 2008-08-06 13:39:16 UTC (rev 2259)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BindingTB.java 2008-08-07 13:34:21 UTC (rev 2260)
@@ -14,8 +14,6 @@
{
private static final Logger _log = Logger.getLogger(BindingTB.class);
-
-
private final VirtualHost _virtualHost;
public BindingTB(VirtualHost virtualHost)
@@ -32,9 +30,7 @@
AMQShortString routingKey = AMQShortStringEncoding.readShortString(tupleInput);
FieldTable arguments = FieldTableEncoding.readFieldTable(tupleInput);
-
-
- return new BindingKey(exchangeName,queueName,routingKey,arguments);
+ return new BindingKey(exchangeName, queueName, routingKey, arguments);
}
catch (DatabaseException e)
{
@@ -47,11 +43,17 @@
{
BindingKey binding = (BindingKey) object;
+ AMQShortStringEncoding.writeShortString(binding.getExchangeName(), tupleOutput);
+ AMQShortStringEncoding.writeShortString(binding.getQueueName(), tupleOutput);
+ AMQShortStringEncoding.writeShortString(binding.getRoutingKey(), tupleOutput);
+ FieldTableEncoding.writeFieldTable(binding.getArguments(), tupleOutput);
- AMQShortStringEncoding.writeShortString(binding.getExchangeName(),tupleOutput);
- AMQShortStringEncoding.writeShortString(binding.getQueueName(),tupleOutput);
- AMQShortStringEncoding.writeShortString(binding.getRoutingKey(),tupleOutput);
- FieldTableEncoding.writeFieldTable(binding.getArguments(),tupleOutput);
+ binding = (BindingKey) entryToObject(new TupleInput(tupleOutput.getBufferBytes()));
+ System.err.println(binding.getExchangeName());
+ System.err.println(binding.getQueueName());
+ System.err.println(binding.getRoutingKey());
+ System.err.println(binding.getArguments());
+
}
}
Modified: store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/FieldTableEncoding.java
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/FieldTableEncoding.java 2008-08-06 13:39:16 UTC (rev 2259)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/FieldTableEncoding.java 2008-08-07 13:34:21 UTC (rev 2260)
@@ -14,22 +14,21 @@
{
public static FieldTable readFieldTable(TupleInput tupleInput) throws DatabaseException
{
- int length = tupleInput.readInt();
- if (length == 0)
+ long length = tupleInput.readLong();
+ if (length <= 0)
{
return null;
}
else
{
- byte[] data = new byte[length];
+ byte[] data = new byte[(int)length];
tupleInput.readFast(data);
ByteBuffer buffer = ByteBuffer.wrap(data);
try
{
- FieldTable ft = new FieldTable(buffer,(long)length);
- return ft;
+ return new FieldTable(buffer,length);
}
catch (AMQFrameDecodingException e)
{
@@ -45,11 +44,11 @@
if (fieldTable == null)
{
- tupleOutput.writeInt(0);
+ tupleOutput.writeLong(0);
}
else
{
- tupleOutput.writeFast((int)fieldTable.getEncodedSize());
+ tupleOutput.writeLong(fieldTable.getEncodedSize());
tupleOutput.writeFast(fieldTable.getDataAsBytes());
}
}
Modified: store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/QueueTB.java
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/QueueTB.java 2008-08-06 13:39:16 UTC (rev 2259)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/QueueTB.java 2008-08-07 13:34:21 UTC (rev 2260)
@@ -20,39 +20,49 @@
import com.sleepycat.bind.tuple.TupleBinding;
import com.sleepycat.bind.tuple.TupleInput;
import com.sleepycat.bind.tuple.TupleOutput;
+import com.sleepycat.je.DatabaseException;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
import org.apache.log4j.Logger;
public class QueueTB extends TupleBinding
{
private static final Logger _log = Logger.getLogger(QueueTB.class);
-
-
private final VirtualHost _virtualHost;
+ private final FieldTable _arguments;
- public QueueTB(VirtualHost virtualHost)
+ public QueueTB(VirtualHost virtualHost, FieldTable arguments)
{
_virtualHost = virtualHost;
+ _arguments = arguments;
}
public Object entryToObject(TupleInput tupleInput)
{
- AMQShortString name = AMQShortStringEncoding.readShortString(tupleInput);
- AMQShortString owner = AMQShortStringEncoding.readShortString(tupleInput);
-
-
try
{
- return AMQQueueFactory.createAMQQueueImpl(name, true, owner, false, _virtualHost, null);
+ AMQShortString name = AMQShortStringEncoding.readShortString(tupleInput);
+ AMQShortString owner = AMQShortStringEncoding.readShortString(tupleInput);
+ FieldTable arguments = FieldTableEncoding.readFieldTable(tupleInput);
+
+ try
+ {
+ return AMQQueueFactory.createAMQQueueImpl(name, true, owner, false, _virtualHost, arguments);
+ }
+ catch (AMQException e)
+ {
+ _log.error("Unable to create queue: " + e, e);
+ return null;
+ }
}
- catch (AMQException e)
+ catch (DatabaseException e)
{
- _log.error("Unable to create queue: " + e, e);
+ _log.error("Unable to create binding: " + e, e);
return null;
}
}
@@ -61,9 +71,9 @@
{
AMQQueue queue = (AMQQueue) object;
+ AMQShortStringEncoding.writeShortString(queue.getName(), tupleOutput);
+ AMQShortStringEncoding.writeShortString(queue.getOwner(), tupleOutput);
+ FieldTableEncoding.writeFieldTable(_arguments, tupleOutput);
- AMQShortStringEncoding.writeShortString(queue.getName(),tupleOutput);
- AMQShortStringEncoding.writeShortString(queue.getOwner(),tupleOutput);
-
}
}
Modified: store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreTest.java
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreTest.java 2008-08-06 13:39:16 UTC (rev 2259)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreTest.java 2008-08-07 13:34:21 UTC (rev 2260)
@@ -24,15 +24,20 @@
import org.apache.log4j.Logger;
import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
+import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.framing.*;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.server.RequiredDeliveryException;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.exchange.DefaultExchangeFactory;
+import org.apache.qpid.server.exchange.DirectExchange;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.MessageMetaData;
import org.apache.qpid.server.queue.AMQQueueFactory;
+import org.apache.qpid.server.queue.AMQPriorityQueue;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.txn.NonTransactionalContext;
import org.apache.qpid.server.txn.TransactionalContext;
@@ -59,25 +64,27 @@
private static final AMQShortString RK = new AMQShortString("rk");
private static final AMQShortString QUEUE2 = new AMQShortString("queue2");
private static final AMQShortString HIM = new AMQShortString("him");
+ private static final AMQShortString EXCHANGE1 = new AMQShortString("exchange1");
+ private static volatile int _loops;
+ private String TEST_LOCATION = "bdbTestEnv";
+ File BDB_DIR = new File(TEST_LOCATION);
+
+
public void setUp() throws Exception
{
+ if (BDB_DIR.exists())
+ {
+ deleteDirectory(BDB_DIR);
+ }
+
ApplicationRegistry.initialise(new NullApplicationRegistry());
- File bdbDir = new File("bdbTestEnv");
- if (bdbDir.exists())
- {
- File[] entries = bdbDir.listFiles();
- for (File f : entries)
- {
- f.delete();
- }
- bdbDir.delete();
- }
- bdbDir.mkdirs();
+ BDB_DIR.mkdirs();
+
_store = new BDBMessageStore();
- _store.createEnvironment(bdbDir);
+ _store.createEnvironment(BDB_DIR);
_store.openDatabases();
_virtualHost = new VirtualHost("test", _store);
_store.setVirtualHost(_virtualHost);
@@ -86,22 +93,102 @@
_txnContext = new NonTransactionalContext(_store, _storeContext, null, new LinkedList<RequiredDeliveryException>());
}
+ private void deleteDirectory(File path) throws InterruptedException
+ {
+ if (path.isDirectory())
+ {
+ for (File file : path.listFiles())
+ {
+ deleteDirectory(file);
+ }
+ }
+ else
+ {
+ path.delete();
+ }
+ }
+
+ private void reload() throws Exception
+ {
+ _virtualHost.close();
+
+ PropertiesConfiguration env = new PropertiesConfiguration();
+
+ env.addProperty("store.environment-path", "bdbTestEnv");
+ env.addProperty("store.class", "org.apache.qpid.server.store.berkeleydb.BDBMessageStore");
+
+ _virtualHost = new VirtualHost("test", env);
+ _store = (BDBMessageStore)_virtualHost.getMessageStore();
+ }
+
public void tearDown() throws Exception
{
- _store.close();
+ _virtualHost.close();
+
+ ApplicationRegistry.removeAll();
}
- public void testQueuePersistence() throws DatabaseException, AMQException
+ public void testExchangePersistence() throws Exception
{
- AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(QUEUE1, true, ME, false, _virtualHost, null);
- _store.createQueue(queue);
- AMQQueue returnedQueue = _store.getQueue(QUEUE1);
+ FieldTable queueArguments = new FieldTable();
+ Integer priorityLevel = 5;
+ queueArguments.put(AMQQueueFactory.X_QPID_PRIORITIES, priorityLevel);
- Assert.assertEquals(returnedQueue.getName(), QUEUE1);
- Assert.assertEquals(returnedQueue.getOwner(), ME);
- Assert.assertEquals(returnedQueue.isDurable(), true);
+ Exchange exchange = new DefaultExchangeFactory(_virtualHost).createExchange(EXCHANGE1, DirectExchange.TYPE.getName(), true, false, 0);
+
+ assertNotNull("Exchange is null", exchange);
+ assertEquals("Exchange Name incorrect", EXCHANGE1, exchange.getName());
+ assertTrue("Exchange is not durable", exchange.isDurable());
+
+ _virtualHost.getExchangeRegistry().registerExchange(exchange);
+
+ //Ensure it is registered correctly
+ exchange = _virtualHost.getExchangeRegistry().getExchange(EXCHANGE1);
+ assertNotNull("Exchange is null", exchange);
+
+ reload();
+
+ exchange = _virtualHost.getExchangeRegistry().getExchange(EXCHANGE1);
+
+ assertNotNull("Exchange is null", exchange);
+ assertEquals("Exchange Name incorrect", EXCHANGE1, exchange.getName());
+ assertTrue("Exchange is not durable", exchange.isDurable());
+
}
+ public void testQueuePersistence() throws Exception
+ {
+
+ FieldTable queueArguments = new FieldTable();
+ Integer priorityLevel = 5;
+ queueArguments.put(AMQQueueFactory.X_QPID_PRIORITIES, priorityLevel);
+
+ AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(QUEUE1, true, ME, false, _virtualHost, queueArguments);
+
+ _store.createQueue(queue, queueArguments);
+
+ AMQShortString routingKey = new AMQShortString("Test-Key");
+ FieldTable bindArguments = new FieldTable();
+ bindArguments.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), "Test = 'MST'");
+
+ _store.bindQueue(_virtualHost.getExchangeRegistry().getDefaultExchange(), routingKey, queue, bindArguments);
+
+ reload();
+
+ AMQQueue returnedQueue = _virtualHost.getQueueRegistry().getQueue(QUEUE1);
+
+ assertEquals("Queue Name has changed", QUEUE1, returnedQueue.getName());
+ assertEquals("Queue Owner has changed", ME, returnedQueue.getOwner());
+ assertTrue("Returned Queue is not Durable", returnedQueue.isDurable());
+ assertEquals("Returned Queue is not A Priority Queue", AMQPriorityQueue.class, returnedQueue.getClass());
+ assertEquals("Returned Queue does not have the right number of priorities", priorityLevel.intValue(),
+ ((AMQPriorityQueue) returnedQueue).getPriorities());
+ assertNotNull("Queue has no exchange binding arguments.", returnedQueue.getExchangeBindings());
+ assertEquals("Incorrect binding count for queue.", 1, returnedQueue.getExchangeBindings().size());
+ assertTrue("Binding does not contain a Selector argument.",
+ returnedQueue.getExchangeBindings().get(0).getArguments().containsKey(AMQPFilterTypes.JMS_SELECTOR.getValue()));
+ }
+
private MessagePublishInfo createPublishBody()
{
@@ -191,25 +278,25 @@
MessageMetaData mmd = _store.getMessageMetaData(_storeContext, 14L);
MessagePublishInfo returnedPubBody = mmd.getMessagePublishInfo();
- Assert.assertEquals(pubBody.getExchange(), returnedPubBody.getExchange());
- Assert.assertEquals(pubBody.isImmediate(), returnedPubBody.isImmediate());
- Assert.assertEquals(pubBody.isMandatory(), returnedPubBody.isMandatory());
- Assert.assertEquals(pubBody.getRoutingKey(), returnedPubBody.getRoutingKey());
+ Assert.assertEquals("Message exchange has changed", pubBody.getExchange(), returnedPubBody.getExchange());
+ Assert.assertEquals("Immediate flag has changed", pubBody.isImmediate(), returnedPubBody.isImmediate());
+ Assert.assertEquals("Mandatory flag has changed", pubBody.isMandatory(), returnedPubBody.isMandatory());
+ Assert.assertEquals("Routing key has changed", pubBody.getRoutingKey(), returnedPubBody.getRoutingKey());
ContentHeaderBody returnedHeaderBody = mmd.getContentHeaderBody();
- Assert.assertEquals(chb.classId, returnedHeaderBody.classId);
- Assert.assertEquals(chb.weight, returnedHeaderBody.weight);
- Assert.assertEquals(chb.bodySize, returnedHeaderBody.bodySize);
+ Assert.assertEquals("ContentHeader ClassID has changed", chb.classId, returnedHeaderBody.classId);
+ Assert.assertEquals("ContentHeader weight has changed", chb.weight, returnedHeaderBody.weight);
+ Assert.assertEquals("ContentHeader bodySize has changed", chb.bodySize, returnedHeaderBody.bodySize);
BasicContentHeaderProperties returnedProperties = (BasicContentHeaderProperties) returnedHeaderBody.properties;
- Assert.assertEquals(props.getContentTypeAsString(), returnedProperties.getContentTypeAsString());
- Assert.assertEquals(props.getMessageIdAsString(), returnedProperties.getMessageIdAsString());
- Assert.assertEquals(mmd.getContentChunkCount(), 1);
+ Assert.assertEquals("Property ContentType has changed", props.getContentTypeAsString(), returnedProperties.getContentTypeAsString());
+ Assert.assertEquals("Property MessageID has changed", props.getMessageIdAsString(), returnedProperties.getMessageIdAsString());
+ Assert.assertEquals("MessageMD ChunkCount has changed", mmd.getContentChunkCount(), 1);
ContentChunk returnedContentBody = _store.getContentBodyChunk(_storeContext, 14L, 0);
ByteBuffer returnedPayloadAsBytes = returnedContentBody.getData();
byte[] returnedPayload = new byte[returnedPayloadAsBytes.remaining()];
returnedPayloadAsBytes.get(returnedPayload);
String returnedPayloadString = new String(returnedPayload);
- Assert.assertEquals(bodyText, returnedPayloadString);
+ Assert.assertEquals("Message Payload has changed", bodyText, returnedPayloadString);
}
public void testMessageCreateAndDelete() throws Exception
@@ -245,6 +332,7 @@
{
// pass since exception expected
}
+
}
public void testTranCommit() throws Exception
@@ -259,9 +347,8 @@
_store.storeMessageMetaData(_storeContext, 21L, new MessageMetaData(pubBody, chb, 0));
_store.storeMessageMetaData(_storeContext, 22L, new MessageMetaData(pubBody, chb, 0));
-
AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(QUEUE1, true, ME, false, _virtualHost, null);
- _store.createQueue(queue);
+ _store.createQueue(queue, null);
_store.beginTran(_storeContext);
_store.enqueueMessage(_storeContext, queue, 20L);
@@ -269,15 +356,19 @@
_store.commitTran(_storeContext);
List<Long> enqueuedIds = _store.getEnqueuedMessages(QUEUE1);
- Assert.assertEquals(enqueuedIds.size(), 2);
+ Assert.assertEquals("Enqueued messages have changed", 2, enqueuedIds.size());
Long val = enqueuedIds.get(0);
- Assert.assertEquals(val.longValue(), 20L);
+ Assert.assertEquals("First Message is incorrect", 20L, val.longValue());
val = enqueuedIds.get(1);
- Assert.assertEquals(val.longValue(), 21L);
+ Assert.assertEquals("Second Message is incorrect", 21L, val.longValue());
+
}
public void testTranRollback1() throws Exception
{
+ List<Long> enqueuedIds = _store.getEnqueuedMessages(QUEUE1);
+ assertTrue("Last Test Messages are still present", enqueuedIds.isEmpty());
+
MessagePublishInfo pubBody = createPublishBody();
BasicContentHeaderProperties props = createContentHeaderProperties();
String bodyText = "dsjfhdsjkfhdsjflhsdjfdshfjdlhfjhdljfdhsljkfsh";
@@ -290,9 +381,8 @@
_store.storeMessageMetaData(_storeContext, 32L, new MessageMetaData(pubBody, chb, 0));
AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(QUEUE1, true, ME, false, _virtualHost, null);
- _store.createQueue(queue);
+ _store.createQueue(queue, null);
-
_store.beginTran(_storeContext);
_store.enqueueMessage(_storeContext, queue, 30L);
_store.enqueueMessage(_storeContext, queue, 31L);
@@ -305,16 +395,22 @@
_store.beginTran(_storeContext);
_store.commitTran(_storeContext);
- List<Long> enqueuedIds = _store.getEnqueuedMessages(QUEUE1);
- Assert.assertEquals(enqueuedIds.size(), 2);
+ enqueuedIds = _store.getEnqueuedMessages(QUEUE1);
+ assertTrue("Last Test Message is still present", !enqueuedIds.contains(20L));
+ assertEquals("Incorrect Enqueued Message Count:", 2, enqueuedIds.size());
Long val = enqueuedIds.get(0);
- Assert.assertEquals(val.longValue(), 30L);
+ assertEquals("First Message is incorrect", 30L, val.longValue());
val = enqueuedIds.get(1);
- Assert.assertEquals(val.longValue(), 31L);
+ assertEquals("Second Message is incorrect", 31L, val.longValue());
+
}
+
public void testTranRollback2() throws Exception
{
+ List<Long> enqueuedIds = _store.getEnqueuedMessages(QUEUE1);
+ assertTrue("Last Test Messages are still present", enqueuedIds.isEmpty());
+
MessagePublishInfo pubBody = createPublishBody();
BasicContentHeaderProperties props = createContentHeaderProperties();
String bodyText = "dsjfhdsjkfhdsjflhsdjfdshfjdlhfjhdljfdhsljkfsh";
@@ -326,9 +422,8 @@
_store.storeMessageMetaData(_storeContext, 31L, new MessageMetaData(pubBody, chb, 0));
_store.storeMessageMetaData(_storeContext, 32L, new MessageMetaData(pubBody, chb, 0));
-
AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(QUEUE1, true, ME, false, _virtualHost, null);
- _store.createQueue(queue);
+ _store.createQueue(queue, null);
_store.beginTran(_storeContext);
_store.enqueueMessage(_storeContext, queue, 30L);
@@ -339,16 +434,19 @@
_store.enqueueMessage(_storeContext, queue, 32L);
_store.commitTran(_storeContext);
- List<Long> enqueuedIds = _store.getEnqueuedMessages(QUEUE1);
- Assert.assertEquals(enqueuedIds.size(), 2);
+ enqueuedIds = _store.getEnqueuedMessages(QUEUE1);
+ Assert.assertEquals("Incorrect Enqueued Message Count", 2, enqueuedIds.size());
Long val = enqueuedIds.get(0);
- Assert.assertEquals(val.longValue(), 31L);
+ Assert.assertEquals("First Message is incorrect", 31L, val.longValue());
val = enqueuedIds.get(1);
- Assert.assertEquals(val.longValue(), 32L);
+ Assert.assertEquals("Second Message is incorrect", 32L, val.longValue());
}
public void testRecovery() throws Exception
{
+ List<Long> enqueuedIds = _store.getEnqueuedMessages(QUEUE1);
+ assertTrue("Last Test Messages are still present", enqueuedIds.isEmpty());
+
MessagePublishInfo pubBody = createPublishBody();
BasicContentHeaderProperties props = createContentHeaderProperties();
String bodyText = "dsjfhdsjkfhdsjflhsdjfdshfjdlhfjhdljfdhsljkfsh";
@@ -360,12 +458,11 @@
_store.storeMessageMetaData(_storeContext, 41L, new MessageMetaData(pubBody, chb, 0));
_store.storeMessageMetaData(_storeContext, 42L, new MessageMetaData(pubBody, chb, 0));
-
AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(QUEUE1, true, ME, false, _virtualHost, null);
AMQQueue queue2 = AMQQueueFactory.createAMQQueueImpl(QUEUE2, true, HIM, false, _virtualHost, null);
- _store.createQueue(queue);
- _store.createQueue(queue2);
+ _store.createQueue(queue, null);
+ _store.createQueue(queue2, null);
_store.beginTran(_storeContext);
_store.enqueueMessage(_storeContext, queue, 40L);
@@ -375,29 +472,17 @@
_store.enqueueMessage(_storeContext, queue, 42L);
- _virtualHost.getQueueRegistry().unregisterQueue(queue.getName());
- _virtualHost.getQueueRegistry().unregisterQueue(queue2.getName());
+ reload();
- _store.close();
-
- _store = new BDBMessageStore();
-
- PropertiesConfiguration env = new PropertiesConfiguration();
-
- env.addProperty("store.environment-path", "bdbTestEnv");
- env.addProperty("store.class", "org.apache.qpid.server.store.berkeleydb.BDBMessageStore");
-
- _virtualHost = new VirtualHost("test", env);
-
try
{
AMQQueue q1 = _virtualHost.getQueueRegistry().getQueue(QUEUE1);
AMQQueue q2 = _virtualHost.getQueueRegistry().getQueue(QUEUE2);
- Assert.assertNotNull(q1);
- Assert.assertEquals(3, q1.getMessageCount());
- Assert.assertNotNull(q2);
- Assert.assertEquals(1, q2.getMessageCount());
+ Assert.assertNotNull("Queue1 is was not recovered", q1);
+ Assert.assertEquals("Queue1 has incorrect message count", 3, q1.getMessageCount());
+ Assert.assertNotNull("Queue2 is was not recovered", q2);
+ Assert.assertEquals("Queue2 has incorrect message count", 1, q2.getMessageCount());
}
catch (Exception e)
{
@@ -405,7 +490,6 @@
fail(e.getMessage());
}
-
}
public void testDequeue() throws AMQException
@@ -420,7 +504,7 @@
_store.storeMessageMetaData(_storeContext, 50L, new MessageMetaData(pubBody, chb, 0));
AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(QUEUE1, true, ME, false, _virtualHost, null);
- _store.createQueue(queue);
+ _store.createQueue(queue, null);
_store.enqueueMessage(_storeContext, queue, 50L);
_store.dequeueMessage(_storeContext, queue, 50L);
@@ -429,7 +513,7 @@
public void testQueueRemove() throws AMQException
{
AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(QUEUE1, true, ME, false, _virtualHost, null);
- _store.createQueue(queue);
+ _store.createQueue(queue, null);
_store.removeQueue(queue);
try
{
More information about the rhmessaging-commits
mailing list