[rhmessaging-commits] rhmessaging commits: r2260 - in store/branches/java/broker-queue-refactor/java/bdbstore: src and 10 other directories.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Thu Aug 7 09:34:22 EDT 2008


Author: ritchiem
Date: 2008-08-07 09:34:21 -0400 (Thu, 07 Aug 2008)
New Revision: 2260

Added:
   store/branches/java/broker-queue-refactor/java/bdbstore/src/integrationTests/
   store/branches/java/broker-queue-refactor/java/bdbstore/src/integrationTests/java/
   store/branches/java/broker-queue-refactor/java/bdbstore/src/integrationTests/java/org/
   store/branches/java/broker-queue-refactor/java/bdbstore/src/integrationTests/java/org/apache/
   store/branches/java/broker-queue-refactor/java/bdbstore/src/integrationTests/java/org/apache/qpid/
   store/branches/java/broker-queue-refactor/java/bdbstore/src/integrationTests/java/org/apache/qpid/server/
   store/branches/java/broker-queue-refactor/java/bdbstore/src/integrationTests/java/org/apache/qpid/server/store/
   store/branches/java/broker-queue-refactor/java/bdbstore/src/integrationTests/java/org/apache/qpid/server/store/berkeleydb/
   store/branches/java/broker-queue-refactor/java/bdbstore/src/integrationTests/java/org/apache/qpid/server/store/berkeleydb/MessageStoreTest.java
Modified:
   store/branches/java/broker-queue-refactor/java/bdbstore/build.xml
   store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
   store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BindingKey.java
   store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BindingTB.java
   store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/FieldTableEncoding.java
   store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/QueueTB.java
   store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreTest.java
Log:
RHM-4 : Store binding/queue arguments in the store. Additional test to validate, also test that utilises the Broker Test MessageStoreTest to perform integration tests

Modified: store/branches/java/broker-queue-refactor/java/bdbstore/build.xml
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/build.xml	2008-08-06 13:39:16 UTC (rev 2259)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/build.xml	2008-08-07 13:34:21 UTC (rev 2260)
@@ -36,7 +36,7 @@
                classpathref="class.path"/>
     </target>
 
-    <target name="build-tests" depends="init,build">
+    <target name="build-tests" depends="build">
          <javac srcdir="${src.test.dir}"
                destdir="${build.test.classes}"
                classpathref="test.class.path"/>
@@ -102,7 +102,7 @@
 
   </target>
 
-    <target name="release" depends="build, jar"/>
+    <target name="release" depends="jar"/>
 
 
 </project>

Added: store/branches/java/broker-queue-refactor/java/bdbstore/src/integrationTests/java/org/apache/qpid/server/store/berkeleydb/MessageStoreTest.java
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/src/integrationTests/java/org/apache/qpid/server/store/berkeleydb/MessageStoreTest.java	                        (rev 0)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/src/integrationTests/java/org/apache/qpid/server/store/berkeleydb/MessageStoreTest.java	2008-08-07 13:34:21 UTC (rev 2260)
@@ -0,0 +1,38 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import org.apache.commons.configuration.PropertiesConfiguration;
+
+public class MessageStoreTest extends org.apache.qpid.server.store.MessageStoreTest
+{
+
+    public void testBDBMessageStore()
+    {
+        PropertiesConfiguration config = new PropertiesConfiguration();
+
+        config.addProperty("store.environment-path", "BDB_MST");
+        config.addProperty("store.class", "org.apache.qpid.server.store.berkeleydb.BDBMessageStore");
+
+        runTestWithStore(config);
+    }
+
+}

Modified: store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java	2008-08-06 13:39:16 UTC (rev 2259)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java	2008-08-07 13:34:21 UTC (rev 2260)
@@ -108,7 +108,6 @@
 
     private static final String NEW_EXCHANGE_DB_NAME = "EXCHANGE";
 
-
     private static final String QUEUEBINDINGSDB_NAME = "queueBindingsDb";
     private Database _queueBindingsDb;
 
@@ -120,10 +119,6 @@
 
     private static final AMQShortString EMPTY_SHORT_STRING = new AMQShortString("");
 
-
-
-
-
     private final CommitThread _commitThread = new CommitThread("Commit-Thread");
 
     private Map<AMQShortString, Long> _queueNameToIdMap = new ConcurrentHashMap<AMQShortString, Long>();
@@ -158,7 +153,6 @@
         stateTransition(State.INITIAL, State.CONFIGURING);
 
         _log.info("Configuring BDB message store");
-        QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
 
         File environmentPath = new File(config.getString(base + "." + ENVIRONMENT_PATH_PROPERTY, "bdbEnv"));
         if (!environmentPath.exists())
@@ -166,7 +160,7 @@
             if (!environmentPath.mkdirs())
             {
                 throw new IllegalArgumentException("Environment path " + environmentPath + " could not be read or created. "
-                    + "Ensure the path is correct and that the permissions are correct.");
+                                                   + "Ensure the path is correct and that the permissions are correct.");
             }
         }
 
@@ -180,7 +174,6 @@
         upgradeIfNecessary();
 
         // this recovers durable queues and persistent messages
-
         recover();
 
         stateTransition(State.RECOVERING, State.STARTED);
@@ -196,7 +189,7 @@
         if (_state != requiredState)
         {
             throw new AMQException("Cannot transition to the state: " + newState + "; need to be in state: " + requiredState
-                + "; currently in state: " + _state);
+                                   + "; currently in state: " + _state);
         }
 
         _state = newState;
