Author: rgemmell
Date: 2010-05-19 10:52:46 -0400 (Wed, 19 May 2010)
New Revision: 3976
Added:
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_3.java
Modified:
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/records/QueueRecord.java
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTupleBindingFactory.java
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_1.java
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_2.java
Log:
Enable persistence of queue exclusivity
Modified:
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
===================================================================
---
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java 2010-05-17
21:54:31 UTC (rev 3975)
+++
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java 2010-05-19
14:52:46 UTC (rev 3976)
@@ -89,6 +89,7 @@
* exchanges. <tr><td> Store and remove messages. <tr><td> Bind
and unbind queues to exchanges. <tr><td> Enqueue and
* dequeue messages to queues. <tr><td> Generate message identifiers.
</table>
*/
+@SuppressWarnings({"unchecked","deprecation"})
public class BDBMessageStore implements MessageStore
{
private static final Logger _log = Logger.getLogger(BDBMessageStore.class);
@@ -557,9 +558,11 @@
queueRecord.getNameShortString().asString();
String owner = queueRecord.getOwner() == null ? null :
queueRecord.getOwner().asString();
+ boolean exclusive = queueRecord.isExclusive();
+
FieldTable arguments = queueRecord.getArguments();
- qrh.queue(queueName, owner, arguments);
+ qrh.queue(queueName, owner, exclusive, arguments);
}
}
@@ -1042,7 +1045,7 @@
if (_state != State.RECOVERING)
{
QueueRecord queueRecord= new QueueRecord(queue.getNameShortString(),
- queue.getOwner(), arguments);
+ queue.getOwner(),
queue.isExclusive(), arguments);
DatabaseEntry key = new DatabaseEntry();
EntryBinding keyBinding = new AMQShortStringTB();
Modified:
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/records/QueueRecord.java
===================================================================
---
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/records/QueueRecord.java 2010-05-17
21:54:31 UTC (rev 3975)
+++
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/records/QueueRecord.java 2010-05-19
14:52:46 UTC (rev 3976)
@@ -28,11 +28,13 @@
private final AMQShortString _queueName;
private final AMQShortString _owner;
private final FieldTable _arguments;
+ private boolean _exclusive;
- public QueueRecord(AMQShortString queueName, AMQShortString owner, FieldTable
arguments)
+ public QueueRecord(AMQShortString queueName, AMQShortString owner, boolean exclusive,
FieldTable arguments)
{
_queueName = queueName;
_owner = owner;
+ _exclusive = exclusive;
_arguments = arguments;
}
@@ -45,6 +47,11 @@
{
return _owner;
}
+
+ public boolean isExclusive()
+ {
+ return _exclusive;
+ }
public FieldTable getArguments()
{
Modified:
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTupleBindingFactory.java
===================================================================
---
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTupleBindingFactory.java 2010-05-17
21:54:31 UTC (rev 3975)
+++
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTupleBindingFactory.java 2010-05-19
14:52:46 UTC (rev 3976)
@@ -36,7 +36,7 @@
{
default:
case 3:
- //no change from v2
+ return new QueueTuple_3();
case 2:
return new QueueTuple_2();
case 1:
Modified:
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_1.java
===================================================================
---
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_1.java 2010-05-17
21:54:31 UTC (rev 3975)
+++
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_1.java 2010-05-19
14:52:46 UTC (rev 3976)
@@ -36,7 +36,7 @@
AMQShortString name = AMQShortStringEncoding.readShortString(tupleInput);
AMQShortString owner = AMQShortStringEncoding.readShortString(tupleInput);
- return new QueueRecord(name, owner, null);
+ return new QueueRecord(name, owner, false, null);
}
public void objectToEntry(Object object, TupleOutput tupleOutput)
Modified:
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_2.java
===================================================================
---
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_2.java 2010-05-17
21:54:31 UTC (rev 3975)
+++
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_2.java 2010-05-19
14:52:46 UTC (rev 3976)
@@ -48,7 +48,7 @@
// Addition for Version 2 of this table, read the queue arguments
FieldTable arguments = FieldTableEncoding.readFieldTable(tupleInput);
- return new QueueRecord(name, owner, arguments);
+ return new QueueRecord(name, owner, false, arguments);
}
catch (DatabaseException e)
{
Copied:
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_3.java
(from rev 3972,
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_2.java)
===================================================================
---
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_3.java
(rev 0)
+++
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_3.java 2010-05-19
14:52:46 UTC (rev 3976)
@@ -0,0 +1,74 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.tuples;
+
+import com.sleepycat.bind.tuple.TupleInput;
+import com.sleepycat.bind.tuple.TupleOutput;
+import com.sleepycat.je.DatabaseException;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.server.store.berkeleydb.AMQShortStringEncoding;
+import org.apache.qpid.server.store.berkeleydb.FieldTableEncoding;
+import org.apache.qpid.server.store.berkeleydb.records.QueueRecord;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+
+public class QueueTuple_3 extends QueueTuple_1
+{
+ protected static final Logger _logger = Logger.getLogger(QueueTuple_3.class);
+
+ protected FieldTable _arguments;
+
+ public QueueTuple_3()
+ {
+ super();
+ }
+
+ public Object entryToObject(TupleInput tupleInput)
+ {
+ try
+ {
+ AMQShortString name = AMQShortStringEncoding.readShortString(tupleInput);
+ AMQShortString owner = AMQShortStringEncoding.readShortString(tupleInput);
+ // Addition for Version 2 of this table, read the queue arguments
+ FieldTable arguments = FieldTableEncoding.readFieldTable(tupleInput);
+ // Addition for Version 3 of this table, read the queue exclusivity
+ boolean exclusive = tupleInput.readBoolean();
+
+ return new QueueRecord(name, owner, exclusive, arguments);
+ }
+ catch (DatabaseException e)
+ {
+ _logger.error("Unable to create binding: " + e, e);
+ return null;
+ }
+
+ }
+
+ public void objectToEntry(Object object, TupleOutput tupleOutput)
+ {
+ QueueRecord queue = (QueueRecord) object;
+
+ AMQShortStringEncoding.writeShortString(queue.getNameShortString(),
tupleOutput);
+ AMQShortStringEncoding.writeShortString(queue.getOwner(), tupleOutput);
+ // Addition for Version 2 of this table, store the queue arguments
+ FieldTableEncoding.writeFieldTable(queue.getArguments(), tupleOutput);
+ // Addition for Version 3 of this table, store the queue exclusivity
+ tupleOutput.writeBoolean(queue.isExclusive());
+ }
+}