[rhmessaging-commits] rhmessaging commits: r2086 - store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Thu May 29 12:22:36 EDT 2008


Author: godfrer
Date: 2008-05-29 12:22:36 -0400 (Thu, 29 May 2008)
New Revision: 2086

Added:
   store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringTB.java
   store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BindingKey.java
   store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BindingTB.java
   store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ContentTB.java
   store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ExchangeTB.java
   store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/MessageMetaDataTB.java
   store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/QueueEntryKey.java
   store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/QueueTB.java
Removed:
   store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AMQExchangeTupleBinding.java
   store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AMQQueueBindingTupleBinding.java
   store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AMQQueueTupleBinding.java
   store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringTupleBinding.java
   store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/DeliveryDetailsKey.java
   store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/MessageContentTupleBinding.java
   store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/MessageMetaDataTupleBinding.java
   store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/QueueBindingKey.java
Modified:
   store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
Log:
refactoring

Deleted: store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AMQExchangeTupleBinding.java
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AMQExchangeTupleBinding.java	2008-05-29 16:05:10 UTC (rev 2085)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AMQExchangeTupleBinding.java	2008-05-29 16:22:36 UTC (rev 2086)
@@ -1,57 +0,0 @@
-package org.apache.qpid.server.store.berkeleydb;
-
-import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.AMQException;
-
-import com.sleepycat.bind.tuple.TupleBinding;
-import com.sleepycat.bind.tuple.TupleInput;
-import com.sleepycat.bind.tuple.TupleOutput;
-import org.apache.log4j.Logger;
-
-public class AMQExchangeTupleBinding extends TupleBinding
-{
-    private static final Logger _log = Logger.getLogger(AMQExchangeTupleBinding.class);
-
-
-
-    private final VirtualHost _virtualHost;
-
-    public AMQExchangeTupleBinding(VirtualHost virtualHost)
-    {
-        _virtualHost = virtualHost;
-    }
-
-    public Object entryToObject(TupleInput tupleInput)
-    {
-
-        AMQShortString name = AMQShortStringEncoding.readShortString(tupleInput);
-        AMQShortString typeName = AMQShortStringEncoding.readShortString(tupleInput);
-
-
-        boolean autoDelete = tupleInput.readBoolean();
-
-        try
-        {
-            return _virtualHost.getExchangeFactory().createExchange(name, typeName, true, autoDelete, 0);
-        }
-        catch (AMQException e)
-        {
-            _log.error("Unable to create exchange: " + e, e);
-            return null;
-        }
-    }
-
-    public void objectToEntry(Object object, TupleOutput tupleOutput)
-    {
-        Exchange exchange = (Exchange) object;
-
-
-        AMQShortStringEncoding.writeShortString(exchange.getName(),tupleOutput);
-        AMQShortStringEncoding.writeShortString(exchange.getType(),tupleOutput);
-
-        tupleOutput.writeBoolean(exchange.isAutoDelete());
-
-    }
-}

Deleted: store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AMQQueueBindingTupleBinding.java
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AMQQueueBindingTupleBinding.java	2008-05-29 16:05:10 UTC (rev 2085)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AMQQueueBindingTupleBinding.java	2008-05-29 16:22:36 UTC (rev 2086)
@@ -1,59 +0,0 @@
-package org.apache.qpid.server.store.berkeleydb;
-
-import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.AMQException;
-
-import com.sleepycat.bind.tuple.TupleBinding;
-import com.sleepycat.bind.tuple.TupleInput;
-import com.sleepycat.bind.tuple.TupleOutput;
-import com.sleepycat.je.DatabaseException;
-import org.apache.log4j.Logger;
-
-public class AMQQueueBindingTupleBinding extends TupleBinding
-{
-    private static final Logger _log = Logger.getLogger(AMQQueueBindingTupleBinding.class);
-
-
-
-    private final VirtualHost _virtualHost;
-
-    public AMQQueueBindingTupleBinding(VirtualHost virtualHost)
-    {
-        _virtualHost = virtualHost;
-    }
-
-    public Object entryToObject(TupleInput tupleInput)
-    {
-        try
-        {
-            AMQShortString exchangeName = AMQShortStringEncoding.readShortString(tupleInput);
-            AMQShortString queueName = AMQShortStringEncoding.readShortString(tupleInput);
-            AMQShortString routingKey = AMQShortStringEncoding.readShortString(tupleInput);
-            FieldTable arguments = FieldTableEncoding.readFieldTable(tupleInput);
-
-
-
-            return new QueueBindingKey(exchangeName,queueName,routingKey,arguments);
-        }
-        catch (DatabaseException e)
-        {
-            _log.error("Unable to create binding: " + e, e);
-            return null;
-        }
-    }
-
-    public void objectToEntry(Object object, TupleOutput tupleOutput)
-    {
-        QueueBindingKey binding = (QueueBindingKey) object;
-
-
-        AMQShortStringEncoding.writeShortString(binding.getExchangeName(),tupleOutput);
-        AMQShortStringEncoding.writeShortString(binding.getQueueName(),tupleOutput);
-        AMQShortStringEncoding.writeShortString(binding.getRoutingKey(),tupleOutput);
-        FieldTableEncoding.writeFieldTable(binding.getArguments(),tupleOutput);
-
-    }
-}

Deleted: store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AMQQueueTupleBinding.java
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AMQQueueTupleBinding.java	2008-05-29 16:05:10 UTC (rev 2085)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AMQQueueTupleBinding.java	2008-05-29 16:22:36 UTC (rev 2086)
@@ -1,68 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.qpid.server.store.berkeleydb;
-
-import com.sleepycat.bind.tuple.TupleBinding;
-import com.sleepycat.bind.tuple.TupleInput;
-import com.sleepycat.bind.tuple.TupleOutput;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.log4j.Logger;
-
-public class AMQQueueTupleBinding extends TupleBinding
-{
-    private static final Logger _log = Logger.getLogger(AMQQueueTupleBinding.class);
-
-
-
-    private final VirtualHost _virtualHost;
-
-    public AMQQueueTupleBinding(VirtualHost virtualHost)
-    {
-        _virtualHost = virtualHost;
-    }
-
-    public Object entryToObject(TupleInput tupleInput)
-    {
-        AMQShortString name = AMQShortStringEncoding.readShortString(tupleInput);
-        AMQShortString owner = AMQShortStringEncoding.readShortString(tupleInput);
-
-
-        try
-        {
-            return new AMQQueue(name, true, owner, false, _virtualHost);
-        }
-        catch (AMQException e)
-        {
-            _log.error("Unable to create queue: " + e, e);
-            return null;
-        }
-    }
-
-    public void objectToEntry(Object object, TupleOutput tupleOutput)
-    {
-        AMQQueue queue = (AMQQueue) object;
-
-
-        AMQShortStringEncoding.writeShortString(queue.getName(),tupleOutput);
-        AMQShortStringEncoding.writeShortString(queue.getOwner(),tupleOutput);
-
-    }
-}