@@ -476,15 +469,15 @@
             if (queue == null)
             {
                 _log.error("Unkown queue: " + binding.getQueueName() + " cannot be bound to exchange: "
-                    + exchange.getName());
+                           + exchange.getName());
             }
             else
             {
                 _log.info("Restoring binding: (Exchange: " + binding.getExchangeName() + ", Queue: " + binding
-                    .getQueueName() + ", Routing Key: " + binding.getRoutingKey() + ", Arguments: " + binding.getArguments()
-                    + ")");
+                        .getQueueName() + ", Routing Key: " + binding.getRoutingKey() + ", Arguments: " + binding.getArguments()
+                                        + ")");
 
-                queue.bind(exchange, binding.getRoutingKey(), binding.getArguments() );
+                queue.bind(exchange, binding.getRoutingKey(), binding.getArguments());
             }
         }
     }
@@ -503,19 +496,25 @@
             BindingTB binding = new BindingTB(_virtualHost);
 
             BindingKey queueBinding =
-                new BindingKey(exchange.getName(), new AMQShortString(""), new AMQShortString(""), null);
+                    new BindingKey(exchange.getName(), null, null, null);
 
             EntryBinding keyBinding = new BindingTB(_virtualHost);
             keyBinding.objectToEntry(queueBinding, key);
 
             OperationStatus opStatus = cursor.getSearchKeyRange(key, value, LockMode.RMW);
 
-            while ((opStatus == OperationStatus.SUCCESS)
-                    && ((queueBinding = (BindingKey) binding.entryToObject(key)).getExchangeName().equals(
-                            exchange.getName())))
+            while (opStatus == OperationStatus.SUCCESS)
             {
-                queueBindings.add(queueBinding);
-                opStatus = cursor.getNext(key, value, LockMode.RMW);
+                queueBinding = (BindingKey) binding.entryToObject(key);
+                if (queueBinding.getExchangeName().equals(exchange.getName()))
+                {
+                    queueBindings.add(queueBinding);
+                    opStatus = cursor.getNext(key, value, LockMode.RMW);
+                }
+                else
+                {
+                    break;
+                }
             }
 
             return queueBindings;
@@ -592,7 +591,7 @@
             catch (DatabaseException e)
             {
                 throw new AMQException("Error writing binding for AMQQueue with name " + queue.getName() + " to exchange "
-                    + exchange.getName() + " to database: " + e, e);
+                                       + exchange.getName() + " to database: " + e, e);
             }
         }
     }
@@ -608,7 +607,7 @@
      * @throws AMQException If the operation fails for any reason.
      */
     public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args)
-        throws AMQException
+            throws AMQException
     {
         DatabaseEntry key = new DatabaseEntry();
         EntryBinding keyBinding = new BindingTB(_virtualHost);
@@ -620,37 +619,38 @@
             if (status == OperationStatus.NOTFOUND)
             {
                 throw new AMQException("Queue binding for queue with name " + queue.getName() + " to exchange "
-                    + exchange.getName() + "  not found");
+                                       + exchange.getName() + "  not found");
             }
         }
         catch (DatabaseException e)
         {
             throw new AMQException("Error deleting queue binding for queue with name " + queue.getName() + " to exchange "
-                + exchange.getName() + " from database: " + e, e);
+                                   + exchange.getName() + " from database: " + e, e);
         }
     }
 
     /**
      * Makes the specified queue persistent.
      *
-     * @param queue The queue to store.
+     * @param queue     The queue to store.
+     * @param arguments
      *
      * @throws AMQException If the operation fails for any reason.
      */
