rhmessaging commits: r3978 - in mgmt/newdata/cumin: python/cumin and 1 other directory.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2010-05-19 17:31:07 -0400 (Wed, 19 May 2010)
New Revision: 3978
Added:
mgmt/newdata/cumin/bin/cumin-smoke-test
Modified:
mgmt/newdata/cumin/python/cumin/model.py
mgmt/newdata/cumin/python/cumin/session.py
Log:
Add a call_method to CuminSession; add a test program
Added: mgmt/newdata/cumin/bin/cumin-smoke-test
===================================================================
--- mgmt/newdata/cumin/bin/cumin-smoke-test (rev 0)
+++ mgmt/newdata/cumin/bin/cumin-smoke-test 2010-05-19 21:31:07 UTC (rev 3978)
@@ -0,0 +1,64 @@
+#!/usr/bin/python
+
+import os
+import sys
+
+home = os.environ.get("CUMIN_HOME", os.path.normpath("/usr/share/cumin"))
+sys.path.append(os.path.join(home, "python"))
+
+from cumin import *
+from cumin.config import *
+from cumin.util import *
+
+def main():
+ config = CuminConfig()
+ values = config.parse()
+
+ parser = CuminOptionParser(values.web)
+
+ parser.add_option("--host", default=values.web.host)
+ parser.add_option("--port", default=values.web.port)
+
+ opts, args = parser.parse_args()
+
+ setup_logging(opts)
+
+ cumin = Cumin(config.home, opts.broker, opts.database,
+ opts.host, opts.port)
+
+ cumin.user = values.web.user
+
+ cumin.check()
+ cumin.init()
+
+ if opts.init_only:
+ return
+
+ cumin.start()
+
+ sleep(5)
+
+ conn = cumin.database.get_connection()
+ cursor = conn.cursor()
+
+ cls = cumin.model.org_apache_qpid_broker.Broker
+ broker = cls.get_object(cursor)
+
+ def completion(x, y):
+ print "XXX", x, y
+
+ cumin.session.call_method(completion, broker, "echo", (1, "yeah"))
+
+ try:
+ while True:
+ # print_threads()
+
+ sleep(5)
+ finally:
+ cumin.stop()
+
+if __name__ == "__main__":
+ try:
+ main()
+ except KeyboardInterrupt:
+ pass
Property changes on: mgmt/newdata/cumin/bin/cumin-smoke-test
___________________________________________________________________
Name: svn:executable
+ *
Modified: mgmt/newdata/cumin/python/cumin/model.py
===================================================================
--- mgmt/newdata/cumin/python/cumin/model.py 2010-05-19 18:35:46 UTC (rev 3977)
+++ mgmt/newdata/cumin/python/cumin/model.py 2010-05-19 21:31:07 UTC (rev 3978)
@@ -25,11 +25,6 @@
self.app = app
self.model_dir = model_dir
- self.lock = Lock()
-
- # int seq => callable
- self.outstanding_method_calls = dict()
-
self.tasks = list()
self.task_invocations = list()
Modified: mgmt/newdata/cumin/python/cumin/session.py
===================================================================
--- mgmt/newdata/cumin/python/cumin/session.py 2010-05-19 18:35:46 UTC (rev 3977)
+++ mgmt/newdata/cumin/python/cumin/session.py 2010-05-19 21:31:07 UTC (rev 3978)
@@ -1,7 +1,7 @@
from model import *
from util import *
-from qmf.console import Console, Session
+from qmf.console import Console, Session, ClassKey, ObjectId
log = logging.getLogger("cumin.session")
@@ -13,6 +13,11 @@
self.qmf_session = None
self.qmf_brokers = list()
+ # int seq => callable
+ self.outstanding_method_calls = dict()
+
+ self.lock = Lock()
+
def add_broker(self, uri):
log.info("Adding QMF broker at %s", uri)
@@ -36,7 +41,7 @@
assert self.qmf_session is None
- self.qmf_session = Session(CuminConsole(self.app.model),
+ self.qmf_session = Session(CuminConsole(self),
manageConnections=True,
rcvObjects=False)
@@ -48,12 +53,34 @@
for qmf_broker in self.qmf_brokers:
self.qmf_session.delBroker(qmf_broker)
+ def call_method(self, callback, obj, name, args):
+ assert isinstance(obj, RosemaryObject)
+
+ oid_args = {"_agent_name": obj._qmf_agent_id,
+ "_object_name": obj._qmf_object_id}
+
+ broker = self.qmf_brokers[0]
+ ck = ClassKey(obj._qmf_class_key)
+ oid = ObjectId(oid_args)
+
+ self.lock.acquire()
+ try:
+ seq = self.qmf_session._sendMethodRequest \
+ (broker, ck, oid, name, args)
+
+ if seq is not None:
+ self.outstanding_method_calls[seq] = callback
+
+ return seq
+ finally:
+ self.lock.release()
+
def __repr__(self):
return "%s(%s)" % (self.__class__.__name__, self.broker_uri)
class CuminConsole(Console):
- def __init__(self, model):
- self.model = model
+ def __init__(self, session):
+ self.session = session
def newAgent(self, qmf_agent):
log.info("New agent %s", qmf_agent)
@@ -66,9 +93,9 @@
seq, broker)
log.debug("Response: %s", response)
- self.model.lock.acquire()
+ self.session.lock.acquire()
try:
- callback = self.model.outstanding_method_calls.pop(seq)
+ callback = self.session.outstanding_method_calls.pop(seq)
callback(response.text, response.outArgs)
finally:
- self.model.lock.release()
+ self.session.lock.release()
14 years, 7 months
rhmessaging commits: r3977 - mgmt/newdata/cumin/python/cumin.
by rhmessaging-commits@lists.jboss.org
Author: eallen
Date: 2010-05-19 14:35:46 -0400 (Wed, 19 May 2010)
New Revision: 3977
Modified:
mgmt/newdata/cumin/python/cumin/objectframe.py
mgmt/newdata/cumin/python/cumin/objectframe.strings
Log:
Use two column of task links if there are more than 6 tasks
Modified: mgmt/newdata/cumin/python/cumin/objectframe.py
===================================================================
--- mgmt/newdata/cumin/python/cumin/objectframe.py 2010-05-19 14:52:46 UTC (rev 3976)
+++ mgmt/newdata/cumin/python/cumin/objectframe.py 2010-05-19 18:35:46 UTC (rev 3977)
@@ -117,14 +117,33 @@
self.link = ObjectTasksLink(app, "link")
self.add_child(self.link)
- def render_links(self, session):
+ self.table_tmpl = WidgetTemplate(self, "table_html")
+
+ def do_render(self, session):
+ if len(self.frame.tasks) > 6:
+ writer = Writer()
+ self.table_tmpl.render(writer, session)
+ return writer.to_string()
+ else:
+ return super(ObjectTasks, self).do_render(session)
+
+ def render_links1(self, session):
+ return self.render_task_links(session, 0)
+
+ def render_links2(self, session):
+ return self.render_task_links(session, 6)
+
+ def render_task_links(self, session, start):
writer = Writer()
-
- for task in self.frame.tasks:
+ end = min(start + 6, len(self.frame.tasks))
+ for task in self.frame.tasks[start:end]:
writer.write(self.link.render(session, task))
return writer.to_string()
+ def render_links(self, session):
+ return self.render_task_links(session, 0)
+
class ObjectTasksLink(Link):
def render_href(self, session, task):
return task.get_href(session)
@@ -230,6 +249,10 @@
tasks = SummaryTasks(app, "tasks", self.object)
self.add_child(tasks)
+ def render_wide(self, session):
+ if len(self.frame.tasks) > 6:
+ return "wide"
+
class Attributes(SummaryAttributes):
def get_attributes(self, session):
return self.frame.summary_attributes
Modified: mgmt/newdata/cumin/python/cumin/objectframe.strings
===================================================================
--- mgmt/newdata/cumin/python/cumin/objectframe.strings 2010-05-19 14:52:46 UTC (rev 3976)
+++ mgmt/newdata/cumin/python/cumin/objectframe.strings 2010-05-19 18:35:46 UTC (rev 3977)
@@ -55,11 +55,28 @@
[ObjectAttributesEntry.html]
<tr><th>{name}</th><td>{value}</td></tr>
+[ObjectTasks.css]
+table.ObjectTasks {
+ float:right;
+}
+table.ObjectTasks td {
+ vertical-align: top;
+}
+
[ObjectTasks.html]
<ul class="{class}">
{links}
</ul>
+[ObjectTasks.table_html]
+<table class="{class}">
+ <tbody>
+ <tr><td><ul class="{class}">{links1}</ul></td>
+ <td><ul class="{class}">{links2}</ul></td>
+ </tr>
+ </tbody>
+</table>
+
[ObjectTasksLink.html]
<li><a href="{href}">{content}</a></li>
@@ -126,6 +143,9 @@
width: 40em;
font-size: 0.9em;
}
+div.ObjectViewSummary.wide {
+ width: 55em;
+}
[ObjectView.html]
{context}
@@ -144,10 +164,10 @@
</div>
[ObjectViewSummary.html]
-<div class="{class}">
+<div class="{class} {wide}">
{tasks}
{attributes}
-</div>
+</div><div style="clear:both;"></div>
[ObjectViewContext.html]
<div class="{class}">{links}</div>
14 years, 7 months
rhmessaging commits: r3976 - in store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb: records and 1 other directories.
by rhmessaging-commits@lists.jboss.org
Author: rgemmell
Date: 2010-05-19 10:52:46 -0400 (Wed, 19 May 2010)
New Revision: 3976
Added:
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_3.java
Modified:
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/records/QueueRecord.java
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTupleBindingFactory.java
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_1.java
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_2.java
Log:
Enable persistence of queue exclusivity
Modified: store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
===================================================================
--- store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java 2010-05-17 21:54:31 UTC (rev 3975)
+++ store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java 2010-05-19 14:52:46 UTC (rev 3976)
@@ -89,6 +89,7 @@
* exchanges. <tr><td> Store and remove messages. <tr><td> Bind and unbind queues to exchanges. <tr><td> Enqueue and
* dequeue messages to queues. <tr><td> Generate message identifiers. </table>
*/
+@SuppressWarnings({"unchecked","deprecation"})
public class BDBMessageStore implements MessageStore
{
private static final Logger _log = Logger.getLogger(BDBMessageStore.class);
@@ -557,9 +558,11 @@
queueRecord.getNameShortString().asString();
String owner = queueRecord.getOwner() == null ? null :
queueRecord.getOwner().asString();
+ boolean exclusive = queueRecord.isExclusive();
+
FieldTable arguments = queueRecord.getArguments();
- qrh.queue(queueName, owner, arguments);
+ qrh.queue(queueName, owner, exclusive, arguments);
}
}
@@ -1042,7 +1045,7 @@
if (_state != State.RECOVERING)
{
QueueRecord queueRecord= new QueueRecord(queue.getNameShortString(),
- queue.getOwner(), arguments);
+ queue.getOwner(), queue.isExclusive(), arguments);
DatabaseEntry key = new DatabaseEntry();
EntryBinding keyBinding = new AMQShortStringTB();
Modified: store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/records/QueueRecord.java
===================================================================
--- store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/records/QueueRecord.java 2010-05-17 21:54:31 UTC (rev 3975)
+++ store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/records/QueueRecord.java 2010-05-19 14:52:46 UTC (rev 3976)
@@ -28,11 +28,13 @@
private final AMQShortString _queueName;
private final AMQShortString _owner;
private final FieldTable _arguments;
+ private boolean _exclusive;
- public QueueRecord(AMQShortString queueName, AMQShortString owner, FieldTable arguments)
+ public QueueRecord(AMQShortString queueName, AMQShortString owner, boolean exclusive, FieldTable arguments)
{
_queueName = queueName;
_owner = owner;
+ _exclusive = exclusive;
_arguments = arguments;
}
@@ -45,6 +47,11 @@
{
return _owner;
}
+
+ public boolean isExclusive()
+ {
+ return _exclusive;
+ }
public FieldTable getArguments()
{
Modified: store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTupleBindingFactory.java
===================================================================
--- store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTupleBindingFactory.java 2010-05-17 21:54:31 UTC (rev 3975)
+++ store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTupleBindingFactory.java 2010-05-19 14:52:46 UTC (rev 3976)
@@ -36,7 +36,7 @@
{
default:
case 3:
- //no change from v2
+ return new QueueTuple_3();
case 2:
return new QueueTuple_2();
case 1:
Modified: store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_1.java
===================================================================
--- store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_1.java 2010-05-17 21:54:31 UTC (rev 3975)
+++ store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_1.java 2010-05-19 14:52:46 UTC (rev 3976)
@@ -36,7 +36,7 @@
AMQShortString name = AMQShortStringEncoding.readShortString(tupleInput);
AMQShortString owner = AMQShortStringEncoding.readShortString(tupleInput);
- return new QueueRecord(name, owner, null);
+ return new QueueRecord(name, owner, false, null);
}
public void objectToEntry(Object object, TupleOutput tupleOutput)
Modified: store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_2.java
===================================================================
--- store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_2.java 2010-05-17 21:54:31 UTC (rev 3975)
+++ store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_2.java 2010-05-19 14:52:46 UTC (rev 3976)
@@ -48,7 +48,7 @@
// Addition for Version 2 of this table, read the queue arguments
FieldTable arguments = FieldTableEncoding.readFieldTable(tupleInput);
- return new QueueRecord(name, owner, arguments);
+ return new QueueRecord(name, owner, false, arguments);
}
catch (DatabaseException e)
{
Copied: store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_3.java (from rev 3972, store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_2.java)
===================================================================
--- store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_3.java (rev 0)
+++ store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_3.java 2010-05-19 14:52:46 UTC (rev 3976)
@@ -0,0 +1,74 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.tuples;
+
+import com.sleepycat.bind.tuple.TupleInput;
+import com.sleepycat.bind.tuple.TupleOutput;
+import com.sleepycat.je.DatabaseException;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.server.store.berkeleydb.AMQShortStringEncoding;
+import org.apache.qpid.server.store.berkeleydb.FieldTableEncoding;
+import org.apache.qpid.server.store.berkeleydb.records.QueueRecord;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+
+public class QueueTuple_3 extends QueueTuple_1
+{
+ protected static final Logger _logger = Logger.getLogger(QueueTuple_3.class);
+
+ protected FieldTable _arguments;
+
+ public QueueTuple_3()
+ {
+ super();
+ }
+
+ public Object entryToObject(TupleInput tupleInput)
+ {
+ try
+ {
+ AMQShortString name = AMQShortStringEncoding.readShortString(tupleInput);
+ AMQShortString owner = AMQShortStringEncoding.readShortString(tupleInput);
+ // Addition for Version 2 of this table, read the queue arguments
+ FieldTable arguments = FieldTableEncoding.readFieldTable(tupleInput);
+ // Addition for Version 3 of this table, read the queue exclusivity
+ boolean exclusive = tupleInput.readBoolean();
+
+ return new QueueRecord(name, owner, exclusive, arguments);
+ }
+ catch (DatabaseException e)
+ {
+ _logger.error("Unable to create binding: " + e, e);
+ return null;
+ }
+
+ }
+
+ public void objectToEntry(Object object, TupleOutput tupleOutput)
+ {
+ QueueRecord queue = (QueueRecord) object;
+
+ AMQShortStringEncoding.writeShortString(queue.getNameShortString(), tupleOutput);
+ AMQShortStringEncoding.writeShortString(queue.getOwner(), tupleOutput);
+ // Addition for Version 2 of this table, store the queue arguments
+ FieldTableEncoding.writeFieldTable(queue.getArguments(), tupleOutput);
+ // Addition for Version 3 of this table, store the queue exclusivity
+ tupleOutput.writeBoolean(queue.isExclusive());
+ }
+}
14 years, 7 months
rhmessaging commits: r3975 - mgmt/newdata/mint/python/mint.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2010-05-17 17:54:31 -0400 (Mon, 17 May 2010)
New Revision: 3975
Modified:
mgmt/newdata/mint/python/mint/session.py
mgmt/newdata/mint/python/mint/update.py
Log:
Improve stat reporting; fix agent reference
Modified: mgmt/newdata/mint/python/mint/session.py
===================================================================
--- mgmt/newdata/mint/python/mint/session.py 2010-05-17 18:25:44 UTC (rev 3974)
+++ mgmt/newdata/mint/python/mint/session.py 2010-05-17 21:54:31 UTC (rev 3975)
@@ -112,10 +112,8 @@
self.model.app.update_thread.enqueue(up)
def objectStats(self, broker, obj):
- print "objectStats!", broker, obj
+ agent = self.model.get_agent(obj._agent)
- agent = self.get_agent(obj._agent)
-
if self.model.app.update_thread.isAlive():
up = ObjectAddSample(self.model, agent, obj)
self.model.app.update_thread.enqueue(up)
Modified: mgmt/newdata/mint/python/mint/update.py
===================================================================
--- mgmt/newdata/mint/python/mint/update.py 2010-05-17 18:25:44 UTC (rev 3974)
+++ mgmt/newdata/mint/python/mint/update.py 2010-05-17 21:54:31 UTC (rev 3975)
@@ -11,16 +11,15 @@
def __init__(self, app):
super(UpdateThread, self).__init__(app)
+ self.updates = ConcurrentQueue()
+
+ self.stats = UpdateStats(self.app)
self.conn = None
- self.stats = None
- self.updates = ConcurrentQueue()
-
self.halt_on_error = True
def init(self):
self.conn = self.app.database.get_connection()
- self.stats = UpdateStats()
def enqueue(self, update):
update.thread = self
@@ -54,12 +53,12 @@
class UpdateStats(object):
names = ("Enqueued", "Dequeued", "Updated", "Deleted", "Dropped")
headings = ("%8s " * 5) % names
- rates_fmt = "%8.1f " * 5
+ rates_fmt = ("%8.1f " * 5)
then = None
now = None
- def __init__(self):
+ def __init__(self, app):
self.enqueued = 0
self.dequeued = 0
@@ -89,17 +88,20 @@
if not self.then:
return
- values = (self.now.enqueued - self.then.enqueued,
+ values = [self.now.enqueued - self.then.enqueued,
self.now.dequeued - self.then.dequeued,
self.now.updated - self.then.updated,
self.now.deleted - self.then.deleted,
- self.now.dropped - self.then.dropped)
+ self.now.dropped - self.then.dropped]
+ # XXX
+ values[2] += self.now.samples_updated - self.then.samples_updated
+ values[4] += self.now.samples_dropped - self.then.samples_dropped
+
secs = self.now.time - self.then.time
+ rates = map(lambda x: x / secs, values)
- rates = tuple(map(lambda x: x / secs, values))
-
- print self.rates_fmt % rates
+ print self.rates_fmt % tuple(rates)
class Update(object):
def __init__(self, model):
@@ -374,7 +376,7 @@
cursor.close()
stats.samples_updated += 1
-
+
def process_samples(self, obj, update_columns, insert_columns):
for stat, value in self.object.getStatistics():
try:
14 years, 7 months
rhmessaging commits: r3974 - in mgmt/newdata: mint/python/mint and 1 other directory.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2010-05-17 14:25:44 -0400 (Mon, 17 May 2010)
New Revision: 3974
Modified:
mgmt/newdata/cumin/bin/cumin-data
mgmt/newdata/mint/python/mint/tools.py
mgmt/newdata/mint/python/mint/update.py
mgmt/newdata/mint/python/mint/util.py
Log:
Bring back mint stat reporting
Modified: mgmt/newdata/cumin/bin/cumin-data
===================================================================
--- mgmt/newdata/cumin/bin/cumin-data 2010-05-17 18:17:45 UTC (rev 3973)
+++ mgmt/newdata/cumin/bin/cumin-data 2010-05-17 18:25:44 UTC (rev 3974)
@@ -15,6 +15,7 @@
values = config.parse()
parser = CuminOptionParser(values.data)
+ parser.add_option("--print-stats", action="store_true")
opts, args = parser.parse_args()
@@ -33,10 +34,24 @@
mint.start()
try:
- while True:
- # print_threads()
+ stats = mint.update_thread.stats
+ count = 0
- sleep(5)
+ if opts.print_stats:
+ print "[Reported values are the number of events per second]"
+
+ while True:
+ if count % 24 == 0:
+ stats.print_headings()
+
+ count += 1
+
+ stats.print_rates()
+
+ sleep(5)
+ else:
+ while True:
+ sleep(86400)
finally:
mint.stop()
Modified: mgmt/newdata/mint/python/mint/tools.py
===================================================================
--- mgmt/newdata/mint/python/mint/tools.py 2010-05-17 18:17:45 UTC (rev 3973)
+++ mgmt/newdata/mint/python/mint/tools.py 2010-05-17 18:25:44 UTC (rev 3974)
@@ -325,185 +325,3 @@
subject.syncUpdate()
print "Password of user '%s' is changed" % subject.name
-
-class MintServerTool(BaseMintTool):
- def __init__(self, name):
- super(MintServerTool, self).__init__(name)
-
- def init(self):
- super(MintServerTool, self).init()
-
- # get better thread switching performance
- sys.setcheckinterval(200)
-
- def do_run(self, opts, args):
- app = Mint(self.config)
-
- app.check()
- app.init()
- app.start()
-
- try:
- for arg in args[1:]:
- app.model.add_broker(arg)
-
- while True:
- sleep(2)
- finally:
- app.stop()
-
-class MintTestTool(BaseMintTool):
- def __init__(self, name):
- super(MintTestTool, self).__init__(name)
-
- def do_run(self, opts, args):
- app = Mint(self.config)
-
- app.check()
- app.init()
- app.start()
-
- try:
- for arg in args[1:]:
- app.model.add_broker(arg)
-
- sleep(2)
-
- cls = app.model.org_apache_qpid_broker.Broker
-
- conn = app.database.get_connection()
- cursor = conn.cursor()
-
- for obj in cls.get_selection(cursor):
- try:
- agent = app.model.agents[obj._qmf_agent_id]
- except KeyError:
- continue
-
- break
-
- print "TTT", obj.port, obj, agent
-
- def completion(status_code, output_args):
- print "YYY", status_code, output_args
-
- agent.call_method(completion, obj, "echo", 1, "ggoo!")
-
- while True:
- sleep(2)
- finally:
- app.stop()
-
-class MintBenchTool(BaseMintTool):
- def __init__(self, name):
- super(MintBenchTool, self).__init__(name)
-
- def init(self):
- super(MintBenchTool, self).init()
-
- # get better thread switching performance
- sys.setcheckinterval(200)
-
- def do_run(self, opts, args):
- app = Mint(self.config)
-
- app.check()
- app.init()
- app.start()
-
- head = "%8s %8s %8s %8s %8s %8s %8s %8s %8s" % \
- ("enqs", "deqs", "depth", "update", "delete", "drop",
- "s.update", "s.expire", "s.drop")
- row = "%8i %8i %8i %8i %8i %8i %8i %8i %8i"
-
- try:
- for arg in args[1:]:
- try:
- app.model.add_broker(arg)
- except socket.error, e:
- print "Warning: Failed connecting to broker at '%s'" % arg
-
- try:
- enq = 0
- deq = 0
-
- upd = 0
- dlt = 0
- drp = 0
-
- supd = 0
- sexp = 0
- sdrp = 0
-
- enq_last = 0
- deq_last = 0
-
- upd_last = 0
- dlt_last = 0
- drp_last = 0
-
- supd_last = 0
- sexp_last = 0
- sdrp_last = 0
-
- samples = 0
-
- while True:
- if samples % 24 == 0:
- print head
-
- samples += 1
-
- sleep(1)
-
- stats = app.update_thread.stats
-
- enq = stats.enqueued
- deq = stats.dequeued
-
- upd = stats.updated
- dlt = stats.deleted
- drp = stats.dropped
-
- supd = stats.samples_updated
- sexp = stats.samples_expired
- sdrp = stats.samples_dropped
-
- print row % (enq - enq_last,
- deq - deq_last,
- enq - deq,
- upd - upd_last,
- dlt - dlt_last,
- drp - drp_last,
- supd - supd_last,
- sexp - sexp_last,
- sdrp - sdrp_last)
-
- enq_last = enq
- deq_last = deq
-
- upd_last = upd
- dlt_last = dlt
- drp_last = drp
-
- supd_last = supd
- sexp_last = sexp
- sdrp_last = sdrp
- finally:
- print "Totals:"
-
- print row % (enq,
- deq,
- enq - deq,
- upd,
- dlt,
- drp,
- supd,
- sexp,
- sdrp)
- finally:
- #from threading import enumerate
- #for item in enumerate():
- # print item
-
- app.stop()
Modified: mgmt/newdata/mint/python/mint/update.py
===================================================================
--- mgmt/newdata/mint/python/mint/update.py 2010-05-17 18:17:45 UTC (rev 3973)
+++ mgmt/newdata/mint/python/mint/update.py 2010-05-17 18:25:44 UTC (rev 3974)
@@ -1,3 +1,4 @@
+import copy
import pickle
from psycopg2 import IntegrityError, TimestampFromTicks
@@ -51,6 +52,13 @@
update.process(self.conn, self.stats)
class UpdateStats(object):
+ names = ("Enqueued", "Dequeued", "Updated", "Deleted", "Dropped")
+ headings = ("%8s " * 5) % names
+ rates_fmt = "%8.1f " * 5
+
+ then = None
+ now = None
+
def __init__(self):
self.enqueued = 0
self.dequeued = 0
@@ -63,6 +71,36 @@
self.samples_expired = 0
self.samples_dropped = 0
+ self.time = None
+
+ def capture(self):
+ now = copy.copy(self)
+ now.time = time.time()
+
+ UpdateStats.then = UpdateStats.now
+ UpdateStats.now = now
+
+ def print_headings(self):
+ print self.headings
+
+ def print_rates(self):
+ self.capture()
+
+ if not self.then:
+ return
+
+ values = (self.now.enqueued - self.then.enqueued,
+ self.now.dequeued - self.then.dequeued,
+ self.now.updated - self.then.updated,
+ self.now.deleted - self.then.deleted,
+ self.now.dropped - self.then.dropped)
+
+ secs = self.now.time - self.then.time
+
+ rates = tuple(map(lambda x: x / secs, values))
+
+ print self.rates_fmt % rates
+
class Update(object):
def __init__(self, model):
self.model = model
Modified: mgmt/newdata/mint/python/mint/util.py
===================================================================
--- mgmt/newdata/mint/python/mint/util.py 2010-05-17 18:17:45 UTC (rev 3973)
+++ mgmt/newdata/mint/python/mint/util.py 2010-05-17 18:25:44 UTC (rev 3974)
@@ -2,6 +2,7 @@
import os
import random
import sys
+import time
from Queue import Queue as ConcurrentQueue, Full, Empty
from crypt import crypt
14 years, 7 months
rhmessaging commits: r3973 - mgmt/newdata/cumin/python/cumin.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2010-05-17 14:17:45 -0400 (Mon, 17 May 2010)
New Revision: 3973
Modified:
mgmt/newdata/cumin/python/cumin/config.py
Log:
Prioritize lookup from most localized to least
Modified: mgmt/newdata/cumin/python/cumin/config.py
===================================================================
--- mgmt/newdata/cumin/python/cumin/config.py 2010-05-17 11:15:26 UTC (rev 3972)
+++ mgmt/newdata/cumin/python/cumin/config.py 2010-05-17 18:17:45 UTC (rev 3973)
@@ -40,9 +40,9 @@
def parse(self):
paths = list()
+ paths.append(os.path.join(os.path.expanduser("~"), ".cumin.conf"))
+ paths.append(os.path.join(self.home, "etc", "cumin.conf"))
paths.append(os.path.join("", "etc", "cumin.conf"))
- paths.append(os.path.join(self.home, "etc", "cumin.conf"))
- paths.append(os.path.join(os.path.expanduser("~"), ".cumin.conf"))
return self.parse_files(paths)
14 years, 7 months
rhmessaging commits: r3972 - in store/trunk/java/bdbstore/src: tools/java/org/apache/qpid/server/util and 1 other directory.
by rhmessaging-commits@lists.jboss.org
Author: rgemmell
Date: 2010-05-17 07:15:26 -0400 (Mon, 17 May 2010)
New Revision: 3972
Modified:
store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreTest.java
store/trunk/java/bdbstore/src/tools/java/org/apache/qpid/server/util/NullApplicationRegistry.java
Log:
exclude broken test code to allow compilatation of the store itself
Modified: store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreTest.java
===================================================================
--- store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreTest.java 2010-05-14 16:23:59 UTC (rev 3971)
+++ store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreTest.java 2010-05-17 11:15:26 UTC (rev 3972)
@@ -16,7 +16,7 @@
*
*/
package org.apache.qpid.server.store.berkeleydb;
-
+/*
import com.sleepycat.je.DatabaseException;
import junit.framework.Assert;
import junit.framework.TestCase;
@@ -36,7 +36,6 @@
import org.apache.qpid.server.exchange.DirectExchange;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.store.StoreContext;
-import org.apache.qpid.server.store.berkeleydb.utils.BDBVMTestCase;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.MessageMetaData;
import org.apache.qpid.server.queue.AMQQueueFactory;
@@ -50,9 +49,13 @@
import java.io.File;
import java.util.LinkedList;
import java.util.List;
+*/
+import org.apache.qpid.server.store.berkeleydb.utils.BDBVMTestCase;
+
public class BDBStoreTest extends BDBVMTestCase
{
+ /*
private static final Logger _log = Logger.getLogger(BDBStoreTest.class);
private BDBMessageStore _store;
@@ -533,4 +536,5 @@
{
return new TestSuite(BDBStoreTest.class);
}
+ */
}
Modified: store/trunk/java/bdbstore/src/tools/java/org/apache/qpid/server/util/NullApplicationRegistry.java
===================================================================
--- store/trunk/java/bdbstore/src/tools/java/org/apache/qpid/server/util/NullApplicationRegistry.java 2010-05-14 16:23:59 UTC (rev 3971)
+++ store/trunk/java/bdbstore/src/tools/java/org/apache/qpid/server/util/NullApplicationRegistry.java 2010-05-17 11:15:26 UTC (rev 3972)
@@ -35,6 +35,7 @@
import org.apache.qpid.server.security.auth.database.PropertiesPrincipalDatabaseManager;
import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.virtualhost.VirtualHostImpl;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
import java.util.Arrays;
@@ -74,7 +75,7 @@
_virtualHostRegistry = new VirtualHostRegistry(this);
PropertiesConfiguration vhostProps = new PropertiesConfiguration();
VirtualHostConfiguration hostConfig = new VirtualHostConfiguration("test", vhostProps);
- VirtualHost dummyHost = new VirtualHost(hostConfig);
+ VirtualHost dummyHost = new VirtualHostImpl(hostConfig,null);
_virtualHostRegistry.registerVirtualHost(dummyHost);
_virtualHostRegistry.setDefaultVirtualHostName("test");
_pluginManager = new PluginManager("");
14 years, 7 months
rhmessaging commits: r3971 - in store/trunk/java/bdbstore/src: main/java/org/apache/qpid/server/store/berkeleydb/keys and 4 other directories.
by rhmessaging-commits@lists.jboss.org
Author: rgemmell
Date: 2010-05-14 12:23:59 -0400 (Fri, 14 May 2010)
New Revision: 3971
Added:
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/keys/
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/keys/MessageContentKey_1.java
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/keys/MessageContentKey_3.java
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/records/
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/records/ExchangeRecord.java
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/records/QueueRecord.java
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageContentKeyTB_1.java
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageContentKeyTB_3.java
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageContentKeyTupleBindingFactory.java
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTB_1.java
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTB_3.java
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTupleBindingFactory.java
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueEntryTB.java
Removed:
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/MessageMetaDataTB.java
Modified:
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ContentTB.java
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ExchangeTB.java
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/MessageContentKey.java
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/QueueEntryKey.java
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple.java
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTupleBindingFactory.java
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple_1.java
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple_2.java
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple.java
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTupleBindingFactory.java
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_1.java
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_2.java
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/TupleBindingFactory.java
store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java
store/trunk/java/bdbstore/src/tools/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgrade.java
Log:
Initial update to implement the new RecoveryHandler store behaviour, plus other updates required since 0-10 support was added to the trunk broker.
Modified: store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
===================================================================
--- store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java 2010-05-13 18:28:33 UTC (rev 3970)
+++ store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java 2010-05-14 16:23:59 UTC (rev 3971)
@@ -20,53 +20,66 @@
*/
package org.apache.qpid.server.store.berkeleydb;
-import com.sleepycat.bind.EntryBinding;
-import com.sleepycat.bind.tuple.ByteBinding;
-import com.sleepycat.bind.tuple.TupleBinding;
+import java.io.File;
+import java.lang.ref.SoftReference;
+import java.nio.ByteBuffer;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
-import com.sleepycat.je.*;
-
import org.apache.commons.configuration.Configuration;
-
import org.apache.log4j.Logger;
-
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.messages.ConfigStoreMessages;
+import org.apache.qpid.server.logging.messages.MessageStoreMessages;
+import org.apache.qpid.server.logging.messages.TransactionLogMessages;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.MessageHandleFactory;
-import org.apache.qpid.server.queue.MessageMetaData;
-import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.queue.AMQQueueFactory;
+import org.apache.qpid.server.store.ConfigurationRecoveryHandler;
import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.MessageStoreRecoveryHandler;
+import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.store.StoreContext;
-import org.apache.qpid.server.store.AbstractMessageStore;
+import org.apache.qpid.server.store.StoredMemoryMessage;
+import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.store.TransactionLogRecoveryHandler;
+import org.apache.qpid.server.store.TransactionLogResource;
+import org.apache.qpid.server.store.ConfigurationRecoveryHandler.BindingRecoveryHandler;
+import org.apache.qpid.server.store.ConfigurationRecoveryHandler.ExchangeRecoveryHandler;
+import org.apache.qpid.server.store.ConfigurationRecoveryHandler.QueueRecoveryHandler;
+import org.apache.qpid.server.store.MessageStoreRecoveryHandler.StoredMessageRecoveryHandler;
+import org.apache.qpid.server.store.TransactionLogRecoveryHandler.QueueEntryRecoveryHandler;
+import org.apache.qpid.server.store.berkeleydb.keys.MessageContentKey_3;
+import org.apache.qpid.server.store.berkeleydb.records.ExchangeRecord;
+import org.apache.qpid.server.store.berkeleydb.records.QueueRecord;
+import org.apache.qpid.server.store.berkeleydb.tuples.BindingTupleBindingFactory;
+import org.apache.qpid.server.store.berkeleydb.tuples.MessageContentKeyTB_3;
+import org.apache.qpid.server.store.berkeleydb.tuples.MessageMetaDataTupleBindingFactory;
+import org.apache.qpid.server.store.berkeleydb.tuples.QueueEntryTB;
import org.apache.qpid.server.store.berkeleydb.tuples.QueueTupleBindingFactory;
-import org.apache.qpid.server.store.berkeleydb.tuples.BindingTupleBindingFactory;
-import org.apache.qpid.server.store.berkeleydb.tuples.QueueTuple;
-import org.apache.qpid.server.txn.NonTransactionalContext;
-import org.apache.qpid.server.txn.TransactionalContext;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.configuration.VirtualHostConfiguration;
-import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.server.logging.messages.MessageStoreMessages;
-import java.io.File;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
-import java.util.TreeMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
+import com.sleepycat.bind.EntryBinding;
+import com.sleepycat.bind.tuple.ByteBinding;
+import com.sleepycat.bind.tuple.TupleBinding;
+import com.sleepycat.je.CheckpointConfig;
+import com.sleepycat.je.Cursor;
+import com.sleepycat.je.Database;
+import com.sleepycat.je.DatabaseConfig;
+import com.sleepycat.je.DatabaseEntry;
+import com.sleepycat.je.DatabaseException;
+import com.sleepycat.je.Environment;
+import com.sleepycat.je.EnvironmentConfig;
+import com.sleepycat.je.LockMode;
+import com.sleepycat.je.OperationStatus;
+import com.sleepycat.je.TransactionConfig;
/**
* BDBMessageStore implements a persistent {@link MessageStore} using the BDB high performance log.
@@ -76,62 +89,40 @@
* exchanges. <tr><td> Store and remove messages. <tr><td> Bind and unbind queues to exchanges. <tr><td> Enqueue and
* dequeue messages to queues. <tr><td> Generate message identifiers. </table>
*/
-public class BDBMessageStore extends AbstractMessageStore
+public class BDBMessageStore implements MessageStore
{
private static final Logger _log = Logger.getLogger(BDBMessageStore.class);
- private static final int DATABASE_FORMAT_VERSION = 2;
+ private static final int DATABASE_FORMAT_VERSION = 3;
private static final String DATABASE_FORMAT_VERSION_PROPERTY = "version";
-
public static final String ENVIRONMENT_PATH_PROPERTY = "environment-path";
private Environment _environment;
private String MESSAGEMETADATADB_NAME = "messageMetaDataDb";
-
- /**
- * Maps from messageId to an AMQMessage (note we don't use serialisation but this is what it roughly corresponds
- * to)
- */
+ private String MESSAGECONTENTDB_NAME = "messageContentDb";
+ private String QUEUEBINDINGSDB_NAME = "queueBindingsDb";
+ private String DELIVERYDB_NAME = "deliveryDb";
+ private String EXCHANGEDB_NAME = "exchangeDb";
+ private String QUEUEDB_NAME = "queueDb";
private Database _messageMetaDataDb;
-
- private String MESSAGECONTENTDB_NAME = "messageContentDb";
-
private Database _messageContentDb;
-
- private String QUEUEDB_NAME = "queueDb";
-
- /** Maps from name (which uniquely identifies a queue) to an AMQQueue */
- private Database _queueDb;
-
- private String DELIVERYDB_NAME = "deliveryDb";
-
- /** Maps from a queue name to a message id. This is what stores the pending deliveries for a given queue */
+ private Database _queueBindingsDb;
private Database _deliveryDb;
-
- private String EXCHANGEDB_NAME = "exchangeDb";
private Database _exchangeDb;
+ private Database _queueDb;
- private String QUEUEBINDINGSDB_NAME = "queueBindingsDb";
- private Database _queueBindingsDb;
+ private LogSubject _logSubject;
- private VirtualHost _virtualHost;
+ private final AtomicLong _messageId = new AtomicLong(0);
- private final AtomicLong _messageId = new AtomicLong(1);
-
- private final AtomicLong _queueId = new AtomicLong(1);
-
private final CommitThread _commitThread = new CommitThread("Commit-Thread");
- private Map<AMQShortString, Long> _queueNameToIdMap = new ConcurrentHashMap<AMQShortString, Long>();
-
- // Factory Classes to create the TupleBinding objects that relfect the version instance of this BDBStore
-
+ // Factory Classes to create the TupleBinding objects that reflect the version instance of this BDBStore
+ private MessageMetaDataTupleBindingFactory _metaDataTupleBindingFactory;
private QueueTupleBindingFactory _queueTupleBindingFactory;
private BindingTupleBindingFactory _bindingTupleBindingFactory;
- Map<AMQShortString, Integer> _queueRecoveries = new TreeMap<AMQShortString, Integer>();
-
/** The data version this store should run with */
private int _version;
private enum State
@@ -149,6 +140,9 @@
private TransactionConfig _transactionConfig = new TransactionConfig();
+ private boolean _configured;
+
+
public BDBMessageStore()
{
this(DATABASE_FORMAT_VERSION);
@@ -176,23 +170,78 @@
QUEUEBINDINGSDB_NAME += "_v" + version;
}
}
+
+ public void configureConfigStore(String name,
+ ConfigurationRecoveryHandler recoveryHandler,
+ Configuration storeConfiguration,
+ LogSubject logSubject) throws Exception
+ {
+ _logSubject = logSubject;
+ CurrentActor.get().message(_logSubject, ConfigStoreMessages.CFG_1001(this.getClass().getName()));
+ if(_configured)
+ {
+ throw new Exception("ConfigStore already configured");
+ }
+
+ configure(name,storeConfiguration);
+
+ _configured = true;
+ stateTransition(State.CONFIGURING, State.CONFIGURED);
+
+ recover(recoveryHandler);
+ stateTransition(State.RECOVERING, State.STARTED);
+ }
+
+ public void configureMessageStore(String name,
+ MessageStoreRecoveryHandler recoveryHandler,
+ Configuration storeConfiguration,
+ LogSubject logSubject) throws Exception
+ {
+ CurrentActor.get().message(_logSubject, MessageStoreMessages.MST_CREATED(this.getClass().getName()));
+
+ if(!_configured)
+ {
+ throw new Exception("ConfigStore not configured");
+ }
+
+ recoverMessages(recoveryHandler);
+ }
+
+ public void configureTransactionLog(String name, TransactionLogRecoveryHandler recoveryHandler,
+ Configuration storeConfiguration, LogSubject logSubject) throws Exception
+ {
+ CurrentActor.get().message(_logSubject, TransactionLogMessages.TXN_1001(this.getClass().getName()));
+
+ if(!_configured)
+ {
+ throw new Exception("ConfigStore not configured");
+ }
+
+ recoverQueueEntries(recoveryHandler);
+
+
+ }
+
+ public org.apache.qpid.server.store.TransactionLog.Transaction newTransaction()
+ {
+ return new BDBTransaction();
+ }
+
+
/**
- * Called after instantiation in order to configure the message store. A particular implementation can define
- * whatever parameters it wants.
+ * Called after instantiation in order to configure the message store.
*
- * @param virtualHost The virtual host using by this store
- * @param base Not used
+ * @param name The name of the virtual host using this store
* @param vHostConfig The configuration for this virtualhost
+ * @return whether a new store environment was created or not (to indicate whether recovery is necessary)
*
* @throws Exception If any error occurs that means the store is unable to configure itself.
*/
- public void configure(VirtualHost virtualHost, String base, VirtualHostConfiguration vHostConfig) throws Exception
+ public boolean configure(String name, Configuration storeConfig) throws Exception
{
- super.configure(virtualHost, base, vHostConfig);
-
- Configuration config = vHostConfig.getStoreConfiguration();
- File environmentPath = new File(config.getString(ENVIRONMENT_PATH_PROPERTY, System.getProperty("QPID_WORK") + "/bdbstore/" + virtualHost.getName()));
+ File environmentPath = new File(storeConfig.getString(ENVIRONMENT_PATH_PROPERTY,
+ System.getProperty("QPID_WORK") + "/bdbstore/" + name));
if (!environmentPath.exists())
{
if (!environmentPath.mkdirs())
@@ -204,53 +253,39 @@
CurrentActor.get().message(_logSubject, MessageStoreMessages.MST_STORE_LOCATION(environmentPath.getAbsolutePath()));
- _version = config.getInt(DATABASE_FORMAT_VERSION_PROPERTY, DATABASE_FORMAT_VERSION);
+ _version = storeConfig.getInt(DATABASE_FORMAT_VERSION_PROPERTY, DATABASE_FORMAT_VERSION);
- configure(virtualHost, environmentPath, false);
+ return configure(environmentPath, false);
}
- public void configure(File environmentPath) throws AMQException, DatabaseException
+
+ /**
+ *
+ * @param environmentPath location for the store to be created in/recovered from
+ * @param readonly if true then don't allow modifications to an existing store, and don't create a new store if none exists
+ * @return whether or not a new store environment was created
+ * @throws AMQException
+ * @throws DatabaseException
+ */
+ protected boolean configure(File environmentPath, boolean readonly) throws AMQException, DatabaseException
{
- configure(null, environmentPath, false);
- }
-
- public void configure(VirtualHost virtualHost, File environmentPath, boolean readonly) throws AMQException, DatabaseException
- {
stateTransition(State.INITIAL, State.CONFIGURING);
_log.info("Configuring BDB message store");
+ createTupleBindingFactories(_version);
+
setDatabaseNames(_version);
- if (virtualHost != null)
- {
- setVirtualHost(virtualHost);
- }
-
- boolean newEnvironment = setupStore(environmentPath, readonly);
-
- // Performing recovery when we only want read access will cause all the broker objects to be recreated
- // This will/may include thread pool creations that may be duplicated when manually inspecting the state of
- // the store. Simplest solution is to prevent the initial creation of the state by blocking recovery.
- if (!readonly)
- {
- stateTransition(State.CONFIGURING, State.CONFIGURED);
-
- //If we have loaded an environment and have virtualHost configured then recover environment
- if (!newEnvironment && virtualHost != null)
- {
- // this recovers durable queues and persistent messages
- recover(virtualHost);
- }
- }
-
- //if we have a new environment there we can jump to started as there is no recovery requried..
- if (newEnvironment) // && !readonly is implied as you cant get a newEnviroment in readonly mode.
- {
- stateTransition(State.CONFIGURED, State.STARTED);
- }
-
+ return setupStore(environmentPath, readonly);
}
+
+ private void createTupleBindingFactories(int version)
+ {
+ _bindingTupleBindingFactory = new BindingTupleBindingFactory(version);
+ _queueTupleBindingFactory = new QueueTupleBindingFactory(version);
+ _metaDataTupleBindingFactory = new MessageMetaDataTupleBindingFactory(version);
+ }
/**
* Move the store state from CONFIGURING to STARTED.
@@ -288,7 +323,8 @@
{
int versionIndex = s.indexOf("_v");
- // DB is v1 if _version is not v1 then error
+ // lack of _v index suggests DB is v1
+ // so if _version is not v1 then error
if (versionIndex == -1)
{
if (_version != 1)
@@ -315,12 +351,6 @@
}
}
- private void createTupleBindingFactories(int version)
- {
- _queueTupleBindingFactory = new QueueTupleBindingFactory(version, _virtualHost);
- _bindingTupleBindingFactory = new BindingTupleBindingFactory(version, _virtualHost);
- }
-
private synchronized void stateTransition(State requiredState, State newState) throws AMQException
{
if (_state != requiredState)
@@ -344,16 +374,15 @@
{
_log.info("BDB message store using environment path " + environmentPath.getAbsolutePath());
EnvironmentConfig envConfig = new EnvironmentConfig();
- // This is what allos the creation of the store if it does not already exist.
+ // This is what allows the creation of the store if it does not already exist.
envConfig.setAllowCreate(false);
envConfig.setTransactional(true);
envConfig.setConfigParam("je.lock.nLockTables", "7");
- // Restore 500,000 default timeout.
+ // Restore 500,000 default timeout.
//envConfig.setLockTimeout(15000);
// Added to help diagnosis of Deadlock issue
- //
// http://www.oracle.com/technology/products/berkeley-db/faq/je_faq.html#23
if (Boolean.getBoolean("qpid.bdb.lock.debug"))
{
@@ -467,8 +496,8 @@
closeEnvironment();
_state = State.CLOSED;
-
- super.close();
+
+ CurrentActor.get().message(_logSubject,MessageStoreMessages.MST_CLOSED());
}
private void closeEnvironment() throws DatabaseException
@@ -483,43 +512,275 @@
}
}
+
+ public void recover(ConfigurationRecoveryHandler recoveryHandler) throws AMQException
+ {
+ stateTransition(State.CONFIGURED, State.RECOVERING);
+
+ CurrentActor.get().message(_logSubject,MessageStoreMessages.MST_RECOVERY_START());
+
+ try
+ {
+ QueueRecoveryHandler qrh = recoveryHandler.begin(this);
+ loadQueues(qrh);
+
+ ExchangeRecoveryHandler erh = qrh.completeQueueRecovery();
+ loadExchanges(erh);
+
+ BindingRecoveryHandler brh = erh.completeExchangeRecovery();
+ recoverBindings(brh);
+
+ brh.completeBindingRecovery();
+ }
+ catch (DatabaseException e)
+ {
+ throw new AMQException("Error recovering persistent state: " + e, e);
+ }
+
+ }
+
+ private void loadQueues(QueueRecoveryHandler qrh) throws DatabaseException
+ {
+ Cursor cursor = null;
+
+ try
+ {
+ cursor = _queueDb.openCursor(null, null);
+ DatabaseEntry key = new DatabaseEntry();
+ DatabaseEntry value = new DatabaseEntry();
+ TupleBinding binding = _queueTupleBindingFactory.getInstance();
+ while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
+ {
+ QueueRecord queueRecord = (QueueRecord) binding.entryToObject(value);
+
+ String queueName = queueRecord.getNameShortString() == null ? null :
+ queueRecord.getNameShortString().asString();
+ String owner = queueRecord.getOwner() == null ? null :
+ queueRecord.getOwner().asString();
+ FieldTable arguments = queueRecord.getArguments();
+
+ qrh.queue(queueName, owner, arguments);
+ }
+
+ }
+ finally
+ {
+ if (cursor != null)
+ {
+ cursor.close();
+ }
+ }
+ }
+
+
+ private void loadExchanges(ExchangeRecoveryHandler erh) throws AMQException, DatabaseException
+ {
+ Cursor cursor = null;
+
+ try
+ {
+ cursor = _exchangeDb.openCursor(null, null);
+ DatabaseEntry key = new DatabaseEntry();
+ DatabaseEntry value = new DatabaseEntry();
+ TupleBinding binding = new ExchangeTB();
+
+ while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
+ {
+ ExchangeRecord exchangeRec = (ExchangeRecord) binding.entryToObject(value);
+
+ String exchangeName = exchangeRec.getNameShortString() == null ? null :
+ exchangeRec.getNameShortString().asString();
+ String type = exchangeRec.getType() == null ? null :
+ exchangeRec.getType().asString();
+ boolean autoDelete = exchangeRec.isAutoDelete();
+
+ erh.exchange(exchangeName, type, autoDelete);
+ }
+ }
+ finally
+ {
+ if (cursor != null)
+ {
+ cursor.close();
+ }
+ }
+
+ }
+
+ private void recoverBindings(BindingRecoveryHandler brh) throws DatabaseException
+ {
+ Cursor cursor = null;
+ try
+ {
+ cursor = _queueBindingsDb.openCursor(null, null);
+ DatabaseEntry key = new DatabaseEntry();
+ DatabaseEntry value = new DatabaseEntry();
+ TupleBinding binding = _bindingTupleBindingFactory.getInstance();
+
+ while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
+ {
+ //yes, this is retrieving all the useful information from the key only.
+ //For table compatibility it shall currently be left as is
+ BindingKey bindingRecord = (BindingKey) binding.entryToObject(key);
+
+ String exchangeName = bindingRecord.getExchangeName() == null ? null :
+ bindingRecord.getExchangeName().asString();
+ String queueName = bindingRecord.getQueueName() == null ? null :
+ bindingRecord.getQueueName().asString();
+ String routingKey = bindingRecord.getRoutingKey() == null ? null :
+ bindingRecord.getRoutingKey().asString();
+ ByteBuffer argumentsBB = (bindingRecord.getArguments() == null ? null :
+ java.nio.ByteBuffer.wrap(bindingRecord.getArguments().getDataAsBytes()));
+
+ brh.binding(exchangeName, queueName, routingKey, argumentsBB);
+ }
+ }
+ finally
+ {
+ if (cursor != null)
+ {
+ cursor.close();
+ }
+ }
+
+ }
+
+ private void recoverMessages(MessageStoreRecoveryHandler msrh) throws DatabaseException
+ {
+ StoredMessageRecoveryHandler mrh = msrh.begin();
+
+ Cursor cursor = null;
+ try
+ {
+ cursor = _messageMetaDataDb.openCursor(null, null);
+ DatabaseEntry key = new DatabaseEntry();
+ EntryBinding keyBinding = TupleBinding.getPrimitiveBinding(Long.class);;
+
+ DatabaseEntry value = new DatabaseEntry();
+ EntryBinding valueBinding = _metaDataTupleBindingFactory.getInstance();
+
+ long maxId = 0;
+
+ while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
+ {
+ long messageId = (Long) keyBinding.entryToObject(key);
+ StorableMessageMetaData metaData = (StorableMessageMetaData) valueBinding.entryToObject(value);
+
+ StoredBDBMessage message = new StoredBDBMessage(messageId, metaData, false);
+ mrh.message(message);
+
+ maxId = Math.max(maxId, messageId);
+ }
+
+ _messageId.set(maxId);
+ }
+ catch (DatabaseException e)
+ {
+ _log.error("Database Error: " + e, e);
+ throw e;
+ }
+ finally
+ {
+ if (cursor != null)
+ {
+ cursor.close();
+ }
+ }
+ }
+
+ private void recoverQueueEntries(TransactionLogRecoveryHandler recoveryHandler)
+ throws DatabaseException, AMQException
+ {
+ QueueEntryRecoveryHandler qerh = recoveryHandler.begin(this);
+
+ Cursor cursor = null;
+ try
+ {
+ cursor = _deliveryDb.openCursor(null, null);
+ DatabaseEntry key = new DatabaseEntry();
+ EntryBinding keyBinding = new QueueEntryTB();
+
+ DatabaseEntry value = new DatabaseEntry();
+ EntryBinding valueBinding = TupleBinding.getPrimitiveBinding(Long.class);
+
+ while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
+ {
+
+ QueueEntryKey dd = (QueueEntryKey) keyBinding.entryToObject(key);
+
+ AMQShortString queueName = dd.getQueueName();
+ long messageId = dd.getMessageId();
+
+ qerh.queueEntry(queueName.asString(),messageId);
+
+// if (_log.isDebugEnabled())
+// {
+// _log.debug("On recovery, delivering Message ID:" + message.getMessageId() + " to " + queue.getName());
+// }
+
+ }
+ }
+ catch (DatabaseException e)
+ {
+ _log.error("Database Error: " + e, e);
+ throw e;
+ }
+ finally
+ {
+ if (cursor != null)
+ {
+ cursor.close();
+ }
+ }
+
+// if (_log.isInfoEnabled())
+// {
+// _log.info("Recovered message counts: " + _queueRecoveries);
+// }
+//
+// for(Map.Entry<AMQShortString,Integer> entry : _queueRecoveries.entrySet())
+// {
+// CurrentActor.get().message(_logSubject, MessageStoreMessages.MST_RECOVERED(entry.getValue(), String.valueOf(entry.getKey())));
+//
+// CurrentActor.get().message(_logSubject, MessageStoreMessages.MST_RECOVERY_COMPLETE(String.valueOf(entry.getKey()), true));
+// }
+ }
+
/**
- * Removes the specified message from the store in the given transactional store context.
+ * Removes the specified message from the store.
*
- * @param context The transactional context to remove the message in.
* @param messageId Identifies the message to remove.
*
* @throws AMQException If the operation fails for any reason.
+ * @throws DatabaseException
*/
- public void removeMessage(StoreContext context, Long messageId) throws AMQException
+ public void removeMessage(Long messageId) throws AMQException
{
// _log.debug("public void removeMessage(StoreContext context = " + context + ", Long messageId = " + messageId
// + "): called");
- boolean localTx = getOrCreateTransaction(context);
- Transaction tx = (Transaction) context.getPayload();
+ com.sleepycat.je.Transaction tx = null;
+
+ Cursor cursor = null;
+ try
+ {
+ tx = _environment.beginTransaction(null, null);
+
+ //remove the message meta data from the store
+ DatabaseEntry key = new DatabaseEntry();
+ EntryBinding metaKeyBindingTuple = TupleBinding.getPrimitiveBinding(Long.class);
+ metaKeyBindingTuple.objectToEntry(messageId, key);
- DatabaseEntry key = new DatabaseEntry();
- EntryBinding keyBinding = TupleBinding.getPrimitiveBinding(Long.class);
- keyBinding.objectToEntry(messageId, key);
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Message Id: " + messageId + " Removing");
+ }
- if (_log.isDebugEnabled())
- {
- _log.debug("Message Id: " + messageId + " Removing");
- }
-
- // first we need to look up the header to get the chunk count
- MessageMetaData mmd = getMessageMetaData(context, messageId);
- try
- {
+
OperationStatus status = _messageMetaDataDb.delete(tx, key);
if (status == OperationStatus.NOTFOUND)
{
- if (localTx)
- {
- tx.abort();
- context.setPayload(null);
- }
+ tx.abort();
throw new AMQException("Message metadata not found for message id " + messageId);
}
@@ -529,56 +790,73 @@
_log.debug("Deleted metadata for message " + messageId);
}
- DatabaseEntry contentKey = new DatabaseEntry();
- TupleBinding contentKeyBinding = new MessageContentKey.TupleBinding();
- for (int i = 0; i < mmd.getContentChunkCount(); i++)
+ //now remove the content data from the store if there is any.
+
+ DatabaseEntry contentKeyEntry = new DatabaseEntry();
+ MessageContentKey_3 mck = new MessageContentKey_3(messageId,0);
+
+ TupleBinding contentKeyTupleBinding = new MessageContentKeyTB_3();
+ contentKeyTupleBinding.objectToEntry(mck, contentKeyEntry);
+
+ //Use a partial record for the value to prevent retrieving the
+ //data itself as we only need the key to identify what to remove.
+ DatabaseEntry value = new DatabaseEntry();
+ value.setPartial(0, 0, true);
+
+ cursor = _messageContentDb.openCursor(tx, null);
+
+ status = cursor.getSearchKeyRange(contentKeyEntry, value, LockMode.RMW);
+ while (status == OperationStatus.SUCCESS)
{
- MessageContentKey mck = new MessageContentKey(messageId, i);
- contentKeyBinding.objectToEntry(mck, contentKey);
- status = _messageContentDb.get(tx, contentKey, new DatabaseEntry(), LockMode.RMW);
- if (status == OperationStatus.NOTFOUND)
+ mck = (MessageContentKey_3) contentKeyTupleBinding.entryToObject(contentKeyEntry);
+
+ if(mck.getMessageId() != messageId)
{
- if (localTx)
- {
- tx.abort();
- context.setPayload(null);
- }
-
- throw new AMQException("Content chunk " + i + " not found for message " + messageId);
+ //we have exhausted all chunks for this message id, break
+ break;
}
-
- status = _messageContentDb.delete(tx, contentKey);
- if (status == OperationStatus.NOTFOUND)
+ else
{
- if (localTx)
+ status = cursor.delete();
+
+ if(status == OperationStatus.NOTFOUND)
{
+ cursor.close();
+ cursor = null;
+
tx.abort();
- context.setPayload(null);
+ throw new AMQException("Content chunk offset" + mck.getOffset() + " not found for message " + messageId);
}
-
- throw new AMQException("Content chunk " + i + " not found for message " + messageId);
+
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Deleted content chunk offset " + mck.getOffset() + " for message " + messageId);
+ }
}
-
- if (_log.isDebugEnabled())
- {
- _log.debug("Deleted content chunk " + i + " for message " + messageId);
- }
+
+ status = cursor.getNext(contentKeyEntry, value, LockMode.RMW);
}
- if (localTx)
- {
- commit(tx);
- context.setPayload(null);
- }
+ cursor.close();
+ cursor = null;
+
+ commit(tx);
}
catch (DatabaseException e)
{
- if ((tx != null) && localTx)
+ e.printStackTrace();
+
+ if (tx != null)
{
try
{
+ if(cursor != null)
+ {
+ cursor.close();
+ cursor = null;
+ }
+
tx.abort();
- context.setPayload(null);
}
catch (DatabaseException e1)
{
@@ -586,8 +864,23 @@
}
}
- throw new AMQException("Error writing AMQMessage with id " + messageId + " to database: " + e, e);
+ throw new AMQException("Error removing message with id " + messageId + " from database: " + e, e);
}
+ finally
+ {
+ if(cursor != null)
+ {
+ try
+ {
+ cursor.close();
+ }
+ catch (DatabaseException e)
+ {
+ //TODO
+ throw new RuntimeException(e);
+ }
+ }
+ }
}
/**
@@ -601,12 +894,17 @@
{
if (_state != State.RECOVERING)
{
+ ExchangeRecord exchangeRec = new ExchangeRecord(exchange.getNameShortString(),
+ exchange.getTypeShortString(), exchange.isAutoDelete());
+
DatabaseEntry key = new DatabaseEntry();
EntryBinding keyBinding = new AMQShortStringTB();
- keyBinding.objectToEntry(exchange.getName(), key);
+ keyBinding.objectToEntry(exchange.getNameShortString(), key);
+
DatabaseEntry value = new DatabaseEntry();
- TupleBinding exchangeBinding = new ExchangeTB(_virtualHost);
- exchangeBinding.objectToEntry(exchange, value);
+ TupleBinding exchangeBinding = new ExchangeTB();
+ exchangeBinding.objectToEntry(exchangeRec, value);
+
try
{
_exchangeDb.put(null, key, value);
@@ -630,7 +928,7 @@
{
DatabaseEntry key = new DatabaseEntry();
EntryBinding keyBinding = new AMQShortStringTB();
- keyBinding.objectToEntry(exchange.getName(), key);
+ keyBinding.objectToEntry(exchange.getNameShortString(), key);
try
{
OperationStatus status = _exchangeDb.delete(null, key);
@@ -645,117 +943,9 @@
}
}
- private void recoverExchanges() throws AMQException, DatabaseException
- {
- for (Exchange exchange : loadExchanges())
- {
- recoverExchange(exchange);
- }
- }
- private void recoverExchange(Exchange exchange) throws AMQException, DatabaseException
- {
- _log.info("Recovering durable exchange " + exchange.getName() + " of type " + exchange.getType() + "...");
- QueueRegistry queueRegistry = _virtualHost.getQueueRegistry();
- for (BindingKey binding : loadQueueBindings(exchange))
- {
- AMQQueue queue = queueRegistry.getQueue(binding.getQueueName());
- if (queue == null)
- {
- _log.error("Unknown queue: " + binding.getQueueName() + " cannot be bound to exchange: "
- + exchange.getName());
- }
- else
- {
- _log.info("Restoring binding: (Exchange: " + binding.getExchangeName() + ", Queue: " + binding
- .getQueueName() + ", Routing Key: " + binding.getRoutingKey() + ", Arguments: " + binding.getArguments()
- + ")");
-
- queue.bind(exchange, binding.getRoutingKey(), binding.getArguments());
- }
- }
- }
-
- private List<BindingKey> loadQueueBindings(Exchange exchange) throws DatabaseException
- {
-
- Cursor cursor = null;
- List<BindingKey> queueBindings = new ArrayList<BindingKey>();
- try
- {
- cursor = _queueBindingsDb.openCursor(null, null);
- DatabaseEntry key = new DatabaseEntry();
- DatabaseEntry value = new DatabaseEntry();
-
- BindingKey queueBinding =
- new BindingKey(exchange.getName(), null, null, null);
-
- EntryBinding<BindingKey> keyBinding = _bindingTupleBindingFactory.getInstance();
- keyBinding.objectToEntry(queueBinding, key);
-
- OperationStatus opStatus = cursor.getSearchKeyRange(key, value, LockMode.RMW);
-
- TupleBinding binding = _bindingTupleBindingFactory.getInstance();
- while (opStatus == OperationStatus.SUCCESS)
- {
- queueBinding = (BindingKey) binding.entryToObject(key);
- if (queueBinding.getExchangeName().equals(exchange.getName()))
- {
- queueBindings.add(queueBinding);
- opStatus = cursor.getNext(key, value, LockMode.RMW);
- }
- else
- {
- break;
- }
- }
-
- return queueBindings;
- }
- finally
- {
- if (cursor != null)
- {
- cursor.close();
- }
- }
-
- }
-
- private List<Exchange> loadExchanges() throws AMQException, DatabaseException
- {
- Cursor cursor = null;
- List<Exchange> exchanges = new ArrayList<Exchange>();
- try
- {
- cursor = _exchangeDb.openCursor(null, null);
- DatabaseEntry key = new DatabaseEntry();
- DatabaseEntry value = new DatabaseEntry();
- TupleBinding binding = new ExchangeTB(_virtualHost);
- while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
- {
- Exchange exchange = (Exchange) binding.entryToObject(value);
-
- _virtualHost.getExchangeRegistry().registerExchange(exchange);
-
- exchanges.add(exchange);
- _log.info("Registering exchange " + exchange.getName());
- }
-
- return exchanges;
- }
- finally
- {
- if (cursor != null)
- {
- cursor.close();
- }
- }
-
- }
-
/**
* Binds the specified queue to an exchange with a routing key.
*
@@ -773,13 +963,20 @@
if (_state != State.RECOVERING)
{
+ BindingKey bindingRecord = new BindingKey(exchange.getNameShortString(),
+ queue.getNameShortString(), routingKey, args);
+
DatabaseEntry key = new DatabaseEntry();
EntryBinding keyBinding = _bindingTupleBindingFactory.getInstance();
- keyBinding.objectToEntry(new BindingKey(exchange.getName(), queue.getName(), routingKey, args), key);
+
+ keyBinding.objectToEntry(bindingRecord, key);
+ //yes, this is writing out 0 as a value and putting all the
+ //useful info into the key, don't ask me why. For table
+ //compatibility it shall currently be left as is
DatabaseEntry value = new DatabaseEntry();
ByteBinding.byteToEntry((byte) 0, value);
-
+
try
{
_queueBindingsDb.put(null, key, value);
@@ -807,7 +1004,7 @@
{
DatabaseEntry key = new DatabaseEntry();
EntryBinding keyBinding = _bindingTupleBindingFactory.getInstance();
- keyBinding.objectToEntry(new BindingKey(exchange.getName(), queue.getName(), routingKey, args), key);
+ keyBinding.objectToEntry(new BindingKey(exchange.getNameShortString(), queue.getNameShortString(), routingKey, args), key);
try
{
@@ -844,20 +1041,17 @@
if (_state != State.RECOVERING)
{
- long queueId = _queueId.getAndIncrement();
- _queueNameToIdMap.put(queue.getName(), queueId);
-
+ QueueRecord queueRecord= new QueueRecord(queue.getNameShortString(),
+ queue.getOwner(), arguments);
+
DatabaseEntry key = new DatabaseEntry();
-
EntryBinding keyBinding = new AMQShortStringTB();
- keyBinding.objectToEntry(queue.getName(), key);
+ keyBinding.objectToEntry(queue.getNameShortString(), key);
DatabaseEntry value = new DatabaseEntry();
TupleBinding queueBinding = _queueTupleBindingFactory.getInstance();
- ((QueueTuple) queueBinding).setArguments(arguments);
-
- queueBinding.objectToEntry(queue, value);
+ queueBinding.objectToEntry(queueRecord, value);
try
{
_queueDb.put(null, key, value);
@@ -878,12 +1072,10 @@
*/
public void removeQueue(final AMQQueue queue) throws AMQException
{
- AMQShortString name = queue.getName();
+ AMQShortString name = queue.getNameShortString();
_log.debug("public void removeQueue(AMQShortString name = " + name + "): called");
- Long queueId = _queueNameToIdMap.remove(name);
-
DatabaseEntry key = new DatabaseEntry();
EntryBinding keyBinding = new AMQShortStringTB();
keyBinding.objectToEntry(name, key);
@@ -902,34 +1094,6 @@
}
/**
- * Really for testing purposes.
- *
- * @param name
- *
- * @return
- *
- * @throws AMQException
- */
- AMQQueue getQueue(AMQShortString name) throws AMQException
- {
- DatabaseEntry key = new DatabaseEntry();
- EntryBinding keyBinding = new AMQShortStringTB();
- keyBinding.objectToEntry(name, key);
- DatabaseEntry value = new DatabaseEntry();
- try
- {
- _queueDb.get(null, key, value, LockMode.RMW);
- TupleBinding binding = _queueTupleBindingFactory.getInstance();
-
- return (AMQQueue) binding.entryToObject(value);
- }
- catch (DatabaseException e)
- {
- throw new AMQException("Error getting queue with name " + name + " from database: " + e, e);
- }
- }
-
- /**
* Places a message onto a specified queue, in a given transactional context.
*
* @param context The transactional context for the operation.
@@ -938,15 +1102,16 @@
*
* @throws AMQException If the operation fails for any reason.
*/
- public void enqueueMessage(StoreContext context, AMQQueue queue, Long messageId) throws AMQException
+ public void enqueueMessage(StoreContext context, final TransactionLogResource queue, Long messageId) throws AMQException
{
// _log.debug("public void enqueueMessage(StoreContext context = " + context + ", AMQShortString name = " + name
// + ", Long messageId): called");
- AMQShortString name = queue.getName();
- Transaction tx = (Transaction) context.getPayload();
+ AMQShortString name = new AMQShortString(queue.getResourceName());
+ com.sleepycat.je.Transaction tx = (com.sleepycat.je.Transaction) context.getPayload();
+
DatabaseEntry key = new DatabaseEntry();
- EntryBinding keyBinding = new QueueEntryKey.TupleBinding();
+ EntryBinding keyBinding = new QueueEntryTB();
QueueEntryKey dd = new QueueEntryKey(name, messageId);
keyBinding.objectToEntry(dd, key);
DatabaseEntry value = new DatabaseEntry();
@@ -968,28 +1133,6 @@
}
}
- private boolean getOrCreateTransaction(StoreContext context) throws AMQException
- {
-
- Transaction tx = (Transaction) context.getPayload();
- if (tx == null)
- {
- try
- {
- tx = _environment.beginTransaction(null, null);
- context.setPayload(tx);
-
- return true;
- }
- catch (DatabaseException e)
- {
- throw new AMQException("Error beginning transaction: " + e, e);
- }
- }
-
- return false;
- }
-
/**
* Extracts a message from a specified queue, in a given transactional context.
*
@@ -999,14 +1142,14 @@
*
* @throws AMQException If the operation fails for any reason, or if the specified message does not exist.
*/
- public void dequeueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException
+ public void dequeueMessage(StoreContext context, final TransactionLogResource queue, Long messageId) throws AMQException
{
- AMQShortString name = queue.getName();
- boolean isLocal = getOrCreateTransaction(context);
- Transaction tx = (Transaction) context.getPayload();
+ AMQShortString name = new AMQShortString(queue.getResourceName());
+ com.sleepycat.je.Transaction tx = (com.sleepycat.je.Transaction) context.getPayload();
+
DatabaseEntry key = new DatabaseEntry();
- EntryBinding keyBinding = new QueueEntryKey.TupleBinding();
+ EntryBinding keyBinding = new QueueEntryTB();
QueueEntryKey dd = new QueueEntryKey(name, messageId);
keyBinding.objectToEntry(dd, key);
@@ -1034,82 +1177,18 @@
_log.debug("Removed message " + messageId + ", " + name + " from delivery db");
}
-
- if (isLocal)
- {
- commit(tx);
- context.setPayload(null);
- }
}
catch (DatabaseException e)
{
_log.error("Failed to dequeue message " + messageId + ": " + e, e);
_log.error(tx);
- if (isLocal)
- {
- try
- {
- tx.abort();
- context.setPayload(null);
- }
- catch (DatabaseException e1)
- {
- throw new AMQException("Error rolling back transaction: " + e1, e1);
- }
- }
throw new AMQException("Error accessing database while dequeuing message: " + e, e);
}
}
- private boolean isLocalTransaction(StoreContext context)
- {
- return context.getPayload() == null;
- }
-
/**
- * Begins a transactional context.
- *
- * @param context The transactional context to begin.
- *
- * @throws AMQException If the operation fails for any reason.
- */
- public void beginTran(StoreContext context) throws AMQException
- {
- // _log.debug("public void beginTran(StoreContext context = " + context + "): called");
-
- if (context.getPayload() != null)
- {
- throw new AMQException("Fatal internal error: transactional context is not empty at beginTran: "
- + context.getPayload());
- }
- else
- {
- try
- {
- context.setPayload(_environment.beginTransaction(null, _transactionConfig ));
- }
- catch (DatabaseException e)
- {
- throw new AMQException("Error starting transaction: " + e, e);
- }
- }
- }
-
- /**
- * Tests a transactional context to see if it has been begun but not yet committed or aborted.
- *
- * @param context The transactional context to test.
- *
- * @return <tt>true</tt> if the transactional context is live, <tt>false</tt> otherwise.
- */
- public boolean inTran(StoreContext context)
- {
- return context.getPayload() != null;
- }
-
- /**
* Commits all operations performed within a given transactional context.
*
* @param context The transactional context to commit all operations for.
@@ -1120,7 +1199,7 @@
{
// _log.debug("public void commitTran(StoreContext context = " + context + "): called");
- Transaction tx = (Transaction) context.getPayload();
+ com.sleepycat.je.Transaction tx = (com.sleepycat.je.Transaction) context.getPayload();
if (tx == null)
{
@@ -1146,6 +1225,13 @@
}
}
+ public StoreFuture commitTranAsync(StoreContext context) throws AMQException
+ {
+ //TODO - Actually create an async commit implementation.
+ commitTran(context);
+ return IMMEDIATE_FUTURE;
+ }
+
/**
* Abandons all operations performed within a given transactional context.
*
@@ -1155,7 +1241,7 @@
*/
public void abortTran(StoreContext context) throws AMQException
{
- Transaction tx = (Transaction) context.getPayload();
+ com.sleepycat.je.Transaction tx = (com.sleepycat.je.Transaction) context.getPayload();
if (_log.isDebugEnabled())
{
@@ -1194,7 +1280,7 @@
QueueEntryKey dd = new QueueEntryKey(queueName, 0);
- EntryBinding keyBinding = new QueueEntryKey.TupleBinding();
+ EntryBinding keyBinding = new QueueEntryTB();
keyBinding.objectToEntry(dd, key);
DatabaseEntry value = new DatabaseEntry();
@@ -1204,10 +1290,10 @@
OperationStatus status = cursor.getSearchKeyRange(key, value, LockMode.DEFAULT);
dd = (QueueEntryKey) keyBinding.entryToObject(key);
- while ((status == OperationStatus.SUCCESS) && dd.queueName.equals(queueName))
+ while ((status == OperationStatus.SUCCESS) && dd.getQueueName().equals(queueName))
{
- messageIds.add(dd.messageId);
+ messageIds.add(dd.getMessageId());
status = cursor.getNext(key, value, LockMode.DEFAULT);
if (status == OperationStatus.SUCCESS)
{
@@ -1237,59 +1323,6 @@
}
}
- public void recover(VirtualHost virtualHost) throws AMQException
- {
- stateTransition(State.CONFIGURED, State.RECOVERING);
-
- _log.info("Recovering persistent state...");
- CurrentActor.get().message(_logSubject,MessageStoreMessages.MST_RECOVERY_START(null, false));
-
- StoreContext context = new StoreContext();
- try
- {
- beginTran(context);
- Map<AMQShortString, AMQQueue> queues = loadQueues();
-
- for (AMQQueue q : queues.values())
- {
- CurrentActor.get().message(_logSubject,MessageStoreMessages.MST_RECOVERY_START(String.valueOf(q.getName()), true));
- //Record that we have a queue for recovery
- _queueRecoveries.put(q.getName(), 0);
-
- q.configure(virtualHost.getConfiguration().getQueueConfiguration(q.getName().asString()));
-
- }
-
- recoverExchanges();
-
- deliverMessages(context, queues);
- _log.info("Persistent state recovered successfully");
- CurrentActor.get().message(_logSubject, MessageStoreMessages.MST_RECOVERY_COMPLETE(null, false));
-
- commitTran(context);
-
- }
- catch (DatabaseException e)
- {
- abortTran(context);
-
- throw new AMQException("Error recovering persistent state: " + e, e);
- }
- catch (AMQException amqe)
- {
- abortTran(context);
- throw new AMQException("Error recovering persistent state: " + amqe, amqe);
- }
- catch (Throwable ioobe)
- {
- abortTran(context);
- throw new AMQException("Invalid database format. Please use upgrade tool for store in Virtualhost:'"
- + _virtualHost.getName() + "'", ioobe);
- }
-
- stateTransition(State.RECOVERING, State.STARTED);
- }
-
/**
* Return a valid, currently unused message id.
*
@@ -1297,7 +1330,7 @@
*/
public Long getNewMessageId()
{
- return _messageId.getAndIncrement();
+ return _messageId.incrementAndGet();
}
/**
@@ -1305,20 +1338,21 @@
*
* @param context The transactional context for the operation.
* @param messageId The message to store the data for.
- * @param index The index of the data chunk.
+ * @param offset The offset of the data chunk in the message.
* @param contentBody The content of the data chunk.
* @param lastContentBody Flag to indicate that this is the last such chunk for the message.
*
* @throws AMQException If the operation fails for any reason, or if the specified message does not exist.
*/
- public void storeContentBodyChunk(StoreContext context, Long messageId, int index, ContentChunk contentBody,
- boolean lastContentBody) throws AMQException
+ protected void addContent(StoreContext context, Long messageId, int offset,
+ ByteBuffer contentBody) throws AMQException
{
- Transaction tx = (Transaction) context.getPayload();
+ com.sleepycat.je.Transaction tx = (com.sleepycat.je.Transaction) context.getPayload();
+
DatabaseEntry key = new DatabaseEntry();
- TupleBinding keyBinding = new MessageContentKey.TupleBinding();
- keyBinding.objectToEntry(new MessageContentKey(messageId, index), key);
+ TupleBinding keyBinding = new MessageContentKeyTB_3();
+ keyBinding.objectToEntry(new MessageContentKey_3(messageId, offset), key);
DatabaseEntry value = new DatabaseEntry();
TupleBinding messageBinding = new ContentTB();
messageBinding.objectToEntry(contentBody, value);
@@ -1327,13 +1361,13 @@
OperationStatus status = _messageContentDb.put(tx, key, value);
if (status != OperationStatus.SUCCESS)
{
- throw new AMQException("Error adding content chunk " + index + " for message id " + messageId + ": "
+ throw new AMQException("Error adding content chunk offset" + offset + " for message id " + messageId + ": "
+ status);
}
if (_log.isDebugEnabled())
{
- _log.debug("Storing content chunk " + index + " for message " + messageId + "[Transaction" + tx + "]");
+ _log.debug("Storing content chunk offset" + offset + " for message " + messageId + "[Transaction" + tx + "]");
}
}
catch (DatabaseException e)
@@ -1351,7 +1385,7 @@
*
* @throws AMQException If the operation fails for any reason, or if the specified message does not exist.
*/
- public void storeMessageMetaData(StoreContext context, Long messageId, MessageMetaData messageMetaData)
+ private void storeMetaData(StoreContext context, Long messageId, StorableMessageMetaData messageMetaData)
throws AMQException
{
if (_log.isDebugEnabled())
@@ -1359,14 +1393,15 @@
_log.debug("public void storeMessageMetaData(StoreContext context = " + context + ", Long messageId = "
+ messageId + ", MessageMetaData messageMetaData = " + messageMetaData + "): called");
}
- //This call breaking tests - not sure where the txn it creates should be committed ??
- //getOrCreateTransaction(context);
- Transaction tx = (Transaction) context.getPayload();
+
+ com.sleepycat.je.Transaction tx = (com.sleepycat.je.Transaction) context.getPayload();
+
DatabaseEntry key = new DatabaseEntry();
EntryBinding keyBinding = TupleBinding.getPrimitiveBinding(Long.class);
keyBinding.objectToEntry(messageId, key);
DatabaseEntry value = new DatabaseEntry();
- TupleBinding messageBinding = new MessageMetaDataTB();
+
+ TupleBinding messageBinding = _metaDataTupleBindingFactory.getInstance();
messageBinding.objectToEntry(messageMetaData, value);
try
{
@@ -1392,11 +1427,11 @@
*
* @throws AMQException If the operation fails for any reason, or if the specified message does not exist.
*/
- public MessageMetaData getMessageMetaData(StoreContext context, Long messageId) throws AMQException
+ public StorableMessageMetaData getMessageMetaData(Long messageId) throws AMQException
{
if (_log.isDebugEnabled())
{
- _log.debug("public MessageMetaData getMessageMetaData(StoreContext context = " + context + ", Long messageId = "
+ _log.debug("public MessageMetaData getMessageMetaData(Long messageId = "
+ messageId + "): called");
}
@@ -1404,7 +1439,7 @@
EntryBinding keyBinding = TupleBinding.getPrimitiveBinding(Long.class);
keyBinding.objectToEntry(messageId, key);
DatabaseEntry value = new DatabaseEntry();
- TupleBinding messageBinding = new MessageMetaDataTB();
+ TupleBinding messageBinding = _metaDataTupleBindingFactory.getInstance();
try
{
@@ -1414,7 +1449,7 @@
throw new AMQException("Metadata not found for message with id " + messageId);
}
- MessageMetaData mdd = (MessageMetaData) messageBinding.entryToObject(value);
+ StorableMessageMetaData mdd = (StorableMessageMetaData) messageBinding.entryToObject(value);
return mdd;
}
@@ -1426,47 +1461,110 @@
}
/**
- * Retrieves a chunk of message data.
+ * Fills the provided ByteBuffer with as much content for the specified message as possible, starting
+ * from the specified offset in the message.
*
- * @param context The transactional context for the operation.
- * @param messageId The message to get the data chunk for.
- * @param index The offset index of the data chunk within the message.
+ * @param messageId The message to get the data for.
+ * @param offset The offset of the data within the message.
+ * @param dst The destination of the content read back
*
- * @return A chunk of message data.
+ * @return The number of bytes inserted into the destination
*
* @throws AMQException If the operation fails for any reason, or if the specified message does not exist.
*/
- public ContentChunk getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException
+ public int getContent(Long messageId, int offset, ByteBuffer dst) throws AMQException
{
-
- DatabaseEntry key = new DatabaseEntry();
- TupleBinding keyBinding = new MessageContentKey.TupleBinding();
- keyBinding.objectToEntry(new MessageContentKey(messageId, index), key);
+ final int limit = offset + dst.remaining();
+
+ DatabaseEntry contentKeyEntry = new DatabaseEntry();
+
+ //TODO: if requested offset is non zero, use partial record (key-only) search to
+ //locate the first record key to prevent reading in data we dont need
+
+ //Start from 0 offset and search for the starting chunk.
+ MessageContentKey_3 mck = new MessageContentKey_3(messageId, 0);
+ TupleBinding contentKeyTupleBinding = new MessageContentKeyTB_3();
+ contentKeyTupleBinding.objectToEntry(mck, contentKeyEntry);
DatabaseEntry value = new DatabaseEntry();
- TupleBinding messageBinding = new ContentTB();
+ TupleBinding contentTupleBinding = new ContentTB();
+
if (_log.isDebugEnabled())
{
- _log.debug("Message Id: " + messageId + " Getting content body chunk: " + index);
+ _log.debug("Message Id: " + messageId + " Getting content body from offset: " + offset);
}
+ int written = 0;
+ int seenSoFar = 0;
+
+ Cursor cursor = null;
try
{
- OperationStatus status = _messageContentDb.get(null, key, value, LockMode.READ_UNCOMMITTED);
- if (status != OperationStatus.SUCCESS)
+ cursor = _messageContentDb.openCursor(null, null);
+
+ OperationStatus status = cursor.getSearchKeyRange(contentKeyEntry, value, LockMode.RMW);
+ while (status == OperationStatus.SUCCESS)
{
+ mck = (MessageContentKey_3) contentKeyTupleBinding.entryToObject(contentKeyEntry);
+ long id = mck.getMessageId();
+
+ if(id != messageId)
+ {
+ //we have exhausted all chunks for this message id, break
+ break;
+ }
+
+ int offsetInMessage = mck.getOffset();
+ ByteBuffer buf = (ByteBuffer) contentTupleBinding.entryToObject(value);
+
+ final int size = (int) buf.limit();
+
+ seenSoFar += size;
+
+ //TODO: can change this guard if we start recording the last byte in the chunk record
+ if(seenSoFar >= offset)
+ {
+ byte[] dataAsBytes = buf.array();
- throw new AMQException("Content chunk " + index + " not found for message with id " + messageId);
+ int posInArray = offset + written - offsetInMessage;
+ int count = size - posInArray;
+ if(count > dst.remaining())
+ {
+ count = dst.remaining();
+ }
+ dst.put(dataAsBytes,posInArray,count);
+ written+=count;
+
+ if(dst.remaining() == 0)
+ {
+ break;
+ }
+ }
+
+ status = cursor.getNext(contentKeyEntry, value, LockMode.RMW);
}
- ContentChunk cb = (ContentChunk) messageBinding.entryToObject(value);
-
- return cb;
+ return written;
}
catch (DatabaseException e)
{
throw new AMQException("Error writing AMQMessage with id " + messageId + " to database: " + e, e);
}
+ finally
+ {
+ if(cursor != null)
+ {
+ try
+ {
+ cursor.close();
+ }
+ catch (DatabaseException e)
+ {
+ // TODO
+ throw new RuntimeException(e);
+ }
+ }
+ }
}
public boolean isPersistent()
@@ -1474,47 +1572,27 @@
return true;
}
- Map<AMQShortString, AMQQueue> loadQueues() throws DatabaseException, AMQException
+ public <T extends StorableMessageMetaData> StoredMessage<T> addMessage(T metaData)
{
- Cursor cursor = null;
- Map<AMQShortString, AMQQueue> queues = new HashMap<AMQShortString, AMQQueue>();
- try
+ if(metaData.isPersistent())
{
- cursor = _queueDb.openCursor(null, null);
- DatabaseEntry key = new DatabaseEntry();
- DatabaseEntry value = new DatabaseEntry();
- TupleBinding binding = _queueTupleBindingFactory.getInstance();
- while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
- {
- AMQQueue queue = (AMQQueue) binding.entryToObject(value);
- if (queue != null)
- {
- _virtualHost.getQueueRegistry().registerQueue(queue);
- queues.put(queue.getName(), queue);
- _log.info("Recovering queue " + queue.getName() + " with owner:"
- + ((queue.getOwner() == null) ? " <<null>> " : (" \"" + queue.getOwner() + "\" ")));
- }
- }
-
- return queues;
+ return new StoredBDBMessage(getNewMessageId(), metaData);
}
- finally
+ else
{
- if (cursor != null)
- {
- cursor.close();
- }
+ return new StoredMemoryMessage(getNewMessageId(), metaData);
}
}
- //public getters for the TupleBindingFactories
- public QueueTupleBindingFactory getQueueTupleBindingFactory()
+ //protected getters for the TupleBindingFactories
+
+ protected QueueTupleBindingFactory getQueueTupleBindingFactory()
{
return _queueTupleBindingFactory;
}
- public BindingTupleBindingFactory getBindingTupleBindingFactory()
+ protected BindingTupleBindingFactory getBindingTupleBindingFactory()
{
return _bindingTupleBindingFactory;
}
@@ -1612,162 +1690,8 @@
}
}
- private static final class ProcessAction
+ private void commit(com.sleepycat.je.Transaction tx) throws DatabaseException
{
- private final AMQQueue _queue;
- private final StoreContext _context;
- private final AMQMessage _message;
-
- public ProcessAction(AMQQueue queue, StoreContext context, AMQMessage message)
- {
- _queue = queue;
- _context = context;
- _message = message;
- }
-
- public void process() throws AMQException
- {
- _queue.enqueue(_context, _message);
- }
-
- }
-
- private void deliverMessages(final StoreContext context, Map<AMQShortString, AMQQueue> queues)
- throws DatabaseException, AMQException
- {
- Map<Long, AMQMessage> msgMap = new HashMap<Long, AMQMessage>();
- List<ProcessAction> actions = new ArrayList<ProcessAction>();
-
-
- Cursor cursor = null;
- try
- {
- Transaction tx = (Transaction) context.getPayload();
- cursor = _deliveryDb.openCursor(tx, null);
- DatabaseEntry key = new DatabaseEntry();
- EntryBinding keyBinding = new QueueEntryKey.TupleBinding();
-
- DatabaseEntry value = new DatabaseEntry();
- EntryBinding valueBinding = TupleBinding.getPrimitiveBinding(Long.class);
- MessageHandleFactory messageHandleFactory = new MessageHandleFactory();
- long maxId = 1;
-
- TransactionalContext txnContext = new NonTransactionalContext(this, new StoreContext(), null, null);
-
- while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
- {
-
- QueueEntryKey dd = (QueueEntryKey) keyBinding.entryToObject(key);
-
- AMQShortString queueName = dd.queueName;
-
- AMQQueue queue = queues.get(queueName);
-
- // If the matching queue was not already found in the store, check in case a queue
- // with the same name exists in the virtualhost, otherwise we will create a duplicate
- // queue and generate a JMX InstanceAlreadyExistsException, preventing startup.
- if (queue == null)
- {
- queue = _virtualHost.getQueueRegistry().getQueue(queueName);
- if (queue != null)
- {
- queues.put(queueName, queue);
- }
- }
-
- if (queue == null)
- {
- queue = AMQQueueFactory.createAMQQueueImpl(queueName, false, null, false, _virtualHost, null);
- _virtualHost.getQueueRegistry().registerQueue(queue);
- queues.put(queueName, queue);
- }
-
- long messageId = dd.messageId;
- maxId = Math.max(maxId, messageId);
- AMQMessage message = msgMap.get(messageId);
-
- if (message != null)
- {
- message.incrementReference();
- }
- else
- {
- message = new AMQMessage(messageId, this, messageHandleFactory, txnContext);
- msgMap.put(messageId, message);
- }
-
- if (_log.isDebugEnabled())
- {
- _log.debug("On recovery, delivering Message ID:" + message.getMessageId() + " to " + queue.getName());
- }
-
- Integer count = _queueRecoveries.get(queueName);
- if (count == null)
- {
- count = 0;
- }
-
- _queueRecoveries.put(queueName, ++count);
-
- actions.add(new ProcessAction(queue, context, message));
-
- }
-
- for (ProcessAction action : actions)
- {
- action.process();
- }
-
- _messageId.set(maxId + 1);
- }
- catch (DatabaseException e)
- {
- _log.error("Database Error: " + e, e);
- throw e;
- }
- catch (AMQException e)
- {
- _log.error("Store Error: " + e, e);
- throw e;
- }
- finally
- {
- if (cursor != null)
- {
- cursor.close();
- }
- }
-
- if (_log.isInfoEnabled())
- {
- _log.info("Recovered message counts: " + _queueRecoveries);
- }
-
- for(Map.Entry<AMQShortString,Integer> entry : _queueRecoveries.entrySet())
- {
- CurrentActor.get().message(_logSubject, MessageStoreMessages.MST_RECOVERED(entry.getValue(), String.valueOf(entry.getKey())));
-
- CurrentActor.get().message(_logSubject, MessageStoreMessages.MST_RECOVERY_COMPLETE(String.valueOf(entry.getKey()), true));
- }
-
- //Free the memory
- _queueRecoveries = null;
- }
-
- QueueRegistry getQueueRegistry()
- {
- return _virtualHost.getQueueRegistry();
- }
-
- void setVirtualHost(VirtualHost virtualHost)
- {
- _virtualHost = virtualHost;
-
- createTupleBindingFactories(_version);
- }
-
- void commit(Transaction tx) throws DatabaseException
- {
// _log.debug("void commit(Transaction tx = " + tx + "): called");
tx.commitNoSync();
@@ -1787,11 +1711,11 @@
// private static final Logger _log = Logger.getLogger(Commit.class);
private final CommitThread _commitThread;
- private final Transaction _tx;
+ private final com.sleepycat.je.Transaction _tx;
private DatabaseException _databaseException;
private boolean _complete;
- public Commit(CommitThread commitThread, Transaction tx)
+ public Commit(CommitThread commitThread, com.sleepycat.je.Transaction tx)
{
// _log.debug("public Commit(CommitThread commitThread = " + commitThread + ", Transaction tx = " + tx
// + "): called");
@@ -1973,5 +1897,184 @@
}
}
}
+
+
+ private class StoredBDBMessage implements StoredMessage
+ {
+ private final long _messageId;
+ private volatile SoftReference<StorableMessageMetaData> _metaDataRef;
+ private StoreContext _ctx;
+ private com.sleepycat.je.Transaction _txn;
+
+ StoredBDBMessage(long messageId, StorableMessageMetaData metaData)
+ {
+ this(messageId, metaData, true);
+ }
+
+
+ StoredBDBMessage(long messageId,
+ StorableMessageMetaData metaData, boolean persist)
+ {
+ try
+ {
+ _messageId = messageId;
+
+ _metaDataRef = new SoftReference<StorableMessageMetaData>(metaData);
+ if(persist)
+ {
+ _ctx = new StoreContext();
+ _txn = _environment.beginTransaction(null, null);
+ _ctx.setPayload(_txn);
+ storeMetaData(_ctx, messageId, metaData);
+ }
+ }
+ catch (DatabaseException e)
+ {
+ //TODO
+ throw new RuntimeException(e);
+ }
+ catch (AMQException e)
+ {
+ //TODO
+ throw new RuntimeException(e);
+ }
+
+ }
+
+ public StorableMessageMetaData getMetaData()
+ {
+ StorableMessageMetaData metaData = _metaDataRef.get();
+ if(metaData == null)
+ {
+ try
+ {
+ metaData = BDBMessageStore.this.getMessageMetaData(_messageId);
+ }
+ catch (AMQException e)
+ {
+ //TODO
+ throw new RuntimeException(e);
+ }
+ _metaDataRef = new SoftReference<StorableMessageMetaData>(metaData);
+ }
+
+ return metaData;
+ }
+
+ public long getMessageNumber()
+ {
+ return _messageId;
+ }
+
+ public void addContent(int offsetInMessage, java.nio.ByteBuffer src)
+ {
+ try
+ {
+ BDBMessageStore.this.addContent(_ctx, _messageId, offsetInMessage, src);
+ }
+ catch (AMQException e)
+ {
+ //TODO
+ throw new RuntimeException(e);
+ }
+ }
+
+ public int getContent(int offsetInMessage, java.nio.ByteBuffer dst)
+ {
+ try
+ {
+ return BDBMessageStore.this.getContent(_messageId, offsetInMessage, dst);
+ }
+ catch (AMQException e)
+ {
+ // TODO
+ throw new RuntimeException(e);
+ }
+ }
+
+ public StoreFuture flushToStore()
+ {
+ try
+ {
+ if(_ctx != null)
+ {
+ BDBMessageStore.this.commitTran(_ctx);
+ }
+ }
+ catch (AMQException e)
+ {
+ //TODO
+ throw new RuntimeException(e);
+ }
+ finally
+ {
+ _txn = null;
+ _ctx = null;
+ }
+ return IMMEDIATE_FUTURE;
+ }
+
+ public void remove()
+ {
+ flushToStore();
+ try
+ {
+ BDBMessageStore.this.removeMessage(_messageId);
+ }
+ catch (AMQException e)
+ {
+ // TODO
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ private class BDBTransaction implements Transaction
+ {
+ private com.sleepycat.je.Transaction _txn;
+ private StoreContext _ctx;
+
+ private BDBTransaction()
+ {
+ _ctx = new StoreContext();
+ try
+ {
+ _txn = _environment.beginTransaction(null, null);
+ }
+ catch (DatabaseException e)
+ {
+ //TODO
+ throw new RuntimeException(e);
+ }
+ _ctx.setPayload(_txn);
+ }
+
+ public void enqueueMessage(TransactionLogResource queue, Long messageId) throws AMQException
+ {
+ BDBMessageStore.this.enqueueMessage(_ctx, queue, messageId);
+ }
+
+ public void dequeueMessage(TransactionLogResource queue, Long messageId) throws AMQException
+ {
+ BDBMessageStore.this.dequeueMessage(_ctx, queue, messageId);
+
+ }
+
+ public void commitTran() throws AMQException
+ {
+ BDBMessageStore.this.commitTran(_ctx);
+ }
+
+ public StoreFuture commitTranAsync() throws AMQException
+ {
+ return BDBMessageStore.this.commitTranAsync(_ctx);
+ }
+
+ public void abortTran() throws AMQException
+ {
+ BDBMessageStore.this.abortTran(_ctx);
+ }
+ }
+
}
Modified: store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ContentTB.java
===================================================================
--- store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ContentTB.java 2010-05-13 18:28:33 UTC (rev 3970)
+++ store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ContentTB.java 2010-05-14 16:23:59 UTC (rev 3971)
@@ -20,14 +20,12 @@
*/
package org.apache.qpid.server.store.berkeleydb;
+import java.nio.ByteBuffer;
+
import com.sleepycat.bind.tuple.TupleBinding;
import com.sleepycat.bind.tuple.TupleInput;
import com.sleepycat.bind.tuple.TupleOutput;
-import org.apache.qpid.framing.ContentBody;
-import org.apache.qpid.framing.abstraction.ContentChunk;
-import org.apache.mina.common.ByteBuffer;
-
/**
* @author Robert Greig (robert.j.greig(a)jpmorgan.com)
*/
@@ -39,39 +37,19 @@
final int size = tupleInput.readInt();
byte[] underlying = new byte[size];
tupleInput.readFast(underlying);
- final ByteBuffer data = ByteBuffer.wrap(underlying);
- ContentChunk cb = new ContentChunk()
- {
-
- public int getSize()
- {
- return size;
- }
-
- public ByteBuffer getData()
- {
- return data;
- }
-
- public void reduceToFit()
- {
-
- }
- };
- return cb;
+ return ByteBuffer.wrap(underlying);
}
public void objectToEntry(Object object, TupleOutput tupleOutput)
{
- ContentChunk cb = (ContentChunk) object;
- final int size = cb.getSize();
- byte[] underlying = new byte[size];
+ ByteBuffer src = (ByteBuffer) object;
+
+ src = src.slice();
- ByteBuffer buf = cb.getData();
+ byte[] chunkData = new byte[src.limit()];
+ src.duplicate().get(chunkData);
- buf.duplicate().rewind().get(underlying);
-
- tupleOutput.writeInt(size);
- tupleOutput.writeFast(underlying);
+ tupleOutput.writeInt(chunkData.length);
+ tupleOutput.writeFast(chunkData);
}
}
Modified: store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ExchangeTB.java
===================================================================
--- store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ExchangeTB.java 2010-05-13 18:28:33 UTC (rev 3970)
+++ store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ExchangeTB.java 2010-05-14 16:23:59 UTC (rev 3971)
@@ -4,20 +4,15 @@
import com.sleepycat.bind.tuple.TupleInput;
import com.sleepycat.bind.tuple.TupleOutput;
import org.apache.log4j.Logger;
-import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.store.berkeleydb.records.ExchangeRecord;
public class ExchangeTB extends TupleBinding
{
private static final Logger _log = Logger.getLogger(ExchangeTB.class);
- private final VirtualHost _virtualHost;
-
- public ExchangeTB(VirtualHost virtualHost)
+ public ExchangeTB()
{
- _virtualHost = virtualHost;
}
public Object entryToObject(TupleInput tupleInput)
@@ -27,43 +22,17 @@
AMQShortString typeName = AMQShortStringEncoding.readShortString(tupleInput);
boolean autoDelete = tupleInput.readBoolean();
-
- try
- {
- Exchange exchange;
- Exchange existing = _virtualHost.getExchangeRegistry().getExchange(name);
-
- if (existing != null)
- {
- _log.info("Exchange :" + existing + ": already exists in configured broker.");
- exchange = existing;
- }
- else
- {
- exchange = _virtualHost.getExchangeFactory().createExchange(name, typeName, true, autoDelete, 0);
-
- _virtualHost.getExchangeRegistry().registerExchange(exchange);
-
- _log.info("Registering new durable exchange from BDB:" + exchange);
- }
-
- return exchange;
- }
- catch (AMQException e)
- {
- _log.error("Unable to create exchange: " + e, e);
- return null;
- }
+
+ return new ExchangeRecord(name, typeName, autoDelete);
}
public void objectToEntry(Object object, TupleOutput tupleOutput)
{
- Exchange exchange = (Exchange) object;
+ ExchangeRecord exchange = (ExchangeRecord) object;
- AMQShortStringEncoding.writeShortString(exchange.getName(), tupleOutput);
+ AMQShortStringEncoding.writeShortString(exchange.getNameShortString(), tupleOutput);
AMQShortStringEncoding.writeShortString(exchange.getType(), tupleOutput);
tupleOutput.writeBoolean(exchange.isAutoDelete());
-
}
}
Modified: store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/MessageContentKey.java
===================================================================
--- store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/MessageContentKey.java 2010-05-13 18:28:33 UTC (rev 3970)
+++ store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/MessageContentKey.java 2010-05-14 16:23:59 UTC (rev 3971)
@@ -1,223 +1,42 @@
/*
*
- * Copyright (c) 2006 The Apache Software Foundation
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
*/
package org.apache.qpid.server.store.berkeleydb;
-import com.sleepycat.bind.tuple.TupleInput;
-import com.sleepycat.bind.tuple.TupleOutput;
-
-import java.util.Comparator;
-import java.io.Serializable;
-
-/**
- * @author Apache Software Foundation
- */
public class MessageContentKey
{
- public long messageId;
- public int chunk;
-
- public MessageContentKey()
+ private long _messageId;
+
+ public MessageContentKey(long messageId)
{
- }
-
- public MessageContentKey(byte[] payload)
+ _messageId = messageId;
+ }
+
+
+ public long getMessageId()
{
- final TupleInput ti = new TupleInput(payload);
- messageId = ti.readLong();
- chunk = ti.readInt();
+ return _messageId;
}
- public static class TupleBinding extends com.sleepycat.bind.tuple.TupleBinding
+ public void setMessageId(long messageId)
{
- public Object entryToObject(TupleInput tupleInput)
- {
- final MessageContentKey mk = new MessageContentKey();
- mk.messageId = tupleInput.readLong();
- mk.chunk = tupleInput.readInt();
- return mk;
- }
-
- public void objectToEntry(Object object, TupleOutput tupleOutput)
- {
- final MessageContentKey mk = (MessageContentKey) object;
- tupleOutput.writeLong(mk.messageId);
- tupleOutput.writeInt(mk.chunk);
- }
-
-
+ this._messageId = messageId;
}
-
- public static void writeModifiedLong(TupleOutput t, long l)
- {
- int ln = (int) (0x0F & l);
- int hn = (int) (0xF0 & l);
- t.writeByte(((ln <<4) | (hn>>4)) ^ 0x55);
- t.writeByte((int) (0xFF & (l >> 8)));
- t.writeByte((int) (0xFF & (l >> 16)));
- t.writeByte((int) (0xFF & (l >> 24)));
- t.writeByte((int) (0xFF & (l >> 32)));
- t.writeByte((int) (0xFF & (l >> 40)));
- t.writeByte((int) (0xFF & (l >> 48)));
- t.writeByte((int) (0xFF & (l >> 56)));
-
- }
-
- public static long readModifiedLong(TupleInput t)
- {
- byte b0 = (byte) (0xFF & (t.readByte() ^ 0x55));
- long l = (0x0f & (b0 >> 4)) | ((0xf0 & (b0 << 4)));
-
- l |= ((long) (0xFF & t.readByte())) << 8;
- l |= ((long) (0xFF & t.readByte())) << 16;
- l |= ((long) (0xFF & t.readByte())) << 24;
- l |= ((long) (0xFF & t.readByte())) << 32;
- l |= ((long) (0xFF & t.readByte())) << 40;
- l |= ((long) (0xFF & t.readByte())) << 48;
- l |= ((long) (0xFF & t.readByte())) << 56;
-
- return l;
- }
-
-
- public static class ContentKeyComparator implements Comparator, Serializable
- {
-
- public int compare(Object o1, Object o2)
- {
- byte[] b1 = (byte[]) o1;
- byte[] b2 = (byte[]) o2;
-
- MessageContentKey ck1 = new MessageContentKey(b1);
- MessageContentKey ck2 = new MessageContentKey(b2);
- if (ck1.messageId == ck2.messageId)
- {
- // reminder of Comparator return value:
- // return a negative integer if the first item is "less" than the second, 0 if equal
- return ck1.chunk - ck2.chunk;
- }
- else
- {
- return (int) (ck1.messageId - ck2.messageId);
- }
- }
- }
-
- public MessageContentKey(long messageId, int chunk)
- {
- this.chunk = chunk;
- this.messageId = messageId;
- }
-
-
-
- private static final char[] HEX = { '0','1','2','3','4','5','6','7','8','9','A','B','C','D','E','F'};
-
- private static StringBuilder appendHex(StringBuilder str, byte b)
- {
- str.append(HEX[0xF & (b >> 4)]);
- str.append(HEX[0xF & b]);
- return str;
- }
-
- private static StringBuilder appendHex(StringBuilder str, byte[] b)
- {
- for(int i = 0; i < b.length; i++)
- {
- appendHex(str,b[i]);
- }
- return str;
- }
-
- private static StringBuilder appendHex(StringBuilder str, int i)
- {
- appendHex(str,(byte)(0xFF & (i >> 24)));
- appendHex(str,(byte)(0xFF & (i >> 16)));
- appendHex(str,(byte)(0xFF & (i >> 8)));
- appendHex(str,(byte)(0xFF & i));
- return str;
- }
-
-
- private static StringBuilder appendHex(StringBuilder str, long l)
- {
- appendHex(str,(byte)(0xFF & (l >> 56)));
- appendHex(str,(byte)(0xFF & (l >> 48)));
- appendHex(str,(byte)(0xFF & (l >> 40)));
- appendHex(str,(byte)(0xFF & (l >> 32)));
- appendHex(str,(byte)(0xFF & (l >> 24)));
- appendHex(str,(byte)(0xFF & (l >> 16)));
- appendHex(str,(byte)(0xFF & (l >> 8)));
- appendHex(str,(byte)(0xFF & l));
- return str;
- }
-
-
-
-
- private static byte[] convertLong(long l)
- {
- byte[] b = new byte[8];
- int ln = (int) (0x0F & l);
- int hn = (int) (0xF0 & l);
- b[0] = (byte)(((ln <<4) | (hn>>4)) ^ 0x55);
- b[1] = ((byte)(0xFF & (l >> 8)));
- b[2] = ((byte)(0xFF & (l >> 16)));
- b[3] = ((byte)(0xFF & (l >> 24)));
- b[4] = ((byte)(0xFF & (l >> 32)));
- b[5] = ((byte)(0xFF & (l >> 40)));
- b[6] = ((byte)(0xFF & (l >> 48)));
- b[7] = ((byte)(0xFF & (l >> 56)));
-
- return b;
-
- }
-
-
- private static long readModifiedLong(byte[] b)
- {
- byte b0 = (byte) (b[0] ^ 0x55);
- long l = (0x0f & (b0 >> 4)) | ((0xf0 & (b0 << 4)));
- l |= ((long) b[1]) << 8;
- l |= ((long) b[2]) << 16;
- l |= ((long) b[3]) << 24;
- l |= ((long) b[4]) << 32;
- l |= ((long) b[5]) << 40;
- l |= ((long) b[6]) << 48;
- l |= ((long) b[7]) << 56;
-
-
- return l;
- }
-
-
- public static void main(String[] args)
- {
- StringBuilder s = new StringBuilder();
-
-
-
- for(long i = 1000; i < 1010; i++)
- {
- byte[] b = convertLong(i);
- System.out.println(appendHex(new StringBuilder(),b));
- System.out.println(readModifiedLong(b));
-
- }
-
- }
-}
+}
\ No newline at end of file
Deleted: store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/MessageMetaDataTB.java
===================================================================
--- store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/MessageMetaDataTB.java 2010-05-13 18:28:33 UTC (rev 3970)
+++ store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/MessageMetaDataTB.java 2010-05-14 16:23:59 UTC (rev 3971)
@@ -1,142 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.qpid.server.store.berkeleydb;
-
-import com.sleepycat.bind.tuple.TupleBinding;
-import com.sleepycat.bind.tuple.TupleInput;
-import com.sleepycat.bind.tuple.TupleOutput;
-import org.apache.log4j.Logger;
-import org.apache.mina.common.ByteBuffer;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.*;
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.server.queue.MessageMetaData;
-
-/**
- * Handles the mapping to and from message meta data
- */
-public class MessageMetaDataTB extends TupleBinding
-{
- private static final Logger _log = Logger.getLogger(MessageMetaDataTB.class);
-
- public Object entryToObject(TupleInput tupleInput)
- {
- try
- {
- final MessagePublishInfo publishBody = readMessagePublishInfo(tupleInput);
- final ContentHeaderBody contentHeaderBody = readContentHeaderBody(tupleInput);
- final int contentChunkCount = tupleInput.readInt();
- return new MessageMetaData(publishBody, contentHeaderBody, contentChunkCount);
- }
- catch (Exception e)
- {
- _log.error("Error converting entry to object: " + e, e);
- // annoyingly just have to return null since we cannot throw
- return null;
- }
- }
-
- public void objectToEntry(Object object, TupleOutput tupleOutput)
- {
- MessageMetaData message = (MessageMetaData) object;
- try
- {
- writeMessagePublishInfo(message.getMessagePublishInfo(), tupleOutput);
- }
- catch (AMQException e)
- {
- // can't do anything else since the BDB interface precludes throwing any exceptions
- // in practice we should never get an exception
- throw new RuntimeException("Error converting object to entry: " + e, e);
- }
- writeContentHeader(message.getContentHeaderBody(), tupleOutput);
- tupleOutput.writeInt(message.getContentChunkCount());
- }
-
- private MessagePublishInfo readMessagePublishInfo(TupleInput tupleInput)
- {
-
- final AMQShortString exchange = AMQShortStringEncoding.readShortString(tupleInput);
- final AMQShortString routingKey = AMQShortStringEncoding.readShortString(tupleInput);
- final boolean mandatory = tupleInput.readBoolean();
- final boolean immediate = tupleInput.readBoolean();
-
- return new MessagePublishInfo()
- {
-
- public AMQShortString getExchange()
- {
- return exchange;
- }
-
- public void setExchange(AMQShortString exchange)
- {
-
- }
-
- public boolean isImmediate()
- {
- return immediate;
- }
-
- public boolean isMandatory()
- {
- return mandatory;
- }
-
- public AMQShortString getRoutingKey()
- {
- return routingKey;
- }
- } ;
-
- }
-
-
- private ContentHeaderBody readContentHeaderBody(TupleInput tupleInput) throws AMQFrameDecodingException, AMQProtocolVersionException
- {
- int bodySize = tupleInput.readInt();
- byte[] underlying = new byte[bodySize];
- tupleInput.readFast(underlying);
- ByteBuffer buf = ByteBuffer.wrap(underlying);
-
- return ContentHeaderBody.createFromBuffer(buf, bodySize);
- }
-
- private void writeMessagePublishInfo(MessagePublishInfo publishBody, TupleOutput tupleOutput) throws AMQException
- {
-
-
- AMQShortStringEncoding.writeShortString(publishBody.getExchange(), tupleOutput);
- AMQShortStringEncoding.writeShortString(publishBody.getRoutingKey(), tupleOutput);
- tupleOutput.writeBoolean(publishBody.isMandatory());
- tupleOutput.writeBoolean(publishBody.isImmediate());
-
- }
-
- private void writeContentHeader(ContentHeaderBody headerBody, TupleOutput tupleOutput)
- {
- // write out the content header body
- final int bodySize = headerBody.getSize();
- byte[] underlying = new byte[bodySize];
- ByteBuffer buf = ByteBuffer.wrap(underlying);
- headerBody.writePayload(buf);
- tupleOutput.writeInt(bodySize);
- tupleOutput.writeFast(underlying);
- }
-}
Modified: store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/QueueEntryKey.java
===================================================================
--- store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/QueueEntryKey.java 2010-05-13 18:28:33 UTC (rev 3970)
+++ store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/QueueEntryKey.java 2010-05-14 16:23:59 UTC (rev 3971)
@@ -1,8 +1,25 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
package org.apache.qpid.server.store.berkeleydb;
-import com.sleepycat.bind.tuple.TupleInput;
-import com.sleepycat.bind.tuple.TupleOutput;
-
import org.apache.qpid.framing.AMQShortString;
/**
@@ -10,55 +27,26 @@
*/
public class QueueEntryKey
{
- public AMQShortString queueName;
- public long messageId;
+ private AMQShortString _queueName;
+ private long _messageId;
- public QueueEntryKey()
+ public QueueEntryKey(AMQShortString queueName, long messageId)
{
+ _queueName = queueName;
+ _messageId = messageId;
}
- public QueueEntryKey(byte[] payload)
+ public AMQShortString getQueueName()
{
- final TupleInput ti = new TupleInput(payload);
-
- queueName = AMQShortStringEncoding.readShortString(ti);
-
- messageId = ti.readLong();
-
+ return _queueName;
}
- public static class TupleBinding extends com.sleepycat.bind.tuple.TupleBinding
- {
- public Object entryToObject(TupleInput tupleInput)
- {
- final QueueEntryKey mk = new QueueEntryKey();
-
- mk.queueName = AMQShortStringEncoding.readShortString(tupleInput);
- mk.messageId = tupleInput.readLong();
-
- return mk;
- }
-
- public void objectToEntry(Object object, TupleOutput tupleOutput)
- {
- final QueueEntryKey mk = (QueueEntryKey) object;
-
- AMQShortStringEncoding.writeShortString(mk.queueName,tupleOutput);
- tupleOutput.writeLong(mk.messageId);
-
- }
-
-
- }
-
- public QueueEntryKey(AMQShortString queueName, long messageId)
+ public long getMessageId()
{
- this.queueName = queueName;
- this.messageId = messageId;
+ return _messageId;
}
-
}
Copied: store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/keys/MessageContentKey_1.java (from rev 3969, store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple.java)
===================================================================
--- store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/keys/MessageContentKey_1.java (rev 0)
+++ store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/keys/MessageContentKey_1.java 2010-05-14 16:23:59 UTC (rev 3971)
@@ -0,0 +1,44 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.keys;
+
+import org.apache.qpid.server.store.berkeleydb.MessageContentKey;
+
+public class MessageContentKey_1 extends MessageContentKey
+{
+ private int _chunkNum;
+
+ public MessageContentKey_1(long messageId, int chunkNo)
+ {
+ super(messageId);
+ _chunkNum = chunkNo;
+ }
+
+ public int getChunk()
+ {
+ return _chunkNum;
+ }
+
+ public void setChunk(int chunk)
+ {
+ this._chunkNum = chunk;
+ }
+}
\ No newline at end of file
Copied: store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/keys/MessageContentKey_3.java (from rev 3969, store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple.java)
===================================================================
--- store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/keys/MessageContentKey_3.java (rev 0)
+++ store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/keys/MessageContentKey_3.java 2010-05-14 16:23:59 UTC (rev 3971)
@@ -0,0 +1,44 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.keys;
+
+import org.apache.qpid.server.store.berkeleydb.MessageContentKey;
+
+public class MessageContentKey_3 extends MessageContentKey
+{
+ private int _offset;
+
+ public MessageContentKey_3(long messageId, int chunkNo)
+ {
+ super(messageId);
+ _offset = chunkNo;
+ }
+
+ public int getOffset()
+ {
+ return _offset;
+ }
+
+ public void setOffset(int chunk)
+ {
+ this._offset = chunk;
+ }
+}
\ No newline at end of file
Copied: store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/records/ExchangeRecord.java (from rev 3969, store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTupleBindingFactory.java)
===================================================================
--- store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/records/ExchangeRecord.java (rev 0)
+++ store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/records/ExchangeRecord.java 2010-05-14 16:23:59 UTC (rev 3971)
@@ -0,0 +1,53 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.records;
+
+import org.apache.qpid.framing.AMQShortString;
+
+public class ExchangeRecord extends Object
+{
+ private final AMQShortString _exchangeName;
+ private final AMQShortString _exchangeType;
+ private final boolean _autoDelete;
+
+ public ExchangeRecord(AMQShortString exchangeName, AMQShortString exchangeType, boolean autoDelete)
+ {
+ _exchangeName = exchangeName;
+ _exchangeType = exchangeType;
+ _autoDelete = autoDelete;
+ }
+
+ public AMQShortString getNameShortString()
+ {
+ return _exchangeName;
+ }
+
+ public AMQShortString getType()
+ {
+ return _exchangeType;
+ }
+
+ public boolean isAutoDelete()
+ {
+ return _autoDelete;
+ }
+
+}
Copied: store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/records/QueueRecord.java (from rev 3969, store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTupleBindingFactory.java)
===================================================================
--- store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/records/QueueRecord.java (rev 0)
+++ store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/records/QueueRecord.java 2010-05-14 16:23:59 UTC (rev 3971)
@@ -0,0 +1,54 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.records;
+
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+
+public class QueueRecord extends Object
+{
+ private final AMQShortString _queueName;
+ private final AMQShortString _owner;
+ private final FieldTable _arguments;
+
+ public QueueRecord(AMQShortString queueName, AMQShortString owner, FieldTable arguments)
+ {
+ _queueName = queueName;
+ _owner = owner;
+ _arguments = arguments;
+ }
+
+ public AMQShortString getNameShortString()
+ {
+ return _queueName;
+ }
+
+ public AMQShortString getOwner()
+ {
+ return _owner;
+ }
+
+ public FieldTable getArguments()
+ {
+ return _arguments;
+ }
+
+}
Modified: store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple.java
===================================================================
--- store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple.java 2010-05-13 18:28:33 UTC (rev 3970)
+++ store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple.java 2010-05-14 16:23:59 UTC (rev 3971)
@@ -20,8 +20,6 @@
*/
package org.apache.qpid.server.store.berkeleydb.tuples;
-import org.apache.qpid.framing.FieldTable;
-
public interface BindingTuple
{
}
Modified: store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTupleBindingFactory.java
===================================================================
--- store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTupleBindingFactory.java 2010-05-13 18:28:33 UTC (rev 3970)
+++ store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTupleBindingFactory.java 2010-05-14 16:23:59 UTC (rev 3971)
@@ -21,13 +21,14 @@
package org.apache.qpid.server.store.berkeleydb.tuples;
import com.sleepycat.bind.tuple.TupleBinding;
-import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.store.ConfigurationRecoveryHandler.BindingRecoveryHandler;
+
public class BindingTupleBindingFactory extends TupleBindingFactory
{
- public BindingTupleBindingFactory(int version, VirtualHost virtualhost)
+ public BindingTupleBindingFactory(int version)
{
- super(version, virtualhost);
+ super(version);
}
public TupleBinding getInstance()
@@ -35,10 +36,12 @@
switch (_version)
{
default:
+ case 3:
+ //no change from v2
case 2:
- return new BindingTuple_2(_virtualhost);
+ return new BindingTuple_2();
case 1:
- return new BindingTuple_1(_virtualhost);
+ return new BindingTuple_1();
}
}
}
Modified: store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple_1.java
===================================================================
--- store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple_1.java 2010-05-13 18:28:33 UTC (rev 3970)
+++ store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple_1.java 2010-05-14 16:23:59 UTC (rev 3971)
@@ -1,11 +1,8 @@
package org.apache.qpid.server.store.berkeleydb.tuples;
-import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.store.berkeleydb.AMQShortStringEncoding;
-import org.apache.qpid.server.store.berkeleydb.FieldTableEncoding;
import org.apache.qpid.server.store.berkeleydb.BindingKey;
import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
import com.sleepycat.bind.tuple.TupleBinding;
import com.sleepycat.bind.tuple.TupleInput;
@@ -16,15 +13,8 @@
{
protected static final Logger _log = Logger.getLogger(BindingTuple.class);
- protected VirtualHost _virtualhost;
-
- public BindingTuple_1(VirtualHost virtualHost)
+ public BindingTuple_1()
{
- if (virtualHost == null)
- {
- throw new NullPointerException("Virtualhost cannot be null");
- }
- _virtualhost = virtualHost;
}
public Object entryToObject(TupleInput tupleInput)
@@ -33,7 +23,7 @@
AMQShortString queueName = AMQShortStringEncoding.readShortString(tupleInput);
AMQShortString routingKey = AMQShortStringEncoding.readShortString(tupleInput);
- return createNewBindingKey(exchangeName, queueName, routingKey);
+ return new BindingKey(exchangeName, queueName, routingKey, null);
}
public void objectToEntry(Object object, TupleOutput tupleOutput)
@@ -45,16 +35,4 @@
AMQShortStringEncoding.writeShortString(binding.getRoutingKey(), tupleOutput);
}
- private Object createNewBindingKey(AMQShortString exchangeName, AMQShortString queueName, AMQShortString routingKey)
- {
- return createNewBindingKey(exchangeName, queueName, routingKey, null);
- }
-
- // Addition for Version 2 of this table
- protected Object createNewBindingKey(AMQShortString exchangeName, AMQShortString queueName,
- AMQShortString routingKey, FieldTable arguments)
- {
- return new BindingKey(exchangeName, queueName, routingKey, arguments);
- }
-
}
Modified: store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple_2.java
===================================================================
--- store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple_2.java 2010-05-13 18:28:33 UTC (rev 3970)
+++ store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple_2.java 2010-05-14 16:23:59 UTC (rev 3971)
@@ -1,24 +1,21 @@
package org.apache.qpid.server.store.berkeleydb.tuples;
-import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.store.berkeleydb.AMQShortStringEncoding;
-import org.apache.qpid.server.store.berkeleydb.FieldTableEncoding;
import org.apache.qpid.server.store.berkeleydb.BindingKey;
+import org.apache.qpid.server.store.berkeleydb.FieldTableEncoding;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
-import com.sleepycat.bind.tuple.TupleBinding;
import com.sleepycat.bind.tuple.TupleInput;
import com.sleepycat.bind.tuple.TupleOutput;
import com.sleepycat.je.DatabaseException;
-import org.apache.log4j.Logger;
public class BindingTuple_2 extends BindingTuple_1 implements BindingTuple
{
- public BindingTuple_2(VirtualHost virtualHost)
+ public BindingTuple_2()
{
- super(virtualHost);
+ super();
}
public Object entryToObject(TupleInput tupleInput)
@@ -28,7 +25,7 @@
AMQShortString routingKey = AMQShortStringEncoding.readShortString(tupleInput);
FieldTable arguments;
-
+
// Addition for Version 2 of this table
try
{
@@ -40,7 +37,7 @@
return null;
}
- return createNewBindingKey(exchangeName, queueName, routingKey, arguments);
+ return new BindingKey(exchangeName, queueName, routingKey, arguments);
}
public void objectToEntry(Object object, TupleOutput tupleOutput)
Added: store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageContentKeyTB_1.java
===================================================================
--- store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageContentKeyTB_1.java (rev 0)
+++ store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageContentKeyTB_1.java 2010-05-14 16:23:59 UTC (rev 3971)
@@ -0,0 +1,43 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.tuples;
+
+import org.apache.qpid.server.store.berkeleydb.keys.MessageContentKey_1;
+
+import com.sleepycat.bind.tuple.TupleInput;
+import com.sleepycat.bind.tuple.TupleOutput;
+
+public class MessageContentKeyTB_1 extends com.sleepycat.bind.tuple.TupleBinding
+{
+
+ public Object entryToObject(TupleInput tupleInput)
+ {
+ long messageId = tupleInput.readLong();
+ int chunk = tupleInput.readInt();
+
+ return new MessageContentKey_1(messageId, chunk);
+ }
+
+ public void objectToEntry(Object object, TupleOutput tupleOutput)
+ {
+ final MessageContentKey_1 mk = (MessageContentKey_1) object;
+ tupleOutput.writeLong(mk.getMessageId());
+ tupleOutput.writeInt(mk.getChunk());
+ }
+
+}
\ No newline at end of file
Added: store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageContentKeyTB_3.java
===================================================================
--- store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageContentKeyTB_3.java (rev 0)
+++ store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageContentKeyTB_3.java 2010-05-14 16:23:59 UTC (rev 3971)
@@ -0,0 +1,43 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.tuples;
+
+import org.apache.qpid.server.store.berkeleydb.keys.MessageContentKey_3;
+
+import com.sleepycat.bind.tuple.TupleInput;
+import com.sleepycat.bind.tuple.TupleOutput;
+
+public class MessageContentKeyTB_3 extends com.sleepycat.bind.tuple.TupleBinding
+{
+
+ public Object entryToObject(TupleInput tupleInput)
+ {
+ long messageId = tupleInput.readLong();
+ int offset = tupleInput.readInt();
+
+ return new MessageContentKey_3(messageId, offset);
+ }
+
+ public void objectToEntry(Object object, TupleOutput tupleOutput)
+ {
+ final MessageContentKey_3 mk = (MessageContentKey_3) object;
+ tupleOutput.writeLong(mk.getMessageId());
+ tupleOutput.writeInt(mk.getOffset());
+ }
+
+}
\ No newline at end of file
Copied: store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageContentKeyTupleBindingFactory.java (from rev 3969, store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTupleBindingFactory.java)
===================================================================
--- store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageContentKeyTupleBindingFactory.java (rev 0)
+++ store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageContentKeyTupleBindingFactory.java 2010-05-14 16:23:59 UTC (rev 3971)
@@ -0,0 +1,45 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.tuples;
+
+import com.sleepycat.bind.tuple.TupleBinding;
+
+public class MessageContentKeyTupleBindingFactory extends TupleBindingFactory
+{
+ public MessageContentKeyTupleBindingFactory(int version)
+ {
+ super(version);
+ }
+
+ public TupleBinding getInstance()
+ {
+ switch (_version)
+ {
+ default:
+ case 3:
+ return new MessageContentKeyTB_3();
+ case 2:
+ //no change from v1
+ case 1:
+ return new MessageContentKeyTB_1();
+ }
+ }
+}
Copied: store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTB_1.java (from rev 3969, store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/MessageMetaDataTB.java)
===================================================================
--- store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTB_1.java (rev 0)
+++ store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTB_1.java 2010-05-14 16:23:59 UTC (rev 3971)
@@ -0,0 +1,152 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.tuples;
+
+import com.sleepycat.bind.tuple.TupleBinding;
+import com.sleepycat.bind.tuple.TupleInput;
+import com.sleepycat.bind.tuple.TupleOutput;
+import org.apache.log4j.Logger;
+import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.*;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.server.message.MessageMetaData;
+import org.apache.qpid.server.store.MessageStoreRecoveryHandler.StoredMessageRecoveryHandler;
+import org.apache.qpid.server.store.berkeleydb.AMQShortStringEncoding;
+
+/**
+ * Handles the mapping to and from 0-8/0-9 message meta data
+ */
+public class MessageMetaDataTB_1 extends TupleBinding
+{
+ private static final Logger _log = Logger.getLogger(MessageMetaDataTB_1.class);
+
+ public MessageMetaDataTB_1()
+ {
+ }
+
+ public Object entryToObject(TupleInput tupleInput)
+ {
+ try
+ {
+ final MessagePublishInfo publishBody = readMessagePublishInfo(tupleInput);
+ final ContentHeaderBody contentHeaderBody = readContentHeaderBody(tupleInput);
+ final int contentChunkCount = tupleInput.readInt();
+
+ return new MessageMetaData(publishBody, contentHeaderBody, contentChunkCount);
+ }
+ catch (Exception e)
+ {
+ _log.error("Error converting entry to object: " + e, e);
+ // annoyingly just have to return null since we cannot throw
+ return null;
+ }
+ }
+
+ public void objectToEntry(Object object, TupleOutput tupleOutput)
+ {
+ MessageMetaData message = (MessageMetaData) object;
+ try
+ {
+ writeMessagePublishInfo(message.getMessagePublishInfo(), tupleOutput);
+ }
+ catch (AMQException e)
+ {
+ // can't do anything else since the BDB interface precludes throwing any exceptions
+ // in practice we should never get an exception
+ throw new RuntimeException("Error converting object to entry: " + e, e);
+ }
+ writeContentHeader(message.getContentHeaderBody(), tupleOutput);
+ tupleOutput.writeInt(message.getContentChunkCount());
+ }
+
+ private MessagePublishInfo readMessagePublishInfo(TupleInput tupleInput)
+ {
+
+ final AMQShortString exchange = AMQShortStringEncoding.readShortString(tupleInput);
+ final AMQShortString routingKey = AMQShortStringEncoding.readShortString(tupleInput);
+ final boolean mandatory = tupleInput.readBoolean();
+ final boolean immediate = tupleInput.readBoolean();
+
+ return new MessagePublishInfo()
+ {
+
+ public AMQShortString getExchange()
+ {
+ return exchange;
+ }
+
+ public void setExchange(AMQShortString exchange)
+ {
+
+ }
+
+ public boolean isImmediate()
+ {
+ return immediate;
+ }
+
+ public boolean isMandatory()
+ {
+ return mandatory;
+ }
+
+ public AMQShortString getRoutingKey()
+ {
+ return routingKey;
+ }
+ } ;
+
+ }
+
+
+ private ContentHeaderBody readContentHeaderBody(TupleInput tupleInput) throws AMQFrameDecodingException, AMQProtocolVersionException
+ {
+ int bodySize = tupleInput.readInt();
+ byte[] underlying = new byte[bodySize];
+ tupleInput.readFast(underlying);
+ ByteBuffer buf = ByteBuffer.wrap(underlying);
+
+ return ContentHeaderBody.createFromBuffer(buf, bodySize);
+ }
+
+ private void writeMessagePublishInfo(MessagePublishInfo publishBody, TupleOutput tupleOutput) throws AMQException
+ {
+
+
+ AMQShortStringEncoding.writeShortString(publishBody.getExchange(), tupleOutput);
+ AMQShortStringEncoding.writeShortString(publishBody.getRoutingKey(), tupleOutput);
+ tupleOutput.writeBoolean(publishBody.isMandatory());
+ tupleOutput.writeBoolean(publishBody.isImmediate());
+
+ }
+
+ private void writeContentHeader(ContentHeaderBody headerBody, TupleOutput tupleOutput)
+ {
+ // write out the content header body
+ final int bodySize = headerBody.getSize();
+ byte[] underlying = new byte[bodySize];
+ ByteBuffer buf = ByteBuffer.wrap(underlying);
+ headerBody.writePayload(buf);
+ tupleOutput.writeInt(bodySize);
+ tupleOutput.writeFast(underlying);
+ }
+}
Added: store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTB_3.java
===================================================================
--- store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTB_3.java (rev 0)
+++ store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTB_3.java 2010-05-14 16:23:59 UTC (rev 3971)
@@ -0,0 +1,77 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.tuples;
+
+import com.sleepycat.bind.tuple.TupleInput;
+import com.sleepycat.bind.tuple.TupleOutput;
+import org.apache.log4j.Logger;
+import org.apache.qpid.server.store.MessageMetaDataType;
+import org.apache.qpid.server.store.StorableMessageMetaData;
+
+/**
+ * Handles the mapping to and from message meta data
+ */
+public class MessageMetaDataTB_3 extends MessageMetaDataTB_1
+{
+ private static final Logger _log = Logger.getLogger(MessageMetaDataTB_3.class);
+
+ @Override
+ public Object entryToObject(TupleInput tupleInput)
+ {
+ try
+ {
+ final int bodySize = tupleInput.readInt();
+ byte[] dataAsBytes = new byte[bodySize];
+ tupleInput.readFast(dataAsBytes);
+
+ java.nio.ByteBuffer buf = java.nio.ByteBuffer.wrap(dataAsBytes);
+ buf.position(1);
+ buf = buf.slice();
+ MessageMetaDataType type = MessageMetaDataType.values()[dataAsBytes[0]];
+ StorableMessageMetaData metaData = type.getFactory().createMetaData(buf);
+
+ return metaData;
+ }
+ catch (Exception e)
+ {
+ _log.error("Error converting entry to object: " + e, e);
+ // annoyingly just have to return null since we cannot throw
+ return null;
+ }
+ }
+
+ @Override
+ public void objectToEntry(Object object, TupleOutput tupleOutput)
+ {
+ StorableMessageMetaData metaData = (StorableMessageMetaData) object;
+
+ final int bodySize = 1 + metaData.getStorableSize();
+ byte[] underlying = new byte[bodySize];
+ underlying[0] = (byte) metaData.getType().ordinal();
+ java.nio.ByteBuffer buf = java.nio.ByteBuffer.wrap(underlying);
+ buf.position(1);
+ buf = buf.slice();
+
+ metaData.writeToBuffer(0, buf);
+ tupleOutput.writeInt(bodySize);
+ tupleOutput.writeFast(underlying);
+ }
+}
Copied: store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTupleBindingFactory.java (from rev 3969, store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTupleBindingFactory.java)
===================================================================
--- store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTupleBindingFactory.java (rev 0)
+++ store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTupleBindingFactory.java 2010-05-14 16:23:59 UTC (rev 3971)
@@ -0,0 +1,45 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.tuples;
+
+import com.sleepycat.bind.tuple.TupleBinding;
+
+public class MessageMetaDataTupleBindingFactory extends TupleBindingFactory
+{
+ public MessageMetaDataTupleBindingFactory(int version)
+ {
+ super(version);
+ }
+
+ public TupleBinding getInstance()
+ {
+ switch (_version)
+ {
+ default:
+ case 3:
+ return new MessageMetaDataTB_3();
+ case 2:
+ //no change from v1
+ case 1:
+ return new MessageMetaDataTB_1();
+ }
+ }
+}
Copied: store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueEntryTB.java (from rev 3969, store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTupleBindingFactory.java)
===================================================================
--- store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueEntryTB.java (rev 0)
+++ store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueEntryTB.java 2010-05-14 16:23:59 UTC (rev 3971)
@@ -0,0 +1,48 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.tuples;
+
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.store.berkeleydb.AMQShortStringEncoding;
+import org.apache.qpid.server.store.berkeleydb.QueueEntryKey;
+
+import com.sleepycat.bind.tuple.TupleBinding;
+import com.sleepycat.bind.tuple.TupleInput;
+import com.sleepycat.bind.tuple.TupleOutput;
+
+public class QueueEntryTB extends TupleBinding
+{
+ public Object entryToObject(TupleInput tupleInput)
+ {
+ AMQShortString queueName = AMQShortStringEncoding.readShortString(tupleInput);
+ Long messageId = tupleInput.readLong();
+
+ return new QueueEntryKey(queueName, messageId);
+ }
+
+ public void objectToEntry(Object object, TupleOutput tupleOutput)
+ {
+ final QueueEntryKey mk = (QueueEntryKey) object;
+
+ AMQShortStringEncoding.writeShortString(mk.getQueueName(),tupleOutput);
+ tupleOutput.writeLong(mk.getMessageId());
+ }
+}
\ No newline at end of file
Modified: store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple.java
===================================================================
--- store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple.java 2010-05-13 18:28:33 UTC (rev 3970)
+++ store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple.java 2010-05-14 16:23:59 UTC (rev 3971)
@@ -20,10 +20,6 @@
*/
package org.apache.qpid.server.store.berkeleydb.tuples;
-import org.apache.qpid.framing.FieldTable;
-
public interface QueueTuple
{
- // Addition for Version 2
- public void setArguments(FieldTable arguments);
}
Modified: store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTupleBindingFactory.java
===================================================================
--- store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTupleBindingFactory.java 2010-05-13 18:28:33 UTC (rev 3970)
+++ store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTupleBindingFactory.java 2010-05-14 16:23:59 UTC (rev 3971)
@@ -20,14 +20,14 @@
*/
package org.apache.qpid.server.store.berkeleydb.tuples;
-import org.apache.qpid.server.virtualhost.VirtualHost;
import com.sleepycat.bind.tuple.TupleBinding;
public class QueueTupleBindingFactory extends TupleBindingFactory
{
- public QueueTupleBindingFactory(int version, VirtualHost virtualHost)
+
+ public QueueTupleBindingFactory(int version)
{
- super(version,virtualHost);
+ super(version);
}
public TupleBinding getInstance()
@@ -35,10 +35,12 @@
switch (_version)
{
default:
+ case 3:
+ //no change from v2
case 2:
- return new QueueTuple_2(_virtualhost);
+ return new QueueTuple_2();
case 1:
- return new QueueTuple_1(_virtualhost);
+ return new QueueTuple_1();
}
}
}
Modified: store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_1.java
===================================================================
--- store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_1.java 2010-05-13 18:28:33 UTC (rev 3970)
+++ store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_1.java 2010-05-14 16:23:59 UTC (rev 3971)
@@ -20,28 +20,15 @@
import com.sleepycat.bind.tuple.TupleBinding;
import com.sleepycat.bind.tuple.TupleInput;
import com.sleepycat.bind.tuple.TupleOutput;
-import org.apache.log4j.Logger;
-import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.store.berkeleydb.AMQShortStringEncoding;
-import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.store.berkeleydb.records.QueueRecord;
public class QueueTuple_1 extends TupleBinding implements QueueTuple
{
- protected static final Logger _logger = Logger.getLogger(QueueTuple.class);
- protected final VirtualHost _virtualHost;
-
- public QueueTuple_1(VirtualHost virtualHost)
+ public QueueTuple_1()
{
- if (virtualHost == null)
- {
- throw new NullPointerException("Virtualhost cannot be null");
- }
- _virtualHost = virtualHost;
}
public Object entryToObject(TupleInput tupleInput)
@@ -49,58 +36,15 @@
AMQShortString name = AMQShortStringEncoding.readShortString(tupleInput);
AMQShortString owner = AMQShortStringEncoding.readShortString(tupleInput);
- return createNewQueue(name, owner);
+ return new QueueRecord(name, owner, null);
}
public void objectToEntry(Object object, TupleOutput tupleOutput)
{
- AMQQueue queue = (AMQQueue) object;
+ QueueRecord queue = (QueueRecord) object;
- AMQShortStringEncoding.writeShortString(queue.getName(), tupleOutput);
+ AMQShortStringEncoding.writeShortString(queue.getNameShortString(), tupleOutput);
AMQShortStringEncoding.writeShortString(queue.getOwner(), tupleOutput);
+ //Version1 tuple can't store the queue arguments.
}
-
- // Addition for Version 2 of this table
- public void setArguments(FieldTable arguments)
- {
- //no-op
- }
-
- protected Object createNewQueue(AMQShortString name, AMQShortString owner)
- {
- return createNewQueue(name, owner, null);
- }
-
- // Addition for Version 2 of this table
- protected Object createNewQueue(AMQShortString name, AMQShortString owner, FieldTable arguments)
- {
- try
- {
-
- AMQQueue queue;
- AMQQueue existing = _virtualHost.getQueueRegistry().getQueue(name);
-
- if (existing != null)
- {
- _logger.info("Queue :" + existing + ": already exists in configured broker.");
-
- queue = existing;
- }
- else
- {
- // Retrieve the existing Queue object
- queue = AMQQueueFactory.createAMQQueueImpl(name, true, owner, false, _virtualHost, arguments);
- _virtualHost.getQueueRegistry().registerQueue(queue);
- _logger.info("Recovering queue " + queue.getName() + " with owner:"
- + ((queue.getOwner() == null) ? " <<null>> " : (" \"" + queue.getOwner() + "\" ")));
- }
-
- return queue;
- }
- catch (AMQException e)
- {
- _logger.error("Unable to create queue: " + e, e);
- return null;
- }
- }
}
Modified: store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_2.java
===================================================================
--- store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_2.java 2010-05-13 18:28:33 UTC (rev 3970)
+++ store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_2.java 2010-05-14 16:23:59 UTC (rev 3971)
@@ -20,22 +20,23 @@
import com.sleepycat.bind.tuple.TupleInput;
import com.sleepycat.bind.tuple.TupleOutput;
import com.sleepycat.je.DatabaseException;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.AMQQueueFactory;
-import org.apache.qpid.server.virtualhost.VirtualHost;
+
+import org.apache.log4j.Logger;
import org.apache.qpid.server.store.berkeleydb.AMQShortStringEncoding;
import org.apache.qpid.server.store.berkeleydb.FieldTableEncoding;
-import org.apache.qpid.AMQException;
+import org.apache.qpid.server.store.berkeleydb.records.QueueRecord;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
public class QueueTuple_2 extends QueueTuple_1
{
+ protected static final Logger _logger = Logger.getLogger(QueueTuple_2.class);
+
protected FieldTable _arguments;
- public QueueTuple_2(VirtualHost virtualHost)
+ public QueueTuple_2()
{
- super(virtualHost);
+ super();
}
public Object entryToObject(TupleInput tupleInput)
@@ -44,10 +45,10 @@
{
AMQShortString name = AMQShortStringEncoding.readShortString(tupleInput);
AMQShortString owner = AMQShortStringEncoding.readShortString(tupleInput);
- // Addition for Version 2 of this table
+ // Addition for Version 2 of this table, read the queue arguments
FieldTable arguments = FieldTableEncoding.readFieldTable(tupleInput);
- return createNewQueue(name, owner, arguments);
+ return new QueueRecord(name, owner, arguments);
}
catch (DatabaseException e)
{
@@ -59,17 +60,11 @@
public void objectToEntry(Object object, TupleOutput tupleOutput)
{
- AMQQueue queue = (AMQQueue) object;
+ QueueRecord queue = (QueueRecord) object;
- AMQShortStringEncoding.writeShortString(queue.getName(), tupleOutput);
+ AMQShortStringEncoding.writeShortString(queue.getNameShortString(), tupleOutput);
AMQShortStringEncoding.writeShortString(queue.getOwner(), tupleOutput);
- // Addition for Version 2 of this table
- FieldTableEncoding.writeFieldTable(_arguments, tupleOutput);
+ // Addition for Version 2 of this table, store the queue arguments
+ FieldTableEncoding.writeFieldTable(queue.getArguments(), tupleOutput);
}
-
- // Addition for Version 2 of this table
- public void setArguments(FieldTable arguments)
- {
- _arguments = arguments;
- }
}
Modified: store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/TupleBindingFactory.java
===================================================================
--- store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/TupleBindingFactory.java 2010-05-13 18:28:33 UTC (rev 3970)
+++ store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/TupleBindingFactory.java 2010-05-14 16:23:59 UTC (rev 3971)
@@ -27,17 +27,9 @@
{
protected int _version;
- protected VirtualHost _virtualhost;
-
- public TupleBindingFactory(int version, VirtualHost virtualhost)
+ public TupleBindingFactory(int version)
{
- if (virtualhost == null)
- {
- throw new NullPointerException("Virtualhost cannot be null");
- }
-
_version = version;
- _virtualhost = virtualhost;
}
public abstract TupleBinding getInstance();
Modified: store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java
===================================================================
--- store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java 2010-05-13 18:28:33 UTC (rev 3970)
+++ store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java 2010-05-14 16:23:59 UTC (rev 3971)
@@ -25,6 +25,7 @@
import org.apache.qpid.server.store.berkeleydb.utils.Publisher;
import org.apache.qpid.server.store.berkeleydb.utils.DurableSubscriber;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.virtualhost.VirtualHostImpl;
import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.client.transport.TransportConnection;
import org.apache.qpid.util.FileUtils;
@@ -200,7 +201,7 @@
testVirtualhost.setProperty("store.version", version);
ApplicationRegistry.getInstance(port).getVirtualHostRegistry().
- registerVirtualHost(new VirtualHost(new VirtualHostConfiguration("bdbtest",testVirtualhost)));
+ registerVirtualHost(new VirtualHostImpl(new VirtualHostConfiguration("bdbtest",testVirtualhost), null /* TODO */));
TransportConnection.createVMBroker(port);
Modified: store/trunk/java/bdbstore/src/tools/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgrade.java
===================================================================
--- store/trunk/java/bdbstore/src/tools/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgrade.java 2010-05-13 18:28:33 UTC (rev 3970)
+++ store/trunk/java/bdbstore/src/tools/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgrade.java 2010-05-14 16:23:59 UTC (rev 3971)
@@ -25,6 +25,7 @@
import org.apache.qpid.server.store.MemoryMessageStore;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.virtualhost.VirtualHostImpl;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.logging.actors.CurrentActor;
@@ -341,12 +342,12 @@
CurrentActor.set(new BrokerActor(new NullRootMessageLogger()));
// Note the name of the Vhosts is not important, the store doesnot record the name of the vhost.
- _newVirtualHost = new VirtualHost(new VirtualHostConfiguration("Upgraded", new PropertiesConfiguration()), new MemoryMessageStore());
- _oldVirtualHost = new VirtualHost(new VirtualHostConfiguration("Old", new PropertiesConfiguration()), new MemoryMessageStore());
+ _newVirtualHost = new VirtualHostImpl(new VirtualHostConfiguration("Upgraded", new PropertiesConfiguration()), new MemoryMessageStore());
+ _oldVirtualHost = new VirtualHostImpl(new VirtualHostConfiguration("Old", new PropertiesConfiguration()), new MemoryMessageStore());
//Create a new messageStore
_newMessageStore = new BDBMessageStore();
- _newMessageStore.configure(_newVirtualHost, toDir, false);
+ //TODO _newMessageStore.configure(_newVirtualHost, toDir, false);
try
{
@@ -356,7 +357,7 @@
default:
case 1:
_oldMessageStore = new BDBMessageStore(1);
- _oldMessageStore.configure(_oldVirtualHost, fromDir, true);
+ //TODO _oldMessageStore.configure(_oldVirtualHost, fromDir, true);
_oldMessageStore.start();
upgradeFromVersion_1();
break;
14 years, 7 months
rhmessaging commits: r3970 - in mgmt/newdata/rosemary: instance and 1 other directory.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2010-05-13 14:28:33 -0400 (Thu, 13 May 2010)
New Revision: 3970
Added:
mgmt/newdata/rosemary/bin/rosemary-model
mgmt/newdata/rosemary/instance/model
Removed:
mgmt/newdata/rosemary/bin/rosemary-test
mgmt/newdata/rosemary/instance/xml
Log:
Rename rosemary-test to rosemary-model; fix ddl functionality and model path
Copied: mgmt/newdata/rosemary/bin/rosemary-model (from rev 3969, mgmt/newdata/rosemary/bin/rosemary-test)
===================================================================
--- mgmt/newdata/rosemary/bin/rosemary-model (rev 0)
+++ mgmt/newdata/rosemary/bin/rosemary-model 2010-05-13 18:28:33 UTC (rev 3970)
@@ -0,0 +1,87 @@
+#!/usr/bin/python
+
+from rosemary.model import *
+from rosemary.sqloperation import *
+
+def do_model(args):
+ for pkg in model._packages:
+ print "package %s" % pkg._name
+
+ for cls in pkg._classes:
+ print " class %s" % cls._name
+
+ for hdr in cls._headers:
+ print " hdr %s %s" % (hdr.name, hdr.title or "")
+
+ for ref in cls._references:
+ print " ref %s %s" % (ref.name, ref.title or "")
+
+ for prop in cls._properties:
+ print " prop %s %s" % (prop.name, prop.title or "")
+
+ for stat in cls._statistics:
+ print " stat %s %s" % (stat.name, stat.title or "")
+
+ for meth in cls._methods:
+ print " meth %s" % meth.name
+
+ for arg in meth.arguments:
+ print " arg %s" % arg.name
+
+def do_ddl(args):
+ model.sql_model.write_drop_ddl(sys.stdout)
+ model.sql_model.write_create_ddl(sys.stdout)
+
+def do_dml(args):
+ for schema in model.sql_model._schemas:
+ for table in schema._tables:
+ select = SqlSelectItem(table)
+ insert = SqlInsertItem(table)
+ update = SqlUpdateItem(table)
+ delete = SqlDeleteItem(table)
+
+ print "---", table._name, "---"
+ print
+ print insert.emit(table._columns)
+ print
+ print select.emit(table._columns)
+ print
+ print update.emit(table._columns)
+ print
+ print delete.emit(table._columns)
+ print
+
+def do_query(args):
+ schema = model.sql_model.schemas_by_name["org.apache.qpid.broker"]
+ table = schema.tables_by_name["Queue"]
+ stats_table = schema.tables_by_name["QueueStats"]
+ vhost_table = schema.tables_by_name["Vhost"]
+ broker_table = schema.tables_by_name["Broker"]
+
+ query = SqlQuery(table)
+
+ stats_col = stats_table.columns_by_name["_parent_id"]
+
+ SqlOuterJoin(query, table.key_column, stats_col)
+ SqlInnerJoin(query, table.key_column, vhost_table.key_column)
+
+ broker_col = vhost_table.columns_by_name["_brokerRef_id"]
+
+ SqlInnerJoin(query, broker_col, broker_table.key_column)
+
+ cols = table.columns + vhost_table.columns + broker_table.columns + \
+ stats_table.columns
+
+ print query.emit(cols)
+
+if __name__ == "__main__":
+ model = RosemaryModel()
+
+ model.load_model_dir(os.path.join(os.environ["ROSEMARY_HOME"], "model"))
+ model.init()
+
+ if len(sys.argv) == 1:
+ print "model, ddl, dml"
+ sys.exit(1)
+
+ globals()["do_%s" % sys.argv[1]](sys.argv[2:])
Deleted: mgmt/newdata/rosemary/bin/rosemary-test
===================================================================
--- mgmt/newdata/rosemary/bin/rosemary-test 2010-05-13 14:58:53 UTC (rev 3969)
+++ mgmt/newdata/rosemary/bin/rosemary-test 2010-05-13 18:28:33 UTC (rev 3970)
@@ -1,87 +0,0 @@
-#!/usr/bin/python
-
-from rosemary.model import *
-from rosemary.sqloperation import *
-
-def do_model(args):
- for pkg in model._packages:
- print "package %s" % pkg._name
-
- for cls in pkg._classes:
- print " class %s" % cls._name
-
- for hdr in cls._headers:
- print " hdr %s %s" % (hdr.name, hdr.title or "")
-
- for ref in cls._references:
- print " ref %s %s" % (ref.name, ref.title or "")
-
- for prop in cls._properties:
- print " prop %s %s" % (prop.name, prop.title or "")
-
- for stat in cls._statistics:
- print " stat %s %s" % (stat.name, stat.title or "")
-
- for meth in cls._methods:
- print " meth %s" % meth.name
-
- for arg in meth.arguments:
- print " arg %s" % arg.name
-
-def do_ddl(args):
- model.sql_model.write_drop_ddl(sys.stdout)
- model.sql_model.write_create_ddl(sys.stdout)
-
-def do_dml(args):
- for schema in model.sql_model.schemas:
- for table in schema.tables:
- select = SqlSelectItem(table)
- insert = SqlInsertItem(table)
- update = SqlUpdateItem(table)
- delete = SqlDeleteItem(table)
-
- print "---", table.name, "---"
- print
- print insert.emit(table.columns)
- print
- print select.emit(table.columns)
- print
- print update.emit(table.columns)
- print
- print delete.emit(table.columns)
- print
-
-def do_query(args):
- schema = model.sql_model.schemas_by_name["org.apache.qpid.broker"]
- table = schema.tables_by_name["Queue"]
- stats_table = schema.tables_by_name["QueueStats"]
- vhost_table = schema.tables_by_name["Vhost"]
- broker_table = schema.tables_by_name["Broker"]
-
- query = SqlQuery(table)
-
- stats_col = stats_table.columns_by_name["_parent_id"]
-
- SqlOuterJoin(query, table.key_column, stats_col)
- SqlInnerJoin(query, table.key_column, vhost_table.key_column)
-
- broker_col = vhost_table.columns_by_name["_brokerRef_id"]
-
- SqlInnerJoin(query, broker_col, broker_table.key_column)
-
- cols = table.columns + vhost_table.columns + broker_table.columns + \
- stats_table.columns
-
- print query.emit(cols)
-
-if __name__ == "__main__":
- model = RosemaryModel()
-
- model.load_xml_dir(os.path.join(os.environ["ROSEMARY_HOME"], "xml"))
- model.init()
-
- if len(sys.argv) == 1:
- print "model, ddl, dml"
- sys.exit(1)
-
- globals()["do_%s" % sys.argv[1]](sys.argv[2:])
Added: mgmt/newdata/rosemary/instance/model
===================================================================
--- mgmt/newdata/rosemary/instance/model (rev 0)
+++ mgmt/newdata/rosemary/instance/model 2010-05-13 18:28:33 UTC (rev 3970)
@@ -0,0 +1 @@
+link ../../cumin/model
\ No newline at end of file
Property changes on: mgmt/newdata/rosemary/instance/model
___________________________________________________________________
Name: svn:special
+ *
Deleted: mgmt/newdata/rosemary/instance/xml
===================================================================
--- mgmt/newdata/rosemary/instance/xml 2010-05-13 14:58:53 UTC (rev 3969)
+++ mgmt/newdata/rosemary/instance/xml 2010-05-13 18:28:33 UTC (rev 3970)
@@ -1 +0,0 @@
-link ../xml
\ No newline at end of file
14 years, 7 months
rhmessaging commits: r3969 - store/trunk/java/bdbstore/etc/scripts.
by rhmessaging-commits@lists.jboss.org
Author: rgemmell
Date: 2010-05-13 10:58:53 -0400 (Thu, 13 May 2010)
New Revision: 3969
Modified:
store/trunk/java/bdbstore/etc/scripts/bdbbackuptest.sh
Log:
Update bdbbackuptest.sh with current lib names
Modified: store/trunk/java/bdbstore/etc/scripts/bdbbackuptest.sh
===================================================================
--- store/trunk/java/bdbstore/etc/scripts/bdbbackuptest.sh 2010-05-13 14:01:42 UTC (rev 3968)
+++ store/trunk/java/bdbstore/etc/scripts/bdbbackuptest.sh 2010-05-13 14:58:53 UTC (rev 3969)
@@ -1,5 +1,28 @@
#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+if [ -z "$QPID_HOME" ]; then
+ export QPID_HOME=$(dirname $(dirname $(readlink -f $0)))
+ export PATH=${PATH}:${QPID_HOME}/bin
+fi
+
# Parse arguements taking all - prefixed args as JAVA_OPTS
for arg in "$@"; do
if [[ $arg == -java:* ]]; then
@@ -9,4 +32,13 @@
fi
done
-java -Xms256m -Dlog4j.configuration=perftests.log4j -Xmx256m -Dbadger.level=warn -Damqj.test.logging.level=warn -Damqj.logging.level=warn ${JAVA_OPTS} -cp qpid-bdbstore-1.0-incubating-M2-SNAPSHOT-all-test-deps.jar org.apache.qpid.server.store.berkeleydb.testclient.BackupTestClient -o $QPID_WORK/results numMessagesToAction=55 ${ARGS}
+VERSION=0.5
+
+# Set classpath to include Qpid jar with all required jars in manifest
+QPID_LIBS=$QPID_HOME/lib/qpid-all.jar:$QPID_HOME/lib/qpid-junit-toolkit-$VERSION.jar:$QPID_HOME/lib/junit-3.8.1.jar:$QPID_HOME/lib/log4j-1.2.12.jar:$QPID_HOME/lib/qpid-systests-$VERSION.jar:$QPID_HOME/lib/qpid-perftests-$VERSION.jar:$QPID_HOME/lib/slf4j-log4j12-1.4.0.jar:$QPID_HOME/lib/qpid-bdbstore-$VERSION.jar
+
+# Set other variables used by the qpid-run script before calling
+export JAVA=java JAVA_MEM=-Xmx256m QPID_CLASSPATH=$QPID_LIBS
+
+. qpid-run -Dlog4j.configuration=perftests.log4j -Dbadger.level=warn -Damqj.test.logging.level=warn -Damqj.logging.level=warn ${JAVA_OPTS} org.apache.qpid.server.store.berkeleydb.testclient.BackupTestClient -o $QPID_WORK/results numMessagesToAction=55 ${ARGS}
+
14 years, 7 months