[rhmessaging-commits] rhmessaging commits: r2368 - in store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb: tuples and 1 other directory.
rhmessaging-commits at lists.jboss.org
rhmessaging-commits at lists.jboss.org
Tue Sep 2 10:30:22 EDT 2008
Author: ritchiem
Date: 2008-09-02 10:30:22 -0400 (Tue, 02 Sep 2008)
New Revision: 2368
Added:
store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/
store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple.java
store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTupleBindingFactory.java
store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple_1.java
store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple_2.java
store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple.java
store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTupleBindingFactory.java
store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_1.java
store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_2.java
store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/TupleBindingFactory.java
Removed:
store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BindingTB.java
store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/QueueTB.java
Modified:
store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
Log:
RHM-5 : Created a version aware tuples to load the two database formats we now have
Modified: store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java 2008-08-29 14:36:01 UTC (rev 2367)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java 2008-09-02 14:30:22 UTC (rev 2368)
@@ -43,9 +43,13 @@
import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.store.berkeleydb.tuples.QueueTupleBindingFactory;
+import org.apache.qpid.server.store.berkeleydb.tuples.BindingTupleBindingFactory;
+import org.apache.qpid.server.store.berkeleydb.tuples.QueueTuple;
import org.apache.qpid.server.txn.NonTransactionalContext;
import org.apache.qpid.server.txn.TransactionalContext;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.mina.common.TrafficMask;
import java.io.File;
import java.util.ArrayList;
@@ -73,8 +77,12 @@
{
private static final Logger _log = Logger.getLogger(BDBMessageStore.class);
- private static final String ENVIRONMENT_PATH_PROPERTY = "environment-path";
+ private static final int DATABASE_FORMAT_VERSION = 2;
+ private static final String DATABASE_FORMAT_VERSION_PROPERTY = "version";
+
+ public static final String ENVIRONMENT_PATH_PROPERTY = "environment-path";
+
private Environment _environment;
private static final String MESSAGEMETADATADB_NAME = "messageMetaDataDb";
@@ -123,10 +131,19 @@
private Map<AMQShortString, Long> _queueNameToIdMap = new ConcurrentHashMap<AMQShortString, Long>();
+ // Factory Classes to create the TupleBinding objects that relfect the version instance of this BDBStore
+
+ private QueueTupleBindingFactory _queueTupleBindingFactory;
+ private BindingTupleBindingFactory _bindingTupleBindingFactory;
+
+ /** The data version this store should run with */
+ private int _version;
+
private enum State
{
INITIAL,
CONFIGURING,
+ CONFIGURED,
RECOVERING,
STARTED,
CLOSING,
@@ -135,6 +152,16 @@
private State _state = State.INITIAL;
+ public BDBMessageStore()
+ {
+ this(DATABASE_FORMAT_VERSION);
+ }
+
+ public BDBMessageStore(int version)
+ {
+ _version = version;
+ }
+
/**
* Called after instantiation in order to configure the message store. A particular implementation can define
* whatever parameters it wants.
@@ -150,10 +177,6 @@
public void configure(VirtualHost virtualHost, String base, Configuration config) throws Exception
{
- stateTransition(State.INITIAL, State.CONFIGURING);
-
- _log.info("Configuring BDB message store");
-
File environmentPath = new File(config.getString(base + "." + ENVIRONMENT_PATH_PROPERTY, "bdbEnv"));
if (!environmentPath.exists())
{
@@ -164,24 +187,60 @@
}
}
- createEnvironment(environmentPath);
- openDatabases();
- _virtualHost = virtualHost;
+ _version = config.getInt(base + "." + DATABASE_FORMAT_VERSION_PROPERTY, 2);
- _commitThread.start();
+ configure(virtualHost, environmentPath);
+ }
- upgradeIfNecessary();
+ public void configure(File environmentPath) throws AMQException, DatabaseException
+ {
+ configure(null, environmentPath);
+ }
+ public void configure(VirtualHost virtualHost, File environmentPath) throws AMQException, DatabaseException
+ {
+ stateTransition(State.INITIAL, State.CONFIGURING);
+
+ _log.info("Configuring BDB message store");
+
+ if (virtualHost != null)
+ {
+ setVirtualHost(virtualHost);
+ createTupleBindingFactories(_version);
+ }
+
+ setupStore(environmentPath);
+
+ stateTransition(State.CONFIGURING, State.CONFIGURED);
+
// this recovers durable queues and persistent messages
- recover();
+ if (virtualHost != null)
+ {
+ recover();
+ }
+ }
- stateTransition(State.RECOVERING, State.STARTED);
+ private void setupStore(File storePath) throws DatabaseException, AMQException
+ {
+ checkState(State.CONFIGURING);
+
+ createEnvironment(storePath);
+
+ openDatabases();
+
+ _commitThread.start();
}
- private void upgradeIfNecessary()
+ protected void startStore() throws AMQException
{
+ stateTransition(State.CONFIGURED, State.STARTED);
+ }
+ private void createTupleBindingFactories(int version)
+ {
+ _queueTupleBindingFactory = new QueueTupleBindingFactory(version, _virtualHost);
+ _bindingTupleBindingFactory = new BindingTupleBindingFactory(version, _virtualHost);
}
private synchronized void stateTransition(State requiredState, State newState) throws AMQException
@@ -203,7 +262,7 @@
}
}
- void createEnvironment(File environmentPath) throws DatabaseException
+ private void createEnvironment(File environmentPath) throws DatabaseException
{
_log.info("BDB message store using environment path " + environmentPath.getAbsolutePath());
EnvironmentConfig envConfig = new EnvironmentConfig();
@@ -213,7 +272,7 @@
_environment = new Environment(environmentPath, envConfig);
}
- protected void openDatabases() throws DatabaseException
+ private void openDatabases() throws DatabaseException
{
DatabaseConfig dbConfig = new DatabaseConfig();
dbConfig.setTransactional(true);
@@ -493,16 +552,15 @@
DatabaseEntry key = new DatabaseEntry();
DatabaseEntry value = new DatabaseEntry();
- BindingTB binding = new BindingTB(_virtualHost);
-
BindingKey queueBinding =
new BindingKey(exchange.getName(), null, null, null);
- EntryBinding keyBinding = new BindingTB(_virtualHost);
+ EntryBinding<BindingKey> keyBinding = _bindingTupleBindingFactory.getInstance();
keyBinding.objectToEntry(queueBinding, key);
OperationStatus opStatus = cursor.getSearchKeyRange(key, value, LockMode.RMW);
+ TupleBinding binding = _bindingTupleBindingFactory.getInstance();
while (opStatus == OperationStatus.SUCCESS)
{
queueBinding = (BindingKey) binding.entryToObject(key);
@@ -538,7 +596,7 @@
cursor = _exchangeDb.openCursor(null, null);
DatabaseEntry key = new DatabaseEntry();
DatabaseEntry value = new DatabaseEntry();
- ExchangeTB binding = new ExchangeTB(_virtualHost);
+ TupleBinding binding = new ExchangeTB(_virtualHost);
while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
{
Exchange exchange = (Exchange) binding.entryToObject(value);
@@ -579,8 +637,9 @@
if (_state != State.RECOVERING)
{
DatabaseEntry key = new DatabaseEntry();
- EntryBinding keyBinding = new BindingTB(_virtualHost);
+ EntryBinding keyBinding = _bindingTupleBindingFactory.getInstance();
keyBinding.objectToEntry(new BindingKey(exchange.getName(), queue.getName(), routingKey, args), key);
+
DatabaseEntry value = new DatabaseEntry();
ByteBinding.byteToEntry((byte) 0, value);
@@ -610,7 +669,7 @@
throws AMQException
{
DatabaseEntry key = new DatabaseEntry();
- EntryBinding keyBinding = new BindingTB(_virtualHost);
+ EntryBinding keyBinding = _bindingTupleBindingFactory.getInstance();
keyBinding.objectToEntry(new BindingKey(exchange.getName(), queue.getName(), routingKey, args), key);
try
@@ -652,10 +711,15 @@
_queueNameToIdMap.put(queue.getName(), queueId);
DatabaseEntry key = new DatabaseEntry();
+
EntryBinding keyBinding = new AMQShortStringTB();
keyBinding.objectToEntry(queue.getName(), key);
+
DatabaseEntry value = new DatabaseEntry();
- TupleBinding queueBinding = new QueueTB(_virtualHost, arguments);
+ TupleBinding queueBinding = _queueTupleBindingFactory.getInstance();
+
+ ((QueueTuple) queueBinding).setArguments(arguments);
+
queueBinding.objectToEntry(queue, value);
try
{
@@ -718,7 +782,7 @@
try
{
_queueDb.get(null, key, value, LockMode.RMW);
- QueueTB binding = new QueueTB(_virtualHost, null);
+ TupleBinding binding = _queueTupleBindingFactory.getInstance();
return (AMQQueue) binding.entryToObject(value);
}
@@ -1038,7 +1102,7 @@
public void recover() throws AMQException
{
- stateTransition(State.CONFIGURING, State.RECOVERING);
+ stateTransition(State.CONFIGURED, State.RECOVERING);
_log.info("Recovering persistent state...");
StoreContext context = new StoreContext();
@@ -1057,8 +1121,16 @@
catch (DatabaseException e)
{
abortTran(context);
+
throw new AMQException("Error recovering persistent state: " + e, e);
}
+ catch (Throwable ioobe)
+ {
+ abortTran(context);
+ throw new AMQException("Invalid database format. Please use upgrade tool.", ioobe);
+ }
+
+ stateTransition(State.RECOVERING, State.STARTED);
}
/**
@@ -1254,14 +1326,17 @@
cursor = _queueDb.openCursor(null, null);
DatabaseEntry key = new DatabaseEntry();
DatabaseEntry value = new DatabaseEntry();
- QueueTB binding = new QueueTB(_virtualHost, null);
+ TupleBinding binding = _queueTupleBindingFactory.getInstance();
while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
{
AMQQueue queue = (AMQQueue) binding.entryToObject(value);
- _virtualHost.getQueueRegistry().registerQueue(queue);
- queues.put(queue.getName(), queue);
- _log.info("Recovering queue " + queue.getName() + " with owner:"
- + ((queue.getOwner() == null) ? " <<null>> " : (" \"" + queue.getOwner() + "\" ")));
+ if (queue != null)
+ {
+ _virtualHost.getQueueRegistry().registerQueue(queue);
+ queues.put(queue.getName(), queue);
+ _log.info("Recovering queue " + queue.getName() + " with owner:"
+ + ((queue.getOwner() == null) ? " <<null>> " : (" \"" + queue.getOwner() + "\" ")));
+ }
}
return queues;
@@ -1275,6 +1350,117 @@
}
}
+ //public getters for the TupleBindingFactories
+
+ public QueueTupleBindingFactory getQueueTupleBindingFactory()
+ {
+ return _queueTupleBindingFactory;
+ }
+
+ public BindingTupleBindingFactory getBindingTupleBindingFactory()
+ {
+ return _bindingTupleBindingFactory;
+ }
+
+ //Package getters for the various databases used by the Store
+
+ Database getMetaDataDb()
+ {
+ return _messageMetaDataDb;
+ }
+
+ Database getContentDb()
+ {
+ return _messageContentDb;
+ }
+
+ Database getQueuesDb()
+ {
+ return _queueDb;
+ }
+
+ Database getDeliveryDb()
+ {
+ return _deliveryDb;
+ }
+
+ Database getExchangesDb()
+ {
+ return _exchangeDb;
+ }
+
+ Database getBindingsDb()
+ {
+ return _queueBindingsDb;
+ }
+
+ /** Visitor Interface so that each DatabaseEntry for a database can easily be processed. */
+ interface DatabaseVisitor
+ {
+ public void visit(DatabaseEntry entry, DatabaseEntry value) throws AMQException, DatabaseException;
+ }
+
+ void visitMetaDataDb(DatabaseVisitor visitor) throws DatabaseException, AMQException
+ {
+ visitDatabase(_messageMetaDataDb, visitor);
+ }
+
+ void visitContentDb(DatabaseVisitor visitor) throws DatabaseException, AMQException
+ {
+ visitDatabase(_messageContentDb, visitor);
+ }
+
+ void visitQueues(DatabaseVisitor visitor) throws DatabaseException, AMQException
+ {
+ visitDatabase(_queueDb, visitor);
+ }
+
+ void visitDelivery(DatabaseVisitor visitor) throws DatabaseException, AMQException
+ {
+ visitDatabase(_deliveryDb, visitor);
+ }
+
+ void visitExchanges(DatabaseVisitor visitor) throws DatabaseException, AMQException
+ {
+ visitDatabase(_exchangeDb, visitor);
+ }
+
+ void visitBindings(DatabaseVisitor visitor) throws DatabaseException, AMQException
+ {
+ visitDatabase(_queueBindingsDb, visitor);
+ }
+
+ /**
+ * Generic visitDatabase allows iteration through the specified database.
+ *
+ * @param database The database to visit
+ * @param visitor The visitor to give each entry to.
+ *
+ * @throws DatabaseException If there is a problem with the Database structure
+ * @throws AMQException If there is a programming error
+ */
+ void visitDatabase(Database database, DatabaseVisitor visitor) throws DatabaseException, AMQException
+ {
+ Cursor cursor = database.openCursor(null, null);
+
+ try
+ {
+ DatabaseEntry key = new DatabaseEntry();
+ DatabaseEntry value = new DatabaseEntry();
+ while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
+ {
+ visitor.visit(key, value);
+ }
+ }
+ finally
+ {
+ if (cursor != null)
+ {
+ cursor.close();
+ }
+ }
+ }
+
private static final class ProcessAction
{
private final AMQQueue _queue;
@@ -1402,6 +1588,8 @@
void setVirtualHost(VirtualHost virtualHost)
{
_virtualHost = virtualHost;
+
+ createTupleBindingFactories(_version);
}
void commit(Transaction tx) throws DatabaseException
@@ -1623,4 +1811,5 @@
}
}
}
+
}
Deleted: store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BindingTB.java
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BindingTB.java 2008-08-29 14:36:01 UTC (rev 2367)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BindingTB.java 2008-09-02 14:30:22 UTC (rev 2368)
@@ -1,59 +0,0 @@
-package org.apache.qpid.server.store.berkeleydb;
-
-import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
-
-import com.sleepycat.bind.tuple.TupleBinding;
-import com.sleepycat.bind.tuple.TupleInput;
-import com.sleepycat.bind.tuple.TupleOutput;
-import com.sleepycat.je.DatabaseException;
-import org.apache.log4j.Logger;
-
-public class BindingTB extends TupleBinding
-{
- private static final Logger _log = Logger.getLogger(BindingTB.class);
-
- private final VirtualHost _virtualHost;
-
- public BindingTB(VirtualHost virtualHost)
- {
- _virtualHost = virtualHost;
- }
-
- public Object entryToObject(TupleInput tupleInput)
- {
- try
- {
- AMQShortString exchangeName = AMQShortStringEncoding.readShortString(tupleInput);
- AMQShortString queueName = AMQShortStringEncoding.readShortString(tupleInput);
- AMQShortString routingKey = AMQShortStringEncoding.readShortString(tupleInput);
- FieldTable arguments = FieldTableEncoding.readFieldTable(tupleInput);
-
- return new BindingKey(exchangeName, queueName, routingKey, arguments);
- }
- catch (DatabaseException e)
- {
- _log.error("Unable to create binding: " + e, e);
- return null;
- }
- }
-
- public void objectToEntry(Object object, TupleOutput tupleOutput)
- {
- BindingKey binding = (BindingKey) object;
-
- AMQShortStringEncoding.writeShortString(binding.getExchangeName(), tupleOutput);
- AMQShortStringEncoding.writeShortString(binding.getQueueName(), tupleOutput);
- AMQShortStringEncoding.writeShortString(binding.getRoutingKey(), tupleOutput);
- FieldTableEncoding.writeFieldTable(binding.getArguments(), tupleOutput);
-
- binding = (BindingKey) entryToObject(new TupleInput(tupleOutput.getBufferBytes()));
-
- System.err.println(binding.getExchangeName());
- System.err.println(binding.getQueueName());
- System.err.println(binding.getRoutingKey());
- System.err.println(binding.getArguments());
-
- }
-}
Deleted: store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/QueueTB.java
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/QueueTB.java 2008-08-29 14:36:01 UTC (rev 2367)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/QueueTB.java 2008-09-02 14:30:22 UTC (rev 2368)
@@ -1,79 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.qpid.server.store.berkeleydb;
-
-import com.sleepycat.bind.tuple.TupleBinding;
-import com.sleepycat.bind.tuple.TupleInput;
-import com.sleepycat.bind.tuple.TupleOutput;
-import com.sleepycat.je.DatabaseException;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.AMQQueueFactory;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.log4j.Logger;
-
-public class QueueTB extends TupleBinding
-{
- private static final Logger _log = Logger.getLogger(QueueTB.class);
-
- private final VirtualHost _virtualHost;
- private final FieldTable _arguments;
-
- public QueueTB(VirtualHost virtualHost, FieldTable arguments)
- {
- _virtualHost = virtualHost;
- _arguments = arguments;
- }
-
- public Object entryToObject(TupleInput tupleInput)
- {
- try
- {
- AMQShortString name = AMQShortStringEncoding.readShortString(tupleInput);
- AMQShortString owner = AMQShortStringEncoding.readShortString(tupleInput);
- FieldTable arguments = FieldTableEncoding.readFieldTable(tupleInput);
-
- try
- {
- return AMQQueueFactory.createAMQQueueImpl(name, true, owner, false, _virtualHost, arguments);
- }
- catch (AMQException e)
- {
- _log.error("Unable to create queue: " + e, e);
- return null;
- }
- }
- catch (DatabaseException e)
- {
- _log.error("Unable to create binding: " + e, e);
- return null;
- }
- }
-
- public void objectToEntry(Object object, TupleOutput tupleOutput)
- {
- AMQQueue queue = (AMQQueue) object;
-
- AMQShortStringEncoding.writeShortString(queue.getName(), tupleOutput);
- AMQShortStringEncoding.writeShortString(queue.getOwner(), tupleOutput);
- FieldTableEncoding.writeFieldTable(_arguments, tupleOutput);
-
- }
-}
Added: store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple.java
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple.java (rev 0)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple.java 2008-09-02 14:30:22 UTC (rev 2368)
@@ -0,0 +1,27 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.tuples;
+
+import org.apache.qpid.framing.FieldTable;
+
+public interface BindingTuple
+{
+}
Added: store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTupleBindingFactory.java
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTupleBindingFactory.java (rev 0)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTupleBindingFactory.java 2008-09-02 14:30:22 UTC (rev 2368)
@@ -0,0 +1,44 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.tuples;
+
+import com.sleepycat.bind.tuple.TupleBinding;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+public class BindingTupleBindingFactory extends TupleBindingFactory
+{
+ public BindingTupleBindingFactory(int version, VirtualHost virtualhost)
+ {
+ super(version, virtualhost);
+ }
+
+ public TupleBinding getInstance()
+ {
+ switch (_version)
+ {
+ default:
+ case 2:
+ return new BindingTuple_2(_virtualhost);
+ case 1:
+ return new BindingTuple_1(_virtualhost);
+ }
+ }
+}
Copied: store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple_1.java (from rev 2292, store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BindingTB.java)
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple_1.java (rev 0)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple_1.java 2008-09-02 14:30:22 UTC (rev 2368)
@@ -0,0 +1,60 @@
+package org.apache.qpid.server.store.berkeleydb.tuples;
+
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.store.berkeleydb.AMQShortStringEncoding;
+import org.apache.qpid.server.store.berkeleydb.FieldTableEncoding;
+import org.apache.qpid.server.store.berkeleydb.BindingKey;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+
+import com.sleepycat.bind.tuple.TupleBinding;
+import com.sleepycat.bind.tuple.TupleInput;
+import com.sleepycat.bind.tuple.TupleOutput;
+import org.apache.log4j.Logger;
+
+public class BindingTuple_1 extends TupleBinding implements BindingTuple
+{
+ protected static final Logger _log = Logger.getLogger(BindingTuple.class);
+
+ protected VirtualHost _virtualhost;
+
+ public BindingTuple_1(VirtualHost virtualHost)
+ {
+ if (virtualHost == null)
+ {
+ throw new NullPointerException("Virtualhost cannot be null");
+ }
+ _virtualhost = virtualHost;
+ }
+
+ public Object entryToObject(TupleInput tupleInput)
+ {
+ AMQShortString exchangeName = AMQShortStringEncoding.readShortString(tupleInput);
+ AMQShortString queueName = AMQShortStringEncoding.readShortString(tupleInput);
+ AMQShortString routingKey = AMQShortStringEncoding.readShortString(tupleInput);
+
+ return createNewBindingKey(exchangeName, queueName, routingKey);
+ }
+
+ public void objectToEntry(Object object, TupleOutput tupleOutput)
+ {
+ BindingKey binding = (BindingKey) object;
+
+ AMQShortStringEncoding.writeShortString(binding.getExchangeName(), tupleOutput);
+ AMQShortStringEncoding.writeShortString(binding.getQueueName(), tupleOutput);
+ AMQShortStringEncoding.writeShortString(binding.getRoutingKey(), tupleOutput);
+ }
+
+ private Object createNewBindingKey(AMQShortString exchangeName, AMQShortString queueName, AMQShortString routingKey)
+ {
+ return createNewBindingKey(exchangeName, queueName, routingKey, null);
+ }
+
+ // Addition for Version 2 of this table
+ protected Object createNewBindingKey(AMQShortString exchangeName, AMQShortString queueName,
+ AMQShortString routingKey, FieldTable arguments)
+ {
+ return new BindingKey(exchangeName, queueName, routingKey, arguments);
+ }
+
+}
Property changes on: store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple_1.java
___________________________________________________________________
Name: svn:eol-style
+ native
Added: store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple_2.java
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple_2.java (rev 0)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple_2.java 2008-09-02 14:30:22 UTC (rev 2368)
@@ -0,0 +1,58 @@
+package org.apache.qpid.server.store.berkeleydb.tuples;
+
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.store.berkeleydb.AMQShortStringEncoding;
+import org.apache.qpid.server.store.berkeleydb.FieldTableEncoding;
+import org.apache.qpid.server.store.berkeleydb.BindingKey;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+
+import com.sleepycat.bind.tuple.TupleBinding;
+import com.sleepycat.bind.tuple.TupleInput;
+import com.sleepycat.bind.tuple.TupleOutput;
+import com.sleepycat.je.DatabaseException;
+import org.apache.log4j.Logger;
+
+public class BindingTuple_2 extends BindingTuple_1 implements BindingTuple
+{
+
+ public BindingTuple_2(VirtualHost virtualHost)
+ {
+ super(virtualHost);
+ }
+
+ public Object entryToObject(TupleInput tupleInput)
+ {
+ AMQShortString exchangeName = AMQShortStringEncoding.readShortString(tupleInput);
+ AMQShortString queueName = AMQShortStringEncoding.readShortString(tupleInput);
+ AMQShortString routingKey = AMQShortStringEncoding.readShortString(tupleInput);
+
+ FieldTable arguments;
+
+ // Addition for Version 2 of this table
+ try
+ {
+ arguments = FieldTableEncoding.readFieldTable(tupleInput);
+ }
+ catch (DatabaseException e)
+ {
+ _log.error("Unable to create binding: " + e, e);
+ return null;
+ }
+
+ return createNewBindingKey(exchangeName, queueName, routingKey, arguments);
+ }
+
+ public void objectToEntry(Object object, TupleOutput tupleOutput)
+ {
+ BindingKey binding = (BindingKey) object;
+
+ AMQShortStringEncoding.writeShortString(binding.getExchangeName(), tupleOutput);
+ AMQShortStringEncoding.writeShortString(binding.getQueueName(), tupleOutput);
+ AMQShortStringEncoding.writeShortString(binding.getRoutingKey(), tupleOutput);
+
+ // Addition for Version 2 of this table
+ FieldTableEncoding.writeFieldTable(binding.getArguments(), tupleOutput);
+ }
+
+}
\ No newline at end of file
Added: store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple.java
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple.java (rev 0)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple.java 2008-09-02 14:30:22 UTC (rev 2368)
@@ -0,0 +1,29 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.tuples;
+
+import org.apache.qpid.framing.FieldTable;
+
+public interface QueueTuple
+{
+ // Addition for Version 2
+ public void setArguments(FieldTable arguments);
+}
Added: store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTupleBindingFactory.java
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTupleBindingFactory.java (rev 0)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTupleBindingFactory.java 2008-09-02 14:30:22 UTC (rev 2368)
@@ -0,0 +1,44 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.tuples;
+
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import com.sleepycat.bind.tuple.TupleBinding;
+
+public class QueueTupleBindingFactory extends TupleBindingFactory
+{
+ public QueueTupleBindingFactory(int version, VirtualHost virtualHost)
+ {
+ super(version,virtualHost);
+ }
+
+ public TupleBinding getInstance()
+ {
+ switch (_version)
+ {
+ default:
+ case 2:
+ return new QueueTuple_2(_virtualhost);
+ case 1:
+ return new QueueTuple_1(_virtualhost);
+ }
+ }
+}
Copied: store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_1.java (from rev 2292, store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/QueueTB.java)
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_1.java (rev 0)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_1.java 2008-09-02 14:30:22 UTC (rev 2368)
@@ -0,0 +1,87 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.tuples;
+
+import com.sleepycat.bind.tuple.TupleBinding;
+import com.sleepycat.bind.tuple.TupleInput;
+import com.sleepycat.bind.tuple.TupleOutput;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.AMQQueueFactory;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.store.berkeleydb.AMQShortStringEncoding;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.log4j.Logger;
+
+public class QueueTuple_1 extends TupleBinding implements QueueTuple
+{
+ protected static final Logger _logger = Logger.getLogger(QueueTuple.class);
+
+ protected final VirtualHost _virtualHost;
+
+ public QueueTuple_1(VirtualHost virtualHost)
+ {
+ if (virtualHost == null)
+ {
+ throw new NullPointerException("Virtualhost cannot be null");
+ }
+ _virtualHost = virtualHost;
+ }
+
+ public Object entryToObject(TupleInput tupleInput)
+ {
+ AMQShortString name = AMQShortStringEncoding.readShortString(tupleInput);
+ AMQShortString owner = AMQShortStringEncoding.readShortString(tupleInput);
+
+ return createNewQueue(name, owner);
+ }
+
+ public void objectToEntry(Object object, TupleOutput tupleOutput)
+ {
+ AMQQueue queue = (AMQQueue) object;
+
+ AMQShortStringEncoding.writeShortString(queue.getName(), tupleOutput);
+ AMQShortStringEncoding.writeShortString(queue.getOwner(), tupleOutput);
+ }
+
+ // Addition for Version 2 of this table
+ public void setArguments(FieldTable arguments)
+ {
+ //no-op
+ }
+
+ protected Object createNewQueue(AMQShortString name, AMQShortString owner)
+ {
+ return createNewQueue(name, owner, null);
+ }
+
+ // Addition for Version 2 of this table
+ protected Object createNewQueue(AMQShortString name, AMQShortString owner, FieldTable arguments)
+ {
+ try
+ {
+ return AMQQueueFactory.createAMQQueueImpl(name, true, owner, false, _virtualHost, arguments);
+ }
+ catch (AMQException e)
+ {
+ _logger.error("Unable to create queue: " + e, e);
+ return null;
+ }
+ }
+}
Property changes on: store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_1.java
___________________________________________________________________
Name: svn:eol-style
+ native
Added: store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_2.java
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_2.java (rev 0)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_2.java 2008-09-02 14:30:22 UTC (rev 2368)
@@ -0,0 +1,75 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.tuples;
+
+import com.sleepycat.bind.tuple.TupleInput;
+import com.sleepycat.bind.tuple.TupleOutput;
+import com.sleepycat.je.DatabaseException;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.AMQQueueFactory;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.store.berkeleydb.AMQShortStringEncoding;
+import org.apache.qpid.server.store.berkeleydb.FieldTableEncoding;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+
+public class QueueTuple_2 extends QueueTuple_1
+{
+ protected FieldTable _arguments;
+
+ public QueueTuple_2(VirtualHost virtualHost)
+ {
+ super(virtualHost);
+ }
+
+ public Object entryToObject(TupleInput tupleInput)
+ {
+ try
+ {
+ AMQShortString name = AMQShortStringEncoding.readShortString(tupleInput);
+ AMQShortString owner = AMQShortStringEncoding.readShortString(tupleInput);
+ // Addition for Version 2 of this table
+ FieldTable arguments = FieldTableEncoding.readFieldTable(tupleInput);
+
+ return createNewQueue(name, owner, arguments);
+ }
+ catch (DatabaseException e)
+ {
+ _logger.error("Unable to create binding: " + e, e);
+ return null;
+ }
+
+ }
+
+ public void objectToEntry(Object object, TupleOutput tupleOutput)
+ {
+ AMQQueue queue = (AMQQueue) object;
+
+ AMQShortStringEncoding.writeShortString(queue.getName(), tupleOutput);
+ AMQShortStringEncoding.writeShortString(queue.getOwner(), tupleOutput);
+ // Addition for Version 2 of this table
+ FieldTableEncoding.writeFieldTable(_arguments, tupleOutput);
+ }
+
+ // Addition for Version 2 of this table
+ public void setArguments(FieldTable arguments)
+ {
+ _arguments = arguments;
+ }
+}
Added: store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/TupleBindingFactory.java
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/TupleBindingFactory.java (rev 0)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/TupleBindingFactory.java 2008-09-02 14:30:22 UTC (rev 2368)
@@ -0,0 +1,44 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.tuples;
+
+import com.sleepycat.bind.tuple.TupleBinding;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+public abstract class TupleBindingFactory
+{
+ protected int _version;
+
+ protected VirtualHost _virtualhost;
+
+ public TupleBindingFactory(int version, VirtualHost virtualhost)
+ {
+ if (virtualhost == null)
+ {
+ throw new NullPointerException("Virtualhost cannot be null");
+ }
+
+ _version = version;
+ _virtualhost = virtualhost;
+ }
+
+ public abstract TupleBinding getInstance();
+}
More information about the rhmessaging-commits
mailing list