-    public void createQueue(AMQQueue queue) throws AMQException
+    public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQException
     {
-        _log.debug("public void createQueue(AMQQueue queue = " + queue + "): called");
+        _log.debug("public void createQueue(AMQQueue queue(" + queue.getName() + ") = " + queue + "): called");
 
         if (_state != State.RECOVERING)
         {
             long queueId = _queueId.getAndIncrement();
-            _queueNameToIdMap.put(queue.getName(),queueId);
-            
+            _queueNameToIdMap.put(queue.getName(), queueId);
+
             DatabaseEntry key = new DatabaseEntry();
             EntryBinding keyBinding = new AMQShortStringTB();
             keyBinding.objectToEntry(queue.getName(), key);
             DatabaseEntry value = new DatabaseEntry();
-            TupleBinding queueBinding = new QueueTB(_virtualHost);
+            TupleBinding queueBinding = new QueueTB(_virtualHost, arguments);
             queueBinding.objectToEntry(queue, value);
             try
             {
@@ -667,6 +667,7 @@
      * Removes the specified queue from the persistent store.
      *
      * @param queue The queue to remove.
+     *
      * @throws AMQException If the operation fails for any reason.
      */
     public void removeQueue(final AMQQueue queue) throws AMQException
@@ -712,7 +713,7 @@
         try
         {
             _queueDb.get(null, key, value, LockMode.RMW);
-            QueueTB binding = new QueueTB(_virtualHost);
+            QueueTB binding = new QueueTB(_virtualHost, null);
 
             return (AMQQueue) binding.entryToObject(value);
         }
@@ -757,7 +758,7 @@
         {
             _log.error("Failed to enqueue: " + e, e);
             throw new AMQException("Error writing enqueued message with id " + messageId + " for queue " + name
-                + " to database", e);
+                                   + " to database", e);
         }
     }
 
@@ -789,7 +790,8 @@
      * @param context   The transactional context for the operation.
      * @param queue     The name queue to take the message from.
      * @param messageId The message to dequeue.
- * @throws AMQException If the operation fails for any reason, or if the specified message does not exist.
+     *
+     * @throws AMQException If the operation fails for any reason, or if the specified message does not exist.
      */
     public void dequeueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException
     {
@@ -874,7 +876,7 @@
         if (context.getPayload() != null)
         {
             throw new AMQException("Fatal internal error: transactional context is not empty at beginTran: "
-                + context.getPayload());
+                                   + context.getPayload());
         }
         else
         {
@@ -1076,7 +1078,7 @@
      * @throws AMQException If the operation fails for any reason, or if the specified message does not exist.
      */
     public void storeContentBodyChunk(StoreContext context, Long messageId, int index, ContentChunk contentBody,
-        boolean lastContentBody) throws AMQException
+                                      boolean lastContentBody) throws AMQException
     {
 
         Transaction tx = (Transaction) context.getPayload();
@@ -1092,7 +1094,7 @@
             if (status != OperationStatus.SUCCESS)
             {
                 throw new AMQException("Error adding content chunk " + index + " for message id " + messageId + ": "
-                    + status);
+                                       + status);
             }
 
             if (_log.isDebugEnabled())
@@ -1116,12 +1118,12 @@
      * @throws AMQException If the operation fails for any reason, or if the specified message does not exist.
      */
     public void storeMessageMetaData(StoreContext context, Long messageId, MessageMetaData messageMetaData)
-        throws AMQException
+            throws AMQException
     {
         if (_log.isDebugEnabled())
         {
             _log.debug("public void storeMessageMetaData(StoreContext context = " + context + ", Long messageId = "
-                + messageId + ", MessageMetaData messageMetaData = " + messageMetaData + "): called");
+                       + messageId + ", MessageMetaData messageMetaData = " + messageMetaData + "): called");
         }
         //This call breaking tests - not sure where the txn it creates should be committed ??
         //getOrCreateTransaction(context);
@@ -1161,7 +1163,7 @@
         if (_log.isDebugEnabled())
         {
             _log.debug("public MessageMetaData getMessageMetaData(StoreContext context = " + context + ", Long messageId = "
-                + messageId + "): called");
+                       + messageId + "): called");
         }
 
         DatabaseEntry key = new DatabaseEntry();
@@ -1233,6 +1235,11 @@
         }
     }
 
+    public boolean isPersistent()
+    {
+        return true;
+    }
+
     Map<AMQShortString, AMQQueue> loadQueues() throws DatabaseException, AMQException
     {
         Cursor cursor = null;
@@ -1242,14 +1249,14 @@
             cursor = _queueDb.openCursor(null, null);
             DatabaseEntry key = new DatabaseEntry();
             DatabaseEntry value = new DatabaseEntry();
-            QueueTB binding = new QueueTB(_virtualHost);
+            QueueTB binding = new QueueTB(_virtualHost, null);
             while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
             {
                 AMQQueue queue = (AMQQueue) binding.entryToObject(value);
                 _virtualHost.getQueueRegistry().registerQueue(queue);
                 queues.put(queue.getName(), queue);
                 _log.info("Recovering queue " + queue.getName() + " with owner:"
-                    + ((queue.getOwner() == null) ? " <<null>> " : (" \"" + queue.getOwner() + "\" ")));
+                          + ((queue.getOwner() == null) ? " <<null>> " : (" \"" + queue.getOwner() + "\" ")));
             }
 
             return queues;
@@ -1284,9 +1291,9 @@
     }
 
     private void deliverMessages(final StoreContext context, Map<AMQShortString, AMQQueue> queues)
-        throws DatabaseException, AMQException
+            throws DatabaseException, AMQException
     {
-        Map<Long, AMQMessage> msgMap = new HashMap<Long,AMQMessage>();
+        Map<Long, AMQMessage> msgMap = new HashMap<Long, AMQMessage>();
         List<ProcessAction> actions = new ArrayList<ProcessAction>();
 
         Map<AMQShortString, Integer> queueRecoveries = new TreeMap<AMQShortString, Integer>();
@@ -1316,7 +1323,7 @@
                 AMQQueue queue = queues.get(queueName);
                 if (queue == null)
                 {
-                    queue =  AMQQueueFactory.createAMQQueueImpl(queueName, false, null, false, _virtualHost, null);
+                    queue = AMQQueueFactory.createAMQQueueImpl(queueName, false, null, false, _virtualHost, null);
                     _virtualHost.getQueueRegistry().registerQueue(queue);
                     queues.put(queueName, queue);
                 }
@@ -1325,19 +1332,19 @@
                 maxId = Math.max(maxId, messageId);
                 AMQMessage message = msgMap.get(messageId);
 
-                if(message != null)
+                if (message != null)
                 {
                     message.incrementReference();
                 }
                 else
                 {
                     message = new AMQMessage(messageId, this, messageHandleFactory, txnContext);
-                    msgMap.put(messageId,message);
+                    msgMap.put(messageId, message);
                 }
 
                 if (_log.isDebugEnabled())
                 {
-                    _log.debug("On recovery, delivering " + message.getMessageId() + " to " + queue.getName());
+                    _log.debug("On recovery, delivering Message ID:" + message.getMessageId() + " to " + queue.getName());
                 }
 
                 if (_log.isInfoEnabled())
@@ -1356,7 +1363,7 @@
 
             }
 
