[rhmessaging-commits] rhmessaging commits: r4456 - in store/branches/java/0.5.x-dev: src/main/java/org/apache/qpid/server/store/berkeleydb and 5 other directories.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Wed May 4 05:13:26 EDT 2011


Author: rgemmell
Date: 2011-05-04 05:13:25 -0400 (Wed, 04 May 2011)
New Revision: 4456

Added:
   store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/AMQShortStringTB_2.java
   store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/AMQShortStringTB_4.java
   store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/AMQShortStringTupleBindingFactory.java
   store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueEntryKeyTupleBinding.java
   store/branches/java/0.5.x-dev/src/test/java/org/apache/qpid/server/store/berkeleydb/ExchangeTBTest.java
   store/branches/java/0.5.x-dev/src/test/java/org/apache/qpid/server/store/berkeleydb/MessageMetaDataTBTest.java
   store/branches/java/0.5.x-dev/src/test/java/org/apache/qpid/server/store/berkeleydb/tuples/
   store/branches/java/0.5.x-dev/src/test/java/org/apache/qpid/server/store/berkeleydb/tuples/AMQQueueTupleBindingTest.java
   store/branches/java/0.5.x-dev/src/test/java/org/apache/qpid/server/store/berkeleydb/tuples/AMQShortStringTupleBindingTest.java
   store/branches/java/0.5.x-dev/src/test/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingKeyTupleBindingTest.java
   store/branches/java/0.5.x-dev/src/test/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueEntryKeyTupleBindingTest.java
Removed:
   store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringEncoding.java
   store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringTB.java
Modified:
   store/branches/java/0.5.x-dev/etc/config.xml
   store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
   store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/ExchangeTB.java
   store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/MessageMetaDataTB.java
   store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/QueueEntryKey.java
   store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTupleBindingFactory.java
   store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple_1.java
   store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple_2.java
   store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTupleBindingFactory.java
   store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_1.java
   store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_2.java
   store/branches/java/0.5.x-dev/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreTest.java
   store/branches/java/0.5.x-dev/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java
   store/branches/java/0.5.x-dev/src/test/java/org/apache/qpid/server/store/berkeleydb/utils/JNDIHelper.java
   store/branches/java/0.5.x-dev/src/tools/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgrade.java
Log:
A signed byte was previously used to store AMQShortString lengths, which resulted in lengths greater than 127 being converted into a negative value upon recovery and assumed to be null. Update the store implementation to use a short for the length to allow encoding the 255 legal values and null.

Applied patch from Oleksandr Rudyy

Modified: store/branches/java/0.5.x-dev/etc/config.xml
===================================================================
--- store/branches/java/0.5.x-dev/etc/config.xml	2011-05-03 18:49:45 UTC (rev 4455)
+++ store/branches/java/0.5.x-dev/etc/config.xml	2011-05-04 09:13:25 UTC (rev 4456)
@@ -45,7 +45,9 @@
     <management>
         <enabled>true</enabled>
         <jmxport>8999</jmxport>
-        <security-enabled>false</security-enabled>
+        <ssl>
+            <enabled>false</enabled>
+        </ssl>
     </management>
     <advanced>
         <filterchain enableExecutorPool="true"/>
@@ -124,7 +126,6 @@
         <auto_register>true</auto_register>
     </queue>
 
-    <virtualhosts>${conf}/virtualhosts.xml</virtualhosts>
 </broker>
 
 

Deleted: store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringEncoding.java
===================================================================
--- store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringEncoding.java	2011-05-03 18:49:45 UTC (rev 4455)
+++ store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringEncoding.java	2011-05-04 09:13:25 UTC (rev 4456)
@@ -1,46 +0,0 @@
-package org.apache.qpid.server.store.berkeleydb;
-
-import org.apache.qpid.framing.AMQShortString;
-
-import com.sleepycat.bind.tuple.TupleInput;
-import com.sleepycat.bind.tuple.TupleOutput;
-
-/**
- * Created by IntelliJ IDEA.
- * User: U146758
- * Date: 19-Feb-2007
- * Time: 13:56:56
- * To change this template use File | Settings | File Templates.
- */
-public class AMQShortStringEncoding
-{
-    public static AMQShortString readShortString(TupleInput tupleInput)
-    {
-        int length = (int) tupleInput.readByte();
-        if (length < 0)
-        {
-            return null;
-        }
-        else
-        {
-            byte[] stringBytes = new byte[length];
-            tupleInput.readFast(stringBytes);
-            return new AMQShortString(stringBytes);
-        }
-
-    }
-
-    public  static void writeShortString(AMQShortString shortString, TupleOutput tupleOutput)
-    {
-
-        if (shortString == null)
-        {
-            tupleOutput.writeByte(-1);
-        }
-        else
-        {
-            tupleOutput.writeByte(shortString.length());
-            tupleOutput.writeFast(shortString.getBytes(), 0, shortString.length());
-        }
-    }
-}

Deleted: store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringTB.java
===================================================================
--- store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringTB.java	2011-05-03 18:49:45 UTC (rev 4455)
+++ store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringTB.java	2011-05-04 09:13:25 UTC (rev 4456)
@@ -1,28 +0,0 @@
-package org.apache.qpid.server.store.berkeleydb;
-
-import com.sleepycat.bind.tuple.TupleBinding;
-import com.sleepycat.bind.tuple.TupleInput;
-import com.sleepycat.bind.tuple.TupleOutput;
-import org.apache.log4j.Logger;
-import org.apache.qpid.framing.AMQShortString;
-
-public class AMQShortStringTB extends TupleBinding
-{
-    private static final Logger _log = Logger.getLogger(AMQShortStringTB.class);
-
-
-    public AMQShortStringTB()
-    {
-    }
-
-    public Object entryToObject(TupleInput tupleInput)
-    {
-        return AMQShortStringEncoding.readShortString(tupleInput);
-    }
-
-    public void objectToEntry(Object object, TupleOutput tupleOutput)
-    {
-        AMQShortStringEncoding.writeShortString((AMQShortString)object, tupleOutput);
-    }
-
-}

Modified: store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
===================================================================
--- store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java	2011-05-03 18:49:45 UTC (rev 4455)
+++ store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java	2011-05-04 09:13:25 UTC (rev 4456)
@@ -53,7 +53,9 @@
 import org.apache.qpid.server.store.AbstractMessageStore;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.store.berkeleydb.tuples.AMQShortStringTupleBindingFactory;
 import org.apache.qpid.server.store.berkeleydb.tuples.BindingTupleBindingFactory;
+import org.apache.qpid.server.store.berkeleydb.tuples.QueueEntryKeyTupleBinding;
 import org.apache.qpid.server.store.berkeleydb.tuples.QueueTuple;
 import org.apache.qpid.server.store.berkeleydb.tuples.QueueTupleBindingFactory;
 import org.apache.qpid.server.txn.NonTransactionalContext;
