[rhmessaging-commits] rhmessaging commits: r2086 - store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb.
rhmessaging-commits at lists.jboss.org
rhmessaging-commits at lists.jboss.org
Thu May 29 12:22:36 EDT 2008
Author: godfrer
Date: 2008-05-29 12:22:36 -0400 (Thu, 29 May 2008)
New Revision: 2086
Added:
store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringTB.java
store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BindingKey.java
store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BindingTB.java
store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ContentTB.java
store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ExchangeTB.java
store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/MessageMetaDataTB.java
store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/QueueEntryKey.java
store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/QueueTB.java
Removed:
store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AMQExchangeTupleBinding.java
store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AMQQueueBindingTupleBinding.java
store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AMQQueueTupleBinding.java
store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringTupleBinding.java
store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/DeliveryDetailsKey.java
store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/MessageContentTupleBinding.java
store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/MessageMetaDataTupleBinding.java
store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/QueueBindingKey.java
Modified:
store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
Log:
refactoring
Deleted: store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AMQExchangeTupleBinding.java
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AMQExchangeTupleBinding.java 2008-05-29 16:05:10 UTC (rev 2085)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AMQExchangeTupleBinding.java 2008-05-29 16:22:36 UTC (rev 2086)
@@ -1,57 +0,0 @@
-package org.apache.qpid.server.store.berkeleydb;
-
-import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.AMQException;
-
-import com.sleepycat.bind.tuple.TupleBinding;
-import com.sleepycat.bind.tuple.TupleInput;
-import com.sleepycat.bind.tuple.TupleOutput;
-import org.apache.log4j.Logger;
-
-public class AMQExchangeTupleBinding extends TupleBinding
-{
- private static final Logger _log = Logger.getLogger(AMQExchangeTupleBinding.class);
-
-
-
- private final VirtualHost _virtualHost;
-
- public AMQExchangeTupleBinding(VirtualHost virtualHost)
- {
- _virtualHost = virtualHost;
- }
-
- public Object entryToObject(TupleInput tupleInput)
- {
-
- AMQShortString name = AMQShortStringEncoding.readShortString(tupleInput);
- AMQShortString typeName = AMQShortStringEncoding.readShortString(tupleInput);
-
-
- boolean autoDelete = tupleInput.readBoolean();
-
- try
- {
- return _virtualHost.getExchangeFactory().createExchange(name, typeName, true, autoDelete, 0);
- }
- catch (AMQException e)
- {
- _log.error("Unable to create exchange: " + e, e);
- return null;
- }
- }
-
- public void objectToEntry(Object object, TupleOutput tupleOutput)
- {
- Exchange exchange = (Exchange) object;
-
-
- AMQShortStringEncoding.writeShortString(exchange.getName(),tupleOutput);
- AMQShortStringEncoding.writeShortString(exchange.getType(),tupleOutput);
-
- tupleOutput.writeBoolean(exchange.isAutoDelete());
-
- }
-}
Deleted: store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AMQQueueBindingTupleBinding.java
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AMQQueueBindingTupleBinding.java 2008-05-29 16:05:10 UTC (rev 2085)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AMQQueueBindingTupleBinding.java 2008-05-29 16:22:36 UTC (rev 2086)
@@ -1,59 +0,0 @@
-package org.apache.qpid.server.store.berkeleydb;
-
-import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.AMQException;
-
-import com.sleepycat.bind.tuple.TupleBinding;
-import com.sleepycat.bind.tuple.TupleInput;
-import com.sleepycat.bind.tuple.TupleOutput;
-import com.sleepycat.je.DatabaseException;
-import org.apache.log4j.Logger;
-
-public class AMQQueueBindingTupleBinding extends TupleBinding
-{
- private static final Logger _log = Logger.getLogger(AMQQueueBindingTupleBinding.class);
-
-
-
- private final VirtualHost _virtualHost;
-
- public AMQQueueBindingTupleBinding(VirtualHost virtualHost)
- {
- _virtualHost = virtualHost;
- }
-
- public Object entryToObject(TupleInput tupleInput)
- {
- try
- {
- AMQShortString exchangeName = AMQShortStringEncoding.readShortString(tupleInput);
- AMQShortString queueName = AMQShortStringEncoding.readShortString(tupleInput);
- AMQShortString routingKey = AMQShortStringEncoding.readShortString(tupleInput);
- FieldTable arguments = FieldTableEncoding.readFieldTable(tupleInput);
-
-
-
- return new QueueBindingKey(exchangeName,queueName,routingKey,arguments);
- }
- catch (DatabaseException e)
- {
- _log.error("Unable to create binding: " + e, e);
- return null;
- }
- }
-
- public void objectToEntry(Object object, TupleOutput tupleOutput)
- {
- QueueBindingKey binding = (QueueBindingKey) object;
-
-
- AMQShortStringEncoding.writeShortString(binding.getExchangeName(),tupleOutput);
- AMQShortStringEncoding.writeShortString(binding.getQueueName(),tupleOutput);
- AMQShortStringEncoding.writeShortString(binding.getRoutingKey(),tupleOutput);
- FieldTableEncoding.writeFieldTable(binding.getArguments(),tupleOutput);
-
- }
-}
Deleted: store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AMQQueueTupleBinding.java
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AMQQueueTupleBinding.java 2008-05-29 16:05:10 UTC (rev 2085)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AMQQueueTupleBinding.java 2008-05-29 16:22:36 UTC (rev 2086)
@@ -1,68 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.qpid.server.store.berkeleydb;
-
-import com.sleepycat.bind.tuple.TupleBinding;
-import com.sleepycat.bind.tuple.TupleInput;
-import com.sleepycat.bind.tuple.TupleOutput;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.log4j.Logger;
-
-public class AMQQueueTupleBinding extends TupleBinding
-{
- private static final Logger _log = Logger.getLogger(AMQQueueTupleBinding.class);
-
-
-
- private final VirtualHost _virtualHost;
-
- public AMQQueueTupleBinding(VirtualHost virtualHost)
- {
- _virtualHost = virtualHost;
- }
-
- public Object entryToObject(TupleInput tupleInput)
- {
- AMQShortString name = AMQShortStringEncoding.readShortString(tupleInput);
- AMQShortString owner = AMQShortStringEncoding.readShortString(tupleInput);
-
-
- try
- {
- return new AMQQueue(name, true, owner, false, _virtualHost);
- }
- catch (AMQException e)
- {
- _log.error("Unable to create queue: " + e, e);
- return null;
- }
- }
-
- public void objectToEntry(Object object, TupleOutput tupleOutput)
- {
- AMQQueue queue = (AMQQueue) object;
-
-
- AMQShortStringEncoding.writeShortString(queue.getName(),tupleOutput);
- AMQShortStringEncoding.writeShortString(queue.getOwner(),tupleOutput);
-
- }
-}
Copied: store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringTB.java (from rev 2039, store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringTupleBinding.java)
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringTB.java (rev 0)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringTB.java 2008-05-29 16:22:36 UTC (rev 2086)
@@ -0,0 +1,28 @@
+package org.apache.qpid.server.store.berkeleydb;
+
+import com.sleepycat.bind.tuple.TupleBinding;
+import com.sleepycat.bind.tuple.TupleInput;
+import com.sleepycat.bind.tuple.TupleOutput;
+import org.apache.log4j.Logger;
+import org.apache.qpid.framing.AMQShortString;
+
+public class AMQShortStringTB extends TupleBinding
+{
+ private static final Logger _log = Logger.getLogger(AMQShortStringTB.class);
+
+
+ public AMQShortStringTB()
+ {
+ }
+
+ public Object entryToObject(TupleInput tupleInput)
+ {
+ return AMQShortStringEncoding.readShortString(tupleInput);
+ }
+
+ public void objectToEntry(Object object, TupleOutput tupleOutput)
+ {
+ AMQShortStringEncoding.writeShortString((AMQShortString)object, tupleOutput);
+ }
+
+}
Property changes on: store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringTB.java
___________________________________________________________________
Name: svn:eol-style
+ native
Deleted: store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringTupleBinding.java
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringTupleBinding.java 2008-05-29 16:05:10 UTC (rev 2085)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringTupleBinding.java 2008-05-29 16:22:36 UTC (rev 2086)
@@ -1,28 +0,0 @@
-package org.apache.qpid.server.store.berkeleydb;
-
-import com.sleepycat.bind.tuple.TupleBinding;
-import com.sleepycat.bind.tuple.TupleInput;
-import com.sleepycat.bind.tuple.TupleOutput;
-import org.apache.log4j.Logger;
-import org.apache.qpid.framing.AMQShortString;
-
-public class AMQShortStringTupleBinding extends TupleBinding
-{
- private static final Logger _log = Logger.getLogger(AMQShortStringTupleBinding.class);
-
-
- public AMQShortStringTupleBinding()
- {
- }
-
- public Object entryToObject(TupleInput tupleInput)
- {
- return AMQShortStringEncoding.readShortString(tupleInput);
- }
-
- public void objectToEntry(Object object, TupleOutput tupleOutput)
- {
- AMQShortStringEncoding.writeShortString((AMQShortString)object, tupleOutput);
- }
-
-}
Modified: store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java 2008-05-29 16:05:10 UTC (rev 2085)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java 2008-05-29 16:22:36 UTC (rev 2086)
@@ -56,6 +56,7 @@
import java.util.Queue;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@@ -90,27 +91,43 @@
private static final String QUEUEDB_NAME = "queueDb";
+ private static final String NEW_QUEUE_DB_NAME = "QUEUE";
+
/** Maps from name (which uniquely identifies a queue) to an AMQQueue */
private Database _queueDb;
private static final String DELIVERYDB_NAME = "deliveryDb";
+ private static final String QUEUE_ENTRY_DB_NAME = "QUEUE_ENTRY";
+
/** Maps from a queue name to a message id. This is what stores the pending deliveries for a given queue */
private Database _deliveryDb;
private static final String EXCHANGEDB_NAME = "exchangeDb";
private Database _exchangeDb;
+ private static final String NEW_EXCHANGE_DB_NAME = "EXCHANGE";
+
+
private static final String QUEUEBINDINGSDB_NAME = "queueBindingsDb";
private Database _queueBindingsDb;
private VirtualHost _virtualHost;
private final AtomicLong _messageId = new AtomicLong(1);
+
+ private final AtomicLong _queueId = new AtomicLong(1);
+
private static final AMQShortString EMPTY_SHORT_STRING = new AMQShortString("");
+
+
+
+
private final CommitThread _commitThread = new CommitThread("Commit-Thread");
+ private Map<AMQShortString, Long> _queueNameToIdMap = new ConcurrentHashMap<AMQShortString, Long>();
+
private enum State
{
INITIAL,
@@ -159,6 +176,9 @@
_commitThread.start();
+
+ upgradeIfNecessary();
+
// this recovers durable queues and persistent messages
recover();
@@ -166,6 +186,11 @@
stateTransition(State.RECOVERING, State.STARTED);
}
+ private void upgradeIfNecessary()
+ {
+
+ }
+
private synchronized void stateTransition(State requiredState, State newState) throws AMQException
{
if (_state != requiredState)
@@ -389,10 +414,10 @@
if (_state != State.RECOVERING)
{
DatabaseEntry key = new DatabaseEntry();
- EntryBinding keyBinding = new AMQShortStringTupleBinding();
+ EntryBinding keyBinding = new AMQShortStringTB();
keyBinding.objectToEntry(exchange.getName(), key);
DatabaseEntry value = new DatabaseEntry();
- TupleBinding exchangeBinding = new AMQExchangeTupleBinding(_virtualHost);
+ TupleBinding exchangeBinding = new ExchangeTB(_virtualHost);
exchangeBinding.objectToEntry(exchange, value);
try
{
@@ -415,7 +440,7 @@
public void removeExchange(Exchange exchange) throws AMQException
{
DatabaseEntry key = new DatabaseEntry();
- EntryBinding keyBinding = new AMQShortStringTupleBinding();
+ EntryBinding keyBinding = new AMQShortStringTB();
keyBinding.objectToEntry(exchange.getName(), key);
try
{
@@ -445,7 +470,7 @@
QueueRegistry queueRegistry = _virtualHost.getQueueRegistry();
- for (QueueBindingKey binding : loadQueueBindings(exchange))
+ for (BindingKey binding : loadQueueBindings(exchange))
{
AMQQueue queue = queueRegistry.getQueue(binding.getQueueName());
if (queue == null)
@@ -464,29 +489,29 @@
}
}
- private List<QueueBindingKey> loadQueueBindings(Exchange exchange) throws DatabaseException
+ private List<BindingKey> loadQueueBindings(Exchange exchange) throws DatabaseException
{
Cursor cursor = null;
- List<QueueBindingKey> queueBindings = new ArrayList<QueueBindingKey>();
+ List<BindingKey> queueBindings = new ArrayList<BindingKey>();
try
{
cursor = _queueBindingsDb.openCursor(null, null);
DatabaseEntry key = new DatabaseEntry();
DatabaseEntry value = new DatabaseEntry();
- AMQQueueBindingTupleBinding binding = new AMQQueueBindingTupleBinding(_virtualHost);
+ BindingTB binding = new BindingTB(_virtualHost);
- QueueBindingKey queueBinding =
- new QueueBindingKey(exchange.getName(), new AMQShortString(""), new AMQShortString(""), null);
+ BindingKey queueBinding =
+ new BindingKey(exchange.getName(), new AMQShortString(""), new AMQShortString(""), null);
- EntryBinding keyBinding = new AMQQueueBindingTupleBinding(_virtualHost);
+ EntryBinding keyBinding = new BindingTB(_virtualHost);
keyBinding.objectToEntry(queueBinding, key);
OperationStatus opStatus = cursor.getSearchKeyRange(key, value, LockMode.RMW);
while ((opStatus == OperationStatus.SUCCESS)
- && ((queueBinding = (QueueBindingKey) binding.entryToObject(key)).getExchangeName().equals(
+ && ((queueBinding = (BindingKey) binding.entryToObject(key)).getExchangeName().equals(
exchange.getName())))
{
queueBindings.add(queueBinding);
@@ -514,7 +539,7 @@
cursor = _exchangeDb.openCursor(null, null);
DatabaseEntry key = new DatabaseEntry();
DatabaseEntry value = new DatabaseEntry();
- AMQExchangeTupleBinding binding = new AMQExchangeTupleBinding(_virtualHost);
+ ExchangeTB binding = new ExchangeTB(_virtualHost);
while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
{
Exchange exchange = (Exchange) binding.entryToObject(value);
@@ -555,8 +580,8 @@
if (_state != State.RECOVERING)
{
DatabaseEntry key = new DatabaseEntry();
- EntryBinding keyBinding = new AMQQueueBindingTupleBinding(_virtualHost);
- keyBinding.objectToEntry(new QueueBindingKey(exchange.getName(), queue.getName(), routingKey, args), key);
+ EntryBinding keyBinding = new BindingTB(_virtualHost);
+ keyBinding.objectToEntry(new BindingKey(exchange.getName(), queue.getName(), routingKey, args), key);
DatabaseEntry value = new DatabaseEntry();
ByteBinding.byteToEntry((byte) 0, value);
@@ -586,8 +611,8 @@
throws AMQException
{
DatabaseEntry key = new DatabaseEntry();
- EntryBinding keyBinding = new AMQQueueBindingTupleBinding(_virtualHost);
- keyBinding.objectToEntry(new QueueBindingKey(exchange.getName(), queue.getName(), routingKey, args), key);
+ EntryBinding keyBinding = new BindingTB(_virtualHost);
+ keyBinding.objectToEntry(new BindingKey(exchange.getName(), queue.getName(), routingKey, args), key);
try
{
@@ -618,11 +643,14 @@
if (_state != State.RECOVERING)
{
+ long queueId = _queueId.getAndIncrement();
+ _queueNameToIdMap.put(queue.getName(),queueId);
+
DatabaseEntry key = new DatabaseEntry();
- EntryBinding keyBinding = new AMQShortStringTupleBinding();
+ EntryBinding keyBinding = new AMQShortStringTB();
keyBinding.objectToEntry(queue.getName(), key);
DatabaseEntry value = new DatabaseEntry();
- TupleBinding queueBinding = new AMQQueueTupleBinding(_virtualHost);
+ TupleBinding queueBinding = new QueueTB(_virtualHost);
queueBinding.objectToEntry(queue, value);
try
{
@@ -647,9 +675,10 @@
_log.debug("public void removeQueue(AMQShortString name = " + name + "): called");
+ Long queueId = _queueNameToIdMap.remove(name);
DatabaseEntry key = new DatabaseEntry();
- EntryBinding keyBinding = new AMQShortStringTupleBinding();
+ EntryBinding keyBinding = new AMQShortStringTB();
keyBinding.objectToEntry(name, key);
try
{
@@ -677,13 +706,13 @@
AMQQueue getQueue(AMQShortString name) throws AMQException
{
DatabaseEntry key = new DatabaseEntry();
- EntryBinding keyBinding = new AMQShortStringTupleBinding();
+ EntryBinding keyBinding = new AMQShortStringTB();
keyBinding.objectToEntry(name, key);
DatabaseEntry value = new DatabaseEntry();
try
{
_queueDb.get(null, key, value, LockMode.RMW);
- AMQQueueTupleBinding binding = new AMQQueueTupleBinding(_virtualHost);
+ QueueTB binding = new QueueTB(_virtualHost);
return (AMQQueue) binding.entryToObject(value);
}
@@ -710,8 +739,8 @@
AMQShortString name = queue.getName();
Transaction tx = (Transaction) context.getPayload();
DatabaseEntry key = new DatabaseEntry();
- EntryBinding keyBinding = new DeliveryDetailsKey.TupleBinding();
- DeliveryDetailsKey dd = new DeliveryDetailsKey(name, messageId);
+ EntryBinding keyBinding = new QueueEntryKey.TupleBinding();
+ QueueEntryKey dd = new QueueEntryKey(name, messageId);
keyBinding.objectToEntry(dd, key);
DatabaseEntry value = new DatabaseEntry();
ByteBinding.byteToEntry((byte) 0, value);
@@ -769,8 +798,8 @@
Transaction tx = (Transaction) context.getPayload();
DatabaseEntry key = new DatabaseEntry();
- EntryBinding keyBinding = new DeliveryDetailsKey.TupleBinding();
- DeliveryDetailsKey dd = new DeliveryDetailsKey(name, messageId);
+ EntryBinding keyBinding = new QueueEntryKey.TupleBinding();
+ QueueEntryKey dd = new QueueEntryKey(name, messageId);
keyBinding.objectToEntry(dd, key);
@@ -955,9 +984,9 @@
DatabaseEntry key = new DatabaseEntry();
- DeliveryDetailsKey dd = new DeliveryDetailsKey(queueName, 0);
+ QueueEntryKey dd = new QueueEntryKey(queueName, 0);
- EntryBinding keyBinding = new DeliveryDetailsKey.TupleBinding();
+ EntryBinding keyBinding = new QueueEntryKey.TupleBinding();
keyBinding.objectToEntry(dd, key);
DatabaseEntry value = new DatabaseEntry();
@@ -965,7 +994,7 @@
LinkedList<Long> messageIds = new LinkedList<Long>();
OperationStatus status = cursor.getSearchKeyRange(key, value, LockMode.DEFAULT);
- dd = (DeliveryDetailsKey) keyBinding.entryToObject(key);
+ dd = (QueueEntryKey) keyBinding.entryToObject(key);
while ((status == OperationStatus.SUCCESS) && dd.queueName.equals(queueName))
{
@@ -974,7 +1003,7 @@
status = cursor.getNext(key, value, LockMode.DEFAULT);
if (status == OperationStatus.SUCCESS)
{
- dd = (DeliveryDetailsKey) keyBinding.entryToObject(key);
+ dd = (QueueEntryKey) keyBinding.entryToObject(key);
}
}
@@ -1055,7 +1084,7 @@
TupleBinding keyBinding = new MessageContentKey.TupleBinding();
keyBinding.objectToEntry(new MessageContentKey(messageId, index), key);
DatabaseEntry value = new DatabaseEntry();
- TupleBinding messageBinding = new MessageContentTupleBinding();
+ TupleBinding messageBinding = new ContentTB();
messageBinding.objectToEntry(contentBody, value);
try
{
@@ -1100,7 +1129,7 @@
EntryBinding keyBinding = TupleBinding.getPrimitiveBinding(Long.class);
keyBinding.objectToEntry(messageId, key);
DatabaseEntry value = new DatabaseEntry();
- TupleBinding messageBinding = new MessageMetaDataTupleBinding();
+ TupleBinding messageBinding = new MessageMetaDataTB();
messageBinding.objectToEntry(messageMetaData, value);
try
{
@@ -1138,7 +1167,7 @@
EntryBinding keyBinding = TupleBinding.getPrimitiveBinding(Long.class);
keyBinding.objectToEntry(messageId, key);
DatabaseEntry value = new DatabaseEntry();
- TupleBinding messageBinding = new MessageMetaDataTupleBinding();
+ TupleBinding messageBinding = new MessageMetaDataTB();
try
{
@@ -1177,7 +1206,7 @@
TupleBinding keyBinding = new MessageContentKey.TupleBinding();
keyBinding.objectToEntry(new MessageContentKey(messageId, index), key);
DatabaseEntry value = new DatabaseEntry();
- TupleBinding messageBinding = new MessageContentTupleBinding();
+ TupleBinding messageBinding = new ContentTB();
if (_log.isDebugEnabled())
{
_log.debug("Message Id: " + messageId + " Getting content body chunk: " + index);
@@ -1212,7 +1241,7 @@
cursor = _queueDb.openCursor(null, null);
DatabaseEntry key = new DatabaseEntry();
DatabaseEntry value = new DatabaseEntry();
- AMQQueueTupleBinding binding = new AMQQueueTupleBinding(_virtualHost);
+ QueueTB binding = new QueueTB(_virtualHost);
while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
{
AMQQueue queue = (AMQQueue) binding.entryToObject(value);
@@ -1267,7 +1296,7 @@
Transaction tx = (Transaction) context.getPayload();
cursor = _deliveryDb.openCursor(tx, null);
DatabaseEntry key = new DatabaseEntry();
- EntryBinding keyBinding = new DeliveryDetailsKey.TupleBinding();
+ EntryBinding keyBinding = new QueueEntryKey.TupleBinding();
DatabaseEntry value = new DatabaseEntry();
EntryBinding valueBinding = TupleBinding.getPrimitiveBinding(Long.class);
@@ -1279,7 +1308,7 @@
while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
{
- DeliveryDetailsKey dd = (DeliveryDetailsKey) keyBinding.entryToObject(key);
+ QueueEntryKey dd = (QueueEntryKey) keyBinding.entryToObject(key);
AMQShortString queueName = dd.queueName;
Copied: store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BindingKey.java (from rev 2039, store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/QueueBindingKey.java)
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BindingKey.java (rev 0)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BindingKey.java 2008-05-29 16:22:36 UTC (rev 2086)
@@ -0,0 +1,49 @@
+package org.apache.qpid.server.store.berkeleydb;
+
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+
+/**
+ * Created by IntelliJ IDEA.
+ * User: U146758
+ * Date: 19-Feb-2007
+ * Time: 14:11:01
+ * To change this template use File | Settings | File Templates.
+ */
+public class BindingKey extends Object
+{
+ private final AMQShortString _exchangeName;
+ private final AMQShortString _queueName;
+ private final AMQShortString _routingKey;
+ private final FieldTable _arguments;
+
+ public BindingKey(AMQShortString exchangeName, AMQShortString queueName, AMQShortString routingKey, FieldTable arguments)
+ {
+ _exchangeName = exchangeName;
+ _queueName = queueName;
+ _routingKey = routingKey;
+ _arguments = arguments;
+ }
+
+
+ public AMQShortString getExchangeName()
+ {
+ return _exchangeName;
+ }
+
+ public AMQShortString getQueueName()
+ {
+ return _queueName;
+ }
+
+ public AMQShortString getRoutingKey()
+ {
+ return _routingKey;
+ }
+
+ public FieldTable getArguments()
+ {
+ return _arguments;
+ }
+
+}
Property changes on: store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BindingKey.java
___________________________________________________________________
Name: svn:eol-style
+ native
Copied: store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BindingTB.java (from rev 2039, store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AMQQueueBindingTupleBinding.java)
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BindingTB.java (rev 0)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BindingTB.java 2008-05-29 16:22:36 UTC (rev 2086)
@@ -0,0 +1,57 @@
+package org.apache.qpid.server.store.berkeleydb;
+
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+
+import com.sleepycat.bind.tuple.TupleBinding;
+import com.sleepycat.bind.tuple.TupleInput;
+import com.sleepycat.bind.tuple.TupleOutput;
+import com.sleepycat.je.DatabaseException;
+import org.apache.log4j.Logger;
+
+public class BindingTB extends TupleBinding
+{
+ private static final Logger _log = Logger.getLogger(BindingTB.class);
+
+
+
+ private final VirtualHost _virtualHost;
+
+ public BindingTB(VirtualHost virtualHost)
+ {
+ _virtualHost = virtualHost;
+ }
+
+ public Object entryToObject(TupleInput tupleInput)
+ {
+ try
+ {
+ AMQShortString exchangeName = AMQShortStringEncoding.readShortString(tupleInput);
+ AMQShortString queueName = AMQShortStringEncoding.readShortString(tupleInput);
+ AMQShortString routingKey = AMQShortStringEncoding.readShortString(tupleInput);
+ FieldTable arguments = FieldTableEncoding.readFieldTable(tupleInput);
+
+
+
+ return new BindingKey(exchangeName,queueName,routingKey,arguments);
+ }
+ catch (DatabaseException e)
+ {
+ _log.error("Unable to create binding: " + e, e);
+ return null;
+ }
+ }
+
+ public void objectToEntry(Object object, TupleOutput tupleOutput)
+ {
+ BindingKey binding = (BindingKey) object;
+
+
+ AMQShortStringEncoding.writeShortString(binding.getExchangeName(),tupleOutput);
+ AMQShortStringEncoding.writeShortString(binding.getQueueName(),tupleOutput);
+ AMQShortStringEncoding.writeShortString(binding.getRoutingKey(),tupleOutput);
+ FieldTableEncoding.writeFieldTable(binding.getArguments(),tupleOutput);
+
+ }
+}
Property changes on: store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BindingTB.java
___________________________________________________________________
Name: svn:eol-style
+ native
Copied: store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ContentTB.java (from rev 2039, store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/MessageContentTupleBinding.java)
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ContentTB.java (rev 0)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ContentTB.java 2008-05-29 16:22:36 UTC (rev 2086)
@@ -0,0 +1,77 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import com.sleepycat.bind.tuple.TupleBinding;
+import com.sleepycat.bind.tuple.TupleInput;
+import com.sleepycat.bind.tuple.TupleOutput;
+import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.framing.abstraction.ContentChunk;
+
+import org.apache.mina.common.ByteBuffer;
+
+/**
+ * @author Robert Greig (robert.j.greig at jpmorgan.com)
+ */
+public class ContentTB extends TupleBinding
+{
+ public Object entryToObject(TupleInput tupleInput)
+ {
+
+ final int size = tupleInput.readInt();
+ byte[] underlying = new byte[size];
+ tupleInput.readFast(underlying);
+ final ByteBuffer data = ByteBuffer.wrap(underlying);
+ ContentChunk cb = new ContentChunk()
+ {
+
+ public int getSize()
+ {
+ return size;
+ }
+
+ public ByteBuffer getData()
+ {
+ return data;
+ }
+
+ public void reduceToFit()
+ {
+
+ }
+ };
+ return cb;
+ }
+
+ public void objectToEntry(Object object, TupleOutput tupleOutput)
+ {
+ ContentChunk cb = (ContentChunk) object;
+ final int size = cb.getSize();
+ byte[] underlying = new byte[size];
+
+ ByteBuffer buf = cb.getData();
+
+ buf.duplicate().rewind().get(underlying);
+
+ tupleOutput.writeInt(size);
+ tupleOutput.writeFast(underlying);
+ }
+}
Property changes on: store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ContentTB.java
___________________________________________________________________
Name: svn:eol-style
+ native
Deleted: store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/DeliveryDetailsKey.java
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/DeliveryDetailsKey.java 2008-05-29 16:05:10 UTC (rev 2085)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/DeliveryDetailsKey.java 2008-05-29 16:22:36 UTC (rev 2086)
@@ -1,64 +0,0 @@
-package org.apache.qpid.server.store.berkeleydb;
-
-import com.sleepycat.bind.tuple.TupleInput;
-import com.sleepycat.bind.tuple.TupleOutput;
-
-import org.apache.qpid.framing.AMQShortString;
-
-/**
- * @author Apache Software Foundation
- */
-public class DeliveryDetailsKey
-{
- public AMQShortString queueName;
- public long messageId;
-
-
- public DeliveryDetailsKey()
- {
- }
-
-
- public DeliveryDetailsKey(byte[] payload)
- {
- final TupleInput ti = new TupleInput(payload);
-
- queueName = AMQShortStringEncoding.readShortString(ti);
-
- messageId = ti.readLong();
-
- }
-
- public static class TupleBinding extends com.sleepycat.bind.tuple.TupleBinding
- {
- public Object entryToObject(TupleInput tupleInput)
- {
- final DeliveryDetailsKey mk = new DeliveryDetailsKey();
-
-
- mk.queueName = AMQShortStringEncoding.readShortString(tupleInput);
- mk.messageId = tupleInput.readLong();
-
- return mk;
- }
-
- public void objectToEntry(Object object, TupleOutput tupleOutput)
- {
- final DeliveryDetailsKey mk = (DeliveryDetailsKey) object;
-
- AMQShortStringEncoding.writeShortString(mk.queueName,tupleOutput);
- tupleOutput.writeLong(mk.messageId);
-
- }
-
-
- }
-
- public DeliveryDetailsKey(AMQShortString queueName, long messageId)
- {
- this.queueName = queueName;
- this.messageId = messageId;
- }
-
-
-}
Copied: store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ExchangeTB.java (from rev 2039, store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AMQExchangeTupleBinding.java)
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ExchangeTB.java (rev 0)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ExchangeTB.java 2008-05-29 16:22:36 UTC (rev 2086)
@@ -0,0 +1,57 @@
+package org.apache.qpid.server.store.berkeleydb;
+
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.AMQException;
+
+import com.sleepycat.bind.tuple.TupleBinding;
+import com.sleepycat.bind.tuple.TupleInput;
+import com.sleepycat.bind.tuple.TupleOutput;
+import org.apache.log4j.Logger;
+
+public class ExchangeTB extends TupleBinding
+{
+ private static final Logger _log = Logger.getLogger(ExchangeTB.class);
+
+
+
+ private final VirtualHost _virtualHost;
+
+ public ExchangeTB(VirtualHost virtualHost)
+ {
+ _virtualHost = virtualHost;
+ }
+
+ public Object entryToObject(TupleInput tupleInput)
+ {
+
+ AMQShortString name = AMQShortStringEncoding.readShortString(tupleInput);
+ AMQShortString typeName = AMQShortStringEncoding.readShortString(tupleInput);
+
+
+ boolean autoDelete = tupleInput.readBoolean();
+
+ try
+ {
+ return _virtualHost.getExchangeFactory().createExchange(name, typeName, true, autoDelete, 0);
+ }
+ catch (AMQException e)
+ {
+ _log.error("Unable to create exchange: " + e, e);
+ return null;
+ }
+ }
+
+ public void objectToEntry(Object object, TupleOutput tupleOutput)
+ {
+ Exchange exchange = (Exchange) object;
+
+
+ AMQShortStringEncoding.writeShortString(exchange.getName(),tupleOutput);
+ AMQShortStringEncoding.writeShortString(exchange.getType(),tupleOutput);
+
+ tupleOutput.writeBoolean(exchange.isAutoDelete());
+
+ }
+}
Property changes on: store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ExchangeTB.java
___________________________________________________________________
Name: svn:eol-style
+ native
Deleted: store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/MessageContentTupleBinding.java
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/MessageContentTupleBinding.java 2008-05-29 16:05:10 UTC (rev 2085)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/MessageContentTupleBinding.java 2008-05-29 16:22:36 UTC (rev 2086)
@@ -1,77 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store.berkeleydb;
-
-import com.sleepycat.bind.tuple.TupleBinding;
-import com.sleepycat.bind.tuple.TupleInput;
-import com.sleepycat.bind.tuple.TupleOutput;
-import org.apache.qpid.framing.ContentBody;
-import org.apache.qpid.framing.abstraction.ContentChunk;
-
-import org.apache.mina.common.ByteBuffer;
-
-/**
- * @author Robert Greig (robert.j.greig at jpmorgan.com)
- */
-public class MessageContentTupleBinding extends TupleBinding
-{
- public Object entryToObject(TupleInput tupleInput)
- {
-
- final int size = tupleInput.readInt();
- byte[] underlying = new byte[size];
- tupleInput.readFast(underlying);
- final ByteBuffer data = ByteBuffer.wrap(underlying);
- ContentChunk cb = new ContentChunk()
- {
-
- public int getSize()
- {
- return size;
- }
-
- public ByteBuffer getData()
- {
- return data;
- }
-
- public void reduceToFit()
- {
-
- }
- };
- return cb;
- }
-
- public void objectToEntry(Object object, TupleOutput tupleOutput)
- {
- ContentChunk cb = (ContentChunk) object;
- final int size = cb.getSize();
- byte[] underlying = new byte[size];
-
- ByteBuffer buf = cb.getData();
-
- buf.duplicate().rewind().get(underlying);
-
- tupleOutput.writeInt(size);
- tupleOutput.writeFast(underlying);
- }
-}
Copied: store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/MessageMetaDataTB.java (from rev 2039, store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/MessageMetaDataTupleBinding.java)
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/MessageMetaDataTB.java (rev 0)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/MessageMetaDataTB.java 2008-05-29 16:22:36 UTC (rev 2086)
@@ -0,0 +1,142 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import com.sleepycat.bind.tuple.TupleBinding;
+import com.sleepycat.bind.tuple.TupleInput;
+import com.sleepycat.bind.tuple.TupleOutput;
+import org.apache.log4j.Logger;
+import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.*;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.server.queue.MessageMetaData;
+
+/**
+ * Handles the mapping to and from message meta data
+ */
+public class MessageMetaDataTB extends TupleBinding
+{
+ private static final Logger _log = Logger.getLogger(MessageMetaDataTB.class);
+
+ public Object entryToObject(TupleInput tupleInput)
+ {
+ try
+ {
+ final MessagePublishInfo publishBody = readMessagePublishInfo(tupleInput);
+ final ContentHeaderBody contentHeaderBody = readContentHeaderBody(tupleInput);
+ final int contentChunkCount = tupleInput.readInt();
+ return new MessageMetaData(publishBody, contentHeaderBody, contentChunkCount);
+ }
+ catch (Exception e)
+ {
+ _log.error("Error converting entry to object: " + e, e);
+ // annoyingly just have to return null since we cannot throw
+ return null;
+ }
+ }
+
+ public void objectToEntry(Object object, TupleOutput tupleOutput)
+ {
+ MessageMetaData message = (MessageMetaData) object;
+ try
+ {
+ writeMessagePublishInfo(message.getMessagePublishInfo(), tupleOutput);
+ }
+ catch (AMQException e)
+ {
+ // can't do anything else since the BDB interface precludes throwing any exceptions
+ // in practice we should never get an exception
+ throw new RuntimeException("Error converting object to entry: " + e, e);
+ }
+ writeContentHeader(message.getContentHeaderBody(), tupleOutput);
+ tupleOutput.writeInt(message.getContentChunkCount());
+ }
+
+ private MessagePublishInfo readMessagePublishInfo(TupleInput tupleInput)
+ {
+
+ final AMQShortString exchange = AMQShortStringEncoding.readShortString(tupleInput);
+ final AMQShortString routingKey = AMQShortStringEncoding.readShortString(tupleInput);
+ final boolean mandatory = tupleInput.readBoolean();
+ final boolean immediate = tupleInput.readBoolean();
+
+ return new MessagePublishInfo()
+ {
+
+ public AMQShortString getExchange()
+ {
+ return exchange;
+ }
+
+ public void setExchange(AMQShortString exchange)
+ {
+
+ }
+
+ public boolean isImmediate()
+ {
+ return immediate;
+ }
+
+ public boolean isMandatory()
+ {
+ return mandatory;
+ }
+
+ public AMQShortString getRoutingKey()
+ {
+ return routingKey;
+ }
+ } ;
+
+ }
+
+
+ private ContentHeaderBody readContentHeaderBody(TupleInput tupleInput) throws AMQFrameDecodingException, AMQProtocolVersionException
+ {
+ int bodySize = tupleInput.readInt();
+ byte[] underlying = new byte[bodySize];
+ tupleInput.readFast(underlying);
+ ByteBuffer buf = ByteBuffer.wrap(underlying);
+
+ return ContentHeaderBody.createFromBuffer(buf, bodySize);
+ }
+
+ private void writeMessagePublishInfo(MessagePublishInfo publishBody, TupleOutput tupleOutput) throws AMQException
+ {
+
+
+ AMQShortStringEncoding.writeShortString(publishBody.getExchange(), tupleOutput);
+ AMQShortStringEncoding.writeShortString(publishBody.getRoutingKey(), tupleOutput);
+ tupleOutput.writeBoolean(publishBody.isMandatory());
+ tupleOutput.writeBoolean(publishBody.isImmediate());
+
+ }
+
+ private void writeContentHeader(ContentHeaderBody headerBody, TupleOutput tupleOutput)
+ {
+ // write out the content header body
+ final int bodySize = headerBody.getSize();
+ byte[] underlying = new byte[bodySize];
+ ByteBuffer buf = ByteBuffer.wrap(underlying);
+ headerBody.writePayload(buf);
+ tupleOutput.writeInt(bodySize);
+ tupleOutput.writeFast(underlying);
+ }
+}
Property changes on: store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/MessageMetaDataTB.java
___________________________________________________________________
Name: svn:eol-style
+ native
Deleted: store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/MessageMetaDataTupleBinding.java
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/MessageMetaDataTupleBinding.java 2008-05-29 16:05:10 UTC (rev 2085)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/MessageMetaDataTupleBinding.java 2008-05-29 16:22:36 UTC (rev 2086)
@@ -1,142 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.qpid.server.store.berkeleydb;
-
-import com.sleepycat.bind.tuple.TupleBinding;
-import com.sleepycat.bind.tuple.TupleInput;
-import com.sleepycat.bind.tuple.TupleOutput;
-import org.apache.log4j.Logger;
-import org.apache.mina.common.ByteBuffer;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.*;
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.server.queue.MessageMetaData;
-
-/**
- * Handles the mapping to and from message meta data
- */
-public class MessageMetaDataTupleBinding extends TupleBinding
-{
- private static final Logger _log = Logger.getLogger(MessageMetaDataTupleBinding.class);
-
- public Object entryToObject(TupleInput tupleInput)
- {
- try
- {
- final MessagePublishInfo publishBody = readMessagePublishInfo(tupleInput);
- final ContentHeaderBody contentHeaderBody = readContentHeaderBody(tupleInput);
- final int contentChunkCount = tupleInput.readInt();
- return new MessageMetaData(publishBody, contentHeaderBody, contentChunkCount);
- }
- catch (Exception e)
- {
- _log.error("Error converting entry to object: " + e, e);
- // annoyingly just have to return null since we cannot throw
- return null;
- }
- }
-
- public void objectToEntry(Object object, TupleOutput tupleOutput)
- {
- MessageMetaData message = (MessageMetaData) object;
- try
- {
- writeMessagePublishInfo(message.getMessagePublishInfo(), tupleOutput);
- }
- catch (AMQException e)
- {
- // can't do anything else since the BDB interface precludes throwing any exceptions
- // in practice we should never get an exception
- throw new RuntimeException("Error converting object to entry: " + e, e);
- }
- writeContentHeader(message.getContentHeaderBody(), tupleOutput);
- tupleOutput.writeInt(message.getContentChunkCount());
- }
-
- private MessagePublishInfo readMessagePublishInfo(TupleInput tupleInput)
- {
-
- final AMQShortString exchange = AMQShortStringEncoding.readShortString(tupleInput);
- final AMQShortString routingKey = AMQShortStringEncoding.readShortString(tupleInput);
- final boolean mandatory = tupleInput.readBoolean();
- final boolean immediate = tupleInput.readBoolean();
-
- return new MessagePublishInfo()
- {
-
- public AMQShortString getExchange()
- {
- return exchange;
- }
-
- public void setExchange(AMQShortString exchange)
- {
-
- }
-
- public boolean isImmediate()
- {
- return immediate;
- }
-
- public boolean isMandatory()
- {
- return mandatory;
- }
-
- public AMQShortString getRoutingKey()
- {
- return routingKey;
- }
- } ;
-
- }
-
-
- private ContentHeaderBody readContentHeaderBody(TupleInput tupleInput) throws AMQFrameDecodingException, AMQProtocolVersionException
- {
- int bodySize = tupleInput.readInt();
- byte[] underlying = new byte[bodySize];
- tupleInput.readFast(underlying);
- ByteBuffer buf = ByteBuffer.wrap(underlying);
-
- return ContentHeaderBody.createFromBuffer(buf, bodySize);
- }
-
- private void writeMessagePublishInfo(MessagePublishInfo publishBody, TupleOutput tupleOutput) throws AMQException
- {
-
-
- AMQShortStringEncoding.writeShortString(publishBody.getExchange(), tupleOutput);
- AMQShortStringEncoding.writeShortString(publishBody.getRoutingKey(), tupleOutput);
- tupleOutput.writeBoolean(publishBody.isMandatory());
- tupleOutput.writeBoolean(publishBody.isImmediate());
-
- }
-
- private void writeContentHeader(ContentHeaderBody headerBody, TupleOutput tupleOutput)
- {
- // write out the content header body
- final int bodySize = headerBody.getSize();
- byte[] underlying = new byte[bodySize];
- ByteBuffer buf = ByteBuffer.wrap(underlying);
- headerBody.writePayload(buf);
- tupleOutput.writeInt(bodySize);
- tupleOutput.writeFast(underlying);
- }
-}
Deleted: store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/QueueBindingKey.java
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/QueueBindingKey.java 2008-05-29 16:05:10 UTC (rev 2085)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/QueueBindingKey.java 2008-05-29 16:22:36 UTC (rev 2086)
@@ -1,49 +0,0 @@
-package org.apache.qpid.server.store.berkeleydb;
-
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
-
-/**
- * Created by IntelliJ IDEA.
- * User: U146758
- * Date: 19-Feb-2007
- * Time: 14:11:01
- * To change this template use File | Settings | File Templates.
- */
-public class QueueBindingKey extends Object
-{
- private final AMQShortString _exchangeName;
- private final AMQShortString _queueName;
- private final AMQShortString _routingKey;
- private final FieldTable _arguments;
-
- public QueueBindingKey(AMQShortString exchangeName, AMQShortString queueName, AMQShortString routingKey, FieldTable arguments)
- {
- _exchangeName = exchangeName;
- _queueName = queueName;
- _routingKey = routingKey;
- _arguments = arguments;
- }
-
-
- public AMQShortString getExchangeName()
- {
- return _exchangeName;
- }
-
- public AMQShortString getQueueName()
- {
- return _queueName;
- }
-
- public AMQShortString getRoutingKey()
- {
- return _routingKey;
- }
-
- public FieldTable getArguments()
- {
- return _arguments;
- }
-
-}
Copied: store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/QueueEntryKey.java (from rev 2039, store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/DeliveryDetailsKey.java)
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/QueueEntryKey.java (rev 0)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/QueueEntryKey.java 2008-05-29 16:22:36 UTC (rev 2086)
@@ -0,0 +1,64 @@
+package org.apache.qpid.server.store.berkeleydb;
+
+import com.sleepycat.bind.tuple.TupleInput;
+import com.sleepycat.bind.tuple.TupleOutput;
+
+import org.apache.qpid.framing.AMQShortString;
+
+/**
+ * @author Apache Software Foundation
+ */
+public class QueueEntryKey
+{
+ public AMQShortString queueName;
+ public long messageId;
+
+
+ public QueueEntryKey()
+ {
+ }
+
+
+ public QueueEntryKey(byte[] payload)
+ {
+ final TupleInput ti = new TupleInput(payload);
+
+ queueName = AMQShortStringEncoding.readShortString(ti);
+
+ messageId = ti.readLong();
+
+ }
+
+ public static class TupleBinding extends com.sleepycat.bind.tuple.TupleBinding
+ {
+ public Object entryToObject(TupleInput tupleInput)
+ {
+ final QueueEntryKey mk = new QueueEntryKey();
+
+
+ mk.queueName = AMQShortStringEncoding.readShortString(tupleInput);
+ mk.messageId = tupleInput.readLong();
+
+ return mk;
+ }
+
+ public void objectToEntry(Object object, TupleOutput tupleOutput)
+ {
+ final QueueEntryKey mk = (QueueEntryKey) object;
+
+ AMQShortStringEncoding.writeShortString(mk.queueName,tupleOutput);
+ tupleOutput.writeLong(mk.messageId);
+
+ }
+
+
+ }
+
+ public QueueEntryKey(AMQShortString queueName, long messageId)
+ {
+ this.queueName = queueName;
+ this.messageId = messageId;
+ }
+
+
+}
Property changes on: store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/QueueEntryKey.java
___________________________________________________________________
Name: svn:eol-style
+ native
Copied: store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/QueueTB.java (from rev 2039, store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AMQQueueTupleBinding.java)
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/QueueTB.java (rev 0)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/QueueTB.java 2008-05-29 16:22:36 UTC (rev 2086)
@@ -0,0 +1,69 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import com.sleepycat.bind.tuple.TupleBinding;
+import com.sleepycat.bind.tuple.TupleInput;
+import com.sleepycat.bind.tuple.TupleOutput;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.AMQQueueFactory;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.log4j.Logger;
+
+public class QueueTB extends TupleBinding
+{
+ private static final Logger _log = Logger.getLogger(QueueTB.class);
+
+
+
+ private final VirtualHost _virtualHost;
+
+ public QueueTB(VirtualHost virtualHost)
+ {
+ _virtualHost = virtualHost;
+ }
+
+ public Object entryToObject(TupleInput tupleInput)
+ {
+ AMQShortString name = AMQShortStringEncoding.readShortString(tupleInput);
+ AMQShortString owner = AMQShortStringEncoding.readShortString(tupleInput);
+
+
+ try
+ {
+ return AMQQueueFactory.createAMQQueueImpl(name, true, owner, false, _virtualHost, null);
+ }
+ catch (AMQException e)
+ {
+ _log.error("Unable to create queue: " + e, e);
+ return null;
+ }
+ }
+
+ public void objectToEntry(Object object, TupleOutput tupleOutput)
+ {
+ AMQQueue queue = (AMQQueue) object;
+
+
+ AMQShortStringEncoding.writeShortString(queue.getName(),tupleOutput);
+ AMQShortStringEncoding.writeShortString(queue.getOwner(),tupleOutput);
+
+ }
+}
Property changes on: store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/QueueTB.java
___________________________________________________________________
Name: svn:eol-style
+ native
More information about the rhmessaging-commits
mailing list