Author: rgemmell
Date: 2011-05-04 05:13:25 -0400 (Wed, 04 May 2011)
New Revision: 4456
Added:
store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/AMQShortStringTB_2.java
store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/AMQShortStringTB_4.java
store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/AMQShortStringTupleBindingFactory.java
store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueEntryKeyTupleBinding.java
store/branches/java/0.5.x-dev/src/test/java/org/apache/qpid/server/store/berkeleydb/ExchangeTBTest.java
store/branches/java/0.5.x-dev/src/test/java/org/apache/qpid/server/store/berkeleydb/MessageMetaDataTBTest.java
store/branches/java/0.5.x-dev/src/test/java/org/apache/qpid/server/store/berkeleydb/tuples/
store/branches/java/0.5.x-dev/src/test/java/org/apache/qpid/server/store/berkeleydb/tuples/AMQQueueTupleBindingTest.java
store/branches/java/0.5.x-dev/src/test/java/org/apache/qpid/server/store/berkeleydb/tuples/AMQShortStringTupleBindingTest.java
store/branches/java/0.5.x-dev/src/test/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingKeyTupleBindingTest.java
store/branches/java/0.5.x-dev/src/test/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueEntryKeyTupleBindingTest.java
Removed:
store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringEncoding.java
store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringTB.java
Modified:
store/branches/java/0.5.x-dev/etc/config.xml
store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/ExchangeTB.java
store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/MessageMetaDataTB.java
store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/QueueEntryKey.java
store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTupleBindingFactory.java
store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple_1.java
store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple_2.java
store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTupleBindingFactory.java
store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_1.java
store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_2.java
store/branches/java/0.5.x-dev/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreTest.java
store/branches/java/0.5.x-dev/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java
store/branches/java/0.5.x-dev/src/test/java/org/apache/qpid/server/store/berkeleydb/utils/JNDIHelper.java
store/branches/java/0.5.x-dev/src/tools/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgrade.java
Log:
A signed byte was previously used to store AMQShortString lengths, which resulted in
lengths greater than 127 being converted into a negative value upon recovery and assumed
to be null. Update the store implementation to use a short for the length to allow
encoding the 255 legal values and null.
Applied patch from Oleksandr Rudyy
Modified: store/branches/java/0.5.x-dev/etc/config.xml
===================================================================
--- store/branches/java/0.5.x-dev/etc/config.xml 2011-05-03 18:49:45 UTC (rev 4455)
+++ store/branches/java/0.5.x-dev/etc/config.xml 2011-05-04 09:13:25 UTC (rev 4456)
@@ -45,7 +45,9 @@
<management>
<enabled>true</enabled>
<jmxport>8999</jmxport>
- <security-enabled>false</security-enabled>
+ <ssl>
+ <enabled>false</enabled>
+ </ssl>
</management>
<advanced>
<filterchain enableExecutorPool="true"/>
@@ -124,7 +126,6 @@
<auto_register>true</auto_register>
</queue>
- <virtualhosts>${conf}/virtualhosts.xml</virtualhosts>
</broker>
Deleted:
store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringEncoding.java
===================================================================
---
store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringEncoding.java 2011-05-03
18:49:45 UTC (rev 4455)
+++
store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringEncoding.java 2011-05-04
09:13:25 UTC (rev 4456)
@@ -1,46 +0,0 @@
-package org.apache.qpid.server.store.berkeleydb;
-
-import org.apache.qpid.framing.AMQShortString;
-
-import com.sleepycat.bind.tuple.TupleInput;
-import com.sleepycat.bind.tuple.TupleOutput;
-
-/**
- * Created by IntelliJ IDEA.
- * User: U146758
- * Date: 19-Feb-2007
- * Time: 13:56:56
- * To change this template use File | Settings | File Templates.
- */
-public class AMQShortStringEncoding
-{
- public static AMQShortString readShortString(TupleInput tupleInput)
- {
- int length = (int) tupleInput.readByte();
- if (length < 0)
- {
- return null;
- }
- else
- {
- byte[] stringBytes = new byte[length];
- tupleInput.readFast(stringBytes);
- return new AMQShortString(stringBytes);
- }
-
- }
-
- public static void writeShortString(AMQShortString shortString, TupleOutput
tupleOutput)
- {
-
- if (shortString == null)
- {
- tupleOutput.writeByte(-1);
- }
- else
- {
- tupleOutput.writeByte(shortString.length());
- tupleOutput.writeFast(shortString.getBytes(), 0, shortString.length());
- }
- }
-}
Deleted:
store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringTB.java
===================================================================
---
store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringTB.java 2011-05-03
18:49:45 UTC (rev 4455)
+++
store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringTB.java 2011-05-04
09:13:25 UTC (rev 4456)
@@ -1,28 +0,0 @@
-package org.apache.qpid.server.store.berkeleydb;
-
-import com.sleepycat.bind.tuple.TupleBinding;
-import com.sleepycat.bind.tuple.TupleInput;
-import com.sleepycat.bind.tuple.TupleOutput;
-import org.apache.log4j.Logger;
-import org.apache.qpid.framing.AMQShortString;
-
-public class AMQShortStringTB extends TupleBinding
-{
- private static final Logger _log = Logger.getLogger(AMQShortStringTB.class);
-
-
- public AMQShortStringTB()
- {
- }
-
- public Object entryToObject(TupleInput tupleInput)
- {
- return AMQShortStringEncoding.readShortString(tupleInput);
- }
-
- public void objectToEntry(Object object, TupleOutput tupleOutput)
- {
- AMQShortStringEncoding.writeShortString((AMQShortString)object, tupleOutput);
- }
-
-}
Modified:
store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
===================================================================
---
store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java 2011-05-03
18:49:45 UTC (rev 4455)
+++
store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java 2011-05-04
09:13:25 UTC (rev 4456)
@@ -53,7 +53,9 @@
import org.apache.qpid.server.store.AbstractMessageStore;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.store.berkeleydb.tuples.AMQShortStringTupleBindingFactory;
import org.apache.qpid.server.store.berkeleydb.tuples.BindingTupleBindingFactory;
+import org.apache.qpid.server.store.berkeleydb.tuples.QueueEntryKeyTupleBinding;
import org.apache.qpid.server.store.berkeleydb.tuples.QueueTuple;
import org.apache.qpid.server.store.berkeleydb.tuples.QueueTupleBindingFactory;
import org.apache.qpid.server.txn.NonTransactionalContext;
@@ -88,7 +90,7 @@
{
private static final Logger _log = Logger.getLogger(BDBMessageStore.class);
- private static final int DATABASE_FORMAT_VERSION = 2;
+ public static final int DATABASE_FORMAT_VERSION = 4;
private static final String DATABASE_FORMAT_VERSION_PROPERTY = "version";
public static final String ENVIRONMENT_PATH_PROPERTY = "environment-path";
@@ -138,6 +140,13 @@
private QueueTupleBindingFactory _queueTupleBindingFactory;
private BindingTupleBindingFactory _bindingTupleBindingFactory;
+ // factory for AMQShortStringTupleBinding
+ private AMQShortStringTupleBindingFactory _shortStringTupleBindingFactory;
+
+ // AMQShortString tuple binding object is stateless and can be reused across
+ // different methods
+ private TupleBinding<AMQShortString> _shortStringTupleBinding;
+
Map<AMQShortString, Integer> _queueRecoveries = new TreeMap<AMQShortString,
Integer>();
/** The data version this store should run with */
@@ -233,6 +242,9 @@
setDatabaseNames(_version);
+ // create AMQShortString tuple binding for database version
+ _shortStringTupleBinding =
AMQShortStringTupleBindingFactory.createTupleBinding(_version);
+
if (virtualHost != null)
{
setVirtualHost(virtualHost);
@@ -328,8 +340,9 @@
private void createTupleBindingFactories(int version)
{
- _queueTupleBindingFactory = new QueueTupleBindingFactory(version, _virtualHost);
- _bindingTupleBindingFactory = new BindingTupleBindingFactory(version,
_virtualHost);
+ _queueTupleBindingFactory = new QueueTupleBindingFactory(version, _virtualHost,
_shortStringTupleBinding);
+ _bindingTupleBindingFactory = new BindingTupleBindingFactory(version,
_virtualHost, _shortStringTupleBinding);
+ _shortStringTupleBindingFactory = new AMQShortStringTupleBindingFactory(version,
_virtualHost);
}
private synchronized void stateTransition(State requiredState, State newState) throws
AMQException
@@ -616,10 +629,10 @@
if (_state != State.RECOVERING)
{
DatabaseEntry key = new DatabaseEntry();
- EntryBinding keyBinding = new AMQShortStringTB();
- keyBinding.objectToEntry(exchange.getName(), key);
+
+ _shortStringTupleBinding.objectToEntry(exchange.getName(), key);
DatabaseEntry value = new DatabaseEntry();
- TupleBinding exchangeBinding = new ExchangeTB(_virtualHost);
+ TupleBinding<Exchange> exchangeBinding = new ExchangeTB(_virtualHost,
_shortStringTupleBinding);
exchangeBinding.objectToEntry(exchange, value);
try
{
@@ -643,8 +656,8 @@
public void removeExchange(Exchange exchange) throws AMQException
{
DatabaseEntry key = new DatabaseEntry();
- EntryBinding keyBinding = new AMQShortStringTB();
- keyBinding.objectToEntry(exchange.getName(), key);
+
+ _shortStringTupleBinding.objectToEntry(exchange.getName(), key);
try
{
OperationStatus status = _exchangeDb.delete(null, key);
@@ -747,7 +760,7 @@
cursor = _exchangeDb.openCursor(null, null);
DatabaseEntry key = new DatabaseEntry();
DatabaseEntry value = new DatabaseEntry();
- TupleBinding binding = new ExchangeTB(_virtualHost);
+ TupleBinding<Exchange> binding = new ExchangeTB(_virtualHost,
_shortStringTupleBinding);
while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
{
Exchange exchange = (Exchange) binding.entryToObject(value);
@@ -863,9 +876,9 @@
DatabaseEntry key = new DatabaseEntry();
- EntryBinding keyBinding = new AMQShortStringTB();
- keyBinding.objectToEntry(queue.getName(), key);
+ _shortStringTupleBinding.objectToEntry(queue.getName(), key);
+
DatabaseEntry value = new DatabaseEntry();
TupleBinding queueBinding = _queueTupleBindingFactory.getInstance();
@@ -899,8 +912,8 @@
Long queueId = _queueNameToIdMap.remove(name);
DatabaseEntry key = new DatabaseEntry();
- EntryBinding keyBinding = new AMQShortStringTB();
- keyBinding.objectToEntry(name, key);
+
+ _shortStringTupleBinding.objectToEntry(name, key);
try
{
OperationStatus status = _queueDb.delete(null, key);
@@ -927,8 +940,8 @@
AMQQueue getQueue(AMQShortString name) throws AMQException
{
DatabaseEntry key = new DatabaseEntry();
- EntryBinding keyBinding = new AMQShortStringTB();
- keyBinding.objectToEntry(name, key);
+
+ _shortStringTupleBinding.objectToEntry(name, key);
DatabaseEntry value = new DatabaseEntry();
try
{
@@ -960,7 +973,7 @@
AMQShortString name = queue.getName();
Transaction tx = (Transaction) context.getPayload();
DatabaseEntry key = new DatabaseEntry();
- EntryBinding keyBinding = new QueueEntryKey.TupleBinding();
+ EntryBinding<QueueEntryKey> keyBinding = new
QueueEntryKeyTupleBinding(_shortStringTupleBinding);
QueueEntryKey dd = new QueueEntryKey(name, messageId);
keyBinding.objectToEntry(dd, key);
DatabaseEntry value = new DatabaseEntry();
@@ -1020,7 +1033,7 @@
Transaction tx = (Transaction) context.getPayload();
DatabaseEntry key = new DatabaseEntry();
- EntryBinding keyBinding = new QueueEntryKey.TupleBinding();
+ EntryBinding<QueueEntryKey> keyBinding = new
QueueEntryKeyTupleBinding(_shortStringTupleBinding);
QueueEntryKey dd = new QueueEntryKey(name, messageId);
keyBinding.objectToEntry(dd, key);
@@ -1208,7 +1221,7 @@
QueueEntryKey dd = new QueueEntryKey(queueName, 0);
- EntryBinding keyBinding = new QueueEntryKey.TupleBinding();
+ EntryBinding<QueueEntryKey> keyBinding = new
QueueEntryKeyTupleBinding(_shortStringTupleBinding);
keyBinding.objectToEntry(dd, key);
DatabaseEntry value = new DatabaseEntry();
@@ -1368,7 +1381,7 @@
EntryBinding keyBinding = TupleBinding.getPrimitiveBinding(Long.class);
keyBinding.objectToEntry(messageId, key);
DatabaseEntry value = new DatabaseEntry();
- TupleBinding messageBinding = new MessageMetaDataTB();
+ TupleBinding<MessageMetaData> messageBinding = new
MessageMetaDataTB(_shortStringTupleBinding);
messageBinding.objectToEntry(messageMetaData, value);
try
{
@@ -1406,7 +1419,7 @@
EntryBinding keyBinding = TupleBinding.getPrimitiveBinding(Long.class);
keyBinding.objectToEntry(messageId, key);
DatabaseEntry value = new DatabaseEntry();
- TupleBinding messageBinding = new MessageMetaDataTB();
+ TupleBinding<MessageMetaData> messageBinding = new
MessageMetaDataTB(_shortStringTupleBinding);
try
{
@@ -1521,6 +1534,11 @@
return _bindingTupleBindingFactory;
}
+ public AMQShortStringTupleBindingFactory getShortStringTupleBindingFactory()
+ {
+ return _shortStringTupleBindingFactory;
+ }
+
//Package getters for the various databases used by the Store
Database getMetaDataDb()
@@ -1647,7 +1665,7 @@
Transaction tx = (Transaction) context.getPayload();
cursor = _deliveryDb.openCursor(tx, null);
DatabaseEntry key = new DatabaseEntry();
- EntryBinding keyBinding = new QueueEntryKey.TupleBinding();
+ EntryBinding<QueueEntryKey> keyBinding = new
QueueEntryKeyTupleBinding(_shortStringTupleBinding);
DatabaseEntry value = new DatabaseEntry();
EntryBinding valueBinding = TupleBinding.getPrimitiveBinding(Long.class);
Modified:
store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/ExchangeTB.java
===================================================================
---
store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/ExchangeTB.java 2011-05-03
18:49:45 UTC (rev 4455)
+++
store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/ExchangeTB.java 2011-05-04
09:13:25 UTC (rev 4456)
@@ -9,22 +9,25 @@
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.virtualhost.VirtualHost;
-public class ExchangeTB extends TupleBinding
+public class ExchangeTB extends TupleBinding<Exchange>
{
private static final Logger _log = Logger.getLogger(ExchangeTB.class);
private final VirtualHost _virtualHost;
- public ExchangeTB(VirtualHost virtualHost)
+ private final TupleBinding<AMQShortString> _shortStringTupleBinding;
+
+ public ExchangeTB(VirtualHost virtualHost, TupleBinding<AMQShortString>
shortStringEncoder)
{
_virtualHost = virtualHost;
+ _shortStringTupleBinding = shortStringEncoder;
}
- public Object entryToObject(TupleInput tupleInput)
+ public Exchange entryToObject(TupleInput tupleInput)
{
- AMQShortString name = AMQShortStringEncoding.readShortString(tupleInput);
- AMQShortString typeName = AMQShortStringEncoding.readShortString(tupleInput);
+ AMQShortString name = _shortStringTupleBinding.entryToObject(tupleInput);
+ AMQShortString typeName = _shortStringTupleBinding.entryToObject(tupleInput);
boolean autoDelete = tupleInput.readBoolean();
@@ -56,13 +59,11 @@
}
}
- public void objectToEntry(Object object, TupleOutput tupleOutput)
+ public void objectToEntry(Exchange exchange, TupleOutput tupleOutput)
{
- Exchange exchange = (Exchange) object;
+ _shortStringTupleBinding.objectToEntry(exchange.getName(), tupleOutput);
+ _shortStringTupleBinding.objectToEntry(exchange.getType(), tupleOutput);
- AMQShortStringEncoding.writeShortString(exchange.getName(), tupleOutput);
- AMQShortStringEncoding.writeShortString(exchange.getType(), tupleOutput);
-
tupleOutput.writeBoolean(exchange.isAutoDelete());
}
Modified:
store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/MessageMetaDataTB.java
===================================================================
---
store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/MessageMetaDataTB.java 2011-05-03
18:49:45 UTC (rev 4455)
+++
store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/MessageMetaDataTB.java 2011-05-04
09:13:25 UTC (rev 4456)
@@ -17,25 +17,36 @@
*/
package org.apache.qpid.server.store.berkeleydb;
-import com.sleepycat.bind.tuple.TupleBinding;
-import com.sleepycat.bind.tuple.TupleInput;
-import com.sleepycat.bind.tuple.TupleOutput;
import org.apache.log4j.Logger;
import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.*;
+import org.apache.qpid.framing.AMQFrameDecodingException;
+import org.apache.qpid.framing.AMQProtocolVersionException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.queue.MessageMetaData;
+import com.sleepycat.bind.tuple.TupleBinding;
+import com.sleepycat.bind.tuple.TupleInput;
+import com.sleepycat.bind.tuple.TupleOutput;
+
/**
* Handles the mapping to and from message meta data
*/
-public class MessageMetaDataTB extends TupleBinding
+public class MessageMetaDataTB extends TupleBinding<MessageMetaData>
{
private static final Logger _log = Logger.getLogger(MessageMetaDataTB.class);
- public Object entryToObject(TupleInput tupleInput)
+ private final TupleBinding<AMQShortString> _shortStringBinding;
+
+ public MessageMetaDataTB(TupleBinding<AMQShortString> shortStringBinding)
{
+ _shortStringBinding = shortStringBinding;
+ }
+
+ public MessageMetaData entryToObject(TupleInput tupleInput)
+ {
try
{
final MessagePublishInfo publishBody = readMessagePublishInfo(tupleInput);
@@ -51,9 +62,8 @@
}
}
- public void objectToEntry(Object object, TupleOutput tupleOutput)
+ public void objectToEntry(MessageMetaData message, TupleOutput tupleOutput)
{
- MessageMetaData message = (MessageMetaData) object;
try
{
writeMessagePublishInfo(message.getMessagePublishInfo(), tupleOutput);
@@ -71,8 +81,8 @@
private MessagePublishInfo readMessagePublishInfo(TupleInput tupleInput)
{
- final AMQShortString exchange =
AMQShortStringEncoding.readShortString(tupleInput);
- final AMQShortString routingKey =
AMQShortStringEncoding.readShortString(tupleInput);
+ final AMQShortString exchange = _shortStringBinding.entryToObject(tupleInput);
+ final AMQShortString routingKey = _shortStringBinding.entryToObject(tupleInput);
final boolean mandatory = tupleInput.readBoolean();
final boolean immediate = tupleInput.readBoolean();
@@ -122,8 +132,8 @@
{
- AMQShortStringEncoding.writeShortString(publishBody.getExchange(), tupleOutput);
- AMQShortStringEncoding.writeShortString(publishBody.getRoutingKey(),
tupleOutput);
+ _shortStringBinding.objectToEntry(publishBody.getExchange(), tupleOutput);
+ _shortStringBinding.objectToEntry(publishBody.getRoutingKey(), tupleOutput);
tupleOutput.writeBoolean(publishBody.isMandatory());
tupleOutput.writeBoolean(publishBody.isImmediate());
Modified:
store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/QueueEntryKey.java
===================================================================
---
store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/QueueEntryKey.java 2011-05-03
18:49:45 UTC (rev 4455)
+++
store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/QueueEntryKey.java 2011-05-04
09:13:25 UTC (rev 4456)
@@ -1,7 +1,5 @@
package org.apache.qpid.server.store.berkeleydb;
-import com.sleepycat.bind.tuple.TupleInput;
-import com.sleepycat.bind.tuple.TupleOutput;
import org.apache.qpid.framing.AMQShortString;
@@ -18,42 +16,6 @@
{
}
-
- public QueueEntryKey(byte[] payload)
- {
- final TupleInput ti = new TupleInput(payload);
-
- queueName = AMQShortStringEncoding.readShortString(ti);
-
- messageId = ti.readLong();
-
- }
-
- public static class TupleBinding extends com.sleepycat.bind.tuple.TupleBinding
- {
- public Object entryToObject(TupleInput tupleInput)
- {
- final QueueEntryKey mk = new QueueEntryKey();
-
-
- mk.queueName = AMQShortStringEncoding.readShortString(tupleInput);
- mk.messageId = tupleInput.readLong();
-
- return mk;
- }
-
- public void objectToEntry(Object object, TupleOutput tupleOutput)
- {
- final QueueEntryKey mk = (QueueEntryKey) object;
-
- AMQShortStringEncoding.writeShortString(mk.queueName,tupleOutput);
- tupleOutput.writeLong(mk.messageId);
-
- }
-
-
- }
-
public QueueEntryKey(AMQShortString queueName, long messageId)
{
this.queueName = queueName;
Added:
store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/AMQShortStringTB_2.java
===================================================================
---
store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/AMQShortStringTB_2.java
(rev 0)
+++
store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/AMQShortStringTB_2.java 2011-05-04
09:13:25 UTC (rev 4456)
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.store.berkeleydb.tuples;
+
+import com.sleepycat.bind.tuple.TupleBinding;
+import com.sleepycat.bind.tuple.TupleInput;
+import com.sleepycat.bind.tuple.TupleOutput;
+import org.apache.qpid.framing.AMQShortString;
+
+/**
+ * This class is responsible for reading/writing of {@link AMQShortString}
+ * from/into BDB tuple.
+ * <p>
+ * However, it contains a bug and can only read/write correctly
+ * {@link AMQShortString} objects having length less then 128.
+ * <p>
+ * For AMQShortString objects with length greater than 127 characters a read
+ * operation returns null due to writing a string length as a byte which causes
+ * converting values greater then 127 into negative signed byte value.
+ */
+public class AMQShortStringTB_2 extends TupleBinding<AMQShortString>
+{
+
+ /**
+ * Reads short string object from the given tuple input.
+ * <p>
+ * The length of the string is read from a first byte and followed by the
+ * string characters if there is any.
+ * <p>
+ * A null is returned for a string with negative length.
+ *
+ * @param tupleInput
+ * tuple input
+ * @return AMQShortString object or null if tuple contain a negative value
+ * in a first byte.
+ */
+ @Override
+ public AMQShortString entryToObject(TupleInput tupleInput)
+ {
+ int length = (int) tupleInput.readByte();
+ if (length < 0)
+ {
+ return null;
+ }
+ else
+ {
+ byte[] stringBytes = new byte[length];
+ tupleInput.readFast(stringBytes);
+ return new AMQShortString(stringBytes);
+ }
+ }
+
+ /**
+ * Writes given short string object into given tuple output.
+ * <p>
+ * The string is stored as sequence bytes where first byte contains a string
+ * length and followed by string characters.
+ * <p>
+ * Only strings with length less then 128 can be written correctly with this
+ * method.
+ * <p>
+ * Length values greater then 127 are converted into negative signed byte
+ * values
+ * <p>
+ * Null is written as -1.
+ *
+ * @param object
+ * string to write
+ * @param tupleOutput
+ * output tuple to write string into.
+ */
+ @Override
+ public void objectToEntry(AMQShortString object, TupleOutput tupleOutput)
+ {
+ if (object == null)
+ {
+ tupleOutput.writeByte(-1);
+ }
+ else
+ {
+ tupleOutput.writeByte(object.length());
+ tupleOutput.writeFast(object.getBytes(), 0, object.length());
+ }
+ }
+
+}
Added:
store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/AMQShortStringTB_4.java
===================================================================
---
store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/AMQShortStringTB_4.java
(rev 0)
+++
store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/AMQShortStringTB_4.java 2011-05-04
09:13:25 UTC (rev 4456)
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.store.berkeleydb.tuples;
+
+import org.apache.qpid.framing.AMQShortString;
+
+import com.sleepycat.bind.tuple.TupleBinding;
+import com.sleepycat.bind.tuple.TupleInput;
+import com.sleepycat.bind.tuple.TupleOutput;
+
+/**
+ * A correct implementation of AMQShortString Tuple binding to use for
+ * reading/writing of {@link AMQShortString} objects from/into tuple.
+ *
+ */
+public class AMQShortStringTB_4 extends TupleBinding<AMQShortString>
+{
+ /**
+ * Reads an <code>AMQShortString</code> from given
<code>TupleInput</code>
+ * <p>
+ * The length of the string is read from 2 first bytes following string
+ * characters.
+ * <p>
+ * String having negative length value is considered a null string.
+ *
+ * @param tupleInput
+ * tuple input
+ * @return AMQShortString
+ */
+ @Override
+ public AMQShortString entryToObject(TupleInput tupleInput)
+ {
+ short length = tupleInput.readShort();
+ if (length < 0)
+ {
+ return null;
+ }
+ else
+ {
+ byte[] stringBytes = new byte[length];
+ tupleInput.readFast(stringBytes);
+ return new AMQShortString(stringBytes);
+ }
+ }
+
+ /**
+ * Writes given short string object into given tuple output.
+ * <p>
+ * The string is written as sequence of bytes where first 2 bytes contain a
+ * string length and followed by string characters.
+ *
+ * @param shortString
+ * short string
+ * @param tupleOutput
+ * tuple output
+ */
+ @Override
+ public void objectToEntry(AMQShortString shortString, TupleOutput tupleOutput)
+ {
+ if (shortString == null)
+ {
+ tupleOutput.writeShort(-1);
+ }
+ else
+ {
+ tupleOutput.writeShort(shortString.length());
+ tupleOutput.writeFast(shortString.getBytes(), 0, shortString.length());
+ }
+ }
+
+}
Added:
store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/AMQShortStringTupleBindingFactory.java
===================================================================
---
store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/AMQShortStringTupleBindingFactory.java
(rev 0)
+++
store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/AMQShortStringTupleBindingFactory.java 2011-05-04
09:13:25 UTC (rev 4456)
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.store.berkeleydb.tuples;
+
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+import com.sleepycat.bind.tuple.TupleBinding;
+
+/**
+ * A factory class to construct {@link TupleBinding<AMQShortString>} instance to
+ * use for writing/reading {@link AMQShortString} objects into/from BDB tuple.
+ */
+public class AMQShortStringTupleBindingFactory extends TupleBindingFactory
+{
+
+ /**
+ * Constructs factory for given BDB store version and virtual host
+ *
+ * @param version
+ * BDB store version
+ * @param virtualhost
+ * virtual host
+ */
+ public AMQShortStringTupleBindingFactory(int version, VirtualHost virtualhost)
+ {
+ super(version, virtualhost);
+ }
+
+ /**
+ * Creates {@link TupleBinding<AMQShortString>} instance for given BDB
+ * version.
+ *
+ * @param version
+ * BDB version.
+ * @return TupleBinding<AMQShortString>
+ */
+ public static TupleBinding<AMQShortString> createTupleBinding(int version)
+ {
+ switch (version)
+ {
+ default:
+ case 4:
+ return new AMQShortStringTB_4();
+ case 1:
+ case 2:
+ // old BDB store short string binding
+ return new AMQShortStringTB_2();
+ }
+ }
+
+ @Override
+ public TupleBinding<AMQShortString> getInstance()
+ {
+ return AMQShortStringTupleBindingFactory.createTupleBinding(_version);
+ }
+}
Modified:
store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTupleBindingFactory.java
===================================================================
---
store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTupleBindingFactory.java 2011-05-03
18:49:45 UTC (rev 4455)
+++
store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTupleBindingFactory.java 2011-05-04
09:13:25 UTC (rev 4456)
@@ -20,25 +20,47 @@
*/
package org.apache.qpid.server.store.berkeleydb.tuples;
-import com.sleepycat.bind.tuple.TupleBinding;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.store.berkeleydb.BindingKey;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import com.sleepycat.bind.tuple.TupleBinding;
+
public class BindingTupleBindingFactory extends TupleBindingFactory
{
- public BindingTupleBindingFactory(int version, VirtualHost virtualhost)
+ /**
+ * Holds tuple binding to serialize/de-serialize AMQShortString objects
+ */
+ protected TupleBinding<AMQShortString> _shortStringTupleBinding;
+
+ /**
+ * Creates a factory instance for given store version, virtual host and
+ * {@link AMQShortString} tuple binding
+ *
+ * @param version
+ * store version
+ * @param virtualHost
+ * virtual host
+ * @param shortStringTupleBinding
+ * AMQShortString tuple binding
+ */
+ public BindingTupleBindingFactory(int version, VirtualHost virtualhost,
+ TupleBinding<AMQShortString> shortStringTupleBinding)
{
super(version, virtualhost);
+ _shortStringTupleBinding = shortStringTupleBinding;
}
- public TupleBinding getInstance()
+ public TupleBinding<BindingKey> getInstance()
{
switch (_version)
{
default:
+ case 4:
case 2:
- return new BindingTuple_2(_virtualhost);
+ return new BindingTuple_2(_virtualhost, _shortStringTupleBinding);
case 1:
- return new BindingTuple_1(_virtualhost);
+ return new BindingTuple_1(_virtualhost, _shortStringTupleBinding);
}
}
}
Modified:
store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple_1.java
===================================================================
---
store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple_1.java 2011-05-03
18:49:45 UTC (rev 4455)
+++
store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple_1.java 2011-05-04
09:13:25 UTC (rev 4456)
@@ -1,8 +1,6 @@
package org.apache.qpid.server.store.berkeleydb.tuples;
import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.store.berkeleydb.AMQShortStringEncoding;
-import org.apache.qpid.server.store.berkeleydb.FieldTableEncoding;
import org.apache.qpid.server.store.berkeleydb.BindingKey;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
@@ -12,46 +10,60 @@
import com.sleepycat.bind.tuple.TupleOutput;
import org.apache.log4j.Logger;
-public class BindingTuple_1 extends TupleBinding implements BindingTuple
+public class BindingTuple_1 extends TupleBinding<BindingKey> implements
BindingTuple
{
protected static final Logger _log = Logger.getLogger(BindingTuple.class);
protected VirtualHost _virtualhost;
- public BindingTuple_1(VirtualHost virtualHost)
+ /**
+ * Holds tuple binding to serialize AMQShortString objects
+ */
+ protected TupleBinding<AMQShortString> _shortStringTupleBinding;
+
+ /**
+ * Constructs instance for given virtual host and AMQShortString {@link
TupleBinding}.
+ *
+ * @param virtualHost virtual host
+ * @param shortStringTupleBinding AMQShortString tuple binding
+ */
+ public BindingTuple_1(VirtualHost virtualHost, TupleBinding<AMQShortString>
shortStringTupleBinding)
{
if (virtualHost == null)
{
throw new NullPointerException("Virtualhost cannot be null");
}
+ if (shortStringTupleBinding == null)
+ {
+ throw new NullPointerException("AMQShortString tuple binding cannot be
null");
+ }
_virtualhost = virtualHost;
+ _shortStringTupleBinding = shortStringTupleBinding;
}
- public Object entryToObject(TupleInput tupleInput)
+ public BindingKey entryToObject(TupleInput tupleInput)
{
- AMQShortString exchangeName =
AMQShortStringEncoding.readShortString(tupleInput);
- AMQShortString queueName = AMQShortStringEncoding.readShortString(tupleInput);
- AMQShortString routingKey = AMQShortStringEncoding.readShortString(tupleInput);
+ AMQShortString exchangeName =
_shortStringTupleBinding.entryToObject(tupleInput);
+ AMQShortString queueName = _shortStringTupleBinding.entryToObject(tupleInput);
+ AMQShortString routingKey = _shortStringTupleBinding.entryToObject(tupleInput);
return createNewBindingKey(exchangeName, queueName, routingKey);
}
- public void objectToEntry(Object object, TupleOutput tupleOutput)
+ public void objectToEntry(BindingKey binding, TupleOutput tupleOutput)
{
- BindingKey binding = (BindingKey) object;
-
- AMQShortStringEncoding.writeShortString(binding.getExchangeName(), tupleOutput);
- AMQShortStringEncoding.writeShortString(binding.getQueueName(), tupleOutput);
- AMQShortStringEncoding.writeShortString(binding.getRoutingKey(), tupleOutput);
+ _shortStringTupleBinding.objectToEntry(binding.getExchangeName(), tupleOutput);
+ _shortStringTupleBinding.objectToEntry(binding.getQueueName(), tupleOutput);
+ _shortStringTupleBinding.objectToEntry(binding.getRoutingKey(), tupleOutput);
}
- private Object createNewBindingKey(AMQShortString exchangeName, AMQShortString
queueName, AMQShortString routingKey)
+ private BindingKey createNewBindingKey(AMQShortString exchangeName, AMQShortString
queueName, AMQShortString routingKey)
{
return createNewBindingKey(exchangeName, queueName, routingKey, null);
}
// Addition for Version 2 of this table
- protected Object createNewBindingKey(AMQShortString exchangeName, AMQShortString
queueName,
+ protected BindingKey createNewBindingKey(AMQShortString exchangeName, AMQShortString
queueName,
AMQShortString routingKey, FieldTable
arguments)
{
return new BindingKey(exchangeName, queueName, routingKey, arguments);
Modified:
store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple_2.java
===================================================================
---
store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple_2.java 2011-05-03
18:49:45 UTC (rev 4455)
+++
store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple_2.java 2011-05-04
09:13:25 UTC (rev 4456)
@@ -2,27 +2,27 @@
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.server.store.berkeleydb.AMQShortStringEncoding;
import org.apache.qpid.server.store.berkeleydb.BindingKey;
import org.apache.qpid.server.store.berkeleydb.FieldTableEncoding;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import com.sleepycat.bind.tuple.TupleBinding;
import com.sleepycat.bind.tuple.TupleInput;
import com.sleepycat.bind.tuple.TupleOutput;
public class BindingTuple_2 extends BindingTuple_1 implements BindingTuple
{
- public BindingTuple_2(VirtualHost virtualHost)
+ public BindingTuple_2(VirtualHost virtualHost, TupleBinding<AMQShortString>
shortStringTupleBinding)
{
- super(virtualHost);
+ super(virtualHost, shortStringTupleBinding);
}
- public Object entryToObject(TupleInput tupleInput)
+ public BindingKey entryToObject(TupleInput tupleInput)
{
- AMQShortString exchangeName =
AMQShortStringEncoding.readShortString(tupleInput);
- AMQShortString queueName = AMQShortStringEncoding.readShortString(tupleInput);
- AMQShortString routingKey = AMQShortStringEncoding.readShortString(tupleInput);
+ AMQShortString exchangeName =
_shortStringTupleBinding.entryToObject(tupleInput);
+ AMQShortString queueName = _shortStringTupleBinding.entryToObject(tupleInput);
+ AMQShortString routingKey = _shortStringTupleBinding.entryToObject(tupleInput);
FieldTable arguments;
@@ -40,13 +40,11 @@
return createNewBindingKey(exchangeName, queueName, routingKey, arguments);
}
- public void objectToEntry(Object object, TupleOutput tupleOutput)
+ public void objectToEntry(BindingKey binding, TupleOutput tupleOutput)
{
- BindingKey binding = (BindingKey) object;
-
- AMQShortStringEncoding.writeShortString(binding.getExchangeName(), tupleOutput);
- AMQShortStringEncoding.writeShortString(binding.getQueueName(), tupleOutput);
- AMQShortStringEncoding.writeShortString(binding.getRoutingKey(), tupleOutput);
+ _shortStringTupleBinding.objectToEntry(binding.getExchangeName(), tupleOutput);
+ _shortStringTupleBinding.objectToEntry(binding.getQueueName(), tupleOutput);
+ _shortStringTupleBinding.objectToEntry(binding.getRoutingKey(), tupleOutput);
// Addition for Version 2 of this table
FieldTableEncoding.writeFieldTable(binding.getArguments(), tupleOutput);
Added:
store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueEntryKeyTupleBinding.java
===================================================================
---
store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueEntryKeyTupleBinding.java
(rev 0)
+++
store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueEntryKeyTupleBinding.java 2011-05-04
09:13:25 UTC (rev 4456)
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.store.berkeleydb.tuples;
+
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.store.berkeleydb.QueueEntryKey;
+
+import com.sleepycat.bind.tuple.TupleBinding;
+import com.sleepycat.bind.tuple.TupleInput;
+import com.sleepycat.bind.tuple.TupleOutput;
+
+/**
+ * TupleBinding implementation to store/load {@link QueueEntryKey} in/from DBD store.
+ */
+public class QueueEntryKeyTupleBinding extends TupleBinding<QueueEntryKey>
+{
+ /**
+ * Holds tuple binding to serialize AMQShortString objects
+ */
+ private final TupleBinding<AMQShortString> _shortStringTupleBinding;
+
+ /**
+ * Constructs instance for given short string tuple binding
+ *
+ * @param shortStringTupleBinding
+ * short string tuple binding
+ */
+ public QueueEntryKeyTupleBinding(TupleBinding<AMQShortString>
shortStringTupleBinding)
+ {
+ _shortStringTupleBinding = shortStringTupleBinding;
+ }
+
+ @Override
+ public QueueEntryKey entryToObject(TupleInput tupleInput)
+ {
+ final QueueEntryKey mk = new QueueEntryKey();
+ mk.queueName = _shortStringTupleBinding.entryToObject(tupleInput);
+ mk.messageId = tupleInput.readLong();
+ return mk;
+ }
+
+ @Override
+ public void objectToEntry(QueueEntryKey mk, TupleOutput tupleOutput)
+ {
+ _shortStringTupleBinding.objectToEntry(mk.queueName, tupleOutput);
+ tupleOutput.writeLong(mk.messageId);
+ }
+
+}
\ No newline at end of file
Modified:
store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTupleBindingFactory.java
===================================================================
---
store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTupleBindingFactory.java 2011-05-03
18:49:45 UTC (rev 4455)
+++
store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTupleBindingFactory.java 2011-05-04
09:13:25 UTC (rev 4456)
@@ -20,25 +20,47 @@
*/
package org.apache.qpid.server.store.berkeleydb.tuples;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.virtualhost.VirtualHost;
+
import com.sleepycat.bind.tuple.TupleBinding;
public class QueueTupleBindingFactory extends TupleBindingFactory
{
- public QueueTupleBindingFactory(int version, VirtualHost virtualHost)
+ /**
+ * Holds tuple binding to serialize/de-serialize AMQShortString objects
+ */
+ protected TupleBinding<AMQShortString> _shortStringTupleBinding;
+
+ /**
+ * Creates a factory instance for given store version, virtual host and
+ * {@link AMQShortString} tuple binding
+ *
+ * @param version
+ * store version
+ * @param virtualHost
+ * virtual host
+ * @param shortStringTupleBinding
+ * AMQShortString tuple binding
+ */
+ public QueueTupleBindingFactory(int version, VirtualHost virtualHost,
+ TupleBinding<AMQShortString> shortStringTupleBinding)
{
- super(version,virtualHost);
+ super(version, virtualHost);
+ _shortStringTupleBinding = shortStringTupleBinding;
}
- public TupleBinding getInstance()
+ public TupleBinding<AMQQueue> getInstance()
{
switch (_version)
{
default:
+ case 4:
case 2:
- return new QueueTuple_2(_virtualhost);
+ return new QueueTuple_2(_virtualhost, _shortStringTupleBinding);
case 1:
- return new QueueTuple_1(_virtualhost);
+ return new QueueTuple_1(_virtualhost, _shortStringTupleBinding);
}
}
}
Modified:
store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_1.java
===================================================================
---
store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_1.java 2011-05-03
18:49:45 UTC (rev 4455)
+++
store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_1.java 2011-05-04
09:13:25 UTC (rev 4456)
@@ -26,38 +26,44 @@
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.AMQQueueFactory;
-import org.apache.qpid.server.store.berkeleydb.AMQShortStringEncoding;
import org.apache.qpid.server.virtualhost.VirtualHost;
-public class QueueTuple_1 extends TupleBinding implements QueueTuple
+public class QueueTuple_1 extends TupleBinding<AMQQueue> implements QueueTuple
{
protected static final Logger _logger = Logger.getLogger(QueueTuple.class);
protected final VirtualHost _virtualHost;
- public QueueTuple_1(VirtualHost virtualHost)
+ /**
+ * Holds tuple binding to serialize AMQShortString objects
+ */
+ protected TupleBinding<AMQShortString> _shortStringTupleBinding;
+
+ public QueueTuple_1(VirtualHost virtualHost, TupleBinding<AMQShortString>
shortStringTupleBinding)
{
if (virtualHost == null)
{
throw new NullPointerException("Virtualhost cannot be null");
+ }if (shortStringTupleBinding == null)
+ {
+ throw new NullPointerException("AMQShortString tuple binding cannot be
null");
}
+ _shortStringTupleBinding = shortStringTupleBinding;
_virtualHost = virtualHost;
}
- public Object entryToObject(TupleInput tupleInput)
+ public AMQQueue entryToObject(TupleInput tupleInput)
{
- AMQShortString name = AMQShortStringEncoding.readShortString(tupleInput);
- AMQShortString owner = AMQShortStringEncoding.readShortString(tupleInput);
+ AMQShortString name = _shortStringTupleBinding.entryToObject(tupleInput);
+ AMQShortString owner = _shortStringTupleBinding.entryToObject(tupleInput);
return createNewQueue(name, owner);
}
- public void objectToEntry(Object object, TupleOutput tupleOutput)
+ public void objectToEntry(AMQQueue queue, TupleOutput tupleOutput)
{
- AMQQueue queue = (AMQQueue) object;
-
- AMQShortStringEncoding.writeShortString(queue.getName(), tupleOutput);
- AMQShortStringEncoding.writeShortString(queue.getOwner(), tupleOutput);
+ _shortStringTupleBinding.objectToEntry(queue.getName(), tupleOutput);
+ _shortStringTupleBinding.objectToEntry(queue.getOwner(), tupleOutput);
}
// Addition for Version 2 of this table
@@ -66,13 +72,13 @@
//no-op
}
- protected Object createNewQueue(AMQShortString name, AMQShortString owner)
+ protected AMQQueue createNewQueue(AMQShortString name, AMQShortString owner)
{
return createNewQueue(name, owner, null);
}
// Addition for Version 2 of this table
- protected Object createNewQueue(AMQShortString name, AMQShortString owner, FieldTable
arguments)
+ protected AMQQueue createNewQueue(AMQShortString name, AMQShortString owner,
FieldTable arguments)
{
try
{
Modified:
store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_2.java
===================================================================
---
store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_2.java 2011-05-03
18:49:45 UTC (rev 4455)
+++
store/branches/java/0.5.x-dev/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_2.java 2011-05-04
09:13:25 UTC (rev 4456)
@@ -20,10 +20,10 @@
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.store.berkeleydb.AMQShortStringEncoding;
import org.apache.qpid.server.store.berkeleydb.FieldTableEncoding;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import com.sleepycat.bind.tuple.TupleBinding;
import com.sleepycat.bind.tuple.TupleInput;
import com.sleepycat.bind.tuple.TupleOutput;
@@ -31,17 +31,17 @@
{
protected FieldTable _arguments;
- public QueueTuple_2(VirtualHost virtualHost)
+ public QueueTuple_2(VirtualHost virtualHost, TupleBinding<AMQShortString>
shortStringTupleBinding)
{
- super(virtualHost);
+ super(virtualHost, shortStringTupleBinding);
}
- public Object entryToObject(TupleInput tupleInput)
+ public AMQQueue entryToObject(TupleInput tupleInput)
{
try
{
- AMQShortString name = AMQShortStringEncoding.readShortString(tupleInput);
- AMQShortString owner = AMQShortStringEncoding.readShortString(tupleInput);
+ AMQShortString name = _shortStringTupleBinding.entryToObject(tupleInput);
+ AMQShortString owner = _shortStringTupleBinding.entryToObject(tupleInput);
// Addition for Version 2 of this table
FieldTable arguments = FieldTableEncoding.readFieldTable(tupleInput);
@@ -55,12 +55,10 @@
}
- public void objectToEntry(Object object, TupleOutput tupleOutput)
+ public void objectToEntry(AMQQueue queue, TupleOutput tupleOutput)
{
- AMQQueue queue = (AMQQueue) object;
-
- AMQShortStringEncoding.writeShortString(queue.getName(), tupleOutput);
- AMQShortStringEncoding.writeShortString(queue.getOwner(), tupleOutput);
+ _shortStringTupleBinding.objectToEntry(queue.getName(), tupleOutput);
+ _shortStringTupleBinding.objectToEntry(queue.getOwner(), tupleOutput);
// Addition for Version 2 of this table
FieldTableEncoding.writeFieldTable(_arguments, tupleOutput);
}
Modified:
store/branches/java/0.5.x-dev/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreTest.java
===================================================================
---
store/branches/java/0.5.x-dev/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreTest.java 2011-05-03
18:49:45 UTC (rev 4455)
+++
store/branches/java/0.5.x-dev/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreTest.java 2011-05-04
09:13:25 UTC (rev 4456)
@@ -19,18 +19,14 @@
import com.sleepycat.je.DatabaseException;
import junit.framework.Assert;
-import junit.framework.TestCase;
import junit.framework.TestSuite;
-import org.apache.log4j.Logger;
import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.framing.*;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.framing.abstraction.ContentChunk;
-import org.apache.qpid.server.RequiredDeliveryException;
import org.apache.qpid.server.configuration.VirtualHostConfiguration;
-import org.apache.qpid.server.configuration.ServerConfiguration;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.exchange.DefaultExchangeFactory;
import org.apache.qpid.server.exchange.DirectExchange;
@@ -42,18 +38,14 @@
import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.queue.AMQPriorityQueue;
import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.txn.NonTransactionalContext;
-import org.apache.qpid.server.txn.TransactionalContext;
import org.apache.qpid.server.util.NullApplicationRegistry;
import org.apache.commons.configuration.PropertiesConfiguration;
import java.io.File;
-import java.util.LinkedList;
import java.util.List;
public class BDBStoreTest extends BDBVMTestCase
{
- private static final Logger _log = Logger.getLogger(BDBStoreTest.class);
private BDBMessageStore _store;
private String STORE_LOCATION = System.getProperty("BDB_WORK") +
"/bdbTestEnv";
@@ -61,7 +53,6 @@
private StoreContext _storeContext = new StoreContext();
private VirtualHost _virtualHost;
- private TransactionalContext _txnContext;
private static final AMQShortString QUEUE1 = new AMQShortString("queue1");
private static final AMQShortString ME = new AMQShortString("me");
private static final AMQShortString MYEXCHANGE = new
AMQShortString("myexchange");
@@ -70,7 +61,6 @@
private static final AMQShortString HIM = new AMQShortString("him");
private static final AMQShortString EXCHANGE1 = new
AMQShortString("exchange1");
- private static volatile int _loops;
File BDB_DIR = new File(STORE_LOCATION);
public void setUp() throws Exception
@@ -101,7 +91,6 @@
_store.setVirtualHost(_virtualHost);
- _txnContext = new NonTransactionalContext(_store, _storeContext, null, new
LinkedList<RequiredDeliveryException>());
}
private void reload() throws Exception
@@ -338,6 +327,9 @@
public void testTranCommit() throws Exception
{
+ List<Long> previousIds = _store.getEnqueuedMessages(QUEUE1);
+ assertTrue("Last Test Messages are still present",
previousIds.isEmpty());
+
MessagePublishInfo pubBody = createPublishBody();
BasicContentHeaderProperties props = createContentHeaderProperties();
String bodyText = "dsjfhdsjkfhdsjflhsdjfdshfjdlhfjhdljfdhsljkfsh";
@@ -363,8 +355,30 @@
val = enqueuedIds.get(1);
Assert.assertEquals("Second Message is incorrect", 21L,
val.longValue());
+ cleanQueue(queue, enqueuedIds);
+
}
+ /**
+ * A helper method to clean given queue form messages with given IDs.
+ *
+ * @param queue
+ * queue to clean
+ * @param enqueuedIds
+ * messages to dequeue
+ * @throws AMQException
+ */
+ protected void cleanQueue(AMQQueue queue, List<Long> enqueuedIds) throws
AMQException
+ {
+ // clean queue
+ _store.beginTran(_storeContext);
+ for (Long messageId : enqueuedIds)
+ {
+ _store.dequeueMessage(_storeContext, queue, messageId);
+ }
+ _store.commitTran(_storeContext);
+ }
+
public void testTranRollback1() throws Exception
{
List<Long> enqueuedIds = _store.getEnqueuedMessages(QUEUE1);
@@ -404,6 +418,8 @@
val = enqueuedIds.get(1);
assertEquals("Second Message is incorrect", 31L, val.longValue());
+ cleanQueue(queue, enqueuedIds);
+
}
public void testTranRollback2() throws Exception
@@ -440,6 +456,8 @@
Assert.assertEquals("First Message is incorrect", 31L,
val.longValue());
val = enqueuedIds.get(1);
Assert.assertEquals("Second Message is incorrect", 32L,
val.longValue());
+
+ cleanQueue(queue, enqueuedIds);
}
public void testRecovery() throws Exception
Modified:
store/branches/java/0.5.x-dev/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java
===================================================================
---
store/branches/java/0.5.x-dev/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java 2011-05-03
18:49:45 UTC (rev 4455)
+++
store/branches/java/0.5.x-dev/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java 2011-05-04
09:13:25 UTC (rev 4456)
@@ -20,6 +20,9 @@
*/
package org.apache.qpid.server.store.berkeleydb;
+import org.apache.qpid.server.logging.NullRootMessageLogger;
+import org.apache.qpid.server.logging.actors.BrokerActor;
+import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.store.berkeleydb.utils.Publisher;
@@ -48,16 +51,15 @@
final String BDBHome = System.getProperty("BDB_HOME");
final File _configFile = new File(BDBHome, "etc/config.xml");
- private String VIRTUALHOST = "test";
-
private static final String VERSION_1 = "1";
- private static final String VERSION_2 = "2";
+ private static final String VERSION_4 = "4";
private String _topic = "MyDurableSubscriptionTestTopic";
String _fromDir = System.getProperty("QPID_WORK") +
"/version1Store";
- String _toDir = System.getProperty("QPID_WORK") +
"/version2Store";
- String _toDirTwice = System.getProperty("QPID_WORK") +
"/version2StoreUpgradeTwice";
+ String _fromDir2 = System.getProperty("QPID_WORK") +
"/version2Store";
+ String _toDir = System.getProperty("QPID_WORK") +
"/version4Store";
+ String _toDirTwice = System.getProperty("QPID_WORK") +
"/version4StoreUpgradeTwice";
public void setUp() throws IOException
{
@@ -84,44 +86,102 @@
FileUtils.delete(directory, true);
}
+ directory = new File(_fromDir2);
+ if (directory.exists() && directory.isDirectory())
+ {
+ FileUtils.delete(directory, true);
+ }
}
- public void testMultipleUpgrades() throws Exception
+ public void testMultipleUpgradesFromVersionOne() throws Exception
{
+ assertBrokerUpgrade(1, _fromDir);
+ }
+
+ public void testMultipleUpgradesFromVersionTwo() throws Exception
+ {
+ assertBrokerUpgrade(2, _fromDir2);
+ }
+
+ public void testUpgradeFromLatestVersion() throws Exception
+ {
+ startBroker(1, Integer.toString(BDBMessageStore.DATABASE_FORMAT_VERSION));
+ stopBroker(1);
+ try
+ {
+ new BDBStoreUpgrade(_toDir, _toDirTwice, null, false,
true).upgradeFromVersion(BDBMessageStore.DATABASE_FORMAT_VERSION);
+ fail("Upgrade Succeeded");
+ }
+ catch (Exception e)
+ {
+ assertEquals("Store on disk already upgraded to latest version "
+ + BDBMessageStore.DATABASE_FORMAT_VERSION, e.getMessage());
+ }
+ }
+
+ /**
+ * Tests broker upgrade from given version with store located by given path
+ *
+ * @param version
+ * store version
+ * @param storePath
+ * store path
+ * @throws Exception
+ */
+ protected void assertBrokerUpgrade(int version, String storePath) throws Exception
+ {
String broker = "vm://:1";
- startBroker(1, VERSION_1);
+ startBroker(1, Integer.toString(version));
- //Ensure msg were transitioned to new broker
+ // Ensure msg were transitioned to new broker
sendAndCheckDurableSubscriber(broker, true, true, 5, null);
- //Reset the Selector Pattern
+ // Reset the Selector Pattern
new DurableSubscriber(broker, _topic, "odd=true").close();
stopBroker(1);
- upgradeBroker();
+ upgradeBroker(storePath);
try
{
- new BDBStoreUpgrade(_toDir, _toDirTwice, null, false,
true).upgradeFromVersion(1);
+ new BDBStoreUpgrade(_toDir, _toDirTwice, null, false,
true).upgradeFromVersion(version);
fail("Second Upgrade Succeeded");
}
catch (Exception e)
{
System.err.println("Showing stack trace. expecting Unable to load
BDBStore error");
e.printStackTrace();
- assertTrue("Incorrect Exception Thrown:" + e.getMessage(),
- e.getMessage().contains("Unable to load BDBStore as version
1. Store on disk contains version 2 data"));
+ String expectedMessage = "Unable to load BDBStore as version " +
version
+ + ". Store on disk contains version 4 data";
+ assertTrue("Incorrect Exception Thrown:" + e.getMessage(),
e.getMessage().contains(expectedMessage));
}
}
- public void testDurababilitySelectors() throws Exception
+ public void testDurababilitySelectorsForUpgradeFromVersionOne() throws Exception
{
+ assertDurababilitySelectorsForUpgrade(1, _fromDir);
+ }
+
+ public void testDurababilitySelectorsForUpgradeFromVersionTwo() throws Exception
+ {
+ assertDurababilitySelectorsForUpgrade(2, _fromDir2);
+ }
+
+ /**
+ * Tests broker upgrade for the storage containing data for durable subscription.
+ *
+ * @param version store version
+ * @param storePath store location
+ * @throws Exception
+ */
+ protected void assertDurababilitySelectorsForUpgrade(int version, String storePath)
throws Exception
+ {
String broker = "vm://:1";
- startBroker(1, VERSION_1);
+ startBroker(1, Integer.toString(version));
new DurableSubscriber(broker, _topic, null).close();
@@ -133,26 +193,16 @@
stopBroker(1);
- upgradeBroker();
+ upgradeBroker(storePath);
broker = "vm://:2";
- startBroker(2, VERSION_2);
+ startBroker(2, VERSION_4);
//Ensure msg were transitioned to new broker
- sendAndCheckDurableSubscriber(broker, false, false, 5, null);
+ sendAndCheckDurableSubscriber(broker, false, true, 5, null);
- //Reset the Selector Pattern
- new DurableSubscriber(broker, _topic, "odd=true").close();
-
stopBroker(2);
-
- startBroker(2, VERSION_2);
-
- ///* This test is currently broken due to QPID-1275
- //Ensure that the selector was preseved on restart and caused all msgs to be
removed.
- sendAndCheckDurableSubscriber(broker, false, false, 0, null);
- stopBroker(2);
}
public void testDurabability() throws Exception
@@ -172,11 +222,31 @@
stopBroker(1);
}
- private void upgradeBroker() throws Exception
+ public void testStoreVersionDetection() throws Exception
{
- new BDBStoreUpgrade(_fromDir, _toDir, null, false, true).upgradeFromVersion(1);
+ int[] versions = {1, 2, 4};
+ for (int i = 0; i < versions.length; i++)
+ {
+ assertBDBStoreVersionDetection(1);
+ }
}
+
+ protected void assertBDBStoreVersionDetection(int version) throws Exception
+ {
+ startBroker(1, Integer.toString(version));
+ stopBroker(1);
+
+ File storeFolder = new File(System.getProperty("QPID_WORK") +
"/version" + version + "Store");
+ int detectedVersion = BDBStoreUpgrade.getStoreVersion(storeFolder);
+ assertEquals("failed to detect store version for store " + version,
version, detectedVersion);
+ }
+
+ private void upgradeBroker(String fromDir) throws Exception
+ {
+ BDBStoreUpgrade.upgrade(new File(fromDir), _toDir, null, false, true);
+ }
+
private void stopBroker(int port)
{
TransportConnection.killVMBroker(port);
@@ -199,6 +269,7 @@
System.getProperty("QPID_WORK")+
"/version" + version + "Store");
testVirtualhost.setProperty("store.version", version);
+ CurrentActor.set(new BrokerActor(new NullRootMessageLogger()));
ApplicationRegistry.getInstance(port).getVirtualHostRegistry().
registerVirtualHost(new VirtualHost(new
VirtualHostConfiguration("bdbtest",testVirtualhost)));
Added:
store/branches/java/0.5.x-dev/src/test/java/org/apache/qpid/server/store/berkeleydb/ExchangeTBTest.java
===================================================================
---
store/branches/java/0.5.x-dev/src/test/java/org/apache/qpid/server/store/berkeleydb/ExchangeTBTest.java
(rev 0)
+++
store/branches/java/0.5.x-dev/src/test/java/org/apache/qpid/server/store/berkeleydb/ExchangeTBTest.java 2011-05-04
09:13:25 UTC (rev 4456)
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import junit.framework.TestCase;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.configuration.VirtualHostConfiguration;
+import org.apache.qpid.server.exchange.DirectExchange;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.logging.NullRootMessageLogger;
+import org.apache.qpid.server.logging.actors.BrokerActor;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.store.berkeleydb.tuples.AMQShortStringTupleBindingFactory;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+import com.sleepycat.bind.tuple.TupleInput;
+import com.sleepycat.bind.tuple.TupleOutput;
+
+/**
+ * Tests {@link ExchangeTB}
+ */
+public class ExchangeTBTest extends TestCase
+{
+ // tested tuple binding
+ private ExchangeTB _exchangeTupleBinding;
+
+ // test exchange
+ private Exchange _exchange;
+
+ public void setUp() throws Exception
+ {
+ CurrentActor.set(new BrokerActor(new NullRootMessageLogger()));
+ Configuration config = new PropertiesConfiguration();
+ VirtualHostConfiguration hostConfiguration = new
VirtualHostConfiguration("junit", config);
+ VirtualHost virtualHost = new VirtualHost(hostConfiguration);
+ ExchangeTB exchangeTupleBinding = new ExchangeTB(virtualHost,
+ AMQShortStringTupleBindingFactory.createTupleBinding(4));
+ _exchangeTupleBinding = exchangeTupleBinding;
+ _exchange = new DirectExchange();
+ _exchange.initialise(virtualHost, new AMQShortString("test echange"),
true, 0, true);
+ }
+
+ /**
+ * Tests {@link Exchange} object serialization/de-serialization with {@link
ExchangeTB}.
+ */
+ public void testObjectToEntryConversion()
+ {
+ // write into to tuple output
+ TupleOutput tupleOutput = new TupleOutput();
+ _exchangeTupleBinding.objectToEntry(_exchange, tupleOutput);
+ byte[] data = tupleOutput.getBufferBytes();
+
+ // read from tuple input
+ TupleInput tupleInput = new TupleInput(data);
+ Exchange storedExchange = _exchangeTupleBinding.entryToObject(tupleInput);
+
+ assertNotNull(storedExchange);
+
+ assertEquals(_exchange.getName(), storedExchange.getName());
+ assertEquals(_exchange.getType(), storedExchange.getType());
+ assertEquals(_exchange.isAutoDelete(), storedExchange.isAutoDelete());
+ }
+
+}
Added:
store/branches/java/0.5.x-dev/src/test/java/org/apache/qpid/server/store/berkeleydb/MessageMetaDataTBTest.java
===================================================================
---
store/branches/java/0.5.x-dev/src/test/java/org/apache/qpid/server/store/berkeleydb/MessageMetaDataTBTest.java
(rev 0)
+++
store/branches/java/0.5.x-dev/src/test/java/org/apache/qpid/server/store/berkeleydb/MessageMetaDataTBTest.java 2011-05-04
09:13:25 UTC (rev 4456)
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import junit.framework.TestCase;
+
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.framing.amqp_8_0.BasicConsumeBodyImpl;
+import org.apache.qpid.server.queue.MessageMetaData;
+import org.apache.qpid.server.store.berkeleydb.tuples.AMQShortStringTupleBindingFactory;
+
+import com.sleepycat.bind.tuple.TupleInput;
+import com.sleepycat.bind.tuple.TupleOutput;
+
+/**
+ * Tests for {@link MessageMetaDataTB}
+ */
+public class MessageMetaDataTBTest extends TestCase
+{
+ // tested tuple binding
+ private MessageMetaDataTB _messageMetaDataTupleBinding;
+
+ // tested object
+ private MessageMetaData _messageMetaData;
+
+ public void setUp() throws Exception
+ {
+ _messageMetaDataTupleBinding = new
MessageMetaDataTB(AMQShortStringTupleBindingFactory.createTupleBinding(4));
+
+ MessagePublishInfo publishBody = new MessagePublishInfo()
+ {
+ @Override
+ public void setExchange(AMQShortString exchange)
+ {
+ // ignore
+ }
+
+ @Override
+ public boolean isMandatory()
+ {
+ return true;
+ }
+
+ @Override
+ public boolean isImmediate()
+ {
+ return true;
+ }
+
+ @Override
+ public AMQShortString getRoutingKey()
+ {
+ return new AMQShortString("test routine key");
+ }
+
+ @Override
+ public AMQShortString getExchange()
+ {
+ return new AMQShortString("test exchange");
+ }
+ };
+
+ BasicContentHeaderProperties properties = new BasicContentHeaderProperties();
+ properties.setAppId("test app");
+ properties.setClusterId("test claster");
+ properties.setContentType("text");
+ properties.setCorrelationId("test correlation id");
+ properties.setDeliveryMode((byte)1);
+ properties.setEncoding("UTF-8");
+ properties.setExpiration(2);
+ properties.setMessageId("test message id");
+ properties.setPriority((byte)3);
+ properties.setReplyTo("test reply to");
+ properties.setType("test type");
+ properties.setUserId("test user id");
+ ContentHeaderBody contentHeaderBody = new ContentHeaderBody(properties,
BasicConsumeBodyImpl.CLASS_ID );
+ _messageMetaData = new MessageMetaData(publishBody, contentHeaderBody, 0);
+ }
+
+ /**
+ * Tests {@link MessageMetaData} object serialization/de-serialization with {@link
MessageMetaDataTB}
+ */
+ public void testObjectToEntryConversion()
+ {
+ // write into tuple output
+ TupleOutput tupleOutput = new TupleOutput();
+ _messageMetaDataTupleBinding.objectToEntry(_messageMetaData, tupleOutput);
+ byte[] data = tupleOutput.getBufferBytes();
+
+ // read from tuple input
+ TupleInput tupleInput = new TupleInput(data);
+ MessageMetaData storedMessageMetaData =
_messageMetaDataTupleBinding.entryToObject(tupleInput);
+
+ assertNotNull(storedMessageMetaData);
+
+ MessagePublishInfo expectedInfo = _messageMetaData.getMessagePublishInfo();
+ MessagePublishInfo storedInfo = storedMessageMetaData.getMessagePublishInfo();
+
+ assertEquals(expectedInfo.getExchange(), storedInfo.getExchange());
+ assertEquals(expectedInfo.getRoutingKey(), storedInfo.getRoutingKey());
+ assertEquals(expectedInfo.isImmediate(), storedInfo.isImmediate());
+ assertEquals(expectedInfo.isMandatory(), storedInfo.isMandatory());
+
+ ContentHeaderBody expectedBody = _messageMetaData.getContentHeaderBody();
+ ContentHeaderBody storedBody = storedMessageMetaData.getContentHeaderBody();
+
+ assertEquals(expectedBody.classId, storedBody.classId);
+ assertEquals(expectedBody.weight, storedBody.weight);
+ assertEquals(expectedBody.bodySize, storedBody.bodySize);
+
+ BasicContentHeaderProperties expectedBodyProperties =
(BasicContentHeaderProperties)expectedBody.getProperties();
+ BasicContentHeaderProperties storedBodyProperties =
(BasicContentHeaderProperties)storedBody.getProperties();
+
+ assertEquals(expectedBodyProperties.getPropertyFlags(),
storedBodyProperties.getPropertyFlags());
+ assertEquals(expectedBodyProperties.getAppIdAsString(),
storedBodyProperties.getAppIdAsString());
+ assertEquals(expectedBodyProperties.getClusterIdAsString(),
storedBodyProperties.getClusterIdAsString());
+ assertEquals(expectedBodyProperties.getContentTypeAsString(),
storedBodyProperties.getContentTypeAsString());
+ assertEquals(expectedBodyProperties.getCorrelationIdAsString(),
storedBodyProperties.getCorrelationIdAsString());
+ assertEquals(expectedBodyProperties.getEncodingAsString(),
storedBodyProperties.getEncodingAsString());
+ assertEquals(expectedBodyProperties.getDeliveryMode(),
storedBodyProperties.getDeliveryMode());
+ assertEquals(expectedBodyProperties.getExpiration(),
storedBodyProperties.getExpiration());
+ assertEquals(expectedBodyProperties.getMessageIdAsString(),
storedBodyProperties.getMessageIdAsString());
+ assertEquals(expectedBodyProperties.getPriority(),
storedBodyProperties.getPriority());
+ assertEquals(expectedBodyProperties.getReplyToAsString(),
storedBodyProperties.getReplyToAsString());
+ assertEquals(expectedBodyProperties.getTypeAsString(),
storedBodyProperties.getTypeAsString());
+ assertEquals(expectedBodyProperties.getUserIdAsString(),
storedBodyProperties.getUserIdAsString());
+ assertEquals(expectedBodyProperties.getTimestamp(),
storedBodyProperties.getTimestamp());
+ }
+
+}
Added:
store/branches/java/0.5.x-dev/src/test/java/org/apache/qpid/server/store/berkeleydb/tuples/AMQQueueTupleBindingTest.java
===================================================================
---
store/branches/java/0.5.x-dev/src/test/java/org/apache/qpid/server/store/berkeleydb/tuples/AMQQueueTupleBindingTest.java
(rev 0)
+++
store/branches/java/0.5.x-dev/src/test/java/org/apache/qpid/server/store/berkeleydb/tuples/AMQQueueTupleBindingTest.java 2011-05-04
09:13:25 UTC (rev 4456)
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.store.berkeleydb.tuples;
+
+import junit.framework.TestCase;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.configuration.VirtualHostConfiguration;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.logging.NullRootMessageLogger;
+import org.apache.qpid.server.logging.actors.BrokerActor;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.AMQQueueFactory;
+import org.apache.qpid.server.store.berkeleydb.ExchangeTB;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+import com.sleepycat.bind.tuple.TupleBinding;
+import com.sleepycat.bind.tuple.TupleInput;
+import com.sleepycat.bind.tuple.TupleOutput;
+
+/**
+ * Tests for AMQQueue tuple binding
+ */
+public class AMQQueueTupleBindingTest extends TestCase
+{
+ // test queue tuple binding factory
+ private QueueTupleBindingFactory _factory;
+
+ // test queue
+ private AMQQueue _testQueue;
+
+ public void setUp() throws Exception
+ {
+ CurrentActor.set(new BrokerActor(new NullRootMessageLogger()));
+ Configuration config = new PropertiesConfiguration();
+ VirtualHostConfiguration hostConfiguration = new
VirtualHostConfiguration("junit", config);
+ VirtualHost virtualHost = new VirtualHost(hostConfiguration);
+ _factory = new QueueTupleBindingFactory(4, virtualHost,
+ AMQShortStringTupleBindingFactory.createTupleBinding(4));
+ _testQueue = AMQQueueFactory.createAMQQueueImpl(new
AMQShortString("test"), true,
+ new AMQShortString("tmp"), true, virtualHost, null);
+ }
+
+ /**
+ * Tests {@link Exchange} object serialization/de-serialization with {@link
ExchangeTB}.
+ */
+ public void testObjectToEntryConversion()
+ {
+ // write into to tuple output
+ TupleOutput tupleOutput = new TupleOutput();
+ TupleBinding<AMQQueue> queueTupleBinding = _factory.getInstance();
+
+ queueTupleBinding.objectToEntry(_testQueue, tupleOutput);
+ byte[] data = tupleOutput.getBufferBytes();
+
+ // read from tuple input
+ TupleInput tupleInput = new TupleInput(data);
+ AMQQueue storedQueue = queueTupleBinding.entryToObject(tupleInput);
+
+ assertNotNull(storedQueue);
+
+ assertEquals(_testQueue.getName(), storedQueue.getName());
+ assertEquals(_testQueue.getOwner(), storedQueue.getOwner());
+ }
+}
Added:
store/branches/java/0.5.x-dev/src/test/java/org/apache/qpid/server/store/berkeleydb/tuples/AMQShortStringTupleBindingTest.java
===================================================================
---
store/branches/java/0.5.x-dev/src/test/java/org/apache/qpid/server/store/berkeleydb/tuples/AMQShortStringTupleBindingTest.java
(rev 0)
+++
store/branches/java/0.5.x-dev/src/test/java/org/apache/qpid/server/store/berkeleydb/tuples/AMQShortStringTupleBindingTest.java 2011-05-04
09:13:25 UTC (rev 4456)
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.store.berkeleydb.tuples;
+
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.store.berkeleydb.tuples.AMQShortStringTB_4;
+
+import com.sleepycat.bind.tuple.TupleInput;
+import com.sleepycat.bind.tuple.TupleOutput;
+
+import junit.framework.TestCase;
+
+/**
+ * Tests encoding/decoding of AMQP short strings
+ */
+public class AMQShortStringTupleBindingTest extends TestCase
+{
+ /**
+ * Tests write and read operations for AMQP short string with length greater
+ * then 127.
+ */
+ public void testObjectToEntryForShortStringWithLengthGreater127()
+ {
+ AMQShortString testString = new AMQShortString(generateTestString(128));
+ assertObjectToEntryConversion(testString);
+ }
+
+ /**
+ * Tests write and read operations for null AMQP short string.
+ */
+ public void testObjectToEntryForNullShortString()
+ {
+ AMQShortString testString = null;
+ assertObjectToEntryConversion(testString);
+ }
+
+ /**
+ * Tests write and read operations for AMQP short string with length 255
+ * characters.
+ */
+ public void testObjectToEntryForShortStringWithLength255()
+ {
+ AMQShortString testString = new AMQShortString(generateTestString(255));
+ assertObjectToEntryConversion(testString);
+ }
+
+ /**
+ * Tests write and read operations for empty AMQP short string.
+ */
+ public void testObjectToEntryForEmptyShortString()
+ {
+ AMQShortString testString = new AMQShortString("");
+ assertObjectToEntryConversion(testString);
+ }
+
+ /**
+ * Tests
+ * {@link AMQShortStringEncoding#writeShortString(AMQShortString, TupleOutput)}
+ * and {@link AMQShortStringEncoding#readShortString(TupleInput)} to write
+ * and read AMQP short strings.
+ */
+ protected void assertObjectToEntryConversion(AMQShortString testString)
+ {
+ AMQShortStringTB_4 tupleBinding = new AMQShortStringTB_4();
+ // write string into tuple output
+ TupleOutput tupleOutput = new TupleOutput();
+ tupleBinding.objectToEntry(testString, tupleOutput);
+ byte[] data = tupleOutput.getBufferBytes();
+
+ // read string from tuple input
+ TupleInput tupleInput = new TupleInput(data);
+ AMQShortString restoredString = tupleBinding.entryToObject(tupleInput);
+
+ // assert read string against original string
+ assertEquals(testString, restoredString);
+ }
+
+ /**
+ * A helper method to generate a test string of given size
+ *
+ * @param length
+ * string length
+ * @return string
+ */
+ private String generateTestString(int length)
+ {
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < length; i++)
+ {
+ sb.append('a');
+ }
+ return sb.toString();
+ }
+}
Added:
store/branches/java/0.5.x-dev/src/test/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingKeyTupleBindingTest.java
===================================================================
---
store/branches/java/0.5.x-dev/src/test/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingKeyTupleBindingTest.java
(rev 0)
+++
store/branches/java/0.5.x-dev/src/test/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingKeyTupleBindingTest.java 2011-05-04
09:13:25 UTC (rev 4456)
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.store.berkeleydb.tuples;
+
+import junit.framework.TestCase;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.configuration.VirtualHostConfiguration;
+import org.apache.qpid.server.logging.NullRootMessageLogger;
+import org.apache.qpid.server.logging.actors.BrokerActor;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.store.berkeleydb.BindingKey;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+import com.sleepycat.bind.tuple.TupleBinding;
+import com.sleepycat.bind.tuple.TupleInput;
+import com.sleepycat.bind.tuple.TupleOutput;
+
+/**
+ * Tests BindingKey tuple serialization/de-serialization.
+ *
+ */
+public class BindingKeyTupleBindingTest extends TestCase
+{
+ // test queue tuple binding factory
+ private BindingTupleBindingFactory _factory;
+
+ public void setUp() throws Exception
+ {
+ CurrentActor.set(new BrokerActor(new NullRootMessageLogger()));
+ Configuration config = new PropertiesConfiguration();
+ VirtualHostConfiguration hostConfiguration = new
VirtualHostConfiguration("junit", config);
+ VirtualHost virtualHost = new VirtualHost(hostConfiguration);
+ _factory = new BindingTupleBindingFactory(4, virtualHost,
+ AMQShortStringTupleBindingFactory.createTupleBinding(4));
+ }
+
+ /**
+ * Tests {@link BindingKey} object serialization/de-serialization.
+ */
+ public void testObjectToEntryConversion()
+ {
+ BindingKey key = new BindingKey(new AMQShortString("test1"), new
AMQShortString("test2"),
+ new AMQShortString("test3"), null);
+
+ // write into to tuple output
+ TupleOutput tupleOutput = new TupleOutput();
+ TupleBinding<BindingKey> keyTupleBinding = _factory.getInstance();
+
+ keyTupleBinding.objectToEntry(key, tupleOutput);
+ byte[] data = tupleOutput.getBufferBytes();
+
+ // read from tuple input
+ TupleInput tupleInput = new TupleInput(data);
+ BindingKey storedKey = keyTupleBinding.entryToObject(tupleInput);
+
+ assertNotNull(storedKey);
+
+ assertEquals(key.getExchangeName(), storedKey.getExchangeName());
+ assertEquals(key.getQueueName(), storedKey.getQueueName());
+ assertEquals(key.getRoutingKey(), storedKey.getRoutingKey());
+ }
+}
Added:
store/branches/java/0.5.x-dev/src/test/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueEntryKeyTupleBindingTest.java
===================================================================
---
store/branches/java/0.5.x-dev/src/test/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueEntryKeyTupleBindingTest.java
(rev 0)
+++
store/branches/java/0.5.x-dev/src/test/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueEntryKeyTupleBindingTest.java 2011-05-04
09:13:25 UTC (rev 4456)
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.store.berkeleydb.tuples;
+
+import junit.framework.TestCase;
+
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.store.berkeleydb.QueueEntryKey;
+import org.apache.qpid.server.store.berkeleydb.tuples.AMQShortStringTupleBindingFactory;
+import org.apache.qpid.server.store.berkeleydb.tuples.QueueEntryKeyTupleBinding;
+
+import com.sleepycat.bind.tuple.TupleInput;
+import com.sleepycat.bind.tuple.TupleOutput;
+
+/**
+ * Tests {@link QueueEntryKeyTupleBinding}
+ */
+public class QueueEntryKeyTupleBindingTest extends TestCase
+{
+ /**
+ * Tests {@link QueueEntryKey} object serialization/de-serialization with
+ * {@link QueueEntryKeyTupleBinding}
+ */
+ public void testObjectToEntryConversion()
+ {
+ QueueEntryKey queueEntryKey = new QueueEntryKey(new AMQShortString("test
queue"), 2);
+ QueueEntryKeyTupleBinding queueEntryKeyTupleBinding = new
QueueEntryKeyTupleBinding(
+ AMQShortStringTupleBindingFactory.createTupleBinding(4));
+
+ // write into tuple output
+ TupleOutput tupleOutput = new TupleOutput();
+ queueEntryKeyTupleBinding.objectToEntry(queueEntryKey, tupleOutput);
+ byte[] data = tupleOutput.getBufferBytes();
+
+ // read from tuple input
+ TupleInput tupleInput = new TupleInput(data);
+ QueueEntryKey storedQueueEntryKey =
queueEntryKeyTupleBinding.entryToObject(tupleInput);
+
+ assertNotNull(storedQueueEntryKey);
+
+ assertEquals(queueEntryKey.messageId, storedQueueEntryKey.messageId);
+ assertEquals(queueEntryKey.queueName, storedQueueEntryKey.queueName);
+ }
+
+}
Modified:
store/branches/java/0.5.x-dev/src/test/java/org/apache/qpid/server/store/berkeleydb/utils/JNDIHelper.java
===================================================================
---
store/branches/java/0.5.x-dev/src/test/java/org/apache/qpid/server/store/berkeleydb/utils/JNDIHelper.java 2011-05-03
18:49:45 UTC (rev 4455)
+++
store/branches/java/0.5.x-dev/src/test/java/org/apache/qpid/server/store/berkeleydb/utils/JNDIHelper.java 2011-05-04
09:13:25 UTC (rev 4456)
@@ -37,7 +37,7 @@
public JNDIHelper(String broker)
{
- CONNECTION_NAME = "amqp://guest:guest@clientid/test?brokerlist='" +
broker + "'";
+ CONNECTION_NAME =
"amqp://guest:guest@clientid/bdbtest?brokerlist='" + broker +
"'";
setupJNDI();
}
Modified:
store/branches/java/0.5.x-dev/src/tools/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgrade.java
===================================================================
---
store/branches/java/0.5.x-dev/src/tools/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgrade.java 2011-05-03
18:49:45 UTC (rev 4455)
+++
store/branches/java/0.5.x-dev/src/tools/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgrade.java 2011-05-04
09:13:25 UTC (rev 4456)
@@ -22,11 +22,15 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.store.MemoryMessageStore;
+import org.apache.qpid.server.store.berkeleydb.tuples.QueueEntryKeyTupleBinding;
import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.MessageMetaData;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.configuration.VirtualHostConfiguration;
+import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.actors.BrokerActor;
import org.apache.qpid.server.logging.NullRootMessageLogger;
@@ -51,6 +55,9 @@
import com.sleepycat.je.DatabaseEntry;
import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.Database;
+import com.sleepycat.je.Environment;
+import com.sleepycat.je.EnvironmentConfig;
+import com.sleepycat.bind.EntryBinding;
import com.sleepycat.bind.tuple.TupleBinding;
import com.sleepycat.bind.tuple.ByteBinding;
@@ -95,7 +102,8 @@
private boolean _interactive;
private boolean _force;
- private static final String VERSION = "1.0";
+ private static final String VERSION = "2.0";
+ private static final String USER_ABORTED_PROCESS = "User aborted process";
private static final String OPTION_INPUT_SHORT = "i";
private static final String OPTION_INPUT = "input";
private static final String OPTION_OUTPUT_SHORT = "o";
@@ -244,7 +252,7 @@
{
if (input.equalsIgnoreCase(ANSWER_A) ||
input.equalsIgnoreCase(ANSWER_ABORT))
{
- throw new RuntimeException("User aborted process");
+ throw new RuntimeException(USER_ABORTED_PROCESS);
}
}
}
@@ -284,6 +292,11 @@
File backupDir, boolean force,
boolean inplace) throws Exception
{
+ if (version == BDBMessageStore.DATABASE_FORMAT_VERSION)
+ {
+ throw new IllegalArgumentException("Store on disk already upgraded to
latest version "
+ + BDBMessageStore.DATABASE_FORMAT_VERSION);
+ }
_logger.info("Located store to upgrade at '" + fromDir +
"'");
// Verify user has created a backup, giving option to perform backup
@@ -311,7 +324,7 @@
if (!userInteract("Are you sure wish to proceed with DB
migration without backup? " +
"(For more details of the consequences check
the Qpid/BDB Message Store Wiki)."))
{
- throw new IllegalArgumentException("Upgrade stopped as user
request as no DB Backup performed.");
+ throw new IllegalArgumentException("Upgrade stopped at user
request as no DB Backup performed.");
}
}
}
@@ -350,17 +363,22 @@
try
{
- //Load the old MessageStore
+ int upgradeVersion = 2;
switch (version)
{
default:
+ case 2:
+ upgradeVersion = 2;
+ break;
case 1:
- _oldMessageStore = new BDBMessageStore(1);
- _oldMessageStore.configure(_oldVirtualHost, fromDir, true);
- _oldMessageStore.start();
- upgradeFromVersion_1();
+ upgradeVersion = 1;
break;
}
+ //Load the old MessageStore
+ _oldMessageStore = new BDBMessageStore(upgradeVersion);
+ _oldMessageStore.configure(_oldVirtualHost, fromDir, true);
+ _oldMessageStore.start();
+ upgrade(upgradeVersion);
}
finally
{
@@ -394,15 +412,34 @@
}
}
- private void upgradeFromVersion_1() throws AMQException, DatabaseException
+ private void upgrade(int version) throws AMQException, DatabaseException
{
- _logger.info("Starting store upgrade from version 1");
+ _logger.info("Starting store upgrade from version " + version);
_logger.info("Message Metadata");
//Migrate _messageMetaDataDb;
- moveContents(_oldMessageStore.getMetaDataDb(), _newMessageStore.getMetaDataDb(),
"Message MetaData");
+ final TupleBinding<AMQShortString> newShortStringTupleBinding =
_newMessageStore.getShortStringTupleBindingFactory().getInstance();
+ final MessageMetaDataTB newMessageMetaDataTupleBinding = new
MessageMetaDataTB(newShortStringTupleBinding);
+ final TupleBinding<AMQShortString> oldShortStringTupleBinding =
_oldMessageStore.getShortStringTupleBindingFactory().getInstance();
+ final MessageMetaDataTB oldMessageMetaDataTupleBinding = new
MessageMetaDataTB(oldShortStringTupleBinding);
+ DatabaseVisitor messageMetaDataDBVisitor = new DatabaseVisitor()
+ {
+ @Override
+ public void visit(DatabaseEntry entry, DatabaseEntry value) throws
AMQException, DatabaseException
+ {
+ MessageMetaData metaData =
oldMessageMetaDataTupleBinding.entryToObject(value);
+ DatabaseEntry data = new DatabaseEntry();
+ newMessageMetaDataTupleBinding.objectToEntry(metaData, data);
+ _newMessageStore.getMetaDataDb().put(null, entry, data);
+ _count++;
+ }
+ };
+ _oldMessageStore.visitMetaDataDb(messageMetaDataDBVisitor);
+
+ logCount(messageMetaDataDBVisitor.getVisitedCount(), "Message
MetaData");
+
_logger.info("Message Contents");
//Migrate _messageContentDb;
moveContents(_oldMessageStore.getContentDb(), _newMessageStore.getContentDb(),
"Message Content");
@@ -410,6 +447,7 @@
_logger.info("Queues");
//Migrate _queueDb;
//Get the oldMessageStore Tuple Binding which does the parsing
+ @SuppressWarnings({ "rawtypes" })
final TupleBinding queueTupleBinding =
_oldMessageStore.getQueueTupleBindingFactory().getInstance();
//Create a visitor that will take the queues in the oldMessageStore and add them
to the newMessageStore
@@ -440,16 +478,54 @@
_logger.info("Delivery Records");
//Migrate _deliveryDb;
- moveContents(_oldMessageStore.getDeliveryDb(), _newMessageStore.getDeliveryDb(),
"Delivery Record");
+ final EntryBinding<QueueEntryKey> oldKeyTupleBinding = new
QueueEntryKeyTupleBinding(oldShortStringTupleBinding);
+ final EntryBinding<QueueEntryKey> newKeyTupleBinding = new
QueueEntryKeyTupleBinding(newShortStringTupleBinding);
+ DatabaseVisitor deliveryDBVisitor = new DatabaseVisitor()
+ {
+ @Override
+ public void visit(DatabaseEntry entry, DatabaseEntry value) throws
AMQException, DatabaseException
+ {
+ QueueEntryKey keyObject = oldKeyTupleBinding.entryToObject(entry);
+ DatabaseEntry key = new DatabaseEntry();
+ newKeyTupleBinding.objectToEntry(keyObject, key);
+ _newMessageStore.getDeliveryDb().put(null, key, value);
+ _count++;
+ }
+ };
+ _oldMessageStore.visitDelivery(deliveryDBVisitor);
+
+ logCount(queueVisitor.getVisitedCount(), "Delivery Record");
+
_logger.info("Exchanges");
//Migrate _exchangeDb;
- moveContents(_oldMessageStore.getExchangesDb(),
_newMessageStore.getExchangesDb(), "Exchange");
+ // old exchange tuple binding
+ final ExchangeTB oldExchangeTupleBinding = new ExchangeTB(_oldVirtualHost,
oldShortStringTupleBinding);
+ final ExchangeTB newExchangeTupleBinding = new ExchangeTB(_oldVirtualHost,
newShortStringTupleBinding);
+ DatabaseVisitor echangeDBVisitor = new DatabaseVisitor()
+ {
+
+ @Override
+ public void visit(DatabaseEntry entry, DatabaseEntry value) throws
AMQException, DatabaseException
+ {
+ Exchange exchange = oldExchangeTupleBinding.entryToObject(value);
+ DatabaseEntry key = new DatabaseEntry();
+ DatabaseEntry data = new DatabaseEntry();
+ newShortStringTupleBinding.objectToEntry(exchange.getName(), key);
+ newExchangeTupleBinding.objectToEntry(exchange, data);
+ _newMessageStore.getExchangesDb().put(null, key, data);
+ _count++;
+ }
+ };
+ _oldMessageStore.visitExchanges(echangeDBVisitor);
+
+ logCount(queueVisitor.getVisitedCount(), "Exchange");
+
+
_logger.info("QueueBindings");
//Migrate _queueBindingsDb;
- final TupleBinding bindingTupleBinding =
_oldMessageStore.getBindingTupleBindingFactory().getInstance();
-
+ final TupleBinding<BindingKey> bindingTupleBinding =
_oldMessageStore.getBindingTupleBindingFactory().getInstance();
//Create a visitor that to read the old format queue bindings
DatabaseVisitor queueBindings = new DatabaseVisitor()
{
@@ -458,7 +534,7 @@
BindingKey queueBinding = (BindingKey)
bindingTupleBinding.entryToObject(key);
//Create a new Format TupleBinding
- TupleBinding newBindingTupleBinding =
_newMessageStore.getBindingTupleBindingFactory().getInstance();
+ TupleBinding<BindingKey> newBindingTupleBinding =
_newMessageStore.getBindingTupleBindingFactory().getInstance();
DatabaseEntry newKey = new DatabaseEntry();
newBindingTupleBinding.objectToEntry(queueBinding, newKey);
@@ -712,7 +788,7 @@
}
catch (RuntimeException re)
{
- if (!re.getMessage().equals("User aborted process"))
+ if (!(USER_ABORTED_PROCESS).equals(re.getMessage()))
{
re.printStackTrace();
_logger.error("Upgrade Failed: " + re.getMessage());
@@ -725,6 +801,7 @@
}
+ @SuppressWarnings("static-access")
private static void setOptions(Options options)
{
Option input =
@@ -759,22 +836,22 @@
{
_logger.info("Running BDB Message Store upgrade tool: v" + VERSION);
+ int version = getStoreVersion(fromDir);
+ if (version == 0)
+ {
+ _logger.info("Existing store version is undefined!");
+ return;
+ }
+ _logger.info("Existing store version is " + version);
try
{
- new BDBStoreUpgrade(fromDir.toString(), toDir, backupDir, interactive,
force).upgradeFromVersion(1);
+ new BDBStoreUpgrade(fromDir.toString(), toDir, backupDir, interactive,
force).upgradeFromVersion(version);
_logger.info("Upgrade complete.");
}
catch (IllegalArgumentException iae)
{
- if (iae.getMessage().endsWith("Error: Unable to load BDBStore as version
1. Store on disk contains version 2 data."))
- {
- System.out.println("Store '" + fromDir + "' has
already been upgraded to version 2.");
- }
- else
- {
- _logger.error("Upgrade not started due to: " +
iae.getMessage());
- }
+ _logger.error("Upgrade not started due to: " + iae.getMessage());
}
catch (DatabaseException de)
{
@@ -783,7 +860,7 @@
}
catch (RuntimeException re)
{
- if (!re.getMessage().equals("User aborted process"))
+ if (!(USER_ABORTED_PROCESS).equals(re.getMessage()))
{
re.printStackTrace();
_logger.error("Upgrade Failed: " + re.getMessage());
@@ -800,6 +877,64 @@
}
}
+ /**
+ * Detects existing store version by checking list of database in store
+ * environment
+ *
+ * @param fromDir
+ * store folder
+ * @return version
+ */
+ public static int getStoreVersion(File fromDir)
+ {
+ int version = 0;
+ EnvironmentConfig envConfig = new EnvironmentConfig();
+ envConfig.setAllowCreate(false);
+ envConfig.setTransactional(false);
+ envConfig.setReadOnly(true);
+ Environment environment = null;
+ try
+ {
+
+ environment = new Environment(fromDir, envConfig);
+ List<String> databases = environment.getDatabaseNames();
+ for (String name : databases)
+ {
+ if (name.startsWith("exchangeDb"))
+ {
+ if (name.startsWith("exchangeDb_v"))
+ {
+ version = Integer.parseInt(name.substring(12));
+ }
+ else
+ {
+ version = 1;
+ }
+ break;
+ }
+ }
+ }
+ catch (Exception e)
+ {
+ _logger.error("Failure to open existing database: " +
e.getMessage());
+ }
+ finally
+ {
+ if (environment != null)
+ {
+ try
+ {
+ environment.close();
+ }
+ catch (Exception e)
+ {
+ // ignoring. It should never happen.
+ }
+ }
+ }
+ return version;
+ }
+
private static void fatalError(String message)
{
System.out.println(message);