@@ -88,7 +90,7 @@
 {
     private static final Logger _log = Logger.getLogger(BDBMessageStore.class);
 
-    private static final int DATABASE_FORMAT_VERSION = 2;
+    public static final int DATABASE_FORMAT_VERSION = 4;
     private static final String DATABASE_FORMAT_VERSION_PROPERTY = "version";
 
     public static final String ENVIRONMENT_PATH_PROPERTY = "environment-path";
@@ -138,6 +140,13 @@
     private QueueTupleBindingFactory _queueTupleBindingFactory;
     private BindingTupleBindingFactory _bindingTupleBindingFactory;
 
+    // factory for AMQShortStringTupleBinding
+    private AMQShortStringTupleBindingFactory _shortStringTupleBindingFactory;
+
+    // AMQShortString tuple binding object is stateless and can be reused across
+    // different methods
+    private TupleBinding<AMQShortString> _shortStringTupleBinding;
+
     Map<AMQShortString, Integer> _queueRecoveries = new TreeMap<AMQShortString, Integer>();
 
     /** The data version this store should run with */
@@ -233,6 +242,9 @@
 
         setDatabaseNames(_version);
 
+        // create AMQShortString tuple binding for database version
+        _shortStringTupleBinding = AMQShortStringTupleBindingFactory.createTupleBinding(_version);
+
         if (virtualHost != null)
         {
             setVirtualHost(virtualHost);
@@ -328,8 +340,9 @@
 
     private void createTupleBindingFactories(int version)
     {
-        _queueTupleBindingFactory = new QueueTupleBindingFactory(version, _virtualHost);
-        _bindingTupleBindingFactory = new BindingTupleBindingFactory(version, _virtualHost);
+        _queueTupleBindingFactory = new QueueTupleBindingFactory(version, _virtualHost, _shortStringTupleBinding);
+        _bindingTupleBindingFactory = new BindingTupleBindingFactory(version, _virtualHost, _shortStringTupleBinding);
+        _shortStringTupleBindingFactory = new AMQShortStringTupleBindingFactory(version, _virtualHost);
     }
 
     private synchronized void stateTransition(State requiredState, State newState) throws AMQException
@@ -616,10 +629,10 @@
         if (_state != State.RECOVERING)
         {
             DatabaseEntry key = new DatabaseEntry();
-            EntryBinding keyBinding = new AMQShortStringTB();
-            keyBinding.objectToEntry(exchange.getName(), key);
+
+            _shortStringTupleBinding.objectToEntry(exchange.getName(), key);
             DatabaseEntry value = new DatabaseEntry();
-            TupleBinding exchangeBinding = new ExchangeTB(_virtualHost);
+            TupleBinding<Exchange> exchangeBinding = new ExchangeTB(_virtualHost, _shortStringTupleBinding);
             exchangeBinding.objectToEntry(exchange, value);
             try
             {
@@ -643,8 +656,8 @@
     public void removeExchange(Exchange exchange) throws AMQException
     {
         DatabaseEntry key = new DatabaseEntry();
-        EntryBinding keyBinding = new AMQShortStringTB();
-        keyBinding.objectToEntry(exchange.getName(), key);
+
+        _shortStringTupleBinding.objectToEntry(exchange.getName(), key);
         try
         {
             OperationStatus status = _exchangeDb.delete(null, key);
@@ -747,7 +760,7 @@
             cursor = _exchangeDb.openCursor(null, null);
             DatabaseEntry key = new DatabaseEntry();
             DatabaseEntry value = new DatabaseEntry();
-            TupleBinding binding = new ExchangeTB(_virtualHost);
+            TupleBinding<Exchange> binding = new ExchangeTB(_virtualHost, _shortStringTupleBinding);
             while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
             {
                 Exchange exchange = (Exchange) binding.entryToObject(value);
@@ -863,9 +876,9 @@
 
             DatabaseEntry key = new DatabaseEntry();
 
-            EntryBinding keyBinding = new AMQShortStringTB();
-            keyBinding.objectToEntry(queue.getName(), key);
 
+            _shortStringTupleBinding.objectToEntry(queue.getName(), key);
+
             DatabaseEntry value = new DatabaseEntry();
             TupleBinding queueBinding = _queueTupleBindingFactory.getInstance();
 
@@ -899,8 +912,8 @@
         Long queueId = _queueNameToIdMap.remove(name);
 
         DatabaseEntry key = new DatabaseEntry();
-        EntryBinding keyBinding = new AMQShortStringTB();
-        keyBinding.objectToEntry(name, key);
+
+        _shortStringTupleBinding.objectToEntry(name, key);
         try
         {
             OperationStatus status = _queueDb.delete(null, key);
@@ -927,8 +940,8 @@
     AMQQueue getQueue(AMQShortString name) throws AMQException
     {
         DatabaseEntry key = new DatabaseEntry();
-        EntryBinding keyBinding = new AMQShortStringTB();
-        keyBinding.objectToEntry(name, key);
+
+        _shortStringTupleBinding.objectToEntry(name, key);
         DatabaseEntry value = new DatabaseEntry();
         try
         {
@@ -960,7 +973,7 @@
         AMQShortString name = queue.getName();
         Transaction tx = (Transaction) context.getPayload();
         DatabaseEntry key = new DatabaseEntry();
-        EntryBinding keyBinding = new QueueEntryKey.TupleBinding();
+        EntryBinding<QueueEntryKey> keyBinding = new QueueEntryKeyTupleBinding(_shortStringTupleBinding);
         QueueEntryKey dd = new QueueEntryKey(name, messageId);
         keyBinding.objectToEntry(dd, key);
         DatabaseEntry value = new DatabaseEntry();
@@ -1020,7 +1033,7 @@
         Transaction tx = (Transaction) context.getPayload();
 
         DatabaseEntry key = new DatabaseEntry();
-        EntryBinding keyBinding = new QueueEntryKey.TupleBinding();
+        EntryBinding<QueueEntryKey> keyBinding = new QueueEntryKeyTupleBinding(_shortStringTupleBinding);
         QueueEntryKey dd = new QueueEntryKey(name, messageId);
 
         keyBinding.objectToEntry(dd, key);
@@ -1208,7 +1221,7 @@
 
             QueueEntryKey dd = new QueueEntryKey(queueName, 0);
 
-            EntryBinding keyBinding = new QueueEntryKey.TupleBinding();
+            EntryBinding<QueueEntryKey> keyBinding = new QueueEntryKeyTupleBinding(_shortStringTupleBinding);
             keyBinding.objectToEntry(dd, key);
 
             DatabaseEntry value = new DatabaseEntry();
@@ -1368,7 +1381,7 @@
         EntryBinding keyBinding = TupleBinding.getPrimitiveBinding(Long.class);
         keyBinding.objectToEntry(messageId, key);
         DatabaseEntry value = new DatabaseEntry();
-        TupleBinding messageBinding = new MessageMetaDataTB();
+        TupleBinding<MessageMetaData> messageBinding = new MessageMetaDataTB(_shortStringTupleBinding);
         messageBinding.objectToEntry(messageMetaData, value);
         try
         {
@@ -1406,7 +1419,7 @@
         EntryBinding keyBinding = TupleBinding.getPrimitiveBinding(Long.class);
         keyBinding.objectToEntry(messageId, key);
         DatabaseEntry value = new DatabaseEntry();
-        TupleBinding messageBinding = new MessageMetaDataTB();
+        TupleBinding<MessageMetaData> messageBinding = new MessageMetaDataTB(_shortStringTupleBinding);
 
         try
         {
@@ -1521,6 +1534,11 @@
         return _bindingTupleBindingFactory;
     }
 
+    public AMQShortStringTupleBindingFactory getShortStringTupleBindingFactory()
+    {
+        return _shortStringTupleBindingFactory;
+    }
+
     //Package getters for the various databases used by the Store
 
     Database getMetaDataDb()
@@ -1647,7 +1665,7 @@
             Transaction tx = (Transaction) context.getPayload();
             cursor = _deliveryDb.openCursor(tx, null);
             DatabaseEntry key = new DatabaseEntry();
-            EntryBinding keyBinding = new QueueEntryKey.TupleBinding();
+            EntryBinding<QueueEntryKey> keyBinding = new QueueEntryKeyTupleBinding(_shortStringTupleBinding);
 
             DatabaseEntry value = new DatabaseEntry();
             EntryBinding valueBinding = TupleBinding.getPrimitiveBinding(Long.class);

Modified: store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/ExchangeTB.java
===================================================================
--- store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/ExchangeTB.java	2011-05-03 18:49:45 UTC (rev 4455)
+++ store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/ExchangeTB.java	2011-05-04 09:13:25 UTC (rev 4456)
@@ -9,22 +9,25 @@
 import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
-public class ExchangeTB extends TupleBinding
+public class ExchangeTB extends TupleBinding<Exchange>
 {
     private static final Logger _log = Logger.getLogger(ExchangeTB.class);
 
     private final VirtualHost _virtualHost;
 
-    public ExchangeTB(VirtualHost virtualHost)
+    private final TupleBinding<AMQShortString> _shortStringTupleBinding;
+
+    public ExchangeTB(VirtualHost virtualHost, TupleBinding<AMQShortString> shortStringEncoder)
     {
         _virtualHost = virtualHost;
+        _shortStringTupleBinding = shortStringEncoder;
     }
 
-    public Object entryToObject(TupleInput tupleInput)
+    public Exchange entryToObject(TupleInput tupleInput)
     {
 
-        AMQShortString name = AMQShortStringEncoding.readShortString(tupleInput);
-        AMQShortString typeName = AMQShortStringEncoding.readShortString(tupleInput);
+        AMQShortString name = _shortStringTupleBinding.entryToObject(tupleInput);
+        AMQShortString typeName = _shortStringTupleBinding.entryToObject(tupleInput);
 
         boolean autoDelete = tupleInput.readBoolean();
 
@@ -56,13 +59,11 @@
         }
     }
 
-    public void objectToEntry(Object object, TupleOutput tupleOutput)
+    public void objectToEntry(Exchange exchange, TupleOutput tupleOutput)
     {
-        Exchange exchange = (Exchange) object;
+        _shortStringTupleBinding.objectToEntry(exchange.getName(), tupleOutput);
+        _shortStringTupleBinding.objectToEntry(exchange.getType(), tupleOutput);
 
-        AMQShortStringEncoding.writeShortString(exchange.getName(), tupleOutput);
-        AMQShortStringEncoding.writeShortString(exchange.getType(), tupleOutput);
-
         tupleOutput.writeBoolean(exchange.isAutoDelete());
 
     }

Modified: store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/MessageMetaDataTB.java
===================================================================
--- store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/MessageMetaDataTB.java	2011-05-03 18:49:45 UTC (rev 4455)
+++ store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/MessageMetaDataTB.java	2011-05-04 09:13:25 UTC (rev 4456)
@@ -17,25 +17,36 @@
  */
 package org.apache.qpid.server.store.berkeleydb;
 
-import com.sleepycat.bind.tuple.TupleBinding;
-import com.sleepycat.bind.tuple.TupleInput;
-import com.sleepycat.bind.tuple.TupleOutput;
 import org.apache.log4j.Logger;
 import org.apache.mina.common.ByteBuffer;
 import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.*;
+import org.apache.qpid.framing.AMQFrameDecodingException;
+import org.apache.qpid.framing.AMQProtocolVersionException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ContentHeaderBody;
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
 import org.apache.qpid.server.queue.MessageMetaData;
 
+import com.sleepycat.bind.tuple.TupleBinding;
+import com.sleepycat.bind.tuple.TupleInput;
+import com.sleepycat.bind.tuple.TupleOutput;
+
 /**
  * Handles the mapping to and from message meta data
  */
-public class MessageMetaDataTB extends TupleBinding
+public class MessageMetaDataTB extends TupleBinding<MessageMetaData>
 {
     private static final Logger _log = Logger.getLogger(MessageMetaDataTB.class);
 
-    public Object entryToObject(TupleInput tupleInput)
+    private final TupleBinding<AMQShortString> _shortStringBinding;
+
+    public MessageMetaDataTB(TupleBinding<AMQShortString> shortStringBinding)
     {
+        _shortStringBinding = shortStringBinding;
+    }
+
+    public MessageMetaData entryToObject(TupleInput tupleInput)
+    {
         try
         {
             final MessagePublishInfo publishBody = readMessagePublishInfo(tupleInput);
@@ -51,9 +62,8 @@
         }
     }
 
-    public void objectToEntry(Object object, TupleOutput tupleOutput)
+    public void objectToEntry(MessageMetaData message, TupleOutput tupleOutput)
     {
-        MessageMetaData message = (MessageMetaData) object;
         try
         {
             writeMessagePublishInfo(message.getMessagePublishInfo(), tupleOutput);
@@ -71,8 +81,8 @@
     private MessagePublishInfo readMessagePublishInfo(TupleInput tupleInput)
     {
 
-        final AMQShortString exchange = AMQShortStringEncoding.readShortString(tupleInput);
-        final AMQShortString routingKey = AMQShortStringEncoding.readShortString(tupleInput);
+        final AMQShortString exchange = _shortStringBinding.entryToObject(tupleInput);
+        final AMQShortString routingKey = _shortStringBinding.entryToObject(tupleInput);
         final boolean mandatory = tupleInput.readBoolean();
         final boolean immediate = tupleInput.readBoolean();
 
@@ -122,8 +132,8 @@
     {
 
 
-        AMQShortStringEncoding.writeShortString(publishBody.getExchange(), tupleOutput);
-        AMQShortStringEncoding.writeShortString(publishBody.getRoutingKey(), tupleOutput);
+        _shortStringBinding.objectToEntry(publishBody.getExchange(), tupleOutput);
+        _shortStringBinding.objectToEntry(publishBody.getRoutingKey(), tupleOutput);
         tupleOutput.writeBoolean(publishBody.isMandatory());
         tupleOutput.writeBoolean(publishBody.isImmediate());
         

Modified: store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/QueueEntryKey.java
===================================================================
--- store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/QueueEntryKey.java	2011-05-03 18:49:45 UTC (rev 4455)
+++ store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/QueueEntryKey.java	2011-05-04 09:13:25 UTC (rev 4456)
@@ -1,7 +1,5 @@
 package org.apache.qpid.server.store.berkeleydb;
 
-import com.sleepycat.bind.tuple.TupleInput;
-import com.sleepycat.bind.tuple.TupleOutput;
 
 import org.apache.qpid.framing.AMQShortString;
 
@@ -18,42 +16,6 @@
     {
     }
 
-
-    public QueueEntryKey(byte[] payload)
-    {
-        final TupleInput ti = new TupleInput(payload);
-
-        queueName = AMQShortStringEncoding.readShortString(ti);
-
-        messageId = ti.readLong();
-
-    }
-
-    public static class TupleBinding extends com.sleepycat.bind.tuple.TupleBinding
-    {
-        public Object entryToObject(TupleInput tupleInput)
-        {
-            final QueueEntryKey mk = new QueueEntryKey();
-
-
-            mk.queueName = AMQShortStringEncoding.readShortString(tupleInput);
-            mk.messageId = tupleInput.readLong();
-
-            return mk;
-        }
-
-        public void objectToEntry(Object object, TupleOutput tupleOutput)
-        {
-            final QueueEntryKey mk = (QueueEntryKey) object;
-
-            AMQShortStringEncoding.writeShortString(mk.queueName,tupleOutput);
-            tupleOutput.writeLong(mk.messageId);
-
-        }
-
-
-    }
-
     public QueueEntryKey(AMQShortString queueName, long messageId)
     {
         this.queueName = queueName;

Added: store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/AMQShortStringTB_2.java
===================================================================
--- store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/AMQShortStringTB_2.java	                        (rev 0)
+++ store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/AMQShortStringTB_2.java	2011-05-04 09:13:25 UTC (rev 4456)
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.store.berkeleydb.tuples;
+
+import com.sleepycat.bind.tuple.TupleBinding;
+import com.sleepycat.bind.tuple.TupleInput;
+import com.sleepycat.bind.tuple.TupleOutput;
+import org.apache.qpid.framing.AMQShortString;
+
+/**
+ * This class is responsible for reading/writing of {@link AMQShortString}
+ * from/into BDB tuple.
+ * <p>
+ * However, it contains a bug and can only read/write correctly
+ * {@link AMQShortString} objects having length less then 128.
+ * <p>
+ * For AMQShortString objects with length greater than 127 characters a read
+ * operation returns null due to writing a string length as a byte which causes
+ * converting values greater then 127 into negative signed byte value.
+ */
+public class AMQShortStringTB_2 extends TupleBinding<AMQShortString>
+{
+
+    /**
+     * Reads short string object from the given tuple input.
+     * <p>
+     * The length of the string is read from a first byte and followed by the
+     * string characters if there is any.
+     * <p>
+     * A null is returned for a string with negative length.
+     *
+     * @param tupleInput
+     *            tuple input
+     * @return AMQShortString object or null if tuple contain a negative value
+     *         in a first byte.
+     */
+    @Override
+    public AMQShortString entryToObject(TupleInput tupleInput)
+    {
+        int length = (int) tupleInput.readByte();
+        if (length < 0)
+        {
+            return null;
+        }
+        else
+        {
+            byte[] stringBytes = new byte[length];
+            tupleInput.readFast(stringBytes);
+            return new AMQShortString(stringBytes);
+        }
+    }
+
+    /**
+     * Writes given short string object into given tuple output.
+     * <p>
+     * The string is stored as sequence bytes where first byte contains a string
+     * length and followed by string characters.
+     * <p>
+     * Only strings with length less then 128 can be written correctly with this
+     * method.
+     * <p>
+     * Length values greater then 127 are converted into negative signed byte
+     * values
+     * <p>
+     * Null is written as -1.
+     *
+     * @param object
+     *            string to write
+     * @param tupleOutput
+     *            output tuple to write string into.
+     */
+    @Override
+    public void objectToEntry(AMQShortString object, TupleOutput tupleOutput)
+    {
+        if (object == null)
+        {
+            tupleOutput.writeByte(-1);
+        }
+        else
+        {
+            tupleOutput.writeByte(object.length());
+            tupleOutput.writeFast(object.getBytes(), 0, object.length());
+        }
+    }
+
+}

Added: store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/AMQShortStringTB_4.java
===================================================================
--- store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/AMQShortStringTB_4.java	                        (rev 0)
+++ store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/AMQShortStringTB_4.java	2011-05-04 09:13:25 UTC (rev 4456)
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.store.berkeleydb.tuples;
+
+import org.apache.qpid.framing.AMQShortString;
+
+import com.sleepycat.bind.tuple.TupleBinding;
+import com.sleepycat.bind.tuple.TupleInput;
+import com.sleepycat.bind.tuple.TupleOutput;
+
+/**
+ * A correct implementation of AMQShortString Tuple binding to use for
+ * reading/writing of {@link AMQShortString} objects from/into tuple.
+ *
+ */
+public class AMQShortStringTB_4 extends TupleBinding<AMQShortString>
+{
+    /**
+     * Reads an <code>AMQShortString</code> from given <code>TupleInput</code>
+     * <p>
+     * The length of the string is read from 2 first bytes following string
+     * characters.
+     * <p>
+     * String having negative length value is considered a null string.
+     *
+     * @param tupleInput
+     *            tuple input
+     * @return AMQShortString
+     */
+    @Override
+    public AMQShortString entryToObject(TupleInput tupleInput)
+    {
+        short length = tupleInput.readShort();
+        if (length < 0)
+        {
+            return null;
+        }
+        else
+        {
+            byte[] stringBytes = new byte[length];
+            tupleInput.readFast(stringBytes);
+            return new AMQShortString(stringBytes);
+        }
+    }
+
+    /**
+     * Writes given short string object into given tuple output.
+     * <p>
+     * The string is written as sequence of bytes where first 2 bytes contain a
+     * string length and followed by string characters.
+     *
+     * @param shortString
+     *            short string
+     * @param tupleOutput
+     *            tuple output
+     */
+    @Override
+    public void objectToEntry(AMQShortString shortString, TupleOutput tupleOutput)
+    {
+        if (shortString == null)
+        {
+            tupleOutput.writeShort(-1);
+        }
+        else
+        {
+            tupleOutput.writeShort(shortString.length());
+            tupleOutput.writeFast(shortString.getBytes(), 0, shortString.length());
+        }
+    }
+
+}

Added: store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/AMQShortStringTupleBindingFactory.java
===================================================================
--- store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/AMQShortStringTupleBindingFactory.java	                        (rev 0)
+++ store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/AMQShortStringTupleBindingFactory.java	2011-05-04 09:13:25 UTC (rev 4456)
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.store.berkeleydb.tuples;
+
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+import com.sleepycat.bind.tuple.TupleBinding;
+
+/**
+ * A factory class to construct {@link TupleBinding<AMQShortString>} instance to
+ * use for writing/reading {@link AMQShortString} objects into/from BDB tuple.
+ */
+public class AMQShortStringTupleBindingFactory extends TupleBindingFactory
+{
+
+    /**
+     * Constructs factory for given BDB store version and virtual host
+     *
+     * @param version
+     *            BDB store version
+     * @param virtualhost
+     *            virtual host
+     */
+    public AMQShortStringTupleBindingFactory(int version, VirtualHost virtualhost)
+    {
+        super(version, virtualhost);
+    }
+
+    /**
+     * Creates {@link TupleBinding<AMQShortString>} instance for given BDB
+     * version.
+     *
+     * @param version
+     *            BDB version.
+     * @return TupleBinding<AMQShortString>
+     */
+    public static TupleBinding<AMQShortString> createTupleBinding(int version)
+    {
+        switch (version)
+        {
+            default:
+            case 4:
+                return new AMQShortStringTB_4();
+            case 1:
+            case 2:
+                // old BDB store short string binding
+                return new AMQShortStringTB_2();
+        }
+    }
+
+    @Override
+    public TupleBinding<AMQShortString> getInstance()
+    {
+        return AMQShortStringTupleBindingFactory.createTupleBinding(_version);
+    }
+}

Modified: store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTupleBindingFactory.java
===================================================================
--- store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTupleBindingFactory.java	2011-05-03 18:49:45 UTC (rev 4455)
+++ store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTupleBindingFactory.java	2011-05-04 09:13:25 UTC (rev 4456)
@@ -20,25 +20,47 @@
  */
 package org.apache.qpid.server.store.berkeleydb.tuples;
 
-import com.sleepycat.bind.tuple.TupleBinding;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.store.berkeleydb.BindingKey;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
+import com.sleepycat.bind.tuple.TupleBinding;
+
 public class BindingTupleBindingFactory extends TupleBindingFactory
 {
-    public BindingTupleBindingFactory(int version, VirtualHost virtualhost)
+    /**
+     * Holds tuple binding to serialize/de-serialize AMQShortString objects
+     */
+    protected TupleBinding<AMQShortString> _shortStringTupleBinding;
+
+    /**
+     * Creates a factory instance for given store version, virtual host and
+     * {@link AMQShortString} tuple binding
+     *
+     * @param version
+     *            store version
+     * @param virtualHost
+     *            virtual host
+     * @param shortStringTupleBinding
+     *            AMQShortString tuple binding
+     */
+    public BindingTupleBindingFactory(int version, VirtualHost virtualhost,
+            TupleBinding<AMQShortString> shortStringTupleBinding)
     {
         super(version, virtualhost);
+        _shortStringTupleBinding = shortStringTupleBinding;
     }
 
-    public TupleBinding getInstance()
+    public TupleBinding<BindingKey> getInstance()
     {
         switch (_version)
         {
             default:
+            case 4:
             case 2:
-                return new BindingTuple_2(_virtualhost);
+                return new BindingTuple_2(_virtualhost, _shortStringTupleBinding);
             case 1:
-                return new BindingTuple_1(_virtualhost);
+                return new BindingTuple_1(_virtualhost, _shortStringTupleBinding);
         }
     }
 }

Modified: store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple_1.java
===================================================================
--- store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple_1.java	2011-05-03 18:49:45 UTC (rev 4455)
+++ store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple_1.java	2011-05-04 09:13:25 UTC (rev 4456)
@@ -1,8 +1,6 @@
 package org.apache.qpid.server.store.berkeleydb.tuples;
 
 import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.store.berkeleydb.AMQShortStringEncoding;
-import org.apache.qpid.server.store.berkeleydb.FieldTableEncoding;
 import org.apache.qpid.server.store.berkeleydb.BindingKey;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
@@ -12,46 +10,60 @@
 import com.sleepycat.bind.tuple.TupleOutput;
 import org.apache.log4j.Logger;
 
-public class BindingTuple_1 extends TupleBinding implements BindingTuple
+public class BindingTuple_1 extends TupleBinding<BindingKey> implements BindingTuple
 {
     protected static final Logger _log = Logger.getLogger(BindingTuple.class);
 
     protected VirtualHost _virtualhost;
 
-    public BindingTuple_1(VirtualHost virtualHost)
+    /**
+     * Holds tuple binding to serialize AMQShortString objects
+     */
+    protected TupleBinding<AMQShortString> _shortStringTupleBinding;
+
+    /**
+     * Constructs instance for given virtual host and AMQShortString {@link TupleBinding}.
+     *
+     * @param virtualHost virtual host
+     * @param shortStringTupleBinding  AMQShortString tuple binding
+     */
+    public BindingTuple_1(VirtualHost virtualHost, TupleBinding<AMQShortString> shortStringTupleBinding)
     {
         if (virtualHost == null)
         {
             throw new NullPointerException("Virtualhost cannot be null");
         }
+        if (shortStringTupleBinding == null)
+        {
+            throw new NullPointerException("AMQShortString tuple binding cannot be null");
+        }
         _virtualhost = virtualHost;
+        _shortStringTupleBinding = shortStringTupleBinding;
     }
 
-    public Object entryToObject(TupleInput tupleInput)
+    public BindingKey entryToObject(TupleInput tupleInput)
     {
-        AMQShortString exchangeName = AMQShortStringEncoding.readShortString(tupleInput);
-        AMQShortString queueName = AMQShortStringEncoding.readShortString(tupleInput);
-        AMQShortString routingKey = AMQShortStringEncoding.readShortString(tupleInput);
+        AMQShortString exchangeName = _shortStringTupleBinding.entryToObject(tupleInput);
+        AMQShortString queueName = _shortStringTupleBinding.entryToObject(tupleInput);
+        AMQShortString routingKey = _shortStringTupleBinding.entryToObject(tupleInput);
 
         return createNewBindingKey(exchangeName, queueName, routingKey);
     }
 
-    public void objectToEntry(Object object, TupleOutput tupleOutput)
+    public void objectToEntry(BindingKey binding, TupleOutput tupleOutput)
     {
-        BindingKey binding = (BindingKey) object;
-
-        AMQShortStringEncoding.writeShortString(binding.getExchangeName(), tupleOutput);
-        AMQShortStringEncoding.writeShortString(binding.getQueueName(), tupleOutput);
-        AMQShortStringEncoding.writeShortString(binding.getRoutingKey(), tupleOutput);
+        _shortStringTupleBinding.objectToEntry(binding.getExchangeName(), tupleOutput);
+        _shortStringTupleBinding.objectToEntry(binding.getQueueName(), tupleOutput);
+        _shortStringTupleBinding.objectToEntry(binding.getRoutingKey(), tupleOutput);
     }
 
-    private Object createNewBindingKey(AMQShortString exchangeName, AMQShortString queueName, AMQShortString routingKey)
+    private BindingKey createNewBindingKey(AMQShortString exchangeName, AMQShortString queueName, AMQShortString routingKey)
     {
         return createNewBindingKey(exchangeName, queueName, routingKey, null);
     }
 
     // Addition for Version 2 of this table
-    protected Object createNewBindingKey(AMQShortString exchangeName, AMQShortString queueName,
+    protected BindingKey createNewBindingKey(AMQShortString exchangeName, AMQShortString queueName,
                                          AMQShortString routingKey, FieldTable arguments)
     {
         return new BindingKey(exchangeName, queueName, routingKey, arguments);

Modified: store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple_2.java
===================================================================
--- store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple_2.java	2011-05-03 18:49:45 UTC (rev 4455)
+++ store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple_2.java	2011-05-04 09:13:25 UTC (rev 4456)
@@ -2,27 +2,27 @@
 
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.server.store.berkeleydb.AMQShortStringEncoding;
 import org.apache.qpid.server.store.berkeleydb.BindingKey;
 import org.apache.qpid.server.store.berkeleydb.FieldTableEncoding;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
+import com.sleepycat.bind.tuple.TupleBinding;
 import com.sleepycat.bind.tuple.TupleInput;
 import com.sleepycat.bind.tuple.TupleOutput;
 
 public class BindingTuple_2 extends BindingTuple_1 implements BindingTuple
 {
 
-    public BindingTuple_2(VirtualHost virtualHost)
+    public BindingTuple_2(VirtualHost virtualHost, TupleBinding<AMQShortString> shortStringTupleBinding)
     {
-        super(virtualHost);
+        super(virtualHost, shortStringTupleBinding);
     }
 
-    public Object entryToObject(TupleInput tupleInput)
+    public BindingKey entryToObject(TupleInput tupleInput)
     {
-        AMQShortString exchangeName = AMQShortStringEncoding.readShortString(tupleInput);
-        AMQShortString queueName = AMQShortStringEncoding.readShortString(tupleInput);
-        AMQShortString routingKey = AMQShortStringEncoding.readShortString(tupleInput);
+        AMQShortString exchangeName = _shortStringTupleBinding.entryToObject(tupleInput);
+        AMQShortString queueName = _shortStringTupleBinding.entryToObject(tupleInput);
+        AMQShortString routingKey = _shortStringTupleBinding.entryToObject(tupleInput);
 
         FieldTable arguments;
 
@@ -40,13 +40,11 @@
         return createNewBindingKey(exchangeName, queueName, routingKey, arguments);
     }
 
-    public void objectToEntry(Object object, TupleOutput tupleOutput)
+    public void objectToEntry(BindingKey binding, TupleOutput tupleOutput)
     {
-        BindingKey binding = (BindingKey) object;
-        
-        AMQShortStringEncoding.writeShortString(binding.getExchangeName(), tupleOutput);
-        AMQShortStringEncoding.writeShortString(binding.getQueueName(), tupleOutput);
-        AMQShortStringEncoding.writeShortString(binding.getRoutingKey(), tupleOutput);
+        _shortStringTupleBinding.objectToEntry(binding.getExchangeName(), tupleOutput);
+        _shortStringTupleBinding.objectToEntry(binding.getQueueName(), tupleOutput);
+        _shortStringTupleBinding.objectToEntry(binding.getRoutingKey(), tupleOutput);
 
         // Addition for Version 2 of this table
         FieldTableEncoding.writeFieldTable(binding.getArguments(), tupleOutput);

Added: store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueEntryKeyTupleBinding.java
===================================================================
--- store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueEntryKeyTupleBinding.java	                        (rev 0)
+++ store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueEntryKeyTupleBinding.java	2011-05-04 09:13:25 UTC (rev 4456)
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.store.berkeleydb.tuples;
+
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.store.berkeleydb.QueueEntryKey;
+
+import com.sleepycat.bind.tuple.TupleBinding;
+import com.sleepycat.bind.tuple.TupleInput;
+import com.sleepycat.bind.tuple.TupleOutput;
+
+/**
+ * TupleBinding implementation to store/load {@link QueueEntryKey} in/from DBD store.
+ */
+public class QueueEntryKeyTupleBinding extends TupleBinding<QueueEntryKey>
+{
+    /**
+     * Holds tuple binding to serialize AMQShortString objects
+     */
+    private final TupleBinding<AMQShortString> _shortStringTupleBinding;
+
+    /**
+     * Constructs instance for given short string tuple binding
+     *
+     * @param shortStringTupleBinding
+     *            short string tuple binding
+     */
+    public QueueEntryKeyTupleBinding(TupleBinding<AMQShortString> shortStringTupleBinding)
+    {
+        _shortStringTupleBinding = shortStringTupleBinding;
+    }
+
+    @Override
+    public QueueEntryKey entryToObject(TupleInput tupleInput)
+    {
+        final QueueEntryKey mk = new QueueEntryKey();
+        mk.queueName = _shortStringTupleBinding.entryToObject(tupleInput);
+        mk.messageId = tupleInput.readLong();
+        return mk;
+    }
+
+    @Override
+    public void objectToEntry(QueueEntryKey mk, TupleOutput tupleOutput)
+    {
+        _shortStringTupleBinding.objectToEntry(mk.queueName, tupleOutput);
+        tupleOutput.writeLong(mk.messageId);
+    }
+
+}
\ No newline at end of file

Modified: store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTupleBindingFactory.java
===================================================================
--- store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTupleBindingFactory.java	2011-05-03 18:49:45 UTC (rev 4455)
+++ store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTupleBindingFactory.java	2011-05-04 09:13:25 UTC (rev 4456)
@@ -20,25 +20,47 @@
  */
 package org.apache.qpid.server.store.berkeleydb.tuples;
 
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.virtualhost.VirtualHost;
+
 import com.sleepycat.bind.tuple.TupleBinding;
 
 public class QueueTupleBindingFactory extends TupleBindingFactory
 {
-    public QueueTupleBindingFactory(int version, VirtualHost virtualHost)
+    /**
+     * Holds tuple binding to serialize/de-serialize AMQShortString objects
+     */
+    protected TupleBinding<AMQShortString> _shortStringTupleBinding;
+
+    /**
+     * Creates a factory instance for given store version, virtual host and
+     * {@link AMQShortString} tuple binding
+     *
+     * @param version
+     *            store version
+     * @param virtualHost
+     *            virtual host
+     * @param shortStringTupleBinding
+     *            AMQShortString tuple binding
+     */
+    public QueueTupleBindingFactory(int version, VirtualHost virtualHost,
+            TupleBinding<AMQShortString> shortStringTupleBinding)
     {
-        super(version,virtualHost);
+        super(version, virtualHost);
+        _shortStringTupleBinding = shortStringTupleBinding;
     }
 
-    public TupleBinding getInstance()
+    public TupleBinding<AMQQueue> getInstance()
     {
         switch (_version)
         {
             default:
+            case 4:
             case 2:
-                return new QueueTuple_2(_virtualhost);
+                return new QueueTuple_2(_virtualhost, _shortStringTupleBinding);
             case 1:
-                return new QueueTuple_1(_virtualhost);
+                return new QueueTuple_1(_virtualhost, _shortStringTupleBinding);
         }
     }
 }

Modified: store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_1.java
===================================================================
--- store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_1.java	2011-05-03 18:49:45 UTC (rev 4455)
+++ store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_1.java	2011-05-04 09:13:25 UTC (rev 4456)
@@ -26,38 +26,44 @@
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.AMQQueueFactory;
-import org.apache.qpid.server.store.berkeleydb.AMQShortStringEncoding;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
-public class QueueTuple_1 extends TupleBinding implements QueueTuple
+public class QueueTuple_1 extends TupleBinding<AMQQueue> implements QueueTuple
 {
     protected static final Logger _logger = Logger.getLogger(QueueTuple.class);
 
     protected final VirtualHost _virtualHost;
 
-    public QueueTuple_1(VirtualHost virtualHost)
+    /**
+     * Holds tuple binding to serialize AMQShortString objects
+     */
+    protected TupleBinding<AMQShortString> _shortStringTupleBinding;
+
+    public QueueTuple_1(VirtualHost virtualHost, TupleBinding<AMQShortString> shortStringTupleBinding)
     {
         if (virtualHost == null)
         {
             throw new NullPointerException("Virtualhost cannot be null");
+        }if (shortStringTupleBinding == null)
+        {
+            throw new NullPointerException("AMQShortString tuple binding cannot be null");
         }
+        _shortStringTupleBinding = shortStringTupleBinding;
         _virtualHost = virtualHost;
     }
 
-    public Object entryToObject(TupleInput tupleInput)
+    public AMQQueue entryToObject(TupleInput tupleInput)
     {
-        AMQShortString name = AMQShortStringEncoding.readShortString(tupleInput);
-        AMQShortString owner = AMQShortStringEncoding.readShortString(tupleInput);
+        AMQShortString name = _shortStringTupleBinding.entryToObject(tupleInput);
+        AMQShortString owner = _shortStringTupleBinding.entryToObject(tupleInput);
 
         return createNewQueue(name, owner);
     }
 
-    public void objectToEntry(Object object, TupleOutput tupleOutput)
+    public void objectToEntry(AMQQueue queue, TupleOutput tupleOutput)
     {
-        AMQQueue queue = (AMQQueue) object;
-
-        AMQShortStringEncoding.writeShortString(queue.getName(), tupleOutput);
-        AMQShortStringEncoding.writeShortString(queue.getOwner(), tupleOutput);
+        _shortStringTupleBinding.objectToEntry(queue.getName(), tupleOutput);
+        _shortStringTupleBinding.objectToEntry(queue.getOwner(), tupleOutput);
     }
 
     // Addition for Version 2 of this table
@@ -66,13 +72,13 @@
         //no-op
     }
 
-    protected Object createNewQueue(AMQShortString name, AMQShortString owner)
+    protected AMQQueue createNewQueue(AMQShortString name, AMQShortString owner)
     {
         return createNewQueue(name, owner, null);
     }
 
     // Addition for Version 2 of this table
-    protected Object createNewQueue(AMQShortString name, AMQShortString owner, FieldTable arguments)
+    protected AMQQueue createNewQueue(AMQShortString name, AMQShortString owner, FieldTable arguments)
     {
         try
         {

Modified: store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_2.java
===================================================================
--- store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_2.java	2011-05-03 18:49:45 UTC (rev 4455)
+++ store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_2.java	2011-05-04 09:13:25 UTC (rev 4456)
@@ -20,10 +20,10 @@
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.store.berkeleydb.AMQShortStringEncoding;
 import org.apache.qpid.server.store.berkeleydb.FieldTableEncoding;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
+import com.sleepycat.bind.tuple.TupleBinding;
 import com.sleepycat.bind.tuple.TupleInput;
 import com.sleepycat.bind.tuple.TupleOutput;
 
@@ -31,17 +31,17 @@
 {
     protected FieldTable _arguments;
 
-    public QueueTuple_2(VirtualHost virtualHost)
+    public QueueTuple_2(VirtualHost virtualHost, TupleBinding<AMQShortString> shortStringTupleBinding)
     {
-        super(virtualHost);
+        super(virtualHost, shortStringTupleBinding);
     }
 
-    public Object entryToObject(TupleInput tupleInput)
+    public AMQQueue entryToObject(TupleInput tupleInput)
     {
         try
         {
-            AMQShortString name = AMQShortStringEncoding.readShortString(tupleInput);
-            AMQShortString owner = AMQShortStringEncoding.readShortString(tupleInput);
+            AMQShortString name = _shortStringTupleBinding.entryToObject(tupleInput);
+            AMQShortString owner = _shortStringTupleBinding.entryToObject(tupleInput);
             // Addition for Version 2 of this table
             FieldTable arguments = FieldTableEncoding.readFieldTable(tupleInput);
 
@@ -55,12 +55,10 @@
 
     }
 
-    public void objectToEntry(Object object, TupleOutput tupleOutput)
+    public void objectToEntry(AMQQueue queue, TupleOutput tupleOutput)
     {
-        AMQQueue queue = (AMQQueue) object;
-
-        AMQShortStringEncoding.writeShortString(queue.getName(), tupleOutput);
-        AMQShortStringEncoding.writeShortString(queue.getOwner(), tupleOutput);
+        _shortStringTupleBinding.objectToEntry(queue.getName(), tupleOutput);
+        _shortStringTupleBinding.objectToEntry(queue.getOwner(), tupleOutput);
         // Addition for Version 2 of this table
         FieldTableEncoding.writeFieldTable(_arguments, tupleOutput);
     }

Modified: store/branches/java/0.5.x-dev/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreTest.java
===================================================================
--- store/branches/java/0.5.x-dev/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreTest.java	2011-05-03 18:49:45 UTC (rev 4455)
+++ store/branches/java/0.5.x-dev/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreTest.java	2011-05-04 09:13:25 UTC (rev 4456)
@@ -19,18 +19,14 @@
 
 import com.sleepycat.je.DatabaseException;
 import junit.framework.Assert;
-import junit.framework.TestCase;
 import junit.framework.TestSuite;
-import org.apache.log4j.Logger;
 import org.apache.mina.common.ByteBuffer;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.common.AMQPFilterTypes;
 import org.apache.qpid.framing.*;
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
 import org.apache.qpid.framing.abstraction.ContentChunk;
-import org.apache.qpid.server.RequiredDeliveryException;
 import org.apache.qpid.server.configuration.VirtualHostConfiguration;
-import org.apache.qpid.server.configuration.ServerConfiguration;
 import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.exchange.DefaultExchangeFactory;
 import org.apache.qpid.server.exchange.DirectExchange;
@@ -42,18 +38,14 @@
 import org.apache.qpid.server.queue.AMQQueueFactory;
 import org.apache.qpid.server.queue.AMQPriorityQueue;
 import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.txn.NonTransactionalContext;
-import org.apache.qpid.server.txn.TransactionalContext;
 import org.apache.qpid.server.util.NullApplicationRegistry;
 import org.apache.commons.configuration.PropertiesConfiguration;
 
 import java.io.File;
-import java.util.LinkedList;
 import java.util.List;
 
 public class BDBStoreTest extends BDBVMTestCase
 {
-    private static final Logger _log = Logger.getLogger(BDBStoreTest.class);
 
     private BDBMessageStore _store;
     private String STORE_LOCATION = System.getProperty("BDB_WORK") + "/bdbTestEnv";
@@ -61,7 +53,6 @@
     private StoreContext _storeContext = new StoreContext();
     private VirtualHost _virtualHost;
 
-    private TransactionalContext _txnContext;
     private static final AMQShortString QUEUE1 = new AMQShortString("queue1");
     private static final AMQShortString ME = new AMQShortString("me");
     private static final AMQShortString MYEXCHANGE = new AMQShortString("myexchange");
@@ -70,7 +61,6 @@
     private static final AMQShortString HIM = new AMQShortString("him");
     private static final AMQShortString EXCHANGE1 = new AMQShortString("exchange1");
 
-    private static volatile int _loops;
     File BDB_DIR = new File(STORE_LOCATION);
 
     public void setUp() throws Exception
@@ -101,7 +91,6 @@
 
         _store.setVirtualHost(_virtualHost);
 
-        _txnContext = new NonTransactionalContext(_store, _storeContext, null, new LinkedList<RequiredDeliveryException>());
     }
 
     private void reload() throws Exception
@@ -338,6 +327,9 @@
 
     public void testTranCommit() throws Exception
     {
+        List<Long> previousIds = _store.getEnqueuedMessages(QUEUE1);
+        assertTrue("Last Test Messages are still present", previousIds.isEmpty());
+
         MessagePublishInfo pubBody = createPublishBody();
         BasicContentHeaderProperties props = createContentHeaderProperties();
         String bodyText = "dsjfhdsjkfhdsjflhsdjfdshfjdlhfjhdljfdhsljkfsh";
@@ -363,8 +355,30 @@
         val = enqueuedIds.get(1);
         Assert.assertEquals("Second Message is incorrect", 21L, val.longValue());
 
+        cleanQueue(queue, enqueuedIds);
+
     }
 
+    /**
+     * A helper method to clean given queue form messages with given IDs.
+     *
+     * @param queue
+     *            queue to clean
+     * @param enqueuedIds
+     *            messages to dequeue
+     * @throws AMQException
+     */
+    protected void cleanQueue(AMQQueue queue, List<Long> enqueuedIds) throws AMQException
+    {
+        // clean queue
+        _store.beginTran(_storeContext);
+        for (Long messageId : enqueuedIds)
+        {
+            _store.dequeueMessage(_storeContext, queue, messageId);
+        }
+        _store.commitTran(_storeContext);
+    }
+
     public void testTranRollback1() throws Exception
     {
         List<Long> enqueuedIds = _store.getEnqueuedMessages(QUEUE1);
@@ -404,6 +418,8 @@
         val = enqueuedIds.get(1);
         assertEquals("Second Message is incorrect", 31L, val.longValue());
 
+        cleanQueue(queue, enqueuedIds);
+
     }
 
     public void testTranRollback2() throws Exception
@@ -440,6 +456,8 @@
         Assert.assertEquals("First Message is incorrect", 31L, val.longValue());
         val = enqueuedIds.get(1);
         Assert.assertEquals("Second Message is incorrect", 32L, val.longValue());
+
+        cleanQueue(queue, enqueuedIds);
     }
 
     public void testRecovery() throws Exception

Modified: store/branches/java/0.5.x-dev/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java
===================================================================
--- store/branches/java/0.5.x-dev/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java	2011-05-03 18:49:45 UTC (rev 4455)
+++ store/branches/java/0.5.x-dev/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java	2011-05-04 09:13:25 UTC (rev 4456)
@@ -20,6 +20,9 @@
  */
 package org.apache.qpid.server.store.berkeleydb;
 
+import org.apache.qpid.server.logging.NullRootMessageLogger;
+import org.apache.qpid.server.logging.actors.BrokerActor;
+import org.apache.qpid.server.logging.actors.CurrentActor;
 import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry;
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.store.berkeleydb.utils.Publisher;
@@ -48,16 +51,15 @@
     final String BDBHome = System.getProperty("BDB_HOME");
     final File _configFile = new File(BDBHome, "etc/config.xml");
 
-    private String VIRTUALHOST = "test";
-
     private static final String VERSION_1 = "1";
-    private static final String VERSION_2 = "2";
+    private static final String VERSION_4 = "4";
 
     private String _topic = "MyDurableSubscriptionTestTopic";
 
     String _fromDir = System.getProperty("QPID_WORK") + "/version1Store";
-    String _toDir = System.getProperty("QPID_WORK") + "/version2Store";
-    String _toDirTwice = System.getProperty("QPID_WORK") + "/version2StoreUpgradeTwice";
+    String _fromDir2 = System.getProperty("QPID_WORK") + "/version2Store";
+    String _toDir = System.getProperty("QPID_WORK") + "/version4Store";
+    String _toDirTwice = System.getProperty("QPID_WORK") + "/version4StoreUpgradeTwice";
 
     public void setUp() throws IOException
     {
@@ -84,44 +86,102 @@
             FileUtils.delete(directory, true);
         }
 
+        directory = new File(_fromDir2);
 
+        if (directory.exists() && directory.isDirectory())
+        {
+            FileUtils.delete(directory, true);
+        }
     }
 
-    public void testMultipleUpgrades() throws Exception
+    public void testMultipleUpgradesFromVersionOne() throws Exception
     {
+        assertBrokerUpgrade(1, _fromDir);
+    }
+
+    public void testMultipleUpgradesFromVersionTwo() throws Exception
+    {
+        assertBrokerUpgrade(2, _fromDir2);
+    }
+
+    public void testUpgradeFromLatestVersion() throws Exception
+    {
+        startBroker(1, Integer.toString(BDBMessageStore.DATABASE_FORMAT_VERSION));
+        stopBroker(1);
+        try
+        {
+            new BDBStoreUpgrade(_toDir, _toDirTwice, null, false, true).upgradeFromVersion(BDBMessageStore.DATABASE_FORMAT_VERSION);
+            fail("Upgrade Succeeded");
+        }
+        catch (Exception e)
+        {
+            assertEquals("Store on disk already upgraded to latest version " 
+                    + BDBMessageStore.DATABASE_FORMAT_VERSION, e.getMessage());
+        }
+    }
+
+    /**
+     * Tests broker upgrade from given version with store located by given path
+     *
+     * @param version
+     *            store version
+     * @param storePath
+     *            store path
+     * @throws Exception
+     */
+    protected void assertBrokerUpgrade(int version, String storePath) throws Exception
+    {
         String broker = "vm://:1";
 
-        startBroker(1, VERSION_1);
+        startBroker(1, Integer.toString(version));
 
-        //Ensure msg were transitioned to new broker
+        // Ensure msg were transitioned to new broker
         sendAndCheckDurableSubscriber(broker, true, true, 5, null);
 
-        //Reset the Selector Pattern
+        // Reset the Selector Pattern
         new DurableSubscriber(broker, _topic, "odd=true").close();
 
         stopBroker(1);
 
-        upgradeBroker();
+        upgradeBroker(storePath);
 
         try
         {
-            new BDBStoreUpgrade(_toDir, _toDirTwice, null, false, true).upgradeFromVersion(1);
+            new BDBStoreUpgrade(_toDir, _toDirTwice, null, false, true).upgradeFromVersion(version);
             fail("Second Upgrade Succeeded");
         }
         catch (Exception e)
         {
             System.err.println("Showing stack trace. expecting Unable to load BDBStore error");
             e.printStackTrace();
-            assertTrue("Incorrect Exception Thrown:" + e.getMessage(),
-                       e.getMessage().contains("Unable to load BDBStore as version 1. Store on disk contains version 2 data"));
+            String expectedMessage = "Unable to load BDBStore as version " + version
+                    + ". Store on disk contains version 4 data";
+            assertTrue("Incorrect Exception Thrown:" + e.getMessage(), e.getMessage().contains(expectedMessage));
         }
     }
 
-    public void testDurababilitySelectors() throws Exception
+    public void testDurababilitySelectorsForUpgradeFromVersionOne() throws Exception
     {
+        assertDurababilitySelectorsForUpgrade(1, _fromDir);
+    }
+
+    public void testDurababilitySelectorsForUpgradeFromVersionTwo() throws Exception
+    {
+        assertDurababilitySelectorsForUpgrade(2, _fromDir2);
+    }
+
+    /**
+     * Tests broker upgrade for the storage containing data for durable subscription.
+     *
+     * @param version store version
+     * @param storePath store location
+     * @throws Exception
+     */
+    protected void assertDurababilitySelectorsForUpgrade(int version, String storePath) throws Exception
+    {
         String broker = "vm://:1";
 
-        startBroker(1, VERSION_1);
+        startBroker(1, Integer.toString(version));
 
         new DurableSubscriber(broker, _topic, null).close();
 
@@ -133,26 +193,16 @@
 
         stopBroker(1);
 
-        upgradeBroker();
+        upgradeBroker(storePath);
 
         broker = "vm://:2";
 
-        startBroker(2, VERSION_2);
+        startBroker(2, VERSION_4);
 
         //Ensure msg were transitioned to new broker
-        sendAndCheckDurableSubscriber(broker, false, false, 5, null);
+        sendAndCheckDurableSubscriber(broker, false, true, 5, null);
 
-        //Reset the Selector Pattern
-        new DurableSubscriber(broker, _topic, "odd=true").close();
-
         stopBroker(2);
-
-        startBroker(2, VERSION_2);
-
-        ///* This test is currently broken due to QPID-1275
-        //Ensure that the selector was preseved on restart and caused all msgs to be removed.
-        sendAndCheckDurableSubscriber(broker, false, false, 0, null);
-        stopBroker(2);
     }
 
     public void testDurabability() throws Exception
@@ -172,11 +222,31 @@
         stopBroker(1);
     }
 
-    private void upgradeBroker() throws Exception
+    public void testStoreVersionDetection() throws Exception
     {
-        new BDBStoreUpgrade(_fromDir, _toDir, null, false, true).upgradeFromVersion(1);
+        int[] versions = {1, 2, 4};
+        for (int i = 0; i < versions.length; i++)
+        {
+            assertBDBStoreVersionDetection(1);
+        }
     }
 
+    
+    protected void assertBDBStoreVersionDetection(int version) throws Exception
+    {
+        startBroker(1, Integer.toString(version));
+        stopBroker(1);
+
+        File storeFolder = new File(System.getProperty("QPID_WORK") + "/version" + version + "Store");
+        int detectedVersion = BDBStoreUpgrade.getStoreVersion(storeFolder);
+        assertEquals("failed to detect store version for store " + version, version, detectedVersion);
+    }
+
+    private void upgradeBroker(String fromDir) throws Exception
+    {
+         BDBStoreUpgrade.upgrade(new File(fromDir), _toDir, null, false, true);
+    }
+
     private void stopBroker(int port)
     {
         TransportConnection.killVMBroker(port);
@@ -199,6 +269,7 @@
                                     System.getProperty("QPID_WORK")+ "/version" + version + "Store");
         testVirtualhost.setProperty("store.version", version);
 
+        CurrentActor.set(new BrokerActor(new NullRootMessageLogger()));
         ApplicationRegistry.getInstance(port).getVirtualHostRegistry().
                 registerVirtualHost(new VirtualHost(new VirtualHostConfiguration("bdbtest",testVirtualhost)));
 

Added: store/branches/java/0.5.x-dev/src/test/java/org/apache/qpid/server/store/berkeleydb/ExchangeTBTest.java
===================================================================
--- store/branches/java/0.5.x-dev/src/test/java/org/apache/qpid/server/store/berkeleydb/ExchangeTBTest.java	                        (rev 0)
+++ store/branches/java/0.5.x-dev/src/test/java/org/apache/qpid/server/store/berkeleydb/ExchangeTBTest.java	2011-05-04 09:13:25 UTC (rev 4456)
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import junit.framework.TestCase;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.configuration.VirtualHostConfiguration;
+import org.apache.qpid.server.exchange.DirectExchange;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.logging.NullRootMessageLogger;
+import org.apache.qpid.server.logging.actors.BrokerActor;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.store.berkeleydb.tuples.AMQShortStringTupleBindingFactory;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+import com.sleepycat.bind.tuple.TupleInput;
+import com.sleepycat.bind.tuple.TupleOutput;
+
+/**
+ * Tests {@link ExchangeTB}
+ */
+public class ExchangeTBTest extends TestCase
+{
+    // tested tuple binding
+    private ExchangeTB _exchangeTupleBinding;
+
+    // test exchange
+    private Exchange _exchange;
+
+    public void setUp() throws Exception
+    {
+        CurrentActor.set(new BrokerActor(new NullRootMessageLogger()));
+        Configuration config = new PropertiesConfiguration();
+        VirtualHostConfiguration hostConfiguration = new VirtualHostConfiguration("junit", config);
+        VirtualHost virtualHost = new VirtualHost(hostConfiguration);
+        ExchangeTB exchangeTupleBinding = new ExchangeTB(virtualHost,
+                AMQShortStringTupleBindingFactory.createTupleBinding(4));
+        _exchangeTupleBinding = exchangeTupleBinding;
+        _exchange = new DirectExchange();
+        _exchange.initialise(virtualHost, new AMQShortString("test echange"), true, 0, true);
+    }
+
+    /**
+     * Tests {@link Exchange} object serialization/de-serialization with {@link ExchangeTB}.
+     */
+    public void testObjectToEntryConversion()
+    {
+        // write into to tuple output
+        TupleOutput tupleOutput = new TupleOutput();
+        _exchangeTupleBinding.objectToEntry(_exchange, tupleOutput);
+        byte[] data = tupleOutput.getBufferBytes();
+
+        // read from tuple input
+        TupleInput tupleInput = new TupleInput(data);
+        Exchange storedExchange = _exchangeTupleBinding.entryToObject(tupleInput);
+
+        assertNotNull(storedExchange);
+
+        assertEquals(_exchange.getName(), storedExchange.getName());
+        assertEquals(_exchange.getType(), storedExchange.getType());
+        assertEquals(_exchange.isAutoDelete(), storedExchange.isAutoDelete());
+    }
+
+}

Added: store/branches/java/0.5.x-dev/src/test/java/org/apache/qpid/server/store/berkeleydb/MessageMetaDataTBTest.java
===================================================================
--- store/branches/java/0.5.x-dev/src/test/java/org/apache/qpid/server/store/berkeleydb/MessageMetaDataTBTest.java	                        (rev 0)
+++ store/branches/java/0.5.x-dev/src/test/java/org/apache/qpid/server/store/berkeleydb/MessageMetaDataTBTest.java	2011-05-04 09:13:25 UTC (rev 4456)
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import junit.framework.TestCase;
+
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.framing.amqp_8_0.BasicConsumeBodyImpl;
+import org.apache.qpid.server.queue.MessageMetaData;
+import org.apache.qpid.server.store.berkeleydb.tuples.AMQShortStringTupleBindingFactory;
+
+import com.sleepycat.bind.tuple.TupleInput;
+import com.sleepycat.bind.tuple.TupleOutput;
+
+/**
+ * Tests for {@link MessageMetaDataTB}
+ */
+public class MessageMetaDataTBTest  extends TestCase
+{
+    // tested tuple binding
+    private MessageMetaDataTB _messageMetaDataTupleBinding;
+
+    // tested object
+    private MessageMetaData _messageMetaData;
+
+    public void setUp() throws Exception
+    {
+        _messageMetaDataTupleBinding = new MessageMetaDataTB(AMQShortStringTupleBindingFactory.createTupleBinding(4));
+
+        MessagePublishInfo publishBody = new MessagePublishInfo()
+        {
+            @Override
+            public void setExchange(AMQShortString exchange)
+            {
+                // ignore
+            }
+
+            @Override
+            public boolean isMandatory()
+            {
+                return true;
+            }
+
+            @Override
+            public boolean isImmediate()
+            {
+                return true;
+            }
+
+            @Override
+            public AMQShortString getRoutingKey()
+            {
+                return new AMQShortString("test routine key");
+            }
+
+            @Override
+            public AMQShortString getExchange()
+            {
+                return new AMQShortString("test exchange");
+            }
+        };
+
+        BasicContentHeaderProperties properties = new BasicContentHeaderProperties();
+        properties.setAppId("test app");
+        properties.setClusterId("test claster");
+        properties.setContentType("text");
+        properties.setCorrelationId("test correlation id");
+        properties.setDeliveryMode((byte)1);
+        properties.setEncoding("UTF-8");
+        properties.setExpiration(2);
+        properties.setMessageId("test message id");
+        properties.setPriority((byte)3);
+        properties.setReplyTo("test reply to");
+        properties.setType("test type");
+        properties.setUserId("test user id");
+        ContentHeaderBody contentHeaderBody = new ContentHeaderBody(properties, BasicConsumeBodyImpl.CLASS_ID );
+        _messageMetaData = new MessageMetaData(publishBody, contentHeaderBody, 0);
+    }
+
+    /**
+     * Tests {@link MessageMetaData} object serialization/de-serialization with {@link MessageMetaDataTB}
+     */
+    public void testObjectToEntryConversion()
+    {
+        // write into tuple output
+        TupleOutput tupleOutput = new TupleOutput();
+        _messageMetaDataTupleBinding.objectToEntry(_messageMetaData, tupleOutput);
+        byte[] data = tupleOutput.getBufferBytes();
+
+        // read from tuple input
+        TupleInput tupleInput = new TupleInput(data);
+        MessageMetaData storedMessageMetaData = _messageMetaDataTupleBinding.entryToObject(tupleInput);
+
+        assertNotNull(storedMessageMetaData);
+
+        MessagePublishInfo expectedInfo = _messageMetaData.getMessagePublishInfo();
+        MessagePublishInfo storedInfo = storedMessageMetaData.getMessagePublishInfo();
+
+        assertEquals(expectedInfo.getExchange(), storedInfo.getExchange());
+        assertEquals(expectedInfo.getRoutingKey(), storedInfo.getRoutingKey());
+        assertEquals(expectedInfo.isImmediate(), storedInfo.isImmediate());
+        assertEquals(expectedInfo.isMandatory(), storedInfo.isMandatory());
+
+        ContentHeaderBody expectedBody = _messageMetaData.getContentHeaderBody();
+        ContentHeaderBody storedBody = storedMessageMetaData.getContentHeaderBody();
+
+        assertEquals(expectedBody.classId, storedBody.classId);
+        assertEquals(expectedBody.weight, storedBody.weight);
+        assertEquals(expectedBody.bodySize, storedBody.bodySize);
+
+        BasicContentHeaderProperties expectedBodyProperties = (BasicContentHeaderProperties)expectedBody.getProperties();
+        BasicContentHeaderProperties storedBodyProperties = (BasicContentHeaderProperties)storedBody.getProperties();
+
+        assertEquals(expectedBodyProperties.getPropertyFlags(), storedBodyProperties.getPropertyFlags());
+        assertEquals(expectedBodyProperties.getAppIdAsString(), storedBodyProperties.getAppIdAsString());
+        assertEquals(expectedBodyProperties.getClusterIdAsString(), storedBodyProperties.getClusterIdAsString());
+        assertEquals(expectedBodyProperties.getContentTypeAsString(), storedBodyProperties.getContentTypeAsString());
+        assertEquals(expectedBodyProperties.getCorrelationIdAsString(), storedBodyProperties.getCorrelationIdAsString());
+        assertEquals(expectedBodyProperties.getEncodingAsString(), storedBodyProperties.getEncodingAsString());
+        assertEquals(expectedBodyProperties.getDeliveryMode(), storedBodyProperties.getDeliveryMode());
+        assertEquals(expectedBodyProperties.getExpiration(), storedBodyProperties.getExpiration());
+        assertEquals(expectedBodyProperties.getMessageIdAsString(), storedBodyProperties.getMessageIdAsString());
+        assertEquals(expectedBodyProperties.getPriority(), storedBodyProperties.getPriority());
+        assertEquals(expectedBodyProperties.getReplyToAsString(), storedBodyProperties.getReplyToAsString());
+        assertEquals(expectedBodyProperties.getTypeAsString(), storedBodyProperties.getTypeAsString());
+        assertEquals(expectedBodyProperties.getUserIdAsString(), storedBodyProperties.getUserIdAsString());
+        assertEquals(expectedBodyProperties.getTimestamp(), storedBodyProperties.getTimestamp());
+    }
+
+}

Added: store/branches/java/0.5.x-dev/src/test/java/org/apache/qpid/server/store/berkeleydb/tuples/AMQQueueTupleBindingTest.java
===================================================================
--- store/branches/java/0.5.x-dev/src/test/java/org/apache/qpid/server/store/berkeleydb/tuples/AMQQueueTupleBindingTest.java	                        (rev 0)
+++ store/branches/java/0.5.x-dev/src/test/java/org/apache/qpid/server/store/berkeleydb/tuples/AMQQueueTupleBindingTest.java	2011-05-04 09:13:25 UTC (rev 4456)
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.store.berkeleydb.tuples;
+
+import junit.framework.TestCase;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.configuration.VirtualHostConfiguration;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.logging.NullRootMessageLogger;
+import org.apache.qpid.server.logging.actors.BrokerActor;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.AMQQueueFactory;
+import org.apache.qpid.server.store.berkeleydb.ExchangeTB;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+import com.sleepycat.bind.tuple.TupleBinding;
+import com.sleepycat.bind.tuple.TupleInput;
+import com.sleepycat.bind.tuple.TupleOutput;
+
+/**
+ * Tests for AMQQueue tuple binding
+ */
+public class AMQQueueTupleBindingTest extends TestCase
+{
+    // test queue tuple binding factory
+    private QueueTupleBindingFactory _factory;
+
+    // test queue
+    private AMQQueue _testQueue;
+
+    public void setUp() throws Exception
+    {
+        CurrentActor.set(new BrokerActor(new NullRootMessageLogger()));
+        Configuration config = new PropertiesConfiguration();
+        VirtualHostConfiguration hostConfiguration = new VirtualHostConfiguration("junit", config);
+        VirtualHost virtualHost = new VirtualHost(hostConfiguration);
+        _factory = new QueueTupleBindingFactory(4, virtualHost,
+                AMQShortStringTupleBindingFactory.createTupleBinding(4));
+        _testQueue =  AMQQueueFactory.createAMQQueueImpl(new AMQShortString("test"), true,
+                new AMQShortString("tmp"), true, virtualHost, null);
+    }
+
+    /**
+     * Tests {@link Exchange} object serialization/de-serialization with {@link ExchangeTB}.
+     */
+    public void testObjectToEntryConversion()
+    {
+        // write into to tuple output
+        TupleOutput tupleOutput = new TupleOutput();
+        TupleBinding<AMQQueue> queueTupleBinding = _factory.getInstance();
+
+        queueTupleBinding.objectToEntry(_testQueue, tupleOutput);
+        byte[] data = tupleOutput.getBufferBytes();
+
+        // read from tuple input
+        TupleInput tupleInput = new TupleInput(data);
+        AMQQueue storedQueue = queueTupleBinding.entryToObject(tupleInput);
+
+        assertNotNull(storedQueue);
+
+        assertEquals(_testQueue.getName(), storedQueue.getName());
+        assertEquals(_testQueue.getOwner(), storedQueue.getOwner());
+    }
+}

Added: store/branches/java/0.5.x-dev/src/test/java/org/apache/qpid/server/store/berkeleydb/tuples/AMQShortStringTupleBindingTest.java
===================================================================
--- store/branches/java/0.5.x-dev/src/test/java/org/apache/qpid/server/store/berkeleydb/tuples/AMQShortStringTupleBindingTest.java	                        (rev 0)
+++ store/branches/java/0.5.x-dev/src/test/java/org/apache/qpid/server/store/berkeleydb/tuples/AMQShortStringTupleBindingTest.java	2011-05-04 09:13:25 UTC (rev 4456)
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.store.berkeleydb.tuples;
+
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.store.berkeleydb.tuples.AMQShortStringTB_4;
+
+import com.sleepycat.bind.tuple.TupleInput;
+import com.sleepycat.bind.tuple.TupleOutput;
+
+import junit.framework.TestCase;
+
+/**
+ * Tests encoding/decoding of AMQP short strings
+ */
+public class AMQShortStringTupleBindingTest extends TestCase
+{
+    /**
+     * Tests write and read operations for AMQP short string with length greater
+     * then 127.
+     */
+    public void testObjectToEntryForShortStringWithLengthGreater127()
+    {
+        AMQShortString testString = new AMQShortString(generateTestString(128));
+        assertObjectToEntryConversion(testString);
+    }
+
+    /**
+     * Tests write and read operations for null AMQP short string.
+     */
+    public void testObjectToEntryForNullShortString()
+    {
+        AMQShortString testString = null;
+        assertObjectToEntryConversion(testString);
+    }
+
+    /**
+     * Tests write and read operations for AMQP short string with length 255
+     * characters.
+     */
+    public void testObjectToEntryForShortStringWithLength255()
+    {
+        AMQShortString testString = new AMQShortString(generateTestString(255));
+        assertObjectToEntryConversion(testString);
+    }
+
+    /**
+     * Tests write and read operations for empty AMQP short string.
+     */
+    public void testObjectToEntryForEmptyShortString()
+    {
+        AMQShortString testString = new AMQShortString("");
+        assertObjectToEntryConversion(testString);
+    }
+
+    /**
+     * Tests
+     * {@link AMQShortStringEncoding#writeShortString(AMQShortString, TupleOutput)}
+     * and {@link AMQShortStringEncoding#readShortString(TupleInput)} to write
+     * and read AMQP short strings.
+     */
+    protected void assertObjectToEntryConversion(AMQShortString testString)
+    {
+        AMQShortStringTB_4 tupleBinding = new AMQShortStringTB_4();
+        // write string into tuple output
+        TupleOutput tupleOutput = new TupleOutput();
+        tupleBinding.objectToEntry(testString, tupleOutput);
+        byte[] data = tupleOutput.getBufferBytes();
+
+        // read string from tuple input
+        TupleInput tupleInput = new TupleInput(data);
+        AMQShortString restoredString = tupleBinding.entryToObject(tupleInput);
+
+        // assert read string against original string
+        assertEquals(testString, restoredString);
+    }
+
+    /**
+     * A helper method to generate a test string of given size
+     *
+     * @param length
+     *            string length
+     * @return string
+     */
+    private String generateTestString(int length)
+    {
+        StringBuilder sb = new StringBuilder();
+        for (int i = 0; i < length; i++)
+        {
+            sb.append('a');
+        }
+        return sb.toString();
+    }
+}

Added: store/branches/java/0.5.x-dev/src/test/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingKeyTupleBindingTest.java
===================================================================
--- store/branches/java/0.5.x-dev/src/test/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingKeyTupleBindingTest.java	                        (rev 0)
+++ store/branches/java/0.5.x-dev/src/test/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingKeyTupleBindingTest.java	2011-05-04 09:13:25 UTC (rev 4456)
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.store.berkeleydb.tuples;
+
+import junit.framework.TestCase;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.configuration.VirtualHostConfiguration;
+import org.apache.qpid.server.logging.NullRootMessageLogger;
+import org.apache.qpid.server.logging.actors.BrokerActor;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.store.berkeleydb.BindingKey;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+import com.sleepycat.bind.tuple.TupleBinding;
+import com.sleepycat.bind.tuple.TupleInput;
+import com.sleepycat.bind.tuple.TupleOutput;
+
+/**
+ * Tests BindingKey tuple serialization/de-serialization.
+ *
+ */
+public class BindingKeyTupleBindingTest extends TestCase
+{
+    // test queue tuple binding factory
+    private BindingTupleBindingFactory _factory;
+
+    public void setUp() throws Exception
+    {
+        CurrentActor.set(new BrokerActor(new NullRootMessageLogger()));
+        Configuration config = new PropertiesConfiguration();
+        VirtualHostConfiguration hostConfiguration = new VirtualHostConfiguration("junit", config);
+        VirtualHost virtualHost = new VirtualHost(hostConfiguration);
+        _factory = new BindingTupleBindingFactory(4, virtualHost,
+                AMQShortStringTupleBindingFactory.createTupleBinding(4));
+    }
+
+    /**
+     * Tests {@link BindingKey} object serialization/de-serialization.
+     */
+    public void testObjectToEntryConversion()
+    {
+        BindingKey key = new BindingKey(new AMQShortString("test1"), new AMQShortString("test2"),
+                new AMQShortString("test3"), null);
+
+        // write into to tuple output
+        TupleOutput tupleOutput = new TupleOutput();
+        TupleBinding<BindingKey> keyTupleBinding = _factory.getInstance();
+
+        keyTupleBinding.objectToEntry(key, tupleOutput);
+        byte[] data = tupleOutput.getBufferBytes();
+
+        // read from tuple input
+        TupleInput tupleInput = new TupleInput(data);
+        BindingKey storedKey = keyTupleBinding.entryToObject(tupleInput);
+
+        assertNotNull(storedKey);
+
+        assertEquals(key.getExchangeName(), storedKey.getExchangeName());
+        assertEquals(key.getQueueName(), storedKey.getQueueName());
+        assertEquals(key.getRoutingKey(), storedKey.getRoutingKey());
+    }
+}

Added: store/branches/java/0.5.x-dev/src/test/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueEntryKeyTupleBindingTest.java
===================================================================
--- store/branches/java/0.5.x-dev/src/test/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueEntryKeyTupleBindingTest.java	                        (rev 0)
+++ store/branches/java/0.5.x-dev/src/test/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueEntryKeyTupleBindingTest.java	2011-05-04 09:13:25 UTC (rev 4456)
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.store.berkeleydb.tuples;
+
+import junit.framework.TestCase;
+
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.store.berkeleydb.QueueEntryKey;
+import org.apache.qpid.server.store.berkeleydb.tuples.AMQShortStringTupleBindingFactory;
+import org.apache.qpid.server.store.berkeleydb.tuples.QueueEntryKeyTupleBinding;
+
+import com.sleepycat.bind.tuple.TupleInput;
+import com.sleepycat.bind.tuple.TupleOutput;
+
+/**
+ * Tests {@link QueueEntryKeyTupleBinding}
+ */
+public class QueueEntryKeyTupleBindingTest extends TestCase
+{
+    /**
+     * Tests {@link QueueEntryKey} object serialization/de-serialization with
+     * {@link QueueEntryKeyTupleBinding}
+     */
+    public void testObjectToEntryConversion()
+    {
+        QueueEntryKey queueEntryKey = new QueueEntryKey(new AMQShortString("test queue"), 2);
+        QueueEntryKeyTupleBinding queueEntryKeyTupleBinding = new QueueEntryKeyTupleBinding(
+                AMQShortStringTupleBindingFactory.createTupleBinding(4));
+
+        // write into tuple output
+        TupleOutput tupleOutput = new TupleOutput();
+        queueEntryKeyTupleBinding.objectToEntry(queueEntryKey, tupleOutput);
+        byte[] data = tupleOutput.getBufferBytes();
+
+        // read from tuple input
+        TupleInput tupleInput = new TupleInput(data);
+        QueueEntryKey storedQueueEntryKey = queueEntryKeyTupleBinding.entryToObject(tupleInput);
+
+        assertNotNull(storedQueueEntryKey);
+
+        assertEquals(queueEntryKey.messageId, storedQueueEntryKey.messageId);
+        assertEquals(queueEntryKey.queueName, storedQueueEntryKey.queueName);
+    }
+
+}

Modified: store/branches/java/0.5.x-dev/src/test/java/org/apache/qpid/server/store/berkeleydb/utils/JNDIHelper.java
===================================================================
--- store/branches/java/0.5.x-dev/src/test/java/org/apache/qpid/server/store/berkeleydb/utils/JNDIHelper.java	2011-05-03 18:49:45 UTC (rev 4455)
+++ store/branches/java/0.5.x-dev/src/test/java/org/apache/qpid/server/store/berkeleydb/utils/JNDIHelper.java	2011-05-04 09:13:25 UTC (rev 4456)
@@ -37,7 +37,7 @@
 
     public JNDIHelper(String broker) 
     {
-        CONNECTION_NAME = "amqp://guest:guest@clientid/test?brokerlist='" + broker + "'";
+        CONNECTION_NAME = "amqp://guest:guest@clientid/bdbtest?brokerlist='" + broker + "'";
         setupJNDI();
     }
 

Modified: store/branches/java/0.5.x-dev/src/tools/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgrade.java
===================================================================
--- store/branches/java/0.5.x-dev/src/tools/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgrade.java	2011-05-03 18:49:45 UTC (rev 4455)
+++ store/branches/java/0.5.x-dev/src/tools/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgrade.java	2011-05-04 09:13:25 UTC (rev 4456)
@@ -22,11 +22,15 @@
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.server.store.MemoryMessageStore;
+import org.apache.qpid.server.store.berkeleydb.tuples.QueueEntryKeyTupleBinding;
 import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.MessageMetaData;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.configuration.VirtualHostConfiguration;
+import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.logging.actors.CurrentActor;
 import org.apache.qpid.server.logging.actors.BrokerActor;
 import org.apache.qpid.server.logging.NullRootMessageLogger;
@@ -51,6 +55,9 @@
 import com.sleepycat.je.DatabaseEntry;
 import com.sleepycat.je.DatabaseException;
 import com.sleepycat.je.Database;
+import com.sleepycat.je.Environment;
+import com.sleepycat.je.EnvironmentConfig;
+import com.sleepycat.bind.EntryBinding;
 import com.sleepycat.bind.tuple.TupleBinding;
 import com.sleepycat.bind.tuple.ByteBinding;
 
@@ -95,7 +102,8 @@
     private boolean _interactive;
     private boolean _force;
 
-    private static final String VERSION = "1.0";
+    private static final String VERSION = "2.0";
+    private static final String USER_ABORTED_PROCESS = "User aborted process";
     private static final String OPTION_INPUT_SHORT = "i";
     private static final String OPTION_INPUT = "input";
     private static final String OPTION_OUTPUT_SHORT = "o";
@@ -244,7 +252,7 @@
             {
                 if (input.equalsIgnoreCase(ANSWER_A) || input.equalsIgnoreCase(ANSWER_ABORT))
                 {
-                    throw new RuntimeException("User aborted process");
+                    throw new RuntimeException(USER_ABORTED_PROCESS);
                 }
             }
         }
@@ -284,6 +292,11 @@
                                    File backupDir, boolean force,
                                    boolean inplace) throws Exception
     {
+        if (version == BDBMessageStore.DATABASE_FORMAT_VERSION)
+        {
+            throw new IllegalArgumentException("Store on disk already upgraded to latest version " 
+                    + BDBMessageStore.DATABASE_FORMAT_VERSION);
+        }
         _logger.info("Located store to upgrade at '" + fromDir + "'");
 
         // Verify user has created a backup, giving option to perform backup
@@ -311,7 +324,7 @@
                     if (!userInteract("Are you sure wish to proceed with DB migration without backup? " +
                                       "(For more details of the consequences check the Qpid/BDB Message Store Wiki)."))
                     {
-                        throw new IllegalArgumentException("Upgrade stopped as user request as no DB Backup performed.");
+                        throw new IllegalArgumentException("Upgrade stopped at user request as no DB Backup performed.");
                     }
                 }
             }
@@ -350,17 +363,22 @@
 
         try
         {
-            //Load the old MessageStore
+            int upgradeVersion = 2;
             switch (version)
             {
                 default:
+                case 2:
+                    upgradeVersion = 2;
+                    break;
                 case 1:
-                    _oldMessageStore = new BDBMessageStore(1);
-                    _oldMessageStore.configure(_oldVirtualHost, fromDir, true);
-                    _oldMessageStore.start();
-                    upgradeFromVersion_1();
+                    upgradeVersion = 1;
                     break;
             }
+            //Load the old MessageStore
+            _oldMessageStore = new BDBMessageStore(upgradeVersion);
+            _oldMessageStore.configure(_oldVirtualHost, fromDir, true);
+            _oldMessageStore.start();
+            upgrade(upgradeVersion);
         }
         finally
         {
@@ -394,15 +412,34 @@
         }
     }
 
-    private void upgradeFromVersion_1() throws AMQException, DatabaseException
+    private void upgrade(int version) throws AMQException, DatabaseException
     {
 
-        _logger.info("Starting store upgrade from version 1");
+        _logger.info("Starting store upgrade from version " + version);
 
         _logger.info("Message Metadata");
         //Migrate _messageMetaDataDb;
-        moveContents(_oldMessageStore.getMetaDataDb(), _newMessageStore.getMetaDataDb(), "Message MetaData");
+        final TupleBinding<AMQShortString> newShortStringTupleBinding = _newMessageStore.getShortStringTupleBindingFactory().getInstance();
+        final MessageMetaDataTB newMessageMetaDataTupleBinding = new MessageMetaDataTB(newShortStringTupleBinding);
+        final TupleBinding<AMQShortString> oldShortStringTupleBinding = _oldMessageStore.getShortStringTupleBindingFactory().getInstance();
+        final MessageMetaDataTB oldMessageMetaDataTupleBinding = new MessageMetaDataTB(oldShortStringTupleBinding);
+        DatabaseVisitor messageMetaDataDBVisitor = new DatabaseVisitor()
+        {
 
+            @Override
+            public void visit(DatabaseEntry entry, DatabaseEntry value) throws AMQException, DatabaseException
+            {
+                MessageMetaData metaData = oldMessageMetaDataTupleBinding.entryToObject(value);
+                DatabaseEntry data = new DatabaseEntry();
+                newMessageMetaDataTupleBinding.objectToEntry(metaData, data);
+                _newMessageStore.getMetaDataDb().put(null, entry, data);
+                _count++;
+            }
+        };
+        _oldMessageStore.visitMetaDataDb(messageMetaDataDBVisitor);
+
+        logCount(messageMetaDataDBVisitor.getVisitedCount(), "Message MetaData");
+
         _logger.info("Message Contents");
         //Migrate _messageContentDb;
         moveContents(_oldMessageStore.getContentDb(), _newMessageStore.getContentDb(), "Message Content");
@@ -410,6 +447,7 @@
         _logger.info("Queues");
         //Migrate _queueDb;
         //Get the oldMessageStore Tuple Binding which does the parsing
+        @SuppressWarnings({ "rawtypes" })
         final TupleBinding queueTupleBinding = _oldMessageStore.getQueueTupleBindingFactory().getInstance();
 
         //Create a visitor that will take the queues in the oldMessageStore and add them to the newMessageStore
@@ -440,16 +478,54 @@
 
         _logger.info("Delivery Records");
         //Migrate _deliveryDb;
-        moveContents(_oldMessageStore.getDeliveryDb(), _newMessageStore.getDeliveryDb(), "Delivery Record");
+        final EntryBinding<QueueEntryKey> oldKeyTupleBinding = new QueueEntryKeyTupleBinding(oldShortStringTupleBinding);
+        final EntryBinding<QueueEntryKey> newKeyTupleBinding = new QueueEntryKeyTupleBinding(newShortStringTupleBinding);
+        DatabaseVisitor deliveryDBVisitor = new DatabaseVisitor()
+        {
 
+            @Override
+            public void visit(DatabaseEntry entry, DatabaseEntry value) throws AMQException, DatabaseException
+            {
+                QueueEntryKey keyObject = oldKeyTupleBinding.entryToObject(entry);
+                DatabaseEntry key = new DatabaseEntry();
+                newKeyTupleBinding.objectToEntry(keyObject, key);
+                _newMessageStore.getDeliveryDb().put(null, key, value);
+                _count++;
+            }
+        };
+        _oldMessageStore.visitDelivery(deliveryDBVisitor);
+
+        logCount(queueVisitor.getVisitedCount(), "Delivery Record");
+
         _logger.info("Exchanges");
         //Migrate _exchangeDb;
-        moveContents(_oldMessageStore.getExchangesDb(), _newMessageStore.getExchangesDb(), "Exchange");
 
+        // old exchange tuple binding
+        final ExchangeTB oldExchangeTupleBinding = new ExchangeTB(_oldVirtualHost, oldShortStringTupleBinding);
+        final ExchangeTB newExchangeTupleBinding = new ExchangeTB(_oldVirtualHost, newShortStringTupleBinding);
+        DatabaseVisitor echangeDBVisitor = new DatabaseVisitor()
+        {
+
+            @Override
+            public void visit(DatabaseEntry entry, DatabaseEntry value) throws AMQException, DatabaseException
+            {
+                Exchange exchange = oldExchangeTupleBinding.entryToObject(value);
+                DatabaseEntry key = new DatabaseEntry();
+                DatabaseEntry data = new DatabaseEntry();
+                newShortStringTupleBinding.objectToEntry(exchange.getName(), key);
+                newExchangeTupleBinding.objectToEntry(exchange, data);
+                _newMessageStore.getExchangesDb().put(null, key, data);
+                _count++;
+            }
+        };
+        _oldMessageStore.visitExchanges(echangeDBVisitor);
+
+        logCount(queueVisitor.getVisitedCount(), "Exchange");
+
+
         _logger.info("QueueBindings");
         //Migrate _queueBindingsDb;
-        final TupleBinding bindingTupleBinding = _oldMessageStore.getBindingTupleBindingFactory().getInstance();
-
+        final TupleBinding<BindingKey> bindingTupleBinding = _oldMessageStore.getBindingTupleBindingFactory().getInstance();
         //Create a visitor that to read the old format queue bindings
         DatabaseVisitor queueBindings = new DatabaseVisitor()
         {
@@ -458,7 +534,7 @@
                 BindingKey queueBinding = (BindingKey) bindingTupleBinding.entryToObject(key);
 
                 //Create a new Format TupleBinding
-                TupleBinding newBindingTupleBinding = _newMessageStore.getBindingTupleBindingFactory().getInstance();
+                TupleBinding<BindingKey> newBindingTupleBinding = _newMessageStore.getBindingTupleBindingFactory().getInstance();
 
                 DatabaseEntry newKey = new DatabaseEntry();
                 newBindingTupleBinding.objectToEntry(queueBinding, newKey);
@@ -712,7 +788,7 @@
         }
         catch (RuntimeException re)
         {
-            if (!re.getMessage().equals("User aborted process"))
+            if (!(USER_ABORTED_PROCESS).equals(re.getMessage()))
             {
                 re.printStackTrace();
                 _logger.error("Upgrade Failed: " + re.getMessage());
@@ -725,6 +801,7 @@
 
     }
 
+    @SuppressWarnings("static-access")
     private static void setOptions(Options options)
     {
         Option input =
@@ -759,22 +836,22 @@
     {
 
         _logger.info("Running BDB Message Store upgrade tool: v" + VERSION);
+        int version = getStoreVersion(fromDir);
+        if (version == 0)
+        {
+            _logger.info("Existing store version is undefined!");
+            return;
+        }
+        _logger.info("Existing store version is " + version);
         try
         {
-            new BDBStoreUpgrade(fromDir.toString(), toDir, backupDir, interactive, force).upgradeFromVersion(1);
+            new BDBStoreUpgrade(fromDir.toString(), toDir, backupDir, interactive, force).upgradeFromVersion(version);
 
             _logger.info("Upgrade complete.");
         }
         catch (IllegalArgumentException iae)
         {
-            if (iae.getMessage().endsWith("Error: Unable to load BDBStore as version 1. Store on disk contains version 2 data."))
-            {
-                System.out.println("Store '" + fromDir + "' has already been upgraded to version 2.");
-            }
-            else
-            {
-                _logger.error("Upgrade not started due to: " + iae.getMessage());
-            }
+            _logger.error("Upgrade not started due to: " + iae.getMessage());
         }
         catch (DatabaseException de)
         {
@@ -783,7 +860,7 @@
         }
         catch (RuntimeException re)
         {
-            if (!re.getMessage().equals("User aborted process"))
+            if (!(USER_ABORTED_PROCESS).equals(re.getMessage()))
             {
                 re.printStackTrace();
                 _logger.error("Upgrade Failed: " + re.getMessage());
@@ -800,6 +877,64 @@
         }
     }
 
+    /**
+     * Detects existing store version by checking list of database in store
+     * environment
+     *
+     * @param fromDir
+     *            store folder
+     * @return version
+     */
+    public static int getStoreVersion(File fromDir)
+    {
+        int version = 0;
+        EnvironmentConfig envConfig = new EnvironmentConfig();
+        envConfig.setAllowCreate(false);
+        envConfig.setTransactional(false);
+        envConfig.setReadOnly(true);
+        Environment environment = null;
+        try
+        {
+
+            environment = new Environment(fromDir, envConfig);
+            List<String> databases = environment.getDatabaseNames();
+            for (String name : databases)
+            {
+                if (name.startsWith("exchangeDb"))
+                {
+                    if (name.startsWith("exchangeDb_v"))
+                    {
+                        version = Integer.parseInt(name.substring(12));
+                    }
+                    else
+                    {
+                        version = 1;
+                    }
+                    break;
+                }
+            }
+        }
+        catch (Exception e)
+        {
+            _logger.error("Failure to open existing database: " + e.getMessage());
+        }
+        finally
+        {
+            if (environment != null)
+            {
+                try
+                {
+                    environment.close();
+                }
+                catch (Exception e)
+                {
+                    // ignoring. It should never happen.
+                }
+            }
+        }
+        return version;
+    }
+
     private static void fatalError(String message)
     {
         System.out.println(message);



More information about the rhmessaging-commits mailing list