Author: rgemmell
Date: 2010-05-14 12:23:59 -0400 (Fri, 14 May 2010)
New Revision: 3971
Added:
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/keys/
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/keys/MessageContentKey_1.java
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/keys/MessageContentKey_3.java
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/records/
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/records/ExchangeRecord.java
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/records/QueueRecord.java
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageContentKeyTB_1.java
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageContentKeyTB_3.java
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageContentKeyTupleBindingFactory.java
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTB_1.java
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTB_3.java
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTupleBindingFactory.java
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueEntryTB.java
Removed:
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/MessageMetaDataTB.java
Modified:
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ContentTB.java
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ExchangeTB.java
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/MessageContentKey.java
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/QueueEntryKey.java
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple.java
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTupleBindingFactory.java
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple_1.java
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple_2.java
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple.java
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTupleBindingFactory.java
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_1.java
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_2.java
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/TupleBindingFactory.java
store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java
store/trunk/java/bdbstore/src/tools/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgrade.java
Log:
Initial update to implement the new RecoveryHandler store behaviour, plus other updates
required since 0-10 support was added to the trunk broker.
Modified:
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
===================================================================
---
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java 2010-05-13
18:28:33 UTC (rev 3970)
+++
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java 2010-05-14
16:23:59 UTC (rev 3971)
@@ -20,53 +20,66 @@
*/
package org.apache.qpid.server.store.berkeleydb;
-import com.sleepycat.bind.EntryBinding;
-import com.sleepycat.bind.tuple.ByteBinding;
-import com.sleepycat.bind.tuple.TupleBinding;
+import java.io.File;
+import java.lang.ref.SoftReference;
+import java.nio.ByteBuffer;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
-import com.sleepycat.je.*;
-
import org.apache.commons.configuration.Configuration;
-
import org.apache.log4j.Logger;
-
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.messages.ConfigStoreMessages;
+import org.apache.qpid.server.logging.messages.MessageStoreMessages;
+import org.apache.qpid.server.logging.messages.TransactionLogMessages;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.MessageHandleFactory;
-import org.apache.qpid.server.queue.MessageMetaData;
-import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.queue.AMQQueueFactory;
+import org.apache.qpid.server.store.ConfigurationRecoveryHandler;
import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.MessageStoreRecoveryHandler;
+import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.store.StoreContext;
-import org.apache.qpid.server.store.AbstractMessageStore;
+import org.apache.qpid.server.store.StoredMemoryMessage;
+import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.store.TransactionLogRecoveryHandler;
+import org.apache.qpid.server.store.TransactionLogResource;
+import org.apache.qpid.server.store.ConfigurationRecoveryHandler.BindingRecoveryHandler;
+import
org.apache.qpid.server.store.ConfigurationRecoveryHandler.ExchangeRecoveryHandler;
+import org.apache.qpid.server.store.ConfigurationRecoveryHandler.QueueRecoveryHandler;
+import
org.apache.qpid.server.store.MessageStoreRecoveryHandler.StoredMessageRecoveryHandler;
+import
org.apache.qpid.server.store.TransactionLogRecoveryHandler.QueueEntryRecoveryHandler;
+import org.apache.qpid.server.store.berkeleydb.keys.MessageContentKey_3;
+import org.apache.qpid.server.store.berkeleydb.records.ExchangeRecord;
+import org.apache.qpid.server.store.berkeleydb.records.QueueRecord;
+import org.apache.qpid.server.store.berkeleydb.tuples.BindingTupleBindingFactory;
+import org.apache.qpid.server.store.berkeleydb.tuples.MessageContentKeyTB_3;
+import
org.apache.qpid.server.store.berkeleydb.tuples.MessageMetaDataTupleBindingFactory;
+import org.apache.qpid.server.store.berkeleydb.tuples.QueueEntryTB;
import org.apache.qpid.server.store.berkeleydb.tuples.QueueTupleBindingFactory;
-import org.apache.qpid.server.store.berkeleydb.tuples.BindingTupleBindingFactory;
-import org.apache.qpid.server.store.berkeleydb.tuples.QueueTuple;
-import org.apache.qpid.server.txn.NonTransactionalContext;
-import org.apache.qpid.server.txn.TransactionalContext;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.configuration.VirtualHostConfiguration;
-import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.server.logging.messages.MessageStoreMessages;
-import java.io.File;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
-import java.util.TreeMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
+import com.sleepycat.bind.EntryBinding;
+import com.sleepycat.bind.tuple.ByteBinding;
+import com.sleepycat.bind.tuple.TupleBinding;
+import com.sleepycat.je.CheckpointConfig;
+import com.sleepycat.je.Cursor;
+import com.sleepycat.je.Database;
+import com.sleepycat.je.DatabaseConfig;
+import com.sleepycat.je.DatabaseEntry;
+import com.sleepycat.je.DatabaseException;
+import com.sleepycat.je.Environment;
+import com.sleepycat.je.EnvironmentConfig;
+import com.sleepycat.je.LockMode;
+import com.sleepycat.je.OperationStatus;
+import com.sleepycat.je.TransactionConfig;
/**
* BDBMessageStore implements a persistent {@link MessageStore} using the BDB high
performance log.
@@ -76,62 +89,40 @@
* exchanges. <tr><td> Store and remove messages. <tr><td> Bind
and unbind queues to exchanges. <tr><td> Enqueue and
* dequeue messages to queues. <tr><td> Generate message identifiers.
</table>
*/
-public class BDBMessageStore extends AbstractMessageStore
+public class BDBMessageStore implements MessageStore
{
private static final Logger _log = Logger.getLogger(BDBMessageStore.class);
- private static final int DATABASE_FORMAT_VERSION = 2;
+ private static final int DATABASE_FORMAT_VERSION = 3;
private static final String DATABASE_FORMAT_VERSION_PROPERTY = "version";
-
public static final String ENVIRONMENT_PATH_PROPERTY = "environment-path";
private Environment _environment;
private String MESSAGEMETADATADB_NAME = "messageMetaDataDb";
-
- /**
- * Maps from messageId to an AMQMessage (note we don't use serialisation but this
is what it roughly corresponds
- * to)
- */
+ private String MESSAGECONTENTDB_NAME = "messageContentDb";
+ private String QUEUEBINDINGSDB_NAME = "queueBindingsDb";
+ private String DELIVERYDB_NAME = "deliveryDb";
+ private String EXCHANGEDB_NAME = "exchangeDb";
+ private String QUEUEDB_NAME = "queueDb";
private Database _messageMetaDataDb;
-
- private String MESSAGECONTENTDB_NAME = "messageContentDb";
-
private Database _messageContentDb;
-
- private String QUEUEDB_NAME = "queueDb";
-
- /** Maps from name (which uniquely identifies a queue) to an AMQQueue */
- private Database _queueDb;
-
- private String DELIVERYDB_NAME = "deliveryDb";
-
- /** Maps from a queue name to a message id. This is what stores the pending
deliveries for a given queue */
+ private Database _queueBindingsDb;
private Database _deliveryDb;
-
- private String EXCHANGEDB_NAME = "exchangeDb";
private Database _exchangeDb;
+ private Database _queueDb;
- private String QUEUEBINDINGSDB_NAME = "queueBindingsDb";
- private Database _queueBindingsDb;
+ private LogSubject _logSubject;
- private VirtualHost _virtualHost;
+ private final AtomicLong _messageId = new AtomicLong(0);
- private final AtomicLong _messageId = new AtomicLong(1);
-
- private final AtomicLong _queueId = new AtomicLong(1);
-
private final CommitThread _commitThread = new
CommitThread("Commit-Thread");
- private Map<AMQShortString, Long> _queueNameToIdMap = new
ConcurrentHashMap<AMQShortString, Long>();
-
- // Factory Classes to create the TupleBinding objects that relfect the version
instance of this BDBStore
-
+ // Factory Classes to create the TupleBinding objects that reflect the version
instance of this BDBStore
+ private MessageMetaDataTupleBindingFactory _metaDataTupleBindingFactory;
private QueueTupleBindingFactory _queueTupleBindingFactory;
private BindingTupleBindingFactory _bindingTupleBindingFactory;
- Map<AMQShortString, Integer> _queueRecoveries = new TreeMap<AMQShortString,
Integer>();
-
/** The data version this store should run with */
private int _version;
private enum State
@@ -149,6 +140,9 @@
private TransactionConfig _transactionConfig = new TransactionConfig();
+ private boolean _configured;
+
+
public BDBMessageStore()
{
this(DATABASE_FORMAT_VERSION);
@@ -176,23 +170,78 @@
QUEUEBINDINGSDB_NAME += "_v" + version;
}
}
+
+ public void configureConfigStore(String name,
+ ConfigurationRecoveryHandler recoveryHandler,
+ Configuration storeConfiguration,
+ LogSubject logSubject) throws Exception
+ {
+ _logSubject = logSubject;
+ CurrentActor.get().message(_logSubject,
ConfigStoreMessages.CFG_1001(this.getClass().getName()));
+ if(_configured)
+ {
+ throw new Exception("ConfigStore already configured");
+ }
+
+ configure(name,storeConfiguration);
+
+ _configured = true;
+ stateTransition(State.CONFIGURING, State.CONFIGURED);
+
+ recover(recoveryHandler);
+ stateTransition(State.RECOVERING, State.STARTED);
+ }
+
+ public void configureMessageStore(String name,
+ MessageStoreRecoveryHandler recoveryHandler,
+ Configuration storeConfiguration,
+ LogSubject logSubject) throws Exception
+ {
+ CurrentActor.get().message(_logSubject,
MessageStoreMessages.MST_CREATED(this.getClass().getName()));
+
+ if(!_configured)
+ {
+ throw new Exception("ConfigStore not configured");
+ }
+
+ recoverMessages(recoveryHandler);
+ }
+
+ public void configureTransactionLog(String name, TransactionLogRecoveryHandler
recoveryHandler,
+ Configuration storeConfiguration, LogSubject logSubject) throws Exception
+ {
+ CurrentActor.get().message(_logSubject,
TransactionLogMessages.TXN_1001(this.getClass().getName()));
+
+ if(!_configured)
+ {
+ throw new Exception("ConfigStore not configured");
+ }
+
+ recoverQueueEntries(recoveryHandler);
+
+
+ }
+
+ public org.apache.qpid.server.store.TransactionLog.Transaction newTransaction()
+ {
+ return new BDBTransaction();
+ }
+
+
/**
- * Called after instantiation in order to configure the message store. A particular
implementation can define
- * whatever parameters it wants.
+ * Called after instantiation in order to configure the message store.
*
- * @param virtualHost The virtual host using by this store
- * @param base Not used
+ * @param name The name of the virtual host using this store
* @param vHostConfig The configuration for this virtualhost
+ * @return whether a new store environment was created or not (to indicate whether
recovery is necessary)
*
* @throws Exception If any error occurs that means the store is unable to configure
itself.
*/
- public void configure(VirtualHost virtualHost, String base, VirtualHostConfiguration
vHostConfig) throws Exception
+ public boolean configure(String name, Configuration storeConfig) throws Exception
{
- super.configure(virtualHost, base, vHostConfig);
-
- Configuration config = vHostConfig.getStoreConfiguration();
- File environmentPath = new File(config.getString(ENVIRONMENT_PATH_PROPERTY,
System.getProperty("QPID_WORK") + "/bdbstore/" +
virtualHost.getName()));
+ File environmentPath = new File(storeConfig.getString(ENVIRONMENT_PATH_PROPERTY,
+ System.getProperty("QPID_WORK") +
"/bdbstore/" + name));
if (!environmentPath.exists())
{
if (!environmentPath.mkdirs())
@@ -204,53 +253,39 @@
CurrentActor.get().message(_logSubject,
MessageStoreMessages.MST_STORE_LOCATION(environmentPath.getAbsolutePath()));
- _version = config.getInt(DATABASE_FORMAT_VERSION_PROPERTY,
DATABASE_FORMAT_VERSION);
+ _version = storeConfig.getInt(DATABASE_FORMAT_VERSION_PROPERTY,
DATABASE_FORMAT_VERSION);
- configure(virtualHost, environmentPath, false);
+ return configure(environmentPath, false);
}
- public void configure(File environmentPath) throws AMQException, DatabaseException
+
+ /**
+ *
+ * @param environmentPath location for the store to be created in/recovered from
+ * @param readonly if true then don't allow modifications to an existing store,
and don't create a new store if none exists
+ * @return whether or not a new store environment was created
+ * @throws AMQException
+ * @throws DatabaseException
+ */
+ protected boolean configure(File environmentPath, boolean readonly) throws
AMQException, DatabaseException
{
- configure(null, environmentPath, false);
- }
-
- public void configure(VirtualHost virtualHost, File environmentPath, boolean
readonly) throws AMQException, DatabaseException
- {
stateTransition(State.INITIAL, State.CONFIGURING);
_log.info("Configuring BDB message store");
+ createTupleBindingFactories(_version);
+
setDatabaseNames(_version);
- if (virtualHost != null)
- {
- setVirtualHost(virtualHost);
- }
-
- boolean newEnvironment = setupStore(environmentPath, readonly);
-
- // Performing recovery when we only want read access will cause all the broker
objects to be recreated
- // This will/may include thread pool creations that may be duplicated when
manually inspecting the state of
- // the store. Simplest solution is to prevent the initial creation of the state
by blocking recovery.
- if (!readonly)
- {
- stateTransition(State.CONFIGURING, State.CONFIGURED);
-
- //If we have loaded an environment and have virtualHost configured then
recover environment
- if (!newEnvironment && virtualHost != null)
- {
- // this recovers durable queues and persistent messages
- recover(virtualHost);
- }
- }
-
- //if we have a new environment there we can jump to started as there is no
recovery requried..
- if (newEnvironment) // && !readonly is implied as you cant get a
newEnviroment in readonly mode.
- {
- stateTransition(State.CONFIGURED, State.STARTED);
- }
-
+ return setupStore(environmentPath, readonly);
}
+
+ private void createTupleBindingFactories(int version)
+ {
+ _bindingTupleBindingFactory = new BindingTupleBindingFactory(version);
+ _queueTupleBindingFactory = new QueueTupleBindingFactory(version);
+ _metaDataTupleBindingFactory = new
MessageMetaDataTupleBindingFactory(version);
+ }
/**
* Move the store state from CONFIGURING to STARTED.
@@ -288,7 +323,8 @@
{
int versionIndex = s.indexOf("_v");
- // DB is v1 if _version is not v1 then error
+ // lack of _v index suggests DB is v1
+ // so if _version is not v1 then error
if (versionIndex == -1)
{
if (_version != 1)
@@ -315,12 +351,6 @@
}
}
- private void createTupleBindingFactories(int version)
- {
- _queueTupleBindingFactory = new QueueTupleBindingFactory(version, _virtualHost);
- _bindingTupleBindingFactory = new BindingTupleBindingFactory(version,
_virtualHost);
- }
-
private synchronized void stateTransition(State requiredState, State newState) throws
AMQException
{
if (_state != requiredState)
@@ -344,16 +374,15 @@
{
_log.info("BDB message store using environment path " +
environmentPath.getAbsolutePath());
EnvironmentConfig envConfig = new EnvironmentConfig();
- // This is what allos the creation of the store if it does not already exist.
+ // This is what allows the creation of the store if it does not already exist.
envConfig.setAllowCreate(false);
envConfig.setTransactional(true);
envConfig.setConfigParam("je.lock.nLockTables", "7");
- // Restore 500,000 default timeout.
+ // Restore 500,000 default timeout.
//envConfig.setLockTimeout(15000);
// Added to help diagnosis of Deadlock issue
- //
//
http://www.oracle.com/technology/products/berkeley-db/faq/je_faq.html#23
if (Boolean.getBoolean("qpid.bdb.lock.debug"))
{
@@ -467,8 +496,8 @@
closeEnvironment();
_state = State.CLOSED;
-
- super.close();
+
+ CurrentActor.get().message(_logSubject,MessageStoreMessages.MST_CLOSED());
}
private void closeEnvironment() throws DatabaseException
@@ -483,43 +512,275 @@
}
}
+
+ public void recover(ConfigurationRecoveryHandler recoveryHandler) throws
AMQException
+ {
+ stateTransition(State.CONFIGURED, State.RECOVERING);
+
+
CurrentActor.get().message(_logSubject,MessageStoreMessages.MST_RECOVERY_START());
+
+ try
+ {
+ QueueRecoveryHandler qrh = recoveryHandler.begin(this);
+ loadQueues(qrh);
+
+ ExchangeRecoveryHandler erh = qrh.completeQueueRecovery();
+ loadExchanges(erh);
+
+ BindingRecoveryHandler brh = erh.completeExchangeRecovery();
+ recoverBindings(brh);
+
+ brh.completeBindingRecovery();
+ }
+ catch (DatabaseException e)
+ {
+ throw new AMQException("Error recovering persistent state: " + e,
e);
+ }
+
+ }
+
+ private void loadQueues(QueueRecoveryHandler qrh) throws DatabaseException
+ {
+ Cursor cursor = null;
+
+ try
+ {
+ cursor = _queueDb.openCursor(null, null);
+ DatabaseEntry key = new DatabaseEntry();
+ DatabaseEntry value = new DatabaseEntry();
+ TupleBinding binding = _queueTupleBindingFactory.getInstance();
+ while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
+ {
+ QueueRecord queueRecord = (QueueRecord) binding.entryToObject(value);
+
+ String queueName = queueRecord.getNameShortString() == null ? null :
+ queueRecord.getNameShortString().asString();
+ String owner = queueRecord.getOwner() == null ? null :
+ queueRecord.getOwner().asString();
+ FieldTable arguments = queueRecord.getArguments();
+
+ qrh.queue(queueName, owner, arguments);
+ }
+
+ }
+ finally
+ {
+ if (cursor != null)
+ {
+ cursor.close();
+ }
+ }
+ }
+
+
+ private void loadExchanges(ExchangeRecoveryHandler erh) throws AMQException,
DatabaseException
+ {
+ Cursor cursor = null;
+
+ try
+ {
+ cursor = _exchangeDb.openCursor(null, null);
+ DatabaseEntry key = new DatabaseEntry();
+ DatabaseEntry value = new DatabaseEntry();
+ TupleBinding binding = new ExchangeTB();
+
+ while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
+ {
+ ExchangeRecord exchangeRec = (ExchangeRecord)
binding.entryToObject(value);
+
+ String exchangeName = exchangeRec.getNameShortString() == null ? null :
+ exchangeRec.getNameShortString().asString();
+ String type = exchangeRec.getType() == null ? null :
+ exchangeRec.getType().asString();
+ boolean autoDelete = exchangeRec.isAutoDelete();
+
+ erh.exchange(exchangeName, type, autoDelete);
+ }
+ }
+ finally
+ {
+ if (cursor != null)
+ {
+ cursor.close();
+ }
+ }
+
+ }
+
+ private void recoverBindings(BindingRecoveryHandler brh) throws DatabaseException
+ {
+ Cursor cursor = null;
+ try
+ {
+ cursor = _queueBindingsDb.openCursor(null, null);
+ DatabaseEntry key = new DatabaseEntry();
+ DatabaseEntry value = new DatabaseEntry();
+ TupleBinding binding = _bindingTupleBindingFactory.getInstance();
+
+ while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
+ {
+ //yes, this is retrieving all the useful information from the key only.
+ //For table compatibility it shall currently be left as is
+ BindingKey bindingRecord = (BindingKey) binding.entryToObject(key);
+
+ String exchangeName = bindingRecord.getExchangeName() == null ? null :
+ bindingRecord.getExchangeName().asString();
+ String queueName = bindingRecord.getQueueName() == null ? null :
+ bindingRecord.getQueueName().asString();
+ String routingKey = bindingRecord.getRoutingKey() == null ? null :
+ bindingRecord.getRoutingKey().asString();
+ ByteBuffer argumentsBB = (bindingRecord.getArguments() == null ? null :
+
java.nio.ByteBuffer.wrap(bindingRecord.getArguments().getDataAsBytes()));
+
+ brh.binding(exchangeName, queueName, routingKey, argumentsBB);
+ }
+ }
+ finally
+ {
+ if (cursor != null)
+ {
+ cursor.close();
+ }
+ }
+
+ }
+
+ private void recoverMessages(MessageStoreRecoveryHandler msrh) throws
DatabaseException
+ {
+ StoredMessageRecoveryHandler mrh = msrh.begin();
+
+ Cursor cursor = null;
+ try
+ {
+ cursor = _messageMetaDataDb.openCursor(null, null);
+ DatabaseEntry key = new DatabaseEntry();
+ EntryBinding keyBinding = TupleBinding.getPrimitiveBinding(Long.class);;
+
+ DatabaseEntry value = new DatabaseEntry();
+ EntryBinding valueBinding = _metaDataTupleBindingFactory.getInstance();
+
+ long maxId = 0;
+
+ while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
+ {
+ long messageId = (Long) keyBinding.entryToObject(key);
+ StorableMessageMetaData metaData = (StorableMessageMetaData)
valueBinding.entryToObject(value);
+
+ StoredBDBMessage message = new StoredBDBMessage(messageId, metaData,
false);
+ mrh.message(message);
+
+ maxId = Math.max(maxId, messageId);
+ }
+
+ _messageId.set(maxId);
+ }
+ catch (DatabaseException e)
+ {
+ _log.error("Database Error: " + e, e);
+ throw e;
+ }
+ finally
+ {
+ if (cursor != null)
+ {
+ cursor.close();
+ }
+ }
+ }
+
+ private void recoverQueueEntries(TransactionLogRecoveryHandler recoveryHandler)
+ throws DatabaseException, AMQException
+ {
+ QueueEntryRecoveryHandler qerh = recoveryHandler.begin(this);
+
+ Cursor cursor = null;
+ try
+ {
+ cursor = _deliveryDb.openCursor(null, null);
+ DatabaseEntry key = new DatabaseEntry();
+ EntryBinding keyBinding = new QueueEntryTB();
+
+ DatabaseEntry value = new DatabaseEntry();
+ EntryBinding valueBinding = TupleBinding.getPrimitiveBinding(Long.class);
+
+ while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
+ {
+
+ QueueEntryKey dd = (QueueEntryKey) keyBinding.entryToObject(key);
+
+ AMQShortString queueName = dd.getQueueName();
+ long messageId = dd.getMessageId();
+
+ qerh.queueEntry(queueName.asString(),messageId);
+
+// if (_log.isDebugEnabled())
+// {
+// _log.debug("On recovery, delivering Message ID:" +
message.getMessageId() + " to " + queue.getName());
+// }
+
+ }
+ }
+ catch (DatabaseException e)
+ {
+ _log.error("Database Error: " + e, e);
+ throw e;
+ }
+ finally
+ {
+ if (cursor != null)
+ {
+ cursor.close();
+ }
+ }
+
+// if (_log.isInfoEnabled())
+// {
+// _log.info("Recovered message counts: " + _queueRecoveries);
+// }
+//
+// for(Map.Entry<AMQShortString,Integer> entry :
_queueRecoveries.entrySet())
+// {
+// CurrentActor.get().message(_logSubject,
MessageStoreMessages.MST_RECOVERED(entry.getValue(), String.valueOf(entry.getKey())));
+//
+// CurrentActor.get().message(_logSubject,
MessageStoreMessages.MST_RECOVERY_COMPLETE(String.valueOf(entry.getKey()), true));
+// }
+ }
+
/**
- * Removes the specified message from the store in the given transactional store
context.
+ * Removes the specified message from the store.
*
- * @param context The transactional context to remove the message in.
* @param messageId Identifies the message to remove.
*
* @throws AMQException If the operation fails for any reason.
+ * @throws DatabaseException
*/
- public void removeMessage(StoreContext context, Long messageId) throws AMQException
+ public void removeMessage(Long messageId) throws AMQException
{
// _log.debug("public void removeMessage(StoreContext context = " +
context + ", Long messageId = " + messageId
// + "): called");
- boolean localTx = getOrCreateTransaction(context);
- Transaction tx = (Transaction) context.getPayload();
+ com.sleepycat.je.Transaction tx = null;
+
+ Cursor cursor = null;
+ try
+ {
+ tx = _environment.beginTransaction(null, null);
+
+ //remove the message meta data from the store
+ DatabaseEntry key = new DatabaseEntry();
+ EntryBinding metaKeyBindingTuple =
TupleBinding.getPrimitiveBinding(Long.class);
+ metaKeyBindingTuple.objectToEntry(messageId, key);
- DatabaseEntry key = new DatabaseEntry();
- EntryBinding keyBinding = TupleBinding.getPrimitiveBinding(Long.class);
- keyBinding.objectToEntry(messageId, key);
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Message Id: " + messageId + "
Removing");
+ }
- if (_log.isDebugEnabled())
- {
- _log.debug("Message Id: " + messageId + " Removing");
- }
-
- // first we need to look up the header to get the chunk count
- MessageMetaData mmd = getMessageMetaData(context, messageId);
- try
- {
+
OperationStatus status = _messageMetaDataDb.delete(tx, key);
if (status == OperationStatus.NOTFOUND)
{
- if (localTx)
- {
- tx.abort();
- context.setPayload(null);
- }
+ tx.abort();
throw new AMQException("Message metadata not found for message id
" + messageId);
}
@@ -529,56 +790,73 @@
_log.debug("Deleted metadata for message " + messageId);
}
- DatabaseEntry contentKey = new DatabaseEntry();
- TupleBinding contentKeyBinding = new MessageContentKey.TupleBinding();
- for (int i = 0; i < mmd.getContentChunkCount(); i++)
+ //now remove the content data from the store if there is any.
+
+ DatabaseEntry contentKeyEntry = new DatabaseEntry();
+ MessageContentKey_3 mck = new MessageContentKey_3(messageId,0);
+
+ TupleBinding contentKeyTupleBinding = new MessageContentKeyTB_3();
+ contentKeyTupleBinding.objectToEntry(mck, contentKeyEntry);
+
+ //Use a partial record for the value to prevent retrieving the
+ //data itself as we only need the key to identify what to remove.
+ DatabaseEntry value = new DatabaseEntry();
+ value.setPartial(0, 0, true);
+
+ cursor = _messageContentDb.openCursor(tx, null);
+
+ status = cursor.getSearchKeyRange(contentKeyEntry, value, LockMode.RMW);
+ while (status == OperationStatus.SUCCESS)
{
- MessageContentKey mck = new MessageContentKey(messageId, i);
- contentKeyBinding.objectToEntry(mck, contentKey);
- status = _messageContentDb.get(tx, contentKey, new DatabaseEntry(),
LockMode.RMW);
- if (status == OperationStatus.NOTFOUND)
+ mck = (MessageContentKey_3)
contentKeyTupleBinding.entryToObject(contentKeyEntry);
+
+ if(mck.getMessageId() != messageId)
{
- if (localTx)
- {
- tx.abort();
- context.setPayload(null);
- }
-
- throw new AMQException("Content chunk " + i + " not
found for message " + messageId);
+ //we have exhausted all chunks for this message id, break
+ break;
}
-
- status = _messageContentDb.delete(tx, contentKey);
- if (status == OperationStatus.NOTFOUND)
+ else
{
- if (localTx)
+ status = cursor.delete();
+
+ if(status == OperationStatus.NOTFOUND)
{
+ cursor.close();
+ cursor = null;
+
tx.abort();
- context.setPayload(null);
+ throw new AMQException("Content chunk offset" +
mck.getOffset() + " not found for message " + messageId);
}
-
- throw new AMQException("Content chunk " + i + " not
found for message " + messageId);
+
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Deleted content chunk offset " +
mck.getOffset() + " for message " + messageId);
+ }
}
-
- if (_log.isDebugEnabled())
- {
- _log.debug("Deleted content chunk " + i + " for
message " + messageId);
- }
+
+ status = cursor.getNext(contentKeyEntry, value, LockMode.RMW);
}
- if (localTx)
- {
- commit(tx);
- context.setPayload(null);
- }
+ cursor.close();
+ cursor = null;
+
+ commit(tx);
}
catch (DatabaseException e)
{
- if ((tx != null) && localTx)
+ e.printStackTrace();
+
+ if (tx != null)
{
try
{
+ if(cursor != null)
+ {
+ cursor.close();
+ cursor = null;
+ }
+
tx.abort();
- context.setPayload(null);
}
catch (DatabaseException e1)
{
@@ -586,8 +864,23 @@
}
}
- throw new AMQException("Error writing AMQMessage with id " +
messageId + " to database: " + e, e);
+ throw new AMQException("Error removing message with id " +
messageId + " from database: " + e, e);
}
+ finally
+ {
+ if(cursor != null)
+ {
+ try
+ {
+ cursor.close();
+ }
+ catch (DatabaseException e)
+ {
+ //TODO
+ throw new RuntimeException(e);
+ }
+ }
+ }
}
/**
@@ -601,12 +894,17 @@
{
if (_state != State.RECOVERING)
{
+ ExchangeRecord exchangeRec = new
ExchangeRecord(exchange.getNameShortString(),
+ exchange.getTypeShortString(),
exchange.isAutoDelete());
+
DatabaseEntry key = new DatabaseEntry();
EntryBinding keyBinding = new AMQShortStringTB();
- keyBinding.objectToEntry(exchange.getName(), key);
+ keyBinding.objectToEntry(exchange.getNameShortString(), key);
+
DatabaseEntry value = new DatabaseEntry();
- TupleBinding exchangeBinding = new ExchangeTB(_virtualHost);
- exchangeBinding.objectToEntry(exchange, value);
+ TupleBinding exchangeBinding = new ExchangeTB();
+ exchangeBinding.objectToEntry(exchangeRec, value);
+
try
{
_exchangeDb.put(null, key, value);
@@ -630,7 +928,7 @@
{
DatabaseEntry key = new DatabaseEntry();
EntryBinding keyBinding = new AMQShortStringTB();
- keyBinding.objectToEntry(exchange.getName(), key);
+ keyBinding.objectToEntry(exchange.getNameShortString(), key);
try
{
OperationStatus status = _exchangeDb.delete(null, key);
@@ -645,117 +943,9 @@
}
}
- private void recoverExchanges() throws AMQException, DatabaseException
- {
- for (Exchange exchange : loadExchanges())
- {
- recoverExchange(exchange);
- }
- }
- private void recoverExchange(Exchange exchange) throws AMQException,
DatabaseException
- {
- _log.info("Recovering durable exchange " + exchange.getName() + "
of type " + exchange.getType() + "...");
- QueueRegistry queueRegistry = _virtualHost.getQueueRegistry();
- for (BindingKey binding : loadQueueBindings(exchange))
- {
- AMQQueue queue = queueRegistry.getQueue(binding.getQueueName());
- if (queue == null)
- {
- _log.error("Unknown queue: " + binding.getQueueName() + "
cannot be bound to exchange: "
- + exchange.getName());
- }
- else
- {
- _log.info("Restoring binding: (Exchange: " +
binding.getExchangeName() + ", Queue: " + binding
- .getQueueName() + ", Routing Key: " +
binding.getRoutingKey() + ", Arguments: " + binding.getArguments()
- + ")");
-
- queue.bind(exchange, binding.getRoutingKey(), binding.getArguments());
- }
- }
- }
-
- private List<BindingKey> loadQueueBindings(Exchange exchange) throws
DatabaseException
- {
-
- Cursor cursor = null;
- List<BindingKey> queueBindings = new ArrayList<BindingKey>();
- try
- {
- cursor = _queueBindingsDb.openCursor(null, null);
- DatabaseEntry key = new DatabaseEntry();
- DatabaseEntry value = new DatabaseEntry();
-
- BindingKey queueBinding =
- new BindingKey(exchange.getName(), null, null, null);
-
- EntryBinding<BindingKey> keyBinding =
_bindingTupleBindingFactory.getInstance();
- keyBinding.objectToEntry(queueBinding, key);
-
- OperationStatus opStatus = cursor.getSearchKeyRange(key, value,
LockMode.RMW);
-
- TupleBinding binding = _bindingTupleBindingFactory.getInstance();
- while (opStatus == OperationStatus.SUCCESS)
- {
- queueBinding = (BindingKey) binding.entryToObject(key);
- if (queueBinding.getExchangeName().equals(exchange.getName()))
- {
- queueBindings.add(queueBinding);
- opStatus = cursor.getNext(key, value, LockMode.RMW);
- }
- else
- {
- break;
- }
- }
-
- return queueBindings;
- }
- finally
- {
- if (cursor != null)
- {
- cursor.close();
- }
- }
-
- }
-
- private List<Exchange> loadExchanges() throws AMQException, DatabaseException
- {
- Cursor cursor = null;
- List<Exchange> exchanges = new ArrayList<Exchange>();
- try
- {
- cursor = _exchangeDb.openCursor(null, null);
- DatabaseEntry key = new DatabaseEntry();
- DatabaseEntry value = new DatabaseEntry();
- TupleBinding binding = new ExchangeTB(_virtualHost);
- while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
- {
- Exchange exchange = (Exchange) binding.entryToObject(value);
-
- _virtualHost.getExchangeRegistry().registerExchange(exchange);
-
- exchanges.add(exchange);
- _log.info("Registering exchange " + exchange.getName());
- }
-
- return exchanges;
- }
- finally
- {
- if (cursor != null)
- {
- cursor.close();
- }
- }
-
- }
-
/**
* Binds the specified queue to an exchange with a routing key.
*
@@ -773,13 +963,20 @@
if (_state != State.RECOVERING)
{
+ BindingKey bindingRecord = new BindingKey(exchange.getNameShortString(),
+ queue.getNameShortString(), routingKey,
args);
+
DatabaseEntry key = new DatabaseEntry();
EntryBinding keyBinding = _bindingTupleBindingFactory.getInstance();
- keyBinding.objectToEntry(new BindingKey(exchange.getName(), queue.getName(),
routingKey, args), key);
+
+ keyBinding.objectToEntry(bindingRecord, key);
+ //yes, this is writing out 0 as a value and putting all the
+ //useful info into the key, don't ask me why. For table
+ //compatibility it shall currently be left as is
DatabaseEntry value = new DatabaseEntry();
ByteBinding.byteToEntry((byte) 0, value);
-
+
try
{
_queueBindingsDb.put(null, key, value);
@@ -807,7 +1004,7 @@
{
DatabaseEntry key = new DatabaseEntry();
EntryBinding keyBinding = _bindingTupleBindingFactory.getInstance();
- keyBinding.objectToEntry(new BindingKey(exchange.getName(), queue.getName(),
routingKey, args), key);
+ keyBinding.objectToEntry(new BindingKey(exchange.getNameShortString(),
queue.getNameShortString(), routingKey, args), key);
try
{
@@ -844,20 +1041,17 @@
if (_state != State.RECOVERING)
{
- long queueId = _queueId.getAndIncrement();
- _queueNameToIdMap.put(queue.getName(), queueId);
-
+ QueueRecord queueRecord= new QueueRecord(queue.getNameShortString(),
+ queue.getOwner(), arguments);
+
DatabaseEntry key = new DatabaseEntry();
-
EntryBinding keyBinding = new AMQShortStringTB();
- keyBinding.objectToEntry(queue.getName(), key);
+ keyBinding.objectToEntry(queue.getNameShortString(), key);
DatabaseEntry value = new DatabaseEntry();
TupleBinding queueBinding = _queueTupleBindingFactory.getInstance();
- ((QueueTuple) queueBinding).setArguments(arguments);
-
- queueBinding.objectToEntry(queue, value);
+ queueBinding.objectToEntry(queueRecord, value);
try
{
_queueDb.put(null, key, value);
@@ -878,12 +1072,10 @@
*/
public void removeQueue(final AMQQueue queue) throws AMQException
{
- AMQShortString name = queue.getName();
+ AMQShortString name = queue.getNameShortString();
_log.debug("public void removeQueue(AMQShortString name = " + name +
"): called");
- Long queueId = _queueNameToIdMap.remove(name);
-
DatabaseEntry key = new DatabaseEntry();
EntryBinding keyBinding = new AMQShortStringTB();
keyBinding.objectToEntry(name, key);
@@ -902,34 +1094,6 @@
}
/**
- * Really for testing purposes.
- *
- * @param name
- *
- * @return
- *
- * @throws AMQException
- */
- AMQQueue getQueue(AMQShortString name) throws AMQException
- {
- DatabaseEntry key = new DatabaseEntry();
- EntryBinding keyBinding = new AMQShortStringTB();
- keyBinding.objectToEntry(name, key);
- DatabaseEntry value = new DatabaseEntry();
- try
- {
- _queueDb.get(null, key, value, LockMode.RMW);
- TupleBinding binding = _queueTupleBindingFactory.getInstance();
-
- return (AMQQueue) binding.entryToObject(value);
- }
- catch (DatabaseException e)
- {
- throw new AMQException("Error getting queue with name " + name +
" from database: " + e, e);
- }
- }
-
- /**
* Places a message onto a specified queue, in a given transactional context.
*
* @param context The transactional context for the operation.
@@ -938,15 +1102,16 @@
*
* @throws AMQException If the operation fails for any reason.
*/
- public void enqueueMessage(StoreContext context, AMQQueue queue, Long messageId)
throws AMQException
+ public void enqueueMessage(StoreContext context, final TransactionLogResource queue,
Long messageId) throws AMQException
{
// _log.debug("public void enqueueMessage(StoreContext context = " +
context + ", AMQShortString name = " + name
// + ", Long messageId): called");
- AMQShortString name = queue.getName();
- Transaction tx = (Transaction) context.getPayload();
+ AMQShortString name = new AMQShortString(queue.getResourceName());
+ com.sleepycat.je.Transaction tx = (com.sleepycat.je.Transaction)
context.getPayload();
+
DatabaseEntry key = new DatabaseEntry();
- EntryBinding keyBinding = new QueueEntryKey.TupleBinding();
+ EntryBinding keyBinding = new QueueEntryTB();
QueueEntryKey dd = new QueueEntryKey(name, messageId);
keyBinding.objectToEntry(dd, key);
DatabaseEntry value = new DatabaseEntry();
@@ -968,28 +1133,6 @@
}
}
- private boolean getOrCreateTransaction(StoreContext context) throws AMQException
- {
-
- Transaction tx = (Transaction) context.getPayload();
- if (tx == null)
- {
- try
- {
- tx = _environment.beginTransaction(null, null);
- context.setPayload(tx);
-
- return true;
- }
- catch (DatabaseException e)
- {
- throw new AMQException("Error beginning transaction: " + e,
e);
- }
- }
-
- return false;
- }
-
/**
* Extracts a message from a specified queue, in a given transactional context.
*
@@ -999,14 +1142,14 @@
*
* @throws AMQException If the operation fails for any reason, or if the specified
message does not exist.
*/
- public void dequeueMessage(StoreContext context, final AMQQueue queue, Long
messageId) throws AMQException
+ public void dequeueMessage(StoreContext context, final TransactionLogResource queue,
Long messageId) throws AMQException
{
- AMQShortString name = queue.getName();
- boolean isLocal = getOrCreateTransaction(context);
- Transaction tx = (Transaction) context.getPayload();
+ AMQShortString name = new AMQShortString(queue.getResourceName());
+ com.sleepycat.je.Transaction tx = (com.sleepycat.je.Transaction)
context.getPayload();
+
DatabaseEntry key = new DatabaseEntry();
- EntryBinding keyBinding = new QueueEntryKey.TupleBinding();
+ EntryBinding keyBinding = new QueueEntryTB();
QueueEntryKey dd = new QueueEntryKey(name, messageId);
keyBinding.objectToEntry(dd, key);
@@ -1034,82 +1177,18 @@
_log.debug("Removed message " + messageId + ", " +
name + " from delivery db");
}
-
- if (isLocal)
- {
- commit(tx);
- context.setPayload(null);
- }
}
catch (DatabaseException e)
{
_log.error("Failed to dequeue message " + messageId + ":
" + e, e);
_log.error(tx);
- if (isLocal)
- {
- try
- {
- tx.abort();
- context.setPayload(null);
- }
- catch (DatabaseException e1)
- {
- throw new AMQException("Error rolling back transaction: " +
e1, e1);
- }
- }
throw new AMQException("Error accessing database while dequeuing
message: " + e, e);
}
}
- private boolean isLocalTransaction(StoreContext context)
- {
- return context.getPayload() == null;
- }
-
/**
- * Begins a transactional context.
- *
- * @param context The transactional context to begin.
- *
- * @throws AMQException If the operation fails for any reason.
- */
- public void beginTran(StoreContext context) throws AMQException
- {
- // _log.debug("public void beginTran(StoreContext context = " + context
+ "): called");
-
- if (context.getPayload() != null)
- {
- throw new AMQException("Fatal internal error: transactional context is
not empty at beginTran: "
- + context.getPayload());
- }
- else
- {
- try
- {
- context.setPayload(_environment.beginTransaction(null, _transactionConfig
));
- }
- catch (DatabaseException e)
- {
- throw new AMQException("Error starting transaction: " + e, e);
- }
- }
- }
-
- /**
- * Tests a transactional context to see if it has been begun but not yet committed or
aborted.
- *
- * @param context The transactional context to test.
- *
- * @return <tt>true</tt> if the transactional context is live,
<tt>false</tt> otherwise.
- */
- public boolean inTran(StoreContext context)
- {
- return context.getPayload() != null;
- }
-
- /**
* Commits all operations performed within a given transactional context.
*
* @param context The transactional context to commit all operations for.
@@ -1120,7 +1199,7 @@
{
// _log.debug("public void commitTran(StoreContext context = " +
context + "): called");
- Transaction tx = (Transaction) context.getPayload();
+ com.sleepycat.je.Transaction tx = (com.sleepycat.je.Transaction)
context.getPayload();
if (tx == null)
{
@@ -1146,6 +1225,13 @@
}
}
+ public StoreFuture commitTranAsync(StoreContext context) throws AMQException
+ {
+ //TODO - Actually create an async commit implementation.
+ commitTran(context);
+ return IMMEDIATE_FUTURE;
+ }
+
/**
* Abandons all operations performed within a given transactional context.
*
@@ -1155,7 +1241,7 @@
*/
public void abortTran(StoreContext context) throws AMQException
{
- Transaction tx = (Transaction) context.getPayload();
+ com.sleepycat.je.Transaction tx = (com.sleepycat.je.Transaction)
context.getPayload();
if (_log.isDebugEnabled())
{
@@ -1194,7 +1280,7 @@
QueueEntryKey dd = new QueueEntryKey(queueName, 0);
- EntryBinding keyBinding = new QueueEntryKey.TupleBinding();
+ EntryBinding keyBinding = new QueueEntryTB();
keyBinding.objectToEntry(dd, key);
DatabaseEntry value = new DatabaseEntry();
@@ -1204,10 +1290,10 @@
OperationStatus status = cursor.getSearchKeyRange(key, value,
LockMode.DEFAULT);
dd = (QueueEntryKey) keyBinding.entryToObject(key);
- while ((status == OperationStatus.SUCCESS) &&
dd.queueName.equals(queueName))
+ while ((status == OperationStatus.SUCCESS) &&
dd.getQueueName().equals(queueName))
{
- messageIds.add(dd.messageId);
+ messageIds.add(dd.getMessageId());
status = cursor.getNext(key, value, LockMode.DEFAULT);
if (status == OperationStatus.SUCCESS)
{
@@ -1237,59 +1323,6 @@
}
}
- public void recover(VirtualHost virtualHost) throws AMQException
- {
- stateTransition(State.CONFIGURED, State.RECOVERING);
-
- _log.info("Recovering persistent state...");
-
CurrentActor.get().message(_logSubject,MessageStoreMessages.MST_RECOVERY_START(null,
false));
-
- StoreContext context = new StoreContext();
- try
- {
- beginTran(context);
- Map<AMQShortString, AMQQueue> queues = loadQueues();
-
- for (AMQQueue q : queues.values())
- {
-
CurrentActor.get().message(_logSubject,MessageStoreMessages.MST_RECOVERY_START(String.valueOf(q.getName()),
true));
- //Record that we have a queue for recovery
- _queueRecoveries.put(q.getName(), 0);
-
-
q.configure(virtualHost.getConfiguration().getQueueConfiguration(q.getName().asString()));
-
- }
-
- recoverExchanges();
-
- deliverMessages(context, queues);
- _log.info("Persistent state recovered successfully");
- CurrentActor.get().message(_logSubject,
MessageStoreMessages.MST_RECOVERY_COMPLETE(null, false));
-
- commitTran(context);
-
- }
- catch (DatabaseException e)
- {
- abortTran(context);
-
- throw new AMQException("Error recovering persistent state: " + e,
e);
- }
- catch (AMQException amqe)
- {
- abortTran(context);
- throw new AMQException("Error recovering persistent state: " +
amqe, amqe);
- }
- catch (Throwable ioobe)
- {
- abortTran(context);
- throw new AMQException("Invalid database format. Please use upgrade tool
for store in Virtualhost:'"
- + _virtualHost.getName() + "'", ioobe);
- }
-
- stateTransition(State.RECOVERING, State.STARTED);
- }
-
/**
* Return a valid, currently unused message id.
*
@@ -1297,7 +1330,7 @@
*/
public Long getNewMessageId()
{
- return _messageId.getAndIncrement();
+ return _messageId.incrementAndGet();
}
/**
@@ -1305,20 +1338,21 @@
*
* @param context The transactional context for the operation.
* @param messageId The message to store the data for.
- * @param index The index of the data chunk.
+ * @param offset The offset of the data chunk in the message.
* @param contentBody The content of the data chunk.
* @param lastContentBody Flag to indicate that this is the last such chunk for the
message.
*
* @throws AMQException If the operation fails for any reason, or if the specified
message does not exist.
*/
- public void storeContentBodyChunk(StoreContext context, Long messageId, int index,
ContentChunk contentBody,
- boolean lastContentBody) throws AMQException
+ protected void addContent(StoreContext context, Long messageId, int offset,
+ ByteBuffer contentBody) throws AMQException
{
- Transaction tx = (Transaction) context.getPayload();
+ com.sleepycat.je.Transaction tx = (com.sleepycat.je.Transaction)
context.getPayload();
+
DatabaseEntry key = new DatabaseEntry();
- TupleBinding keyBinding = new MessageContentKey.TupleBinding();
- keyBinding.objectToEntry(new MessageContentKey(messageId, index), key);
+ TupleBinding keyBinding = new MessageContentKeyTB_3();
+ keyBinding.objectToEntry(new MessageContentKey_3(messageId, offset), key);
DatabaseEntry value = new DatabaseEntry();
TupleBinding messageBinding = new ContentTB();
messageBinding.objectToEntry(contentBody, value);
@@ -1327,13 +1361,13 @@
OperationStatus status = _messageContentDb.put(tx, key, value);
if (status != OperationStatus.SUCCESS)
{
- throw new AMQException("Error adding content chunk " + index +
" for message id " + messageId + ": "
+ throw new AMQException("Error adding content chunk offset" +
offset + " for message id " + messageId + ": "
+ status);
}
if (_log.isDebugEnabled())
{
- _log.debug("Storing content chunk " + index + " for
message " + messageId + "[Transaction" + tx + "]");
+ _log.debug("Storing content chunk offset" + offset + " for
message " + messageId + "[Transaction" + tx + "]");
}
}
catch (DatabaseException e)
@@ -1351,7 +1385,7 @@
*
* @throws AMQException If the operation fails for any reason, or if the specified
message does not exist.
*/
- public void storeMessageMetaData(StoreContext context, Long messageId,
MessageMetaData messageMetaData)
+ private void storeMetaData(StoreContext context, Long messageId,
StorableMessageMetaData messageMetaData)
throws AMQException
{
if (_log.isDebugEnabled())
@@ -1359,14 +1393,15 @@
_log.debug("public void storeMessageMetaData(StoreContext context =
" + context + ", Long messageId = "
+ messageId + ", MessageMetaData messageMetaData = " +
messageMetaData + "): called");
}
- //This call breaking tests - not sure where the txn it creates should be
committed ??
- //getOrCreateTransaction(context);
- Transaction tx = (Transaction) context.getPayload();
+
+ com.sleepycat.je.Transaction tx = (com.sleepycat.je.Transaction)
context.getPayload();
+
DatabaseEntry key = new DatabaseEntry();
EntryBinding keyBinding = TupleBinding.getPrimitiveBinding(Long.class);
keyBinding.objectToEntry(messageId, key);
DatabaseEntry value = new DatabaseEntry();
- TupleBinding messageBinding = new MessageMetaDataTB();
+
+ TupleBinding messageBinding = _metaDataTupleBindingFactory.getInstance();
messageBinding.objectToEntry(messageMetaData, value);
try
{
@@ -1392,11 +1427,11 @@
*
* @throws AMQException If the operation fails for any reason, or if the specified
message does not exist.
*/
- public MessageMetaData getMessageMetaData(StoreContext context, Long messageId)
throws AMQException
+ public StorableMessageMetaData getMessageMetaData(Long messageId) throws
AMQException
{
if (_log.isDebugEnabled())
{
- _log.debug("public MessageMetaData getMessageMetaData(StoreContext
context = " + context + ", Long messageId = "
+ _log.debug("public MessageMetaData getMessageMetaData(Long messageId =
"
+ messageId + "): called");
}
@@ -1404,7 +1439,7 @@
EntryBinding keyBinding = TupleBinding.getPrimitiveBinding(Long.class);
keyBinding.objectToEntry(messageId, key);
DatabaseEntry value = new DatabaseEntry();
- TupleBinding messageBinding = new MessageMetaDataTB();
+ TupleBinding messageBinding = _metaDataTupleBindingFactory.getInstance();
try
{
@@ -1414,7 +1449,7 @@
throw new AMQException("Metadata not found for message with id
" + messageId);
}
- MessageMetaData mdd = (MessageMetaData) messageBinding.entryToObject(value);
+ StorableMessageMetaData mdd = (StorableMessageMetaData)
messageBinding.entryToObject(value);
return mdd;
}
@@ -1426,47 +1461,110 @@
}
/**
- * Retrieves a chunk of message data.
+ * Fills the provided ByteBuffer with as much content for the specified message as
possible, starting
+ * from the specified offset in the message.
*
- * @param context The transactional context for the operation.
- * @param messageId The message to get the data chunk for.
- * @param index The offset index of the data chunk within the message.
+ * @param messageId The message to get the data for.
+ * @param offset The offset of the data within the message.
+ * @param dst The destination of the content read back
*
- * @return A chunk of message data.
+ * @return The number of bytes inserted into the destination
*
* @throws AMQException If the operation fails for any reason, or if the specified
message does not exist.
*/
- public ContentChunk getContentBodyChunk(StoreContext context, Long messageId, int
index) throws AMQException
+ public int getContent(Long messageId, int offset, ByteBuffer dst) throws
AMQException
{
-
- DatabaseEntry key = new DatabaseEntry();
- TupleBinding keyBinding = new MessageContentKey.TupleBinding();
- keyBinding.objectToEntry(new MessageContentKey(messageId, index), key);
+ final int limit = offset + dst.remaining();
+
+ DatabaseEntry contentKeyEntry = new DatabaseEntry();
+
+ //TODO: if requested offset is non zero, use partial record (key-only) search to
+ //locate the first record key to prevent reading in data we dont need
+
+ //Start from 0 offset and search for the starting chunk.
+ MessageContentKey_3 mck = new MessageContentKey_3(messageId, 0);
+ TupleBinding contentKeyTupleBinding = new MessageContentKeyTB_3();
+ contentKeyTupleBinding.objectToEntry(mck, contentKeyEntry);
DatabaseEntry value = new DatabaseEntry();
- TupleBinding messageBinding = new ContentTB();
+ TupleBinding contentTupleBinding = new ContentTB();
+
if (_log.isDebugEnabled())
{
- _log.debug("Message Id: " + messageId + " Getting content body
chunk: " + index);
+ _log.debug("Message Id: " + messageId + " Getting content body
from offset: " + offset);
}
+ int written = 0;
+ int seenSoFar = 0;
+
+ Cursor cursor = null;
try
{
- OperationStatus status = _messageContentDb.get(null, key, value,
LockMode.READ_UNCOMMITTED);
- if (status != OperationStatus.SUCCESS)
+ cursor = _messageContentDb.openCursor(null, null);
+
+ OperationStatus status = cursor.getSearchKeyRange(contentKeyEntry, value,
LockMode.RMW);
+ while (status == OperationStatus.SUCCESS)
{
+ mck = (MessageContentKey_3)
contentKeyTupleBinding.entryToObject(contentKeyEntry);
+ long id = mck.getMessageId();
+
+ if(id != messageId)
+ {
+ //we have exhausted all chunks for this message id, break
+ break;
+ }
+
+ int offsetInMessage = mck.getOffset();
+ ByteBuffer buf = (ByteBuffer) contentTupleBinding.entryToObject(value);
+
+ final int size = (int) buf.limit();
+
+ seenSoFar += size;
+
+ //TODO: can change this guard if we start recording the last byte in the
chunk record
+ if(seenSoFar >= offset)
+ {
+ byte[] dataAsBytes = buf.array();
- throw new AMQException("Content chunk " + index + " not
found for message with id " + messageId);
+ int posInArray = offset + written - offsetInMessage;
+ int count = size - posInArray;
+ if(count > dst.remaining())
+ {
+ count = dst.remaining();
+ }
+ dst.put(dataAsBytes,posInArray,count);
+ written+=count;
+
+ if(dst.remaining() == 0)
+ {
+ break;
+ }
+ }
+
+ status = cursor.getNext(contentKeyEntry, value, LockMode.RMW);
}
- ContentChunk cb = (ContentChunk) messageBinding.entryToObject(value);
-
- return cb;
+ return written;
}
catch (DatabaseException e)
{
throw new AMQException("Error writing AMQMessage with id " +
messageId + " to database: " + e, e);
}
+ finally
+ {
+ if(cursor != null)
+ {
+ try
+ {
+ cursor.close();
+ }
+ catch (DatabaseException e)
+ {
+ // TODO
+ throw new RuntimeException(e);
+ }
+ }
+ }
}
public boolean isPersistent()
@@ -1474,47 +1572,27 @@
return true;
}
- Map<AMQShortString, AMQQueue> loadQueues() throws DatabaseException,
AMQException
+ public <T extends StorableMessageMetaData> StoredMessage<T> addMessage(T
metaData)
{
- Cursor cursor = null;
- Map<AMQShortString, AMQQueue> queues = new HashMap<AMQShortString,
AMQQueue>();
- try
+ if(metaData.isPersistent())
{
- cursor = _queueDb.openCursor(null, null);
- DatabaseEntry key = new DatabaseEntry();
- DatabaseEntry value = new DatabaseEntry();
- TupleBinding binding = _queueTupleBindingFactory.getInstance();
- while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
- {
- AMQQueue queue = (AMQQueue) binding.entryToObject(value);
- if (queue != null)
- {
- _virtualHost.getQueueRegistry().registerQueue(queue);
- queues.put(queue.getName(), queue);
- _log.info("Recovering queue " + queue.getName() + "
with owner:"
- + ((queue.getOwner() == null) ? " <<null>>
" : (" \"" + queue.getOwner() + "\" ")));
- }
- }
-
- return queues;
+ return new StoredBDBMessage(getNewMessageId(), metaData);
}
- finally
+ else
{
- if (cursor != null)
- {
- cursor.close();
- }
+ return new StoredMemoryMessage(getNewMessageId(), metaData);
}
}
- //public getters for the TupleBindingFactories
- public QueueTupleBindingFactory getQueueTupleBindingFactory()
+ //protected getters for the TupleBindingFactories
+
+ protected QueueTupleBindingFactory getQueueTupleBindingFactory()
{
return _queueTupleBindingFactory;
}
- public BindingTupleBindingFactory getBindingTupleBindingFactory()
+ protected BindingTupleBindingFactory getBindingTupleBindingFactory()
{
return _bindingTupleBindingFactory;
}
@@ -1612,162 +1690,8 @@
}
}
- private static final class ProcessAction
+ private void commit(com.sleepycat.je.Transaction tx) throws DatabaseException
{
- private final AMQQueue _queue;
- private final StoreContext _context;
- private final AMQMessage _message;
-
- public ProcessAction(AMQQueue queue, StoreContext context, AMQMessage message)
- {
- _queue = queue;
- _context = context;
- _message = message;
- }
-
- public void process() throws AMQException
- {
- _queue.enqueue(_context, _message);
- }
-
- }
-
- private void deliverMessages(final StoreContext context, Map<AMQShortString,
AMQQueue> queues)
- throws DatabaseException, AMQException
- {
- Map<Long, AMQMessage> msgMap = new HashMap<Long, AMQMessage>();
- List<ProcessAction> actions = new ArrayList<ProcessAction>();
-
-
- Cursor cursor = null;
- try
- {
- Transaction tx = (Transaction) context.getPayload();
- cursor = _deliveryDb.openCursor(tx, null);
- DatabaseEntry key = new DatabaseEntry();
- EntryBinding keyBinding = new QueueEntryKey.TupleBinding();
-
- DatabaseEntry value = new DatabaseEntry();
- EntryBinding valueBinding = TupleBinding.getPrimitiveBinding(Long.class);
- MessageHandleFactory messageHandleFactory = new MessageHandleFactory();
- long maxId = 1;
-
- TransactionalContext txnContext = new NonTransactionalContext(this, new
StoreContext(), null, null);
-
- while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
- {
-
- QueueEntryKey dd = (QueueEntryKey) keyBinding.entryToObject(key);
-
- AMQShortString queueName = dd.queueName;
-
- AMQQueue queue = queues.get(queueName);
-
- // If the matching queue was not already found in the store, check in
case a queue
- // with the same name exists in the virtualhost, otherwise we will create
a duplicate
- // queue and generate a JMX InstanceAlreadyExistsException, preventing
startup.
- if (queue == null)
- {
- queue = _virtualHost.getQueueRegistry().getQueue(queueName);
- if (queue != null)
- {
- queues.put(queueName, queue);
- }
- }
-
- if (queue == null)
- {
- queue = AMQQueueFactory.createAMQQueueImpl(queueName, false, null,
false, _virtualHost, null);
- _virtualHost.getQueueRegistry().registerQueue(queue);
- queues.put(queueName, queue);
- }
-
- long messageId = dd.messageId;
- maxId = Math.max(maxId, messageId);
- AMQMessage message = msgMap.get(messageId);
-
- if (message != null)
- {
- message.incrementReference();
- }
- else
- {
- message = new AMQMessage(messageId, this, messageHandleFactory,
txnContext);
- msgMap.put(messageId, message);
- }
-
- if (_log.isDebugEnabled())
- {
- _log.debug("On recovery, delivering Message ID:" +
message.getMessageId() + " to " + queue.getName());
- }
-
- Integer count = _queueRecoveries.get(queueName);
- if (count == null)
- {
- count = 0;
- }
-
- _queueRecoveries.put(queueName, ++count);
-
- actions.add(new ProcessAction(queue, context, message));
-
- }
-
- for (ProcessAction action : actions)
- {
- action.process();
- }
-
- _messageId.set(maxId + 1);
- }
- catch (DatabaseException e)
- {
- _log.error("Database Error: " + e, e);
- throw e;
- }
- catch (AMQException e)
- {
- _log.error("Store Error: " + e, e);
- throw e;
- }
- finally
- {
- if (cursor != null)
- {
- cursor.close();
- }
- }
-
- if (_log.isInfoEnabled())
- {
- _log.info("Recovered message counts: " + _queueRecoveries);
- }
-
- for(Map.Entry<AMQShortString,Integer> entry : _queueRecoveries.entrySet())
- {
- CurrentActor.get().message(_logSubject,
MessageStoreMessages.MST_RECOVERED(entry.getValue(), String.valueOf(entry.getKey())));
-
- CurrentActor.get().message(_logSubject,
MessageStoreMessages.MST_RECOVERY_COMPLETE(String.valueOf(entry.getKey()), true));
- }
-
- //Free the memory
- _queueRecoveries = null;
- }
-
- QueueRegistry getQueueRegistry()
- {
- return _virtualHost.getQueueRegistry();
- }
-
- void setVirtualHost(VirtualHost virtualHost)
- {
- _virtualHost = virtualHost;
-
- createTupleBindingFactories(_version);
- }
-
- void commit(Transaction tx) throws DatabaseException
- {
// _log.debug("void commit(Transaction tx = " + tx + "):
called");
tx.commitNoSync();
@@ -1787,11 +1711,11 @@
// private static final Logger _log = Logger.getLogger(Commit.class);
private final CommitThread _commitThread;
- private final Transaction _tx;
+ private final com.sleepycat.je.Transaction _tx;
private DatabaseException _databaseException;
private boolean _complete;
- public Commit(CommitThread commitThread, Transaction tx)
+ public Commit(CommitThread commitThread, com.sleepycat.je.Transaction tx)
{
// _log.debug("public Commit(CommitThread commitThread = " +
commitThread + ", Transaction tx = " + tx
// + "): called");
@@ -1973,5 +1897,184 @@
}
}
}
+
+
+ private class StoredBDBMessage implements StoredMessage
+ {
+ private final long _messageId;
+ private volatile SoftReference<StorableMessageMetaData> _metaDataRef;
+ private StoreContext _ctx;
+ private com.sleepycat.je.Transaction _txn;
+
+ StoredBDBMessage(long messageId, StorableMessageMetaData metaData)
+ {
+ this(messageId, metaData, true);
+ }
+
+
+ StoredBDBMessage(long messageId,
+ StorableMessageMetaData metaData, boolean persist)
+ {
+ try
+ {
+ _messageId = messageId;
+
+ _metaDataRef = new
SoftReference<StorableMessageMetaData>(metaData);
+ if(persist)
+ {
+ _ctx = new StoreContext();
+ _txn = _environment.beginTransaction(null, null);
+ _ctx.setPayload(_txn);
+ storeMetaData(_ctx, messageId, metaData);
+ }
+ }
+ catch (DatabaseException e)
+ {
+ //TODO
+ throw new RuntimeException(e);
+ }
+ catch (AMQException e)
+ {
+ //TODO
+ throw new RuntimeException(e);
+ }
+
+ }
+
+ public StorableMessageMetaData getMetaData()
+ {
+ StorableMessageMetaData metaData = _metaDataRef.get();
+ if(metaData == null)
+ {
+ try
+ {
+ metaData = BDBMessageStore.this.getMessageMetaData(_messageId);
+ }
+ catch (AMQException e)
+ {
+ //TODO
+ throw new RuntimeException(e);
+ }
+ _metaDataRef = new
SoftReference<StorableMessageMetaData>(metaData);
+ }
+
+ return metaData;
+ }
+
+ public long getMessageNumber()
+ {
+ return _messageId;
+ }
+
+ public void addContent(int offsetInMessage, java.nio.ByteBuffer src)
+ {
+ try
+ {
+ BDBMessageStore.this.addContent(_ctx, _messageId, offsetInMessage, src);
+ }
+ catch (AMQException e)
+ {
+ //TODO
+ throw new RuntimeException(e);
+ }
+ }
+
+ public int getContent(int offsetInMessage, java.nio.ByteBuffer dst)
+ {
+ try
+ {
+ return BDBMessageStore.this.getContent(_messageId, offsetInMessage,
dst);
+ }
+ catch (AMQException e)
+ {
+ // TODO
+ throw new RuntimeException(e);
+ }
+ }
+
+ public StoreFuture flushToStore()
+ {
+ try
+ {
+ if(_ctx != null)
+ {
+ BDBMessageStore.this.commitTran(_ctx);
+ }
+ }
+ catch (AMQException e)
+ {
+ //TODO
+ throw new RuntimeException(e);
+ }
+ finally
+ {
+ _txn = null;
+ _ctx = null;
+ }
+ return IMMEDIATE_FUTURE;
+ }
+
+ public void remove()
+ {
+ flushToStore();
+ try
+ {
+ BDBMessageStore.this.removeMessage(_messageId);
+ }
+ catch (AMQException e)
+ {
+ // TODO
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ private class BDBTransaction implements Transaction
+ {
+ private com.sleepycat.je.Transaction _txn;
+ private StoreContext _ctx;
+
+ private BDBTransaction()
+ {
+ _ctx = new StoreContext();
+ try
+ {
+ _txn = _environment.beginTransaction(null, null);
+ }
+ catch (DatabaseException e)
+ {
+ //TODO
+ throw new RuntimeException(e);
+ }
+ _ctx.setPayload(_txn);
+ }
+
+ public void enqueueMessage(TransactionLogResource queue, Long messageId) throws
AMQException
+ {
+ BDBMessageStore.this.enqueueMessage(_ctx, queue, messageId);
+ }
+
+ public void dequeueMessage(TransactionLogResource queue, Long messageId) throws
AMQException
+ {
+ BDBMessageStore.this.dequeueMessage(_ctx, queue, messageId);
+
+ }
+
+ public void commitTran() throws AMQException
+ {
+ BDBMessageStore.this.commitTran(_ctx);
+ }
+
+ public StoreFuture commitTranAsync() throws AMQException
+ {
+ return BDBMessageStore.this.commitTranAsync(_ctx);
+ }
+
+ public void abortTran() throws AMQException
+ {
+ BDBMessageStore.this.abortTran(_ctx);
+ }
+ }
+
}
Modified:
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ContentTB.java
===================================================================
---
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ContentTB.java 2010-05-13
18:28:33 UTC (rev 3970)
+++
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ContentTB.java 2010-05-14
16:23:59 UTC (rev 3971)
@@ -20,14 +20,12 @@
*/
package org.apache.qpid.server.store.berkeleydb;
+import java.nio.ByteBuffer;
+
import com.sleepycat.bind.tuple.TupleBinding;
import com.sleepycat.bind.tuple.TupleInput;
import com.sleepycat.bind.tuple.TupleOutput;
-import org.apache.qpid.framing.ContentBody;
-import org.apache.qpid.framing.abstraction.ContentChunk;
-import org.apache.mina.common.ByteBuffer;
-
/**
* @author Robert Greig (robert.j.greig(a)jpmorgan.com)
*/
@@ -39,39 +37,19 @@
final int size = tupleInput.readInt();
byte[] underlying = new byte[size];
tupleInput.readFast(underlying);
- final ByteBuffer data = ByteBuffer.wrap(underlying);
- ContentChunk cb = new ContentChunk()
- {
-
- public int getSize()
- {
- return size;
- }
-
- public ByteBuffer getData()
- {
- return data;
- }
-
- public void reduceToFit()
- {
-
- }
- };
- return cb;
+ return ByteBuffer.wrap(underlying);
}
public void objectToEntry(Object object, TupleOutput tupleOutput)
{
- ContentChunk cb = (ContentChunk) object;
- final int size = cb.getSize();
- byte[] underlying = new byte[size];
+ ByteBuffer src = (ByteBuffer) object;
+
+ src = src.slice();
- ByteBuffer buf = cb.getData();
+ byte[] chunkData = new byte[src.limit()];
+ src.duplicate().get(chunkData);
- buf.duplicate().rewind().get(underlying);
-
- tupleOutput.writeInt(size);
- tupleOutput.writeFast(underlying);
+ tupleOutput.writeInt(chunkData.length);
+ tupleOutput.writeFast(chunkData);
}
}
Modified:
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ExchangeTB.java
===================================================================
---
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ExchangeTB.java 2010-05-13
18:28:33 UTC (rev 3970)
+++
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ExchangeTB.java 2010-05-14
16:23:59 UTC (rev 3971)
@@ -4,20 +4,15 @@
import com.sleepycat.bind.tuple.TupleInput;
import com.sleepycat.bind.tuple.TupleOutput;
import org.apache.log4j.Logger;
-import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.store.berkeleydb.records.ExchangeRecord;
public class ExchangeTB extends TupleBinding
{
private static final Logger _log = Logger.getLogger(ExchangeTB.class);
- private final VirtualHost _virtualHost;
-
- public ExchangeTB(VirtualHost virtualHost)
+ public ExchangeTB()
{
- _virtualHost = virtualHost;
}
public Object entryToObject(TupleInput tupleInput)
@@ -27,43 +22,17 @@
AMQShortString typeName = AMQShortStringEncoding.readShortString(tupleInput);
boolean autoDelete = tupleInput.readBoolean();
-
- try
- {
- Exchange exchange;
- Exchange existing = _virtualHost.getExchangeRegistry().getExchange(name);
-
- if (existing != null)
- {
- _log.info("Exchange :" + existing + ": already exists in
configured broker.");
- exchange = existing;
- }
- else
- {
- exchange = _virtualHost.getExchangeFactory().createExchange(name,
typeName, true, autoDelete, 0);
-
- _virtualHost.getExchangeRegistry().registerExchange(exchange);
-
- _log.info("Registering new durable exchange from BDB:" +
exchange);
- }
-
- return exchange;
- }
- catch (AMQException e)
- {
- _log.error("Unable to create exchange: " + e, e);
- return null;
- }
+
+ return new ExchangeRecord(name, typeName, autoDelete);
}
public void objectToEntry(Object object, TupleOutput tupleOutput)
{
- Exchange exchange = (Exchange) object;
+ ExchangeRecord exchange = (ExchangeRecord) object;
- AMQShortStringEncoding.writeShortString(exchange.getName(), tupleOutput);
+ AMQShortStringEncoding.writeShortString(exchange.getNameShortString(),
tupleOutput);
AMQShortStringEncoding.writeShortString(exchange.getType(), tupleOutput);
tupleOutput.writeBoolean(exchange.isAutoDelete());
-
}
}
Modified:
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/MessageContentKey.java
===================================================================
---
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/MessageContentKey.java 2010-05-13
18:28:33 UTC (rev 3970)
+++
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/MessageContentKey.java 2010-05-14
16:23:59 UTC (rev 3971)
@@ -1,223 +1,42 @@
/*
*
- * Copyright (c) 2006 The Apache Software Foundation
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
*
- *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
*/
package org.apache.qpid.server.store.berkeleydb;
-import com.sleepycat.bind.tuple.TupleInput;
-import com.sleepycat.bind.tuple.TupleOutput;
-
-import java.util.Comparator;
-import java.io.Serializable;
-
-/**
- * @author Apache Software Foundation
- */
public class MessageContentKey
{
- public long messageId;
- public int chunk;
-
- public MessageContentKey()
+ private long _messageId;
+
+ public MessageContentKey(long messageId)
{
- }
-
- public MessageContentKey(byte[] payload)
+ _messageId = messageId;
+ }
+
+
+ public long getMessageId()
{
- final TupleInput ti = new TupleInput(payload);
- messageId = ti.readLong();
- chunk = ti.readInt();
+ return _messageId;
}
- public static class TupleBinding extends com.sleepycat.bind.tuple.TupleBinding
+ public void setMessageId(long messageId)
{
- public Object entryToObject(TupleInput tupleInput)
- {
- final MessageContentKey mk = new MessageContentKey();
- mk.messageId = tupleInput.readLong();
- mk.chunk = tupleInput.readInt();
- return mk;
- }
-
- public void objectToEntry(Object object, TupleOutput tupleOutput)
- {
- final MessageContentKey mk = (MessageContentKey) object;
- tupleOutput.writeLong(mk.messageId);
- tupleOutput.writeInt(mk.chunk);
- }
-
-
+ this._messageId = messageId;
}
-
- public static void writeModifiedLong(TupleOutput t, long l)
- {
- int ln = (int) (0x0F & l);
- int hn = (int) (0xF0 & l);
- t.writeByte(((ln <<4) | (hn>>4)) ^ 0x55);
- t.writeByte((int) (0xFF & (l >> 8)));
- t.writeByte((int) (0xFF & (l >> 16)));
- t.writeByte((int) (0xFF & (l >> 24)));
- t.writeByte((int) (0xFF & (l >> 32)));
- t.writeByte((int) (0xFF & (l >> 40)));
- t.writeByte((int) (0xFF & (l >> 48)));
- t.writeByte((int) (0xFF & (l >> 56)));
-
- }
-
- public static long readModifiedLong(TupleInput t)
- {
- byte b0 = (byte) (0xFF & (t.readByte() ^ 0x55));
- long l = (0x0f & (b0 >> 4)) | ((0xf0 & (b0 << 4)));
-
- l |= ((long) (0xFF & t.readByte())) << 8;
- l |= ((long) (0xFF & t.readByte())) << 16;
- l |= ((long) (0xFF & t.readByte())) << 24;
- l |= ((long) (0xFF & t.readByte())) << 32;
- l |= ((long) (0xFF & t.readByte())) << 40;
- l |= ((long) (0xFF & t.readByte())) << 48;
- l |= ((long) (0xFF & t.readByte())) << 56;
-
- return l;
- }
-
-
- public static class ContentKeyComparator implements Comparator, Serializable
- {
-
- public int compare(Object o1, Object o2)
- {
- byte[] b1 = (byte[]) o1;
- byte[] b2 = (byte[]) o2;
-
- MessageContentKey ck1 = new MessageContentKey(b1);
- MessageContentKey ck2 = new MessageContentKey(b2);
- if (ck1.messageId == ck2.messageId)
- {
- // reminder of Comparator return value:
- // return a negative integer if the first item is "less" than
the second, 0 if equal
- return ck1.chunk - ck2.chunk;
- }
- else
- {
- return (int) (ck1.messageId - ck2.messageId);
- }
- }
- }
-
- public MessageContentKey(long messageId, int chunk)
- {
- this.chunk = chunk;
- this.messageId = messageId;
- }
-
-
-
- private static final char[] HEX = {
'0','1','2','3','4','5','6','7','8','9','A','B','C','D','E','F'};
-
- private static StringBuilder appendHex(StringBuilder str, byte b)
- {
- str.append(HEX[0xF & (b >> 4)]);
- str.append(HEX[0xF & b]);
- return str;
- }
-
- private static StringBuilder appendHex(StringBuilder str, byte[] b)
- {
- for(int i = 0; i < b.length; i++)
- {
- appendHex(str,b[i]);
- }
- return str;
- }
-
- private static StringBuilder appendHex(StringBuilder str, int i)
- {
- appendHex(str,(byte)(0xFF & (i >> 24)));
- appendHex(str,(byte)(0xFF & (i >> 16)));
- appendHex(str,(byte)(0xFF & (i >> 8)));
- appendHex(str,(byte)(0xFF & i));
- return str;
- }
-
-
- private static StringBuilder appendHex(StringBuilder str, long l)
- {
- appendHex(str,(byte)(0xFF & (l >> 56)));
- appendHex(str,(byte)(0xFF & (l >> 48)));
- appendHex(str,(byte)(0xFF & (l >> 40)));
- appendHex(str,(byte)(0xFF & (l >> 32)));
- appendHex(str,(byte)(0xFF & (l >> 24)));
- appendHex(str,(byte)(0xFF & (l >> 16)));
- appendHex(str,(byte)(0xFF & (l >> 8)));
- appendHex(str,(byte)(0xFF & l));
- return str;
- }
-
-
-
-
- private static byte[] convertLong(long l)
- {
- byte[] b = new byte[8];
- int ln = (int) (0x0F & l);
- int hn = (int) (0xF0 & l);
- b[0] = (byte)(((ln <<4) | (hn>>4)) ^ 0x55);
- b[1] = ((byte)(0xFF & (l >> 8)));
- b[2] = ((byte)(0xFF & (l >> 16)));
- b[3] = ((byte)(0xFF & (l >> 24)));
- b[4] = ((byte)(0xFF & (l >> 32)));
- b[5] = ((byte)(0xFF & (l >> 40)));
- b[6] = ((byte)(0xFF & (l >> 48)));
- b[7] = ((byte)(0xFF & (l >> 56)));
-
- return b;
-
- }
-
-
- private static long readModifiedLong(byte[] b)
- {
- byte b0 = (byte) (b[0] ^ 0x55);
- long l = (0x0f & (b0 >> 4)) | ((0xf0 & (b0 << 4)));
- l |= ((long) b[1]) << 8;
- l |= ((long) b[2]) << 16;
- l |= ((long) b[3]) << 24;
- l |= ((long) b[4]) << 32;
- l |= ((long) b[5]) << 40;
- l |= ((long) b[6]) << 48;
- l |= ((long) b[7]) << 56;
-
-
- return l;
- }
-
-
- public static void main(String[] args)
- {
- StringBuilder s = new StringBuilder();
-
-
-
- for(long i = 1000; i < 1010; i++)
- {
- byte[] b = convertLong(i);
- System.out.println(appendHex(new StringBuilder(),b));
- System.out.println(readModifiedLong(b));
-
- }
-
- }
-}
+}
\ No newline at end of file
Deleted:
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/MessageMetaDataTB.java
===================================================================
---
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/MessageMetaDataTB.java 2010-05-13
18:28:33 UTC (rev 3970)
+++
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/MessageMetaDataTB.java 2010-05-14
16:23:59 UTC (rev 3971)
@@ -1,142 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *
http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.qpid.server.store.berkeleydb;
-
-import com.sleepycat.bind.tuple.TupleBinding;
-import com.sleepycat.bind.tuple.TupleInput;
-import com.sleepycat.bind.tuple.TupleOutput;
-import org.apache.log4j.Logger;
-import org.apache.mina.common.ByteBuffer;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.*;
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.server.queue.MessageMetaData;
-
-/**
- * Handles the mapping to and from message meta data
- */
-public class MessageMetaDataTB extends TupleBinding
-{
- private static final Logger _log = Logger.getLogger(MessageMetaDataTB.class);
-
- public Object entryToObject(TupleInput tupleInput)
- {
- try
- {
- final MessagePublishInfo publishBody = readMessagePublishInfo(tupleInput);
- final ContentHeaderBody contentHeaderBody =
readContentHeaderBody(tupleInput);
- final int contentChunkCount = tupleInput.readInt();
- return new MessageMetaData(publishBody, contentHeaderBody,
contentChunkCount);
- }
- catch (Exception e)
- {
- _log.error("Error converting entry to object: " + e, e);
- // annoyingly just have to return null since we cannot throw
- return null;
- }
- }
-
- public void objectToEntry(Object object, TupleOutput tupleOutput)
- {
- MessageMetaData message = (MessageMetaData) object;
- try
- {
- writeMessagePublishInfo(message.getMessagePublishInfo(), tupleOutput);
- }
- catch (AMQException e)
- {
- // can't do anything else since the BDB interface precludes throwing any
exceptions
- // in practice we should never get an exception
- throw new RuntimeException("Error converting object to entry: " +
e, e);
- }
- writeContentHeader(message.getContentHeaderBody(), tupleOutput);
- tupleOutput.writeInt(message.getContentChunkCount());
- }
-
- private MessagePublishInfo readMessagePublishInfo(TupleInput tupleInput)
- {
-
- final AMQShortString exchange =
AMQShortStringEncoding.readShortString(tupleInput);
- final AMQShortString routingKey =
AMQShortStringEncoding.readShortString(tupleInput);
- final boolean mandatory = tupleInput.readBoolean();
- final boolean immediate = tupleInput.readBoolean();
-
- return new MessagePublishInfo()
- {
-
- public AMQShortString getExchange()
- {
- return exchange;
- }
-
- public void setExchange(AMQShortString exchange)
- {
-
- }
-
- public boolean isImmediate()
- {
- return immediate;
- }
-
- public boolean isMandatory()
- {
- return mandatory;
- }
-
- public AMQShortString getRoutingKey()
- {
- return routingKey;
- }
- } ;
-
- }
-
-
- private ContentHeaderBody readContentHeaderBody(TupleInput tupleInput) throws
AMQFrameDecodingException, AMQProtocolVersionException
- {
- int bodySize = tupleInput.readInt();
- byte[] underlying = new byte[bodySize];
- tupleInput.readFast(underlying);
- ByteBuffer buf = ByteBuffer.wrap(underlying);
-
- return ContentHeaderBody.createFromBuffer(buf, bodySize);
- }
-
- private void writeMessagePublishInfo(MessagePublishInfo publishBody, TupleOutput
tupleOutput) throws AMQException
- {
-
-
- AMQShortStringEncoding.writeShortString(publishBody.getExchange(), tupleOutput);
- AMQShortStringEncoding.writeShortString(publishBody.getRoutingKey(),
tupleOutput);
- tupleOutput.writeBoolean(publishBody.isMandatory());
- tupleOutput.writeBoolean(publishBody.isImmediate());
-
- }
-
- private void writeContentHeader(ContentHeaderBody headerBody, TupleOutput
tupleOutput)
- {
- // write out the content header body
- final int bodySize = headerBody.getSize();
- byte[] underlying = new byte[bodySize];
- ByteBuffer buf = ByteBuffer.wrap(underlying);
- headerBody.writePayload(buf);
- tupleOutput.writeInt(bodySize);
- tupleOutput.writeFast(underlying);
- }
-}
Modified:
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/QueueEntryKey.java
===================================================================
---
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/QueueEntryKey.java 2010-05-13
18:28:33 UTC (rev 3970)
+++
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/QueueEntryKey.java 2010-05-14
16:23:59 UTC (rev 3971)
@@ -1,8 +1,25 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
package org.apache.qpid.server.store.berkeleydb;
-import com.sleepycat.bind.tuple.TupleInput;
-import com.sleepycat.bind.tuple.TupleOutput;
-
import org.apache.qpid.framing.AMQShortString;
/**
@@ -10,55 +27,26 @@
*/
public class QueueEntryKey
{
- public AMQShortString queueName;
- public long messageId;
+ private AMQShortString _queueName;
+ private long _messageId;
- public QueueEntryKey()
+ public QueueEntryKey(AMQShortString queueName, long messageId)
{
+ _queueName = queueName;
+ _messageId = messageId;
}
- public QueueEntryKey(byte[] payload)
+ public AMQShortString getQueueName()
{
- final TupleInput ti = new TupleInput(payload);
-
- queueName = AMQShortStringEncoding.readShortString(ti);
-
- messageId = ti.readLong();
-
+ return _queueName;
}
- public static class TupleBinding extends com.sleepycat.bind.tuple.TupleBinding
- {
- public Object entryToObject(TupleInput tupleInput)
- {
- final QueueEntryKey mk = new QueueEntryKey();
-
- mk.queueName = AMQShortStringEncoding.readShortString(tupleInput);
- mk.messageId = tupleInput.readLong();
-
- return mk;
- }
-
- public void objectToEntry(Object object, TupleOutput tupleOutput)
- {
- final QueueEntryKey mk = (QueueEntryKey) object;
-
- AMQShortStringEncoding.writeShortString(mk.queueName,tupleOutput);
- tupleOutput.writeLong(mk.messageId);
-
- }
-
-
- }
-
- public QueueEntryKey(AMQShortString queueName, long messageId)
+ public long getMessageId()
{
- this.queueName = queueName;
- this.messageId = messageId;
+ return _messageId;
}
-
}
Copied:
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/keys/MessageContentKey_1.java
(from rev 3969,
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple.java)
===================================================================
---
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/keys/MessageContentKey_1.java
(rev 0)
+++
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/keys/MessageContentKey_1.java 2010-05-14
16:23:59 UTC (rev 3971)
@@ -0,0 +1,44 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.keys;
+
+import org.apache.qpid.server.store.berkeleydb.MessageContentKey;
+
+public class MessageContentKey_1 extends MessageContentKey
+{
+ private int _chunkNum;
+
+ public MessageContentKey_1(long messageId, int chunkNo)
+ {
+ super(messageId);
+ _chunkNum = chunkNo;
+ }
+
+ public int getChunk()
+ {
+ return _chunkNum;
+ }
+
+ public void setChunk(int chunk)
+ {
+ this._chunkNum = chunk;
+ }
+}
\ No newline at end of file
Copied:
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/keys/MessageContentKey_3.java
(from rev 3969,
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple.java)
===================================================================
---
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/keys/MessageContentKey_3.java
(rev 0)
+++
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/keys/MessageContentKey_3.java 2010-05-14
16:23:59 UTC (rev 3971)
@@ -0,0 +1,44 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.keys;
+
+import org.apache.qpid.server.store.berkeleydb.MessageContentKey;
+
+public class MessageContentKey_3 extends MessageContentKey
+{
+ private int _offset;
+
+ public MessageContentKey_3(long messageId, int chunkNo)
+ {
+ super(messageId);
+ _offset = chunkNo;
+ }
+
+ public int getOffset()
+ {
+ return _offset;
+ }
+
+ public void setOffset(int chunk)
+ {
+ this._offset = chunk;
+ }
+}
\ No newline at end of file
Copied:
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/records/ExchangeRecord.java
(from rev 3969,
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTupleBindingFactory.java)
===================================================================
---
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/records/ExchangeRecord.java
(rev 0)
+++
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/records/ExchangeRecord.java 2010-05-14
16:23:59 UTC (rev 3971)
@@ -0,0 +1,53 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.records;
+
+import org.apache.qpid.framing.AMQShortString;
+
+public class ExchangeRecord extends Object
+{
+ private final AMQShortString _exchangeName;
+ private final AMQShortString _exchangeType;
+ private final boolean _autoDelete;
+
+ public ExchangeRecord(AMQShortString exchangeName, AMQShortString exchangeType,
boolean autoDelete)
+ {
+ _exchangeName = exchangeName;
+ _exchangeType = exchangeType;
+ _autoDelete = autoDelete;
+ }
+
+ public AMQShortString getNameShortString()
+ {
+ return _exchangeName;
+ }
+
+ public AMQShortString getType()
+ {
+ return _exchangeType;
+ }
+
+ public boolean isAutoDelete()
+ {
+ return _autoDelete;
+ }
+
+}
Copied:
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/records/QueueRecord.java
(from rev 3969,
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTupleBindingFactory.java)
===================================================================
---
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/records/QueueRecord.java
(rev 0)
+++
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/records/QueueRecord.java 2010-05-14
16:23:59 UTC (rev 3971)
@@ -0,0 +1,54 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.records;
+
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+
+public class QueueRecord extends Object
+{
+ private final AMQShortString _queueName;
+ private final AMQShortString _owner;
+ private final FieldTable _arguments;
+
+ public QueueRecord(AMQShortString queueName, AMQShortString owner, FieldTable
arguments)
+ {
+ _queueName = queueName;
+ _owner = owner;
+ _arguments = arguments;
+ }
+
+ public AMQShortString getNameShortString()
+ {
+ return _queueName;
+ }
+
+ public AMQShortString getOwner()
+ {
+ return _owner;
+ }
+
+ public FieldTable getArguments()
+ {
+ return _arguments;
+ }
+
+}
Modified:
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple.java
===================================================================
---
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple.java 2010-05-13
18:28:33 UTC (rev 3970)
+++
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple.java 2010-05-14
16:23:59 UTC (rev 3971)
@@ -20,8 +20,6 @@
*/
package org.apache.qpid.server.store.berkeleydb.tuples;
-import org.apache.qpid.framing.FieldTable;
-
public interface BindingTuple
{
}
Modified:
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTupleBindingFactory.java
===================================================================
---
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTupleBindingFactory.java 2010-05-13
18:28:33 UTC (rev 3970)
+++
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTupleBindingFactory.java 2010-05-14
16:23:59 UTC (rev 3971)
@@ -21,13 +21,14 @@
package org.apache.qpid.server.store.berkeleydb.tuples;
import com.sleepycat.bind.tuple.TupleBinding;
-import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.store.ConfigurationRecoveryHandler.BindingRecoveryHandler;
+
public class BindingTupleBindingFactory extends TupleBindingFactory
{
- public BindingTupleBindingFactory(int version, VirtualHost virtualhost)
+ public BindingTupleBindingFactory(int version)
{
- super(version, virtualhost);
+ super(version);
}
public TupleBinding getInstance()
@@ -35,10 +36,12 @@
switch (_version)
{
default:
+ case 3:
+ //no change from v2
case 2:
- return new BindingTuple_2(_virtualhost);
+ return new BindingTuple_2();
case 1:
- return new BindingTuple_1(_virtualhost);
+ return new BindingTuple_1();
}
}
}
Modified:
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple_1.java
===================================================================
---
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple_1.java 2010-05-13
18:28:33 UTC (rev 3970)
+++
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple_1.java 2010-05-14
16:23:59 UTC (rev 3971)
@@ -1,11 +1,8 @@
package org.apache.qpid.server.store.berkeleydb.tuples;
-import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.store.berkeleydb.AMQShortStringEncoding;
-import org.apache.qpid.server.store.berkeleydb.FieldTableEncoding;
import org.apache.qpid.server.store.berkeleydb.BindingKey;
import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
import com.sleepycat.bind.tuple.TupleBinding;
import com.sleepycat.bind.tuple.TupleInput;
@@ -16,15 +13,8 @@
{
protected static final Logger _log = Logger.getLogger(BindingTuple.class);
- protected VirtualHost _virtualhost;
-
- public BindingTuple_1(VirtualHost virtualHost)
+ public BindingTuple_1()
{
- if (virtualHost == null)
- {
- throw new NullPointerException("Virtualhost cannot be null");
- }
- _virtualhost = virtualHost;
}
public Object entryToObject(TupleInput tupleInput)
@@ -33,7 +23,7 @@
AMQShortString queueName = AMQShortStringEncoding.readShortString(tupleInput);
AMQShortString routingKey = AMQShortStringEncoding.readShortString(tupleInput);
- return createNewBindingKey(exchangeName, queueName, routingKey);
+ return new BindingKey(exchangeName, queueName, routingKey, null);
}
public void objectToEntry(Object object, TupleOutput tupleOutput)
@@ -45,16 +35,4 @@
AMQShortStringEncoding.writeShortString(binding.getRoutingKey(), tupleOutput);
}
- private Object createNewBindingKey(AMQShortString exchangeName, AMQShortString
queueName, AMQShortString routingKey)
- {
- return createNewBindingKey(exchangeName, queueName, routingKey, null);
- }
-
- // Addition for Version 2 of this table
- protected Object createNewBindingKey(AMQShortString exchangeName, AMQShortString
queueName,
- AMQShortString routingKey, FieldTable
arguments)
- {
- return new BindingKey(exchangeName, queueName, routingKey, arguments);
- }
-
}
Modified:
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple_2.java
===================================================================
---
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple_2.java 2010-05-13
18:28:33 UTC (rev 3970)
+++
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple_2.java 2010-05-14
16:23:59 UTC (rev 3971)
@@ -1,24 +1,21 @@
package org.apache.qpid.server.store.berkeleydb.tuples;
-import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.store.berkeleydb.AMQShortStringEncoding;
-import org.apache.qpid.server.store.berkeleydb.FieldTableEncoding;
import org.apache.qpid.server.store.berkeleydb.BindingKey;
+import org.apache.qpid.server.store.berkeleydb.FieldTableEncoding;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
-import com.sleepycat.bind.tuple.TupleBinding;
import com.sleepycat.bind.tuple.TupleInput;
import com.sleepycat.bind.tuple.TupleOutput;
import com.sleepycat.je.DatabaseException;
-import org.apache.log4j.Logger;
public class BindingTuple_2 extends BindingTuple_1 implements BindingTuple
{
- public BindingTuple_2(VirtualHost virtualHost)
+ public BindingTuple_2()
{
- super(virtualHost);
+ super();
}
public Object entryToObject(TupleInput tupleInput)
@@ -28,7 +25,7 @@
AMQShortString routingKey = AMQShortStringEncoding.readShortString(tupleInput);
FieldTable arguments;
-
+
// Addition for Version 2 of this table
try
{
@@ -40,7 +37,7 @@
return null;
}
- return createNewBindingKey(exchangeName, queueName, routingKey, arguments);
+ return new BindingKey(exchangeName, queueName, routingKey, arguments);
}
public void objectToEntry(Object object, TupleOutput tupleOutput)
Added:
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageContentKeyTB_1.java
===================================================================
---
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageContentKeyTB_1.java
(rev 0)
+++
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageContentKeyTB_1.java 2010-05-14
16:23:59 UTC (rev 3971)
@@ -0,0 +1,43 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.tuples;
+
+import org.apache.qpid.server.store.berkeleydb.keys.MessageContentKey_1;
+
+import com.sleepycat.bind.tuple.TupleInput;
+import com.sleepycat.bind.tuple.TupleOutput;
+
+public class MessageContentKeyTB_1 extends com.sleepycat.bind.tuple.TupleBinding
+{
+
+ public Object entryToObject(TupleInput tupleInput)
+ {
+ long messageId = tupleInput.readLong();
+ int chunk = tupleInput.readInt();
+
+ return new MessageContentKey_1(messageId, chunk);
+ }
+
+ public void objectToEntry(Object object, TupleOutput tupleOutput)
+ {
+ final MessageContentKey_1 mk = (MessageContentKey_1) object;
+ tupleOutput.writeLong(mk.getMessageId());
+ tupleOutput.writeInt(mk.getChunk());
+ }
+
+}
\ No newline at end of file
Added:
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageContentKeyTB_3.java
===================================================================
---
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageContentKeyTB_3.java
(rev 0)
+++
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageContentKeyTB_3.java 2010-05-14
16:23:59 UTC (rev 3971)
@@ -0,0 +1,43 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.tuples;
+
+import org.apache.qpid.server.store.berkeleydb.keys.MessageContentKey_3;
+
+import com.sleepycat.bind.tuple.TupleInput;
+import com.sleepycat.bind.tuple.TupleOutput;
+
+public class MessageContentKeyTB_3 extends com.sleepycat.bind.tuple.TupleBinding
+{
+
+ public Object entryToObject(TupleInput tupleInput)
+ {
+ long messageId = tupleInput.readLong();
+ int offset = tupleInput.readInt();
+
+ return new MessageContentKey_3(messageId, offset);
+ }
+
+ public void objectToEntry(Object object, TupleOutput tupleOutput)
+ {
+ final MessageContentKey_3 mk = (MessageContentKey_3) object;
+ tupleOutput.writeLong(mk.getMessageId());
+ tupleOutput.writeInt(mk.getOffset());
+ }
+
+}
\ No newline at end of file
Copied:
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageContentKeyTupleBindingFactory.java
(from rev 3969,
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTupleBindingFactory.java)
===================================================================
---
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageContentKeyTupleBindingFactory.java
(rev 0)
+++
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageContentKeyTupleBindingFactory.java 2010-05-14
16:23:59 UTC (rev 3971)
@@ -0,0 +1,45 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.tuples;
+
+import com.sleepycat.bind.tuple.TupleBinding;
+
+public class MessageContentKeyTupleBindingFactory extends TupleBindingFactory
+{
+ public MessageContentKeyTupleBindingFactory(int version)
+ {
+ super(version);
+ }
+
+ public TupleBinding getInstance()
+ {
+ switch (_version)
+ {
+ default:
+ case 3:
+ return new MessageContentKeyTB_3();
+ case 2:
+ //no change from v1
+ case 1:
+ return new MessageContentKeyTB_1();
+ }
+ }
+}
Copied:
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTB_1.java
(from rev 3969,
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/MessageMetaDataTB.java)
===================================================================
---
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTB_1.java
(rev 0)
+++
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTB_1.java 2010-05-14
16:23:59 UTC (rev 3971)
@@ -0,0 +1,152 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.tuples;
+
+import com.sleepycat.bind.tuple.TupleBinding;
+import com.sleepycat.bind.tuple.TupleInput;
+import com.sleepycat.bind.tuple.TupleOutput;
+import org.apache.log4j.Logger;
+import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.*;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.server.message.MessageMetaData;
+import
org.apache.qpid.server.store.MessageStoreRecoveryHandler.StoredMessageRecoveryHandler;
+import org.apache.qpid.server.store.berkeleydb.AMQShortStringEncoding;
+
+/**
+ * Handles the mapping to and from 0-8/0-9 message meta data
+ */
+public class MessageMetaDataTB_1 extends TupleBinding
+{
+ private static final Logger _log = Logger.getLogger(MessageMetaDataTB_1.class);
+
+ public MessageMetaDataTB_1()
+ {
+ }
+
+ public Object entryToObject(TupleInput tupleInput)
+ {
+ try
+ {
+ final MessagePublishInfo publishBody = readMessagePublishInfo(tupleInput);
+ final ContentHeaderBody contentHeaderBody =
readContentHeaderBody(tupleInput);
+ final int contentChunkCount = tupleInput.readInt();
+
+ return new MessageMetaData(publishBody, contentHeaderBody,
contentChunkCount);
+ }
+ catch (Exception e)
+ {
+ _log.error("Error converting entry to object: " + e, e);
+ // annoyingly just have to return null since we cannot throw
+ return null;
+ }
+ }
+
+ public void objectToEntry(Object object, TupleOutput tupleOutput)
+ {
+ MessageMetaData message = (MessageMetaData) object;
+ try
+ {
+ writeMessagePublishInfo(message.getMessagePublishInfo(), tupleOutput);
+ }
+ catch (AMQException e)
+ {
+ // can't do anything else since the BDB interface precludes throwing any
exceptions
+ // in practice we should never get an exception
+ throw new RuntimeException("Error converting object to entry: " +
e, e);
+ }
+ writeContentHeader(message.getContentHeaderBody(), tupleOutput);
+ tupleOutput.writeInt(message.getContentChunkCount());
+ }
+
+ private MessagePublishInfo readMessagePublishInfo(TupleInput tupleInput)
+ {
+
+ final AMQShortString exchange =
AMQShortStringEncoding.readShortString(tupleInput);
+ final AMQShortString routingKey =
AMQShortStringEncoding.readShortString(tupleInput);
+ final boolean mandatory = tupleInput.readBoolean();
+ final boolean immediate = tupleInput.readBoolean();
+
+ return new MessagePublishInfo()
+ {
+
+ public AMQShortString getExchange()
+ {
+ return exchange;
+ }
+
+ public void setExchange(AMQShortString exchange)
+ {
+
+ }
+
+ public boolean isImmediate()
+ {
+ return immediate;
+ }
+
+ public boolean isMandatory()
+ {
+ return mandatory;
+ }
+
+ public AMQShortString getRoutingKey()
+ {
+ return routingKey;
+ }
+ } ;
+
+ }
+
+
+ private ContentHeaderBody readContentHeaderBody(TupleInput tupleInput) throws
AMQFrameDecodingException, AMQProtocolVersionException
+ {
+ int bodySize = tupleInput.readInt();
+ byte[] underlying = new byte[bodySize];
+ tupleInput.readFast(underlying);
+ ByteBuffer buf = ByteBuffer.wrap(underlying);
+
+ return ContentHeaderBody.createFromBuffer(buf, bodySize);
+ }
+
+ private void writeMessagePublishInfo(MessagePublishInfo publishBody, TupleOutput
tupleOutput) throws AMQException
+ {
+
+
+ AMQShortStringEncoding.writeShortString(publishBody.getExchange(), tupleOutput);
+ AMQShortStringEncoding.writeShortString(publishBody.getRoutingKey(),
tupleOutput);
+ tupleOutput.writeBoolean(publishBody.isMandatory());
+ tupleOutput.writeBoolean(publishBody.isImmediate());
+
+ }
+
+ private void writeContentHeader(ContentHeaderBody headerBody, TupleOutput
tupleOutput)
+ {
+ // write out the content header body
+ final int bodySize = headerBody.getSize();
+ byte[] underlying = new byte[bodySize];
+ ByteBuffer buf = ByteBuffer.wrap(underlying);
+ headerBody.writePayload(buf);
+ tupleOutput.writeInt(bodySize);
+ tupleOutput.writeFast(underlying);
+ }
+}
Added:
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTB_3.java
===================================================================
---
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTB_3.java
(rev 0)
+++
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTB_3.java 2010-05-14
16:23:59 UTC (rev 3971)
@@ -0,0 +1,77 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.tuples;
+
+import com.sleepycat.bind.tuple.TupleInput;
+import com.sleepycat.bind.tuple.TupleOutput;
+import org.apache.log4j.Logger;
+import org.apache.qpid.server.store.MessageMetaDataType;
+import org.apache.qpid.server.store.StorableMessageMetaData;
+
+/**
+ * Handles the mapping to and from message meta data
+ */
+public class MessageMetaDataTB_3 extends MessageMetaDataTB_1
+{
+ private static final Logger _log = Logger.getLogger(MessageMetaDataTB_3.class);
+
+ @Override
+ public Object entryToObject(TupleInput tupleInput)
+ {
+ try
+ {
+ final int bodySize = tupleInput.readInt();
+ byte[] dataAsBytes = new byte[bodySize];
+ tupleInput.readFast(dataAsBytes);
+
+ java.nio.ByteBuffer buf = java.nio.ByteBuffer.wrap(dataAsBytes);
+ buf.position(1);
+ buf = buf.slice();
+ MessageMetaDataType type = MessageMetaDataType.values()[dataAsBytes[0]];
+ StorableMessageMetaData metaData = type.getFactory().createMetaData(buf);
+
+ return metaData;
+ }
+ catch (Exception e)
+ {
+ _log.error("Error converting entry to object: " + e, e);
+ // annoyingly just have to return null since we cannot throw
+ return null;
+ }
+ }
+
+ @Override
+ public void objectToEntry(Object object, TupleOutput tupleOutput)
+ {
+ StorableMessageMetaData metaData = (StorableMessageMetaData) object;
+
+ final int bodySize = 1 + metaData.getStorableSize();
+ byte[] underlying = new byte[bodySize];
+ underlying[0] = (byte) metaData.getType().ordinal();
+ java.nio.ByteBuffer buf = java.nio.ByteBuffer.wrap(underlying);
+ buf.position(1);
+ buf = buf.slice();
+
+ metaData.writeToBuffer(0, buf);
+ tupleOutput.writeInt(bodySize);
+ tupleOutput.writeFast(underlying);
+ }
+}
Copied:
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTupleBindingFactory.java
(from rev 3969,
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTupleBindingFactory.java)
===================================================================
---
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTupleBindingFactory.java
(rev 0)
+++
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTupleBindingFactory.java 2010-05-14
16:23:59 UTC (rev 3971)
@@ -0,0 +1,45 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.tuples;
+
+import com.sleepycat.bind.tuple.TupleBinding;
+
+public class MessageMetaDataTupleBindingFactory extends TupleBindingFactory
+{
+ public MessageMetaDataTupleBindingFactory(int version)
+ {
+ super(version);
+ }
+
+ public TupleBinding getInstance()
+ {
+ switch (_version)
+ {
+ default:
+ case 3:
+ return new MessageMetaDataTB_3();
+ case 2:
+ //no change from v1
+ case 1:
+ return new MessageMetaDataTB_1();
+ }
+ }
+}
Copied:
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueEntryTB.java
(from rev 3969,
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTupleBindingFactory.java)
===================================================================
---
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueEntryTB.java
(rev 0)
+++
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueEntryTB.java 2010-05-14
16:23:59 UTC (rev 3971)
@@ -0,0 +1,48 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.tuples;
+
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.store.berkeleydb.AMQShortStringEncoding;
+import org.apache.qpid.server.store.berkeleydb.QueueEntryKey;
+
+import com.sleepycat.bind.tuple.TupleBinding;
+import com.sleepycat.bind.tuple.TupleInput;
+import com.sleepycat.bind.tuple.TupleOutput;
+
+public class QueueEntryTB extends TupleBinding
+{
+ public Object entryToObject(TupleInput tupleInput)
+ {
+ AMQShortString queueName = AMQShortStringEncoding.readShortString(tupleInput);
+ Long messageId = tupleInput.readLong();
+
+ return new QueueEntryKey(queueName, messageId);
+ }
+
+ public void objectToEntry(Object object, TupleOutput tupleOutput)
+ {
+ final QueueEntryKey mk = (QueueEntryKey) object;
+
+ AMQShortStringEncoding.writeShortString(mk.getQueueName(),tupleOutput);
+ tupleOutput.writeLong(mk.getMessageId());
+ }
+}
\ No newline at end of file
Modified:
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple.java
===================================================================
---
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple.java 2010-05-13
18:28:33 UTC (rev 3970)
+++
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple.java 2010-05-14
16:23:59 UTC (rev 3971)
@@ -20,10 +20,6 @@
*/
package org.apache.qpid.server.store.berkeleydb.tuples;
-import org.apache.qpid.framing.FieldTable;
-
public interface QueueTuple
{
- // Addition for Version 2
- public void setArguments(FieldTable arguments);
}
Modified:
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTupleBindingFactory.java
===================================================================
---
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTupleBindingFactory.java 2010-05-13
18:28:33 UTC (rev 3970)
+++
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTupleBindingFactory.java 2010-05-14
16:23:59 UTC (rev 3971)
@@ -20,14 +20,14 @@
*/
package org.apache.qpid.server.store.berkeleydb.tuples;
-import org.apache.qpid.server.virtualhost.VirtualHost;
import com.sleepycat.bind.tuple.TupleBinding;
public class QueueTupleBindingFactory extends TupleBindingFactory
{
- public QueueTupleBindingFactory(int version, VirtualHost virtualHost)
+
+ public QueueTupleBindingFactory(int version)
{
- super(version,virtualHost);
+ super(version);
}
public TupleBinding getInstance()
@@ -35,10 +35,12 @@
switch (_version)
{
default:
+ case 3:
+ //no change from v2
case 2:
- return new QueueTuple_2(_virtualhost);
+ return new QueueTuple_2();
case 1:
- return new QueueTuple_1(_virtualhost);
+ return new QueueTuple_1();
}
}
}
Modified:
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_1.java
===================================================================
---
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_1.java 2010-05-13
18:28:33 UTC (rev 3970)
+++
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_1.java 2010-05-14
16:23:59 UTC (rev 3971)
@@ -20,28 +20,15 @@
import com.sleepycat.bind.tuple.TupleBinding;
import com.sleepycat.bind.tuple.TupleInput;
import com.sleepycat.bind.tuple.TupleOutput;
-import org.apache.log4j.Logger;
-import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.store.berkeleydb.AMQShortStringEncoding;
-import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.store.berkeleydb.records.QueueRecord;
public class QueueTuple_1 extends TupleBinding implements QueueTuple
{
- protected static final Logger _logger = Logger.getLogger(QueueTuple.class);
- protected final VirtualHost _virtualHost;
-
- public QueueTuple_1(VirtualHost virtualHost)
+ public QueueTuple_1()
{
- if (virtualHost == null)
- {
- throw new NullPointerException("Virtualhost cannot be null");
- }
- _virtualHost = virtualHost;
}
public Object entryToObject(TupleInput tupleInput)
@@ -49,58 +36,15 @@
AMQShortString name = AMQShortStringEncoding.readShortString(tupleInput);
AMQShortString owner = AMQShortStringEncoding.readShortString(tupleInput);
- return createNewQueue(name, owner);
+ return new QueueRecord(name, owner, null);
}
public void objectToEntry(Object object, TupleOutput tupleOutput)
{
- AMQQueue queue = (AMQQueue) object;
+ QueueRecord queue = (QueueRecord) object;
- AMQShortStringEncoding.writeShortString(queue.getName(), tupleOutput);
+ AMQShortStringEncoding.writeShortString(queue.getNameShortString(),
tupleOutput);
AMQShortStringEncoding.writeShortString(queue.getOwner(), tupleOutput);
+ //Version1 tuple can't store the queue arguments.
}
-
- // Addition for Version 2 of this table
- public void setArguments(FieldTable arguments)
- {
- //no-op
- }
-
- protected Object createNewQueue(AMQShortString name, AMQShortString owner)
- {
- return createNewQueue(name, owner, null);
- }
-
- // Addition for Version 2 of this table
- protected Object createNewQueue(AMQShortString name, AMQShortString owner, FieldTable
arguments)
- {
- try
- {
-
- AMQQueue queue;
- AMQQueue existing = _virtualHost.getQueueRegistry().getQueue(name);
-
- if (existing != null)
- {
- _logger.info("Queue :" + existing + ": already exists in
configured broker.");
-
- queue = existing;
- }
- else
- {
- // Retrieve the existing Queue object
- queue = AMQQueueFactory.createAMQQueueImpl(name, true, owner, false,
_virtualHost, arguments);
- _virtualHost.getQueueRegistry().registerQueue(queue);
- _logger.info("Recovering queue " + queue.getName() + "
with owner:"
- + ((queue.getOwner() == null) ? " <<null>>
" : (" \"" + queue.getOwner() + "\" ")));
- }
-
- return queue;
- }
- catch (AMQException e)
- {
- _logger.error("Unable to create queue: " + e, e);
- return null;
- }
- }
}
Modified:
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_2.java
===================================================================
---
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_2.java 2010-05-13
18:28:33 UTC (rev 3970)
+++
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_2.java 2010-05-14
16:23:59 UTC (rev 3971)
@@ -20,22 +20,23 @@
import com.sleepycat.bind.tuple.TupleInput;
import com.sleepycat.bind.tuple.TupleOutput;
import com.sleepycat.je.DatabaseException;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.AMQQueueFactory;
-import org.apache.qpid.server.virtualhost.VirtualHost;
+
+import org.apache.log4j.Logger;
import org.apache.qpid.server.store.berkeleydb.AMQShortStringEncoding;
import org.apache.qpid.server.store.berkeleydb.FieldTableEncoding;
-import org.apache.qpid.AMQException;
+import org.apache.qpid.server.store.berkeleydb.records.QueueRecord;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
public class QueueTuple_2 extends QueueTuple_1
{
+ protected static final Logger _logger = Logger.getLogger(QueueTuple_2.class);
+
protected FieldTable _arguments;
- public QueueTuple_2(VirtualHost virtualHost)
+ public QueueTuple_2()
{
- super(virtualHost);
+ super();
}
public Object entryToObject(TupleInput tupleInput)
@@ -44,10 +45,10 @@
{
AMQShortString name = AMQShortStringEncoding.readShortString(tupleInput);
AMQShortString owner = AMQShortStringEncoding.readShortString(tupleInput);
- // Addition for Version 2 of this table
+ // Addition for Version 2 of this table, read the queue arguments
FieldTable arguments = FieldTableEncoding.readFieldTable(tupleInput);
- return createNewQueue(name, owner, arguments);
+ return new QueueRecord(name, owner, arguments);
}
catch (DatabaseException e)
{
@@ -59,17 +60,11 @@
public void objectToEntry(Object object, TupleOutput tupleOutput)
{
- AMQQueue queue = (AMQQueue) object;
+ QueueRecord queue = (QueueRecord) object;
- AMQShortStringEncoding.writeShortString(queue.getName(), tupleOutput);
+ AMQShortStringEncoding.writeShortString(queue.getNameShortString(),
tupleOutput);
AMQShortStringEncoding.writeShortString(queue.getOwner(), tupleOutput);
- // Addition for Version 2 of this table
- FieldTableEncoding.writeFieldTable(_arguments, tupleOutput);
+ // Addition for Version 2 of this table, store the queue arguments
+ FieldTableEncoding.writeFieldTable(queue.getArguments(), tupleOutput);
}
-
- // Addition for Version 2 of this table
- public void setArguments(FieldTable arguments)
- {
- _arguments = arguments;
- }
}
Modified:
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/TupleBindingFactory.java
===================================================================
---
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/TupleBindingFactory.java 2010-05-13
18:28:33 UTC (rev 3970)
+++
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/TupleBindingFactory.java 2010-05-14
16:23:59 UTC (rev 3971)
@@ -27,17 +27,9 @@
{
protected int _version;
- protected VirtualHost _virtualhost;
-
- public TupleBindingFactory(int version, VirtualHost virtualhost)
+ public TupleBindingFactory(int version)
{
- if (virtualhost == null)
- {
- throw new NullPointerException("Virtualhost cannot be null");
- }
-
_version = version;
- _virtualhost = virtualhost;
}
public abstract TupleBinding getInstance();
Modified:
store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java
===================================================================
---
store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java 2010-05-13
18:28:33 UTC (rev 3970)
+++
store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java 2010-05-14
16:23:59 UTC (rev 3971)
@@ -25,6 +25,7 @@
import org.apache.qpid.server.store.berkeleydb.utils.Publisher;
import org.apache.qpid.server.store.berkeleydb.utils.DurableSubscriber;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.virtualhost.VirtualHostImpl;
import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.client.transport.TransportConnection;
import org.apache.qpid.util.FileUtils;
@@ -200,7 +201,7 @@
testVirtualhost.setProperty("store.version", version);
ApplicationRegistry.getInstance(port).getVirtualHostRegistry().
- registerVirtualHost(new VirtualHost(new
VirtualHostConfiguration("bdbtest",testVirtualhost)));
+ registerVirtualHost(new VirtualHostImpl(new
VirtualHostConfiguration("bdbtest",testVirtualhost), null /* TODO */));
TransportConnection.createVMBroker(port);
Modified:
store/trunk/java/bdbstore/src/tools/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgrade.java
===================================================================
---
store/trunk/java/bdbstore/src/tools/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgrade.java 2010-05-13
18:28:33 UTC (rev 3970)
+++
store/trunk/java/bdbstore/src/tools/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgrade.java 2010-05-14
16:23:59 UTC (rev 3971)
@@ -25,6 +25,7 @@
import org.apache.qpid.server.store.MemoryMessageStore;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.virtualhost.VirtualHostImpl;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.logging.actors.CurrentActor;
@@ -341,12 +342,12 @@
CurrentActor.set(new BrokerActor(new NullRootMessageLogger()));
// Note the name of the Vhosts is not important, the store doesnot record the
name of the vhost.
- _newVirtualHost = new VirtualHost(new
VirtualHostConfiguration("Upgraded", new PropertiesConfiguration()), new
MemoryMessageStore());
- _oldVirtualHost = new VirtualHost(new VirtualHostConfiguration("Old",
new PropertiesConfiguration()), new MemoryMessageStore());
+ _newVirtualHost = new VirtualHostImpl(new
VirtualHostConfiguration("Upgraded", new PropertiesConfiguration()), new
MemoryMessageStore());
+ _oldVirtualHost = new VirtualHostImpl(new
VirtualHostConfiguration("Old", new PropertiesConfiguration()), new
MemoryMessageStore());
//Create a new messageStore
_newMessageStore = new BDBMessageStore();
- _newMessageStore.configure(_newVirtualHost, toDir, false);
+ //TODO _newMessageStore.configure(_newVirtualHost, toDir, false);
try
{
@@ -356,7 +357,7 @@
default:
case 1:
_oldMessageStore = new BDBMessageStore(1);
- _oldMessageStore.configure(_oldVirtualHost, fromDir, true);
+ //TODO _oldMessageStore.configure(_oldVirtualHost, fromDir, true);
_oldMessageStore.start();
upgradeFromVersion_1();
break;