Copied: store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringTB.java (from rev 2039, store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringTupleBinding.java)
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringTB.java	                        (rev 0)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringTB.java	2008-05-29 16:22:36 UTC (rev 2086)
@@ -0,0 +1,28 @@
+package org.apache.qpid.server.store.berkeleydb;
+
+import com.sleepycat.bind.tuple.TupleBinding;
+import com.sleepycat.bind.tuple.TupleInput;
+import com.sleepycat.bind.tuple.TupleOutput;
+import org.apache.log4j.Logger;
+import org.apache.qpid.framing.AMQShortString;
+
+public class AMQShortStringTB extends TupleBinding
+{
+    private static final Logger _log = Logger.getLogger(AMQShortStringTB.class);
+
+
+    public AMQShortStringTB()
+    {
+    }
+
+    public Object entryToObject(TupleInput tupleInput)
+    {
+        return AMQShortStringEncoding.readShortString(tupleInput);
+    }
+
+    public void objectToEntry(Object object, TupleOutput tupleOutput)
+    {
+        AMQShortStringEncoding.writeShortString((AMQShortString)object, tupleOutput);
+    }
+
+}


Property changes on: store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringTB.java
___________________________________________________________________
Name: svn:eol-style
   + native

Deleted: store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringTupleBinding.java
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringTupleBinding.java	2008-05-29 16:05:10 UTC (rev 2085)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringTupleBinding.java	2008-05-29 16:22:36 UTC (rev 2086)
@@ -1,28 +0,0 @@
-package org.apache.qpid.server.store.berkeleydb;
-
-import com.sleepycat.bind.tuple.TupleBinding;
-import com.sleepycat.bind.tuple.TupleInput;
-import com.sleepycat.bind.tuple.TupleOutput;
-import org.apache.log4j.Logger;
-import org.apache.qpid.framing.AMQShortString;
-
-public class AMQShortStringTupleBinding extends TupleBinding
-{
-    private static final Logger _log = Logger.getLogger(AMQShortStringTupleBinding.class);
-
-
-    public AMQShortStringTupleBinding()
-    {
-    }
-
-    public Object entryToObject(TupleInput tupleInput)
-    {
-        return AMQShortStringEncoding.readShortString(tupleInput);
-    }
-
-    public void objectToEntry(Object object, TupleOutput tupleOutput)
-    {
-        AMQShortStringEncoding.writeShortString((AMQShortString)object, tupleOutput);
-    }
-
-}

Modified: store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java	2008-05-29 16:05:10 UTC (rev 2085)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java	2008-05-29 16:22:36 UTC (rev 2086)
@@ -56,6 +56,7 @@
 import java.util.Queue;
 import java.util.TreeMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
@@ -90,27 +91,43 @@
 
     private static final String QUEUEDB_NAME = "queueDb";
 
+    private static final String NEW_QUEUE_DB_NAME = "QUEUE";
+
     /** Maps from name (which uniquely identifies a queue) to an AMQQueue */
     private Database _queueDb;
 
     private static final String DELIVERYDB_NAME = "deliveryDb";
 
+    private static final String QUEUE_ENTRY_DB_NAME = "QUEUE_ENTRY";
+
     /** Maps from a queue name to a message id. This is what stores the pending deliveries for a given queue */
     private Database _deliveryDb;
 
     private static final String EXCHANGEDB_NAME = "exchangeDb";
     private Database _exchangeDb;
 
+    private static final String NEW_EXCHANGE_DB_NAME = "EXCHANGE";
+
+
     private static final String QUEUEBINDINGSDB_NAME = "queueBindingsDb";
     private Database _queueBindingsDb;
 
     private VirtualHost _virtualHost;
 
     private final AtomicLong _messageId = new AtomicLong(1);
+
+    private final AtomicLong _queueId = new AtomicLong(1);
+
     private static final AMQShortString EMPTY_SHORT_STRING = new AMQShortString("");
 
+
+
+
+
     private final CommitThread _commitThread = new CommitThread("Commit-Thread");
 
+    private Map<AMQShortString, Long> _queueNameToIdMap = new ConcurrentHashMap<AMQShortString, Long>();
+
     private enum State
     {
         INITIAL,
@@ -159,6 +176,9 @@
 
         _commitThread.start();
 
+
+        upgradeIfNecessary();
+
         // this recovers durable queues and persistent messages
 
         recover();
@@ -166,6 +186,11 @@
         stateTransition(State.RECOVERING, State.STARTED);
     }
 