-            for(ProcessAction action : actions)
+            for (ProcessAction action : actions)
             {
                 action.process();
             }
@@ -1560,15 +1567,15 @@
             {
                 // _environment.checkpoint(_config);
                 _environment.sync();
-                
+
                 for (Commit commit : jobs)
                 {
                     commit.complete();
                 }
-                if(_jobQueue.get().isEmpty())
+                if (_jobQueue.get().isEmpty())
                 {
                     _hasJobs.set(false);
-                    if(!_jobQueue.get().isEmpty())
+                    if (!_jobQueue.get().isEmpty())
                     {
                         _hasJobs.set(true);
                     }
@@ -1593,9 +1600,9 @@
         public void addJob(Commit commit)
         {
             _jobQueue.get().add(commit);
-            if(_hasJobs.compareAndSet(false, true))
+            if (_hasJobs.compareAndSet(false, true))
             {
-                synchronized(_lock)
+                synchronized (_lock)
                 {
                     _lock.notifyAll();
                 }
@@ -1605,7 +1612,7 @@
         public void close()
         {
             _stopped.set(true);
-            synchronized(_lock)
+            synchronized (_lock)
             {
                 _lock.notifyAll();
             }

Modified: store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BindingKey.java
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BindingKey.java	2008-08-06 13:39:16 UTC (rev 2259)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BindingKey.java	2008-08-07 13:34:21 UTC (rev 2260)
@@ -1,3 +1,23 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
 package org.apache.qpid.server.store.berkeleydb;
 
 import org.apache.qpid.framing.AMQShortString;
@@ -3,11 +23,4 @@
 import org.apache.qpid.framing.FieldTable;
 
-/**
- * Created by IntelliJ IDEA.
- * User: U146758
- * Date: 19-Feb-2007
- * Time: 14:11:01
- * To change this template use File | Settings | File Templates.
- */
 public class BindingKey extends Object
 {

Modified: store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BindingTB.java
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BindingTB.java	2008-08-06 13:39:16 UTC (rev 2259)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BindingTB.java	2008-08-07 13:34:21 UTC (rev 2260)
@@ -14,8 +14,6 @@
 {
     private static final Logger _log = Logger.getLogger(BindingTB.class);
 
-
-
     private final VirtualHost _virtualHost;
 
     public BindingTB(VirtualHost virtualHost)
@@ -32,9 +30,7 @@
             AMQShortString routingKey = AMQShortStringEncoding.readShortString(tupleInput);
             FieldTable arguments = FieldTableEncoding.readFieldTable(tupleInput);
 
-
-
-            return new BindingKey(exchangeName,queueName,routingKey,arguments);
+            return new BindingKey(exchangeName, queueName, routingKey, arguments);
         }
         catch (DatabaseException e)
         {
@@ -47,11 +43,17 @@
     {
         BindingKey binding = (BindingKey) object;
 
+        AMQShortStringEncoding.writeShortString(binding.getExchangeName(), tupleOutput);
+        AMQShortStringEncoding.writeShortString(binding.getQueueName(), tupleOutput);
+        AMQShortStringEncoding.writeShortString(binding.getRoutingKey(), tupleOutput);
+        FieldTableEncoding.writeFieldTable(binding.getArguments(), tupleOutput);
 
-        AMQShortStringEncoding.writeShortString(binding.getExchangeName(),tupleOutput);
-        AMQShortStringEncoding.writeShortString(binding.getQueueName(),tupleOutput);
-        AMQShortStringEncoding.writeShortString(binding.getRoutingKey(),tupleOutput);
-        FieldTableEncoding.writeFieldTable(binding.getArguments(),tupleOutput);
+        binding = (BindingKey) entryToObject(new TupleInput(tupleOutput.getBufferBytes()));
 
+        System.err.println(binding.getExchangeName());
+        System.err.println(binding.getQueueName());
+        System.err.println(binding.getRoutingKey());
+        System.err.println(binding.getArguments());
+
     }
 }

Modified: store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/FieldTableEncoding.java
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/FieldTableEncoding.java	2008-08-06 13:39:16 UTC (rev 2259)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/FieldTableEncoding.java	2008-08-07 13:34:21 UTC (rev 2260)
@@ -14,22 +14,21 @@
 {
     public static FieldTable readFieldTable(TupleInput tupleInput) throws DatabaseException
     {
-        int length = tupleInput.readInt();
-        if (length == 0)
+        long length = tupleInput.readLong();
+        if (length <= 0)
         {
             return null;
         }
         else
         {
 
-            byte[] data = new byte[length];
+            byte[] data = new byte[(int)length];
             tupleInput.readFast(data);
 
             ByteBuffer buffer = ByteBuffer.wrap(data);
             try
             {
-                FieldTable ft = new FieldTable(buffer,(long)length);
-                return ft;
+                return new FieldTable(buffer,length);                
             }
             catch (AMQFrameDecodingException e)
             {
@@ -45,11 +44,11 @@
 
         if (fieldTable == null)
         {
-            tupleOutput.writeInt(0);
+            tupleOutput.writeLong(0);
         }
         else
         {
-            tupleOutput.writeFast((int)fieldTable.getEncodedSize());
+            tupleOutput.writeLong(fieldTable.getEncodedSize());
             tupleOutput.writeFast(fieldTable.getDataAsBytes());
         }
     }

Modified: store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/QueueTB.java
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/QueueTB.java	2008-08-06 13:39:16 UTC (rev 2259)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/QueueTB.java	2008-08-07 13:34:21 UTC (rev 2260)
@@ -20,39 +20,49 @@
 import com.sleepycat.bind.tuple.TupleBinding;
 import com.sleepycat.bind.tuple.TupleInput;
 import com.sleepycat.bind.tuple.TupleOutput;
+import com.sleepycat.je.DatabaseException;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.AMQQueueFactory;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
 import org.apache.log4j.Logger;
 
 public class QueueTB extends TupleBinding
 {
     private static final Logger _log = Logger.getLogger(QueueTB.class);
 
-
-
     private final VirtualHost _virtualHost;
+    private final FieldTable _arguments;
 
-    public QueueTB(VirtualHost virtualHost)
+    public QueueTB(VirtualHost virtualHost, FieldTable arguments)
     {
         _virtualHost = virtualHost;
+        _arguments = arguments;
     }
 
     public Object entryToObject(TupleInput tupleInput)
     {
-        AMQShortString name = AMQShortStringEncoding.readShortString(tupleInput);
-        AMQShortString owner = AMQShortStringEncoding.readShortString(tupleInput);
-
-
         try
         {
-            return AMQQueueFactory.createAMQQueueImpl(name, true, owner, false, _virtualHost, null);
+            AMQShortString name = AMQShortStringEncoding.readShortString(tupleInput);
+            AMQShortString owner = AMQShortStringEncoding.readShortString(tupleInput);
+            FieldTable arguments = FieldTableEncoding.readFieldTable(tupleInput);
+
+            try
+            {
+                return AMQQueueFactory.createAMQQueueImpl(name, true, owner, false, _virtualHost, arguments);
+            }
+            catch (AMQException e)
+            {
+                _log.error("Unable to create queue: " + e, e);
+                return null;
+            }
         }
-        catch (AMQException e)
+        catch (DatabaseException e)
         {
-            _log.error("Unable to create queue: " + e, e);
+            _log.error("Unable to create binding: " + e, e);
             return null;
         }
     }
@@ -61,9 +71,9 @@
     {
         AMQQueue queue = (AMQQueue) object;
 
+        AMQShortStringEncoding.writeShortString(queue.getName(), tupleOutput);
+        AMQShortStringEncoding.writeShortString(queue.getOwner(), tupleOutput);
+        FieldTableEncoding.writeFieldTable(_arguments, tupleOutput);
 
-        AMQShortStringEncoding.writeShortString(queue.getName(),tupleOutput);
-        AMQShortStringEncoding.writeShortString(queue.getOwner(),tupleOutput);
-
     }
 }

Modified: store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreTest.java
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreTest.java	2008-08-06 13:39:16 UTC (rev 2259)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreTest.java	2008-08-07 13:34:21 UTC (rev 2260)
@@ -24,15 +24,20 @@
 import org.apache.log4j.Logger;
 import org.apache.mina.common.ByteBuffer;
 import org.apache.qpid.AMQException;
+import org.apache.qpid.common.AMQPFilterTypes;
 import org.apache.qpid.framing.*;
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
 import org.apache.qpid.framing.abstraction.ContentChunk;
 import org.apache.qpid.server.RequiredDeliveryException;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.exchange.DefaultExchangeFactory;
+import org.apache.qpid.server.exchange.DirectExchange;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.server.store.StoreContext;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.MessageMetaData;
 import org.apache.qpid.server.queue.AMQQueueFactory;
+import org.apache.qpid.server.queue.AMQPriorityQueue;
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.txn.NonTransactionalContext;
 import org.apache.qpid.server.txn.TransactionalContext;
@@ -59,25 +64,27 @@
     private static final AMQShortString RK = new AMQShortString("rk");
     private static final AMQShortString QUEUE2 = new AMQShortString("queue2");
     private static final AMQShortString HIM = new AMQShortString("him");
+    private static final AMQShortString EXCHANGE1 = new AMQShortString("exchange1");
 
+    private static volatile int _loops;
+    private String TEST_LOCATION = "bdbTestEnv";
+    File BDB_DIR = new File(TEST_LOCATION);
+    
+
     public void setUp() throws Exception
     {
+        if (BDB_DIR.exists())
+        {
+            deleteDirectory(BDB_DIR);
+        }
+
         ApplicationRegistry.initialise(new NullApplicationRegistry());
 
-        File bdbDir = new File("bdbTestEnv");
-        if (bdbDir.exists())
-        {
-            File[] entries = bdbDir.listFiles();
-            for (File f : entries)
-            {
-                f.delete();
-            }
-            bdbDir.delete();
-        }
-        bdbDir.mkdirs();
+        BDB_DIR.mkdirs();
+
         _store = new BDBMessageStore();
 
-        _store.createEnvironment(bdbDir);
+        _store.createEnvironment(BDB_DIR);
         _store.openDatabases();
         _virtualHost = new VirtualHost("test", _store);
         _store.setVirtualHost(_virtualHost);
@@ -86,22 +93,102 @@
         _txnContext = new NonTransactionalContext(_store, _storeContext, null, new LinkedList<RequiredDeliveryException>());
     }
 
+    private void deleteDirectory(File path) throws InterruptedException
+    {
+        if (path.isDirectory())
+        {
+            for (File file : path.listFiles())
+            {
+                deleteDirectory(file);
+            }
+        }
+        else
+        {
+            path.delete();
+        }
+    }
+
+    private void reload() throws Exception
+    {
+        _virtualHost.close();
+        
+        PropertiesConfiguration env = new PropertiesConfiguration();
+
+        env.addProperty("store.environment-path", "bdbTestEnv");
+        env.addProperty("store.class", "org.apache.qpid.server.store.berkeleydb.BDBMessageStore");
+
+        _virtualHost = new VirtualHost("test", env);
+        _store = (BDBMessageStore)_virtualHost.getMessageStore();
+    }
+
     public void tearDown() throws Exception
     {
-        _store.close();
+        _virtualHost.close();
+
+        ApplicationRegistry.removeAll();
     }
 
-    public void testQueuePersistence() throws DatabaseException, AMQException
+    public void     testExchangePersistence() throws Exception
     {
-        AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(QUEUE1, true, ME, false, _virtualHost, null);
-        _store.createQueue(queue);
-        AMQQueue returnedQueue = _store.getQueue(QUEUE1);
+        FieldTable queueArguments = new FieldTable();
+        Integer priorityLevel = 5;
+        queueArguments.put(AMQQueueFactory.X_QPID_PRIORITIES, priorityLevel);
 
-        Assert.assertEquals(returnedQueue.getName(), QUEUE1);
-        Assert.assertEquals(returnedQueue.getOwner(), ME);
-        Assert.assertEquals(returnedQueue.isDurable(), true);
+        Exchange exchange = new DefaultExchangeFactory(_virtualHost).createExchange(EXCHANGE1, DirectExchange.TYPE.getName(), true, false, 0);
+
+        assertNotNull("Exchange is null", exchange);
+        assertEquals("Exchange Name incorrect", EXCHANGE1, exchange.getName());
+        assertTrue("Exchange is not durable", exchange.isDurable());
+
+        _virtualHost.getExchangeRegistry().registerExchange(exchange);
+
+        //Ensure it is registered correctly
+        exchange = _virtualHost.getExchangeRegistry().getExchange(EXCHANGE1);
+        assertNotNull("Exchange is null", exchange);
+
+        reload();
+
+        exchange = _virtualHost.getExchangeRegistry().getExchange(EXCHANGE1);
+
+        assertNotNull("Exchange is null", exchange);
+        assertEquals("Exchange Name incorrect", EXCHANGE1, exchange.getName());
+        assertTrue("Exchange is not durable", exchange.isDurable());
+
     }
 
+    public void testQueuePersistence() throws Exception
+    {
+
+        FieldTable queueArguments = new FieldTable();
+        Integer priorityLevel = 5;
+        queueArguments.put(AMQQueueFactory.X_QPID_PRIORITIES, priorityLevel);
+
+        AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(QUEUE1, true, ME, false, _virtualHost, queueArguments);
+
+        _store.createQueue(queue, queueArguments);
+
+        AMQShortString routingKey = new AMQShortString("Test-Key");
+        FieldTable bindArguments = new FieldTable();
+        bindArguments.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), "Test = 'MST'");
+
+        _store.bindQueue(_virtualHost.getExchangeRegistry().getDefaultExchange(), routingKey, queue, bindArguments);
+
+        reload();
+
+        AMQQueue returnedQueue = _virtualHost.getQueueRegistry().getQueue(QUEUE1);
+
+        assertEquals("Queue Name has changed", QUEUE1, returnedQueue.getName());
+        assertEquals("Queue Owner has changed", ME, returnedQueue.getOwner());
+        assertTrue("Returned Queue is not Durable", returnedQueue.isDurable());
+        assertEquals("Returned Queue is not A Priority Queue", AMQPriorityQueue.class, returnedQueue.getClass());
+        assertEquals("Returned Queue does not have the right number of priorities", priorityLevel.intValue(),
+                     ((AMQPriorityQueue) returnedQueue).getPriorities());
+        assertNotNull("Queue has no exchange binding arguments.", returnedQueue.getExchangeBindings());
+        assertEquals("Incorrect binding count for queue.", 1, returnedQueue.getExchangeBindings().size());
+        assertTrue("Binding does not contain a Selector argument.",
+                   returnedQueue.getExchangeBindings().get(0).getArguments().containsKey(AMQPFilterTypes.JMS_SELECTOR.getValue()));
+    }
+
     private MessagePublishInfo createPublishBody()
     {
 
@@ -191,25 +278,25 @@
 
         MessageMetaData mmd = _store.getMessageMetaData(_storeContext, 14L);
         MessagePublishInfo returnedPubBody = mmd.getMessagePublishInfo();
-        Assert.assertEquals(pubBody.getExchange(), returnedPubBody.getExchange());
-        Assert.assertEquals(pubBody.isImmediate(), returnedPubBody.isImmediate());
-        Assert.assertEquals(pubBody.isMandatory(), returnedPubBody.isMandatory());
-        Assert.assertEquals(pubBody.getRoutingKey(), returnedPubBody.getRoutingKey());
+        Assert.assertEquals("Message exchange has changed", pubBody.getExchange(), returnedPubBody.getExchange());
+        Assert.assertEquals("Immediate flag has changed", pubBody.isImmediate(), returnedPubBody.isImmediate());
+        Assert.assertEquals("Mandatory flag has changed", pubBody.isMandatory(), returnedPubBody.isMandatory());
+        Assert.assertEquals("Routing key has changed", pubBody.getRoutingKey(), returnedPubBody.getRoutingKey());
 
         ContentHeaderBody returnedHeaderBody = mmd.getContentHeaderBody();
-        Assert.assertEquals(chb.classId, returnedHeaderBody.classId);
-        Assert.assertEquals(chb.weight, returnedHeaderBody.weight);
-        Assert.assertEquals(chb.bodySize, returnedHeaderBody.bodySize);
+        Assert.assertEquals("ContentHeader ClassID has changed", chb.classId, returnedHeaderBody.classId);
+        Assert.assertEquals("ContentHeader weight has changed", chb.weight, returnedHeaderBody.weight);
+        Assert.assertEquals("ContentHeader bodySize has changed", chb.bodySize, returnedHeaderBody.bodySize);
         BasicContentHeaderProperties returnedProperties = (BasicContentHeaderProperties) returnedHeaderBody.properties;
-        Assert.assertEquals(props.getContentTypeAsString(), returnedProperties.getContentTypeAsString());
-        Assert.assertEquals(props.getMessageIdAsString(), returnedProperties.getMessageIdAsString());
-        Assert.assertEquals(mmd.getContentChunkCount(), 1);
+        Assert.assertEquals("Property ContentType has changed", props.getContentTypeAsString(), returnedProperties.getContentTypeAsString());
+        Assert.assertEquals("Property MessageID has changed", props.getMessageIdAsString(), returnedProperties.getMessageIdAsString());
+        Assert.assertEquals("MessageMD ChunkCount has changed", mmd.getContentChunkCount(), 1);
         ContentChunk returnedContentBody = _store.getContentBodyChunk(_storeContext, 14L, 0);
         ByteBuffer returnedPayloadAsBytes = returnedContentBody.getData();
         byte[] returnedPayload = new byte[returnedPayloadAsBytes.remaining()];
         returnedPayloadAsBytes.get(returnedPayload);
         String returnedPayloadString = new String(returnedPayload);
-        Assert.assertEquals(bodyText, returnedPayloadString);
+        Assert.assertEquals("Message Payload has changed", bodyText, returnedPayloadString);
     }
 
     public void testMessageCreateAndDelete() throws Exception
@@ -245,6 +332,7 @@
         {
             // pass since exception expected
         }
+        
     }
 
     public void testTranCommit() throws Exception
@@ -259,9 +347,8 @@
         _store.storeMessageMetaData(_storeContext, 21L, new MessageMetaData(pubBody, chb, 0));
         _store.storeMessageMetaData(_storeContext, 22L, new MessageMetaData(pubBody, chb, 0));
 
-
         AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(QUEUE1, true, ME, false, _virtualHost, null);
-        _store.createQueue(queue);
+        _store.createQueue(queue, null);
 
         _store.beginTran(_storeContext);
         _store.enqueueMessage(_storeContext, queue, 20L);
@@ -269,15 +356,19 @@
         _store.commitTran(_storeContext);
 
         List<Long> enqueuedIds = _store.getEnqueuedMessages(QUEUE1);
-        Assert.assertEquals(enqueuedIds.size(), 2);
+        Assert.assertEquals("Enqueued messages have changed", 2, enqueuedIds.size());
         Long val = enqueuedIds.get(0);
-        Assert.assertEquals(val.longValue(), 20L);
+        Assert.assertEquals("First Message is incorrect", 20L, val.longValue());
         val = enqueuedIds.get(1);
-        Assert.assertEquals(val.longValue(), 21L);
+        Assert.assertEquals("Second Message is incorrect", 21L, val.longValue());
+
     }
 
     public void testTranRollback1() throws Exception
     {
+        List<Long> enqueuedIds = _store.getEnqueuedMessages(QUEUE1);
+        assertTrue("Last Test Messages are still present", enqueuedIds.isEmpty());
+
         MessagePublishInfo pubBody = createPublishBody();
         BasicContentHeaderProperties props = createContentHeaderProperties();
         String bodyText = "dsjfhdsjkfhdsjflhsdjfdshfjdlhfjhdljfdhsljkfsh";
@@ -290,9 +381,8 @@
         _store.storeMessageMetaData(_storeContext, 32L, new MessageMetaData(pubBody, chb, 0));
 
         AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(QUEUE1, true, ME, false, _virtualHost, null);
-        _store.createQueue(queue);
+        _store.createQueue(queue, null);
 
-
         _store.beginTran(_storeContext);
         _store.enqueueMessage(_storeContext, queue, 30L);
         _store.enqueueMessage(_storeContext, queue, 31L);
@@ -305,16 +395,22 @@
         _store.beginTran(_storeContext);
         _store.commitTran(_storeContext);
 
-        List<Long> enqueuedIds = _store.getEnqueuedMessages(QUEUE1);
-        Assert.assertEquals(enqueuedIds.size(), 2);
+                enqueuedIds = _store.getEnqueuedMessages(QUEUE1);
+        assertTrue("Last Test Message is still present", !enqueuedIds.contains(20L));
+        assertEquals("Incorrect Enqueued Message Count:", 2, enqueuedIds.size());
         Long val = enqueuedIds.get(0);
-        Assert.assertEquals(val.longValue(), 30L);
+        assertEquals("First Message is incorrect", 30L, val.longValue());
         val = enqueuedIds.get(1);
-        Assert.assertEquals(val.longValue(), 31L);
+        assertEquals("Second Message is incorrect", 31L, val.longValue());
+        
     }
 
+
     public void testTranRollback2() throws Exception
     {
+        List<Long> enqueuedIds = _store.getEnqueuedMessages(QUEUE1);
+        assertTrue("Last Test Messages are still present", enqueuedIds.isEmpty());
+
         MessagePublishInfo pubBody = createPublishBody();
         BasicContentHeaderProperties props = createContentHeaderProperties();
         String bodyText = "dsjfhdsjkfhdsjflhsdjfdshfjdlhfjhdljfdhsljkfsh";
@@ -326,9 +422,8 @@
         _store.storeMessageMetaData(_storeContext, 31L, new MessageMetaData(pubBody, chb, 0));
         _store.storeMessageMetaData(_storeContext, 32L, new MessageMetaData(pubBody, chb, 0));
 
-
         AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(QUEUE1, true, ME, false, _virtualHost, null);
-        _store.createQueue(queue);
+        _store.createQueue(queue, null);
 
         _store.beginTran(_storeContext);
         _store.enqueueMessage(_storeContext, queue, 30L);
@@ -339,16 +434,19 @@
         _store.enqueueMessage(_storeContext, queue, 32L);
         _store.commitTran(_storeContext);
 
-        List<Long> enqueuedIds = _store.getEnqueuedMessages(QUEUE1);
-        Assert.assertEquals(enqueuedIds.size(), 2);
+        enqueuedIds = _store.getEnqueuedMessages(QUEUE1);
+        Assert.assertEquals("Incorrect Enqueued Message Count", 2, enqueuedIds.size());
         Long val = enqueuedIds.get(0);
-        Assert.assertEquals(val.longValue(), 31L);
+        Assert.assertEquals("First Message is incorrect", 31L, val.longValue());
         val = enqueuedIds.get(1);
-        Assert.assertEquals(val.longValue(), 32L);
+        Assert.assertEquals("Second Message is incorrect", 32L, val.longValue());
     }
 
     public void testRecovery() throws Exception
     {
+        List<Long> enqueuedIds = _store.getEnqueuedMessages(QUEUE1);
+        assertTrue("Last Test Messages are still present", enqueuedIds.isEmpty());
+
         MessagePublishInfo pubBody = createPublishBody();
         BasicContentHeaderProperties props = createContentHeaderProperties();
         String bodyText = "dsjfhdsjkfhdsjflhsdjfdshfjdlhfjhdljfdhsljkfsh";
@@ -360,12 +458,11 @@
         _store.storeMessageMetaData(_storeContext, 41L, new MessageMetaData(pubBody, chb, 0));
         _store.storeMessageMetaData(_storeContext, 42L, new MessageMetaData(pubBody, chb, 0));
 
-
         AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(QUEUE1, true, ME, false, _virtualHost, null);
         AMQQueue queue2 = AMQQueueFactory.createAMQQueueImpl(QUEUE2, true, HIM, false, _virtualHost, null);
 
-        _store.createQueue(queue);
-        _store.createQueue(queue2);
+        _store.createQueue(queue, null);
+        _store.createQueue(queue2, null);
 
         _store.beginTran(_storeContext);
         _store.enqueueMessage(_storeContext, queue, 40L);
@@ -375,29 +472,17 @@
 
         _store.enqueueMessage(_storeContext, queue, 42L);
 
-        _virtualHost.getQueueRegistry().unregisterQueue(queue.getName());
-        _virtualHost.getQueueRegistry().unregisterQueue(queue2.getName());
+        reload();
 
-        _store.close();
-
-        _store = new BDBMessageStore();
-
-        PropertiesConfiguration env = new PropertiesConfiguration();
-
-        env.addProperty("store.environment-path", "bdbTestEnv");
-        env.addProperty("store.class", "org.apache.qpid.server.store.berkeleydb.BDBMessageStore");
-
-        _virtualHost = new VirtualHost("test", env);
-
         try
         {
             AMQQueue q1 = _virtualHost.getQueueRegistry().getQueue(QUEUE1);
             AMQQueue q2 = _virtualHost.getQueueRegistry().getQueue(QUEUE2);
 
-            Assert.assertNotNull(q1);
-            Assert.assertEquals(3, q1.getMessageCount());
-            Assert.assertNotNull(q2);
-            Assert.assertEquals(1, q2.getMessageCount());
+            Assert.assertNotNull("Queue1 is was not recovered", q1);
+            Assert.assertEquals("Queue1 has incorrect message count", 3, q1.getMessageCount());
+            Assert.assertNotNull("Queue2 is was not recovered", q2);
+            Assert.assertEquals("Queue2 has incorrect message count", 1, q2.getMessageCount());
         }
         catch (Exception e)
         {
@@ -405,7 +490,6 @@
             fail(e.getMessage());
         }
 
-
     }
 
     public void testDequeue() throws AMQException
@@ -420,7 +504,7 @@
         _store.storeMessageMetaData(_storeContext, 50L, new MessageMetaData(pubBody, chb, 0));
 
         AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(QUEUE1, true, ME, false, _virtualHost, null);
-        _store.createQueue(queue);
+        _store.createQueue(queue, null);
 
         _store.enqueueMessage(_storeContext, queue, 50L);
         _store.dequeueMessage(_storeContext, queue, 50L);
@@ -429,7 +513,7 @@
     public void testQueueRemove() throws AMQException
     {
         AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(QUEUE1, true, ME, false, _virtualHost, null);
-        _store.createQueue(queue);
+        _store.createQueue(queue, null);
         _store.removeQueue(queue);
         try
         {




More information about the rhmessaging-commits mailing list