rhmessaging commits: r2049 - store/trunk/cpp/lib.
by rhmessaging-commits@lists.jboss.org
Author: gordonsim
Date: 2008-05-14 06:23:20 -0400 (Wed, 14 May 2008)
New Revision: 2049
Modified:
store/trunk/cpp/lib/BdbMessageStore.cpp
Log:
Fix for https://bugzilla.redhat.com/show_bug.cgi?id=442677
Make sure txn is commited on recovering messages so that cleanup takes effect.
Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp 2008-05-14 08:02:45 UTC (rev 2048)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp 2008-05-14 10:23:20 UTC (rev 2049)
@@ -835,6 +835,8 @@
}
}
}
+ messages.close();
+ txn.commit();
messageIdSequence.reset(maxMessageId + 1);
}
17 years, 11 months
rhmessaging commits: r2048 - store/trunk/cpp/tests.
by rhmessaging-commits@lists.jboss.org
Author: gordonsim
Date: 2008-05-14 04:02:45 -0400 (Wed, 14 May 2008)
New Revision: 2048
Modified:
store/trunk/cpp/tests/system_test.sh
Log:
Don't fail if QPID_DIR is not set (e.g. when building against installed qpid rpms)
Modified: store/trunk/cpp/tests/system_test.sh
===================================================================
--- store/trunk/cpp/tests/system_test.sh 2008-05-14 07:11:53 UTC (rev 2047)
+++ store/trunk/cpp/tests/system_test.sh 2008-05-14 08:02:45 UTC (rev 2048)
@@ -24,9 +24,13 @@
error() { echo $*; exit 1; }
# Make sure $QPID_DIR contains what we need.
-test -d "$QPID_DIR" || error "WARNING: QPID_DIR is not set skipping system tests."
+if ! test -d "$QPID_DIR" ; then
+ echo "WARNING: QPID_DIR is not set skipping system tests."
+ exit
+fi
+
xml_spec=$QPID_DIR/specs/amqp.0-10-qpid-errata.xml
-test -f $xml_spec || error "$xml_spec or $spec_errata or $dtx_preview not found: invalid \$QPID_DIR ?"
+test -f $xml_spec || error "$xml_spec not found: invalid \$QPID_DIR ?"
export PYTHONPATH=$QPID_DIR/python
# Create a temporary directory for store data.
17 years, 11 months
rhmessaging commits: r2047 - store/trunk/cpp/tests.
by rhmessaging-commits@lists.jboss.org
Author: gordonsim
Date: 2008-05-14 03:11:53 -0400 (Wed, 14 May 2008)
New Revision: 2047
Modified:
store/trunk/cpp/tests/MessageUtils.h
store/trunk/cpp/tests/OrderingTest.cpp
store/trunk/cpp/tests/SimpleTest.cpp
store/trunk/cpp/tests/TransactionalTest.cpp
store/trunk/cpp/tests/TwoPhaseCommitTest.cpp
Log:
Fixed tests for recent changes to qpid code.
Modified: store/trunk/cpp/tests/MessageUtils.h
===================================================================
--- store/trunk/cpp/tests/MessageUtils.h 2008-05-13 22:44:04 UTC (rev 2046)
+++ store/trunk/cpp/tests/MessageUtils.h 2008-05-14 07:11:53 UTC (rev 2047)
@@ -33,10 +33,10 @@
struct MessageUtils
{
- static Message::shared_ptr createMessage(const string& exchange, const string& routingKey,
+ static boost::intrusive_ptr<Message> createMessage(const string& exchange, const string& routingKey,
const Uuid& messageId=Uuid(), uint64_t contentSize = 0)
{
- Message::shared_ptr msg(new Message());
+ boost::intrusive_ptr<Message> msg(new Message());
AMQFrame method(in_place<MessageTransferBody>(ProtocolVersion(), exchange, 0, 0));
AMQFrame header(in_place<AMQHeaderBody>());
@@ -50,7 +50,7 @@
return msg;
}
- static void addContent(Message::shared_ptr msg, const string& data)
+ static void addContent(boost::intrusive_ptr<Message> msg, const string& data)
{
AMQFrame content(in_place<AMQContentBody>(data));
msg->getFrames().append(content);
Modified: store/trunk/cpp/tests/OrderingTest.cpp
===================================================================
--- store/trunk/cpp/tests/OrderingTest.cpp 2008-05-13 22:44:04 UTC (rev 2046)
+++ store/trunk/cpp/tests/OrderingTest.cpp 2008-05-14 07:11:53 UTC (rev 2047)
@@ -69,7 +69,7 @@
Uuid messageId(true);
ids.push(messageId);
- Message::shared_ptr msg = MessageUtils::createMessage("exchange", "routing_key", messageId, 0);
+ boost::intrusive_ptr<Message> msg = MessageUtils::createMessage("exchange", "routing_key", messageId, 0);
msg->getProperties<DeliveryProperties>()->setDeliveryMode(PERSISTENT);
queue->deliver(msg);
@@ -77,7 +77,7 @@
bool pop()
{
- Message::shared_ptr msg = queue->dequeue().payload;
+ boost::intrusive_ptr<Message> msg = queue->dequeue().payload;
if (msg) {
queue->dequeue(0, msg);
BOOST_CHECK_EQUAL(ids.front(), msg->getProperties<MessageProperties>()->getMessageId());
Modified: store/trunk/cpp/tests/SimpleTest.cpp
===================================================================
--- store/trunk/cpp/tests/SimpleTest.cpp 2008-05-13 22:44:04 UTC (rev 2046)
+++ store/trunk/cpp/tests/SimpleTest.cpp 2008-05-14 07:11:53 UTC (rev 2047)
@@ -202,7 +202,7 @@
FieldTable settings;
queue->create(settings);
- Message::shared_ptr msg = MessageUtils::createMessage(exchange, routingKey, messageId, 14);
+ boost::intrusive_ptr<Message> msg = MessageUtils::createMessage(exchange, routingKey, messageId, 14);
MessageUtils::addContent(msg, data1);
MessageUtils::addContent(msg, data2);
@@ -222,7 +222,7 @@
Queue::shared_ptr queue = registry.find(name);
BOOST_REQUIRE(queue);
BOOST_CHECK_EQUAL((u_int32_t) 1, queue->getMessageCount());
- Message::shared_ptr msg = queue->dequeue().payload;
+ boost::intrusive_ptr<Message> msg = queue->dequeue().payload;
BOOST_CHECK_EQUAL(exchange, msg->getExchangeName());
BOOST_CHECK_EQUAL(routingKey, msg->getRoutingKey());
@@ -258,7 +258,7 @@
FieldTable settings;
queue->create(settings);
- Message::shared_ptr msg = MessageUtils::createMessage(exchange, routingKey, messageId, 7);
+ boost::intrusive_ptr<Message> msg = MessageUtils::createMessage(exchange, routingKey, messageId, 7);
MessageUtils::addContent(msg, data);
msg->getProperties<DeliveryProperties>()->setDeliveryMode(PERSISTENT);
@@ -291,7 +291,7 @@
store.truncate();//make sure it is empty to begin with
//create & stage a message
- Message::shared_ptr msg = MessageUtils::createMessage(exchange, routingKey, messageId, (data1.size() + data2.size()));
+ boost::intrusive_ptr<Message> msg = MessageUtils::createMessage(exchange, routingKey, messageId, (data1.size() + data2.size()));
intrusive_ptr<PersistableMessage> pmsg = static_pointer_cast<PersistableMessage>(msg);
intrusive_ptr<const PersistableMessage> cpmsg = static_pointer_cast<const PersistableMessage>(msg);
msg->getProperties<DeliveryProperties>()->setDeliveryMode(PERSISTENT);
@@ -343,7 +343,7 @@
Queue::shared_ptr queue = registry.find(name);
BOOST_REQUIRE(queue);
BOOST_CHECK_EQUAL((u_int32_t) 1, queue->getMessageCount());
- Message::shared_ptr msg = queue->dequeue().payload;
+ boost::intrusive_ptr<Message> msg = queue->dequeue().payload;
//check headers
BOOST_CHECK_EQUAL(exchange, msg->getExchangeName());
@@ -381,7 +381,7 @@
store.truncate();//make sure it is empty to begin with
const string data("abcdefg");
- Message::shared_ptr msg(MessageUtils::createMessage("my_exchange", "my_routing_key", "my_message", data.length()));
+ boost::intrusive_ptr<Message> msg(MessageUtils::createMessage("my_exchange", "my_routing_key", "my_message", data.length()));
intrusive_ptr<PersistableMessage> pmsg = static_pointer_cast<PersistableMessage>(msg);
intrusive_ptr<const PersistableMessage> cpmsg = static_pointer_cast<const PersistableMessage>(msg);
MessageUtils::addContent(msg, data);
@@ -405,7 +405,7 @@
store.truncate();//make sure it is empty to begin with
const string data("abcdefg");
- Message::shared_ptr msg(MessageUtils::createMessage("my_exchange", "my_routing_key", "my_message", data.length()));
+ boost::intrusive_ptr<Message> msg(MessageUtils::createMessage("my_exchange", "my_routing_key", "my_message", data.length()));
intrusive_ptr<PersistableMessage> pmsg = static_pointer_cast<PersistableMessage>(msg);
intrusive_ptr<const PersistableMessage> cpmsg = static_pointer_cast<const PersistableMessage>(msg);
MessageUtils::addContent(msg, data);
Modified: store/trunk/cpp/tests/TransactionalTest.cpp
===================================================================
--- store/trunk/cpp/tests/TransactionalTest.cpp 2008-05-13 22:44:04 UTC (rev 2046)
+++ store/trunk/cpp/tests/TransactionalTest.cpp 2008-05-14 07:11:53 UTC (rev 2047)
@@ -69,7 +69,7 @@
queueB->create(settings);
//create message and enqueue it onto first queue:
- Message::shared_ptr msg = MessageUtils::createMessage("exchange", "routing_key", messageId, 0);
+ boost::intrusive_ptr<Message> msg = MessageUtils::createMessage("exchange", "routing_key", messageId, 0);
msg->getProperties<DeliveryProperties>()->setDeliveryMode(PERSISTENT);
queueA->deliver(msg);
@@ -112,7 +112,7 @@
BOOST_CHECK_EQUAL((u_int32_t) 0, x->getMessageCount());
BOOST_CHECK_EQUAL((u_int32_t) 1, y->getMessageCount());
- Message::shared_ptr msg = y->dequeue().payload;
+ boost::intrusive_ptr<Message> msg = y->dequeue().payload;
BOOST_REQUIRE(msg);
BOOST_CHECK_EQUAL(messageId, msg->getProperties<MessageProperties>()->getMessageId());
}
@@ -121,7 +121,7 @@
{
setup(async);
- Message::shared_ptr msg = queueA->dequeue().payload;
+ boost::intrusive_ptr<Message> msg = queueA->dequeue().payload;
BOOST_REQUIRE(msg);
//move the message from one queue to the other as a transaction
std::auto_ptr<TransactionContext> txn = store->begin();
Modified: store/trunk/cpp/tests/TwoPhaseCommitTest.cpp
===================================================================
--- store/trunk/cpp/tests/TwoPhaseCommitTest.cpp 2008-05-13 22:44:04 UTC (rev 2046)
+++ store/trunk/cpp/tests/TwoPhaseCommitTest.cpp 2008-05-14 07:11:53 UTC (rev 2047)
@@ -63,7 +63,7 @@
{
TwoPhaseCommitTest* const test;
const string messageId;
- Message::shared_ptr msg;
+ boost::intrusive_ptr<Message> msg;
public:
Swap(TwoPhaseCommitTest* const test_, const string& messageId_): test(test_), messageId(messageId_) {}
void init(){ msg = test->deliver(messageId, test->queueA); }
@@ -74,9 +74,9 @@
class Enqueue : public Strategy
{
TwoPhaseCommitTest* const test;
- Message::shared_ptr msg1;
- Message::shared_ptr msg2;
- Message::shared_ptr msg3;
+ boost::intrusive_ptr<Message> msg1;
+ boost::intrusive_ptr<Message> msg2;
+ boost::intrusive_ptr<Message> msg3;
public:
Enqueue(TwoPhaseCommitTest* const test_): test(test_) {}
void init() {}
@@ -98,9 +98,9 @@
class Dequeue : public Strategy
{
TwoPhaseCommitTest* const test;
- Message::shared_ptr msg1;
- Message::shared_ptr msg2;
- Message::shared_ptr msg3;
+ boost::intrusive_ptr<Message> msg1;
+ boost::intrusive_ptr<Message> msg2;
+ boost::intrusive_ptr<Message> msg3;
public:
Dequeue(TwoPhaseCommitTest* const test_): test(test_) {}
void init() {
@@ -131,9 +131,9 @@
LinkRegistry links;
Queue::shared_ptr queueA;
Queue::shared_ptr queueB;
- Message::shared_ptr msg1;
- Message::shared_ptr msg2;
- Message::shared_ptr msg4;
+ boost::intrusive_ptr<Message> msg1;
+ boost::intrusive_ptr<Message> msg2;
+ boost::intrusive_ptr<Message> msg4;
bool async;
void recoverPrepared(bool commit)
@@ -204,14 +204,14 @@
queueA->dequeue(txn, msg2);
}
- Message::shared_ptr enqueue(TPCTransactionContext* txn, const string& msgid)
+ boost::intrusive_ptr<Message> enqueue(TPCTransactionContext* txn, const string& msgid)
{
- Message::shared_ptr msg = createMessage(msgid);
+ boost::intrusive_ptr<Message> msg = createMessage(msgid);
queueA->enqueue(txn, msg);
return msg;
}
- Message::shared_ptr deliver(const string& msgid, Queue::shared_ptr& queue)
+ boost::intrusive_ptr<Message> deliver(const string& msgid, Queue::shared_ptr& queue)
{
msg4 = createMessage(msgid);
queue->deliver(msg4);
@@ -232,9 +232,9 @@
queueB->create(settings);
}
- Message::shared_ptr createMessage(const string& id, const string& exchange="exchange", const string& key="routing_key")
+ boost::intrusive_ptr<Message> createMessage(const string& id, const string& exchange="exchange", const string& key="routing_key")
{
- Message::shared_ptr msg = MessageUtils::createMessage(exchange, key);
+ boost::intrusive_ptr<Message> msg = MessageUtils::createMessage(exchange, key);
msg->getProperties<MessageProperties>()->setCorrelationId(id);
msg->getProperties<DeliveryProperties>()->setDeliveryMode(PERSISTENT);
return msg;
@@ -276,7 +276,7 @@
BOOST_CHECK_EQUAL((u_int32_t) 0, x->getMessageCount());
BOOST_CHECK_EQUAL((u_int32_t) 1, y->getMessageCount());
- Message::shared_ptr msg = y->dequeue().payload;
+ boost::intrusive_ptr<Message> msg = y->dequeue().payload;
BOOST_REQUIRE(msg);
BOOST_CHECK_EQUAL(msgid, msg->getProperties<MessageProperties>()->getCorrelationId());
}
@@ -286,7 +286,7 @@
BOOST_REQUIRE(queueA);
BOOST_CHECK_EQUAL(size, queueA->getMessageCount());
if (size > 0) {
- Message::shared_ptr msg = queueA->dequeue().payload;
+ boost::intrusive_ptr<Message> msg = queueA->dequeue().payload;
BOOST_REQUIRE(msg);
BOOST_CHECK_EQUAL(msgid, msg->getProperties<MessageProperties>()->getCorrelationId());
}
17 years, 11 months
rhmessaging commits: r2046 - in mgmt: mint/python/mint and 1 other directory.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2008-05-13 18:44:04 -0400 (Tue, 13 May 2008)
New Revision: 2046
Modified:
mgmt/cumin/python/cumin/__init__.py
mgmt/mint/python/mint/__init__.py
Log:
bz438219 - Attempt reconnect if a broker conection closes
Modified: mgmt/cumin/python/cumin/__init__.py
===================================================================
--- mgmt/cumin/python/cumin/__init__.py 2008-05-13 21:54:11 UTC (rev 2045)
+++ mgmt/cumin/python/cumin/__init__.py 2008-05-13 22:44:04 UTC (rev 2046)
@@ -32,6 +32,11 @@
self.model = CuminModel(self, data_uri, spec_path)
self.broker_connect_thread = BrokerConnectThread(self.model)
+ def closeListener(*args):
+ self.broker_connect_thread.prompt()
+
+ self.model.data.setCloseListener(closeListener)
+
self.main_page = CuminPage(self, "index.html")
self.add_page(self.main_page)
self.set_default_page(self.main_page)
Modified: mgmt/mint/python/mint/__init__.py
===================================================================
--- mgmt/mint/python/mint/__init__.py 2008-05-13 21:54:11 UTC (rev 2045)
+++ mgmt/mint/python/mint/__init__.py 2008-05-13 22:44:04 UTC (rev 2046)
@@ -445,9 +445,7 @@
def closeCallback(self, brokerId, data):
self.log("\nCLOSE---------------------------------------------------")
self.log("BrokerId=%s , Data=%s" % (brokerId, data))
- conn = self.connections[brokerId]
- if (conn and conn.isOpen()):
- conn.close()
+ del self.connections[brokerId]
if (self.connCloseListener != None):
self.connCloseListener(brokerId, data)
self.log("END CLOSE---------------------------------------------------\n")
17 years, 11 months
rhmessaging commits: r2045 - in store/trunk/cpp: tests/jrnl and 1 other directories.
by rhmessaging-commits@lists.jboss.org
Author: kpvdr
Date: 2008-05-13 17:54:11 -0400 (Tue, 13 May 2008)
New Revision: 2045
Modified:
store/trunk/cpp/lib/jrnl/wrfc.cpp
store/trunk/cpp/tests/jrnl/jtt/jrnl_instance.cpp
store/trunk/cpp/tests/jrnl/jtt/jrnl_instance.hpp
store/trunk/cpp/tests/jrnl/run-journal-tests
Log:
Fixed the hang problem for exceptions thrown while running jtt. Also removed jtt test dir after each run.
Modified: store/trunk/cpp/lib/jrnl/wrfc.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wrfc.cpp 2008-05-13 20:26:09 UTC (rev 2044)
+++ store/trunk/cpp/lib/jrnl/wrfc.cpp 2008-05-13 21:54:11 UTC (rev 2045)
@@ -71,6 +71,8 @@
_reset_ok = true;
_owi = rdp->_owi;
_frot = rdp->_frot;
+ if (is_full())
+ rotate();
}
else
{
Modified: store/trunk/cpp/tests/jrnl/jtt/jrnl_instance.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/jtt/jrnl_instance.cpp 2008-05-13 20:26:09 UTC (rev 2044)
+++ store/trunk/cpp/tests/jrnl/jtt/jrnl_instance.cpp 2008-05-13 21:54:11 UTC (rev 2045)
@@ -154,9 +154,9 @@
stop(true);
_tcrp->set_stop_time();
}
- catch (const rhm::journal::jexception& e) { _tcrp->add_exception(e); }
- catch (const std::exception& e) { _tcrp->add_exception(e.what()); }
- catch (...) { _tcrp->add_exception("Unknown exception"); }
+ catch (const rhm::journal::jexception& e) { _tcrp->add_exception(e); panic(); }
+ catch (const std::exception& e) { _tcrp->add_exception(e.what()); panic(); }
+ catch (...) { _tcrp->add_exception("Unknown exception"); panic(); }
_tcrp->set_stop_time();
_tcp->add_result(_tcrp);
}
@@ -217,9 +217,9 @@
}
flush(true);
}
- catch (const rhm::journal::jexception& e) { _tcrp->add_exception(e); }
- catch (const std::exception& e) { _tcrp->add_exception(e.what()); }
- catch (...) { _tcrp->add_exception("Unknown exception"); }
+ catch (const rhm::journal::jexception& e) { _tcrp->add_exception(e); panic(); }
+ catch (const std::exception& e) { _tcrp->add_exception(e.what()); panic(); }
+ catch (...) { _tcrp->add_exception("Unknown exception"); panic(); }
}
void
@@ -305,9 +305,9 @@
}
}
}
- catch (const rhm::journal::jexception& e) { _tcrp->add_exception(e); }
- catch (const std::exception& e) { _tcrp->add_exception(e.what()); }
- catch (...) { _tcrp->add_exception("Unknown exception"); }
+ catch (const rhm::journal::jexception& e) { _tcrp->add_exception(e); panic(); }
+ catch (const std::exception& e) { _tcrp->add_exception(e.what()); panic(); }
+ catch (...) { _tcrp->add_exception("Unknown exception"); panic(); }
}
void
@@ -354,9 +354,9 @@
flush(true);
}
}
- catch (const rhm::journal::jexception& e) { _tcrp->add_exception(e); }
- catch (const std::exception& e) { _tcrp->add_exception(e.what()); }
- catch (...) { _tcrp->add_exception("Unknown exception"); }
+ catch (const rhm::journal::jexception& e) { _tcrp->add_exception(e); panic(); }
+ catch (const std::exception& e) { _tcrp->add_exception(e.what()); panic(); }
+ catch (...) { _tcrp->add_exception("Unknown exception"); panic(); }
}
void
@@ -397,6 +397,16 @@
return p.get();
}
+void
+jrnl_instance::panic()
+{
+ // In the event of a panic or exception condition, release all waiting CVs
+ _rd_aio_cv.broadcast();
+ _wr_full_cv.broadcast();
+ _rd_list_cv.broadcast();
+ _deq_list_cv.broadcast();
+}
+
// static AIO callback fns
void
Modified: store/trunk/cpp/tests/jrnl/jtt/jrnl_instance.hpp
===================================================================
--- store/trunk/cpp/tests/jrnl/jtt/jrnl_instance.hpp 2008-05-13 20:26:09 UTC (rev 2044)
+++ store/trunk/cpp/tests/jrnl/jtt/jrnl_instance.hpp 2008-05-13 21:54:11 UTC (rev 2045)
@@ -137,6 +137,8 @@
void txn(const rhm::journal::data_tok* dtokp, const bool commit);
rhm::journal::data_tok* prep_txn_dtok(const rhm::journal::data_tok* dtokp);
+ void panic();
+
// static callbacks
static void aio_rd_callback(jcntl* journal, std::vector<u_int16_t>& pil);
static void aio_wr_callback(jcntl* journal, std::vector<journal::data_tok*>& dtokl);
Modified: store/trunk/cpp/tests/jrnl/run-journal-tests
===================================================================
--- store/trunk/cpp/tests/jrnl/run-journal-tests 2008-05-13 20:26:09 UTC (rev 2044)
+++ store/trunk/cpp/tests/jrnl/run-journal-tests 2008-05-13 21:54:11 UTC (rev 2045)
@@ -2,24 +2,25 @@
fail=0
num_jrnls=1
+rm -rf /tmp/test_0*
# Run jtt using default test set
echo
echo "===== Mode 1: New journal instance, no recover ====="
+jtt/jtt --path jtt --csv jtt/jtt.csv --format-chk --num-jrnls $num_jrnls || fail=1
rm -rf /tmp/test_0*
-jtt/jtt --path jtt --csv jtt/jtt.csv --format-chk --num-jrnls $num_jrnls || fail=1
echo
echo "===== Mode 2: Re-use journal instance, no recover ====="
+jtt/jtt --path jtt --csv jtt/jtt.csv --reuse-instance --format-chk --num-jrnls $num_jrnls || fail=1
rm -rf /tmp/test_0*
-jtt/jtt --path jtt --csv jtt/jtt.csv --reuse-instance --format-chk --num-jrnls $num_jrnls || fail=1
echo
echo "===== Mode 3: New journal instance, recover previous test journal ====="
+jtt/jtt --path jtt --csv jtt/jtt.csv --recover-mode --format-chk --num-jrnls $num_jrnls || fail=1
rm -rf /tmp/test_0*
-jtt/jtt --path jtt --csv jtt/jtt.csv --recover-mode --format-chk --num-jrnls $num_jrnls || fail=1
echo
echo "===== Mode 4: Re-use journal instance, recover previous test journal ====="
+jtt/jtt --path jtt --csv jtt/jtt.csv --reuse-instance --recover-mode --format-chk --num-jrnls $num_jrnls || fail=1
rm -rf /tmp/test_0*
-jtt/jtt --path jtt --csv jtt/jtt.csv --reuse-instance --recover-mode --format-chk --num-jrnls $num_jrnls || fail=1
echo
exit $fail
17 years, 11 months
rhmessaging commits: r2044 - mgmt/cumin/python/cumin.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2008-05-13 16:26:09 -0400 (Tue, 13 May 2008)
New Revision: 2044
Modified:
mgmt/cumin/python/cumin/model.py
Log:
bz429573 - remove debug code and complete session protection
Modified: mgmt/cumin/python/cumin/model.py
===================================================================
--- mgmt/cumin/python/cumin/model.py 2008-05-13 19:04:14 UTC (rev 2043)
+++ mgmt/cumin/python/cumin/model.py 2008-05-13 20:26:09 UTC (rev 2044)
@@ -918,13 +918,6 @@
reg = object.client.vhost.broker.registration
conn = self.cumin_model.data.getConnectionByRegistration(reg)
- print "curr session, conn session", object.name, conn.getSessionId()
-
- for c in self.cumin_model.data.connections:
- print "conn", self.cumin_model.data.connections[c].getSessionId()
-
- return
-
if object.name == conn.getSessionId():
raise Exception \
("Cannot close management session %s" % object.name)
17 years, 11 months
rhmessaging commits: r2043 - mgmt/mint/python/mint.
by rhmessaging-commits@lists.jboss.org
Author: nunofsantos
Date: 2008-05-13 15:04:14 -0400 (Tue, 13 May 2008)
New Revision: 2043
Modified:
mgmt/mint/python/mint/schema.py
Log:
handle close callback from broker
Modified: mgmt/mint/python/mint/schema.py
===================================================================
--- mgmt/mint/python/mint/schema.py 2008-05-13 19:04:03 UTC (rev 2042)
+++ mgmt/mint/python/mint/schema.py 2008-05-13 19:04:14 UTC (rev 2043)
@@ -85,11 +85,16 @@
conn.callMethod(self.idOriginal, classInfo, "echo",
callback, args=actualArgs)
- def connect(self, model, callback, host, port):
+ def connect(self, model, callback, host, port, useSsl, durable, authMechanism, username, password):
"""Establish a connection to another broker"""
actualArgs = dict()
actualArgs["host"] = host
actualArgs["port"] = port
+ actualArgs["useSsl"] = useSsl
+ actualArgs["durable"] = durable
+ actualArgs["authMechanism"] = authMechanism
+ actualArgs["username"] = username
+ actualArgs["password"] = password
conn = model.connections[self.managedBroker]
classInfo = self.classInfos[self.managedBroker]
conn.callMethod(self.idOriginal, classInfo, "connect",
@@ -122,7 +127,10 @@
managedBroker = StringCol(length=1000, default=None)
statsCurr = ForeignKey('AgentStats', cascade='null', default=None)
statsPrev = ForeignKey('AgentStats', cascade='null', default=None)
- id_ = BLOBCol(default=None)
+ sessionId = BLOBCol(default=None)
+ label = StringCol(length=1000, default=None)
+ registeredTo = BigIntCol(default=None)
+ sysId = BLOBCol(default=None)
classInfos = dict() # brokerId => classInfo
@@ -272,6 +280,7 @@
vhost = ForeignKey('Vhost', cascade='null', default=None)
name = StringCol(length=1000, default=None)
type = StringCol(length=1000, default=None)
+ durable = BoolCol(default=None)
classInfos = dict() # brokerId => classInfo
@@ -353,6 +362,7 @@
statsPrev = ForeignKey('ClientStats', cascade='null', default=None)
vhost = ForeignKey('Vhost', cascade='null', default=None)
address = StringCol(length=1000, default=None)
+ incoming = BoolCol(default=None)
classInfos = dict() # brokerId => classInfo
@@ -397,8 +407,10 @@
statsCurr = ForeignKey('LinkStats', cascade='null', default=None)
statsPrev = ForeignKey('LinkStats', cascade='null', default=None)
vhost = ForeignKey('Vhost', cascade='null', default=None)
- address = StringCol(length=1000, default=None)
- authIdentity = StringCol(length=1000, default=None)
+ host = StringCol(length=1000, default=None)
+ port = SmallIntCol(default=None)
+ useSsl = BoolCol(default=None)
+ durable = BoolCol(default=None)
classInfos = dict() # brokerId => classInfo
@@ -409,12 +421,15 @@
conn.callMethod(self.idOriginal, classInfo, "close",
callback, args=actualArgs)
- def bridge(self, model, callback, src, dest, key, src_is_queue, src_is_local):
+ def bridge(self, model, callback, durable, src, dest, key, id, excludes, src_is_queue, src_is_local):
"""Bridge messages over the link"""
actualArgs = dict()
+ actualArgs["durable"] = durable
actualArgs["src"] = src
actualArgs["dest"] = dest
actualArgs["key"] = key
+ actualArgs["id"] = id
+ actualArgs["excludes"] = excludes
actualArgs["src_is_queue"] = src_is_queue
actualArgs["src_is_local"] = src_is_local
conn = model.connections[self.managedBroker]
@@ -432,11 +447,8 @@
idOriginal = BigIntCol(default=None)
recTime = TimestampCol(default=None)
link = ForeignKey('Link', cascade='null', default=None)
- closing = BoolCol(default=None)
- framesFromPeer = BigIntCol(default=None)
- framesToPeer = BigIntCol(default=None)
- bytesFromPeer = BigIntCol(default=None)
- bytesToPeer = BigIntCol(default=None)
+ state = StringCol(length=1000, default=None)
+ lastError = StringCol(length=1000, default=None)
classInfos = dict() # brokerId => classInfo
17 years, 11 months
rhmessaging commits: r2042 - in mgmt/mint: python/mint and 1 other directories.
by rhmessaging-commits@lists.jboss.org
Author: nunofsantos
Date: 2008-05-13 15:04:03 -0400 (Tue, 13 May 2008)
New Revision: 2042
Modified:
mgmt/mint/Makefile
mgmt/mint/python/mint/__init__.py
mgmt/mint/python/mint/schemaparser.py
mgmt/mint/sql/schema.sql
Log:
handle close callback from broker
Modified: mgmt/mint/Makefile
===================================================================
--- mgmt/mint/Makefile 2008-05-13 17:08:09 UTC (rev 2041)
+++ mgmt/mint/Makefile 2008-05-13 19:04:03 UTC (rev 2042)
@@ -1,4 +1,4 @@
-.PHONY: build install schema schema-sql schema-python
+.PHONY: build install schema schema-sql schema-python clean
include ../etc/Makefile.common
@@ -13,6 +13,9 @@
build:
../bin/python-compile python
+clean:
+ rm -f python/mint/schema-store.py python/mint/schema.py python/mint/schema.sql
+
install: build
install -d ${lib}
install python/mint/*.py python/mint/*.pyc ${lib}
@@ -28,7 +31,8 @@
schema-python:
@if [ -z "$$MINT_SCHEMA_XML" ]; then echo "MINT_SCHEMA_XML is not set"; exit 1; fi
python python/mint/schemaparser.py ${MINT_SCHEMA_XML} python/mint/schema.py ${dsn}
- @if [ -z "$$STORE_SCHEMA_XML" ]; then echo "Warning: STORE_SCHEMA_XML is not set, skipping store schema generation"; else python python/mint/schemaparser.py ${STORE_SCHEMA_XML} python/mint/schema-store.py ${dsn}; cat python/mint/schema-store.py >> python/mint/schema.py; fi
+# ignore the store schema for now
+# @if [ -z "$$STORE_SCHEMA_XML" ]; then echo "Warning: STORE_SCHEMA_XML is not set, skipping store schema generation"; else python python/mint/schemaparser.py ${STORE_SCHEMA_XML} python/mint/schema-store.py ${dsn}; cat python/mint/schema-store.py >> python/mint/schema.py; rm python/mint/schema-store.py; fi
schema-sql:
sqlobject-admin sql -m mint -m mint.schema -c ${dsn} | sed -e '1,2d' > sql/schema.sql
Modified: mgmt/mint/python/mint/__init__.py
===================================================================
--- mgmt/mint/python/mint/__init__.py 2008-05-13 17:08:09 UTC (rev 2041)
+++ mgmt/mint/python/mint/__init__.py 2008-05-13 19:04:03 UTC (rev 2042)
@@ -210,10 +210,12 @@
return
self.conn = Connection(sock, spec)
- self.mclient = managementClient(spec, None,
+ self.mclient = managementClient(spec,
+ self.model.controlCallback,
self.model.configCallback,
self.model.instCallback,
- self.model.methodCallback)
+ self.model.methodCallback,
+ self.model.closeCallback)
self.mclient.schemaListener(self.model.schemaCallback)
self.model.lock.acquire()
@@ -299,7 +301,7 @@
self.currentMethodId = 1
self.outstandingMethodCalls = dict()
self.connections = dict()
-
+ self.connCloseListener = None
self.lock = Lock()
assert MintModel.staticInstance is None
@@ -350,6 +352,9 @@
keys.append(key)
return keys
+ def setCloseListener(self, connCloseListener):
+ self.connCloseListener = connCloseListener
+
def schemaCallback(self, brokerId, classInfo,
configs, metric, methods, events):
cls = schema.schemaNameToClassMap.get(classInfo[1])
@@ -426,7 +431,7 @@
self.log("END INST---------------------------------------------------\n")
return objStats
- def methodCallback(self, broker, methodId, errorNo, errorText, args):
+ def methodCallback(self, brokerId, methodId, errorNo, errorText, args):
self.log("\nMETHOD---------------------------------------------------")
self.log("MethodId=%d" % (methodId))
self.log("Error: %d %s" % (errorNo, errorText))
@@ -437,6 +442,23 @@
self.log("END METHOD---------------------------------------------------\n")
return result
+ def closeCallback(self, brokerId, data):
+ self.log("\nCLOSE---------------------------------------------------")
+ self.log("BrokerId=%s , Data=%s" % (brokerId, data))
+ conn = self.connections[brokerId]
+ if (conn and conn.isOpen()):
+ conn.close()
+ if (self.connCloseListener != None):
+ self.connCloseListener(brokerId, data)
+ self.log("END CLOSE---------------------------------------------------\n")
+ return
+
+ def controlCallback(self, brokerId, type, data):
+ self.log("\nCONTROL---------------------------------------------------")
+ self.log("BrokerId=%s , Type=%s, Data=%s" % (brokerId, type, data))
+ self.log("END CONTROL---------------------------------------------------\n")
+ return
+
def registerCallback(self, callback):
self.currentMethodId += 1
methodId = self.currentMethodId
Modified: mgmt/mint/python/mint/schemaparser.py
===================================================================
--- mgmt/mint/python/mint/schemaparser.py 2008-05-13 17:08:09 UTC (rev 2041)
+++ mgmt/mint/python/mint/schemaparser.py 2008-05-13 19:04:03 UTC (rev 2042)
@@ -16,8 +16,6 @@
self.currentClass = ""
self.finalPythonOutput = "\nclassToSchemaNameMap = dict()\n"
self.finalPythonOutput += "schemaNameToClassMap = dict()\n"
- #self.pythonOutput += "conn = connectionForURI(\"%s\")\n" % (self.dsn)
- #self.pythonOutput += "sqlhub.processConnection = conn\n\n"
# mapping between xml schema types and database column types
# see xml/MintTypes.xml
self.dataTypesMap = dict()
@@ -74,13 +72,19 @@
def generateClassAttribs(self, schemaName, elements):
for elem in elements:
- if (elem["@name"].endswith("Ref")):
- reference = self.style.dbTableToPythonClass(elem["@name"]).replace("Ref", "")
- if (reference != "Store"):
- #FIX
+ # XXX FIX: properly handle a store ref
+ if (elem["@name"] == "storeRef"):
+ continue
+ # XXX FIX: properly handle a store ref
+ if (elem["@type"] == "objId"):
+ if (elem["@name"].endswith("Ref")):
+ reference = self.style.dbTableToPythonClass(elem["@name"]).replace("Ref", "")
attrib = reference[0].lower() + reference[1:]
self.generateForeignKeyAttrib(attrib, reference)
self.generateMultipleJoin(reference, self.currentClass)
+ else:
+ # if reference doesn't have a "Ref" prefix, handle as a large uint
+ self.generateAttrib(self.attrNameFromDbColumn(elem["@name"]), self.dataTypesMap["uint64"])
elif (elem["@type"].startswith("hilo")):
self.generateHiLoAttrib(self.attrNameFromDbColumn(elem["@name"]), self.dataTypesMap[elem["@type"]])
elif (elem["@type"].startswith("mma")):
Modified: mgmt/mint/sql/schema.sql
===================================================================
--- mgmt/mint/sql/schema.sql 2008-05-13 17:08:09 UTC (rev 2041)
+++ mgmt/mint/sql/schema.sql 2008-05-13 19:04:03 UTC (rev 2042)
@@ -60,7 +60,10 @@
managed_broker VARCHAR(1000),
stats_curr_id INT,
stats_prev_id INT,
- id_ BYTEA
+ session_id BYTEA,
+ label VARCHAR(1000),
+ registered_to BIGINT,
+ sys_id BYTEA
);
CREATE TABLE agent_stats (
@@ -158,7 +161,8 @@
stats_curr_id INT,
stats_prev_id INT,
vhost_id INT,
- address VARCHAR(1000)
+ address VARCHAR(1000),
+ incoming BOOL
);
CREATE TABLE client_stats (
@@ -235,7 +239,8 @@
stats_prev_id INT,
vhost_id INT,
name VARCHAR(1000),
- type VARCHAR(1000)
+ type VARCHAR(1000),
+ durable BOOL
);
CREATE TABLE exchange_stats (
@@ -267,8 +272,10 @@
stats_curr_id INT,
stats_prev_id INT,
vhost_id INT,
- address VARCHAR(1000),
- auth_identity VARCHAR(1000)
+ host VARCHAR(1000),
+ port SMALLINT,
+ use_ssl BOOL,
+ durable BOOL
);
CREATE TABLE link_stats (
@@ -276,11 +283,8 @@
id_original BIGINT,
rec_time TIMESTAMP,
link_id INT,
- closing BOOL,
- frames_from_peer BIGINT,
- frames_to_peer BIGINT,
- bytes_from_peer BIGINT,
- bytes_to_peer BIGINT
+ state VARCHAR(1000),
+ last_error VARCHAR(1000)
);
CREATE TABLE producer (
17 years, 11 months
rhmessaging commits: r2041 - in store/trunk: specs and 1 other directory.
by rhmessaging-commits@lists.jboss.org
Author: tedross
Date: 2008-05-13 13:08:09 -0400 (Tue, 13 May 2008)
New Revision: 2041
Modified:
store/trunk/cpp/lib/gen/qpid/management/Journal.h
store/trunk/cpp/lib/gen/qpid/management/Store.cpp
store/trunk/cpp/lib/gen/qpid/management/Store.h
store/trunk/specs/management-schema.xml
Log:
Renamed broker reference in management schema
Modified: store/trunk/cpp/lib/gen/qpid/management/Journal.h
===================================================================
--- store/trunk/cpp/lib/gen/qpid/management/Journal.h 2008-05-13 10:36:26 UTC (rev 2040)
+++ store/trunk/cpp/lib/gen/qpid/management/Journal.h 2008-05-13 17:08:09 UTC (rev 2041)
@@ -25,6 +25,7 @@
// Please do not edit.
#include "qpid/management/ManagementObject.h"
+#include "qpid/framing/FieldTable.h"
#include "qpid/framing/Uuid.h"
namespace qpid {
Modified: store/trunk/cpp/lib/gen/qpid/management/Store.cpp
===================================================================
--- store/trunk/cpp/lib/gen/qpid/management/Store.cpp 2008-05-13 10:36:26 UTC (rev 2040)
+++ store/trunk/cpp/lib/gen/qpid/management/Store.cpp 2008-05-13 17:08:09 UTC (rev 2041)
@@ -35,13 +35,13 @@
string Store::packageName = string ("mrgstore");
string Store::className = string ("store");
uint8_t Store::md5Sum[16] =
- {0x0,0xc6,0x7a,0x17,0x1,0xc6,0x33,0xb4,0x5d,0x75,0xd3,0x86,0xe3,0x20,0xcc,0xe2};
+ {0x67,0x86,0x77,0xd2,0x1e,0x27,0x3e,0x46,0xf9,0xec,0x38,0xa4,0x9a,0xf5,0xf9,0x89};
Store::Store (Manageable* _core, Manageable* _parent) :
ManagementObject(_core)
{
- qpidBrokerRef = _parent->GetManagementObject ()->getObjectId ();
+ brokerRef = _parent->GetManagementObject ()->getObjectId ();
}
@@ -78,7 +78,7 @@
// Config Elements
ft = FieldTable ();
- ft.setString (NAME, "qpidBrokerRef");
+ ft.setString (NAME, "brokerRef");
ft.setInt (TYPE, TYPE_REF);
ft.setInt (ACCESS, ACCESS_RO);
ft.setInt (INDEX, 1);
@@ -133,7 +133,7 @@
configChanged = false;
writeTimestamps (buf);
- buf.putLongLong (qpidBrokerRef);
+ buf.putLongLong (brokerRef);
buf.putShortString (location);
buf.putOctet (async?1:0);
buf.putShort (defaultInitialFileCount);
Modified: store/trunk/cpp/lib/gen/qpid/management/Store.h
===================================================================
--- store/trunk/cpp/lib/gen/qpid/management/Store.h 2008-05-13 10:36:26 UTC (rev 2040)
+++ store/trunk/cpp/lib/gen/qpid/management/Store.h 2008-05-13 17:08:09 UTC (rev 2041)
@@ -25,6 +25,7 @@
// Please do not edit.
#include "qpid/management/ManagementObject.h"
+#include "qpid/framing/FieldTable.h"
#include "qpid/framing/Uuid.h"
namespace qpid {
@@ -39,7 +40,7 @@
static uint8_t md5Sum[16];
// Configuration Elements
- uint64_t qpidBrokerRef;
+ uint64_t brokerRef;
std::string location;
uint8_t async;
uint16_t defaultInitialFileCount;
@@ -75,9 +76,9 @@
// Method IDs
// Accessor Methods
- inline void set_qpidBrokerRef (uint64_t val){
+ inline void set_brokerRef (uint64_t val){
sys::RWlock::ScopedWlock writeLock (accessLock);
- qpidBrokerRef = val;
+ brokerRef = val;
configChanged = true;
}
inline void set_location (std::string val){
Modified: store/trunk/specs/management-schema.xml
===================================================================
--- store/trunk/specs/management-schema.xml 2008-05-13 10:36:26 UTC (rev 2040)
+++ store/trunk/specs/management-schema.xml 2008-05-13 17:08:09 UTC (rev 2041)
@@ -4,7 +4,7 @@
License Text
-->
<class name="store">
- <configElement name="qpidBrokerRef" type="objId" access="RO" index="y" parentRef="y"/>
+ <configElement name="brokerRef" type="objId" access="RO" index="y" parentRef="y"/>
<configElement name="location" type="sstr" access="RO" desc="Logical directory on disk"/>
<configElement name="async" type="bool" access="RO" desc="Asynchronous IO"/>
<configElement name="defaultInitialFileCount" type="uint16" access="RO" unit="file" desc="Default number of files initially allocated to each journal"/>
17 years, 11 months
rhmessaging commits: r2040 - in store/branches/java/broker-queue-refactor/java/bdbstore: mvn-repo/sleepycat/berkeleydb-je and 3 other directories.
by rhmessaging-commits@lists.jboss.org
Author: godfrer
Date: 2008-05-13 06:36:26 -0400 (Tue, 13 May 2008)
New Revision: 2040
Added:
store/branches/java/broker-queue-refactor/java/bdbstore/mvn-repo/sleepycat/berkeleydb-je/3.2.76/
store/branches/java/broker-queue-refactor/java/bdbstore/mvn-repo/sleepycat/berkeleydb-je/3.2.76/berkeleydb-je-3.2.76.jar
Modified:
store/branches/java/broker-queue-refactor/java/bdbstore/pom.xml
store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreTest.java
Log:
Upgrade BDB version, make store work with changes to broker
Added: store/branches/java/broker-queue-refactor/java/bdbstore/mvn-repo/sleepycat/berkeleydb-je/3.2.76/berkeleydb-je-3.2.76.jar
===================================================================
(Binary files differ)
Property changes on: store/branches/java/broker-queue-refactor/java/bdbstore/mvn-repo/sleepycat/berkeleydb-je/3.2.76/berkeleydb-je-3.2.76.jar
___________________________________________________________________
Name: svn:mime-type
+ application/octet-stream
Modified: store/branches/java/broker-queue-refactor/java/bdbstore/pom.xml
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/pom.xml 2008-05-13 09:29:57 UTC (rev 2039)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/pom.xml 2008-05-13 10:36:26 UTC (rev 2040)
@@ -32,11 +32,11 @@
<version>1.0-incubating-M2.1-SNAPSHOT</version>
</parent>
- <!-- Local repository for the BerkelyDB-je so we don't have to use the installer script -->
+ <!-- Local repository for the BerkeleyDB-je so we don't have to use the installer script -->
<repositories>
<repository>
- <id>berkley-je.local</id>
- <name>Local BerkelyDB JE Repository</name>
+ <id>berkeley-je.local</id>
+ <name>Local BerkeleyDB JE Repository</name>
<url>file://${basedir}/mvn-repo</url>
</repository>
</repositories>
@@ -61,7 +61,7 @@
<dependency>
<groupId>sleepycat</groupId>
<artifactId>berkeleydb-je</artifactId>
- <version>3.2.42</version>
+ <version>3.2.76</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
Modified: store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java 2008-05-13 09:29:57 UTC (rev 2039)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java 2008-05-13 10:36:26 UTC (rev 2040)
@@ -40,6 +40,7 @@
import org.apache.qpid.server.queue.MessageHandleFactory;
import org.apache.qpid.server.queue.MessageMetaData;
import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.txn.NonTransactionalContext;
@@ -57,6 +58,7 @@
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
/**
* BDBMessageStore implements a persistent {@link MessageStore} using the BDB high performance log.
@@ -457,7 +459,7 @@
.getQueueName() + ", Routing Key: " + binding.getRoutingKey() + ", Arguments: " + binding.getArguments()
+ ")");
- queue.bind(binding.getRoutingKey(), binding.getArguments(), exchange);
+ queue.bind(exchange, binding.getRoutingKey(), binding.getArguments() );
}
}
}
@@ -636,14 +638,16 @@
/**
* Removes the specified queue from the persistent store.
*
- * @param name The queue to remove.
- *
+ * @param queue The queue to remove.
* @throws AMQException If the operation fails for any reason.
*/
- public void removeQueue(AMQShortString name) throws AMQException
+ public void removeQueue(final AMQQueue queue) throws AMQException
{
+ AMQShortString name = queue.getName();
+
_log.debug("public void removeQueue(AMQShortString name = " + name + "): called");
+
DatabaseEntry key = new DatabaseEntry();
EntryBinding keyBinding = new AMQShortStringTupleBinding();
keyBinding.objectToEntry(name, key);
@@ -693,16 +697,17 @@
* Places a message onto a specified queue, in a given transactional context.
*
* @param context The transactional context for the operation.
- * @param name The name of the queue to place the message on.
+ * @param queue The the queue to place the message on.
* @param messageId The message to enqueue.
*
* @throws AMQException If the operation fails for any reason.
*/
- public void enqueueMessage(StoreContext context, AMQShortString name, Long messageId) throws AMQException
+ public void enqueueMessage(StoreContext context, AMQQueue queue, Long messageId) throws AMQException
{
// _log.debug("public void enqueueMessage(StoreContext context = " + context + ", AMQShortString name = " + name
// + ", Long messageId): called");
+ AMQShortString name = queue.getName();
Transaction tx = (Transaction) context.getPayload();
DatabaseEntry key = new DatabaseEntry();
EntryBinding keyBinding = new DeliveryDetailsKey.TupleBinding();
@@ -753,13 +758,13 @@
* Extracts a message from a specified queue, in a given transactional context.
*
* @param context The transactional context for the operation.
- * @param name The name of the queue to take the message from.
+ * @param queue The name queue to take the message from.
* @param messageId The message to dequeue.
- *
- * @throws AMQException If the operation fails for any reason, or if the specified message does not exist.
+ * @throws AMQException If the operation fails for any reason, or if the specified message does not exist.
*/
- public void dequeueMessage(StoreContext context, AMQShortString name, Long messageId) throws AMQException
+ public void dequeueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException
{
+ AMQShortString name = queue.getName();
boolean isLocal = getOrCreateTransaction(context);
Transaction tx = (Transaction) context.getPayload();
@@ -1089,7 +1094,7 @@
_log.debug("public void storeMessageMetaData(StoreContext context = " + context + ", Long messageId = "
+ messageId + ", MessageMetaData messageMetaData = " + messageMetaData + "): called");
}
-
+ getOrCreateTransaction(context);
Transaction tx = (Transaction) context.getPayload();
DatabaseEntry key = new DatabaseEntry();
EntryBinding keyBinding = TupleBinding.getPrimitiveBinding(Long.class);
@@ -1243,7 +1248,7 @@
public void process() throws AMQException
{
- _queue.process(_context, _queue.createEntry(_message), false);
+ _queue.enqueue(_context, _message);
}
}
@@ -1269,7 +1274,7 @@
MessageHandleFactory messageHandleFactory = new MessageHandleFactory();
long maxId = 1;
- TransactionalContext txnContext = new NonTransactionalContext(this, new StoreContext(), null, null, null);
+ TransactionalContext txnContext = new NonTransactionalContext(this, new StoreContext(), null, null);
while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
{
@@ -1281,7 +1286,7 @@
AMQQueue queue = queues.get(queueName);
if (queue == null)
{
- queue = new AMQQueue(queueName, false, null, false, _virtualHost);
+ queue = AMQQueueFactory.createAMQQueueImpl(queueName, false, null, false, _virtualHost, null);
_virtualHost.getQueueRegistry().registerQueue(queue);
queues.put(queueName, queue);
}
@@ -1439,9 +1444,9 @@
{
// _log.debug("public void commit(): called");
+ _commitThread.addJob(this);
synchronized (this)
{
- _commitThread.addJob(this);
while (!_complete)
{
try
@@ -1478,8 +1483,10 @@
// private final Logger _log = Logger.getLogger(CommitThread.class);
private final AtomicBoolean _stopped = new AtomicBoolean(false);
- private final Queue<Commit> _jobQueue = new ConcurrentLinkedQueue<Commit>();
+ private final AtomicReference<Queue<Commit>> _jobQueue = new AtomicReference<Queue<Commit>>(new ConcurrentLinkedQueue<Commit>());
private final CheckpointConfig _config = new CheckpointConfig();
+ private final Object _lock = new Object();
+ private AtomicBoolean _hasJobs = new AtomicBoolean(false);
public CommitThread(String name)
{
@@ -1494,15 +1501,15 @@
{
try
{
- synchronized (this)
+ synchronized (_lock)
{
while (!_stopped.get() && !hasJobs())
{
- wait();
+ _lock.wait();
}
+ }
+ processJobs();
- processJobs();
- }
}
catch (InterruptedException e)
{
@@ -1515,33 +1522,27 @@
{
// _log.debug("private void processJobs(): called");
- List<Commit> jobs = new LinkedList<Commit>();
+ // we replace the old queue atomically with a new one and this avoids any need to
+ // copy elements out of the queue
+ Queue<Commit> jobs = _jobQueue.getAndSet(new ConcurrentLinkedQueue<Commit>());
- Commit job;
-
- while ((job = _jobQueue.poll()) != null)
- {
- jobs.add(job);
- }
-
try
{
// _environment.checkpoint(_config);
_environment.sync();
-
- // _log.info("Commited " + jobs.size() + " jobs.");
-
- /* Iterator<Commit> iter = jobs.iterator();
- while(iter.hasNext())
- {
- Commit commit = iter.next();
- commit.prepare(!iter.hasNext());
- }
- */
+
for (Commit commit : jobs)
{
commit.complete();
}
+ if(_jobQueue.get().isEmpty())
+ {
+ _hasJobs.set(false);
+ if(!_jobQueue.get().isEmpty())
+ {
+ _hasJobs.set(true);
+ }
+ }
}
catch (DatabaseException e)
@@ -1556,19 +1557,28 @@
private boolean hasJobs()
{
- return !_jobQueue.isEmpty();
+ return !_jobQueue.get().isEmpty();
}
- public synchronized void addJob(Commit commit)
+ public void addJob(Commit commit)
{
- _jobQueue.add(commit);
- notify();
+ _jobQueue.get().add(commit);
+ if(_hasJobs.compareAndSet(false, true))
+ {
+ synchronized(_lock)
+ {
+ _lock.notifyAll();
+ }
+ }
}
- public synchronized void close()
+ public void close()
{
_stopped.set(true);
- notify();
+ synchronized(_lock)
+ {
+ _lock.notifyAll();
+ }
}
}
}
Modified: store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreTest.java
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreTest.java 2008-05-13 09:29:57 UTC (rev 2039)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreTest.java 2008-05-13 10:36:26 UTC (rev 2040)
@@ -32,7 +32,7 @@
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.MessageMetaData;
-import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.txn.NonTransactionalContext;
import org.apache.qpid.server.txn.TransactionalContext;
@@ -42,7 +42,6 @@
import java.io.File;
import java.util.LinkedList;
import java.util.List;
-import java.util.HashSet;
public class BDBStoreTest extends TestCase
{
@@ -84,8 +83,7 @@
_store.setVirtualHost(_virtualHost);
_store.startCommitThread();
- _txnContext = new NonTransactionalContext(_store, _storeContext, null, new LinkedList<RequiredDeliveryException>(),
- new HashSet<Long>());
+ _txnContext = new NonTransactionalContext(_store, _storeContext, null, new LinkedList<RequiredDeliveryException>());
}
public void tearDown() throws Exception
@@ -95,7 +93,7 @@
public void testQueuePersistence() throws DatabaseException, AMQException
{
- AMQQueue queue = new AMQQueue(QUEUE1, true, ME, false, _virtualHost);
+ AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(QUEUE1, true, ME, false, _virtualHost, null);
_store.createQueue(queue);
AMQQueue returnedQueue = _store.getQueue(QUEUE1);
@@ -262,12 +260,12 @@
_store.storeMessageMetaData(_storeContext, 22L, new MessageMetaData(pubBody, chb, 0));
- AMQQueue queue = new AMQQueue(QUEUE1, true, ME, false, _virtualHost);
+ AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(QUEUE1, true, ME, false, _virtualHost, null);
_store.createQueue(queue);
_store.beginTran(_storeContext);
- _store.enqueueMessage(_storeContext, QUEUE1, 20L);
- _store.enqueueMessage(_storeContext, QUEUE1, 21L);
+ _store.enqueueMessage(_storeContext, queue, 20L);
+ _store.enqueueMessage(_storeContext, queue, 21L);
_store.commitTran(_storeContext);
List<Long> enqueuedIds = _store.getEnqueuedMessages(QUEUE1);
@@ -291,17 +289,17 @@
_store.storeMessageMetaData(_storeContext, 31L, new MessageMetaData(pubBody, chb, 0));
_store.storeMessageMetaData(_storeContext, 32L, new MessageMetaData(pubBody, chb, 0));
- AMQQueue queue = new AMQQueue(QUEUE1, true, ME, false, _virtualHost);
+ AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(QUEUE1, true, ME, false, _virtualHost, null);
_store.createQueue(queue);
_store.beginTran(_storeContext);
- _store.enqueueMessage(_storeContext, QUEUE1, 30L);
- _store.enqueueMessage(_storeContext, QUEUE1, 31L);
+ _store.enqueueMessage(_storeContext, queue, 30L);
+ _store.enqueueMessage(_storeContext, queue, 31L);
_store.commitTran(_storeContext);
_store.beginTran(_storeContext);
- _store.enqueueMessage(_storeContext, QUEUE1, 32L);
+ _store.enqueueMessage(_storeContext, queue, 32L);
_store.abortTran(_storeContext);
_store.beginTran(_storeContext);
@@ -329,16 +327,16 @@
_store.storeMessageMetaData(_storeContext, 32L, new MessageMetaData(pubBody, chb, 0));
- AMQQueue queue = new AMQQueue(QUEUE1, true, ME, false, _virtualHost);
+ AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(QUEUE1, true, ME, false, _virtualHost, null);
_store.createQueue(queue);
_store.beginTran(_storeContext);
- _store.enqueueMessage(_storeContext, QUEUE1, 30L);
+ _store.enqueueMessage(_storeContext, queue, 30L);
_store.abortTran(_storeContext);
_store.beginTran(_storeContext);
- _store.enqueueMessage(_storeContext, QUEUE1, 31L);
- _store.enqueueMessage(_storeContext, QUEUE1, 32L);
+ _store.enqueueMessage(_storeContext, queue, 31L);
+ _store.enqueueMessage(_storeContext, queue, 32L);
_store.commitTran(_storeContext);
List<Long> enqueuedIds = _store.getEnqueuedMessages(QUEUE1);
@@ -363,19 +361,19 @@
_store.storeMessageMetaData(_storeContext, 42L, new MessageMetaData(pubBody, chb, 0));
- AMQQueue queue = new AMQQueue(QUEUE1, true, ME, false, _virtualHost);
- AMQQueue queue2 = new AMQQueue(QUEUE2, true, HIM, false, _virtualHost);
+ AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(QUEUE1, true, ME, false, _virtualHost, null);
+ AMQQueue queue2 = AMQQueueFactory.createAMQQueueImpl(QUEUE2, true, HIM, false, _virtualHost, null);
_store.createQueue(queue);
_store.createQueue(queue2);
_store.beginTran(_storeContext);
- _store.enqueueMessage(_storeContext, QUEUE1, 40L);
- _store.enqueueMessage(_storeContext, QUEUE1, 41L);
- _store.enqueueMessage(_storeContext, QUEUE2, 42L);
+ _store.enqueueMessage(_storeContext, queue, 40L);
+ _store.enqueueMessage(_storeContext, queue, 41L);
+ _store.enqueueMessage(_storeContext, queue2, 42L);
_store.commitTran(_storeContext);
- _store.enqueueMessage(_storeContext, QUEUE1, 42L);
+ _store.enqueueMessage(_storeContext, queue, 42L);
_virtualHost.getQueueRegistry().unregisterQueue(queue.getName());
_virtualHost.getQueueRegistry().unregisterQueue(queue2.getName());
@@ -421,21 +419,21 @@
_store.storeMessageMetaData(_storeContext, 50L, new MessageMetaData(pubBody, chb, 0));
- AMQQueue queue = new AMQQueue(QUEUE1, true, ME, false, _virtualHost);
+ AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(QUEUE1, true, ME, false, _virtualHost, null);
_store.createQueue(queue);
- _store.enqueueMessage(_storeContext, QUEUE1, 50L);
- _store.dequeueMessage(_storeContext, QUEUE1, 50L);
+ _store.enqueueMessage(_storeContext, queue, 50L);
+ _store.dequeueMessage(_storeContext, queue, 50L);
}
public void testQueueRemove() throws AMQException
{
- AMQQueue queue = new AMQQueue(QUEUE1, true, ME, false, _virtualHost);
+ AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(QUEUE1, true, ME, false, _virtualHost, null);
_store.createQueue(queue);
- _store.removeQueue(QUEUE1);
+ _store.removeQueue(queue);
try
{
- _store.removeQueue(QUEUE1);
+ _store.removeQueue(queue);
Assert.fail("No exception thrown when deleting non-existant queue");
}
catch (AMQException e)
17 years, 11 months