+    private void upgradeIfNecessary()
+    {
+
+    }
+
     private synchronized void stateTransition(State requiredState, State newState) throws AMQException
     {
         if (_state != requiredState)
@@ -389,10 +414,10 @@
         if (_state != State.RECOVERING)
         {
             DatabaseEntry key = new DatabaseEntry();
-            EntryBinding keyBinding = new AMQShortStringTupleBinding();
+            EntryBinding keyBinding = new AMQShortStringTB();
             keyBinding.objectToEntry(exchange.getName(), key);
             DatabaseEntry value = new DatabaseEntry();
-            TupleBinding exchangeBinding = new AMQExchangeTupleBinding(_virtualHost);
+            TupleBinding exchangeBinding = new ExchangeTB(_virtualHost);
             exchangeBinding.objectToEntry(exchange, value);
             try
             {
@@ -415,7 +440,7 @@
     public void removeExchange(Exchange exchange) throws AMQException
     {
         DatabaseEntry key = new DatabaseEntry();
-        EntryBinding keyBinding = new AMQShortStringTupleBinding();
+        EntryBinding keyBinding = new AMQShortStringTB();
         keyBinding.objectToEntry(exchange.getName(), key);
         try
         {
@@ -445,7 +470,7 @@
 
         QueueRegistry queueRegistry = _virtualHost.getQueueRegistry();
 
-        for (QueueBindingKey binding : loadQueueBindings(exchange))
+        for (BindingKey binding : loadQueueBindings(exchange))
         {
             AMQQueue queue = queueRegistry.getQueue(binding.getQueueName());
             if (queue == null)
@@ -464,29 +489,29 @@
         }
     }
 
-    private List<QueueBindingKey> loadQueueBindings(Exchange exchange) throws DatabaseException
+    private List<BindingKey> loadQueueBindings(Exchange exchange) throws DatabaseException
     {
 
         Cursor cursor = null;
-        List<QueueBindingKey> queueBindings = new ArrayList<QueueBindingKey>();
+        List<BindingKey> queueBindings = new ArrayList<BindingKey>();
         try
         {
             cursor = _queueBindingsDb.openCursor(null, null);
             DatabaseEntry key = new DatabaseEntry();
             DatabaseEntry value = new DatabaseEntry();
 
-            AMQQueueBindingTupleBinding binding = new AMQQueueBindingTupleBinding(_virtualHost);
+            BindingTB binding = new BindingTB(_virtualHost);
 
-            QueueBindingKey queueBinding =
-                new QueueBindingKey(exchange.getName(), new AMQShortString(""), new AMQShortString(""), null);
+            BindingKey queueBinding =
+                new BindingKey(exchange.getName(), new AMQShortString(""), new AMQShortString(""), null);
 
-            EntryBinding keyBinding = new AMQQueueBindingTupleBinding(_virtualHost);
+            EntryBinding keyBinding = new BindingTB(_virtualHost);
             keyBinding.objectToEntry(queueBinding, key);
 
             OperationStatus opStatus = cursor.getSearchKeyRange(key, value, LockMode.RMW);
 
             while ((opStatus == OperationStatus.SUCCESS)
-                    && ((queueBinding = (QueueBindingKey) binding.entryToObject(key)).getExchangeName().equals(
+                    && ((queueBinding = (BindingKey) binding.entryToObject(key)).getExchangeName().equals(
                             exchange.getName())))
             {
                 queueBindings.add(queueBinding);
@@ -514,7 +539,7 @@
             cursor = _exchangeDb.openCursor(null, null);
             DatabaseEntry key = new DatabaseEntry();
             DatabaseEntry value = new DatabaseEntry();
-            AMQExchangeTupleBinding binding = new AMQExchangeTupleBinding(_virtualHost);
+            ExchangeTB binding = new ExchangeTB(_virtualHost);
             while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
             {
                 Exchange exchange = (Exchange) binding.entryToObject(value);
@@ -555,8 +580,8 @@
         if (_state != State.RECOVERING)
         {
             DatabaseEntry key = new DatabaseEntry();
-            EntryBinding keyBinding = new AMQQueueBindingTupleBinding(_virtualHost);
-            keyBinding.objectToEntry(new QueueBindingKey(exchange.getName(), queue.getName(), routingKey, args), key);
+            EntryBinding keyBinding = new BindingTB(_virtualHost);
+            keyBinding.objectToEntry(new BindingKey(exchange.getName(), queue.getName(), routingKey, args), key);
             DatabaseEntry value = new DatabaseEntry();
             ByteBinding.byteToEntry((byte) 0, value);
 
@@ -586,8 +611,8 @@
         throws AMQException
     {
         DatabaseEntry key = new DatabaseEntry();
-        EntryBinding keyBinding = new AMQQueueBindingTupleBinding(_virtualHost);
-        keyBinding.objectToEntry(new QueueBindingKey(exchange.getName(), queue.getName(), routingKey, args), key);
+        EntryBinding keyBinding = new BindingTB(_virtualHost);
+        keyBinding.objectToEntry(new BindingKey(exchange.getName(), queue.getName(), routingKey, args), key);
 
         try
         {
@@ -618,11 +643,14 @@
 
         if (_state != State.RECOVERING)
         {
+            long queueId = _queueId.getAndIncrement();
+            _queueNameToIdMap.put(queue.getName(),queueId);
+            
             DatabaseEntry key = new DatabaseEntry();
-            EntryBinding keyBinding = new AMQShortStringTupleBinding();
+            EntryBinding keyBinding = new AMQShortStringTB();
             keyBinding.objectToEntry(queue.getName(), key);
             DatabaseEntry value = new DatabaseEntry();
-            TupleBinding queueBinding = new AMQQueueTupleBinding(_virtualHost);
+            TupleBinding queueBinding = new QueueTB(_virtualHost);
             queueBinding.objectToEntry(queue, value);
             try
             {
@@ -647,9 +675,10 @@
 
         _log.debug("public void removeQueue(AMQShortString name = " + name + "): called");
 
+        Long queueId = _queueNameToIdMap.remove(name);
 
         DatabaseEntry key = new DatabaseEntry();
-        EntryBinding keyBinding = new AMQShortStringTupleBinding();
+        EntryBinding keyBinding = new AMQShortStringTB();
         keyBinding.objectToEntry(name, key);
         try
         {
@@ -677,13 +706,13 @@
     AMQQueue getQueue(AMQShortString name) throws AMQException
     {
         DatabaseEntry key = new DatabaseEntry();
-        EntryBinding keyBinding = new AMQShortStringTupleBinding();
+        EntryBinding keyBinding = new AMQShortStringTB();
         keyBinding.objectToEntry(name, key);
         DatabaseEntry value = new DatabaseEntry();
         try
         {
             _queueDb.get(null, key, value, LockMode.RMW);
-            AMQQueueTupleBinding binding = new AMQQueueTupleBinding(_virtualHost);
+            QueueTB binding = new QueueTB(_virtualHost);
 
             return (AMQQueue) binding.entryToObject(value);
         }
@@ -710,8 +739,8 @@
         AMQShortString name = queue.getName();
         Transaction tx = (Transaction) context.getPayload();
         DatabaseEntry key = new DatabaseEntry();
-        EntryBinding keyBinding = new DeliveryDetailsKey.TupleBinding();
-        DeliveryDetailsKey dd = new DeliveryDetailsKey(name, messageId);
+        EntryBinding keyBinding = new QueueEntryKey.TupleBinding();
+        QueueEntryKey dd = new QueueEntryKey(name, messageId);
         keyBinding.objectToEntry(dd, key);
         DatabaseEntry value = new DatabaseEntry();
         ByteBinding.byteToEntry((byte) 0, value);
@@ -769,8 +798,8 @@
         Transaction tx = (Transaction) context.getPayload();
 
         DatabaseEntry key = new DatabaseEntry();
-        EntryBinding keyBinding = new DeliveryDetailsKey.TupleBinding();
-        DeliveryDetailsKey dd = new DeliveryDetailsKey(name, messageId);
+        EntryBinding keyBinding = new QueueEntryKey.TupleBinding();
+        QueueEntryKey dd = new QueueEntryKey(name, messageId);
 
         keyBinding.objectToEntry(dd, key);
 
@@ -955,9 +984,9 @@
 
             DatabaseEntry key = new DatabaseEntry();
 
-            DeliveryDetailsKey dd = new DeliveryDetailsKey(queueName, 0);
+            QueueEntryKey dd = new QueueEntryKey(queueName, 0);
 
-            EntryBinding keyBinding = new DeliveryDetailsKey.TupleBinding();
+            EntryBinding keyBinding = new QueueEntryKey.TupleBinding();
             keyBinding.objectToEntry(dd, key);
 
             DatabaseEntry value = new DatabaseEntry();
@@ -965,7 +994,7 @@
             LinkedList<Long> messageIds = new LinkedList<Long>();
 
             OperationStatus status = cursor.getSearchKeyRange(key, value, LockMode.DEFAULT);
-            dd = (DeliveryDetailsKey) keyBinding.entryToObject(key);
+            dd = (QueueEntryKey) keyBinding.entryToObject(key);
 
             while ((status == OperationStatus.SUCCESS) && dd.queueName.equals(queueName))
             {
@@ -974,7 +1003,7 @@
                 status = cursor.getNext(key, value, LockMode.DEFAULT);
                 if (status == OperationStatus.SUCCESS)
                 {
-                    dd = (DeliveryDetailsKey) keyBinding.entryToObject(key);
+                    dd = (QueueEntryKey) keyBinding.entryToObject(key);
                 }
             }
 
@@ -1055,7 +1084,7 @@
         TupleBinding keyBinding = new MessageContentKey.TupleBinding();
         keyBinding.objectToEntry(new MessageContentKey(messageId, index), key);
         DatabaseEntry value = new DatabaseEntry();
-        TupleBinding messageBinding = new MessageContentTupleBinding();
+        TupleBinding messageBinding = new ContentTB();
         messageBinding.objectToEntry(contentBody, value);
         try
         {
@@ -1100,7 +1129,7 @@
         EntryBinding keyBinding = TupleBinding.getPrimitiveBinding(Long.class);
         keyBinding.objectToEntry(messageId, key);
         DatabaseEntry value = new DatabaseEntry();
-        TupleBinding messageBinding = new MessageMetaDataTupleBinding();
+        TupleBinding messageBinding = new MessageMetaDataTB();
         messageBinding.objectToEntry(messageMetaData, value);
         try
         {
@@ -1138,7 +1167,7 @@
         EntryBinding keyBinding = TupleBinding.getPrimitiveBinding(Long.class);
         keyBinding.objectToEntry(messageId, key);
         DatabaseEntry value = new DatabaseEntry();
-        TupleBinding messageBinding = new MessageMetaDataTupleBinding();
+        TupleBinding messageBinding = new MessageMetaDataTB();
 
         try
         {
@@ -1177,7 +1206,7 @@
         TupleBinding keyBinding = new MessageContentKey.TupleBinding();
         keyBinding.objectToEntry(new MessageContentKey(messageId, index), key);
         DatabaseEntry value = new DatabaseEntry();
-        TupleBinding messageBinding = new MessageContentTupleBinding();
+        TupleBinding messageBinding = new ContentTB();
         if (_log.isDebugEnabled())
         {
             _log.debug("Message Id: " + messageId + " Getting content body chunk: " + index);
@@ -1212,7 +1241,7 @@
             cursor = _queueDb.openCursor(null, null);
             DatabaseEntry key = new DatabaseEntry();
             DatabaseEntry value = new DatabaseEntry();
-            AMQQueueTupleBinding binding = new AMQQueueTupleBinding(_virtualHost);
+            QueueTB binding = new QueueTB(_virtualHost);
             while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
             {
                 AMQQueue queue = (AMQQueue) binding.entryToObject(value);
@@ -1267,7 +1296,7 @@
             Transaction tx = (Transaction) context.getPayload();
             cursor = _deliveryDb.openCursor(tx, null);
             DatabaseEntry key = new DatabaseEntry();
-            EntryBinding keyBinding = new DeliveryDetailsKey.TupleBinding();
+            EntryBinding keyBinding = new QueueEntryKey.TupleBinding();
 
             DatabaseEntry value = new DatabaseEntry();
             EntryBinding valueBinding = TupleBinding.getPrimitiveBinding(Long.class);
@@ -1279,7 +1308,7 @@
             while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
             {
 
-                DeliveryDetailsKey dd = (DeliveryDetailsKey) keyBinding.entryToObject(key);
+                QueueEntryKey dd = (QueueEntryKey) keyBinding.entryToObject(key);
 
                 AMQShortString queueName = dd.queueName;
 

Copied: store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BindingKey.java (from rev 2039, store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/QueueBindingKey.java)
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BindingKey.java	                        (rev 0)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BindingKey.java	2008-05-29 16:22:36 UTC (rev 2086)
@@ -0,0 +1,49 @@
+package org.apache.qpid.server.store.berkeleydb;
+
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+
+/**
+ * Created by IntelliJ IDEA.
+ * User: U146758
+ * Date: 19-Feb-2007
+ * Time: 14:11:01
+ * To change this template use File | Settings | File Templates.
+ */
+public class BindingKey extends Object
+{
+    private final AMQShortString _exchangeName;
+    private final AMQShortString _queueName;
+    private final AMQShortString _routingKey;
+    private final FieldTable _arguments;
+
+    public BindingKey(AMQShortString exchangeName, AMQShortString queueName, AMQShortString routingKey, FieldTable arguments)
+    {
+        _exchangeName = exchangeName;
+        _queueName = queueName;
+        _routingKey = routingKey;
+        _arguments = arguments;
+    }
+
+
+    public AMQShortString getExchangeName()
+    {
+        return _exchangeName;
+    }
+
+    public AMQShortString getQueueName()
+    {
+        return _queueName;
+    }
+
+    public AMQShortString getRoutingKey()
+    {
+        return _routingKey;
+    }
+
+    public FieldTable getArguments()
+    {
+        return _arguments;
+    }
+
+}


Property changes on: store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BindingKey.java
___________________________________________________________________
Name: svn:eol-style
   + native

Copied: store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BindingTB.java (from rev 2039, store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AMQQueueBindingTupleBinding.java)
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BindingTB.java	                        (rev 0)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BindingTB.java	2008-05-29 16:22:36 UTC (rev 2086)
@@ -0,0 +1,57 @@
+package org.apache.qpid.server.store.berkeleydb;
+
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+
+import com.sleepycat.bind.tuple.TupleBinding;
+import com.sleepycat.bind.tuple.TupleInput;
+import com.sleepycat.bind.tuple.TupleOutput;
+import com.sleepycat.je.DatabaseException;
+import org.apache.log4j.Logger;
+
+public class BindingTB extends TupleBinding
+{
+    private static final Logger _log = Logger.getLogger(BindingTB.class);
+    // BindingTB converts BindingKey objects to and from tuple entries (three short strings, then a field table).
+
+    // Assigned in the constructor; no method in this class reads it.
+    private final VirtualHost _virtualHost;
+
+    public BindingTB(VirtualHost virtualHost)
+    {
+        _virtualHost = virtualHost;
+    }
+
+    public Object entryToObject(TupleInput tupleInput)
+    {
+        try
+        {
+            AMQShortString exchangeName = AMQShortStringEncoding.readShortString(tupleInput);
+            AMQShortString queueName = AMQShortStringEncoding.readShortString(tupleInput);
+            AMQShortString routingKey = AMQShortStringEncoding.readShortString(tupleInput);
+            FieldTable arguments = FieldTableEncoding.readFieldTable(tupleInput);
+
+            // The values above are read in the same order objectToEntry writes them.
+
+            return new BindingKey(exchangeName,queueName,routingKey,arguments);
+        }
+        catch (DatabaseException e)
+        {
+            _log.error("Unable to create binding: " + e, e);
+            return null; // the failure is logged above; the caller receives null
+        }
+    }
+
+    public void objectToEntry(Object object, TupleOutput tupleOutput)
+    {
+        BindingKey binding = (BindingKey) object;
+
+        // Write order: exchange name, queue name, routing key, arguments.
+        AMQShortStringEncoding.writeShortString(binding.getExchangeName(),tupleOutput);
+        AMQShortStringEncoding.writeShortString(binding.getQueueName(),tupleOutput);
+        AMQShortStringEncoding.writeShortString(binding.getRoutingKey(),tupleOutput);
+        FieldTableEncoding.writeFieldTable(binding.getArguments(),tupleOutput);
+
+    }
+}


Property changes on: store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BindingTB.java
___________________________________________________________________
Name: svn:eol-style
   + native

Copied: store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ContentTB.java (from rev 2039, store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/MessageContentTupleBinding.java)
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ContentTB.java	                        (rev 0)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ContentTB.java	2008-05-29 16:22:36 UTC (rev 2086)
@@ -0,0 +1,77 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import com.sleepycat.bind.tuple.TupleBinding;
+import com.sleepycat.bind.tuple.TupleInput;
+import com.sleepycat.bind.tuple.TupleOutput;
+import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.framing.abstraction.ContentChunk;
+
+import org.apache.mina.common.ByteBuffer;
+
+/**
+ * @author Robert Greig (robert.j.greig at jpmorgan.com)
+ */
+public class ContentTB extends TupleBinding
+{
+    public Object entryToObject(TupleInput tupleInput)
+    {
+        // Entry layout: an int size followed by that many raw bytes.
+        final int size = tupleInput.readInt();
+        byte[] underlying = new byte[size];
+        tupleInput.readFast(underlying);
+        final ByteBuffer data  = ByteBuffer.wrap(underlying);
+        ContentChunk cb = new ContentChunk()
+        {
+            // Anonymous chunk backed by the array that was just read.
+            public int getSize()
+            {
+                return size;
+            }
+
+            public ByteBuffer getData()
+            {
+                return data;
+            }
+
+            public void reduceToFit()
+            {
+                // no-op in this implementation
+            }
+        };
+        return cb;
+    }
+
+    public void objectToEntry(Object object, TupleOutput tupleOutput)
+    {
+        ContentChunk cb = (ContentChunk) object;
+        final int size = cb.getSize();
+        byte[] underlying = new byte[size];
+
+        ByteBuffer buf = cb.getData();
+        // Copies from a rewound duplicate -- presumably so cb's own buffer position is left alone; confirm against MINA ByteBuffer.
+        buf.duplicate().rewind().get(underlying);
+        // Same layout that entryToObject reads: size first, then the bytes.
+        tupleOutput.writeInt(size);
+        tupleOutput.writeFast(underlying);
+    }
+}


Property changes on: store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ContentTB.java
___________________________________________________________________
Name: svn:eol-style
   + native

Deleted: store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/DeliveryDetailsKey.java
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/DeliveryDetailsKey.java	2008-05-29 16:05:10 UTC (rev 2085)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/DeliveryDetailsKey.java	2008-05-29 16:22:36 UTC (rev 2086)
@@ -1,64 +0,0 @@
-package org.apache.qpid.server.store.berkeleydb;
-
-import com.sleepycat.bind.tuple.TupleInput;
-import com.sleepycat.bind.tuple.TupleOutput;
-
-import org.apache.qpid.framing.AMQShortString;
-
-/**
- * @author Apache Software Foundation
- */
-public class DeliveryDetailsKey
-{
-    public AMQShortString queueName;
-    public long messageId;
-
-
-    public DeliveryDetailsKey()
-    {
-    }
-
-
-    public DeliveryDetailsKey(byte[] payload)
-    {
-        final TupleInput ti = new TupleInput(payload);
-
-        queueName = AMQShortStringEncoding.readShortString(ti);
-
-        messageId = ti.readLong();
-
-    }
-
-    public static class TupleBinding extends com.sleepycat.bind.tuple.TupleBinding
-    {
-        public Object entryToObject(TupleInput tupleInput)
-        {
-            final DeliveryDetailsKey mk = new DeliveryDetailsKey();
-
-
-            mk.queueName = AMQShortStringEncoding.readShortString(tupleInput);
-            mk.messageId = tupleInput.readLong();
-
-            return mk;
-        }
-
-        public void objectToEntry(Object object, TupleOutput tupleOutput)
-        {
-            final DeliveryDetailsKey mk = (DeliveryDetailsKey) object;
-
-            AMQShortStringEncoding.writeShortString(mk.queueName,tupleOutput);
-            tupleOutput.writeLong(mk.messageId);
-
-        }
-
-
-    }
-
-    public DeliveryDetailsKey(AMQShortString queueName, long messageId)
-    {
-        this.queueName = queueName;
-        this.messageId = messageId;
-    }
-
-
-}

Copied: store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ExchangeTB.java (from rev 2039, store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AMQExchangeTupleBinding.java)
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ExchangeTB.java	                        (rev 0)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ExchangeTB.java	2008-05-29 16:22:36 UTC (rev 2086)
@@ -0,0 +1,57 @@
+package org.apache.qpid.server.store.berkeleydb;
+
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.AMQException;
+
+import com.sleepycat.bind.tuple.TupleBinding;
+import com.sleepycat.bind.tuple.TupleInput;
+import com.sleepycat.bind.tuple.TupleOutput;
+import org.apache.log4j.Logger;
+
+public class ExchangeTB extends TupleBinding
+{
+    private static final Logger _log = Logger.getLogger(ExchangeTB.class);
+    // ExchangeTB converts Exchange objects to and from tuple entries (name, type name, auto-delete flag).
+
+    // Supplies the exchange factory used by entryToObject.
+    private final VirtualHost _virtualHost;
+
+    public ExchangeTB(VirtualHost virtualHost)
+    {
+        _virtualHost = virtualHost;
+    }
+
+    public Object entryToObject(TupleInput tupleInput)
+    {
+        // Read order mirrors objectToEntry: name, type name, auto-delete flag.
+        AMQShortString name = AMQShortStringEncoding.readShortString(tupleInput);
+        AMQShortString typeName = AMQShortStringEncoding.readShortString(tupleInput);
+
+
+        boolean autoDelete = tupleInput.readBoolean();
+        // NOTE(review): the literal true and 0 below are presumably "durable" and a ticket number -- confirm against the factory signature.
+        try
+        {
+            return _virtualHost.getExchangeFactory().createExchange(name, typeName, true, autoDelete, 0);
+        }
+        catch (AMQException e)
+        {
+            _log.error("Unable to create exchange: " + e, e);
+            return null; // the failure is logged above; the caller receives null
+        }
+    }
+
+    public void objectToEntry(Object object, TupleOutput tupleOutput)
+    {
+        Exchange exchange = (Exchange) object;
+        // Only the three values written below are persisted for an exchange.
+
+        AMQShortStringEncoding.writeShortString(exchange.getName(),tupleOutput);
+        AMQShortStringEncoding.writeShortString(exchange.getType(),tupleOutput);
+
+        tupleOutput.writeBoolean(exchange.isAutoDelete());
+
+    }
+}


Property changes on: store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ExchangeTB.java
___________________________________________________________________
Name: svn:eol-style
   + native

Deleted: store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/MessageContentTupleBinding.java
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/MessageContentTupleBinding.java	2008-05-29 16:05:10 UTC (rev 2085)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/MessageContentTupleBinding.java	2008-05-29 16:22:36 UTC (rev 2086)
@@ -1,77 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store.berkeleydb;
-
-import com.sleepycat.bind.tuple.TupleBinding;
-import com.sleepycat.bind.tuple.TupleInput;
-import com.sleepycat.bind.tuple.TupleOutput;
-import org.apache.qpid.framing.ContentBody;
-import org.apache.qpid.framing.abstraction.ContentChunk;
-
-import org.apache.mina.common.ByteBuffer;
-
-/**
- * @author Robert Greig (robert.j.greig at jpmorgan.com)
- */
-public class MessageContentTupleBinding extends TupleBinding
-{
-    public Object entryToObject(TupleInput tupleInput)
-    {
-
-        final int size = tupleInput.readInt();
-        byte[] underlying = new byte[size];
-        tupleInput.readFast(underlying);
-        final ByteBuffer data  = ByteBuffer.wrap(underlying);
-        ContentChunk cb = new ContentChunk()
-        {
-
-            public int getSize()
-            {
-                return size;
-            }
-
-            public ByteBuffer getData()
-            {
-                return data;
-            }
-
-            public void reduceToFit()
-            {
-
-            }
-        };
-        return cb;
-    }
-
-    public void objectToEntry(Object object, TupleOutput tupleOutput)
-    {
-        ContentChunk cb = (ContentChunk) object;
-        final int size = cb.getSize();
-        byte[] underlying = new byte[size];
-
-        ByteBuffer buf = cb.getData();
-
-        buf.duplicate().rewind().get(underlying);
-
-        tupleOutput.writeInt(size);
-        tupleOutput.writeFast(underlying);
-    }
-}

Copied: store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/MessageMetaDataTB.java (from rev 2039, store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/MessageMetaDataTupleBinding.java)
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/MessageMetaDataTB.java	                        (rev 0)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/MessageMetaDataTB.java	2008-05-29 16:22:36 UTC (rev 2086)
@@ -0,0 +1,142 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import com.sleepycat.bind.tuple.TupleBinding;
+import com.sleepycat.bind.tuple.TupleInput;
+import com.sleepycat.bind.tuple.TupleOutput;
+import org.apache.log4j.Logger;
+import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.*;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.server.queue.MessageMetaData;
+
+/**
+ * Handles the mapping to and from message meta data
+ */
+public class MessageMetaDataTB extends TupleBinding
+{
+    private static final Logger _log = Logger.getLogger(MessageMetaDataTB.class);
+    // Entry layout, in order: exchange, routing key, mandatory, immediate, header size, header bytes, chunk count.
+    public Object entryToObject(TupleInput tupleInput)
+    {
+        try
+        {
+            final MessagePublishInfo publishBody = readMessagePublishInfo(tupleInput);
+            final ContentHeaderBody contentHeaderBody = readContentHeaderBody(tupleInput);
+            final int contentChunkCount = tupleInput.readInt();
+            return new MessageMetaData(publishBody, contentHeaderBody, contentChunkCount);
+        }
+        catch (Exception e)
+        {
+            _log.error("Error converting entry to object: " + e, e);
+            // annoyingly just have to return null since we cannot throw
+            return null;
+        }
+    }
+
+    public void objectToEntry(Object object, TupleOutput tupleOutput)
+    {
+        MessageMetaData message = (MessageMetaData) object;
+        try
+        {
+            writeMessagePublishInfo(message.getMessagePublishInfo(), tupleOutput);
+        }
+        catch (AMQException e)
+        {
+            // can't do anything else since the BDB interface precludes throwing any exceptions
+            // in practice we should never get an exception
+            throw new RuntimeException("Error converting object to entry: " + e, e);
+        }
+        writeContentHeader(message.getContentHeaderBody(), tupleOutput); // outside the try: this helper declares no checked exception
+        tupleOutput.writeInt(message.getContentChunkCount());
+    }
+
+    private MessagePublishInfo readMessagePublishInfo(TupleInput tupleInput)
+    {
+        // Reads the four publish-info values in the order writeMessagePublishInfo writes them.
+        final AMQShortString exchange = AMQShortStringEncoding.readShortString(tupleInput);
+        final AMQShortString routingKey = AMQShortStringEncoding.readShortString(tupleInput);
+        final boolean mandatory = tupleInput.readBoolean();
+        final boolean immediate = tupleInput.readBoolean();
+
+        return new MessagePublishInfo()
+        {
+            // Read-only view over the values decoded above.
+            public AMQShortString getExchange()
+            {
+                return exchange;
+            }
+
+            public void setExchange(AMQShortString exchange)
+            {
+                // no-op: getExchange() keeps returning the decoded value
+            }
+
+            public boolean isImmediate()
+            {
+                return immediate;
+            }
+
+            public boolean isMandatory()
+            {
+                return mandatory;
+            }
+
+            public AMQShortString getRoutingKey()
+            {
+                return routingKey;
+            }
+        }   ;
+
+    }
+
+    // Reads an int body size, then that many bytes, and hands them to ContentHeaderBody.createFromBuffer.
+    private ContentHeaderBody readContentHeaderBody(TupleInput tupleInput) throws AMQFrameDecodingException, AMQProtocolVersionException
+    {
+        int bodySize = tupleInput.readInt();
+        byte[] underlying = new byte[bodySize];
+        tupleInput.readFast(underlying);
+        ByteBuffer buf = ByteBuffer.wrap(underlying);
+
+        return ContentHeaderBody.createFromBuffer(buf, bodySize);
+    }
+
+    private void writeMessagePublishInfo(MessagePublishInfo publishBody, TupleOutput tupleOutput) throws AMQException
+    {
+        // Write order: exchange, routing key, mandatory flag, immediate flag.
+
+        AMQShortStringEncoding.writeShortString(publishBody.getExchange(), tupleOutput);
+        AMQShortStringEncoding.writeShortString(publishBody.getRoutingKey(), tupleOutput);
+        tupleOutput.writeBoolean(publishBody.isMandatory());
+        tupleOutput.writeBoolean(publishBody.isImmediate());
+        
+    }
+
+    private void writeContentHeader(ContentHeaderBody headerBody, TupleOutput tupleOutput)
+    {
+        // write out the content header body
+        final int bodySize = headerBody.getSize();
+        byte[] underlying = new byte[bodySize];
+        ByteBuffer buf = ByteBuffer.wrap(underlying);
+        headerBody.writePayload(buf); // fills "underlying" through the wrapping buffer
+        tupleOutput.writeInt(bodySize);
+        tupleOutput.writeFast(underlying);
+    }
+}


Property changes on: store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/MessageMetaDataTB.java
___________________________________________________________________
Name: svn:eol-style
   + native

Deleted: store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/MessageMetaDataTupleBinding.java
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/MessageMetaDataTupleBinding.java	2008-05-29 16:05:10 UTC (rev 2085)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/MessageMetaDataTupleBinding.java	2008-05-29 16:22:36 UTC (rev 2086)
@@ -1,142 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.qpid.server.store.berkeleydb;
-
-import com.sleepycat.bind.tuple.TupleBinding;
-import com.sleepycat.bind.tuple.TupleInput;
-import com.sleepycat.bind.tuple.TupleOutput;
-import org.apache.log4j.Logger;
-import org.apache.mina.common.ByteBuffer;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.*;
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.server.queue.MessageMetaData;
-
-/**
- * Handles the mapping to and from message meta data
- */
-public class MessageMetaDataTupleBinding extends TupleBinding
-{
-    private static final Logger _log = Logger.getLogger(MessageMetaDataTupleBinding.class);
-
-    public Object entryToObject(TupleInput tupleInput)
-    {
-        try
-        {
-            final MessagePublishInfo publishBody = readMessagePublishInfo(tupleInput);
-            final ContentHeaderBody contentHeaderBody = readContentHeaderBody(tupleInput);
-            final int contentChunkCount = tupleInput.readInt();
-            return new MessageMetaData(publishBody, contentHeaderBody, contentChunkCount);
-        }
-        catch (Exception e)
-        {
-            _log.error("Error converting entry to object: " + e, e);
-            // annoyingly just have to return null since we cannot throw
-            return null;
-        }
-    }
-
-    public void objectToEntry(Object object, TupleOutput tupleOutput)
-    {
-        MessageMetaData message = (MessageMetaData) object;
-        try
-        {
-            writeMessagePublishInfo(message.getMessagePublishInfo(), tupleOutput);
-        }
-        catch (AMQException e)
-        {
-            // can't do anything else since the BDB interface precludes throwing any exceptions
-            // in practice we should never get an exception
-            throw new RuntimeException("Error converting object to entry: " + e, e);
-        }
-        writeContentHeader(message.getContentHeaderBody(), tupleOutput);
-        tupleOutput.writeInt(message.getContentChunkCount());
-    }
-
-    private MessagePublishInfo readMessagePublishInfo(TupleInput tupleInput)
-    {
-
-        final AMQShortString exchange = AMQShortStringEncoding.readShortString(tupleInput);
-        final AMQShortString routingKey = AMQShortStringEncoding.readShortString(tupleInput);
-        final boolean mandatory = tupleInput.readBoolean();
-        final boolean immediate = tupleInput.readBoolean();
-
-        return new MessagePublishInfo()
-        {
-
-            public AMQShortString getExchange()
-            {
-                return exchange;
-            }
-
-            public void setExchange(AMQShortString exchange)
-            {
-
-            }
-
-            public boolean isImmediate()
-            {
-                return immediate;
-            }
-
-            public boolean isMandatory()
-            {
-                return mandatory;
-            }
-
-            public AMQShortString getRoutingKey()
-            {
-                return routingKey;
-            }
-        }   ;
-
-    }
-
-
-    private ContentHeaderBody readContentHeaderBody(TupleInput tupleInput) throws AMQFrameDecodingException, AMQProtocolVersionException
-    {
-        int bodySize = tupleInput.readInt();
-        byte[] underlying = new byte[bodySize];
-        tupleInput.readFast(underlying);
-        ByteBuffer buf = ByteBuffer.wrap(underlying);
-
-        return ContentHeaderBody.createFromBuffer(buf, bodySize);
-    }
-
-    private void writeMessagePublishInfo(MessagePublishInfo publishBody, TupleOutput tupleOutput) throws AMQException
-    {
-
-
-        AMQShortStringEncoding.writeShortString(publishBody.getExchange(), tupleOutput);
-        AMQShortStringEncoding.writeShortString(publishBody.getRoutingKey(), tupleOutput);
-        tupleOutput.writeBoolean(publishBody.isMandatory());
-        tupleOutput.writeBoolean(publishBody.isImmediate());
-        
-    }
-
-    private void writeContentHeader(ContentHeaderBody headerBody, TupleOutput tupleOutput)
-    {
-        // write out the content header body
-        final int bodySize = headerBody.getSize();
-        byte[] underlying = new byte[bodySize];
-        ByteBuffer buf = ByteBuffer.wrap(underlying);
-        headerBody.writePayload(buf);
-        tupleOutput.writeInt(bodySize);
-        tupleOutput.writeFast(underlying);
-    }
-}

Deleted: store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/QueueBindingKey.java
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/QueueBindingKey.java	2008-05-29 16:05:10 UTC (rev 2085)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/QueueBindingKey.java	2008-05-29 16:22:36 UTC (rev 2086)
@@ -1,49 +0,0 @@
-package org.apache.qpid.server.store.berkeleydb;
-
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
-
-/**
- * Created by IntelliJ IDEA.
- * User: U146758
- * Date: 19-Feb-2007
- * Time: 14:11:01
- * To change this template use File | Settings | File Templates.
- */
-public class QueueBindingKey extends Object
-{
-    private final AMQShortString _exchangeName;
-    private final AMQShortString _queueName;
-    private final AMQShortString _routingKey;
-    private final FieldTable _arguments;
-
-    public QueueBindingKey(AMQShortString exchangeName, AMQShortString queueName, AMQShortString routingKey, FieldTable arguments)
-    {
-        _exchangeName = exchangeName;
-        _queueName = queueName;
-        _routingKey = routingKey;
-        _arguments = arguments;
-    }
-
-
-    public AMQShortString getExchangeName()
-    {
-        return _exchangeName;
-    }
-
-    public AMQShortString getQueueName()
-    {
-        return _queueName;
-    }
-
-    public AMQShortString getRoutingKey()
-    {
-        return _routingKey;
-    }
-
-    public FieldTable getArguments()
-    {
-        return _arguments;
-    }
-
-}

Copied: store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/QueueEntryKey.java (from rev 2039, store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/DeliveryDetailsKey.java)
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/QueueEntryKey.java	                        (rev 0)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/QueueEntryKey.java	2008-05-29 16:22:36 UTC (rev 2086)
@@ -0,0 +1,64 @@
+package org.apache.qpid.server.store.berkeleydb;
+
+import com.sleepycat.bind.tuple.TupleInput;
+import com.sleepycat.bind.tuple.TupleOutput;
+
+import org.apache.qpid.framing.AMQShortString;
+
+/**
+ * @author Apache Software Foundation
+ */
+public class QueueEntryKey
+{
+    public AMQShortString queueName; // first value in the encoded key
+    public long messageId; // second value in the encoded key, stored as a long
+
+    // Creates an empty key; the nested TupleBinding fills the public fields directly.
+    public QueueEntryKey()
+    {
+    }
+
+    // Decodes a key from raw bytes using the same layout as the nested TupleBinding.
+    public QueueEntryKey(byte[] payload)
+    {
+        final TupleInput ti = new TupleInput(payload);
+
+        queueName = AMQShortStringEncoding.readShortString(ti);
+
+        messageId = ti.readLong();
+
+    }
+    // Binding that reads and writes the key as: queue name (short string), then message id (long).
+    public static class TupleBinding extends com.sleepycat.bind.tuple.TupleBinding
+    {
+        public Object entryToObject(TupleInput tupleInput)
+        {
+            final QueueEntryKey mk = new QueueEntryKey();
+            // Read order matches the write order in objectToEntry below.
+
+            mk.queueName = AMQShortStringEncoding.readShortString(tupleInput);
+            mk.messageId = tupleInput.readLong();
+
+            return mk;
+        }
+
+        public void objectToEntry(Object object, TupleOutput tupleOutput)
+        {
+            final QueueEntryKey mk = (QueueEntryKey) object;
+
+            AMQShortStringEncoding.writeShortString(mk.queueName,tupleOutput);
+            tupleOutput.writeLong(mk.messageId);
+
+        }
+
+
+    }
+
+    public QueueEntryKey(AMQShortString queueName, long messageId)
+    {
+        this.queueName = queueName;
+        this.messageId = messageId;
+    }
+
+
+}


Property changes on: store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/QueueEntryKey.java
___________________________________________________________________
Name: svn:eol-style
   + native

Copied: store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/QueueTB.java (from rev 2039, store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AMQQueueTupleBinding.java)
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/QueueTB.java	                        (rev 0)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/QueueTB.java	2008-05-29 16:22:36 UTC (rev 2086)
@@ -0,0 +1,69 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import com.sleepycat.bind.tuple.TupleBinding;
+import com.sleepycat.bind.tuple.TupleInput;
+import com.sleepycat.bind.tuple.TupleOutput;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.AMQQueueFactory;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.log4j.Logger;
+
+public class QueueTB extends TupleBinding // tuple binding for AMQQueue: only the queue's name and owner are stored
+{
+    private static final Logger _log = Logger.getLogger(QueueTB.class);
+
+    // Passed to AMQQueueFactory whenever a queue is rebuilt from a stored entry.
+
+    private final VirtualHost _virtualHost;
+
+    public QueueTB(VirtualHost virtualHost)
+    {
+        _virtualHost = virtualHost;
+    }
+    // Reads name then owner and asks AMQQueueFactory to create the queue in _virtualHost.
+    public Object entryToObject(TupleInput tupleInput)
+    {
+        AMQShortString name = AMQShortStringEncoding.readShortString(tupleInput);
+        AMQShortString owner = AMQShortStringEncoding.readShortString(tupleInput);
+        // Only name and owner come from the entry; the boolean and null arguments below are fixed.
+        // NOTE(review): presumably true = durable and false = auto-delete; confirm against AMQQueueFactory.
+        try
+        {
+            return AMQQueueFactory.createAMQQueueImpl(name, true, owner, false, _virtualHost, null);
+        }
+        catch (AMQException e)
+        {
+            _log.error("Unable to create queue: " + e, e);
+            return null; // failure is logged, not rethrown; callers must be prepared for null
+        }
+    }
+    // Writes queue.getName() then queue.getOwner(); object must be an AMQQueue.
+    public void objectToEntry(Object object, TupleOutput tupleOutput)
+    {
+        AMQQueue queue = (AMQQueue) object;
+
+        // Write order must match the read order in entryToObject (name, then owner).
+        AMQShortStringEncoding.writeShortString(queue.getName(),tupleOutput);
+        AMQShortStringEncoding.writeShortString(queue.getOwner(),tupleOutput);
+
+    }
+}


Property changes on: store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/QueueTB.java
___________________________________________________________________
Name: svn:eol-style
   + native




More information about the rhmessaging-commits mailing list